文章目录
Windows服务启动
这里只记录每次怎么在本地开启服务,不涉及具体安装细节,工作时一般由运维人员安装在linux环境上
服务开启命令
开启服务时,需要切到本地的rabbitmq的\sbin目录下
Rabbitmq-server
如果遇到端口占用的问题,windows开始面板上搜索rabbitmq-stop,点击后再次输入开启服务命令即可
管理页面入口(测试是否正常启动)
http://127.0.0.1:15672/
默认账号:guest
默认密码:guest
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p <vhostpath>]<user><conf><write><read>
rabbitmqctl set_permissions -p "/" admin ".*"".*"".*"
当前用户和角色
rabbitmqctl list_users
rabbitmq依赖
<!--指定 jdk 编译版本--><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build><dependencies><!--rabbitmq 依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的一个依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency></dependencies>
队列模型
简单队列
生产者
生产者首先获得连接,之后获得信道,信道初始化队列,然后发送信息
publicclassProducer{publicstaticfinalStringQUENE_NAME="QUEUE_SIMPLE";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");//连接Connection connection = connectionFactory.newConnection();//信道Channel channel = connection.createChannel();
channel.queueDeclare(QUENE_NAME,false,false,false,null);String message ="hello world rabbitmq";
channel.basicPublish("",QUENE_NAME,null,message.getBytes());System.out.println("消息发送完毕");}}
消费者
消费者首先获得连接,产生信道,
publicclassConsumer{publicstaticfinalStringQUEUE_NAME="QUEUE_SIMPLE";publicstaticvoidmain(String[] args)throwsIOException,TimeoutException{ConnectionFactory connectionFactory =newConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123");//连接Connection connection = connectionFactory.newConnection();//信道Channel channel = connection.createChannel();System.out.println("等待接受消息");推送的消息如何进行消费的接口回调DeliverCallback deliverCallback =(consumerTag,delivery)->{String message =newString(delivery.getBody());System.out.println(message);};//取消消费的一个回调接口 如在消费的时候队列被删除掉了CancelCallback cancelCallback =(counsumerTag)->{System.out.println("消息消费被中断");};//1. 消费哪个队列//2. 消费成功后是否要自动应答//3. 消费者未成功消费的回调
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
工作队列
核心思想就是轮训分发消息
抽取工具类
首先对获取信道的方法抽取成工具类
调用静态方法getChanel就可以获取一个信道
/**
* 此类为连接工厂创建信道的工具类
*/publicclassRabbitMqUtils{//得到一个连接的 channelpublicstaticChannelgetChannel()throwsException{//创建一个连接工厂ConnectionFactory factory =newConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");Connection connection = factory.newConnection();Channel channel = connection.createChannel();return channel;}}
启动两个工作线程
工作线程c1
/**
* 工作线程(相当于消费者)
*/publicclassWorker01{//队列的名称publicstaticfinalStringQUEUE_NAME="hello";//接收消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("接收:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消息者取消消费接口回调逻辑");};//接收System.out.println("c1启动...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
工作线程c2
/**
* 工作线程(相当于消费者)
*/publicclassWorker02{//队列的名称publicstaticfinalStringQUEUE_NAME="hello";//接收消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("接收:"+newString(message.getBody()));};CancelCallback cancelCallback =(consumerTag)->{System.out.println(consumerTag +"消息者取消消费接口回调逻辑");};//接收System.out.println("c2启动...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}}
启动发送线程
采用scanner输入流的方式从控制台去输入数据
/**
* 生产者 发送大量的消息
*/publicclassTask01{publicstaticfinalStringQUEUE_NAME="hello";//发送大量的消息publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();//队列的声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接受消息Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成:"+message);}}}
发送结果:
接收结果:
work01
work02
总结:生产者一共发送四条消息,两个消费者各接收两条消息,并且是有序的
消息应答
rabbitmq引入消息应答机制,保证消息在发送过程中不丢失,消息应答就是消费者在接收到消息并处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了
自动应答
容易丢失消息
手动应答
Channel.basicAck(用于肯定确认)
rabbitmq已知道该消息并且成功的处理消息,可以将其丢弃了
Multiple
Multiple参数一般设置为false
true
false
消息自动重新入队
若消费者失去连接(宕机、连接关闭等),导致消息未发送ACK确认,Rabbitmq将了解到消息未完全处理,并将对其重新排队,此时会分发给别的消费者。这样,即使某个消费者偶尔死亡,也可以确认不会丢失任何消息
此图代表了消息自动重新入队的具体执行流程
代码实现
生产者
/**
* 消息在手动应答时不丢失,放回队列中重新消费
*/publicclassTask2{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();//声明队列
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);//从控制台输入Scanner scanner =newScanner(System.in);while(scanner.hasNext()){String message = scanner.next();
channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());System.out.println("生产者发出消息:"+ message);}}}
消费者
消费者c1
沉睡时间1秒,模拟快速处理业务
publicclassWork03{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("c1等待接受消息处理较短");DeliverCallback deliverCallback =(consumerTag,message)->{//沉睡1sSleepUtils.sleep(1);System.out.println("接收到的消息:"+newString(message.getBody()));//手动应答/**
* 1. 消息的标记 tag
* 2. 是否批量应答 false:不批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck =false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println("消费者取消消费接口回调");}));}}
消费者c2
沉睡时间30秒,模拟慢速处理业务
publicclassWork04{//队列名称publicstaticfinalStringTASK_QUEUE_NAME="ack_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();System.out.println("c2等待接受消息处理较长");DeliverCallback deliverCallback =(consumerTag,message)->{//沉睡1sSleepUtils.sleep(30);System.out.println("接收到的消息:"+newString(message.getBody()));//手动应答/**
* 1. 消息的标记 tag
* 2. 是否批量应答 false:不批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck =false;
channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag ->{System.out.println("消费者取消消费接口回调");}));}}
睡眠工具类
publicclassSleepUtils{publicstaticvoidsleep(int second){try{Thread.sleep(1000*second);}catch(InterruptedException _ignored){Thread.currentThread().interrupt();}}}
效果
发送消息a,b,测试可知c1接收后,c2隔一段时间接收,此时发送c,d消息,c1接收了c消息,然后把c2服务停掉,模拟宕机,此时c1会去接收d消息,消息并没有丢失,会重新发送
发送消息:
c1
d消息成功接收到
c2
接收到b消息后模拟宕机
Exchanges类型
fagout-发布订阅模式
不处理routingkey,每个队列都会得到
direct-路由模式
根据routingkey去转发消息给指定的队列
topic-主题模式
根据特殊符号去匹配多个,类似于正则表达式
发布订阅模式
- 一个生产者对应多个消费者
- 生产者将消息发送到X(Exchange),每个消费者需要绑定一个队列
- Exchage:接收生产者的消息 推送给消费者
生产者
/**
* 发布订阅模式-生产者
*/publicclass producer{//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHAGE_NAME,"fanout");String msg ="hello fbdy";//发送消息到交换机
channel.basicPublish(EXCHAGE_NAME,"",null,msg.getBytes());System.out.println("publish msg:"+msg);}}
消费者
消费者1
/**
* 消费者1
*/publicclass cousumer1 {//队列publicstaticfinalStringQUEUE_NAME="queue_mail";//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("cousumer1:"+msg);}};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}
消费者2
/**
* 消费者2
*/publicclass cousumer2 {//队列publicstaticfinalStringQUEUE_NAME="queue_mail";//交换机publicstaticfinalStringEXCHAGE_NAME="exchange_ps";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
channel.queueBind(QUEUE_NAME,EXCHAGE_NAME,"");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("cousumer2:"+msg);}};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}
路由模式
- 生产者将消息发送到交换机,消息附带routingkey
- 消费者声明队列与交换机绑定,因为生产者附带routingKey,所以消费者如果想要接收来自交换机的消息,也需要绑定一致的routingKey,这样才可以接收来自生产者的消息。
- 只要消费者的绑定的交换机的路由键与交换机的路由键一致,则可以收到生产者的消息
生产者
- 生产者发布消息给交换机时需要指定routingKey
- 交换机的初始化是由生产者进行的
/**
* 路由模式
*/publicclass producer {publicstaticfinalStringEXCHAGE_NAME="direct_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHAGE_NAME,"direct");String rountingKey ="warning";
channel.basicPublish(EXCHAGE_NAME,rountingKey,null,"hello,gjj".getBytes());System.out.println("发送消息:hello,gjj,routingKey:"+rountingKey);}}
消费者
消费者1
- 队列的初始化是需要消费者进行的
- 每个队列需要绑定到交换机上的routingKey上,当与交换机的routingKey一致,则可以消费消息。
publicclass consumer1 {//声明交换机和队列类型publicstaticfinalStringEXCHANGE_NAME="direct_exchange";publicstaticfinalStringQUEUE_NAME="direct_queue1";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定队列至交换机,可以通过多个routingKey绑定一个队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("counsumer1:"+msg);}};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}
消费者2
publicclass consumer2{//声明交换机和队列类型publicstaticfinalStringEXCHANGE_NAME="direct_exchange";publicstaticfinalStringQUEUE_NAME="direct_queue2";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,true,false,false,null);//绑定队列至交换机,可以通过多个routingKey绑定一个队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");DefaultConsumer defaultConsumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String msg =newString(body);System.out.println("counsumer2:"+msg);}};
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);}}
死信队列
延迟队列
版权归原作者 宽宽rrr 所有, 如有侵权,请联系我们删除。