0


java操作RabbitMQ

管理界面

​​​​​​http://127.0.0.1:15672/ MQ启动后进入管理界面

在Add a new exchange中新增一个交换机

    Name: 交换机名称

    Type:交换机类型。 指定exchange按照何种策略投递消息到queue中

            **· **direct:直连![](https://img-blog.csdnimg.cn/direct/400cb67edff144118a6c6a313af75540.png), 指定key根据key将消息发送到指定的queue中。

            **· **topic:主题![](https://img-blog.csdnimg.cn/direct/6a5812542777430ab7a69d39e54f7718.png),  # 代表0或多个单词, * 代表一个单词,.相当于单词分隔符。 如  junior.jvm会发送两个Queue而junior.abc.jvm只会发送一个

            **· **fanout:广播![](https://img-blog.csdnimg.cn/direct/321affc0963f42a28b136c9b7221682a.png), 发送到所有关联的queue。

    Durability : 是否持久化。 durable持久化。Transient非持久化

    Auto delete:在没有绑定任何队列时是都自动删除

    Internal : 内部交换机,不暴露给外部使用(一般是自带的) 。  

    Arguments: 给此exchange设置一些其它参数

点击一个创建完成的交换机进去

Overview : 一些交换机的基本信息

Bindings : 用于绑定队列使用。 当前队列为direct类型。在上图中TO queue 设置绑定的队列。Routing key设置key的值

Publish message : 发布一个消息当前交换机(其中Payload为消息内容)

Delete this exchange:删除当前交换机

新增一个队列Add a new queue

    Type: 队列的类型。  Classic主队列(传统队列) quorum从队列(适用于分布式消息队列) Stream自3.9.0版本开始引入的一种新的数据队列类型

    name : 队列名称

    Durability:持久化(同交换机一致)

    Auto delete: 在没有被任何交换机绑定时自动删除

    Arguments : 置一些其它参数 如: TTL 消息的超时时长。队列的最大消息数等。

java操作

非springBoot操作

    引入对应jar包
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.6.0</version>
            </dependency>

新增MQ.properties文件

rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest

新增RabbitMQConfig类读取配置文件

package mq;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class RabbitMQConfig {

    private static final String configName = "/MQ.properties";
    
    public static String getHost() throws IOException  {
        return getProperties("rabbitmq.host");
    }

    public static int getPort() throws IOException  {
        return Integer.parseInt(getProperties("rabbitmq.port")) ;
    }
    
    public static String getUsername() throws IOException  {
        return getProperties("rabbitmq.username");
    }
    
    public static String getPassword() throws IOException {
        return getProperties("rabbitmq.password");
    }
    
    private static String getProperties(String key) throws IOException {
        Properties properties = new Properties();
        InputStream inputStream = RabbitMQConfig.class.getResourceAsStream(configName);
        properties.load(inputStream);
        return     properties.getProperty(key);
    }
    
    
}

1.生产者

新增MQUtil工具类获取链接。发送消息

package mq;

import java.io.IOException;
import java.util.UUID;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MQUtil {

    private static ConnectionFactory factory = null;// MQ采用工厂模式来完成连接的创建

    private static void initFactory() throws IOException {
        factory = new ConnectionFactory();
        factory.setHost(RabbitMQConfig.getHost());// 设置MQ安装的服务器ip地址
        factory.setPort(RabbitMQConfig.getPort());// 设置端口号
        factory.setVirtualHost("/");//设置虚拟主机名称
        // MQ通过用户来管理
        factory.setUsername(RabbitMQConfig.getUsername());// 设置用户名称
        factory.setPassword(RabbitMQConfig.getUsername());// 设置用户密码
    }

    public static Connection getConnection() throws Exception {
        if (factory == null) {
            initFactory();
        }
        // 通过工厂对象获取连接
        Connection connection = factory.newConnection();

        System.out.println("MQ连接成功!");

        return connection;
    }
    
    /**
     *  封装的发送消息类
     * 
     * @param msg 消息内容
     * @param ExchangeName 交换机名
     * @param key KEY名称
     * @param properties 消息配置
     * @return
     * @throws Exception
     */

    public static String sendMq(String msg, String ExchangeName, String key, AMQP.BasicProperties properties)
            throws Exception {
        // 1.创建连接工厂
        Connection connection = getConnection();
        // mq提供Channel来将处理消息
        // 创建Channel
        Channel channel = connection.createChannel();
        // basicPublish将消息发送到指定的交换机
        channel.txSelect();
        String Uid = UUID.randomUUID().toString().replace("-", "");
        if (properties == null) {
            properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2) // 持久化消息
                    .contentEncoding("UTF-8")
                    .messageId(Uid)
                    // .expiration("20000") // TTL
                    .build();
        }
        channel.basicPublish(ExchangeName, key, properties, msg.getBytes());
        channel.txCommit();
        // 关闭连接
        channel.close();
        connection.close();
        return Uid;
    }

}

测试类

package mq;

public class MqTest {

    
    private static String exchange = "test.direct";
    
    public static void main(String[] args) throws Exception {
        MQUtil.sendMq("hello", exchange, "T", null);
    }
    
}

2.消费者

新增MqInter接口用于实现业务逻辑

package mq;

public interface MqInter {
    //返回队列名称
    String getQuereName();
    
    /**
     * 
     * @param msg 消息内容
     * @param uid 消息主键
     * @return "Y" 正常处理功能  "N" 处理失败并抛弃当前消息  "X" 处理失败并保存当前消息
     */
    String dispose(String msg, String uid);
    
}

消费者工具类

package mq;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class MqClientUtil {
    
    public static void start(MqInter mq){
        try {
            Connection connection = MQUtil.getConnection();
            //创建Channel
            Channel channel = connection.createChannel();
           
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String messageId = properties.getMessageId();
                    String msg = new String(body);
                    String dispose = mq.dispose(msg, messageId);
                    if("Y".equals(dispose)) {
                        //成功
                         channel.basicAck(envelope.getDeliveryTag(),false);
                         System.out.println("成功!");
                    }else if("N".equals(dispose)){
                        //失败 保存消息
                         channel.basicNack(envelope.getDeliveryTag(), false, true);
                         System.out.println("失败N!");
                    }else if("X".equals(dispose)) {
                        //失败 丢掉消息
                        channel.basicReject(envelope.getDeliveryTag(), false);
                         System.out.println("失败X!");
                    }
                }
            };
        channel.basicConsume(mq.getQuereName(),false,consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者测试类

package mq;

public class clientAction implements MqInter{

    @Override
    public String getQuereName() {
        // TODO Auto-generated method stub
        return "test01.queue";
    }

    @Override
    public String dispose(String msg, String uid) {
        System.out.println("msg"+msg);
        System.out.println("uid"+uid);
        return "N";
    }

    
    
    public static void main(String[] args) {
        
        MqClientUtil.start(new clientAction());
    }
    
}
标签: rabbitmq

本文转载自: https://blog.csdn.net/weixin_42188435/article/details/138667352
版权归原作者 ~jshu~ 所有, 如有侵权,请联系我们删除。

“java操作RabbitMQ”的评论:

还没有评论