这个模式是 RabbitMQ 中最简单的一个模式,他从 P(生产者) 发往 C(消费者)。在这个模式中你可以将他简单的描述为一个人在发一封信给另一个人。RabbitMQ 充当的是邮局、邮筒和邮递员。

让我们来玩一玩 RabbitMQ 的简单消息模式(Hello World)。首先准备好 java 的运行环境和 Maven ,还要准备好 IDEA 或者 Eclipse 。

让我们先导入一下 Maven 的包,在 pom.xml 中写入

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.3</version>
    </dependency>
</dependencies>

然后开始来实现代码功能,我们先写一个公用的连接类 ConnectRabbmit.

public class ConnectRabbmit {

    private static final String HOST = "localhost";
    private static final String USER_NAME = "mqadmin";
    private static final String PASSWORD = "mqadmin";
    private static final String VIRTUAL_HOST = "my_vhost";

    private static Connection connection = null;

    public static Channel getChannel(String queueName) {
        Channel channel = null;
        try {
            connection = getConnection().newConnection();
            channel = connection.createChannel();
            // 队列声明
            // queue, durable, exclusive, autoDelete, arguments
            // 队列、持久、独占、自动删除、参数
            channel.queueDeclare(queueName, false, false, false, null);
        } catch (Exception e) {
            try {
                connection.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
            e.printStackTrace();
        }
        return channel;
    }

    public static ConnectionFactory getConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setUsername(USER_NAME); // 用户名
        factory.setPassword(PASSWORD); // 密码
        factory.setVirtualHost(VIRTUAL_HOST); // 设置虚拟主机
        return factory;
    }

    public static void closeConnection() {
        if (connection != null){
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

再来实现 生产者消费者 的代码

  • 生产者 producer

    /**
     * hello world producer (Hello World 的生产者)
     */
    public class Main {
    
        public static void main(String[] args) {
            try {
                String routingKey = "hello_world";
                // 获取 Channel
                Channel hello_world = ConnectRabbmit.getChannel(routingKey);
                // 发送消息
                /*
                    exchange - 将消息发布到的交换
                    routingKey – 路由密钥
                    props - 消息的其他属性 - 路由标头等
                    body – 消息正文
                 */
                int max = 100;
                int pointer = 0;
                for (; ; ) {
                    // 线程睡它个两秒
                    Thread.sleep(2000);
                    String now = LocalDateTime.now().toString();
                    // 推送到 RabbitMQ 上
                    hello_world.basicPublish("", routingKey, null, now.getBytes());
                    System.out.println("发送成功,当前发送的消息为:" + now);
                    pointer++;
                    if (pointer == max) {
                        System.out.println("连接已断开。");
                        ConnectRabbmit.closeConnection();
                        break;
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
  • 消费者 consumer

    /**
     * hello world consumer (Hello World 的消费者)
     */
    public class Main {
    
        public static void main(String[] args) {
            try {
                String routingKey = "hello_world";
                // 获取 Channel
                Channel hello_world = ConnectRabbmit.getChannel(routingKey);
                // 接收消息
                DeliverCallback deliverCallback = (consumerTag, message) -> {
                    // StandardCharsets.UTF_8 是标准Charsets常量定义。 这些字符集保证在 Java 平台的每个实现上都可用
                    String message1 = new String(message.getBody(), StandardCharsets.UTF_8);
                    System.out.println("接收成功,当前接收的消息为:" + message1);
                };
                hello_world.basicConsume(routingKey, true, deliverCallback, consumerTag -> {
                });
            } catch (Exception e) {
                ConnectRabbmit.closeConnection();
            }
        }
    }
    
    

    把两个代码都执行起来就可以得到两个推送和接受的示例

    • 生产者

      发送成功,当前发送的消息为:2021-07-19T15:13:07.303
      发送成功,当前发送的消息为:2021-07-19T15:13:09.314
      发送成功,当前发送的消息为:2021-07-19T15:13:11.328
      发送成功,当前发送的消息为:2021-07-19T15:13:13.337
      发送成功,当前发送的消息为:2021-07-19T15:13:15.348
      发送成功,当前发送的消息为:2021-07-19T15:13:17.358
      发送成功,当前发送的消息为:2021-07-19T15:13:19.370
      发送成功,当前发送的消息为:2021-07-19T15:13:21.382
      发送成功,当前发送的消息为:2021-07-19T15:13:23.391
      
    • 消费者

      接收成功,当前接收的消息为:2021-07-19T15:13:07.303
      接收成功,当前接收的消息为:2021-07-19T15:13:09.314
      接收成功,当前接收的消息为:2021-07-19T15:13:11.328
      接收成功,当前接收的消息为:2021-07-19T15:13:13.337
      接收成功,当前接收的消息为:2021-07-19T15:13:15.348
      接收成功,当前接收的消息为:2021-07-19T15:13:17.358
      接收成功,当前接收的消息为:2021-07-19T15:13:19.370
      接收成功,当前接收的消息为:2021-07-19T15:13:21.382
      接收成功,当前接收的消息为:2021-07-19T15:13:23.391
      

访问一下 RabbitMQ 的后台管理,也是可以看到推动过去的消息.