Kafka 入门以及与 Spring 整合
Message.java
import java.util.Date;

/**
 * Plain mutable data holder for a message.
 *
 * <p>Carries the sender id, the recipient id, a conversation identifier, the
 * textual content, an integer status and the creation time. The class contains
 * no logic beyond accessors and {@link #toString()}.
 */
public class Message {

    private int id;
    private int fromId;
    private int toId;
    private String conversationId;
    private String content;
    private int status;
    private Date createTime;

    /** @return the message id */
    public int getId() {
        return id;
    }

    /** @param id the message id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the id of the sender */
    public int getFromId() {
        return fromId;
    }

    /** @param fromId the id of the sender */
    public void setFromId(int fromId) {
        this.fromId = fromId;
    }

    /** @return the id of the recipient */
    public int getToId() {
        return toId;
    }

    /** @param toId the id of the recipient */
    public void setToId(int toId) {
        this.toId = toId;
    }

    /** @return the conversation identifier, possibly {@code null} if never set */
    public String getConversationId() {
        return conversationId;
    }

    /** @param conversationId the conversation identifier */
    public void setConversationId(String conversationId) {
        this.conversationId = conversationId;
    }

    /** @return the message content, possibly {@code null} if never set */
    public String getContent() {
        return content;
    }

    /** @param content the message content */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the status code of the message */
    public int getStatus() {
        return status;
    }

    /** @param status the status code of the message */
    public void setStatus(int status) {
        this.status = status;
    }

    /** @return the creation time, possibly {@code null} if never set */
    public Date getCreateTime() {
        return createTime;
    }

    /** @param createTime the creation time */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /**
     * Renders every field in a {@code Message{name=value, ...}} layout, with the
     * two string fields wrapped in single quotes.
     */
    @Override
    public String toString() {
        return "Message{"
                + "id=" + id
                + ", fromId=" + fromId
                + ", toId=" + toId
                + ", conversationId='" + conversationId + '\''
                + ", content='" + content + '\''
                + ", status=" + status
                + ", createTime=" + createTime
                + '}';
    }
}
EventConsumer.java
定义事件消费者
importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.DiscussPost;importedu.npu.newcoder.community.community.entity.Event;importedu.npu.newcoder.community.community.entity.Message;importedu.npu.newcoder.community.community.service.DiscussPostService;importedu.npu.newcoder.community.community.service.ElasticsearchService;importedu.npu.newcoder.community.community.service.MessageService;importedu.npu.newcoder.community.community.util.CommunityConstant;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;@ComponentpublicclassEventConsumerimplementsCommunityConstant{// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@AutowiredprivateMessageService messageService;@AutowiredprivateDiscussPostService discussPostService;@AutowiredprivateElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics ={TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})publicvoidhandleCommentMessage(ConsumerRecord record){if(record ==null|| record.value()==null){System.out.println("错误发帖");return;}Event event=JSONObject.parseObject(record.value().toString(),Event.class);if(event ==null){System.out.println("错误发帖");return;}//发送站内通知Message message =newMessage();
message.setFromId(SYSTEM_USERID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(newDate());//message的内容Map<String,Object> content=newHashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());}}
message.setContent(JSONObject.toJSONString(content));System.out.println(message);
messageService.addMessage(message);System.out.println("成功处理事件");}}
Event.java
定义一个事件实体，以方便消息的发送与处理
import java.util.HashMap;
import java.util.Map;

/**
 * Event entity used for event-driven development on top of the Kafka message queue.
 *
 * <p>All setters return {@code this} so that an event can be assembled with a
 * fluent call chain.
 */
public class Event {

    private String topic;

    /** Id of the user who triggered the event. */
    private int userId;

    /** Type of the entity the event happened on. */
    private int entityType;

    /** Id of the entity the event happened on. */
    private int entityId;

    /** Id of the user who authored the entity. */
    private int entityUserId;

    /** Additional key/value data attached to the event. */
    private Map<String, Object> data = new HashMap<>();

