Maxwell版本1.39.2
一: 添加zk的pox文件
<!-- customize HA -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
二: 创建zk工具类
在 com.zendesk.maxwell.util 包下创建 CuratorUtil 类,后面会使用此类实现高可用
package com.zendesk.maxwell.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CuratorUtil {
private final String zookeeperServers;
private final int sessionTimeoutMs;
private final int connectionTimeoutMs;
private final int baseSleepTimeMs;
private final int maxRetries;
private CuratorFramework client;
public CuratorUtil(String zookeeperServers, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs,
int maxRetries) {
this.zookeeperServers = zookeeperServers;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxRetries = maxRetries;
}
/*
* 构造 zookeeper 客户端,并连接 zookeeper 集群
*/
public void start() {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(this.baseSleepTimeMs, this.maxRetries);
client = CuratorFrameworkFactory.newClient(
this.zookeeperServers,
this.sessionTimeoutMs,
this.connectionTimeoutMs,
retryPolicy
);
client.start();
}
/*
* 实现分布式锁
*/
public void highAvailable() {
// 1.连接 Zookeeper 客户端
this.start();
// 2.向 zookeeper 注册自己
String lockPath = "/maxwell/ha/lock";
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
// 3.获取锁
lock.acquire();
// 4.将自己信息注册到 leader 路径
String leaderPath = "/maxwell/ha/leader";
client.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(leaderPath);
} catch (Exception e) {
e.printStackTrace();
}
}
}
三: 修改 com.zendesk.maxwell 包下的MaxwellConfig类
3.1 添加属性
// 类新增属性
public String zookeeperServers;
public int sessionTimeoutMs;
public int connectionTimeoutMs;
public int baseSleepTimeMs;
public int maxRetries;
3.2 buildOptionParser 方法添加代码
parser.accepts( "zookeeper", "zookeeper servers support maxwell high available" )
.withRequiredArg();
parser.accepts( "session_timeout_ms", "session timeout ms with zookeeper" )
.withRequiredArg();
parser.accepts( "connection_timeout_ms", "connection timeout ms with zookeeper" )
.withRequiredArg();
parser.accepts( "base_sleep_time_ms", "base sleep time ms if retry" )
.withRequiredArg();
parser.accepts( "max_retries", "max retry times" )
.withRequiredArg();
3.3 setup 方法添加代码
this.haMode = fetchBooleanOption("ha", options, properties, false);
this.zookeeperServers = fetchStringOption("zookeeper",options, properties, null);
this.sessionTimeoutMs = fetchIntegerOption("session_timeout_ms",options, properties, 5000);
this.connectionTimeoutMs = fetchIntegerOption("connection_timeout_ms",options, properties, 5000);
this.baseSleepTimeMs = fetchIntegerOption("base_sleep_time_ms",options, properties, 5000);
this.maxRetries = fetchIntegerOption("max_retries",options, properties, 3);
if (haMode && zookeeperServers == null){
LOGGER.error("you must specify --zookeeper because you want to use maxwell in ha mode");
}
四:修改 com.zendesk.maxwell.Maxwell 的main函数
将代码段
if ( config.haMode ) {
new MaxwellHA(maxwell, config.jgroupsConf, config.raftMemberID, config.clientID).startHA();
} else {
maxwell.start();
}
全部注释掉,修改为
if ( config.haMode ) {
CuratorUtil curatorUtil = new CuratorUtil(config.zookeeperServers, config.sessionTimeoutMs, config.connectionTimeoutMs, config.baseSleepTimeMs, config.maxRetries);
curatorUtil.highAvailable();
}
maxwell.start();
然后重新打包就能得到基于zk的高可用版本了,打包时可以将test包删除,防止出现错误
源码下载地址
五: 启动脚本
5.1 创建配置文件 config.properties
log_level=info
mysql login info
host=localhost
port=3306
user=root
password=root123
schema_database=maxwell
options to pass into the jdbc connection, given as opt=val&opt2=val2
#jdbc_options=opt1=100&opt2=hello
producer=kafka
*** kafka ***
producer=kafka
#kafka.bootstrap.servers=hosta:9092,hostb:9092
kafka.bootstrap.servers=localhost:9092
kafka.max.request.size = 104857600
kafka_topic=mysql.%{database}.%{table}
kafka_version=2.7.0
alternative kafka topic to write DDL (alter/create/drop) to. Defaults to kafka_topic
#ddl_kafka_topic=maxwell_ddl
hash function to use. "default" is just the JVM's 'hashCode' function.
#kafka_partition_hash=default # [default, murmur3]
how maxwell writes its kafka key.
'hash' looks like:
{"database":"test","table":"tickets","pk.id":10001}
'array' looks like:
["test","tickets",[{"id":10001}]]
default: "hash"
#kafka_key_format=hash # [hash, array]
5.2 启动脚本编写 startup.sh
#!/bin/bash
single(){
bin/maxwell --filter 'exclude: ., include: cp.*' --kafka_version=2.7.0 --config=config.properties --daemon
echo -e "\033[32m单机版启动成功\n\033[0m"
}
ha(){
zookeeper 多个用,分割
bin/maxwell --filter 'exclude: ., include: cp.*' --kafka_version=2.7.0 --config=config.properties --ha --zookeeper=127.0.0.1:2181 --daemon
echo -e "\033[32m高可用版启动成功\n\033[0m"
}
case "$1" in
'ha')
ha
;;
*)
single
;;
esac
5.2.1 高可用版本启动命令
./startup.sh ha
5.2.2 单机版启动命令
./startup.sh
版权归原作者 SongJingzhou 所有, 如有侵权,请联系我们删除。