

[Yugong Series] March 2023: MES Manufacturing Execution System - 004. Using Kafka

Preface

Kafka is a distributed streaming platform designed for handling real-time data streams. It is used in scenarios such as log collection, stream processing, and message queuing, and is widely adopted in big-data processing and real-time analytics.

Kafka's main capabilities are message publishing and subscription, message storage, and message processing.

Kafka's core concepts include producers, consumers, topics, partitions, and offsets. A producer sends messages to Kafka; a consumer reads messages from Kafka; a topic is a category of messages; a partition is a shard of a topic; and an offset is a message's position within a partition.

Kafka has four core APIs:

  • The Producer API lets an application publish a stream of records to one or more Kafka topics.
  • The Consumer API lets an application subscribe to one or more topics and process the stream of records delivered to it.
  • The Streams API lets an application act as a stream processor: it consumes input streams from one or more topics and produces output streams to one or more topics, transforming the data along the way.
  • The Connector API lets you build and run reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database can capture every change to a table.
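As a quick taste of the Producer API, the .NET client used later in this article (Confluent.Kafka) exposes it roughly as follows. This is a minimal sketch only; the broker address and topic name are placeholders, not values from the original project:

```csharp
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // placeholder address
using var producer = new ProducerBuilder<Null, string>(config).Build();

// Publish one record to a (hypothetical) topic and wait for the delivery report
await producer.ProduceAsync("quickstart", new Message<Null, string> { Value = "hello kafka" });
```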

Kafka website: https://kafka.apache.org/
Kafka Chinese documentation: https://kafka.apachecn.org/

I. Using Kafka

1. Install the NuGet package

Confluent.Kafka


2. Dependency injection

if (AppSetting.Kafka.UseConsumer)
    builder.RegisterType<KafkaConsumer<string, string>>().As<IKafkaConsumer<string, string>>().SingleInstance();
if (AppSetting.Kafka.UseProducer)
    builder.RegisterType<KafkaProducer<string, string>>().As<IKafkaProducer<string, string>>().SingleInstance();
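The registration above is guarded by configuration flags. A minimal shape for the corresponding settings section might look like the following. The property names (UseConsumer, UseProducer, IsConsumerSubscribe, ProducerSettings, ConsumerSettings, Topics) mirror the code in this article, but the file layout and the GroupId/topic values are assumptions, not taken from the original project:

```json
{
  "Kafka": {
    "UseConsumer": true,
    "UseProducer": true,
    "IsConsumerSubscribe": true,
    "ProducerSettings": { "BootstrapServers": "192.168.20.241:9092" },
    "ConsumerSettings": {
      "BootstrapServers": "192.168.20.241:9092",
      "GroupId": "mes-group"
    },
    "Topics": { "TestTopic": "test-topic" }
  }
}
```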


3. Encapsulation

3.1 IKafkaConsumer and IKafkaProducer

1) IKafkaConsumer

public interface IKafkaConsumer<TKey, TValue> : IDisposable
{
    /// <summary>
    /// Subscription-callback mode: consume continuously.
    /// </summary>
    /// <param name="Func">Callback; when auto-commit is disabled (the default), the return value decides whether the offset is committed</param>
    /// <param name="Topic">Topic</param>
    void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic);

    /// <summary>
    /// Batch subscription-callback mode: consume continuously from several topics.
    /// </summary>
    /// <param name="Func">Callback; when auto-commit is disabled (the default), the return value decides whether the offset is committed</param>
    /// <param name="Topics">Topic list</param>
    void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics);

    /// <summary>
    /// Batch mode: one-shot consume. Drains the data currently buffered in Kafka, keeps listening for
    /// TimeOut ms, and returns when no new data is produced (at most MaxRow records per call).
    /// </summary>
    /// <param name="Topic">Topic</param>
    /// <param name="TimeOut">Listening window in ms; default 300</param>
    /// <param name="MaxRow">Maximum rows per call; default 100</param>
    /// <returns>Pending records</returns>
    List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100);

    /// <summary>
    /// Single-record mode: consume one row.
    /// </summary>
    /// <param name="Topic">Topic</param>
    /// <param name="TimeOut">Listening window in ms; default 300</param>
    /// <returns>Pending record</returns>
    ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300);
}

2) IKafkaProducer

public interface IKafkaProducer<TKey, TValue>
{
    /// <summary>
    /// Produce a message.
    /// </summary>
    /// <param name="Key"></param>
    /// <param name="Value"></param>
    /// <param name="Topic"></param>
    void Produce(TKey Key, TValue Value, string Topic);

    /// <summary>
    /// Produce a message asynchronously.
    /// </summary>
    /// <param name="Key"></param>
    /// <param name="Value"></param>
    /// <param name="Topic"></param>
    /// <returns></returns>
    Task ProduceAsync(TKey Key, TValue Value, string Topic);
}


3.2 KafkaConsumer and KafkaProducer

1) KafkaConsumer

/// <summary>
/// Consumer (TKey/TValue are the data types of Message.Key/Message.Value).
/// Implements three consumption modes: 1. subscription callback  2. batch one-shot  3. single record.
/// </summary>
/// <typeparam name="TKey">Data type of Message.Key</typeparam>
/// <typeparam name="TValue">Data type of Message.Value</typeparam>
public class KafkaConsumer<TKey, TValue> : KafkaConfig, IKafkaConsumer<TKey, TValue>
{
    /// <summary>
    /// Kafka address (including port).
    /// </summary>
    public string Servers
    {
        get { return ConsumerConfig.BootstrapServers; }
        set { ConsumerConfig.BootstrapServers = value; }
    }

    /// <summary>
    /// Consumer group.
    /// </summary>
    public string GroupId
    {
        get { return ConsumerConfig.GroupId; }
        set { ConsumerConfig.GroupId = value; }
    }

    /// <summary>
    /// Auto commit; defaults to false.
    /// </summary>
    public bool EnableAutoCommit
    {
        get { return ConsumerConfig.EnableAutoCommit ?? false; }
        set { ConsumerConfig.EnableAutoCommit = value; }
    }

    /// <summary>
    /// Subscription-callback mode: consume continuously.
    /// </summary>
    /// <param name="Func">Callback; when auto-commit is disabled (the default), the return value decides whether the offset is committed</param>
    /// <param name="Topic">Topic</param>
    public void Consume(Func<ConsumeResult<TKey, TValue>, bool> Func, string Topic)
    {
        Task.Factory.StartNew(() =>
        {
            var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
            // Set the value deserializer
            builder.SetValueDeserializer(new KafkaDConverter<TValue>());
            builder.SetErrorHandler((_, e) =>
            {
                Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
            }).SetStatisticsHandler((_, json) =>
            {
                Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
            }).SetPartitionsAssignedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($"-Assigned Kafka partitions: {partitionsStr}");
            }).SetPartitionsRevokedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($"-Revoked Kafka partitions: {partitionsStr}");
            });
            using var consumer = builder.Build();
            consumer.Subscribe(Topic);
            while (AppSetting.Kafka.IsConsumerSubscribe) // true
            {
                ConsumeResult<TKey, TValue> result = null;
                try
                {
                    result = consumer.Consume();
                    if (result.IsPartitionEOF) continue;
                    if (Func(result))
                    {
                        if (!(bool)ConsumerConfig.EnableAutoCommit)
                        {
                            // Manual commit; when EnableAutoCommit is true the client commits
                            // automatically and Commit must not be called here
                            consumer.Commit(result);
                        }
                    }
                }
                catch (ConsumeException ex)
                {
                    Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
                }
                catch (Exception ex)
                {
                    // result may still be null here, so log the subscribed topic rather than result.Topic
                    Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
                }
            }
        });
    }

    /// <summary>
    /// Batch subscription-callback mode: consume continuously from several topics.
    /// </summary>
    /// <param name="Func">Callback; when auto-commit is disabled (the default), the return value decides whether the offset is committed</param>
    /// <param name="Topics">Topic list</param>
    public void ConsumeBatch(Func<ConsumeResult<TKey, TValue>, bool> Func, List<string> Topics)
    {
        Task.Factory.StartNew(() =>
        {
            var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
            // Set the value deserializer
            builder.SetValueDeserializer(new KafkaDConverter<TValue>());
            builder.SetErrorHandler((_, e) =>
            {
                Logger.Error(LoggerType.KafkaException, null, null, $"Error:{e.Reason}");
            }).SetStatisticsHandler((_, json) =>
            {
                Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > listening for messages..");
            }).SetPartitionsAssignedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($"-Assigned Kafka partitions: {partitionsStr}");
            }).SetPartitionsRevokedHandler((c, partitions) =>
            {
                string partitionsStr = string.Join(", ", partitions);
                Console.WriteLine($"-Revoked Kafka partitions: {partitionsStr}");
            });
            using var consumer = builder.Build();
            consumer.Subscribe(Topics);
            while (AppSetting.Kafka.IsConsumerSubscribe) // true
            {
                ConsumeResult<TKey, TValue> result = null;
                try
                {
                    result = consumer.Consume();
                    if (result.IsPartitionEOF) continue;
                    if (Func(result))
                    {
                        if (!(bool)ConsumerConfig.EnableAutoCommit)
                        {
                            // Manual commit; when EnableAutoCommit is true the client commits
                            // automatically and Commit must not be called here
                            consumer.Commit(result);
                        }
                    }
                }
                catch (ConsumeException ex)
                {
                    // join the topic names instead of Topics.ToArray(), which would only print the type name
                    Logger.Error(LoggerType.KafkaException, $"Topic:{string.Join(",", Topics)},{ex.Error.Reason}", null, ex.Message + ex.StackTrace);
                }
                catch (Exception ex)
                {
                    // result may still be null here, so log the subscribed topics rather than result.Topic
                    Logger.Error(LoggerType.KafkaException, $"Topic:{string.Join(",", Topics)}", null, ex.Message + ex.StackTrace);
                }
            }
        });
    }

    /// <summary>
    /// Batch mode: one-shot consume. Drains the data currently buffered in Kafka, keeps listening for
    /// TimeOut ms, and returns when no new data is produced (at most MaxRow records per call).
    /// </summary>
    /// <param name="Topic">Topic</param>
    /// <param name="TimeOut">Listening window in ms; default 300</param>
    /// <param name="MaxRow">Maximum rows per call; default 100</param>
    /// <returns>Pending records</returns>
    public List<ConsumeResult<TKey, TValue>> ConsumeOnce(string Topic, int TimeOut = 300, int MaxRow = 100)
    {
        var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
        // Set the value deserializer
        builder.SetValueDeserializer(new KafkaDConverter<TValue>());
        using var consumer = builder.Build();
        consumer.Subscribe(Topic);
        List<ConsumeResult<TKey, TValue>> Res = new List<ConsumeResult<TKey, TValue>>();
        while (true)
        {
            try
            {
                var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
                if (result == null) break;
                Res.Add(result);
                // Manual commit; with EnableAutoCommit = true this call would be unnecessary
                consumer.Commit();
                if (Res.Count > MaxRow) break;
            }
            catch (Exception ex)
            {
                Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
                return null;
            }
        }
        return Res;
    }

    /// <summary>
    /// Single-record mode: consume one row.
    /// </summary>
    /// <param name="Topic">Topic</param>
    /// <param name="TimeOut">Listening window in ms; default 300</param>
    /// <returns>Pending record</returns>
    public ConsumeResult<TKey, TValue> ConsumeOneRow(string Topic, int TimeOut = 300)
    {
        var builder = new ConsumerBuilder<TKey, TValue>(ConsumerConfig);
        // Set the value deserializer
        builder.SetValueDeserializer(new KafkaDConverter<TValue>());
        using var consumer = builder.Build();
        consumer.Subscribe(Topic);
        try
        {
            var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));
            if (result != null)
            {
                // Manual commit; with EnableAutoCommit = true this call would be unnecessary
                consumer.Commit();
            }
            return result;
        }
        catch (Exception ex)
        {
            Logger.Error(LoggerType.KafkaException, $"Topic:{Topic}", null, ex.Message + ex.StackTrace);
            return null;
        }
    }

    public void Dispose()
    {
        //if (_cache != null)
        //    _cache.Dispose();
        GC.SuppressFinalize(this);
    }
}
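The two one-shot methods can be called directly once the consumer is resolved from the container. A small sketch; the injected field and the topic name are hypothetical:

```csharp
// _consumer is an injected IKafkaConsumer<string, string>; "mes-alarm" is a hypothetical topic name
var rows = _consumer.ConsumeOnce("mes-alarm", TimeOut: 500, MaxRow: 50);
if (rows != null)   // ConsumeOnce returns null when consumption failed
{
    foreach (var row in rows)
        Console.WriteLine(row.Message.Value);
}

var one = _consumer.ConsumeOneRow("mes-alarm");   // null when nothing arrived within the timeout
if (one != null)
    Console.WriteLine(one.Message.Value);
```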


2) KafkaProducer

/// <summary>
/// Producer: inject it through the constructor of a controller or service and call it directly.
/// TKey/TValue are the data types of Message.Key/Message.Value.
/// </summary>
/// <typeparam name="TKey">Data type of Message.Key</typeparam>
/// <typeparam name="TValue">Data type of Message.Value</typeparam>
public class KafkaProducer<TKey, TValue> : KafkaConfig, IKafkaProducer<TKey, TValue>
{
    /// <summary>
    /// Constructs the producer.
    /// </summary>
    public KafkaProducer()
    {
    }

    /// <summary>
    /// Kafka address (including port).
    /// </summary>
    public string Servers
    {
        get { return ProducerConfig.BootstrapServers; }
        set { ProducerConfig.BootstrapServers = value; }
    }

    /// <summary>
    /// Produce a message.
    /// </summary>
    /// <param name="Key">Message.Key; useful for routing a message to a specific partition</param>
    /// <param name="Value">Message.Value</param>
    /// <param name="Topic">Topic</param>
    public void Produce(TKey Key, TValue Value, string Topic)
    {
        var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
        // Set the value serializer
        producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());
        using var producer = producerBuilder.Build();
        try
        {
            producer.Produce(Topic, new Message<TKey, TValue>
            {
                Key = Key,
                Value = Value
            }, (result) =>
            {
                if (result.Error.IsError)
                    Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}", null, $"Delivery Error:{result.Error.Reason}");
            });
        }
        catch (ProduceException<Null, string> ex)
        {
            Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
        }
    }

    /// <summary>
    /// Produce a message asynchronously.
    /// </summary>
    /// <param name="Key">Message.Key</param>
    /// <param name="Value">Message.Value</param>
    /// <param name="Topic">Topic</param>
    /// <returns></returns>
    public async Task ProduceAsync(TKey Key, TValue Value, string Topic)
    {
        var producerBuilder = new ProducerBuilder<TKey, TValue>(ProducerConfig);
        // Set the value serializer
        producerBuilder.SetValueSerializer(new KafkaConverter<TValue>());
        using var producer = producerBuilder.Build();
        try
        {
            var dr = await producer.ProduceAsync(Topic, new Message<TKey, TValue>
            {
                Key = Key,
                Value = Value
            });
            //Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
        }
        catch (ProduceException<Null, string> ex)
        {
            Logger.Error(LoggerType.KafkaException, $"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}", null, ex.Message + ex.StackTrace);
        }
    }
}
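On the producing side, the wrapper is meant to be constructor-injected. A hedged sketch of a service that pushes device data onto a topic; the service class, method, and topic name are hypothetical, only IKafkaProducer comes from this article:

```csharp
public class AlarmService
{
    private readonly IKafkaProducer<string, string> _producer;

    public AlarmService(IKafkaProducer<string, string> producer)
    {
        _producer = producer;
    }

    public Task PushAlarmAsync(string deviceId, string payload)
    {
        // Using the device id as the key routes all messages from one device to the same partition,
        // preserving their order
        return _producer.ProduceAsync(deviceId, payload, "mes-alarm");
    }
}
```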


3.3 The KafkaConfig configuration class

/// <summary>
/// Configuration class.
/// </summary>
public class KafkaConfig
{
    /// <summary>
    /// Constructs the configuration.
    /// </summary>
    protected KafkaConfig()
    {
        ProducerConfig = new ProducerConfig()
        {
            BootstrapServers = AppSetting.Kafka.ProducerSettings.BootstrapServers, // e.g. "192.168.20.241:9092"
        };

        ConsumerConfig = new ConsumerConfig()
        {
            BootstrapServers = AppSetting.Kafka.ConsumerSettings.BootstrapServers,
            GroupId = AppSetting.Kafka.ConsumerSettings.GroupId,
            // When a partition has a committed offset, resume from it; otherwise consume from the beginning
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false,
            // Kafka security settings, if the cluster requires authentication:
            //SecurityProtocol = SecurityProtocol.SaslPlaintext,
            //SaslMechanism = SaslMechanism.Plain,
            //SaslUsername = AppSetting.Kafka.ConsumerSettings.SaslUsername,
            //SaslPassword = AppSetting.Kafka.ConsumerSettings.SaslPassword,
        };
    }

    /// <summary>
    /// Consumer configuration.
    /// </summary>
    public ConsumerConfig ConsumerConfig;

    /// <summary>
    /// Producer configuration.
    /// </summary>
    public ProducerConfig ProducerConfig;
}


3.4 The KafkaHelper helper class

namespace KafkaManager
{
    /// <summary>
    /// Helper class.
    /// </summary>
    public class KafkaHelper
    {
        /// <summary>
        /// Gets the current application name (console and Windows applications only).
        /// </summary>
        /// <returns></returns>
        public static string GetApplicationName()
        {
            try
            {
                return Assembly.GetEntryAssembly().GetName().Name;
            }
            catch
            {
                return "Kafka_Test";
            }
        }

        /// <summary>
        /// Gets the server name.
        /// </summary>
        /// <returns></returns>
        public static string GetServerName()
        {
            return Dns.GetHostName();
        }

        /// <summary>
        /// Gets the server IP (the first address in the 10.x.x.x range; falls back to 127.0.0.1).
        /// </summary>
        /// <returns></returns>
        public static string GetServerIp()
        {
            IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());
            foreach (var ip in ips.AddressList)
            {
                if (Regex.IsMatch(ip.ToString(), @"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$"))
                {
                    return ip.ToString();
                }
            }
            return "127.0.0.1";
        }

        /// <summary>
        /// Converts the current C# DateTime to a Unix timestamp in milliseconds.
        /// 621356256000000000 is the tick count of 1970-01-01 08:00:00, i.e. the Unix epoch
        /// expressed in UTC+8 local time, matching DateTime.Now on a UTC+8 server.
        /// </summary>
        /// <returns>long</returns>
        public static long GetTimeStamp()
        {
            DateTime time = DateTime.Now;
            long t = (time.Ticks - 621356256000000000) / 10000;
            return t;
        }
    }

    #region Message serialization and deserialization

    public class KafkaConverter<T> : ISerializer<T>
    {
        /// <summary>
        /// Serializes data to bytes (JSON, UTF-8 encoded).
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(T data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
    }

    public class KafkaDConverter<T> : IDeserializer<T>
    {
        /// <summary>
        /// Deserializes bytes back into an entity; returns default(T) on failure.
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return default(T);
            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject<T>(json);
            }
            catch
            {
                return default(T);
            }
        }
    }

    #endregion

    #region Log model

    /// <summary>
    /// Default log model; construct and use as needed.
    /// </summary>
    public class KafkaLogModel
    {
        /// <summary>
        /// Constructs the model with defaults for ServerIp, ServerName, TimeStamp, ApplicationName and ApplicationVersion.
        /// </summary>
        public KafkaLogModel()
        {
            ServerIp = KafkaHelper.GetServerIp();
            ServerName = KafkaHelper.GetServerName();
            TimeStamp = DateTime.Now;
            ApplicationName = KafkaHelper.GetApplicationName();
            ApplicationVersion = "V1.0.0";
        }

        /// <summary>Application name (defaults to the current application name; web applications default to ISD_Kafka)</summary>
        public string ApplicationName { get; set; }
        /// <summary>Application version (defaults to V1.0.0)</summary>
        public string ApplicationVersion { get; set; }
        /// <summary>Time of occurrence (defaults to now)</summary>
        public DateTime TimeStamp { get; set; }
        /// <summary>Start time</summary>
        public DateTime BeginDate { get; set; }
        /// <summary>End time</summary>
        public DateTime EndDate { get; set; }
        /// <summary>Server IP (defaults to the current server's IP)</summary>
        public string ServerIp { get; set; }
        /// <summary>Server name (defaults to the current server's name)</summary>
        public string ServerName { get; set; }
        /// <summary>Client IP</summary>
        public string ClientIp { get; set; }
        /// <summary>Module (page path)</summary>
        public string Module { get; set; }
        /// <summary>Operator</summary>
        public string Operator { get; set; }
        /// <summary>Operation type, e.g. Query, Add, Update, Delete, Export; customizable</summary>
        public string OperationType { get; set; }
        /// <summary>Operation status, e.g. 200/404/503 for HTTP requests, or 1 success / 0 failure for other operations; customizable</summary>
        public string Status { get; set; }
        /// <summary>Additional information</summary>
        public string Message { get; set; }
    }

    #endregion
}
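Because KafkaConverter and KafkaDConverter are just JSON over UTF-8, their round trip can be checked without a broker. A small sketch; SerializationContext.Empty comes from Confluent.Kafka, and the field values are arbitrary:

```csharp
var model = new KafkaLogModel { Message = "test", Status = "1" };

var bytes = new KafkaConverter<KafkaLogModel>().Serialize(model, SerializationContext.Empty);
var back = new KafkaDConverter<KafkaLogModel>().Deserialize(bytes, isNull: false, SerializationContext.Empty);

Console.WriteLine(back.Message);   // "test"
```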


4. Usage

#region Kafka usage
if (AppSetting.Kafka.UseConsumer)
{
    using var scope = host.Services.CreateScope();
    var testConsumer = scope.ServiceProvider.GetService<IKafkaConsumer<string, string>>();
    testConsumer.Consume(res =>
    {
        Console.WriteLine($"receive:{DateTime.Now.ToLongTimeString()}  value:{res.Message.Value}");
        bool bl = DataHandle.AlarmData(res.Message.Value);
        return bl;
    }, AppSetting.Kafka.Topics.TestTopic);
}
#endregion

Reprinted from: https://blog.csdn.net/aa2528877987/article/details/129460943
Copyright belongs to the original author, 愚公搬代码. Please contact us for removal in case of infringement.
