0


牛客项目(五)-使用kafka实现发送系统通知

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;

/**
 * Mutable bean describing a single message record.
 *
 * <p>Holds the sender id, recipient id, conversation id, content, status and
 * creation time, exposed through conventional getters and setters.
 */
public class Message {

    private int id;
    private int fromId;
    private int toId;
    private String conversationId;
    private String content;
    private int status;
    private Date createTime;

    /** @return the message id */
    public int getId() {
        return id;
    }

    /** @param id the message id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the id of the sender */
    public int getFromId() {
        return fromId;
    }

    /** @param fromId the id of the sender */
    public void setFromId(int fromId) {
        this.fromId = fromId;
    }

    /** @return the id of the recipient */
    public int getToId() {
        return toId;
    }

    /** @param toId the id of the recipient */
    public void setToId(int toId) {
        this.toId = toId;
    }

    /** @return the conversation id, or {@code null} if it was never set */
    public String getConversationId() {
        return conversationId;
    }

    /** @param conversationId the conversation id */
    public void setConversationId(String conversationId) {
        this.conversationId = conversationId;
    }

    /** @return the message content, or {@code null} if it was never set */
    public String getContent() {
        return content;
    }

    /** @param content the message content */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the status code of the message */
    public int getStatus() {
        return status;
    }

    /** @param status the status code of the message */
    public void setStatus(int status) {
        this.status = status;
    }

    /** @return the creation time, or {@code null} if it was never set */
    public Date getCreateTime() {
        return createTime;
    }

    /** @param createTime the creation time */
    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    /** Renders every field in a {@code Message{...}} form for debugging output. */
    @Override
    public String toString() {
        return "Message{"
                + "id=" + id
                + ", fromId=" + fromId
                + ", toId=" + toId
                + ", conversationId='" + conversationId + '\''
                + ", content='" + content + '\''
                + ", status=" + status
                + ", createTime=" + createTime
                + '}';
    }
}

EventConsumer.java

定义事件消费者

importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.DiscussPost;importedu.npu.newcoder.community.community.entity.Event;importedu.npu.newcoder.community.community.entity.Message;importedu.npu.newcoder.community.community.service.DiscussPostService;importedu.npu.newcoder.community.community.service.ElasticsearchService;importedu.npu.newcoder.community.community.service.MessageService;importedu.npu.newcoder.community.community.util.CommunityConstant;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;@ComponentpublicclassEventConsumerimplementsCommunityConstant{//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@AutowiredprivateMessageService messageService;@AutowiredprivateDiscussPostService discussPostService;@AutowiredprivateElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics ={TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})publicvoidhandleCommentMessage(ConsumerRecord record){if(record ==null|| record.value()==null){System.out.println("错误发帖");return;}Event event=JSONObject.parseObject(record.value().toString(),Event.class);if(event ==null){System.out.println("错误发帖");return;}//发送站内通知Message message =newMessage();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(newDate());//message的内容Map<String,Object> content=newHashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());}}
        message.setContent(JSONObject.toJSONString(content));System.out.println(message);
        messageService.addMessage(message);System.out.println("成功处理事件");}}

Event.java

定义一个事件实体，以方便消息的发送与处理

import java.util.HashMap;
import java.util.Map;

/**
 * Event carried over the Kafka message queue in the event-driven flow.
 *
 * <p>All setters return {@code this} so that an event can be assembled in a
 * single chained expression.
 */
public class Event {

    private String topic;

    /** Id of the user who triggered the event. */
    private int userId;

    /** Type of the entity the event happened on. */
    private int entityType;

    /** Id of the entity the event happened on. */
    private int entityId;

    /** Id of the user who authored the entity. */
    private int entityUserId;

    /** Additional key/value data attached to the event. */
    private Map<String, Object> data = new HashMap<>();

    /** @return the topic of the event, or {@code null} if it was never set */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the topic of the event
     * @return this event, for chaining
     */
    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    /** @return the id of the user who triggered the event */
    public int getUserId() {
        return userId;
    }

    /**
     * @param userId the id of the user who triggered the event
     * @return this event, for chaining
     */
    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    /** @return the type of the entity the event happened on */
    public int getEntityType() {
        return entityType;
    }

