kafka connect当写入到Mysql这类的关系型数据库时,使用JdbcSinkConnector,且kafka中的数据需要具备schemas,否则是无法写入的。
只有两种数据可以写入:
1.使用Confluent Schema Registry 在写入kafka时,就用Avro、Protobuf 或 JSON Schema的converter进行schema的转换
2.带着schema的Json数据
{
"schema": {
"type": "struct", "optional": false, "version": 1, "fields": [
{ "field": "ID", "type": "string", "optional": true },
{ "field": "Artist", "type": "string", "optional": true },
{ "field": "Song", "type": "string", "optional": true }
] },
"payload": {
"ID": 1,
"Artist": "Rick Astley",
"Song": "Never Gonna Give You Up"
}
}
第一种方式的操作如下:
首先需要启动schemaRegistry,并启动schemaRegistry服务器,配置和启动命令如下:
listeners=http://0.0.0.0:18081
kafkastore.bootstrap.servers=192.168.83.98:9092
kafkastore.topic=_schemas
kafkastore.timeout.ms=5000
kafkastore.topic.replication.factor=1
# 如果使用 SSL/TLS 加密连接,请取消注释并提供相应的配置
# kafkastore.ssl.truststore.location=/path/to/truststore.jks
# kafkastore.ssl.truststore.password=truststore_password
# 如果启用了身份验证,请取消注释并提供相应的配置
# kafkastore.sasl.mechanism=PLAIN
# kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
# username="schema_registry" \
# password="schema_registry_password";
# 可选的配置项
# debug=true
# host.name=<Schema Registry 主机名>
# compatibility=BACKWARD, FORWARD, FULL, NONE
# master.eligibility=true
# authentication.method=NONE, BASIC, DIGEST, SSL, SASL_PLAIN, SASL_SCRAM_256, SASL_SCRAM_512
启动命令:
./confluent-6.0.1/bin/schema-registry-start -daemon schema-registry.properties
使用JdbcSourceConnector接入数据时,就使用AvroConverter进行处理,并指定schema.registry的服务器
{
"name": "KTFH_O_ORG4",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"topics": "xlg_test_kafka3",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://192.168.80.231:18081",
"connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp?user=root&password=Zx123456@shining11&useSSL=false",
"table.whitelist" : "xlg_test",
"mode":"bulk",
"topic.prefix":"mysql-"
}
}
在sink的时候,也指定AvroConverter并指定schema.registry服务器
{
"name": "KTFH_O_ORG3",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"auto.create": "true",
"topics": "mysql-xlg_test",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://192.168.80.231:18081",
"connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp",
"connection.user": "root",
"connection.password": "Zx123456@shining11",
"insert.mode": "INSERT",
"table.name.format" : "xlg_test2"
}
}
版权归原作者 im_cheer 所有, 如有侵权,请联系我们删除。