0


使用OGG实现Oracle到kafka数据同步(详细版本)

[toc]

一、环境规划

  • 实现目标:配置OGG,实现从Oracle到kafka数据增量同步。同步消息格式为json。

  • 主机规划:

身份

主机名

版本

IP

OGG版本

源端

orcldb

Release 11.2.0.4.0

172.16.10.152

OGG-12.3.0.1.4

目标端

kafka

kafka_2.11-1.1.1

172.16.100.241

OGG_BigData_Linux_x64_12.3.2.1.1

二、安装包下载

三、搭建过程

1.Oracle部署

1)系统配置:

#修改主机名,防火墙关闭,selinux关闭等

[root@172-16-10-152 ~]# hostnamectl set-hostname orcldb

[root@172-16-10-152 ~]# su -

Last login: Mon Aug 10 09:08:26 EDT 2020 from 172.16.10.110 on pts/0

[root@orcldb ~]# systemctl stop firewalld

[root@orcldb ~]# systemctl disable firewalld

Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.

Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.service.

[root@orcldb ~]# vim /etc/sysconfig/selinux

..

SELINUX=disabled

...

#修改相关参数

[root@orcldb ~]# vim /etc/security/limits.conf

oracle soft nproc 2047

oracle hard nproc 16384

oracle soft nofile 1024

oracle hard nofile 65536

[root@orcldb src]# vim /etc/sysctl.conf

[root@orcldb src]# sysctl -p

net.ipv6.conf.all.disable_ipv6 = 1

fs.aio-max-nr = 1048576

fs.file-max = 6815744

kernel.shmall = 2097152

kernel.shmmax = 536870912

kernel.shmmni = 4096

kernel.sem = 250 32000 100 128

net.ipv4.ip_local_port_range = 9000 65500

net.core.rmem_default = 262144

net.core.rmem_max = 4194304

net.core.wmem_default = 262144

net.core.wmem_max = 1048576

[root@orcldb ~]# vim /etc/pam.d/login

session required /lib64/security/pam_limits.so

session required pam_limits.so

[root@orcldb src]# vim /etc/profile

if [ $USER = "oracle" ]; then

if [ $SHELL = "/bin/ksh" ]; then

   ulimit -p 16384

   ulimit -n 65536

else

   ulimit -u 16384 -n 65536

fi

fi

[root@orcldb src]# source /etc/profile

2)安装相应软件包,创建用户,组以及相关文件夹

[root@orcldb ~]# yum -y install binutils compat-libcap1 compat-libstdc++-33 gcc gcc-c++ glibc glibc-devel ksh libaio libaio-devel libgcc libstdc++ libstdc++-devel libXi libXtst make sysstat unixODBC unixODBC-devel

[root@orcldb src]# groupadd -g 501 oinstall

[root@orcldb src]# groupadd -g 502 dba

[root@orcldb src]# groupadd -g 503 oper

[root@orcldb src]# useradd -u 502 -g oinstall -G dba,oper oracle

[root@orcldb src]# echo oracle | passwd --stdin oracle

Changing password for user oracle.

passwd: all authentication tokens updated successfully.

[root@orcldb src]# mkdir /u01/app/oracle -pv

[root@orcldb ~]# mkdir -pv /u01/app/oracle/product/11.2.0/db_home1

[root@orcldb ~]# chown oracle.oinstall /u01 -R

[root@orcldb src]# chown -R oracle.oinstall database/

3)静默安装Oracle

#挂载软件包,此软件包oracle已不再发行

挂载软件包

[root@orcldb ~]# mkdir /mnt/cdrom

[root@orcldb ~]# mount -o loop oracle_11204_x86_64_linux.iso /mnt/cdrom/

mount: /dev/loop0 is write-protected, mounting read-only

[root@orcldb cdrom]# cp ./* /usr/local/src/

[root@orcldb src]# yum -y install unzip

[root@orcldb src]# unzip p13390677_112040_Linux-x86-64_1of7.zip

[root@orcldb src]# unzip p13390677_112040_Linux-x86-64_2of7.zip

#配置Oracle响应文件

[oracle@orcldb src]#cat /usr/local/src/database/response/db_install.rsp

oracle.install.responseFileVersion=/oracle/install/rspfmt_dbinstall_response_schema_v11_2_0

oracle.install.option=INSTALL_DB_SWONLY

ORACLE_HOSTNAME=orcldb

UNIX_GROUP_NAME=oinstall

INVENTORY_LOCATION=/u01/app/oracle/inventory