    /** @return the topic of this event */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the topic of this event
     * @return this event, for chaining
     */
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /** @return the id of the user who triggered the event */
    public int getUserId() {
        return userId;
    }

    /**
     * @param userId the id of the user who triggered the event
     * @return this event, for chaining
     */
    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    /** @return the type of the entity the event happened on */
    public int getEntityType() {
        return entityType;
    }

    /**
     * @param entityType the type of the entity the event happened on
     * @return this event, for chaining
     */
    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    /** @return the id of the entity the event happened on */
    public int getEntityId() {
        return entityId;
    }

    /**
     * @param entityId the id of the entity the event happened on
     * @return this event, for chaining
     */
    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    /** @return the id of the entity's author */
    public int getEntityUserId() {
        return entityUserId;
    }

    /**
     * @param entityUserId the id of the entity's author
     * @return this event, for chaining
     */
    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    /** @return the live map of additional data (not a copy) */
    public Map<String, Object> getData() {
        return data;
    }

    /**
     * Adds a single entry to the additional data, replacing any previous value
     * stored under the same key.
     *
     * @param key   the entry key
     * @param value the entry value
     * @return this event, for chaining
     */
    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}
EventProducer.java
定义事件的生产者
importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.Event;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@ComponentpublicclassEventProducer{//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件publicvoidfireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送
kafkaTemplate.send(event.getTopic(),JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}}
EventConsumer.java
定义事件消费者
importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.DiscussPost;importedu.npu.newcoder.community.community.entity.Event;importedu.npu.newcoder.community.community.entity.Message;importedu.npu.newcoder.community.community.service.DiscussPostService;importedu.npu.newcoder.community.community.service.ElasticsearchService;importedu.npu.newcoder.community.community.service.MessageService;importedu.npu.newcoder.community.community.util.CommunityConstant;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;@ComponentpublicclassEventConsumerimplementsCommunityConstant{// private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@AutowiredprivateMessageService messageService;@AutowiredprivateDiscussPostService discussPostService;@AutowiredprivateElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics ={TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})publicvoidhandleCommentMessage(ConsumerRecord record){if(record ==null|| record.value()==null){System.out.println("错误发帖");return;}Event event=JSONObject.parseObject(record.value().toString(),Event.class);if(event ==null){System.out.println("错误发帖");return;}//发送站内通知Message message =newMessage();
message.setFromId(SYSTEM_USERID);
message.setToId(event.getEntityUserId());
message.setConversationId(event.getTopic());
message.setCreateTime(newDate());//message的内容Map<String,Object> content=newHashMap<>();
content.put("userId",event.getUserId());
content.put("entityType",event.getEntityType());
content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){
content.put(entry.getKey(),entry.getValue());}}
message.setContent(JSONObject.toJSONString(content));System.out.println(message);
messageService.addMessage(message);System.out.println("成功处理事件");}}
在特定的地方触发消息产生
CommentController
//触发评论事件Event event=newEvent().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType()==ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
event.setEntityUserId(target.getUserId());}elseif(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());
event.setEntityUserId(target.getUserId());}
eventProducer.fireEvent(event);
LikeController
// Fire the like event, but only when likeStatus is 1.
// NOTE(review): presumably 1 means "liked" and other values mean "like removed";
// confirm against the like service.
if (likeStatus == 1) {
    Event event = new Event()
            .setTopic(TOPIC_LIKE)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(entityType)
            .setEntityId(entityId)
            .setEntityUserId(entityUserId)
            .setData("postId", postId);
    eventProducer.fireEvent(event);
}
FollowController
//触发关注事件Event event =newEvent().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);
eventProducer.fireEvent(event);
本文转载自: https://blog.csdn.net/weixin_44925329/article/details/134183204
版权归原作者 没脑袋的喵 所有, 如有侵权,请联系我们删除。
版权归原作者 没脑袋的喵 所有, 如有侵权,请联系我们删除。