0


Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)

一、ActiveMQ 示例

在 Java 中,可以使用消息队列实现异步处理。下面是一个简单的示例代码,用于说明如何使用 ActiveMQ 实现消息队列异步处理:

  1. 添加 ActiveMQ 依赖

在 pom.xml 文件中添加以下依赖:

<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.16.3</version></dependency>
  1. 创建消息队列

创建一个名为 “TestQueue” 的消息队列,并配置 ActiveMQ 连接信息:

importjavax.jms.*;importorg.apache.activemq.ActiveMQConnectionFactory;publicclassTestQueue{publicstaticvoidmain(String[] args)throwsException{// 创建连接工厂ConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();
        connection.start();// 创建会话Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("TestQueue");// 创建生产者MessageProducer producer = session.createProducer(queue);// 发送消息for(int i =0; i <10; i++){TextMessage message = session.createTextMessage("Message "+ i);
            producer.send(message);}// 关闭连接
        connection.close();}}
  1. 创建消息消费者

创建一个消息消费者,并实现 MessageListener 接口,以便异步处理消息:

importjavax.jms.*;importorg.apache.activemq.ActiveMQConnectionFactory;publicclassTestConsumerimplementsMessageListener{publicstaticvoidmain(String[] args)throwsException{// 创建连接工厂ConnectionFactory connectionFactory =newActiveMQConnectionFactory("tcp://localhost:61616");// 创建连接Connection connection = connectionFactory.createConnection();
        connection.start();// 创建会话Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);// 创建队列Queue queue = session.createQueue("TestQueue");// 创建消费者MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(newTestConsumer());// 等待消息Thread.sleep(5000);// 关闭连接
        connection.close();}@OverridepublicvoidonMessage(Message message){try{if(message instanceofTextMessage){TextMessage textMessage =(TextMessage) message;System.out.println("Received message: "+ textMessage.getText());}}catch(JMSException e){
            e.printStackTrace();}}}
  1. 运行代码

在命令行中分别执行 TestQueue 和 TestConsumer 两个类,可以看到生产者向消息队列发送了10条消息,并由消息消费者异步处理这些消息。

使用消息队列可以有效地实现异步处理,可以提高应用程序的性能和并发能力。在上述示例代码中,生产者通过创建消息并将其发送到队列中,而消费者则监听队列并异步处理接收到的消息。消息队列可以起到解耦作用,使得生产者和消费者之间的交互更加灵活和可靠,因为消息队列具有缓冲和异步处理的特点,即使某个消费者出现故障,也不会影响消息的传递和处理。

总的来说,消息队列是一种重要的异步通信机制,能够提高系统的可靠性和可伸缩性,适用于各种分布式系统和大规模应用程序。除了 ActiveMQ,还有其他很多优秀的消息队列实现,比如 RabbitMQ、Kafka 等。

二、RabbitMQ

  1. RabbitMQ 示例代码

在 Java 中,可以使用 RabbitMQ 的 Java 客户端实现消息队列的异步处理。下面是一个简单的示例代码,用于说明如何使用 RabbitMQ 实现消息队列异步处理:

  1. 添加 RabbitMQ 依赖

在 pom.xml 文件中添加以下依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.12.0</version></dependency>
  1. 创建消息队列

创建一个名为 “TestQueue” 的消息队列,并配置 RabbitMQ 连接信息:

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;publicclassTestQueue{publicstaticvoidmain(String[] args)throwsException{// 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");// 创建连接Connection connection = connectionFactory.newConnection();// 创建通道Channel channel = connection.createChannel();// 创建队列
        channel.queueDeclare("TestQueue",false,false,false,null);// 发送消息for(int i =0; i <10; i++){String message ="Message "+ i;
            channel.basicPublish("","TestQueue",null, message.getBytes());}// 关闭连接
        channel.close();
        connection.close();}}
  1. 创建消息消费者

创建一个消息消费者,并实现 Consumer 接口,以便异步处理消息:

importcom.rabbitmq.client.*;importjava.io.IOException;publicclassTestConsumerimplementsConsumer{publicstaticvoidmain(String[] args)throwsException{// 创建连接工厂ConnectionFactory connectionFactory =newConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");// 创建连接Connection connection = connectionFactory.newConnection();// 创建通道Channel channel = connection.createChannel();// 创建队列
        channel.queueDeclare("TestQueue",false,false,false,null);// 创建消费者
        channel.basicConsume("TestQueue",true,newTestConsumer());// 等待消息Thread.sleep(5000);// 关闭连接
        channel.close();
        connection.close();}@OverridepublicvoidhandleConsumeOk(String consumerTag){}@OverridepublicvoidhandleCancelOk(String consumerTag){}@OverridepublicvoidhandleCancel(String consumerTag)throwsIOException{}@OverridepublicvoidhandleShutdownSignal(String consumerTag,ShutdownSignalException sig){}@OverridepublicvoidhandleRecoverOk(String consumerTag){}@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String message =newString(body,"UTF-8");System.out.println("Received message: "+ message);}}

三、 Kafka 示例

在 Java 中,可以使用 Kafka 的 Java 客户端实现消息队列的异步处理。下面是一个简单的示例代码,用于说明如何使用 Kafka 实现消息队列异步处理:

  1. 添加 Kafka 依赖

在 pom.xml 文件中添加以下依赖:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.0</version></dependency>
  1. 创建消息队列

创建一个名为 “TestTopic” 的消息队列,并配置 Kafka 连接信息:

importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassTestProducer{publicstaticvoidmain(String[] args)throwsException{// 配置 Kafka 连接信息Properties properties =newProperties();
        properties.put("bootstrap.servers","localhost:9092");
        properties.put("acks","all");
        properties.put("retries",0);
        properties.put("batch.size",16384);
        properties.put("linger.ms",1);
        properties.put("buffer.memory",33554432);
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 创建生产者Producer<String,String> producer =newKafkaProducer<>(properties);// 发送消息for(int i =0; i <10; i++){String message ="Message "+ i;
            producer.send(newProducerRecord<>("TestTopic", message));}// 关闭生产者
        producer.close();}}
  1. 创建消息消费者

创建一个消息消费者,并实现 Consumer 接口,以便异步处理消息:

importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.TopicPartition;importjava.util.Arrays;importjava.util.HashMap;importjava.util.Map;importjava.util.Properties;publicclassTestConsumer{publicstaticvoidmain(String[] args)throwsException{// 配置 Kafka 连接信息Properties properties =newProperties();
        properties.put("bootstrap.servers","localhost:9092");
        properties.put("group.id","TestGroup");
        properties.put("enable.auto.commit","false");
        properties.put("auto.offset.reset","earliest");
        properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");// 创建消费者KafkaConsumer<String,String> consumer =newKafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("TestTopic"));// 消费消息while(true){ConsumerRecords<String,String> records = consumer.poll(100);for(ConsumerRecord<String,String>record: records){System.out.println("Received message: "+record.value());// 手动提交消费位移Map<TopicPartition,OffsetAndMetadata> offsets =newHashMap<>();
                offsets.put(newTopicPartition(record.topic(),record.partition()),newOffsetAndMetadata(record.offset()+1));
                consumer.commitSync(offsets);}}}}

本文转载自: https://blog.csdn.net/IamBird/article/details/130298020
版权归原作者 码视野 所有, 如有侵权,请联系我们删除。

“Java中如何使用消息队列实现异步(ActiveMQ,RabbitMQ,Kafka)”的评论:

还没有评论