0


RabbitMQ-基础学习

在虚拟机上安装Erlang的GCC环境,装erlong,然后安装rabbitmq

参考:安装说明链接

安装web端面板

在这里插入图片描述

创建交换机

在这里插入图片描述

先学习一下工作模式(详细介绍可见官网)

在这里插入图片描述

上代码

1.Hello Word模式

在这里插入图片描述

写在测试类中:
Providucer

@TestvoidcontextLoads()throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("peng",true,false,false,null);//6.发送消息//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)/**
         * 1.exchange:交换机名称
         * 2.routingKey:路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */String body="第一个消息";
        channel.basicPublish("","peng",null,body.getBytes());//7.释放资源
        channel.close();
        connection.close();}

Consumer

@TestvoidcontextLoads()throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("peng",true,false,false,null);//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)/**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */Consumer consumer=newDefaultConsumer(channel){/**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag"+consumerTag);System.out.println("envelope"+envelope.getExchange());System.out.println("properties"+envelope.getRoutingKey());System.out.println("properties"+properties);System.out.println("body"+newString(body));}};
        channel.basicConsume("peng",true,consumer);}

2.Work Queues模式

在这里插入图片描述
生产者生产,两个消费者循环消费
P:

packagecom.providucer.factory;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * @ClassName: Providucerfactory
 * @author: 鹏
 * @date: 2023/7/4 14:42
 */publicclassProvideFactory{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */
        channel.queueDeclare("pengwork",true,false,false,null);//6.发送消息//basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)/**
         * 1.exchange:交换机名称
         * 2.routingKey:路由名称
         * 3.props:配置信息
         * 4.body:发送的消息数据
         */for(int i =1; i <=10; i++){String body="第"+i+"个消息";
            channel.basicPublish("","pengwork",null,body.getBytes());}//7.释放资源
        channel.close();
        connection.close();}}

C1:

packagecom.consumer.factory;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */publicclassConsumerFactory{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)/**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */Consumer consumer=newDefaultConsumer(channel){/**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/System.out.println("body"+newString(body));}};
        channel.basicConsume("pengwork",true,consumer);}}

C2:

packagecom.consumer.factory;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * @ClassName: ConsumerFactory
 * @author: 鹏
 * @date: 2023/7/4 14:44
 */publicclassConsumerFactory1{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建队列Queue//queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)/**
         * 1.queue 队列名
         * 2.durable 是否持久化
         * 3.exclusive (1)是否独占站,只能有一个消费的监听者 (2)当Connection关闭时是否删除队列
         * 4.autoDelete 是否自动删除,当没有consumer会自动删除掉
         * 5.arguments:
         * */

        channel.queueDeclare("pengwork",true,false,false,null);//basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)/**
         * 1.queue:队列名
         * 2.deliverCallback:是否自动确认收到
         * 3.cancelCallback:回调对象
         */Consumer consumer=newDefaultConsumer(channel){/**
             * 1.consumerTag:
             * 2.envelope:
             * 3.properties:
             * 4.body:
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{/*                System.out.println("consumerTag"+consumerTag);
                System.out.println("envelope"+envelope.getExchange());
                System.out.println("properties"+envelope.getRoutingKey());
                System.out.println("properties"+properties);*/System.out.println("body"+newString(body));}};
        channel.basicConsume("pengwork",true,consumer);}}

消费结果:
在这里插入图片描述
在这里插入图片描述

3.Publish/Subscribe订阅模式

在这里插入图片描述
在这里插入图片描述
消费着只需要绑定相应的队列,生产者需要创建交换机

publicclassPubFactory{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)String exchange="test_fanout";
        channel.exchangeDeclare(exchange,BuiltinExchangeType.FANOUT,true,false,false,null);String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";//1.exchange:交换机名称//2.type:交换机类型/**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         *///3.durable:是否持久化//4.autoDelete:是福哦自动删除//5.internal: 内部使用一般用false//6.arguments: 参数//channel.exchangeDeclare();//6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);//7.绑定交换机与队列/**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"");
        channel.queueBind(queue2Name,exchange,"");String body="日志信息:接收成功";
        channel.basicPublish(exchange,"",null,body.getBytes());//8.释放资源
        channel.close();
        connection.close();}}

4.Routing路由模式

路由模式相当于增加一层限制,只有通过相应的限制交换机才能将消息发布到对应的队列,也就是在发布的时候路由参数数设置值,且交换机类型必须为direct

channel.basicPublish(exchange,"error",null,body.getBytes());

此处限制队列路由为error的可以发送

publicclassRoutingFactory{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)String exchange="test_direct";
        channel.exchangeDeclare(exchange,BuiltinExchangeType.DIRECT,true,false,false,null);String queue1Name ="test_fanout_queue1";String queue2Name ="test_fanout_queue2";//1.exchange:交换机名称//2.type:交换机类型/**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         *///3.durable:是否持久化//4.autoDelete:是福哦自动删除//5.internal: 内部使用一般用false//6.arguments: 参数//channel.exchangeDeclare();//6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);//7.绑定交换机与队列/**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"info");
        channel.queueBind(queue2Name,exchange,"error");
        channel.queueBind(queue2Name,exchange,"warming");String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());//8.释放资源
        channel.close();
        connection.close();}}

5. Topics模式

相对于routing在队列增加了匹配规则,让交换机发送与队列接受更加灵活

*

匹配一个单词,

#

匹配多个单词

 channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true,false,false,null);

设置为BuiltinExchangeType.TOPIC
在这里插入图片描述

publicclassTopicsFactory{publicstaticvoidmain(String[] args)throwsException{//1.创建链接ConnectionFactory factory =newConnectionFactory();//2。设置参数
        factory.setHost("192.168.63.130");
        factory.setPort(5672);
        factory.setVirtualHost("/peng");
        factory.setUsername("peng");
        factory.setPassword("peng");//3.创建链接ConnectionConnection connection = factory.newConnection();//4.创建ChannelChannel channel = connection.createChannel();//5.创建交换机//exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)String exchange="test_topic";
        channel.exchangeDeclare(exchange,BuiltinExchangeType.TOPIC,true,false,false,null);String queue1Name ="test_topic_queue1";String queue2Name ="test_topic_queue2";//1.exchange:交换机名称//2.type:交换机类型/**
         *        DIRECT("direct"),定向
         *         FANOUT("fanout"),扇形(广播)
         *         TOPIC("topic"),通配符方式
         *         HEADERS("headers")参数匹配
         *///3.durable:是否持久化//4.autoDelete:是福哦自动删除//5.internal: 内部使用一般用false//6.arguments: 参数//channel.exchangeDeclare();//6.创建队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);//7.绑定交换机与队列/**
         * 1.queue:队列名称
         * 2.exchange:交换机名称
         * 3.routingKey:路由键,绑定规则
         *      如果交换机的类型为fanout,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchange,"*.*");
        channel.queueBind(queue2Name,exchange,"*.one");
        channel.queueBind(queue2Name,exchange,"*.two");
        channel.queueBind(queue2Name,exchange,"ok.*");String body="日志信息:接收成功";
        channel.basicPublish(exchange,"error",null,body.getBytes());
        channel.basicPublish(exchange,"123.one",null,body.getBytes());
        channel.basicPublish(exchange,"123.two",null,body.getBytes());//8.释放资源
        channel.close();
        connection.close();}}
标签: rabbitmq 学习 ruby

本文转载自: https://blog.csdn.net/qq_45720234/article/details/131535031
版权归原作者 二两相思酿 所有, 如有侵权,请联系我们删除。

“RabbitMQ-基础学习”的评论:

还没有评论