0


Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
在这里插入图片描述

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.4

kafka_2.10-0.8.2.2

spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

在这里插入图片描述

3、操作完成后,再点击Add Library选项

在这里插入图片描述

4、进入以下界面

在这里插入图片描述

5、点击完成即可
6、最后创建如下项目结构的文件

在这里插入图片描述

四、编写代码,运行程序

编写生产者代码

packagemy.kafka;importjava.io.BufferedReader;importjava.io.File;importjava.io.FileNotFoundException;importjava.io.FileReader;importjava.io.IOException;importjava.util.Properties;importkafka.javaapi.producer.Producer;importkafka.producer.KeyedMessage;importkafka.producer.ProducerConfig;publicclassKafkaSend{privatefinalProducer<String,String> producer;publicfinalstaticStringTOPIC="kafkasendspark";publicKafkaSend(){Properties props =newProperties();// 此处配置的是kafka的端口  
        props.put("metadata.broker.list","localhost:9092");// 配置value的序列化类  
        props.put("serializer.class","kafka.serializer.StringEncoder");// 配置key的序列化类  
        props.put("key.serializer.class","kafka.serializer.StringEncoder");  
        props.put("request.required.acks","-1");  
        producer =newProducer<String,String>(newProducerConfig(props));}voidproduce(){int lineNo =1;File file =newFile("/data/case6/buyer_favorite1");BufferedReader reader =null;try{  
            reader =newBufferedReader(newFileReader(file));String tempString =null;while((tempString = reader.readLine())!=null){String key =String.valueOf(lineNo);String data = tempString;  
                producer.send(newKeyedMessage<String,String>(TOPIC, key, data));System.out.println(data);  
                lineNo++;Thread.sleep(100);}}catch(FileNotFoundException e){System.err.println(e.getMessage());}catch(IOException e){System.err.println(e.getMessage());}catch(InterruptedException e){System.err.println(e.getMessage());}}publicstaticvoidmain(String[] args){System.out.println("start");newKafkaSend().produce();System.out.println("finish");}}

编写消费者代码

packagemy.scalaimportorg.apache.spark.SparkConf  
importorg.apache.spark.streaming.StreamingContext  
importorg.apache.spark.streaming.Seconds  
importscala.collection.immutable.Map  
importorg.apache.spark.streaming.kafka.KafkaUtils  
importkafka.serializer.StringDecoder  
importkafka.serializer.StringDecoder  
object SparkReceive {def main(args: Array[String]){val sparkConf =new SparkConf().setAppName("countuser").setMaster("local")val ssc =new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint("checkpoint")val topics = Set("kafkasendspark")val brokers ="localhost:9092"val zkQuorum ="localhost:2181"val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers,"serializer.class"->"kafka.serializer.StringEncoder")val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)val addFunc =(currValues: Seq[Int], prevValueState: Option[Int])=>{//通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  val currentCount = currValues.sum  
      // 已累加的值  val previousCount = prevValueState.getOrElse(0)// 返回累加后的结果,是一个Option[Int]类型  
      Some(currentCount + previousCount)}val result=lines.map(line =>(line._2.split("\t"))).map( row =>(row(0),1)).updateStateByKey[Int](addFunc).print()  
  
    ssc.start();  
    ssc.awaitTermination()}}

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
在这里插入图片描述

标签: kafka spark java

本文转载自: https://blog.csdn.net/weixin_52323239/article/details/131611370
版权归原作者 piaow_ 所有, 如有侵权,请联系我们删除。

“Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作”的评论:

还没有评论