

Consuming Kafka data with Flink

Note: the Flink version (and related dependency versions) on the server must match the versions used in the code; otherwise deployment will fail even though the job runs locally.

pom file


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.imooc.flink.java</groupId>
    <artifactId>flink-train-java</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

       <!-- <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.taosdata.jdbc</groupId>
            <artifactId>taos-jdbcdriver</artifactId>
            <version>2.0.18</version>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>3.4.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API and JSON format -->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

    <!--<build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.4.5</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>-->
    <build>
        <plugins>
            <!-- This plugin compiles Scala code into class files -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <!-- Bound to Maven's compile phase -->
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>

                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                    <version>2.4.5</version>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>
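
Since mismatched Flink versions between the packaged jar and the cluster are the failure mode called out in the note at the top, it can help to confirm which Flink version the client-side dependencies actually carry. A minimal sketch, assuming flink-runtime is on the classpath (flink-clients pulls it in transitively); the class name VersionCheck is illustrative, not part of the original project:

import org.apache.flink.runtime.util.EnvironmentInformation;

public class VersionCheck {
    public static void main(String[] args) {
        // Prints the Flink version baked into the client-side dependencies;
        // compare it with the version reported by the cluster's web UI.
        System.out.println("Flink version: " + EnvironmentInformation.getVersion());
    }
}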

Main class

package com.zx.iot.consumer;

import com.alibaba.fastjson.JSONObject;
import com.zx.iot.dto.Equipment;
import com.zx.iot.dto.Thing;
import com.zx.iot.producer.IotProducer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
//@Slf4j

/**
 * A Flink real-time processing application written with the Java API.
 * <p>
 * Consumes IoT messages from the "iot-data" Kafka topic, parses each message
 * into a Thing with its list of Equipment entries, and forwards the result
 * to the "iot-data-calculate" topic.
 */
public class IotDataFlink {
    public static void main(String[] args) throws Exception {
        // Step 1: obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.enableCheckpointing(1000);
        // Configure the Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.6.24.56:9092");
        properties.setProperty("group.id", "test");
        DataStream<String> stream = env
                .addSource(new FlinkKafkaConsumer<>("iot-data", new SimpleStringSchema(), properties));

        DataStream<Thing> answerDataStream = stream.map((String strMsg) -> {
            Thing thing = new Thing();
            try {
                JSONObject jsonObject = JSONObject.parseObject(strMsg);
                thing.setTs(jsonObject.get("ts").toString());
                // "data" is a comma-separated list of equipmentCode:status pairs
                String data = jsonObject.get("data").toString();
                //thing.setData(data);
                List<Equipment> list = new ArrayList<>();
                if (data != null) {
                    String[] equipmentArray = data.split(",");
                    for (String equipmentStr : equipmentArray) {
                        String[] equipmentInfoArray = equipmentStr.split(":");
                        if (equipmentInfoArray.length == 2) {
                            Equipment equipment = new Equipment();
                            equipment.setEquipmentCode(equipmentInfoArray[0]);
                            equipment.setStatus(equipmentInfoArray[1]);
                            list.add(equipment);
                        }
                    }
                }
                thing.setList(list);
                System.err.println(thing.toString());
                // Note: this submits a brand-new Flink job for every record;
                // see the sink-based sketch after this class for the
                // idiomatic alternative.
                IotProducer.sendList("iot-data-calculate", thing);
            } catch (Exception ex) {
                System.err.println("Failed to parse message: " + strMsg);
            }
            // Return outside try/finally: returning from a finally block
            // swallows exceptions and should be avoided.
            return thing;
        }).filter(x -> x != null);

        /*stream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                System.err.println(tokens[0]);
                for(String token : tokens) {
                    if(token.length() > 0) {
                        collector.collect(new Tuple2<String,Integer>(token,1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5000)).sum(1).print().setParallelism(10);*///

        env.execute("iot-data-flink");
    }
}
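
As noted in the map function above, calling IotProducer.sendList per record submits a whole new Flink job for every message. The idiomatic alternative is to attach a FlinkKafkaProducer sink to answerDataStream inside the same job. A minimal sketch of the replacement wiring (add imports for com.alibaba.fastjson.JSON and org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer, and drop the sendList call from map):

        // Serialize each parsed Thing back to JSON and write it to the output
        // topic with a Kafka sink in the same job, instead of submitting a new
        // Flink job per record from inside map().
        answerDataStream
                .map(thing -> JSON.toJSONString(thing))
                .addSink(new FlinkKafkaProducer<>(
                        "iot-data-calculate",          // output topic
                        new SimpleStringSchema(),
                        properties));                  // reuses the properties above; only bootstrap.servers matters for the producer

        env.execute("iot-data-flink");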

Data classes (DTOs)

**Thing class**
package com.zx.iot.dto;

import java.util.List;

public class Thing {
    private String ts;
    private String data;
    private List<Equipment> list;
    public String getTs() {
        return ts;
    }

    public void setTs(String ts) {
        this.ts = ts;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    public List<Equipment> getList() {
        return list;
    }

    public void setList(List<Equipment> list) {
        this.list = list;
    }

    public Thing(String ts, String data, List<Equipment> list) {
        this.ts = ts;
        this.data = data;
        this.list = list;
    }

    public Thing() {
    }

    @Override
    public String toString() {
        return "Thing{" +
                "ts='" + ts + '\'' +
                ", data='" + data + '\'' +
                ", list=" + list +
                '}';
    }
}
**Equipment class**
package com.zx.iot.dto;

public class Equipment {
    private String equipmentCode;
    private String status;

    public String getEquipmentCode() {
        return equipmentCode;
    }

    public void setEquipmentCode(String equipmentCode) {
        this.equipmentCode = equipmentCode;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public Equipment(String equipmentCode, String status) {
        this.equipmentCode = equipmentCode;
        this.status = status;
    }

    public Equipment() {
    }

    @Override
    public String toString() {
        return "Equipment{" +
                "equipmentCode='" + equipmentCode + '\'' +
                ", status='" + status + '\'' +
                '}';
    }
}
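
For reference, the parsing logic in IotDataFlink implies input messages shaped roughly like the following; the concrete field values here are made up for illustration:

// Hypothetical input message, inferred from the split(",") / split(":")
// logic in IotDataFlink: "data" holds comma-separated equipmentCode:status pairs.
String sample = "{\"ts\":\"1669800000000\",\"data\":\"dev001:1,dev002:0\"}";

// Parsing it produces (note that setData is commented out, so data stays null):
//   Thing{ts='1669800000000', data='null',
//         list=[Equipment{equipmentCode='dev001', status='1'},
//               Equipment{equipmentCode='dev002', status='0'}]}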

Sending data after consuming

package com.zx.iot.producer;

import com.alibaba.fastjson.JSON;
import com.zx.iot.dto.Equipment;
import com.zx.iot.dto.Thing;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class IotProducer {

    /*public static void main(String[] args) {
        IotProducer iotProducer = new IotProducer();
        Thing thing = new Thing();
        List list = new ArrayList();
        Equipment equipment = new Equipment();
        equipment.setStatus("1");
        equipment.setEquipmentCode("fdghudj237utcdysihxj237yh");
        list.add(equipment);

        thing.setList(list);
        thing.setTs("9872120988421");
        thing.setData("fduoijwps");
        try {
            iotProducer.sendList("iot-data-calculate",thing);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }*/
    public static void sendList(String topic, Thing thing) throws Exception {
        // 0. Initialize the Flink environment.
        // Note: every call to this method builds and executes a brand-new
        // Flink job just to send one message, which is very expensive;
        // prefer attaching a Kafka sink inside the consuming job instead.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism: should not exceed the number of CPU cores
        env.setParallelism(3);

        // 1. Build a one-element stream from the Thing serialized as JSON
        String json = JSON.toJSONString(thing);
        DataStreamSource<String> stream = env.fromElements(json);
        System.err.println("Sending to topic " + topic);

        // 2. Kafka producer configuration (group.id is a consumer-side
        // property and is not needed here)
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.6.24.56:9092");

        // 3. Create the Kafka producer sink
        FlinkKafkaProducer<String> kafkaProducer =
                new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);

        // 4. Attach the producer sink to the Flink stream
        stream.addSink(kafkaProducer);

        // 5. Execute
        env.execute("sender");
    }
}
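
If a standalone send is really needed outside a Flink job, the plain KafkaProducer from kafka-clients (pulled in transitively by flink-connector-kafka) is the conventional tool. A minimal sketch under that assumption; the class name PlainKafkaSender is illustrative, not part of the original project:

package com.zx.iot.producer;

import com.alibaba.fastjson.JSON;
import com.zx.iot.dto.Thing;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PlainKafkaSender {

    // Sends a single Thing as a JSON string using the vanilla Kafka client,
    // without building and executing a Flink job per message.
    public static void send(String topic, Thing thing) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "10.6.24.56:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes (and flushes) the producer
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>(topic, JSON.toJSONString(thing)));
        }
    }
}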
Tags: kafka flink java

Reprinted from: https://blog.csdn.net/qq_39246466/article/details/128093235
Copyright belongs to the original author, 择业. If there is any infringement, please contact us to remove it.
