My logo
Published on

RocketMQ集成在SpringBoot里的用法

  • RocketMQ-5.1.0 服务端安装版本

  • SpringBoot-2.6.3 依赖Jar包

  • RocketmQ-Spring-2.2.3 依赖Jar包

一、 send 和 convertAndSend 两种不同的发送消息方式

Spring BootRocketMQ 集成时,sendconvertAndSend 是两种不同的发送消息方法。

它们的主要区别在于消息的转换和处理方式。让我为您详细解释这两种方法的区别:

SendResult send(String destination, Message<?> msg);
  • 直接发送一个已经构造好的 Message 对象。
  • 你需要自己创建 Message 对象,并设置所有必要的属性。
  • 提供了更多的灵活性和控制,因为你可以直接操作 Message 对象。
  • 适用于需要精细控制消息结构的场景。

使用示例:

Message<String> message = MessageBuilder.withPayload("Hello, RocketMQ!")
.setHeader(RocketMQHeaders.KEYS, "myKey")
.build();
SendResult result = rocketMQTemplate.send("topicName", message);

1. convertAndSend 方法

SendResult convertAndSend(String destination, Object payload);
  • 接受一个目标主题和一个 payload 对象。
  • Spring 会自动将 payload 对象转换为 Message。
  • 使用配置的消息转换器来序列化 payload。
  • 更加便捷,特别是当你只关心消息内容而不是消息结构时。
  • 适用于简单的消息发送场景。

使用示例:

MyObject payload = new MyObject("data");
SendResult result = rocketMQTemplate.convertAndSend("topicName", payload);

主要区别:

  1. 消息构造:
    1. send: 你需要手动构造完整的 Message 对象。
    2. convertAndSend: Spring 自动将你的 payload 转换为 Message 对象。
  2. 灵活性:
    1. send: 提供更多的控制,允许你设置消息的各种属性。
    2. convertAndSend: 更简单,但控制较少。
  3. 类型安全:
    1. send: 由于直接使用 Message 对象,类型安全性较低。
    2. convertAndSend: 可以直接使用你的领域对象,提供更好的类型安全性。
  4. 消息转换:
    1. send: 不进行自动的消息转换。
    2. convertAndSend: 使用配置的消息转换器自动进行转换。
  5. 使用场景:
    1. send: 适用于需要精细控制消息结构的复杂场景。
    2. convertAndSend: 适用于简单的消息发送场景,尤其是当你主要关注业务对象而不是消息结构时。

选择使用哪种方法主要取决于你的具体需求。如果你需要对消息有更多的控制,使用 send;如果你希望更简单直接地发送对象,使用 convertAndSend

二、 同步发送和异步发送

同步发送 同步发送会阻塞当前线程,直到消息发送完成并收到服务器的响应。

使用 send 方法:

// 使用 send 方法
SendResult sendResult = rocketMQTemplate.send("topicName", MessageBuilder.withPayload("Hello, RocketMQ!").build());

使用 convertAndSend 方法:

// 使用 convertAndSend 方法
SendResult sendResult = rocketMQTemplate.convertAndSend("topicName", "Hello, RocketMQ!");

这两种方法默认都是同步发送。调用后会立即返回 SendResult 对象,包含发送结果的详细信息。

异步发送 异步发送不会阻塞当前线程,而是通过回调函数处理发送结果。

使用 asyncSend 方法:

// 使用 asyncSend 方法
rocketMQTemplate.asyncSend("topicName", MessageBuilder.withPayload("Async Hello, RocketMQ!").build(), new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("消息发送成功:" + sendResult);
    }

    @Override
    public void onException(Throwable throwable) {
        System.out.println("消息发送失败:" + throwable.getMessage());
    }
});

使用 asyncSend 方法(带超时):

// 使用 asyncSend 方法,带超时参数
rocketMQTemplate.asyncSend("topicName", "Async Hello with Timeout", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("消息发送成功:" + sendResult);
    }

    @Override
    public void onException(Throwable throwable) {
        System.out.println("消息发送失败:" + throwable.getMessage());
    }
}, 5000); // 5000 毫秒超时

三、 单向发送(One-way)

单向发送不关心发送结果,发送后立即返回,不等待服务器响应。这是最快的但也是最不可靠的方式。

使用 sendOneWay 方法:

// 使用 sendOneWay 方法
rocketMQTemplate.sendOneWay("topicName", "One-way Hello, RocketMQ!");

比较:

  1. 同步发送:
    1. 优点:可靠性高,立即知道发送结果。
    2. 缺点:会阻塞当前线程,影响性能。
    3. 适用场景:对可靠性要求高,且能接受一定的延迟。
  2. 异步发送:
    1. 优点:不阻塞当前线程,性能好。
    2. 缺点:需要处理回调,代码复杂度增加。
    3. 适用场景:对性能要求高,但仍需要知道发送结果。
  3. 单向发送:
    1. 优点:性能最好,延迟最低。
    2. 缺点:不知道发送结果,可靠性最低。
    3. 适用场景:对可靠性要求不高,但需要最快的发送速度,如日志收集。