0


[ RabbitMQ 消息队列来处理高并发场景 ]

首先,需要创建一个 RabbitMQ 的连接和消息通道。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class RabbitMQUtils {

    private static final String QUEUE_NAME = "demo_queue";
    private static final String HOST = "localhost";
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    private static ConnectionFactory connectionFactory;
    private static Connection connection;
    private static Channel channel;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static Channel getChannel() {
        return channel;
    }

    public static void close() {
        try {
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在上面的代码中,我们通过

Connection

Channel

对象来管理 RabbitMQ 连接和消息通道。在

static

代码块中,我们通过

ConnectionFactory

对象来创建连接,并创建一个名为

demo_queue

的队列。在实际应用中,可以根据需要进行其他配置。最后,我们提供了一个

getChannel()

方法来获取消息通道,并提供了一个

close()

方法来关闭连接。

然后,需要创建一个生产者来发送消息到消息队列。

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        for (int i = 0; i < 100; i++) {
            String message = "Message " + i;
            channel.basicPublish("", RabbitMQUtils.QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
        System.out.println("Sent all messages");
        RabbitMQUtils.close();
    }
}

在上面的代码中,我们通过

RabbitMQUtils.getChannel()

方法获取消息通道,然后循环发送消息到队列中。在实际应用中,可以根据需要设置消息体的格式和内容。最后,我们通过

RabbitMQUtils.close()

方法关闭连接。

最后,需要创建一个消费者来消费消息队列中的消息。

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        channel.basicConsume(RabbitMQUtils.QUEUE_NAME, true, new DeliverCallback() {
            @Override
            public void handle(String consumerTag, Delivery delivery) throws IOException {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Received message: " + message);
//处理数据  比如缓冲数据等等
            }
        }, new CancelCallback() {
            @Override
            public void handle(String consumerTag) throws IOException {
                System.out.println("Consumer cancelled");
            }
        });
    }
}

在上面的代码中,我们通过

RabbitMQUtils.getChannel()

方法获取消息通道,然后通过

basicConsume()

方法来消费队列中的消息。在

DeliverCallback

中,我们对接收到的消息进行处理,并在

CancelCallback

中实现取消消费的逻辑。在实际应用中,可以根据需要进行其他操作。

通过使用 RabbitMQ 消息队列,我们可以方便地实现系统之间的消息传递和解耦,从而提高系统的可扩展性和并发性能。需要注意的是,RabbitMQ 的使用需要谨慎,特别是在高并发场景下,需要根据实际情况进行配置和优化,以获得更好的性能和效果。

RabbitMQ 消息队列可以提高代码执行性能,主要体现在以下几个方面:

  • 异步处理:通过将任务放入消息队列中,可以让发送方和接收方之间实现异步处理,发送方可以快速返回,而接收方可以在后台线程中处理任务,从而提高代码的执行效率。
  • 解耦应用:通过使用消息队列,可以将应用程序之间的依赖关系解耦,从而提高应用程序的模块化和可维护性。
  • 缓冲数据:通过使用消息队列缓冲数据,可以减轻服务器的负载,避免服务器崩溃,从而提高应用程序的可用性。
  • 并行处理:通过使用多个消费者,可以实现消息队列的并行处理,从而提高应用程序的并发性能。

需要注意的是,使用 RabbitMQ 消息队列的过程中,需要考虑到消息传递的延迟和数据一致性的问题。例如,在将任务放入消息队列之前,需要将任务数据写入数据库或其他持久化存储中,以避免数据丢失。在接收到任务后,也需要检查任务数据的完整性,并在需要时进行数据恢复。

总之,通过合理使用 RabbitMQ 消息队列,可以有效提高代码执行性能和应用程序的可扩展性,但同时也需要考虑到消息传递的延迟和数据一致性等问题。


RabbitMQ 实现保持消息一致性的demo

我们可以使用 RabbitMQ 的 Java 客户端库来实现保持消息一致性的功能。以下是一个简单的示例代码

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQDemo {
    private static final String EXCHANGE_NAME = "my_exchange";
    private static final String QUEUE_NAME = "my_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 定义交换机和队列
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 发送消息
        String message = "Hello, RabbitMQ!";
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2) // 消息持久化
                .build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 消费消息
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                // 处理消息
                System.out.println("Received message: " + message);
                // 确认消息已处理完成
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
  • 在上面的示例代码中,我们创建了一个连接工厂,设置 RabbitMQ 服务器的主机名和端口号。然后我们创建了一个连接和通道,定义了一个交换机和队列,并设置了队列的持久化属性。接着我们发送了一条消息,并在消息上设置了持久化属性。最后我们消费了队列中的消息,并在处理完成后确认了消息已经处理完成。
  • 在实际应用中,我们还需要根据具体需求进行配置和优化,例如设置 Exchange 的类型、使用事务提交和回滚、设置消息过期时间等。

本文转载自: https://blog.csdn.net/sqL520lT/article/details/130932293
版权归原作者 是汤圆丫 所有, 如有侵权,请联系我们删除。

“[ RabbitMQ 消息队列来处理高并发场景 ]”的评论:

还没有评论