0


【原创】OGG21.3 CENTOS配置ORACLE 19C ADG至KAFKA

一、环境及OGG安装

客户目前的操作系统环境为AIX 7.2 ,硬件为IBM小机。Oracle环境使用rac+adg单机部署,rac使用asm管理磁盘。部署goldengate为了降低对主库的影响,因此部署在dg节点。

原文写在移动云盘笔记上,贴过来图片不会自动更过来,因此把截图删除了,大家凑活看看吧,整整三天从无到有。

目标库为kafaka,最终给flink用于消费产生的数据。

由于小机环境不好模拟,以下为本次技术验证所提供的环境

主机ip

配置

系统版本

Oracle/kafka版本

Goldengate版本

备注

10.89.136.3

4c 16g 60g系统盘 500g数据盘

Centos 7.8

Oracle 19.3 for linux

Adg主库

10.89.136.4

4c 16g 60g系统盘 500g数据盘

Centos 7.8

Oracle 19.3 for linux

Oracle GoldenGate 21.3.0.0.0 for Oracle on Linux x86-64

Adg从库

10.89.136.5

4c 16g 60g系统盘 500g数据盘

Centos 7.8

kafka_2.13-3.7.0

Oracle GoldenGate for Big Data 21.4.0.0.0 on Linux x86-64

目标库kafka

以下为从库adg安装ogg整体实施步骤。

1.上传安装文件及编辑响应文件

源端:

传输安装包213000_fbo_ggs_Linux_x64_Oracle_shiphome.zip至 /opt/目录

解压安装

mkdir -p /u01/app/ogg

chown -R oracle:oinstall /u01/app/ogg

su - oracle

cd /opt

unzip 213000_fbo_ggs_Linux_x64_Oracle_shiphome.zi

cd fbo_ggs_Linux_x64_Oracle_shiphome/Disk1

ll -h

pwd

cat response/oggcore.rsp | grep -Ev "^#|^$"

#备份响应文件

cp response/oggcore.rsp response/oggcore_bak.rsp

#编辑响应文件

vim response/oggcore.rsp

2.配置OGG环境变量

配置oracle的环境变量文件/home/oracle/.bash_profile里配置,为了怕出问题,我把OGG_HOME等环境变量在/etc/profile配置了一份。

vim /home/oracle/.bash_profile

文件内容如下:

.bash_profile

Get the aliases and functions

if [ -f ~/.bashrc ]; then

    . ~/.bashrc

fi

User specific environment and startup programs

PATH=$PATH:$HOME/.local/bin:$HOME/bin

export PATH

export ORACLE_BASE=/u01/app/oracle

export ORACLE_HOME=/u01/app/oracle/product/19.3.0

export PATH=$PATH:$ORACLE_HOME/bin:/usr/local/bin

注意这里是你的主机,注意修改否则创建监听会出问题

export ORACLE_HOSTNAME=p19cstd

注意这里是你的主机,注意修改否则创建监听会出问题

export ORACLE_SID=p19cstd

export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$ORACLE_HOME/rdbms/lib:$ORACLE_HOME/network/lib:/lib:/usr/lib

export CLASSPATH=$ORACLE_HOME/jlib:$ORACLE_HOME/rdbms/jlib:$ORACLE_HOME/network/jlib

export NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8"

#rlwrap

alias sqlplus='rlwrap sqlplus'

alias rman='rlwrap rman'

#for ogg

export OGG_HOME=/u01/app/ogg

export PATH=$OGG_HOME/bin:$PATH:$ORACLE_HOME/bin

export LD_LIBRARY_PATH=$OGG_HOME:$ORACLE_HOME/lib:/usr/lib

alias ggsci='cd $OGG_HOME;ggsci'

执行生效

cd ~

source .bash_profile

进入ogg目录 ,执行静默安装

cd /opt/fbo_ggs_Linux_x64_Oracle_shiphome/Disk1

#填写oggcore.rsp的绝对路径地址 ./runInstaller -silent -responseFile /opt/fbo_ggs_Linux_x64_Oracle_shiphome/Disk1/response/oggcore.rsp

提示交换空间不足,用root用户配置2G交换空间。

使用root用户操作:

创建2G的交换文件
dd if=/dev/zero of=/swapfile bs=4096 count=512k

#格式化swapfile交换分区

mkswap /swapfile

#为了swapfile交换分区的安全 需要设置权限

chown root:root /swapfile

chmod 0600 /swapfile

#开启swapfile交换分区 前提必须格式化

swapon /swapfile

