0


Scala编程 读取Kafka处理并写入Redis

部分知识(可略过)

Kafka

    Kafka是一种分布式流处理平台,它是一个高吞吐量、可扩展、持久化的消息队列系统,用于处理实时数据流。Kafka的核心概念包括生产者(Producer)、消费者(Consumer)和主题(Topic)。生产者负责将数据发布到Kafka集群,消费者则从Kafka集群中订阅并消费数据。主题是数据的分类或者分区,每个主题可以有多个分区,而每个分区又可以有多个副本。这种分区和复制的机制使得Kafka具备了高可用性和容错性。同时,Kafka还提供了丰富的API和生态系统,使得开发者可以方便地构建基于Kafka的实时数据处理应用。

Redis

    Redis是一个开源的内存数据结构存储系统,它可以用作数据库、缓存和消息中间件。Redis支持多种数据结构,包括字符串、哈希表、列表、集合、有序集合等。它以键值对的形式存储数据,并且数据存储在内存中,因此具有快速的读写性能。Redis还提供持久化功能,可以将数据保存到磁盘上,以便在重启后恢复数据。由于其高性能、灵活性和丰富的功能,Redis被广泛应用于各种场景,如缓存加速、实时计数、排行榜、消息队列等。

Scala

    Scala是一种面向对象的编程语言,也是一种函数式编程语言,它结合了面向对象编程和函数式编程的特性。Scala运行在Java虚拟机上,因此可以与Java代码无缝地集成。Scala具有静态类型系统,支持类型推断,可以提高代码的可读性和可维护性。Scala还提供了许多高级特性,如高阶函数、模式匹配、类型类等,使得编写高效、简洁、可重用的代码变得更加容易。Scala在大数据处理、Web应用程序、分布式系统等领域得到了广泛应用。

Spark Streaming(DStream)

    Spark Streaming是Apache Spark的一个组件,它提供了实时数据处理和流式计算的功能。它允许开发人员使用Spark的强大的批处理引擎来处理实时数据流。Spark Streaming可以从多种数据源(如Kafka、Flume、HDFS等)接收实时数据,并将其分成小的批次进行处理。每个批次都可以像处理静态数据一样使用Spark的高级API进行处理,包括使用SQL查询、机器学习算法和图处理等。

RDD

    RDD是Apache Spark中的一个核心概念。RDD是一个可分区、可并行计算的数据集合,它可以在分布式计算环境中进行高效的数据处理和分析。RDD具有容错性,即在计算过程中可以自动恢复失败的节点,并且可以在内存中缓存数据,以提高计算性能。RDD提供了一系列的转换操作(如map、filter、reduce等)和行动操作(如count、collect、save等),可以对数据集进行各种复杂的计算和操作。通过使用RDD,可以方便地进行大规模数据处理和分析。

Jedis

    Jedis是一个Java语言编写的用于操作Redis数据库的客户端库。Jedis客户端库可以让Java开发者通过简单的API调用来连接、操作和管理Redis数据库。它提供了丰富的功能和灵活的接口,使得开发者可以方便地在Java应用程序中使用Redis。

Scala编程

目的

    使用Scala编程,用Spark Streaming采集Kafka消费者端口接收到的信息,对信息进行处理求出每个电影ID对应的平均分数并写入到Redis数据库中。

导入Maven依赖

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.8</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
    </dependencies>

创建Scala文件导入依赖

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

创建Spark相关对象

    // 创建SparkConf对象,名称为"KafkaToRedis",并将运行模式设置为本地模式
    val sparkConf = new SparkConf().setAppName("KafkaToRedis").setMaster("local")

    // 创建SparkContext对象
    val sc = new SparkContext(sparkConf)

    // 创建StreamingContext对象, 设置每个批次的时间间隔为1秒
    val ssc = new StreamingContext(sc, Seconds(1))

创建Kafka数据流

  • bootstrap.servers:Kafka消费者的IP地址以及端口号
  • auto.offset.reset:对偏移量的采集设置,latest:最新偏移量,即程序运行后会从Kafka消费者接收到的最新的消息开始采集。earliest:最早偏移量,即程序运行后会从Kafka消费者接收的第一条消息开始采集。
  • group.id:消费者组ID,需要到kafka/config/consumer.properties中去查看
    // 定义了Kafka的相关参数,包括Kafka的地址、键和值的反序列化器、偏移量重置方式和消费者组ID
    val kafkaParams = Map("bootstrap.servers" -> "192.168.181.128:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",//latest:最新偏移量,earliest:最早偏移量
      "group.id" -> "test-consumer-group")

    // 定义了要订阅的主题集合
    val topics = Set("order")

    // 创建了一个直接流,将Kafka的消息流转换为DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

处理数据流

** (通过kafka的API调用使用JAVA编写kafka生产者实现每秒向消费者发送一条消息,该程序本文未展出)**

