.net core 发送消息到kafka,Doris进行消费,
这里我只是描述一下,最简单的模式将消息发送到kafka后用Doris进行消费的整个实现过程。
先根据kafka官方文档提供的生产方法实现发送消息到kafka
下面展示一些 .net 代码
publicclassKafkaProducer:IKafkaProducer,IDisposable{privatereadonlyIProducer<string,string> _producer;privatereadonlyKafkaOptions _options;privatereadonlyILogger<KafkaProducer> _logger;publicKafkaProducer(IOptions<KafkaOptions> options,ILogger<KafkaProducer> logger,IProducer<string,string> producer){
_options = options.Value;
_logger = logger;
_producer = producer;}publicasyncTaskProduceAsync(string topic,string key,stringvalue){try{var result =await _producer.ProduceAsync(topic,newMessage<string,string>{ Key = key, Value =value});
_logger.LogInformation($"Kafka:Delivered '{result.Value}' to '{result.TopicPartitionOffset}'");}catch(ProduceException<string,string> e){
_logger.LogError($"Kafka发送消息失败: {e.Error.Reason}");}}publicasyncTaskProduceAsync(string key,stringvalue){awaitProduceAsync(_options.DefaultTopic, key,value);}publicvoidDispose(){
_producer?.Dispose();}}
在启动项目中注册kafka服务
/// <summary>/// 注入kafka服务/// </summary>/// <param name="services"></param>/// <param name="configuration"></param>/// <returns></returns>publicstaticIServiceCollectionAddKafkaService(thisIServiceCollection services,IConfiguration configuration){
services.Configure<KafkaOptions>(configuration.GetSection("Kafka"));
services.AddSingleton(provider =>{var options = provider.GetRequiredService<IOptions<KafkaOptions>>().Value;var config =newProducerConfig{ BootstrapServers = options.BootstrapServers };returnnewProducerBuilder<string,string>(config).Build();});
services.AddSingleton<IKafkaProducer, KafkaProducer>();return services;}publicoverridevoidConfigureServices(ServiceConfigurationContext context){var configuration = context.Services.GetConfiguration();
context.Services.AddKafkaService(configuration);}
发送消息
await _kafkaProducer.ProduceAsync("login-service-log","role1","zhqTest哟");
保证kafka服务中有对应topic的消息后,我们来到Doris执行会话中执行以下代码创建Doris表:
- 系统日志建表语句
CREATETABLEIFNOTEXISTS biz_log_db.ods_example_service_log (`log_timestamp`BIGINTCOMMENT"记录日志时的时间(时间戳)",`business`VARCHAR(50)COMMENT"业务系统",`service`VARCHAR(50)COMMENT"业务服务",`host_ip`VARCHAR(20)COMMENT"主机 IP",`level`VARCHAR(20)COMMENT"日志级别",`logger_name`VARCHAR(500)COMMENT"日志名",`message`VARCHAR(65532)COMMENT"日志信息",`request_path`VARCHAR(500)COMMENT"请求URL",`request_parameter`VARCHAR(65532)COMMENT"请求参数",`request_method`VARCHAR(10)COMMENT"请求方式",`request_header`VARCHAR(65532)COMMENT"请求头",`response_content`VARCHAR(65532)COMMENT"响应内容",`status_code`INTCOMMENT"状态码",`request_response_time`INTCOMMENT"请求响应时长",`exception`VARCHAR(65532)COMMENT"异常信息",`log_time`DATETIMECOMMENT"记录日志时的时间(年月日时分秒)",INDEX idx_name_message(message)USING INVERTED PROPERTIES("parser"="chinese")COMMENT'日志信息 倒排索引',INDEX idx_name_exception(exception)USING INVERTED PROPERTIES("parser"="english")COMMENT'异常信息 倒排索引')ENGINE= olap DuplicateKEY(`log_timestamp`)DISTRIBUTEDBYHASH(`log_timestamp`) BUCKETS 16 PROPERTIES("replication_num"="1");
表建完成了后,就创建Doris任务,
Kafka日志写入Doris
- 实现方式 - 采用 Routine Load 官方文档:https://doris.apache.org/zh-CN/docs/dev/data-operate/import/routine-load-manual- 采用 SparkStreaming实时消费Kafka后写入Doris,在日志记录场景(只写不删,不覆盖,不聚合…),我们使们前者;- Doris 可以通过 Routine Load 导入方式持续消费 Kafka Topic 中的数据。在提交 Routine Load 作业后,Doris 会持续运行该导入作业,实时生成导入任务不断消费 Kakfa 集群中指定 Topic 中的消息。
- 我们的日志数据由log4net推入Kafka后是一串JSON格式,大致如下格式
{"partition":0,"offset":49,"msg":"{\"log_timestamp\":1715752796717,\"business\":\"example.business.project\",\"service\":\"example.service\",\"host_ip\":\"YFB-CTO\",\"level\":\"INFO\",\"logger_name\":\"Web.Controllers.RoleController\",\"message\":\"########### \\u8FD9\\u662F\\u624B\\u52A8\\u8F93\\u51FA\\u7684\\u4E2D\\u6587\\u65E5\\u5FD7\\u4FE1\\u606F ###########\",\"request_path\":null,\"request_parameter\":null,\"request_method\":null,\"request_header\":null,\"status_code\":null,\"request_response_time\":0,\"exception\":null}","timespan":1715752796716,"date":"2024-05-15 13:59:56"}
- 创建 Routine Load 导入作业 - 语法
CREATEROUTINELOAD[db.]job_name [ON tbl_name][merge_type][load_properties][job_properties]FROM data_source [data_source_properties][COMMENT"comment"]
- 在 Doris 中,使用 CREATE ROUTINE LOAD 命令,创建导入作业CREATEROUTINELOAD biz_log_db.ods_example_service_log_routine_load_json ON ods_example_service_log COLUMNS(log_timestamp,business,service,host_ip,level,logger_name,message,request_path,request_parameter,request_method,request_header,response_content,status_code,request_response_time,exception,log_time=from_millisecond(log_timestamp)) PROPERTIES("desired_concurrent_number"="3","max_batch_interval"="20","max_batch_rows"="300000","max_batch_size"="209715200","strict_mode"="false","format"="json","jsonpaths"="[\"$.log_timestamp\",\"$.business\",\"$.service\",\"$.host_ip\",\"$.level\",\"$.logger_name\",\"$.message\",\"$.request_path\",\"$.request_parameter\",\"$.request_method\",\"$.request_header\",\"$.response_content\",\"$.status_code\",\"$.request_response_time\",\"$.exception\"]")FROM KAFKA("kafka_broker_list"="192.168.2.111:9092,192.168.2.184:9092,192.168.2.156:9092","kafka_topic"="example-service-log","property.kafka_default_offsets"="OFFSET_BEGINNING","property.deserializer.encoding"="UTF-8")COMMENT"示例服务日志记录同步 KAFKA ROUTINE LOAD 配置";
- 查看导入运行作业 - 创建 routine load 任务之后,可以通过show routine load命令查看运行状态的例行任务,如果在show routine load中没有找到对应的例行任务,则可能因为例行任务失败或者错误数过多被停止或者暂停,使用show all routine load查看所有状态的例行任务。
- 暂停导入作业 - https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/PAUSE-ROUTINE-LOAD
- 恢复(重启)导入作业 - https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/RESUME-ROUTINE-LOAD
- 修改导入作业 - https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/ALTER-ROUTINE-LOAD
- 取消(停止)导入作业 - https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/STOP-ROUTINE-LOAD
以上Doris导入任务都是从官方文档中进行了一下总结实现的功能。
以上我只是用了基本的发送消息,都需要手动发送消息,你们可以用log4net的功能参照上面实现自动日志记录到kafka上面。
后续会在Doris基础上面用superset进行可视化的查询,进行大数据分析,有空会持续更新Doris相关的功能。
版权归原作者 ZhaoHuaQiaoMagic 所有, 如有侵权,请联系我们删除。