SELECTED_LANGUAGES=en,zh_CN

ORACLE_HOME=/u01/app/oracle/product/11.2.0/db_home1

ORACLE_BASE=/u01/app/oracle

oracle.install.db.InstallEdition=EE

oracle.install.db.EEOptionsSelection=false

oracle.install.db.optionalComponents=oracle.rdbms.partitioning:11.2.0.4.0,oracle.oraolap:11.2.0.4.0,oracle.rdbms.dm:11.2.0.4.0,oracle.rdbms.dv:11.2.0.4.0,oracle.rdbms.lbac:11.2.0.4.0,oracle.rdbms.rat:11.2.0.4.0

oracle.install.db.DBA_GROUP=dba

oracle.install.db.OPER_GROUP=dba

oracle.install.db.CLUSTER_NODES=

oracle.install.db.isRACOneInstall=

oracle.install.db.racOneServiceName=

oracle.install.db.config.starterdb.type=

oracle.install.db.config.starterdb.globalDBName=

oracle.install.db.config.starterdb.SID=

oracle.install.db.config.starterdb.characterSet=AL32UTF8

oracle.install.db.config.starterdb.memoryOption=true

oracle.install.db.config.starterdb.memoryLimit=

oracle.install.db.config.starterdb.installExampleSchemas=false

oracle.install.db.config.starterdb.enableSecuritySettings=true

oracle.install.db.config.starterdb.password.ALL=

oracle.install.db.config.starterdb.password.SYS=

oracle.install.db.config.starterdb.password.SYSTEM=

oracle.install.db.config.starterdb.password.SYSMAN=

oracle.install.db.config.starterdb.password.DBSNMP=

oracle.install.db.config.starterdb.control=DB_CONTROL

oracle.install.db.config.starterdb.gridcontrol.gridControlServiceURL=

oracle.install.db.config.starterdb.automatedBackup.enable=false

oracle.install.db.config.starterdb.automatedBackup.osuid=

oracle.install.db.config.starterdb.automatedBackup.ospwd=

oracle.install.db.config.starterdb.storageType=

oracle.install.db.config.starterdb.fileSystemStorage.dataLocation=

oracle.install.db.config.starterdb.fileSystemStorage.recoveryLocation=

oracle.install.db.config.asm.diskGroup=

oracle.install.db.config.asm.ASMSNMPPassword=

MYORACLESUPPORT_USERNAME=

MYORACLESUPPORT_PASSWORD=

SECURITY_UPDATES_VIA_MYORACLESUPPORT=

DECLINE_SECURITY_UPDATES=true

PROXY_HOST=

PROXY_PORT=

PROXY_USER=

PROXY_PWD=

PROXY_REALM=

COLLECTOR_SUPPORTHUB_URL=

oracle.installer.autoupdates.option=SKIP_UPDATES

oracle.installer.autoupdates.downloadUpdatesLoc=

AUTOUPDATES_MYORACLESUPPORT_USERNAME=

AUTOUPDATES_MYORACLESUPPORT_PASSWORD=

静默安装

[oracle@orcldb database]#./runInstaller -silent -force -ignoresysprereqs -ignoreprereq -responseFile /usr/local/src/database/response/db_install.rsp

...

The installation of Oracle Database 11g was successful.

Please check '/u01/app/oracle/inventory/logs/silentInstall2020-08-10_09-33-29AM.log' for more details.

As a root user, execute the following script(s):

    1. /u01/app/oracle/inventory/orainstRoot.sh

    2. /u01/app/oracle/product/11.2.0/db_home1/root.sh

Successfully Setup Software.

#执行相应脚本

[root@orcldb database]# /u01/app/oracle/inventory/orainstRoot.sh

Changing permissions of /u01/app/oracle/inventory.

Adding read,write permissions for group.

Removing read,write,execute permissions for world.

Changing groupname of /u01/app/oracle/inventory to oinstall.

The execution of the script is complete.

[root@orcldb database]# /u01/app/oracle/product/11.2.0/db_home1/root.sh

Check /u01/app/oracle/product/11.2.0/db_home1/install/root_orcldb_2020-08-10_09-37-42.log for the output of root script

[oracle@orcldb database]$ netca /silent /responsefile /usr/local/src/database/response/netca.rsp

Check the trace file for details: /u01/app/oracle/cfgtoollogs/netca/trace_OraDb11g_home1-2008109AM3839.log

Oracle Net Services configuration failed. The exit code is 1

#配置database响应文件

