

Connecting MQTT and Kafka with Apache Nifi in Practice

1. Overview:

  While implementing some solutions, I felt the need for a kind of adapter that could connect different data sources and funnel them into one place for unified processing. That led me to Nifi (official site), a tool that works like a "data pipeline plus adapter toolbox" and connects heterogeneous data sources smoothly. The overall idea is to funnel all kinds of sources, such as log files, messaging, and database operations, into Kafka for centralized processing. My first impressions have been very good, so I am sharing the experience here.

2. Design:

- Connect an Emqx cluster (MQTT broker) and a Kafka cluster to achieve duplex data flow: client (via MQTT) <=> application service (via Kafka)

- Architecture diagram (figure omitted)

2.1 Resources:

For simplicity, everything runs under Docker; migration to K8s comes later.

Service      Cluster entry point                                              Notes
MQTT         (tcp|mqtt)://host001.dev.ia:1883                                 Client IDs: nifi-xio1-sub1 (subscriber), nifi-xio1-pub1 (publisher)
Kafka        host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Apache Nifi  http://host001.dev.ia:9080/nifi/

Docker configuration for Nifi

# Create a volume to persist data
docker volume create nifi_data

docker-compose.yml

version: "3.7"
services:
  nifi:
    image: apache/nifi:1.9.2
    container_name: nifi
    restart: always
    ports:
      - "9080:8080"
    environment:
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      #- NIFI_HOME=/home/nifi
      #- NIFI_LOG_DIR=/home/nifi/logs

    volumes:
      - nifi_data:/home/nifi

volumes:
  nifi_data:
    external: true
2.2 Topics used:

Topic                     Type                               Notes
ia001.device.bus          Kafka topic ("/" is not allowed)   message bus
/ia001/device/#           Emqx topic                         test a wildcard topic
/ia001/device/mqttjs_111  Emqx topic                         test a topic carrying a device id
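Since Kafka forbids "/" in topic names while MQTT uses it as the level separator, a naming convention is needed when bridging the two. A minimal sketch of such a mapping (the helper name is mine for illustration, not a Nifi feature):

```python
import re

def mqtt_to_kafka_topic(mqtt_topic: str) -> str:
    """Map an MQTT topic to a Kafka-legal topic name.

    Kafka topic names may only contain [a-zA-Z0-9._-], so the MQTT
    level separator "/" is replaced with ".".
    """
    # Drop a leading "/" so we don't produce a leading "."
    name = mqtt_topic.lstrip("/").replace("/", ".")
    # Reject anything still illegal for Kafka (e.g. wildcards like # or +)
    if not re.fullmatch(r"[a-zA-Z0-9._-]+", name):
        raise ValueError(f"cannot map {mqtt_topic!r} to a Kafka topic name")
    return name

print(mqtt_to_kafka_topic("/ia001/device/mqttjs_111"))  # ia001.device.mqttjs_111
```

This is why the MQTT side uses /ia001/device/... while the Kafka side uses ia001.device.bus.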

3. Implementation

3.1 The Nifi canvas

Once configured, visit http://host001.dev.ia:9080/nifi/. In the middle of the canvas are the two configured Process Groups, MqttToKafka and KafkaToMqtt, representing the two directions of flow.

3.2 MqttToKafka
3.2.1 Configuration

Add a ConsumeMQTT processor: drag the Processor icon onto the canvas and select ConsumeMQTT.

Settings                               Value
Name                                   ConsumeMQTT
Automatically terminate relationships  failure / success checked

Properties         Value
Broker URI         tcp://host001.dev.ia:1883
Client ID          nifi-xio1-sub1
Username/Password  --
Topic Filter       /ia001/device/#
Max Queue Size     1000

Add a PublishKafka_2_0 processor: drag the Processor icon onto the canvas and select PublishKafka_2_0.

Properties          Value
Name                PublishKafka_2_0
Kafka Brokers       host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Security Protocol   PLAINTEXT
Topic Name          ia001.device.bus
Delivery Guarantee  Guarantee Replicated Delivery
Use Transactions    true

Drag from ConsumeMQTT onto PublishKafka to connect them; this adds a queue connection named Message.

Running correctly it looks like this: (screenshot omitted)

3.2.2 Test

Steps:

  1. Use the MQTT client tool MqttX to publish a JSON packet to a topic matched by the filter, e.g. /ia001/device/mqttjs_111
  2. Use a Python script as a Kafka consumer subscribed to ia001.device.bus to receive that packet

Python script:

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import json

async def consume_loop(consumer, topics):
    try:
        # Subscribe to the topics
        consumer.subscribe(topics)

        while True:
            # Poll for a message
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print(
                        "%% %s [%d] reached end at offset %d\n"
                        % (msg.topic(), msg.partition(), msg.offset())
                    )
                else:
                    raise KafkaException(msg.error())
            else:
                # Normal message
                raw_message = msg.value()
                print(f"Raw message: {raw_message}")
                parsed_message = json.loads(raw_message.decode("utf-8"))
                print(f"Received message: {type(parsed_message)} : {parsed_message}")
            await asyncio.sleep(0.01)  # brief sleep to yield control to the event loop
    finally:
        # Close the consumer
        consumer.close()

async def consume():
    # Consumer configuration
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
        "group.id": "mygroup1",
        "auto.offset.reset": "earliest",
    }

    # Create the consumer
    consumer = Consumer(conf)
    await consume_loop(consumer, ["ia001.device.bus"])

if __name__ == "__main__":
    asyncio.run(consume())
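Step 1 above can also be scripted instead of using MqttX. A minimal publisher sketch with paho-mqtt (a third-party client assumed to be installed; the Client constructor shown is the paho-mqtt 1.x form, and the payload shape is just an example of mine; paho is imported lazily so the payload helper works without it):

```python
import json

def build_payload(device_id, value):
    """Build an example JSON payload; the real schema is application-defined."""
    return json.dumps({"device_id": device_id, "value": value}).encode("utf-8")

def publish_once(broker="host001.dev.ia", port=1883):
    # Third-party dependency: pip install paho-mqtt (imported here so the
    # payload helper above stays usable without a broker or the library)
    import paho.mqtt.client as mqtt

    client = mqtt.Client(client_id="test-pub-1")
    client.connect(broker, port)
    # Publish to a topic matched by the ConsumeMQTT filter /ia001/device/#
    client.publish("/ia001/device/mqttjs_111",
                   build_payload("mqttjs_111", 23.5), qos=0)
    client.disconnect()
```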
3.2.3 Results

(Screenshots of the script output and the Nifi canvas omitted)

3.3 KafkaToMqtt
3.3.1 Configuration

Add a ConsumeKafka_2_0 processor: drag the Processor icon onto the canvas and select ConsumeKafka_2_0.

Settings  Value
Name      ConsumeKafka_2_0

Properties     Value
Kafka Brokers  host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Topic Name(s)  test.topic.bus / test.device.*
Group ID       test

Add a PublishMQTT processor: drag the Processor icon onto the canvas and select PublishMQTT.

Settings                               Value
Name                                   PublishMQTT
Automatically terminate relationships  failure / success checked

Properties         Value
Broker URI         tcp://host001.dev.ia:1883
Client ID          nifi-xio1-pub1
Username/Password  --
Topic              test.topic.bus
QoS                0

Drag from ConsumeKafka_2_0 onto PublishMQTT to connect them; this adds a queue connection named Message.

Running correctly it looks like this: (screenshot omitted)

3.3.2 Test

Steps:

  1. A Python script publishes messages to the Kafka topic test.topic.bus
  2. An MqttX client subscribes and receives them

Script:

from confluent_kafka import Producer
import json

def delivery_report(err, msg):
    """Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush()."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

def create_async_producer(config):
    """Creates an instance of an asynchronous Kafka producer."""
    return Producer(config)

def produce_messages(producer, topic, messages):
    """Asynchronously produces messages to a Kafka topic."""
    for message in messages:
        # Trigger any available delivery report callbacks from previous produce() calls
        producer.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        producer.produce(
            topic, json.dumps(message).encode("utf-8"), callback=delivery_report
        )

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    producer.flush()

if __name__ == "__main__":
    # Kafka configuration
    # Replace these with your server's configuration
    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",  # Replace with your Kafka server addresses
        # "client.id": "python-producer",
    }

    # Create an asynchronous Kafka producer
    async_producer = create_async_producer(conf)

    # Messages to send to Kafka
    messages_to_send = [{"key": "value1a"}, {"key": "value2a"}, {"key": "value3a"}]

    # Produce messages
    # produce_messages(async_producer, "test.topic.bus", messages_to_send)
    produce_messages(async_producer, "test.device.mw3039kkj001", messages_to_send)
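Step 2 can likewise be scripted in place of MqttX. A minimal paho-mqtt subscriber sketch (third-party library assumed installed, 1.x-style constructor, imported lazily so the decode helper runs standalone):

```python
import json

def decode_payload(raw):
    """Decode a JSON message body as forwarded by PublishMQTT."""
    return json.loads(raw.decode("utf-8"))

def subscribe_forever(broker="host001.dev.ia", port=1883, topic="test.topic.bus"):
    # Third-party dependency: pip install paho-mqtt
    import paho.mqtt.client as mqtt

    def on_message(client, userdata, msg):
        print(msg.topic, decode_payload(msg.payload))

    client = mqtt.Client(client_id="test-sub-1")
    client.on_message = on_message
    client.connect(broker, port)
    # Note: the Kafka topic name passes through PublishMQTT unchanged, so
    # the MQTT topic here is literally "test.topic.bus" (dots, no slashes).
    client.subscribe(topic, qos=0)
    client.loop_forever()
```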
3.3.3 Results

(Screenshots of MqttX and the Nifi canvas omitted)

4. Summary:

Nifi supports clustered deployment, so data collection, data movement, and data storage can all be made distributed. It also offers a visual interface that makes it easy to aggregate data nodes and add or remove configuration. This has only been a first taste; deeper study will follow in later updates.

4.1 Notes
Nifi Kafka processor configuration dictionary:

Delivery Guarantee
Controls the delivery guarantee:

  1. Best Effort (equivalent to acks=0)
  2. Guarantee Single Node Delivery (equivalent to acks=1, the Kafka default)
  3. Guarantee Replicated Delivery (equivalent to acks=-1)

Use Transactions
Whether to use transactions:

true / false
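The three Delivery Guarantee options map directly onto the Kafka producer acks setting; in confluent_kafka / librdkafka terms the correspondence can be sketched as follows (make_producer is my helper name, not part of either API):

```python
# Nifi "Delivery Guarantee" values -> Kafka producer acks (librdkafka names)
DELIVERY_GUARANTEE_TO_ACKS = {
    "Best Effort": "0",                     # fire and forget
    "Guarantee Single Node Delivery": "1",  # leader acknowledgement only
    "Guarantee Replicated Delivery": "all", # acks=-1: all in-sync replicas
}

def make_producer(guarantee, transactional_id=None):
    # Third-party dependency: pip install confluent-kafka
    from confluent_kafka import Producer

    conf = {
        "bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",
        "acks": DELIVERY_GUARANTEE_TO_ACKS[guarantee],
    }
    # "Use Transactions = true" corresponds to configuring a transactional.id
    if transactional_id is not None:
        conf["transactional.id"] = transactional_id
    return Producer(conf)
```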

MQTT topic wildcards:

"/" topic level separator
Splits a topic name into multiple levels.

Example: room401/tv/contrl/sensor

"#" multi-level wildcard
Matches any number of topic levels.

If a client subscribes to "china/guangzhou/#", it receives messages published to:

china/guangzhou
china/guangzhou/huangpu
china/guangzhou/tianhe/zhongshanlu
china/guangzhou/tianhe/zhongshanlu/num123

school/#                // valid; also matches "school" itself, because # includes the parent level
#                       // valid; receives all application messages
school/teacher/#        // valid
school/teacher#         // invalid
school/teacher/#/lever  // invalid; # must be the last character of the topic filter

Reference: https://blog.51cto.com/u_16099203/10959511

"+" single-level wildcard
Matches exactly one topic level. It may appear at any level of the filter, including the first and the last.

china/+ matches china/guangzhou, but not china/guangzhou/tianhe

china/+/+/zhongshanlu matches china/guangzhou/tianhe/zhongshanlu and china/shenzhen/nanshan/zhongshanlu

"$" reserved prefix
Topics beginning with "$" (e.g. $SYS/...) are reserved for the broker; wildcard subscriptions starting with "#" or "+" do not match them.
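The matching rules above can be condensed into a small function; this is my own illustrative implementation, not code from any broker:

```python
def topic_matches(filter_, topic):
    """Check an MQTT topic against a subscription filter with + and # wildcards."""
    # Wildcards never match reserved topics starting with "$" (e.g. $SYS/...)
    if topic.startswith("$") and (filter_.startswith("#") or filter_.startswith("+")):
        return False
    f_levels = filter_.split("/")
    t_levels = topic.split("/")
    for i, f in enumerate(f_levels):
        if f == "#":
            # "#" must be the last level; it matches the parent and any depth below
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if f != "+" and f != t_levels[i]:
            return False
    # Without "#", the filter must consume exactly as many levels as the topic
    return len(f_levels) == len(t_levels)

print(topic_matches("school/#", "school"))  # True: "#" includes the parent level
```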


Tags: Nifi, big data, Kafka

Reprinted from: https://blog.csdn.net/bennybi/article/details/140543068
Copyright belongs to the original author, bennybi; contact us for removal in case of infringement.
