//创建channel
Channel channel = connection.createChannel();
//创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
channel.queueDeclare(“hello world”,true,false,false,null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
for (int i=0;i<5;i++){
String message=“hello god”;
//发送消息
channel.basicPublish(“”,“hello world”,null,message.getBytes());
System.out.println(“发送成功”);
}
channel.close();
connection.close();
}
}
消费者:
package cn.xjt.rabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.AMQImpl;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @author xu
- @Description
- @createTime 2021年02月23日 11:00:00
*/
public class ConsumerHelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost(“ip”); //主机,默认为localhost
factory.setPort(5672); //端口号
factory.setVirtualHost(“/xujiangtao”); //虚拟机
factory.setUsername(“admin”);
factory.setPassword(“000000”);
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
// channel.queueDeclare(“hello world”,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
//回调方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println(“consumerTag=”+consumerTag);
// System.out.println(“Exchange=”+envelope.getExchange());
// System.out.println(“RoutingKey=”+envelope.getRoutingKey());
// System.out.println(“properties=”+properties.toString());
System.out.println(“接受成功”);
System.out.println(“消息为=”+new String(body));
}
};
channel.basicConsume(“hello world”,true,consumer);
}
}
通配符(topic)模式下的生产者与消费者代码如下
topic模式下,生产者与消费者关系为一对多,通过通配符来判断是否发布到队列
通配符中 * 为一个单词。 # 为一个或多个单词
如#.error所有error结束的通配符信息的都可以发布到当前队列
例如: log.error user.error 都可以接受 log.warning不接受
*.error 表示error前面只能有一个单词
- . * 表示所有以(一个单词).(一个单词)都可以接受
生产者:
package cn.xjt.rabbitmq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @author xu
- @Description
- @createTime 2021年02月23日 10:14:00
*/
public class ProductTopics {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost(“ip”); //主机,默认为localhost
factory.setPort(5672); //端口号
factory.setVirtualHost(“/xujiangtao”); //虚拟机
factory.setUsername(“admin”);
factory.setPassword(“000000”);
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
String exChangeName=“test_exChange_topic”;
//创建交换机
channel.exchangeDeclare(exChangeName, BuiltinExchangeType.TOPIC,true, false, false, null);
//创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
String queuesName1=“test_topic_queues1”;
String queuesName2=“test_topic_queues2”;
channel.queueDeclare(queuesName1,true,false,false,null);
channel.queueDeclare(queuesName2,true,false,false,null);
//绑定交换机和队列
channel.queueBind(queuesName1, exChangeName, “#.error”, null);
channel.queueBind(queuesName2, exChangeName, “user.*”, null);
/*
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
*/
String message=“hello god”;
//发送消息
channel.basicPublish(exChangeName,“add.user.error”,null,message.getBytes());
System.out.println(“发送成功”);
channel.close();
connection.close();
}
}
多个消费者:
Consumer1
在这里插入代码片package cn.xjt.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
- @author xu
- @Description
- @createTime 2021年02月23日 12:18:00
*/
public class ConsumerTopic1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置参数
factory.setHost(“ip”); //主机,默认为localhost
factory.setPort(5672); //端口号
factory.setVirtualHost(“/xujiangtao”); //虚拟机
factory.setUsername(“admin”);
factory.setPassword(“000000”);
//创建连接
Connection connection = factory.newConnection();
//创建channel
Channel channel = connection.createChannel();
//创建队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
*/
// channel.queueDeclare(“hello world”,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
//回调方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// System.out.println(“consumerTag=”+consumerTag);
// System.out.println(“Exchange=”+envelope.getExchange());
// System.out.println(“RoutingKey=”+envelope.getRoutingKey());
// System.out.println(“properties=”+properties.toString());
System.out.println(“队列1接受成功,消息级别为error,存储数据库成功”);
System.out.println(“消息为=”+new String(body));
}
};
String queuesName1=“test_topic_queues1”;
String queuesName2=“test_topic_queues2”;
//消费队列
channel.basicConsume(queuesName1, true, consumer);
// channel.basicConsume(queuesName2, true, consumer);
}
}
Consumer2
最后
这份《“java高分面试指南”-25分类227页1000+题50w+字解析》同样可分享给有需要的朋友,感兴趣的伙伴们可挑战一下自我,在不看答案解析的情况,测试测试自己的解题水平,这样也能达到事半功倍的效果!(好东西要大家一起看才香)
1=“test_topic_queues1”;
String queuesName2=“test_topic_queues2”;
//消费队列
channel.basicConsume(queuesName1, true, consumer);
// channel.basicConsume(queuesName2, true, consumer);
}
}
Consumer2
最后
这份《“java高分面试指南”-25分类227页1000+题50w+字解析》同样可分享给有需要的朋友,感兴趣的伙伴们可挑战一下自我,在不看答案解析的情况,测试测试自己的解题水平,这样也能达到事半功倍的效果!(好东西要大家一起看才香)
[外链图片转存中…(img-DLvAQPHD-1714576460309)]
[外链图片转存中…(img-oWIOSWjD-1714576460310)]
本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录
版权归原作者 2401_84024343 所有, 如有侵权,请联系我们删除。