/usr/local/src/database/response/dbca.rsp

[GENERAL]

RESPONSEFILE_VERSION = "11.2.0"

OPERATION_TYPE = "createDatabase"

[CREATEDATABASE]

GDBNAME = "orcl"

SID = "orcl"

TEMPLATENAME = "General_Purpose.dbc"

STORAGETYPE=FS

DATAFILEDESTINATION =/u01/app/oracle/oradata

CHARACTERSET = "ZHS16GBK"

NATIONALCHARACTERSET= "AL16UTF16"

LISTENERS=LISTENER

TOTALMEMORY = "90000"

SYSPASSWORD = "oracle"

SYSTEMPASSWORD = "oracle"

database静默安装

[oracle@orcldb response]$ dbca -silent -responseFile /usr/local/src/database/response/dbca.rsp

安装完成

SQL> select * from v$version;

BANNER


Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production

PL/SQL Release 11.2.0.4.0 - Production

CORE 11.2.0.4.0 Production

TNS for Linux: Version 11.2.0.4.0 - Production

NLSRTL Version 11.2.0.4.0 - Production

4)开启归档,附加日志等

SQL> alter database archivelog;

Database altered.

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Database altered.

SQL> Alter database force logging;

Database altered.

SQL> ALTER DATABASE OPEN;

Database altered.

SQL> ALTER SYSTEM ARCHIVE LOG CURRENT;

System altered.

SQL> select LOG_MODE, SUPPLEMENTAL_LOG_DATA_MIN,FORCE_LOGGING from v$database;

LOG_MODE SUPPLEME FOR


ARCHIVELOG YES YES

2.Oracle OGG部署

1)创建相关路径,解压安装包,授权

[root@orcldb ~]# mkdir /ogg/ogginstall -p

[root@orcldb ~]# chown -R oracle.oinstall /ogg/ogginstall

[root@orcldb ~]# cd /ogg/ogginstall/

[root@orcldb ogginstall]# pwd

/ogg/ogginstall

[root@orcldb ~]# mv 123014_fbo_ggs_Linux_x64_shiphome.zip /home/oracle/

[root@orcldb ~]# cd /home/oracle

[root@orcldb oracle]# ls

123014_fbo_ggs_Linux_x64_shiphome.zip

[root@orcldb oracle]# chmown oracle.oinstall 123014_fbo_ggs_Linux_x64_shiphome.zip

-bash: chmown: command not found

[root@orcldb oracle]# chown oracle.oinstall 123014_fbo_ggs_Linux_x64_shiphome.zip

[root@orcldb oracle]# ll

total 331876

-rw-r--r--. 1 oracle oinstall 339837611 Aug 10 05:40 123014_fbo_ggs_Linux_x64_shiphome.zip

[root@orcldb oracle]# pwd

/home/oracle

[oracle@orcldb ~]$ unzip 123014_fbo_ggs_Linux_x64_shiphome.zip

[oracle@orcldb ~]$ chmod 755 -R fbo_ggs_Linux_x64_shiphome/

2)编辑响应文件

[oracle@orcldb response]$ cp oggcore.rsp oggcore.rsp.bak

[oracle@orcldb response]$ pwd

/home/oracle/fbo_ggs_Linux_x64_shiphome/Disk1/response

oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2

INSTALL_OPTION=ORA11g

SOFTWARE_LOCATION=/ogg/ogginstall

START_MANAGER=true

MANAGER_PORT=7809

DATABASE_LOCATION=/u01/app/oracle/product/11.2.0/db_home1

INVENTORY_LOCATION=/u01/app/oracle/inventory

UNIX_GROUP_NAME=oinstall

3)配置环境变量

[oracle@orcldb ~]$ vim .bash_profile

...

export PATH=/ogg/ogginstall:$PATH

export LD_LIBRARY_PATH=$ORACLE_HOME/lib

4)静默安装

[oracle@orcldb Disk1]$ ./runInstaller -silent -nowait -responseFile /home/oracle/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp

Starting Oracle Universal Installer...

Checking Temp space: must be greater than 120 MB. Actual 17853 MB Passed

Checking swap space: must be greater than 150 MB. Actual 3967 MB Passed

...

5)创建OGG需要的目录

GGSCI (orcldb) 2> create subdirs

Creating subdirectories under current directory /ogg/ogginstall

Parameter file /ogg/ogginstall/dirprm: created.

Report file /ogg/ogginstall/dirrpt: created.

Checkpoint file /ogg/ogginstall/dirchk: created.