#进行查看swapfile文件是否生效

free -h

永久生效

#设置开机自动挂载交换分区 /etc/fstab 添加以下行

/swapfile swap swap defaults 0 0

使用

mount -a 来检测文件是否正确配置。

切换回oracle用户

su - oracle

执行安装,报错,需要重新选择目录。

原因为响应文件中,没有指定ogg的目录,继续更改响应文件,最终响应文件如下图

继续执行安装

./runInstaller -silent -responseFile /opt/fbo_ggs_Linux_x64_Oracle_shiphome/Disk1/response/oggcore.rsp

提示安装成功。

查看依赖是否正常

ldd /u01/app/ogg/ggsci

至此源端ogg软件已经安装完毕。

下面来配置源端oracle数据库。

需要在主库上配置一些设置,进入 10.89.136.3 主库,查看当前是否为归档模式

以下操作均在主库上执行:

archive log list

如下图,默认为归档状态。

Oracle打开日志相关

OGG基于辅助日志等进行实时传输,故需要打开相关日志确保可获取事务内容,通过下面的命令查看该状态

select force_logging,supplemental_log_data_min from v$database;

若都为NO,则需要通过命令修改

alter database force logging;

alter database add supplemental log data;

如上图,强制日志模式为打开状态,故只需要执行第二条打开辅助日志语句。

再次查看日志状态

在adg备库dg上可以看到,日志状态已经同步更改

oracle创建复制用户

建立目录及数据文件

首先root用户建立相关文件夹,并赋予权限,主库及从库同步操作。

mkdir -p /u01/app/oggdata/p19c
chown -R oracle:oinstall /u01/app/oggdata/p19c
chown -R oracle:oinstall /u01/app/oggdata

主库建立表空间文件及ogg用户及授权。

切换为oracle用户

su - oracle

sqlplus / as sysdba

重要: 检查备库standby文件的管理方式,不能为manual,否则会导致备库不同步,dg库可能到时要重搭。

show parameter STANDBY_FILE_MANAGEMENT;

如果为manual,则改为AUTO。执行如下语句

alter system set standby_file_management=AUTO scope=both;

执行以下语句,注意实际过程中此用户密码可以设置为永不过期,否则受到密码过期影响。

create tablespace oggtbs datafile '/u01/app/oggdata/p19c/oggtbs01.dbf' size 1000M autoextend on;

create profile oggprofile limit PASSWORD_LIFE_TIME UNLIMITED;

create user ogg identified by oggL123 default tablespace oggtbs profile oggprofile;

#由于ogg用户所需权限比较多,默认给与dba权限,生产环境可以安装实际需求进行授权。

grant connect,resource,dba to ogg;

备注:

如果不给与dba权限,可能需要如下权限

grant connect,resource,unlimited tablespace to ogg;

grant executeon utl_file to ogg;

grant select any dictionary,select any table to ogg;

grant alter any table to ogg;

grant flashback any table to ogg;

grant execute on DBMS_FLASHBACK to ogg;

修改GOLDENGATE参数

(主库是否可以不做更改,只在dg库上做更改)

alter system set enable_goldengate_replication=true scope=both sid='*';

DG库上更改一下goldengate参数

alter system set enable_goldengate_replication=true scope=both sid='*';

数据库stream参数调整,一个进程建议1.25g左右,由于建库时只使用了3g内存,因此我配置为1g

alter system set streams_pool_size=1g; 
由于要从dg库抽取日志文件,extract抽取时主要加入参数,
TranlogOptions MINEFROMACTIVEDG
ogg版本12.1.2.1.0 及以后版本支持,但只支持oracle asm用户及经典抽取模式,不支持集成抽取模式。
因此本环境配置为从主库抽取。
数据库源端启用scott用户来验证同步过程。
alter session set "_ORACLE_SCRIPT"=true;
创建scott.sql脚本
@/u01/app/oracle/product/19.3.0/rdbms/admin/scott.sql
退出,重新登录
sqlplus / as  sysdba
alter user SCOTT account unlock;
密码为TIGER 区分大小写
2、源端创建测试用表:
在scott下创建测试表kafka:
create table KAFKA
(
  empno    NUMBER(4) not null,
  ename    VARCHAR2(10),
  job      VARCHAR2(9),
  mgr      NUMBER(4),
  hiredate DATE,
  sal      NUMBER(7,2),
  comm     NUMBER(7,2),
  deptno   NUMBER(2)
);
alter table KAFKA
  add constraint PK_KAFKA primary key (EMPNO);
