文章目录
前言
Kafka是一个分布式流处理平台,主要用于处理实时数据流。它可以用于日志收集、数据流处理、消息队列等场景。在大数据处理、实时数据分析等领域,Kafka被广泛应用。
Kafka的主要功能包括消息发布和订阅、消息存储和消息处理。
Kafka的概念包括生产者、消费者、主题、分区、偏移量等。生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息在分区中的位置。
Kafka有四个核心的API:
- The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
- The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
- The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
- The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
Kafka官网:https://kafka.apache.org/
Kafka中文文档:https://kafka.apachecn.org/
一、Kafka的使用
1.安装包
Confluent.Kafka
2.注入
if(AppSetting.Kafka.UseConsumer)
builder.RegisterType<KafkaConsumer<string,string>>().As<IKafkaConsumer<string,string>>().SingleInstance();if(AppSetting.Kafka.UseProducer)
builder.RegisterType<KafkaProducer<string,string>>().As<IKafkaProducer<string,string>>().SingleInstance();
3.封装
3.1 IKafkaConsumer和IKafkaProducer
1、IKafkaConsumer
publicinterfaceIKafkaConsumer<TKey, TValue>:IDisposable{/// <summary>/// 订阅回调模式-消费(持续订阅)/// </summary>/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>/// <param name="Topic">主题</param>voidConsume(Func<ConsumeResult<TKey, TValue>,bool> Func,string Topic);/// <summary>/// 批量订阅回调模式-消费(持续订阅)/// </summary>/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>/// <param name="Topics">主题集合</param>voidConsumeBatch(Func<ConsumeResult<TKey, TValue>,bool> Func,List<string> Topics);/// <summary>/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)/// </summary>/// <param name="Topic">主题</param>/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>/// <returns>待消费数据</returns>List<ConsumeResult<TKey, TValue>>ConsumeOnce(string Topic,int TimeOut =300,int MaxRow =100);/// <summary>/// 单笔消费模式-单行消费/// </summary>/// <param name="Topic">主题</param>/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>/// <returns>待消费数据</returns>ConsumeResult<TKey, TValue>ConsumeOneRow(string Topic,int TimeOut =300);}
2、IKafkaProducer
publicinterfaceIKafkaProducer<TKey, TValue>{/// <summary>/// 生产/// </summary>/// <param name="Key"></param>/// <param name="Value"></param>/// <param name="Topic"></param>voidProduce(TKey Key,TValue Value,string Topic);/// <summary>/// 生产 异步/// </summary>/// <param name="Key"></param>/// <param name="Value"></param>/// <param name="Topic"></param>/// <returns></returns>TaskProduceAsync(TKey Key,TValue Value,string Topic);}
3.2 KafkaConsumer和KafkaProducer
1、KafkaConsumer
/// <summary>/// 消费者 (Message.Key的数据类型为string、Message.Value的数据类型为string)/// 消费者实现三种消费方式:1.订阅回调模式 2.批量消费模式 3.单笔消费模式/// </summary>/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>publicclassKafkaConsumer<TKey, TValue>:KafkaConfig,IKafkaConsumer<TKey, TValue>{/// <summary>/// Kafka地址(包含端口号)/// </summary>publicstring Servers
{get{return ConsumerConfig.BootstrapServers;}set{
ConsumerConfig.BootstrapServers =value;}}/// <summary>/// 消费者群组/// </summary>publicstring GroupId
{get{return ConsumerConfig.GroupId;}set{
ConsumerConfig.GroupId =value;}}/// <summary>/// 自动提交 默认为 false/// </summary>publicbool EnableAutoCommit
{get{return ConsumerConfig.EnableAutoCommit ??false;}set{
ConsumerConfig.EnableAutoCommit =value;}}/// <summary>/// 订阅回调模式-消费(持续订阅)/// </summary>/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>/// <param name="Topic">主题</param>publicvoidConsume(Func<ConsumeResult<TKey, TValue>,bool> Func,string Topic){
Task.Factory.StartNew(()=>{var builder =newConsumerBuilder<TKey, TValue>(ConsumerConfig);//设置反序列化方式
builder.SetValueDeserializer(newKafkaDConverter<TValue>());
builder.SetErrorHandler((_, e)=>{
Logger.Error(LoggerType.KafkaException,null,null,$"Error:{e.Reason}");}).SetStatisticsHandler((_, json)=>{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");}).SetPartitionsAssignedHandler((c, partitions)=>{string partitionsStr =string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");}).SetPartitionsRevokedHandler((c, partitions)=>{string partitionsStr =string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");});usingvar consumer = builder.Build();
consumer.Subscribe(Topic);while(AppSetting.Kafka.IsConsumerSubscribe)//true{ConsumeResult<TKey, TValue> result =null;try{
result = consumer.Consume();if(result.IsPartitionEOF)continue;if(Func(result)){if(!(bool)ConsumerConfig.EnableAutoCommit){//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);}}}catch(ConsumeException ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic},{ex.Error.Reason}",null, ex.Message + ex.StackTrace);}catch(Exception ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{result.Topic}",null, ex.Message + ex.StackTrace);}}});}/// <summary>/// 批量订阅回调模式-消费(持续订阅)/// </summary>/// <param name="Func">回调函数,若配置为非自动提交(默认为否),则通过回调函数的返回值判断是否提交</param>/// <param name="Topic">主题</param>publicvoidConsumeBatch(Func<ConsumeResult<TKey, TValue>,bool> Func,List<string> Topics){
Task.Factory.StartNew(()=>{var builder =newConsumerBuilder<TKey, TValue>(ConsumerConfig);//设置反序列化方式
builder.SetValueDeserializer(newKafkaDConverter<TValue>());
builder.SetErrorHandler((_, e)=>{
Logger.Error(LoggerType.KafkaException,null,null,$"Error:{e.Reason}");}).SetStatisticsHandler((_, json)=>{
Console.WriteLine($"-{DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");}).SetPartitionsAssignedHandler((c, partitions)=>{string partitionsStr =string.Join(", ", partitions);
Console.WriteLine($"-分配的kafka分区:{partitionsStr}");}).SetPartitionsRevokedHandler((c, partitions)=>{string partitionsStr =string.Join(", ", partitions);
Console.WriteLine($"-回收了kafka的分区:{partitionsStr}");});usingvar consumer = builder.Build();
consumer.Subscribe(Topics);while(AppSetting.Kafka.IsConsumerSubscribe)//true{ConsumeResult<TKey, TValue> result =null;try{
result = consumer.Consume();if(result.IsPartitionEOF)continue;if(Func(result)){if(!(bool)ConsumerConfig.EnableAutoCommit){//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit(result);}}}catch(ConsumeException ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topics.ToArray()},{ex.Error.Reason}",null, ex.Message + ex.StackTrace);}catch(Exception ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{result.Topic}",null, ex.Message + ex.StackTrace);}}});}/// <summary>/// 批量消费模式-单次消费(消费出当前Kafka缓存的所有数据,并持续监听 300ms,如无新数据生产,则返回(最多一次消费 100条)/// </summary>/// <param name="Topic">主题</param>/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>/// <param name="MaxRow">最多单次消费行数 默认值:100行</param>/// <returns>待消费数据</returns>publicList<ConsumeResult<TKey, TValue>>ConsumeOnce(string Topic,int TimeOut =300,int MaxRow =100){var builder =newConsumerBuilder<TKey, TValue>(ConsumerConfig);//设置反序列化方式
builder.SetValueDeserializer(newKafkaDConverter<TValue>());usingvar consumer = builder.Build();
consumer.Subscribe(Topic);List<ConsumeResult<TKey, TValue>> Res =newList<ConsumeResult<TKey, TValue>>();while(true){try{var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));if(result ==null)break;else{
Res.Add(result);//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();}if(Res.Count > MaxRow)break;}catch(Exception ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic}",null, ex.Message + ex.StackTrace);returnnull;}}return Res;}/// <summary>/// 单笔消费模式-单行消费/// </summary>/// <param name="Topic">主题</param>/// <param name="TimeOut">持续监听时间,单位ms 默认值:300ms</param>/// <returns>待消费数据</returns>publicConsumeResult<TKey, TValue>ConsumeOneRow(string Topic,int TimeOut =300){var builder =newConsumerBuilder<TKey, TValue>(ConsumerConfig);//设置反序列化方式
builder.SetValueDeserializer(newKafkaDConverter<TValue>());usingvar consumer = builder.Build();
consumer.Subscribe(Topic);try{var result = consumer.Consume(TimeSpan.FromMilliseconds(TimeOut));if(result !=null){//手动提交,如果上面的EnableAutoCommit=true表示自动提交,则无需调用Commit方法
consumer.Commit();}return result;}catch(Exception ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic}",null, ex.Message + ex.StackTrace);returnnull;}}publicvoidDispose(){//if (_cache != null)// _cache.Dispose();
GC.SuppressFinalize(this);}}
2、KafkaProducer
/// <summary>/// 生产者 控制器或Service里面构造函数注入即可调用/// Message.Key的数据类型为string、Message.Value的数据类型为string/// </summary>/// <typeparam name="TKey">Message.Key 的数据类型</typeparam>/// <typeparam name="TValue">Message.Value 的数据类型</typeparam>publicclassKafkaProducer<TKey, TValue>:KafkaConfig,IKafkaProducer<TKey, TValue>{/// <summary>/// 构造生产者/// </summary>publicKafkaProducer(){}/// <summary>/// Kafka地址(包含端口号)/// </summary>publicstring Servers
{get{return ProducerConfig.BootstrapServers;}set{
ProducerConfig.BootstrapServers =value;}}/// <summary>/// 生产/// </summary>/// <param name="Key">Message.Key 做消息指定分区投放有用的</param>/// <param name="Value">Message.Value</param>/// <param name="Topic">主题</param>publicvoidProduce(TKey Key,TValue Value,string Topic){var producerBuilder =newProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(newKafkaConverter<TValue>());//设置序列化方式usingvar producer = producerBuilder.Build();try{
producer.Produce(Topic,newMessage<TKey, TValue>{
Key = Key,
Value = Value
},(result)=>{if(result.Error.IsError)
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()}",null,$"Delivery Error:{result.Error.Reason}");});//Value = JsonConvert.SerializeObject(value)}catch(ProduceException<Null,string> ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic},Delivery failed: {ex.Error.Reason}",null, ex.Message + ex.StackTrace);}}/// <summary>/// 生产异步/// </summary>/// <param name="Key">Message.Key</param>/// <param name="Value">Message.Value</param>/// <param name="Topic">主题</param>/// <returns></returns>publicasyncTaskProduceAsync(TKey Key,TValue Value,string Topic){var producerBuilder =newProducerBuilder<TKey, TValue>(ProducerConfig);
producerBuilder.SetValueSerializer(newKafkaConverter<TValue>());usingvar producer = producerBuilder.Build();try{var dr =await producer.ProduceAsync(Topic,newMessage<TKey, TValue>{
Key = Key,
Value = Value
});//Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");}catch(ProduceException<Null,string> ex){
Logger.Error(LoggerType.KafkaException,$"Topic:{Topic},ServerIp:{KafkaHelper.GetServerIp()},ServerName:{KafkaHelper.GetServerName()},Delivery failed: {ex.Error.Reason}",null, ex.Message + ex.StackTrace);}}}
3.3 KafkaConfig配置类
/// <summary>/// 配置类/// </summary>publicclassKafkaConfig{/// <summary>/// 构造配置类/// </summary>protectedKafkaConfig(){
ProducerConfig =newProducerConfig(){
BootstrapServers = AppSetting.Kafka.ProducerSettings.BootstrapServers,// "192.168.20.241:9092",};
ConsumerConfig =newConsumerConfig(){
BootstrapServers = AppSetting.Kafka.ConsumerSettings.BootstrapServers,
GroupId = AppSetting.Kafka.ConsumerSettings.GroupId,
AutoOffsetReset = AutoOffsetReset.Earliest,//当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
EnableAutoCommit =false,//Kafka配置安全认证//SecurityProtocol = SecurityProtocol.SaslPlaintext,//SaslMechanism = SaslMechanism.Plain,//SaslUsername = AppSetting.Kafka.ConsumerSettings.SaslUsername,//SaslPassword = AppSetting.Kafka.ConsumerSettings.SaslPassword,};}/// <summary>/// 消费者配置文件/// </summary>publicConsumerConfig ConsumerConfig;/// <summary>/// 生产者配置文件/// </summary>publicProducerConfig ProducerConfig;}
3.4 KafkaHelper帮助类
namespaceKafkaManager{/// <summary>/// 辅助类/// </summary>publicclassKafkaHelper{/// <summary>/// 获取当前应用程式名称(仅控制台应用程序和Windows应用程序可用)/// </summary>/// <returns></returns>publicstaticstringGetApplicationName(){try{return Assembly.GetEntryAssembly().GetName().Name;}catch{return"Kafka_Test";}}/// <summary>/// 获取服务器名称/// </summary>/// <returns></returns>publicstaticstringGetServerName(){return Dns.GetHostName();}/// <summary>/// 获取服务器IP/// </summary>/// <returns></returns>publicstaticstringGetServerIp(){IPHostEntry ips = Dns.GetHostEntry(Dns.GetHostName());foreach(var ip in ips.AddressList){if(Regex.IsMatch(ip.ToString(),@"^10\.((25[0-5]|2[0-4]\d|1\d{2}|\d?\d)\.){2}(25[0-5]|2[0-4]\d|1\d{2}|\d?\d)$")){return ip.ToString();};}return"127.0.0.1";}/// <summary> /// 将c# DateTime时间格式转换为Unix时间戳格式(毫秒级) /// </summary> /// <returns>long</returns> publicstaticlongGetTimeStamp(){DateTime time = DateTime.Now;long t =(time.Ticks -621356256000000000)/10000;return t;}}#region 实现消息序列化和反序列化publicclassKafkaConverter<T>:ISerializer<T>{/// <summary>/// 序列化数据成字节/// </summary>/// <param name="data"></param>/// <param name="context"></param>/// <returns></returns>publicbyte[]Serialize(T data,SerializationContext context){var json = JsonConvert.SerializeObject(data);return Encoding.UTF8.GetBytes(json);}}publicclassKafkaDConverter<T>:IDeserializer<T>{/// <summary>/// 反序列化字节数据成实体数据/// </summary>/// <param name="data"></param>/// <param name="isNull"></param>/// <param name="context"></param>/// <returns></returns>publicTDeserialize(ReadOnlySpan<byte> data,bool isNull,SerializationContext context){if(isNull)returndefault(T);var json = Encoding.UTF8.GetString(data.ToArray());try{return JsonConvert.DeserializeObject<T>(json);}catch{returndefault(T);}}}#endregion#region 日志类/// <summary>/// 默认日志类 可自行构造使用/// </summary>publicclassKafkaLogModel{/// <summary>/// 构造默认日志类(设置默认值 ServerIp,ServerName,TimeStamp,ApplicationVersion)/// </summary>publicKafkaLogModel(){
ServerIp = KafkaHelper.GetServerIp();
ServerName = KafkaHelper.GetServerName();
TimeStamp = DateTime.Now;
ApplicationName = KafkaHelper.GetApplicationName();
ApplicationVersion ="V1.0.0";}/// <summary>/// 程式名称(默认获取当前程式名称,Web应用 默认为 ISD_Kafka)/// </summary>publicstring ApplicationName {get;set;}/// <summary>/// 程式版本(默认为V1.0.0)/// </summary>publicstring ApplicationVersion {get;set;}/// <summary>/// 发生时间(默认为当前时间)/// </summary>publicDateTime TimeStamp {get;set;}/// <summary>/// 开始时间/// </summary>publicDateTime BeginDate {get;set;}/// <summary>/// 结束时间/// </summary>publicDateTime EndDate {get;set;}/// <summary>/// 服务器IP(默认抓取当前服务器IP)/// </summary>publicstring ServerIp {get;set;}/// <summary>/// 服务器名称(默认抓取当前服务器名称)/// </summary>publicstring ServerName {get;set;}/// <summary>/// 客户端IP/// </summary>publicstring ClientIp {get;set;}/// <summary>/// 模块(页面路径)/// </summary>publicstring Module {get;set;}/// <summary>/// 操作人/// </summary>publicstring Operator {get;set;}/// <summary>/// 操作类型 如:Query,Add,Update,Delete,Export等,可自定义/// </summary>publicstring OperationType {get;set;}/// <summary>/// 操作状态 如:http请求使用200,404,503等,其他操作 1:成功,0失败等 可自定义/// </summary>publicstring Status {get;set;}/// <summary>/// 其他信息/// </summary>publicstring Message {get;set;}}#endregion}
4.使用
#region kafka使用if(AppSetting.Kafka.UseConsumer){usingvar scope = host.Services.CreateScope();var testConsumer = scope.ServiceProvider.GetService<IKafkaConsumer<string,string>>();
testConsumer.Consume(res =>{
Console.WriteLine($"recieve:{DateTime.Now.ToLongTimeString()} value:{res.Message.Value}");bool bl = DataHandle.AlarmData(res.Message.Value);return bl;}, AppSetting.Kafka.Topics.TestTopic);}#endregion
版权归原作者 愚公搬代码 所有, 如有侵权,请联系我们删除。