公众号「后端进阶」 每一步成长都想与你分享

RabbitMQ的一些基本概念

2018-10-16
zch

MQ 全称为 Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法,即我们常说的中间件之一,而 RabbitMQ 则是 MQ 的一种开源实现,遵循 AMQP(高级消息队列协议) 协议。

AMQP 相关概念

MQ 的模型从大体上看,都是类似的,如下:

mq

而 RabbitMQ 由于是基于 AMQP 协议的开源实现,AMQP 协议比 MQ 模型有更加详细的模型概念,如下:

信道

如果项目需要发布消息,那么必须要链接到 RabbitMQ,而项目于 RabbitMQ之间使用 TCP 连接,加入每次发布消息都要连接TCP,这不仅会造成连接资源严重浪费,会造成服务器性能瓶颈,所以 RabbitMQ 为所有的线程只用一条 TCP 连接,怎么实现的呢?RabbitMQ 引入了信道的概念,所有需要发布消息的线程都包装成一条信道在 TCP 中传输,理论上 一条 TCP 连接支持无限多个信道,模型如下:

mq

队列

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

绑定

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表,如下:

mq

交换器

我们向 RabbitMQ 发送消息,实际上是把消息发到交换器了,再由交换器根据相关路由规则发到特定队列上,在队列上监听的消费者就可以进行消费了,目前 RabbitMQ 共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

direct交换器

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

mq

fanout交换器

每个发送到 fanout 交换器中的消息,他不会去匹配路由键,直接把消息投递到所有绑定到 fanout 交换器中的队列上,它就像一个广播站一样,它会向所有收听广播的用户发送消息。对应到系统上,它允许你针对一个消息作不同操作,比如用户上传了一张新的图片,系统要同时对这个事件进行不同的操作,比如删除旧的图片缓存、增加积分奖励等等。这样就大大降低了系统之间的耦合度了。

mq

topic交换器

topic 交换器有点类似于 direct 交换器,它通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

mq

RabbitMQ 运行与管理

  • 在 Linux 下安装:
$ yum install rabbitmq-server
  • 启动 RabbitMQ:
$ /sbin/rabbitmq-server  // 开启
$ /sbin/rabbitmq-server -detached // 守护线程开启
$ /sbin/rabbitmqctl reset // 重启节点
  • 停止 RabbitMQ:
$ /sbin/rabbitmqctl stop // 停止
$ /sbin//rabbitmqctl stop_app // 停止应用
$ /sbin/rabbitmqctl -n rabbit@server.example.com stop  // 停止特定节点

这里需要特别注意一下这两个命令的区别:由于 RabbitmMQ 是用 Erlang 写的,Erlang 有节点的概念,也就是在一个 Erlang 节点上,可以运行很多个 Erlang 应用,比如 RabbitMQ。stop 命令是使得整个 Erlang 节点停止工作,而 stop_app 则是使得当前应用停止工作,不会影响其它应用的正常运行。

  • 查看 RabbitMQ 状态:
$ /sbin/rabbitmqctl status
  • 查看绑定:
$ /sbin/rabbitmqctl list_bindings
  • 查看交换器:
$ /sbin/rabbitmqctl list_exchanges
  • 查看已声明的队列:
$ /sbin/rabbitmqctl list_queues
  • 配置文件
$ sudo vim /etc/rabbitmq/rabbitmq.config // 配置文件位置

客户端

RabbitMQ 客户端支持多种语言,作为一个 Javaer,这里当然是要用 Java 语言啦:

  • 引入 Maven 依赖:
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
  • 生产者:
public class Producer {

  public static void main(String[] argv) throws Exception {
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    //设置 RabbitMQ 地址
    factory.setHost("localhost");
    //建立到代理服务器到连接
    Connection conn = factory.newConnection();
    //获得信道
    Channel channel = conn.createChannel();
    //声明交换器
    String exchangeName = "hello-exchange";
    channel.exchangeDeclare(exchangeName, "direct", true);

    String routingKey = "hola";
    //发布消息
    byte[] messageBodyBytes = "quit".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

    channel.close();
    conn.close();
  }

}
  • 消费者:
public class Consumer {

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setHost("localhost");
    //建立到代理服务器到连接
    Connection conn = factory.newConnection();
    //获得信道
    final Channel channel = conn.createChannel();
    //声明交换器
    String exchangeName = "hello-exchange";
    channel.exchangeDeclare(exchangeName, "direct", true);
    //声明队列
    String queueName = channel.queueDeclare().getQueue();
    System.out.println("声明队列名称:" + queueName);
    String routingKey = "hola";
    //绑定队列,通过键 hola 将队列和交换器绑定起来
    channel.queueBind(queueName, exchangeName, routingKey);

    while(true) {
      //消费消息
      boolean autoAck = false;
      String consumerTag = "";
      channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
          String routingKey = envelope.getRoutingKey();
          String contentType = properties.getContentType();
          System.out.println("消费的路由键:" + routingKey);
          System.out.println("消费的内容类型:" + contentType);
          long deliveryTag = envelope.getDeliveryTag();
          //确认消息
          channel.basicAck(deliveryTag, false);
          System.out.println("消费的消息体内容:");
          String bodyStr = new String(body, "UTF-8");
          System.out.println(bodyStr);

        }
      });
    }
  }

}

更多精彩文章请关注作者维护的公众号「后端进阶」,这是一个专注后端相关技术的公众号。 关注公众号并回复「后端」免费领取后端相关电子书籍。 欢迎分享,转载请保留出处。

微信公众号「Java科代表」

Content