RabbitMQ 入门与应用

MOKE 2019-08-21 PM 12℃ 0条

RabbitMQ 概述

消息队列:地址

几种消息队列的区别:
在这里插入图片描述

安装
1.RabbitMQ 依赖 erlang,所以先安装 erlang 环境,配置环境变量:
在这里插入图片描述
在这里插入图片描述
2.在 RabbitMQ 的 sbin 目录下,执行命令

rabbitmq-plugins enable rabbitmq_management \开启管理后台的插件
rabbitmq-server start //cmd窗口运行,要保持开启则直接启动服务即可
如果上一步报错:ERROR: node with name "rabbit" already running on "localhost"
先 rabbitmqctl stop 再 rabbitmq-server start

3.此时我们可以通过浏览器访问:http://localhost:15672 就可以看到 RabbitMQ 的后台,默认用户和密码都是guest


RabbitMQ 的后台操作

主界面:
在这里插入图片描述
1.概述
在这里插入图片描述
2.连接,我们还没有程序连接到这个 RabbitMQ
在这里插入图片描述


添加一个用户
在这里插入图片描述vhost 管理
我添加了一个 root 的管理员账号,可以看到在 vhost 一栏是没有权限的,vhost 就相当于一个数据库,我们需要给用户授权。
在这里插入图片描述

  1. 首先创建一个 vhost, vhost 一般是以”/”开头,如:/vhost_test
    在这里插入图片描述
  2. 对我们创建的用户 root 授权
    在这里插入图片描述

Java 连接 RabbitMQ

1.老规矩,先引入JAR

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.2</version>
        </dependency>

2.连接工具类

        public class ConnectionUtil{
            private static Logger logger = Logger.getLogger(ConnectionUtil.class);
            
            public static Connection getConnection(){
                try{
                    Connection connection = null;
                    //定义一个连接工厂
                    ConnectionFactory factory = new ConnectionFactory();
                    //设置服务端地址(域名地址/ip)
                    factory.setHost("127.0.0.1");
                    //设置服务器端口号
                    factory.setPort(5672);
                    //设置虚拟主机(相当于数据库中的库)
                    factory.setVirtualHost("/vhost_test");
                    //设置用户名
                    factory.setUsername("root");
                    //设置密码
                    factory.setPassword("123456");
                    connection = factory.newConnection();
                    return connection;
                }
                catch (Exception e){
                    return null;
                }
            }
        }

接下我们就可以通过这个工具类获得连接,创建频道和声明队列了,可以分为以下几种:


简单队列

在这里插入图片描述
1.生产者-队列-消费者: 生产者一一对应消费
生产者:

        /**
         * 生产者
         * @author admin
         */
        public class Producter {
            public static void main(String[] args) {
                try{
                    //获取连接
                    Connection connection = ConnectionUtil.getConnection();
                    //从连接中获取一个通道
                    Channel channel = connection.createChannel();
                    //声明队列
                    channel.queueDeclare("test_queue", false, false, false, null);
                    String message = "hello mq";
                    //发送消息
                    channel.basicPublish("", "test_queue", null, message.getBytes("utf-8"));
                    System.out.println("[send]:" + message);
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

消费者:

        /**
         * 消费者
         * @author admin
         */
        public class Consumer {
        
            public static void main(String[] args) {
                try{
                    //获取连接
                    Connection connection = ConnectionUtil.getConnection();
                    //从连接中获取一个通道
                    Channel channel = connection.createChannel();
                    //声明队列
                    channel.queueDeclare("test_queue", false, false, false, null);
                    //定义消费者
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        //当消息到达时执行回调方法
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive]:" + message);
                        }
                    };
                    //监听队列
                    channel.basicConsume("test_queue", true, consumer);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

在这里插入图片描述
运行生产者,可以在 RabbitMQ 后台看到该队列,且有 1 条消息:
在这里插入图片描述
我们再运行消费者:
在这里插入图片描述
同时可以看到消息被使用,队列中没有了:
在这里插入图片描述
这样我们就实现了 RabbitMQ 的连接和使用,但这种简单队列缺点也很明显:

  • 不支持多个消费者使用同一个队列里的消息,且队列名变更,就得同时变更

工作队列

在这里插入图片描述

轮询分发

2.生产者-队列-多个消费者,即工作队列(Work Queues)
修改生产者,使其生成多个消息:

        public class Producter {
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    Channel channel = connection.createChannel();
                    channel.queueDeclare("test_queue", false, false, false, null);

                     for (int i = 0; i < 50; i++) {
                        String message = "message_" + i;
                        System.out.println("[send]:" + message);
                        channel.basicPublish("", "test_queue", null, message.getBytes());
                        Thread.sleep(i * 20);
                    }
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

2.消费者加上等待时间,一个1s,一个3s:
消费者1

        public class Consumer1 {
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    Channel channel = connection.createChannel();
                    channel.queueDeclare("test_queue", false, false, false, null);
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive]:" + message);
                        }
                    };
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicConsume("test_queue", true, consumer);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

消费者2

        public class Cunsumer2 {
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    Channel channel = connection.createChannel();
                    channel.queueDeclare("test_queue", false, false, false, null);
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive]:" + message);
                        }
                    };
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicConsume("test_queue", true, consumer);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

