0


使用Kafka实现带消息记录的实时传输对话场景(聊天室)

使用Websocket可以实现客户端的双向通信,客户端可以向服务端发送数据,服务端也可以向客户端发送数据。

传输内容是可以实时传输了,但是如果没有存储功能,一刷新页面(或者重新进入)就再也看不到之前发送的消息,这是一个很大的麻烦。

策略一(数据持久化)

首先想到在传输的时候,顺带将数据存入数据库,貌似这样也是一种可行的方案,但后果是每一次传输都要和数据库进行一次IO,性能低下,甚至有时候可能出现存入失败的情况,导致数据丢失。

策略二(消息队列)

是否能将消息持久化进消息队列呢,消息队列可以保证我们消息的顺序性,可靠性,也有一些附带功能例如负载均衡、异步处理、监控等。

优点无数,貌似很完美,实现一下看看效果就知道了~

消息队列选用Kafka

优点如下:

  1. 高吞吐量:Kafka能够处理大规模的数据流,每秒可以处理数百万条消息,甚至更多。这种高吞吐量特性使其成为处理大量数据的理想选择,尤其适用于实时应用程序和日志收集场景。
  2. 低延迟:Kafka具有低延迟特性,可以实现几乎实时的数据传输和处理。其延迟最低可以达到几毫秒,非常适用于需要快速响应和实时分析的应用。
  3. 持久性和可靠性:Kafka消息被持久化到磁盘上,并通过多副本机制进行数据备份,确保数据不会丢失。这种持久性和可靠性使得Kafka适用于关键性的数据采集和日志记录需求。
  4. 分布式架构和水平扩展性:Kafka是分布式的,可以在多个节点上运行,并提供高可用性和容错性。通过添加更多的代理节点,可以轻松扩展Kafka集群的能力,以处理更多的数据流。这种扩展性使其能够适应快速增长的数据需求。
  5. 多样的生产者和消费者支持:Kafka提供了多种编程语言的客户端库,允许多种不同类型的生产者和消费者与其集成,包括Java、Python、Go等。这种跨语言的支持使得Kafka能够广泛应用于各种技术栈。
  6. 灵活的消息处理模型:Kafka支持发布-订阅消息系统模型,允许消息被多个消费者订阅和使用。同时,它还支持消息的分区和消费者组,使得消息处理更加灵活和高效。

具体采取的模式如下

在这里插入图片描述

开多个消费者组,一个消费者组只有一个消费者,也就意味着一个Topic有多个消费者组订阅。

由于Kafka的模式是一个Topic的消息只会不重复的给到一个消费者组,如果消费者组内有两名消费者,其中一名消费者消费了消息的话,另一名消费者无法重复消费此消息。

这样一来,我们只需要写一个接口往Topic发送消息,那么订阅的消费者们就可以实时收到消息了,就算消费者不在线,消息也会存储在Topic当中。

开始实现

新建springboot项目,引入pom

👇spring kafka使用文档👇

Overview :: Spring Kafka

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.51</version></dependency>

在SpringApplication中写,或者在@Configuration下注入bean