Process status files /ogg/ogginstall/dirpcs: created.

SQL script files /ogg/ogginstall/dirsql: created.

Database definitions files /ogg/ogginstall/dirdef: created.

Extract data files /ogg/ogginstall/dirdat: created.

Temporary files /ogg/ogginstall/dirtmp: created.

Credential store files /ogg/ogginstall/dircrd: created.

Masterkey wallet files /ogg/ogginstall/dirwlt: created.

Dump files /ogg/ogginstall/dirdmp: created.

3.kafka OGG部署

1)创建软件安装目录,解压安装包

[root@kafka ~]# mkdir -p /opt/ogg

[root@kafka ~]# tar xf OGG_BigData_Linux_x64_12.3.2.1.1.tar -C /opt/ogg/

2)编辑环境变量

[root@kafka ~]# vim /etc/profile

export OGG_HOME=/opt/ogg

export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$OGG_HOME/lib

export PATH=$OGG_HOME:$PATH

[root@kafka ~]# source /etc/profile

#配置完成

[root@kafka ~]# ggsci

Oracle GoldenGate Command Interpreter

Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305

Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09

Operating system character set identified as UTF-8.

Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.

GGSCI (kafka) 1>

3)创建OGG需要的目录

GGSCI (kafka) 1> create subdirs

Creating subdirectories under current directory /opt/ogg

Parameter file /opt/ogg/dirprm: created.

Report file /opt/ogg/dirrpt: created.

Checkpoint file /opt/ogg/dirchk: created.

Process status files /opt/ogg/dirpcs: created.

SQL script files /opt/ogg/dirsql: created.

Database definitions files /opt/ogg/dirdef: created.

Extract data files /opt/ogg/dirdat: created.

Temporary files /opt/ogg/dirtmp: created.

Credential store files /opt/ogg/dircrd: created.

Masterkey wallet files /opt/ogg/dirwlt: created.

Dump files /opt/ogg/dirdmp: created.

4.OGG配置(含测试准备)

Oracle配置:

1)Oracle创建专用的表空间,并创建复制用户授权

#创建表空间数据文件存放目录

[root@orcldb ~]# mkdir -p /u01/app/oracle/oggdata/orcl

[root@orcldb ~]# chown -R oracle:oinstall /u01/app/oracle/oggdata/orcl

#创建表空间

SQL> create tablespace oggtbs datafile '/u01/app/oracle/oggdata/orcl/oggtbs01.dbf' size 1000M autoextend on;

Tablespace created.

#创建用户并授权

SQL> create user ogg identified by ogg default tablespace oggtbs;

User created.

SQL> grant dba to ogg;#这里为了方便直接授予了dba权限,实际生产环境中应按最小权限原则严格授权

Grant succeeded.

2)Oracle配置OGG全局变量

[oracle@orcldb ogginstall]$ ./ggsci

GGSCI (orcldb) 1> dblogin userid ogg password ogg

Successfully logged into database.

GGSCI (orcldb as ogg@orcl) 2> edit param ./globals

oggschema ogg

allowOutputDir /opt/ogg/dirdat --目标端存放trail文件的目录,该参数是OGG 12的新特性

3)Oracle配置OGG管理器mgr

GGSCI (orcldb as ogg@orcl) 4> edit param mgr

PORT 7809 --PORT mgr的默认监听端口

DYNAMICPORTLIST 7810-7909 --DYNAMICPORTLIST 动态端口集,当上述指定的端口不可用时在这个区间选择1个,最大范围256

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 --表示重启所有EXTRACT进程,每次间隔3分钟 最多可以执行5次

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 --trail文件定期清理

4)Oracle创建测试用户和测试表

SQL> create user test_ogg identified by test_ogg default tablespace users;

User created.

SQL> grant dba to test_ogg;

Grant succeeded.

SQL> conn test_ogg/test_ogg;

Connected.

SQL> create table test_ogg(id int ,name varchar(20),primary key(id));

Table created.

5)Oracle配置OGG添加待复制表

GGSCI (orcldb as ogg@orcl) 5> add trandata test_ogg.test_ogg

2020-08-19 22:32:12 INFO OGG-15132 Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.

2020-08-19 22:32:12 INFO OGG-15133 TRANDATA for scheduling columns has been added on table TEST_OGG.TEST_OGG.

2020-08-19 22:32:12 INFO OGG-15135 TRANDATA for instantiation CSN has been added on table TEST_OGG.TEST_OGG.

