

Spark Real-Time (3): Structured Streaming Getting-Started Example

Structured Streaming Getting-Started Example

We use Structured Streaming to monitor socket data and compute a WordCount. The Spark version used here is 3.4.3. First, add the following dependencies to the Maven pom file:

 <!-- These properties fix the Maven packaging error "lambda expressions are not supported in -source 1.5" under JDK 1.8 -->
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
  </properties>

  <dependencies>
    <!-- Spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- SparkSQL -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- SparkSQL on Hive -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.47</version>
    </dependency>
    <!-- Spark Streaming -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Kafka 0.10+ Source For Structured Streaming-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Required for producing data to Kafka -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.8.0</version>
    </dependency>

    <!-- Scala libraries -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-compiler</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-reflect</artifactId>
      <version>2.12.15</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.12</version>
    </dependency>
    <dependency>
      <groupId>com.google.collections</groupId>
      <artifactId>google-collections</artifactId>
      <version>1.0</version>
    </dependency>

  </dependencies>

I. Scala Code

package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming: read Socket data in real time
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {

    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //The default is 200 shuffle partitions; the source data volume is small, so use fewer
      .config("spark.sql.shuffle.partitions",1)
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.setLogLevel("Error")

    //2. Read each line from the socket; the resulting DataFrame has a default column named "value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3. Split each line into words; first convert to a Dataset via as[String]
    val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))

    //4. Group by word and count; a "count" column is added automatically
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5. Start the streaming query and print results to the console
    val query: StreamingQuery = wordCounts.writeStream
      //Set the output mode to complete
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()

  }

}
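
With the console sink, a new micro-batch starts as soon as the previous one finishes. As a minimal sketch (the 5-second interval is an arbitrary choice, not part of the original example), a fixed processing-time trigger can be added before start() so the socket input is batched on a schedule:

import org.apache.spark.sql.streaming.Trigger

val query: StreamingQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  //Fire a micro-batch every 5 seconds instead of as soon as the previous one completes
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
query.awaitTermination()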

II. Java Code

package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {

    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        //1. Create the SparkSession and lower shuffle parallelism for the small input
        SparkSession spark = SparkSession.builder().master("local")
            .appName("SSReadSocketData01")
            .config("spark.sql.shuffle.partitions", 1)
            .getOrCreate();
        
        spark.sparkContext().setLogLevel("Error");

        //2. Read lines from the socket; the DataFrame has a single column named "value"
        Dataset<Row> lines = spark.readStream().format("socket")
            .option("host", "node3")
            .option("port", 9999)
            .load();

        //3. Split each line into words, producing a Dataset<String>
        Dataset<String> words = lines.as(Encoders.STRING())
            .flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" ")).iterator();
                }
            }, Encoders.STRING());

        //4. Group by word and count occurrences; a "count" column is added automatically
        Dataset<Row> wordCounts = words.groupBy("value").count();

        //5. Start the streaming query in complete mode, printing results to the console
        StreamingQuery query = wordCounts.writeStream()
            .outputMode("complete")
            .format("console")
            .start();
        
        query.awaitTermination();
    }
}

After writing the code above, run "nc -lk 9999" on node3 to start a socket server, then start the program and type the following data into the socket:

First input: a b c
Second input: d a c
Third input: a b c

The console prints the following results:

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+

III. Notes on the Code Above

  • Spark SQL's default shuffle parallelism is 200; since the data volume here is small, reduce it via the "spark.sql.shuffle.partitions" parameter.
  • The data Structured Streaming reads arrives as a DataFrame with a default column named "value".
  • Convert the DataFrame to a Dataset via as[String] before applying typed operations such as flatMap.
  • OutputMode offers three output modes: Complete Mode (the entire updated result table is written on every trigger), Append Mode (only newly appended rows are written), and Update Mode (only rows updated since the last trigger are written); see the sketch after this list.
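
As a minimal sketch of the difference, the variant below replaces step 5 of the Scala code with Update mode, so each batch prints only the words whose counts changed in that batch. Note that Append mode does not support this aggregation unless a watermark is defined; Spark rejects such a query with an AnalysisException.

//Alternative step 5: Update mode prints only the rows that changed in each batch
val query: StreamingQuery = wordCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()
query.awaitTermination()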