@SpringBootApplicationpublicclassSpringTestApplication{publicstaticvoidmain(String[] args){SpringApplication.run(SpringTestApplication.class, args);}// 启动时创建topic@BeanpublicNewTopictopic1(){returnTopicBuilder.name("thing10").partitions(1).build();}// 生产者配置publicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.141.130:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);// org.springframework.kafka.support.serializer.JsonDeserializerreturn props;}//消费者配置publicMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.141.130:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);// 值序列化器 // org.springframework.kafka.support.serializer.JsonDeserializerreturn props;}// kafka模板@BeanpublicKafkaTemplate<String,Message>kafkaTemplate(){KafkaTemplate<String,Message> template =newKafkaTemplate<>(producerFactory());
        template.setConsumerFactory(consumerFactory());return template;}@BeanpublicConsumerFactory<String,Message>consumerFactory(){returnnewDefaultKafkaConsumerFactory<>(consumerConfigs(),newStringDeserializer(),newJsonDeserializer<>(Message.class));}@BeanpublicProducerFactory<String,Message>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}@BeanpublicConcurrentKafkaListenerContainerFactory<String,Message>kafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory<String,Message> factory =newConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setReplyTemplate(kafkaTemplate());return factory;}@BeanpublicKafkaAdminadmin(){Map<String,Object> configs =newHashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.141.130:9092");returnnewKafkaAdmin(configs);}}

JsonDeserializer需要添加信任

新建KafkaConfig.java

@ConfigurationpublicclassKafkaConfig{@BeanpublicJsonDeserializer<Message>jsonDeserializer(){JsonDeserializer<Message> deserializer =newJsonDeserializer<>(Message.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");return deserializer;}// Websocket服务@BeanpublicServerEndpointExporterserverEndpointExporter(){ServerEndpointExporter exporter =newServerEndpointExporter();
        exporter.setAnnotatedEndpointClasses(WebSocketServer.class);return exporter;}}

新建WebSocketServer.java

@ServerEndpoint(value ="/connect/{id}")@Slf4j@ComponentpublicclassWebSocketServer{//在线客户端集合publicstaticfinalMap<String,Session> onlineSessionClientMap =newConcurrentHashMap<>();@OnOpenpublicvoidonOpen(@PathParam("id")String id,Session session){System.out.println("开启连接"+ id);
        onlineSessionClientMap.put(id, session);}@OnClosepublicvoidonClose(@PathParam("id")String id,Session session){//从map集合中移除System.out.println("断开连接"+ id);
        onlineSessionClientMap.remove(id);ConcurrentMessageListenerContainer<String,Message> container =ChatService.containerMap.get(id);if(container !=null){// 监听容器停止监听
            container.stop();// 从Map中移除ChatService.containerMap.remove(id);}}@OnMessagepublicvoidonMessage(String message,Session session){}@OnErrorpublicvoidonError(Session session,Throwable error){}}

这里ChatService比较核心,需要实现以下。

新建ChatService.java

@RequiredArgsConstructor@ServicepublicclassChatService{privatefinalConcurrentKafkaListenerContainerFactory<String,Message> kafkaListenerContainerFactory;// 根据userId取监听容器,userId是接口传入的值,因为消费者组里就一个消费者,所以groupId也用userIdpublicstaticMap<String,ConcurrentMessageListenerContainer<String,Message>> containerMap =newHashMap<>();publicvoidlisten(String userId,String... topic){// 消费者组重复监听,先停止,再开启if(containerMap.containsKey(userId)){stop(userId);}ConcurrentMessageListenerContainer<String,Message> container = kafkaListenerContainerFactory.createContainer(topic);// groupId用userId
        container.getContainerProperties().setGroupId(userId);//自定义监听器,下面实现
        container.getContainerProperties().setMessageListener(newMyMessageListenr(userId));

        containerMap.put(userId, container);
        container.start();}publicvoidstop(String userId){ConcurrentMessageListenerContainer<String,Message> container = containerMap.get(userId);
        container.stop();
        containerMap.remove(userId);}}

实现MyMessageListenr.java,新建。

@Data@AllArgsConstructor@NoArgsConstructorpublicclassMyMessageListenrimplementsMessageListener<String,Message>{privateString userId;@OverridepublicvoidonMessage(ConsumerRecord<String,Message> data){System.out.println("收到"+ data.value());Message message = data.value();Map<String,Session> map =WebSocketServer.onlineSessionClientMap;System.out.println(map);if(map.containsKey(userId)){MessageVo entity =newMessageVo(message,newDate(data.timestamp()));
            map.get(userId).getAsyncRemote().sendText(JSONObject.toJSONString(entity));}else{System.out.println(userId +"不在线");}}}

特别说明:

每一个消费者组都有一个监听器,监听器收到消息只会给监听他的消费组发。

比如①号用户监听了,②号用户也监听了,此时实际上有两个MyMessageListenr,不需要相互发。

Message、MessageVo是实体

@Data@AllArgsConstructor@NoArgsConstructorpublicclassMessage{privateString from;privateStringto;privateString body;}
@Data@AllArgsConstructor@NoArgsConstructorpublicclassMessageVo{privateString from;privateStringto;privateString body;@JsonFormat(pattern ="yyyy-MM-dd HH:mm:ss",timezone ="GMT+8")privateDate date;publicMessageVo(Message message,Date date){this.from = message.getFrom();this.to= message.getTo();this.body = message.getBody();this.date = date;}}

接下来编写接口,新建KafkaController.java

@RestController@RequestMapping("/kafka")@RequiredArgsConstructorpublicclassKafkaController{privatefinalKafkaTemplate<String,Message> kafkaTemplate;privatefinalKafkaAdmin kafkaAdmin;privatefinalChatService chatService;// 发送消息@GetMapping("/send")publicStringsend(@RequestParamString msg,@RequestParamString from,@RequestParamStringto){ProducerRecord<String,Message> record =newProducerRecord<>("thing10","key",newMessage(from,to, msg));
        kafkaTemplate.send(record);return"ok";}// 开始监听@PostMapping("/start")publicvoidstart(@RequestParamString userId){// thing10监听的Topic名称
        chatService.listen(userId,"thing10");}// 获取Topic所有消息@GetMapping("/message")publicList<MessageVo>message(@RequestParamString topic)throwsExecutionException,InterruptedException{AdminClient client =AdminClient.create(kafkaAdmin.getConfigurationProperties());Map<TopicPartition,ListOffsetsResult.ListOffsetsResultInfo> map = client.listOffsets(Collections.singletonMap(newTopicPartition("thing10",0),OffsetSpec.latest())).all().get();List<MessageVo> list =newArrayList<>();for(ListOffsetsResult.ListOffsetsResultInfo value : map.values()){long offset = value.offset();List<TopicPartitionOffset> cur =newArrayList<>();for(long i =0; i < offset; i++){
                cur.add(newTopicPartitionOffset(topic,0, i));}ConsumerRecords<String,Message> consumerRecords = kafkaTemplate.receive(cur);for(ConsumerRecord<String,Message> record : consumerRecords){
                list.add(newMessageVo(record.value(),newDate(record.timestamp())));}}return list;}}