GGSCI (orcldb as ogg@orcl) 6> info trandata test_ogg.test_ogg

Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.

Columns supplementally logged for table TEST_OGG.TEST_OGG: "ID".

Prepared CSN for table TEST_OGG.TEST_OGG: 1397441

6)Oracle配置OGG extract进程,并添加extract进程

GGSCI (orcldb as ogg@orcl) 7> edit param extkafka

extract extkafka --进程名称

dynamicresolution --动态解析

SETENV (ORACLE_SID = "orcl") --设置环境变量,Oracle sid

SETENV (NLS_LANG = "american_america.AL32UTF8") --设置环境变量,字符集

userid ogg,password ogg --OGG连接数据库的用户和密码

exttrail /ogg/ogginstall/dirdat/to --trail文件的保存路径和文件名前缀,注意前缀只能2位,其余由OGG补齐,文件名一共8位。

table test_ogg.test_ogg; --table 指的就是表名,支持通配符*,注意必须用;结尾

添加extract进程

GGSCI (orcldb as ogg@orcl) 8> add extract extkafka,tranlog,begin now

EXTRACT added.

添加trail文件与extract绑定

GGSCI (orcldb as ogg@orcl) 9> add exttrail /ogg/ogginstall/dirdat/to,extract extkafka

EXTTRAIL added.

7)Oracle配置OGG pump进程(与extract类似用来发送trail到目标端)

GGSCI (orcldb as ogg@orcl) 10> edit param pukafka

extract pukafka --进程名称

passthru --禁止OGG与Oracle交互

dynamicresolution --动态解析

userid ogg,password ogg --OGG连接数据库的用户和密码

rmthost 172.16.100.241 mgrport 7809 --目标端kafka mgr服务的地址和监听端口

rmttrail /opt/ogg/dirdat/to --目标端trail文件存储位置,这里和源端存储位置不同,这也是前面全局变量为什么加上新参数的原因

table test_ogg.test_ogg; --table 指的就是表名,支持通配符*,注意必须用;结尾

#分别将本地trail文件和目标端的trail文件绑定到extract进程

GGSCI (orcldb as ogg@orcl) 11> add extract pukafka,exttrailsource /ogg/ogginstall/dirdat/to

EXTRACT added.

GGSCI (orcldb as ogg@orcl) 12> add rmttrail /opt/ogg/dirdat/to,extract pukafka

RMTTRAIL added.

8)Oracle配置define文件

Oracle与MySQL,Hadoop集群(HDFS,Hive,kafka等)等之间数据传输可以定义为异构数据类型的传输,故需要定义表之间的关系映射

GGSCI (orcldb) 4> edit param test_ogg

defsfile /ogg/ogginstall/dirdef/test_ogg.test_ogg

userid ogg,password ogg

table test_ogg.test_ogg;

在Oracle OGG主目录下执行

[oracle@orcldb ogginstall]$ pwd

/ogg/ogginstall

[oracle@orcldb ogginstall]$ ./defgen paramfile dirprm/test_ogg.prm

...


** Running with the following parameters **


defsfile /ogg/ogginstall/dirdef/test_ogg.test_ogg

Source Context :

SourceModule : [defgen.main]

SourceID : [/scratch/aime/adestore/views/aime_adc00jza/oggcore/OpenSys/src/app/defgen/defgen.c]

SourceMethod : [create_defgen_file]

SourceLine : [1522]

ThreadBacktrace : [12] elements

                      : [./defgen(ggs::gglib::MultiThreading::MainThread::Run(int, char**))]

                      : []

                      : [/lib64/libc.so.6(__libc_start_main)]

                      : [./defgen(_ZN3ggs5gglib14MultiThreading6Thread9RunThreadEPNS2_10ThreadArgsE+0x173) [0x4c46b3]]

                      : [ng::MainThread::Run(int, char**)]

                      : [./defgen()]

                      : [./defgen(ggs::gglib::MultiThreading::MainThread::Run(int, char**))]

                      : [./defgen(ggs::gglib::MultiThreading::Thread::RunThread(ggs::gglib::MultiThreading::Thread::ThreadArgs*))]

                      : [./defgen(ggs::gglib::MultiThreading::MainThread::Run(int, char**))]

                      : [./defgen()]

                      : [/lib64/libc.so.6(__libc_start_main)]

                      : [./defgen()]

2020-08-20 15:49:24 ERROR OGG-00037 DEFSFILE file /ogg/ogginstall/dirdef/test_ogg.test_ogg already exists.

