0


Kafka Connect JdbcSinkConnector的schema处理

kafka connect当写入到Mysql这类的关系型数据库时,使用JdbcSinkConnector,且kafka中的数据需要具备schemas,否则是无法写入的。

只有两种数据可以写入:

1.使用Confluent Schema Registry 在写入kafka时,就用Avro、Protobuf 或 JSON Schema的converter进行schema的转换

2.带着schema的Json数据

{
  "schema": {
    "type": "struct", "optional": false, "version": 1, "fields": [
      { "field": "ID", "type": "string", "optional": true },
      { "field": "Artist", "type": "string", "optional": true },
      { "field": "Song", "type": "string", "optional": true }
    ] },
  "payload": {
    "ID": 1,
    "Artist": "Rick Astley",
    "Song": "Never Gonna Give You Up"
  }
}

第一种方式的操作如下:

首先需要启动schemaRegistry,并启动schemaRegistry服务器,配置和启动命令如下:

listeners=http://0.0.0.0:18081

kafkastore.bootstrap.servers=192.168.83.98:9092
kafkastore.topic=_schemas
kafkastore.timeout.ms=5000
kafkastore.topic.replication.factor=1

# 如果使用 SSL/TLS 加密连接,请取消注释并提供相应的配置
# kafkastore.ssl.truststore.location=/path/to/truststore.jks
# kafkastore.ssl.truststore.password=truststore_password

# 如果启用了身份验证,请取消注释并提供相应的配置
# kafkastore.sasl.mechanism=PLAIN
# kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
#   username="schema_registry" \
#   password="schema_registry_password";

# 可选的配置项
# debug=true
# host.name=<Schema Registry 主机名>
# compatibility=BACKWARD, FORWARD, FULL, NONE
# master.eligibility=true
# authentication.method=NONE, BASIC, DIGEST, SSL, SASL_PLAIN, SASL_SCRAM_256, SASL_SCRAM_512

启动命令:

./confluent-6.0.1/bin/schema-registry-start -daemon schema-registry.properties

使用JdbcSourceConnector接入数据时,就使用AvroConverter进行处理,并指定schema.registry的服务器

{
     "name": "KTFH_O_ORG4",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
         "tasks.max": "1",
         "topics": "xlg_test_kafka3",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://192.168.80.231:18081",
         "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp?user=root&password=Zx123456@shining11&useSSL=false",
         "table.whitelist" : "xlg_test",
         "mode":"bulk",
         "topic.prefix":"mysql-"
      }
}

在sink的时候,也指定AvroConverter并指定schema.registry服务器

{
     "name": "KTFH_O_ORG3",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max": "1",
         "auto.create": "true",
         "topics": "mysql-xlg_test",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url": "http://192.168.80.231:18081",
         "connection.url": "jdbc:mysql://192.168.83.22:13319/hebei_air_529_temp",
         "connection.user": "root",
         "connection.password": "Zx123456@shining11",
         "insert.mode": "INSERT",
         "table.name.format" : "xlg_test2"
      }
}
标签: kafka 分布式

本文转载自: https://blog.csdn.net/im_cheer/article/details/131238182
版权归原作者 im_cheer 所有, 如有侵权,请联系我们删除。

“Kafka Connect JdbcSinkConnector的schema处理”的评论:

还没有评论