** ** 消费者接收端数据如下

    处理方法:

    1.因为采集到的一行data数据为{"movie_id":"84","movie_rank":8,"user_id":"365"},需要对数据进行分割提取处理。

    2.以data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()为例

            split(",")(0)将data按逗号分割提取第一个,得到{"movie_id":"84"

           split(":")(1)将上一步得到的数据按‘:’经行分割提取第二个,得到"84"

            replaceAll("\"", "")将上一步得到的数据中的所有双引号替换为空字符串,得到 84  

            .trim()去除上一步得到的数据的两端的空格,得到84

        按照这个方法将提取到的数据存入movieId,movieRank,userId中

    3. 计算平均值

            读取redis数据库中键为movie_rank,字段为movieId的值。

            如果数据库中查不到,则将这条数据存入数据库中,格式为(电影平均分,评论次数)。

            如果找到了,则读取电影平均分和评论次数设置为旧参数

            计算公式:新的平均分=((旧平均分*旧评论次数)+  新的评论分数)/(旧评论次数+1)

            存入计算结果,(新电影平均分,旧评论次数+1)
    // 对每行数据进行处理
    kafkaStream.foreachRDD { rdd =>
      rdd.foreach { record =>

        //设置jedis客户端连接redis数据库
        val jedis = new Jedis("192.168.181.128", 6379)

        //创建data对象储存采集到的value值
        val data = record.value()

        //处理数据提取出movie_id,movie_rank,user_id的值
        val movieId = data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()
        val movieRank = data.split(",")(1).split(":")(1).trim()
        val userId = data.split(",")(2).split(":")(1).replaceAll("\"", "").replaceAll("}", "").trim()

        // 读取redis中哈希表键为movie_rank,字段为movieId的值(读取对应电影ID的平均分)
        val movieRankInfo = jedis.hget("movie_rank", movieId)

        // 如果Redis中没有该电影的评分信息,则将该电影的评分信息存入Redis
        if (movieRankInfo == null) {
          jedis.hset("movie_rank", movieId, s"$movieRank,1")
        } else {
          // 如果Redis中已经有该电影的评分信息,则更新该电影的评分信息
          val oldRank = movieRankInfo.split(",")(0).toDouble
          val oldCount = movieRankInfo.split(",")(1).toInt
          val newRank = (oldRank * oldCount + movieRank.toDouble) / (oldCount + 1)
          jedis.hset("movie_rank", movieId, s"$newRank,${oldCount + 1}")
        }
      }
    }

启动Spark Streaming

ssc.start()
ssc.awaitTermination()

完整代码

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis

object KafkaToRedis {
  def main(args: Array[String]): Unit = {

    // 创建SparkConf对象,名称为"KafkaToRedis",并将运行模式设置为本地模式
    val sparkConf = new SparkConf().setAppName("KafkaToRedis").setMaster("local")

    // 创建SparkContext对象
    val sc = new SparkContext(sparkConf)

    // 创建StreamingContext对象, 设置每个批次的时间间隔为1秒
    val ssc = new StreamingContext(sc, Seconds(1))

    // 定义了Kafka的相关参数,包括Kafka的地址、键和值的反序列化器、偏移量重置方式和消费者组ID
    val kafkaParams = Map("bootstrap.servers" -> "192.168.181.128:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "auto.offset.reset" -> "latest",//latest:最新偏移量,earliest:最早偏移量
      "group.id" -> "test-consumer-group")

    // 定义了要订阅的主题集合
    val topics = Set("order")

    // 创建了一个直接流,将Kafka的消息流转换为DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

    // 对每行数据进行处理
    kafkaStream.foreachRDD { rdd =>
      rdd.foreach { record =>

        val jedis = new Jedis("192.168.181.128", 6379)
        val data = record.value()
        val movieId = data.split(",")(0).split(":")(1).replaceAll("\"", "").trim()
        val movieRank = data.split(",")(1).split(":")(1).trim()
        val userId = data.split(",")(2).split(":")(1).replaceAll("\"", "").replaceAll("}", "").trim()

        val movieRankInfo = jedis.hget("movie_rank", movieId)

        // 如果Redis中没有该电影的评分信息,则将该电影的评分信息存入Redis
        if (movieRankInfo == null) {
          jedis.hset("movie_rank", movieId, s"$movieRank,1")
        } else {
          // 如果Redis中已经有该电影的评分信息,则更新该电影的评分信息
          val oldRank = movieRankInfo.split(",")(0).toDouble
          val oldCount = movieRankInfo.split(",")(1).toInt
          val newRank = (oldRank * oldCount + movieRank.toDouble) / (oldCount + 1)
          jedis.hset("movie_rank", movieId, s"$newRank,${oldCount + 1}")
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

测试结果

代码运行结果

    Scala程序正常运行,每秒都在抓取数据计算并写入Redis

Redis 查询结果

    进入redis/bin目录,登录redis数据库
./redis-cli -h 192.168.181.128
     选择0号数据库,查看该数据库中的键

    查看movie_rank哈希表的所有值
192.168.181.128:6379> hgetall movie_rank 

让Scala程序运行一段时间再次查询

数据库的内容正在实时改变,说明程序正常运行


本文转载自: https://blog.csdn.net/qq_65960840/article/details/135316818
版权归原作者 LiyC; 所有, 如有侵权,请联系我们删除。

“Scala编程 读取Kafka处理并写入Redis”的评论:

还没有评论