0


Java通过calcite实时读取kafka中的数据

引入maven依赖

  1. <dependency>
  2. <groupId>org.apache.calcite</groupId>
  3. <artifactId>calcite-kafka</artifactId>
  4. <version>1.28.0</version>
  5. </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

  1. public static void main(String[] args) throws SQLException {
  2. String model = "inline:" +
  3. "{\n" +
  4. " \"version\": \"1.0\",\n" +
  5. " \"defaultSchema\": \"KAFKA\",\n" +
  6. " \"schemas\": [\n" +
  7. " {\n" +
  8. " \"name\": \"KAFKA\",\n" +
  9. " \"tables\": [\n" +
  10. " {\n" +
  11. " \"name\": \"TEST_TABLE\",\n" +
  12. " \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +
  13. " \"stream\": { \"stream\": true },\n" +
  14. " \"operand\": {\n" +
  15. " \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +
  16. " \"topic.name\": \"my-cloud-events\",\n" +
  17. " \"consumer.params\": {\n" +
  18. " \"group.id\": \"calcite-ut-consumer\",\n" +
  19. " \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +
  20. " \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +
  21. " }\n" +
  22. " }\n" +
  23. " }\n" +
  24. " ]\n" +
  25. " }\n" +
  26. " ]\n" +
  27. "}";
  28. Properties info = new Properties();
  29. info.put("model", model);
  30. Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
  31. final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
  32. final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";
  33. print(calciteConnection,sql7);
  34. connection.close();
  35. calciteConnection.close();
  36. }
  37. public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {
  38. final PreparedStatement statement = calciteConnection.prepareStatement(sql7);
  39. final ResultSet resultSet = statement.executeQuery();
  40. ResultSetMetaData metadata = resultSet.getMetaData();
  41. while (resultSet.next()) {
  42. for (int i = 1; i <= metadata.getColumnCount(); i++) {
  43. System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");
  44. }
  45. System.out.println();
  46. }
  47. }

}

发送测试数据

运行结果

标签: java kafka linq

本文转载自: https://blog.csdn.net/u010479989/article/details/143849934
版权归原作者 常量侠 所有, 如有侵权,请联系我们删除。

“Java通过calcite实时读取kafka中的数据”的评论:

还没有评论