    /**
     * @param entityType the type of the entity the event happened on
     * @return this event, for chaining
     */
    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    /** @return the id of the entity the event happened on */
    public int getEntityId() {
        return entityId;
    }

    /**
     * @param entityId the id of the entity the event happened on
     * @return this event, for chaining
     */
    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    /** @return the id of the entity's author */
    public int getEntityUserId() {
        return entityUserId;
    }

    /**
     * @param entityUserId the id of the entity's author
     * @return this event, for chaining
     */
    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    /** @return the live map of additional data (not a copy) */
    public Map<String, Object> getData() {
        return data;
    }

    /**
     * Adds one entry to the additional data; it does not replace the map.
     *
     * @param key the entry key
     * @param value the entry value
     * @return this event, for chaining
     */
    public Event setData(String key, Object value) {
        this.data.put(key, value);
        return this;
    }
}

EventProducer.java

定义事件的生产者

importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.Event;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.core.KafkaTemplate;importorg.springframework.stereotype.Component;@ComponentpublicclassEventProducer{//生产者使用kafkaTemplate发送消息@AutowiredKafkaTemplate kafkaTemplate;//处理事件publicvoidfireEvent(Event event){//将事件发布到指定的主题//将event转换为json数据进行消息发送
        kafkaTemplate.send(event.getTopic(),JSONObject.toJSONString(event));System.out.println("成功发送"+event.getTopic());}}

EventConsumer.java

定义事件消费者

importcom.alibaba.fastjson.JSONObject;importedu.npu.newcoder.community.community.entity.DiscussPost;importedu.npu.newcoder.community.community.entity.Event;importedu.npu.newcoder.community.community.entity.Message;importedu.npu.newcoder.community.community.service.DiscussPostService;importedu.npu.newcoder.community.community.service.ElasticsearchService;importedu.npu.newcoder.community.community.service.MessageService;importedu.npu.newcoder.community.community.util.CommunityConstant;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.kafka.annotation.KafkaListener;importorg.springframework.stereotype.Component;importjava.util.Date;importjava.util.HashMap;importjava.util.Map;@ComponentpublicclassEventConsumerimplementsCommunityConstant{//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);@AutowiredprivateMessageService messageService;@AutowiredprivateDiscussPostService discussPostService;@AutowiredprivateElasticsearchService elasticsearchService;//加一个监听相关主题的listener@KafkaListener(topics ={TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})publicvoidhandleCommentMessage(ConsumerRecord record){if(record ==null|| record.value()==null){System.out.println("错误发帖");return;}Event event=JSONObject.parseObject(record.value().toString(),Event.class);if(event ==null){System.out.println("错误发帖");return;}//发送站内通知Message message =newMessage();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(newDate());//message的内容Map<String,Object> content=newHashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());if(!event.getData().isEmpty()){for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());}}
        message.setContent(JSONObject.toJSONString(content));System.out.println(message);
        messageService.addMessage(message);System.out.println("成功处理事件");}}

在特定的地方触发消息产生

CommentController

//触发评论事件Event event=newEvent().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId",discussPostId);if(comment.getEntityType()==ENTITY_TYPE_POST){DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());}elseif(comment.getEntityType()==ENTITY_TYPE_COMMENT){//根据评论的id查询评论Comment target =commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());}
         eventProducer.fireEvent(event);

LikeController

// Fire a like event, but only when likeStatus is 1
// (presumably 1 means "liked" rather than "like removed" - confirm).
if (likeStatus == 1) {
    Event event = new Event()
            .setTopic(TOPIC_LIKE)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(entityType)
            .setEntityId(entityId)
            .setEntityUserId(entityUserId)
            .setData("postId", postId);
    eventProducer.fireEvent(event);
}

FollowController

//触发关注事件Event event =newEvent().setTopic(TOPIC_FOLLOW).setUserId(hostHolder.getUser().getId()).setEntityType(entityType).setEntityId(entityId).setEntityUserId(entityId);
        eventProducer.fireEvent(event);
标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/weixin_44925329/article/details/134183204
版权归原作者 没脑袋的喵 所有, 如有侵权,请联系我们删除。

“牛客项目(五)-使用kafka实现发送系统通知”的评论:

还没有评论