0


Java通过calcite实时读取kafka中的数据

引入maven依赖

    <dependency>

        <groupId>org.apache.calcite</groupId>

        <artifactId>calcite-kafka</artifactId>

        <version>1.28.0</version>

    </dependency>

测试代码

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.ResultSetMetaData;

import java.sql.SQLException;

import java.util.Properties;

public class CalciteDemo {

public static void main(String[] args) throws SQLException {

    String model = "inline:" +

            "{\n" +

            "  \"version\": \"1.0\",\n" +

            "  \"defaultSchema\": \"KAFKA\",\n" +

            "  \"schemas\": [\n" +

            "    {\n" +

            "    \"name\": \"KAFKA\",\n" +

            "    \"tables\": [\n" +

            "      {\n" +

            "        \"name\": \"TEST_TABLE\",\n" +

            "        \"factory\": \"org.apache.calcite.adapter.kafka.KafkaTableFactory\",\n" +

            "        \"stream\": { \"stream\": true },\n" +

            "        \"operand\": {\n" +

            "          \"bootstrap.servers\": \"192.168.x.xx:9092\",\n" +

            "          \"topic.name\": \"my-cloud-events\",\n" +

            "          \"consumer.params\": {\n" +

            "            \"group.id\": \"calcite-ut-consumer\",\n" +

            "            \"key.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\",\n" +

            "            \"value.deserializer\": \"org.apache.kafka.common.serialization.ByteArrayDeserializer\"\n" +

            "          }\n" +

            "        }\n" +

            "      }\n" +

            "    ]\n" +

            "    }\n" +

            "  ]\n" +

            "}";

    Properties info = new Properties();

    info.put("model", model);

    Connection connection = DriverManager.getConnection("jdbc:calcite:", info);

    final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

    final String sql7 = "SELECT STREAM * FROM \"KAFKA\".\"TEST_TABLE\"";

    print(calciteConnection,sql7);

    connection.close();

    calciteConnection.close();

}

public static void print(CalciteConnection calciteConnection, String sql7) throws SQLException {

    final PreparedStatement statement = calciteConnection.prepareStatement(sql7);

    final ResultSet resultSet = statement.executeQuery();

    ResultSetMetaData metadata = resultSet.getMetaData();

    while (resultSet.next()) {

        for (int i = 1; i <= metadata.getColumnCount(); i++) {

            System.out.print(metadata.getColumnLabel(i) + "=" + resultSet.getString(i) + ",");

        }

        System.out.println();

    }

}

}

发送测试数据

运行结果

标签: java kafka linq

本文转载自: https://blog.csdn.net/u010479989/article/details/143849934
版权归原作者 常量侠 所有, 如有侵权,请联系我们删除。

“Java通过calcite实时读取kafka中的数据”的评论:

还没有评论