首先我们分别开启两个消费者线程,因为队列中还没有内容,所以都会阻塞,这时我们再开启生产者线程,结果如下:
在这里插入图片描述
虽然处理结束的时间不同,但两个消费者接收到消息数量是一样的,这种方式叫轮询分发(round-robin)不管谁忙,都不会多给消息,总是你一个我一个。

公平分发

要想实现公平分发,需要禁止频道的自动应答,看下这句代码:

        channel.basicConsume(QUEUE_NAME, true, consumer);

这里的 true,其实就是 autoACK 的值,将其设置为 false,我们手动进行应答:

        public class Consumer {
            private static final String QUEUE_NAME = "test_queue";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    final Channel channel = connection.createChannel();
                    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                    //在每个消费者返回确认消息前,消息队列都不会发送下一个消息给消费者,保证一次发送一个消息
                    channel.basicQos(1);
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive1]:" + message);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            channel.basicAck(envelope.getDeliveryTag(),false);//手动应答
                        }
                    };
                    channel.basicConsume(QUEUE_NAME, false, consumer);//取消自动应答
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

结果如下,能者多劳:
在这里插入图片描述
公平分发相比轮询分发的好处:

  • 轮询分发,消息分发给消费者,则消息就会从内存中删除,如果此时这个消费者挂了,那么消息就丢失了;
  • 而公平分发,只有在等待到消息应答后才会删除内存中的信息,如果出现问题则会交给其他消费者。

订阅者模式

在这里插入图片描述
模式:一个生产者,多个消费者,消费者都有自己的队列,消息先发送到交换机exchange,每个队列都绑定到交换机,实现一个消息被多个消费者消费。
交换机:接收生产者的消息,向队列推送消息
交换机有三种类型:

  • Fanout
  • Direct
  • Topic
fanout

不处理路由键,队列绑定到交换机,直接转发到所有队列

生产者
生产者在向外发送消息时,指定了exchange(交换机)和routing key,但是没有指定queue(队列)也没有将queue(队列)绑定到exchange。

    public class Producter {
        private static final String EXCHANGE_NAME = "test_exchange_fanout";
        public static void main(String[] args) {
            try{
                Connection connection = ConnectionUtil.getConnection();
                Channel channel = connection.createChannel();
    
                //声明交换机,类型为fanout
                channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
    
                String msg = "hello mq!";
    
                channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());//发送方法和上面的不同!!!
    
                System.out.println("[send]:"+ msg);
                channel.close();
                connection.close();
            }
            catch (Exception e){
                e.printStackTrace();
            }
        }
    }

消费者1
消费者在消费消息时,需要声明队列(队列名字随便),并绑定到交换机。

注意下queueBind方法的参数顺序,队列和交换机顺序不能变,不然会报错:
在这里插入图片描述

        public class Consumer {
            private static final String EXCHANGE_NAME = "test_exchange_fanout";
            private static final String QUEUE_NAME = "test_queue_fanout_email";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    final Channel channel = connection.createChannel();
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                    //交换机绑定队列
                    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
                    
                    channel.basicQos(1);
                    
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive1]:" + message);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally {
                                channel.basicAck(envelope.getDeliveryTag(),false);
                            }
                        }
                    };
                    channel.basicConsume(QUEUE_NAME, false, consumer);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