#这里我已经生成完了 所以报错。把上面的test_ogg.test_ogg发送给目标端相应目录

2020-08-20 15:49:24 ERROR OGG-01668 PROCESS ABENDING.

[oracle@orcldb dirdef]$ scp -r ./test_ogg.test_ogg root@172.16.100.241:/opt/ogg/dirdef/

root@172.16.100.241's password:

test_ogg.test_ogg

kafka配置:(相同参数不再说明)

1)启动kafka服务

[root@kafka kafka]# bin/zookeeper-server-start.sh config/zookeeper.properties

[root@kafka kafka]# bin/kafka-server-start.sh config/server.properties

[root@kafka kafka]# jps

26305 ScheduleMain

933 Jps

17126 ValidWebMain

9159 NodeManager

12779 JournalNode

13867 QuorumPeerMain

17197 StreamAppWebMain

20247 start.jar

20120 Kafka

30521 EngineMain

13626 DataNode

7452 JobHistoryServer

24573

2)kafka配置管理器mgr

GGSCI (kafka) 1> edit param mgr

PORT 7809

DYNAMICPORTLIST 7810-7909

AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3

PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3

3)kafka配置checkpoint

checkpoint是复制过程中可追溯的偏移量记录,在全局配置里添加checkpoint表即可。

GGSCI (kafka) 2> edit param ./GLOBALS

CHECKPOINTTABLE test_ogg.checkpoint

4)kafka配置replicate进程

GGSCI (kafka) 3> edit param rekafka

REPLICAT rekafka --replicat进程名字

sourcedefs /opt/ogg/dirdef/test_ogg.test_ogg --define的映射文件(即前面从源端scp到目标端/opt/ogg/dirdef/下的文件)

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props --TARGETDB LIBFILE即定义kafka一些库文件以及配置文件位置

REPORTCOUNT EVERY 1 MINUTES, RATE --复制任务的报告生成频率

GROUPTRANSOPS 10000 --以事务传输时,事务合并的单位,减少IO操作

MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg; --源端与目标端的映射关系

5)设置kafka的配置文件

[root@kafka ogg]# cd /opt/ogg/dirprm/

[root@kafka dirprm]# ls

[root@kafka dirprm]# vim kafka.props

gg.handlerlist=kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

gg.handler.kafkahandler.topicMappingTemplate=test_ogg #kafka topic名称,OGG12有修改

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.mode=op

gg.classpath=dirprm/:/opt/dtstack/DTBase/kafka/libs/:/opt/ogg/:/opt/ogg/lib/

[root@kafka dirprm]# pwd

/opt/ogg/dirprm

[root@kafka dirprm]# vim custom_kafka_producer.properties

bootstrap.servers=172.16.100.241:9092

acks=1

compression.type=gzip

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=102400

linger.ms=10000

6)kafka配置添加trail文件到replicate进程

GGSCI (kafka) 1> add replicat rekafka exttrail /opt/ogg/dirdat/to,checkpointtable test_ogg.checkpoint

REPLICAT added.

四、测试

启动两端进程:按照以下顺序

   源端mgr ---> 目标端mgr ---> 源端extract ---> 源端pump ---> 目标replicate

   本例中在OGG命令行中执行

GGSCI (orcldb) 1> start mgr

GGSCI (kafka) 1> start mgr

GGSCI (orcldb) 2> start extkafka

GGSCI (orcldb) 3> start pukafka

GGSCI (kafka) 2> start rekafka

查看所有进程状态:

GGSCI (orcldb) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:08

EXTRACT RUNNING PUKAFKA 00:00:00 00:00:00

GGSCI (kafka) 1> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

REPLICAT RUNNING REKAFKA 00:00:00 00:00:02

插入测试:

五、错误分析

PUMP(PUKAFKA)进程状态为abended,使用view report PUKAFKA查看报错信息如下:

2020-08-20 09:45:47 ERROR OGG-25127 Received an error reply requesting a graceful shutdown. (Reply received is 'Output file /ogg/ogginstall/dirdat/to000000 is not in any allowed output directories.'.).

建议两端trail保存路径相同,避免混淆。出现该报错是因为这里配置的路径是源端的路径,实际应该配置为目标端的路径。

百度对这类问题解释很全面

标签: oracle kafka linux

本文转载自: https://blog.csdn.net/Kayleigh520/article/details/128547855
版权归原作者 创作者mateo 所有, 如有侵权,请联系我们删除。

“使用OGG实现Oracle到kafka数据同步(详细版本)”的评论:

还没有评论