下面根据数据流向逐一介绍 Flume -> Kafka -> StructuredStreaming -> Mysql
1. Flume
- Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.
为了监测某个目录下,所有文件是否有新的行产生,因此,我们source 用 Taildir Source 见官网
Flume 1.11.0 User Guide — Apache Flume
我在node3上构建了Flume。
a1.sources =s1
a1.channels=c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile=/export/server/flume/position/taildir_7mo_kafka.json
#将所有需要监控的数据源变成一个组
a1.sources.s1.filegroups =f1
#指定了f1是谁:监控目录下所有文件
a1.sources.s1.filegroups.f1 =/export/data/7mo_data/.*
#指定f1采集到的数拟的header中包含一个KV对
a1.sources.s1.headers.f1.type = 7mo
a1.sources.s1.fileHeader = true
其中 /export/data/7mo_data 以及 /export/server/flume/position/ 目录自己先创建。
我们的sink是kafka,因此Kafka sink 见官网
Flume 1.11.0 User Guide — Apache Flume
具体如下(channels 也一起附上了):
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic=7MO-MSG
a1.sinks.k1.kafka.bootstrap.servers= node2:9092
a1.sinks.k1.kafka.flumeBatchsize =10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms=100
#bind
a1.sources.s1.channels=c1
a1.sinks.k1.channel=c1
注意topic, servers的设置,根据实际情况改。
2. Kafka
注意是创建topic即可,我在node2上创建(记得先去一键启动zookeeper和kafka)
# 先进入kafka目录
cd /export/server/kafka
# 创建topic
bin/kafka-topics.sh --create --topic 7MO-MSG \
--partitions 3 --replication-factor 2 \
--bootstrap-server node2:9092
3. 生产数据
采用momo聊天消息。示例数据如下,采用 ‘\t’ 分开各个字段,模拟数据可在文章顶头处下载。
直接上代码,每隔1秒钟产生一条数据,数据来自data1.tsv文件。
import csv
import argparse
import time
parser = argparse.ArgumentParser(description='momo information')
parser.add_argument('--src', dest='srcFile', type=str, help='sourceFile')
parser.add_argument('--dst', dest='dstFile', type=str, help='destinationFile')
parser.add_argument('--sec', dest='second', default=1, type=str, help='the time of generate one data ')
args = parser.parse_args()
def writeToDst(row):
with open(args.dstFile, 'a') as fw:
line = '\t'.join(row) + '\n'
fw.write(line)
def main():
with open(args.srcFile, 'r', encoding='utf-8') as fr:
reader = csv.reader(fr,delimiter='\t')
for row in reader:
writeToDst(row)
time.sleep(args.second)
if __name__=="__main__":
main()
src 是data1.tsv文件路径,dst是flume要监控的文件目录下的具体文件路径,sec是多少秒采集一条记录。
4. StructuredStreaming
#!/root/anaconda3/envs/pyspark_env/bin/ python3
from pyspark.sql import SparkSession, DataFrame
import os
from pyspark.sql.functions import col, split, to_timestamp
from pyspark.sql.types import StructType, StringType, StructField
import argparse
parser = argparse.ArgumentParser(description='argparse mysql')
parser.add_argument('--user', '-us', type=str, default="root", help="user")
parser.add_argument('--password', '-p', type=str, default='123456', help='password')
parser.add_argument('--driver', '-dr', type=str, default='com.mysql.jdbc.Driver')
parser.add_argument('--dbtable', '-db', type=str, default='Momo', help="table name")
parser.add_argument('--url', '-ur', type=str, default='jdbc:mysql://node1:3306/spark?useSSL=false&useUnicode=yes&characterEncoding=utf8', help="url name")
parser.add_argument('--batchsize', '-b', type=int, default=100, help="batch size")
schema_type = StructType([
StructField("msg_time", StringType()),
StructField("sender_account", StringType()),
StructField("receiver_account", StringType()),
StructField("context", StringType())
])
def insert2mysql(df: DataFrame, batch):
prop = parser.parse_args()
print("batch:{} is start".format(batch))
# df.write.jdbc("jdbc:mysql://node1:3306/spark", 'student', 'append', prop)
df.write.format("jdbc") \
.mode("append") \
.options(user=prop.user,
password=prop.password,
driver=prop.driver,
dbtable=prop.dbtable,
url=prop.url,
batchsize=prop.batchsize
) \
.save()
def createSparkSession():
spark = SparkSession \
.builder \
.appName("StructuredKafkaWordCount") \
.config("spark.sql.shuffle.partitions", 2) \
.getOrCreate()
# config("spark.sql.shuffle.partitions",2) 可能需要在spark-submit提交才有效,但是检查点需要改
spark.sparkContext.setLogLevel('WARN')
# spark.conf.set("spark.sql.shuffle.partitions", 2)
return spark
def readFromKafka(spark: SparkSession, topicName):
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "node2:9092") \
.option("subscribe", topicName) \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load()
return lines
def process(kafkaStream):
# 获取数据
messageDF = kafkaStream.selectExpr("CAST(value AS STRING)").select(split(col("value"),"\t").alias("m"))
messageDF = messageDF.select(col("m")[0].alias("msg_time"),
col("m")[1].alias("sender_account"),
col("m")[9].alias("receiver_account"),
col("m")[19].alias("context"))
messageDF = messageDF.withColumn("msg_time", to_timestamp(col("msg_time"), "yyyy-MM-dd HH:mm:ss"))
messageDF = messageDF.selectExpr("CONCAT( CAST(msg_time AS STRING), '%', sender_account ) AS id" , "*")
# messageDF = messageDF.withColumn("msg_time", from_unixtime(messageDF.msg_time.cast("bigint"), 'yyyy-MM-dd HH:mm:ss'))
# messageDF=messageDF.select(from_json(col("value"),schema_type).alias("data")).select("data.msg_time")
return messageDF
# 测试,将数据打印到控制台
def printToConsole(streamDF):
streamDF \
.writeStream \
.outputMode("Append") \
.format("console") \
.option("numRows", "10") \
.option("truncate", "false") \
.option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
.trigger(processingTime="8 seconds") \
.start()
if __name__ == "__main__":
# 构建sparksession实例对象
spark = createSparkSession()
# 从kafka实时消费数据
topicName = "7MO-MSG"
kafkaStream = readFromKafka(spark, topicName)
# 提取数据,转换数据类型
stream = process(kafkaStream)
# 打印输出到控制台
# printToConsole(stream)
# 保存数据至mysql
stream.writeStream \
.outputMode("append") \
.foreachBatch(insert2mysql) \
.trigger(processingTime="8 seconds") \
.start()
spark.streams.awaitAnyTermination()
自行看即可,要注意,pyspark中structured Streaming的流转成rdd或者dataset有问题(scala的简单的多,没办法学校主要走的是python),需要直接在流上进行各种转换的操作方可。
5. Mysql
创建好数据库和表就行了。我的是mysql5,主要数据库和表是utf8格式的,要不中文显示会出问题。
create table Momo(
id VARCHAR(30),
msg_time TIMESTAMP,
sender_account VARCHAR(20),
receiver_account VARCHAR(20),
context VARCHAR(500)
)
6. 启动程序
启动好zookeeper和kafka
启动flmue,
我在node3启动
/export/server/flume/bin/flume-ng agent -n a1 -c /export/server/flume/conf/ -f /export/server/flume/conf/7mo_mem_kafka.properties -Dflume.root.logger=INFO,console
启动kafka消费者
我在node2启动
/export/server/kafka/bin/kafka-console-consumer.sh --topic 7MO-MSG --bootstrap-server node2:9092
启动spark程序
我在node1启动
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2 ch07_flume_kafka_spark_consumer.py
查看数据库内容
数据库在node1了,我在windows用navicat查看数据。
能整个流程跑通,还是挺不错的。后期就可以在此基础上,继续增加各种功能了。加油吧,少年!
整体参考自B站视频:黑马程序员大数据数据湖架构Hudi视频教程。主要代码逻辑改为pyspark了而已。02--实战案例技术架构--Flume+Kafka+StructuredStreaming+Hudi+Hive+MySQL_哔哩哔哩_bilibili
版权归原作者 pzy0668 所有, 如有侵权,请联系我们删除。