消费者2

        public class Cunsumer2 {
            private static final String EXCHANGE_NAME = "test_exchange_fanout";
            private static final String QUEUE_NAME = "test_queue_fanout_sms";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    final Channel channel = connection.createChannel();
                    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
                    
                    channel.basicQos(1);
                    DefaultConsumer consumer = new DefaultConsumer(channel){
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException{
                            String message = new String(body, "utf-8");
                            System.out.println("[Receive2]:" + message);
                            try {
                                Thread.sleep(3000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }finally {
                                channel.basicAck(envelope.getDeliveryTag(),false);
                            }
                        }
                    };
                    channel.basicConsume(QUEUE_NAME, false, consumer);
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

结果:
在这里插入图片描述
在这里插入图片描述


direct

根据绑定的路由key,消息带哪个key,就路由到哪个队列。可以一个队列绑定多个key,如下:
在这里插入图片描述
生产者
注意要先换交换机名字。

        public class Producter{
            private static final String EXCHANGE_NAME = "test_exchange_direct";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    Channel channel = connection.createChannel();
        
                    channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        
                    String msg = "hello direct!";
        
                    String routingKey = "error";
        
                    channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
        
                    System.out.println("[send]:"+ msg);
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

消费者
将声明的队列通过routing key绑定到exchange,这样才能接收到数据,依照上图,如下:

        //消费者1,交换机绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        //消费者2
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

结果
1.生产者routingKey为error:
在这里插入图片描述
2.生产者routingKey为info:
在这里插入图片描述


topics

可以实现模式匹配
字符匹配:# 匹配一个或多个,而 * 匹配一个
在这里插入图片描述
生产者

        public class Producter{
            private static final String EXCHANGE_NAME = "test_exchange_topic";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    Channel channel = connection.createChannel();
        
                    channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        
                    String msg = "hello topic!";
                    String routingKey = "routingKey.one";//注意
                    channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
                    System.out.println("[send]:"+ msg);
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

消费者

        //消费者1
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"routingKey.#");
        //消费者2
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"routingKey.two");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"routingKey.three");

结果
1.生产者routingKey为routingKey.one:
在这里插入图片描述
2.生产者routingKey为routingKey.two:
在这里插入图片描述


消息持久化

声明队列的时候,设置持久化:

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);

注: rabbitmq不支持更改现有名称的队列,直接改为false是不可以的。也就是说rabbitmq不支持重新定义一个已存在的队列。

我们通过持久化数据解决了rabbitMQ服务器异常的数据丢失问题,但是生产者也得知道是否成功发送消息到 rabbitMQ 服务器中,这样才能保证数据真正地不会丢失。

可以通过两种方式来确认:

AMQP协议
其实就是将生成者发送消息的操作变成一个事务:

        try{
            channel.txSelect();//声明事务
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
            System.out.println("[send]:"+ msg);
            channel.txCommit();//提交事务
        }catch(Exception e){
            channel.txRollback();//回滚事务
        }

confirm模式
Confirm发送方确认模式使用和事务类似,不过相比使用事务来保证数据的可靠性,confirm模式的性能更好。

实现原理:
生产者将 channel 设置成confirm模式,所有在该 channel 上面发布的消息都将会被指派一个唯一的ID(从1开始),

  • 消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID)
  • 如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

Confirm的三种实现方式:

  • channel.waitForConfirms():普通发送方确认模式,每发送一条消息,调用waitForConfirms()方法等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
        public class Producter{
            private static final String QueueName = "test_confirm";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    final Channel channel = connection.createChannel();
                    
                    channel.queueDeclare(QueueName, true, false, false, null);//记得开启持久化
                    channel.confirmSelect();//设置 channel 为 confirm 模式
                    
                    String msg = "hello topic";
                    String routingKey = "info";
        
                    for(int i=0;i < 10;i++){
                        channel.basicPublish("", QueueName, null, (msg+"_"+i).getBytes());
                        if (channel.waitForConfirms()) {//普通发送方确认模式
                            System.out.println(msg+"_"+i);
                        }
                    }
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
  • channel.waitForConfirmsOrDie():批量确认模式,每发送一批消息之后,调用waitForConfirms()方法,使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
        public class Producter{
            private static final String QueueName = "test_confirm";
            public static void main(String[] args) {
                try{
                    Connection connection = ConnectionUtil.getConnection();
                    final Channel channel = connection.createChannel();
        
                    channel.queueDeclare(QueueName, true, false, false, null);
                    channel.confirmSelect();//设置 channel 为 confirm 模式
                    
                    String msg = "hello topic";
                    String routingKey = "info";
    
                    for(int i=0;i< 10;i++){
                        channel.basicPublish("", QueueName, null, (msg+"_"+i).getBytes());
                    }
                    channel.waitForConfirmsOrDie();//批量确认模式
                    System.out.println("全部执行完成");
                    channel.close();
                    connection.close();
                }
                catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
  • channel.addConfirmListener():异步监听发送方确认模式,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认
        //Channel信道在收到broker的ack消息之后会回调设置在该信道监听器上的handleAck方法,在收到nack消息之后会回调设置在该信道监听器上的handleNack方法
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
            }
                
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);
            }
        });
标签: RabbitMQ

非特殊说明,本博所有文章均为博主原创。

评论啦~