插入测试数据:
insert into kafka select * from emp where sal is not null;
select * from kafka;
 

3、源端增加配置管理、抽取、投递进程
3.1、添加kafka表附加日志
./ggsci
dblogin userid [email protected]:1521/p19c password oggL123
#添加表
add trandata scott.kafka
info trandata scott.kafka

3.2、配置OGG的全局变量

GGSCI (source as ogg@orcl) 6> edit param ./globals

加入下面内容:

oggschema ogg

3.3、配置MGR进程

edit params mgr

加入如下内容:

PORT 7809

DYNAMICPORTLIST 7810-7860

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5

PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 30

lagreporthours 1

laginfominutes 30

lagcriticalminutes 60

启动mgr进程

start mgr

创建心跳表,用于监控同步进度,这步可选,但最好加上,后面监控会方便很多

GGSCI >ADD HEARTBEATTABLE

3.4、编辑抽取进程

edit params e_ka 添加如下内容

extract e_ka

dblogin userid ogg@10.89.136.3:1521/p19c password oggL123

setenv(NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8")

setenv(ORACLE_SID="p19c")

reportcount every 30 minutes,rate

numfiles 5000

discardfile ./dirrpt/e_ka.dsc,append,megabytes 1000

warnlongtrans 2h,checkinterval 30m

exttrail ./dirdat/ka

dboptions allowunusedcolumn

tranlogoptions archivedlogonly

tranlogoptions altarchivelogdest primary /u01/app/archive

dynamicresolution

fetchoptions nousesnapshot

ddl include mapped

ddloptions addtrandata,report

notcpsourcetimer

NOCOMPRESSDELETES

NOCOMPRESSUPDATES

GETUPDATEBEFORES

----------scott.kafka

table SCOTT.KAFKA,tokens(

TKN-CSN = @GETENV('TRANSACTION', 'CSN'),

TKN-COMMIT-TS = @GETENV ('GGHEADER', 'COMMITTIMESTAMP'),

TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')

);

3.5、添加抽取进程

add extract e_ka,tranlog,begin now

add exttrail ./dirdat/ka,extract e_ka,megabytes 500

启动抽取进程

start e_ka

info all 查看

查看报告:

view report E_KA 查看是哪边配置错误

适应实际环境,去除对应参数,最终配置如下:

extract e_ka

userid ogg@10.89.136.3:1521/p19c password oggL123

setenv(NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8")

setenv(ORACLE_SID="p19c")

reportcount every 30 minutes,rate

numfiles 5000

discardfile ./dirrpt/e_ka.dsc,append,megabytes 1000

warnlongtrans 2h,checkinterval 30m

exttrail ./dirdat/ka

dboptions allowunusedcolumn

fetchoptions nousesnapshot

ddl include mapped

ddloptions addtrandata,report

notcpsourcetimer

NOCOMPRESSDELETES

NOCOMPRESSUPDATES

GETUPDATEBEFORES

----------scott.kafka

table SCOTT.KAFKA,tokens(

TKN-CSN = @GETENV('TRANSACTION', 'CSN'),

TKN-COMMIT-TS = @GETENV ('GGHEADER', 'COMMITTIMESTAMP'),

TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')

);

最后提示:

注册

GGSCI (p19cstd as ogg@p19c) 36> register extract e_ka,database;

2024-05-21 18:18:10 INFO OGG-02003 Extract group E_KA successfully registered with database at SCN 3642044.

再次启动 start e_ka

启动成功:

3.6、投递进程配置

edit params d_ka

加入下面配置

extract d_ka

rmthost 10.89.136.5,mgrport 7809,compress

userid ogg@10.89.136.3:1521/p19c password oggL123

PASSTHRU

numfiles 5000

rmttrail ./dirdat/ka

table scott.kafka;

3.7、添加投递进程

add extract d_ka,exttrailsource ./dirdat/ka

add rmttrail ./dirdat/ka,extract d_ka,megabytes 500

启动投递进程

start d_ka

4、表结构定义文件

4.1、defgen配置

edit params test_kafka

defsfile /u01/app/ogg/dirdef/kafka.def

userid ogg@10.89.136.3:1521/p19c password oggL123

table scott.kafka;

4.2、生成表结构定义文件

[oracle@source ogg12]$ pwd

/u01/app/oracle/ogg12

[oracle@source ogg12]$ ./defgen paramfile dirprm/test_kafka.prm

4.3、将生成的表定义文件发送到目标端

scp -P29022 /u01/app/ogg/dirdef/kafka.def root@10.89.136.5:/data460/server/ogg214/dirdef/

5、OGG for bigdata 端配置

5.1先目标端安装好 ogg for bigdata
上传并解压安装OGG for bigdata 软件

首先安装jdk及scala以及kafaka

本文使用的jdk为openjdk 1.8版本

scala 为2.13.14版本

kafka 为kafka_2.13-3.7.0.tgz

具体安装过程略。

软件目录为 : /data460/server

使用kafka自带zookeeper ,用于验证。

/etc/profile添加如下内容

#set java environment

JAVA_HOME=/data460/server/java/jdk8u392-b08

JRE_HOME=$JAVA_HOME/jre

CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib

PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

export JAVA_HOME JRE_HOME CLASS_PATH PATH

#set for scala

scala

export SCALA_HOME=/data460/server/scala

export PATH=$PATH:$SCALA_HOME/bin

#kafak

KAFKA_HOME=/data460/server/kafka

export PATH=$PATH:$KAFKA_HOME/bin

下面开始安装ogg for bigdata 21.4版本

1.1、创建ogg软件安装目录

mkdir -p /data460/server/ogg214
unzip 214000_ggs_Linux_x64_BigData_64bit.zip -d /data460/server/ogg214
cd /data460/server/ogg214

tar -xvf ggs_Linux_x64_BigData_64bit.tar

#删除安装压缩文件

rm ggs_Linux_x64_BigData_64bit.tar

配置环境变量

vim /etc/profile

增加ogg环境变量

export GGHOME=/data460/server/ogg214

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64:$GG_HOME:/lib

执行命令生效

source /etc/profile

测试一下环境变量有没有问题: [root@zookeeper ~]# cd /data460/server/ogg214 [root@zookeeper ogg12]# ./ggsci

没问题。
初始化目录

create subdirs

没问题

配置MGR进程

编辑MGR进程

GGSCI (zookeeper) 5> edit params mgr

写入下面内容

PORT 7809

DYNAMICPORTLIST 7810-7860

AUTORESTART ER *, RETRIES 3, WAITMINUTES 5

PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 30

lagreporthours 1

laginfominutes 30

lagcriticalminutes 60

启动mgr进程

GGSCI (zookeeper) 7> start mgr

Manager started.

GGSCI (zookeeper) 8> info all

Program Status Group Lag at Chkpt Time Since Chkpt

MANAGER RUNNING

配置 heartbeattable。

5.1、添加checkpoint表

在确保zookeeper集群和kafka正常的情况下做下面配置:

checkpoint即复制可追溯的一个偏移量记录,在全局配置里添加checkpoint表即可。

edit param ./GLOBALS

CHECKPOINTTABLE ogg.checkpoint

5.2、配置replicate进程

edit params rkafka

REPLICAT rkafka

-- Trail file for this example is located in "AdapterExamples/trail" directory

-- Command to add REPLICAT

-- add replicat rkafka, exttrail AdapterExamples/trail/tr

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES, RATE

GROUPTRANSOPS 10000

GETUPDATEBEFORES ----12.3版本要加此参数,若不加,在普通update时,即便抽取进程加了GETUPDATEBEFORES等参数,kafka表中的被修改字段修改前的值也不会被写入,11G版本不需要此参数亦可,详情看后面实施过程遇到的错误列表

MAP SCOTT., TARGET SCOTT.;

说明:REPLICATE rkafka定义rep进程名称;sourcedefs即在4.6中在源服务器上做的表映射文件;TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;REPORTCOUNT即复制任务的报告生成频率;GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系

5.3、配置custom_kafka_producer.properties

cd dirprm/

pwd

目前位置在:/data460/server/ogg214/dirprm

vim custom_kafka_producer.properties

bootstrap.servers=localhost:9092

acks=1

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

100KB per partition

batch.size=102400

linger.ms=10000

5.4、配置kafka.props

pwd

在这个位置

/data460/server/ogg214/dirprm

vim kafka.props

写入下面内容

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

#The following resolves the topic name using the short table name

gg.handler.kafkahandler.topicMappingTemplate=kafka

#The following selects the message key using the concatenated primary keys

#gg.handler.kafkahandler.keyMappingTemplate=$ {primaryKeys}

gg.handler.kafkahandler.format=json

gg.handler.kafkahandler.SchemaTopicName=scott

gg.handler.kafkahandler.BlockingSend =true

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=op

gg.handler.kafkahandler.format.includePrimaryKeys=true

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka

gg.classpath=dirprm/:/data460/server/kafka/libs/:/data460/server/ogg214/:/data460/server/ogg214/lib/

#Sample gg.classpath for HDP

#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

5.5、添加应用进程
add replicat rkafka,exttrail ./dirdat/ka

--默认是从ka00000开始读取, 如果需要修改应用进程读物位置可以执行:alter rkafka extseqno 3 extrba 1123

启动应用进程:

start rkafka

info all

停止,查看原因。

view report rkafka

参考文章: https://www.modb.pro/db/414869

内容更改为:

gg.handlerlist = kafkahandler

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties

#The following resolves the topic name using the short table name

gg.handler.kafkahandler.topicMappingTemplate=kafka

gg.handler.kafkahandler.SchemaTopicName=scott

gg.handler.kafkahandler.format=avro_op

gg.handler.kafkahandler.format=delimitedtext
gg.handler.kafkahandler.format.fieldDelimiter=|
gg.handler.kafkahandler.format.pkUpdateHandling=update
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.format.nullValueRepresentation=
gg.handler.kafkahandler.format.iso8601Format=false
gg.handler.kafkahandler.format.metaColumnsTemplate=$ {optype},$ {objectname}
gg.handler.kafkahandler.format.insertOpKey=I
gg.handler.kafkahandler.format.updateOpKey=U
gg.handler.kafkahandler.format.deleteOpKey=D
gg.handler.kafkahandler.format.truncateOpKey=T
gg.contentreplaceregex=(?<=^.{10}):
gg.contentreplacestring=CDATA[ ]
goldengate.userexit.timestamp=utc+8
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=INFO

gg.report.time=30sec

#Sample gg.classpath for Apache Kafka

gg.classpath=dirprm/:/data460/server/kafka/libs/:/data460/server/ogg214/:/data460/server/ogg214/lib/

#Sample gg.classpath for HDP

#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

再次启动

start rkafka
显示错误

主要两个问题,警告一个为目标端没有创建heartbeat table ,第二个没有交换空间,主要是没有交换空间。

创建交换空间。

创建2g的交换空间。

再次启动kafka

6、验证数据同步

源端对表kafka做insert、普通update、主键+普通列一起修改的PK Update操作并切换归档:

SQL> insert into kafka(empno,ename)values(321,'aa');

1 row created.

SQL> commit;

Commit complete.

SQL> update kafka set ename='ggg' where empno=321;

1 row updated.

SQL> commit;

Commit complete.

SQL> update kafka set ename='ggg',empno=123 where empno=321;

1 row updated.

SQL> commit;

Commit complete.

SQL> delete from kafka where empno=123;

1 row deleted.

SQL> commit;

Commit complete.

SQL> alter system switch logfile;

System altered.

目标端查看:

cd /data460/server/kafka

./bin/kafka-topics.sh --list --bootstrap-server localhost:9092

可以看到自动创建了kafka的topic

查看具体数据

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic kafka --from-beginning

确认已捕获所有变更。

二、带数据初始化举例

以上表没有含源表的初始存在的数据。下面我们实验一个包含初始数据的例子。

源端scott用户创建测试用表

create table scemp as select * from emp;

create table scdept as select * from dept;

ALTER TABLE scemp ADD CONSTRAINT PK_scemp PRIMARY KEY (EMPNO);

ALTER TABLE scdept ADD CONSTRAINT PK_scdept PRIMARY KEY (DEPTNO);

2.1、源端OGG操作

1、添加附加日志

从dg库的ogg登录主库

GGSCI (p19cstd) 2> dblogin userid ogg@10.89.136.3:1521/p19c password oggL123

添加新增表的辅助日志

add trandata SCOTT.SCEMP

add trandata SCOTT.SCDEPT

2、源端配置初始化进程

数据初始化,指的是从源端Oracle 数据库将已存在的需要的数据同步至目标端,配置初始化进程:

注意使用oracle用户进入进行编辑,否则可能起进程的时候可能会提示无法找到文件。

可以看到时原先的表的extract和pump进程

增加如下配置。

add extract initsc,sourceistable

edit params initsc

填入以下内容

EXTRACT initsc

SETENV (NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8")

userid ogg@10.89.136.3:1521/p19c password oggL123

RMTHOST 10.89.136.5, MGRPORT 7809

RMTFILE ./dirdat/ed,maxfiles 999, megabytes 500

----------SCOTT.SCEMP

table SCOTT.SCEMP,tokens(

TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')

);

----------SCOTT.SCDEPT

table SCOTT.SCDEPT,tokens(

TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE')

);

3、源端生成表结构

GoldenGate 提供了 DEFGEN 工具,用于生成数据定义,当源表和目标表中 的定义不同时,GoldenGate 进程将引用该专用工具。在运行 DEFGEN 之前,需要 为其创建一个参数文件:

edit params init_scott 加入下面配置 defsfile /u01/app/ogg/dirdef/init_scott.def userid ogg@10.89.136.3:1521/p19c password oggL123 table scott.SCEMP; table scott.SCDEPT;

生成表结构文件,需要执行shell命令,如果配置中的文件已经存在,执行下面命令会报错,所以 在执行前需要先删除:

cd dirdef

调用工具生成

cd /u01/app/ogg

./defgen paramfile dirprm/init_scott.prm

表结构生成完成

将生成的定义文件传送到目标端, 目标端的replicate进程会使用这个文件。

scp -P29022 /u01/app/ogg/dirdef/init_scott.def root@10.89.136.5:/data460/server/ogg214/dirdef/

4、配置抽取进程

利用已有的抽取进程

停止现有投递及取进程

修改抽取进程参数,加入新的表。

edit params e_ka

加入以下内容

----------SCOTT.SCEMP table SCOTT.SCEMP,tokens( TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE') ); ----------SCOTT.SCDEPT table SCOTT.SCDEPT,tokens( TKN-OP-TYPE = @GETENV ('GGHEADER', 'OPTYPE') );

5、配置投递进程

edit params d_ka

加入新增表

table scott.scemp; table scott.scdept;

如下图

2.2、目标端操作

1.增加 replicat 初始化进程

ADD replicat init01, specialrun

edit params init01

添加如下配置

SPECIALRUN end runtime setenv(NLS_LANG="SIMPLIFIED CHINESE_CHINA.AL32UTF8") targetdb libfile libggjava.so set property=./dirprm/kafka.props SOURCEDEFS ./dirdef/init_scott.def EXTFILE ./dirdat/ed reportcount every 1 minutes, rate grouptransops 10000 map scott.SCEMP,target SCOTT.SCEMP; map scott.SCDEPT,target SCOTT.SCDEPT;

2、配置应用进程

因为之前已经配置了rkafka进程,现在只需要在这个进程里面加那两张表的配置就行。

view params rkafka

增加

MAP SCOTT.SCEMP, TARGET SCOTT.SCEMP; MAP SCOTT.SCDEPT, TARGET SCOTT.SCDEPT;

2.3、同步数据

1、源端操作

启动进程:

info all

start e_ka

start d_ka
info all

2、源端初始化进程

start initsc

info initsc

报错

进入对应目录,发现INITSC.cpe的用户组不对。如下图

移除原先文件
rm INITSC.cpe

add extract initsc,sourceistable
重新启动initsc试试。

start initsc

view report initsc

警告为 maxfiles 参数已经弃用,配置文件initsc去除maxfiles参数。

但数据已经抽取。

再次启动,源端数据已经抽取。

3、目标端启动初始化进程

启动目标端初始化进程

start init01

view report init01

提示错误,没有checkpoint表

根据文章 https://stackoverflow.com/questions/62984676/goldengate-specialrun-abending-with-ogg-02419-missing-checkpoint-file-name/74760498#74760498

目的端添加checkpoint

ADD CHECKPOINTTABLE

错误如下图

该函数没实现,看来for bigdata的这个功能不好用。

根据https://www.cnblogs.com/zfg1987love/articles/10670360.html

1.5.9章节,添加nodbcheckpoint 参数

目的删除init01组

delete init01

根据https://docs.oracle.com/en/middleware/goldengate/core/19.1/gclir/add-replicat.html

添加

ADD replicat init01, specialrun,NODBCHECKPOINT

提示对应参数不支持。

最后更具stackoverflow需要退出ggsci进行执行,默认初始化数据进程需要在非ggsci环境执行。

./replicat paramfile /data460/server/ogg214/dirprm/init01.prm

查看kafka数据,初始化成功了。
[root@ecs-test bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka --from-beginning

相关限制说明

测试环境为啥不能读取ogg备库的限制?需要asm磁盘管理用户读取日志。

标签: oracle kafka 数据库

本文转载自: https://blog.csdn.net/xiaoyang1982/article/details/139162222
版权归原作者 小扬的马甲 所有, 如有侵权,请联系我们删除。

“【原创】OGG21.3 CENTOS配置ORACLE 19C ADG至KAFKA”的评论:

还没有评论