后端就到此结束了。

页面用React写的,感兴趣可以看看

// chat/:userId 路由import{ useMutation }from'@tanstack/react-query'import{ Input }from'antd'import axios from'axios';import{ flushSync }from'react-dom';const{ TextArea }= Input;interfaceMessage{
  from:string;
  to:string;
  body:string;
  date:string;}exportdefaultfunctionChat(){const{ userId }=useParams()const eleRef =useRef<any>()const[curTopic, setCurTopic]=useState('thing10')const[sendTo, setSendTo]=useState('hxy')const[msgList, setMsgList]=useState<Message[]>([])const[inputValue, setInputValue]=useState('')const getMsgList =useMutation<Message[]>({mutationFn:()=>fetch('/api/kafka/message?topic='+ curTopic).then(res => res.json()),onSuccess:(data)=>{flushSync(()=>{setMsgList(data)})
      eleRef.current.scrollTo({
        top: eleRef.current.scrollHeight,})}})const mutation =useMutation({mutationFn:()=>fetch('/api/kafka/start?userId='+ userId,{ method:'POST'}).then(res => res.json()),})functioncreateConnection(){const socket =newWebSocket('ws://localhost:8088/connect/'+ userId)
    socket.onopen=()=>{console.log('socket open');flushSync(()=>{
        mutation.mutateAsync()})}
    socket.onmessage=(event)=>{const data =JSON.parse(event.data)flushSync(()=>{setMsgList(e =>{return[...e, data]})})
      eleRef.current.scrollTo({
        top: eleRef.current.scrollHeight,
        behavior:'smooth'})}
    socket.onerror=(e: Event)=>{console.log(e);}return socket
  }asyncfunctionsendMsg(){const res =await axios.get(`/api/kafka/send?msg=${inputValue}&from=${userId}&to=${sendTo}`)if(res.data ==='ok'){setInputValue('')}}useEffect(()=>{
    getMsgList.mutateAsync()const socket =createConnection()return()=>{
      socket.close()}},[])return(<div className="w-screen h-screen flex justify-center items-center"><div className="w-[600px] h-[700px] bg-slate-200">{/* Top */}<div className='h-[80%] overflow-y-auto p-2' ref={eleRef}><Input value={curTopic} disabled placeholder='输入 Topic id' onChange={e =>setCurTopic(e.currentTarget.value)}></Input><Input value={sendTo} placeholder='输入 Send' onChange={e =>setSendTo(e.currentTarget.value)}></Input><p className='text-sm my-2 underline'><span>用户ID:</span>{userId}</p>{<div>{
                msgList.map((v, i)=>{return<div key={i} className='mb-2'><p className='text-gray-600 text-sm'>{v.date}</p><p>{`${v.from}对${v.to}说:${v.body}`}</p></div>})}</div>}</div>{/* Buttom */}<div className='h-[20%] border-gray-100'><TextArea className='border-none rounded-none focus:border-none focus:shadow-none' rows={6} placeholder="输入内容"
            maxLength={100} value={inputValue} onChange={e =>setInputValue(e.target.value)} onPressEnter={sendMsg}/></div></div></div>)}

一个可以存储信息的实时通信功能,大概就是这样了。

标签: kafka java spring boot

本文转载自: https://blog.csdn.net/a1308003218/article/details/140640183
版权归原作者 你熬夜了吗? 所有, 如有侵权,请联系我们删除。

“使用Kafka实现带消息记录的实时传输对话场景(聊天室)”的评论:

还没有评论