admin管理员组

文章数量:1437370

SpringBoot系列之RabbitMQ可靠性投递实践教程

基于SpringBoot 2.2.1.RELEASE集成RabbitMQ的可靠性投递实践,以下是一个详细的测试例子,包括如何配置、发送消息、接收消息,并验证消息的可靠性投递。

一、环境准备

安装 RabbitMQ

使用 Docker 快速启动 RabbitMQ 服务:

代码语言:javascript代码运行次数:0运行复制
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

访问管理界面 http://localhost:15672,使用默认账户 guest/guest 登录,验证服务是否正常运行。

创建 Spring Boot 项目

  • 使用 Spring Initializr 快速创建项目,添加以下依赖:
    • Spring Boot DevTools
    • Spring Web
    • Spring AMQP
二、引入依赖

pom.xml 文件中添加 RabbitMQ 相关依赖:

代码语言:javascript代码运行次数:0运行复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
三、配置 RabbitMQ

application.yml 文件中配置 RabbitMQ 的连接信息:

代码语言:javascript代码运行次数:0运行复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 开启生产者确认机制
    publisher-returns: true  # 开启消息返回机制
    template:
      mandatory: true  # 消息路由失败时返回给生产者
四、生产者配置
1. 配置交换机、队列和绑定关系

创建一个配置类,定义交换机、队列和绑定关系:

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class RabbitConfig {
    public static final String QUEUE_NAME = "my-queue";
    public static final String EXCHANGE_NAME = "my-exchange";
    public static final String ROUTING_KEY = "my-routing-key";

    @Bean
    public Queue queue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
2. 配置 RabbitTemplate

配置 RabbitTemplate,开启 Confirm 和 Return 回调:

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class RabbitTemplateConfig {
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);

        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息发送到交换机成功,消息ID:{}", correlationData.getId());
            } else {
                log.info("消息发送到交换机失败,消息ID:{},原因:{}", correlationData.getId(), cause);
            }
        });

        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息发送失败,交换机:{},路由键:{},原因:{}", exchange, routingKey, replyText);
        });

        return rabbitTemplate;
    }
}
3. 发送消息

创建一个服务类,用于发送消息:

代码语言:javascript代码运行次数:0运行复制
@Service
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message, correlationData);
    }
}
五、消费者配置
1. 开启手动 ACK

在配置文件中设置手动 ACK:

代码语言:javascript代码运行次数:0运行复制
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认模式
2. 创建消费者

创建一个消费者类,监听队列并手动确认消息:

代码语言:javascript代码运行次数:0运行复制
@Component
public class MessageReceiver {
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        log.info("接收到消息:{}", message);
        try {
            // 模拟消息处理逻辑
            Thread.sleep(1000);
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("消息处理失败,消息内容:{}", message, e);
            // 消息处理失败,拒绝消息并重新入队
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
六、消息持久化
1. 队列持久化

在定义队列时,将 durable 参数设置为 true

代码语言:javascript代码运行次数:0运行复制
@Bean
public Queue queue() {
    return QueueBuilder.durable(QUEUE_NAME).build();
}
2. 消息持久化

在发送消息时,设置消息的 deliveryMode2

代码语言:javascript代码运行次数:0运行复制
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message msg = new Message(message.getBytes(), messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, msg);
七、消息失败重试机制
1. 本地重试机制

在消费者配置中开启本地重试机制:

代码语言:javascript代码运行次数:0运行复制
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true  # 开启消费者失败重试
          initial-interval: 1000  # 初始的失败等待时长为1秒
          multiplier: 1  # 失败的等待时长倍数
          max-attempts: 3  # 最大重试次数
2. 失败策略

使用 RepublishMessageRecoverer 将失败消息投递到指定的异常队列:

代码语言:javascript代码运行次数:0运行复制
@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorExchange() {
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error.routing");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error.routing");
    }
}
八、测试

发送消息

调用 MessageSendersendMessage 方法发送消息。

示例代码:

代码语言:javascript代码运行次数:0运行复制
@RestController
public class TestController {
    @Autowired
    private MessageSender messageSender;

    @GetMapping("/send")
    public String sendMessage() {
        String message = "Hello, RabbitMQ!";
        messageSender.sendMessage(message);
        return "消息发送成功";
    }
}

接收消息

  • 观察消费者是否正确接收并处理消息。

模拟失败

在消费者中抛出异常,观察重试机制是否生效。

示例代码:

代码语言:javascript代码运行次数:0运行复制
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
    log.info("接收到消息:{}", message);
    try {
        // 模拟消息处理失败
        throw new RuntimeException("消息处理失败");
    } catch (Exception e) {
        log.error("消息处理失败,消息内容:{}", message, e);
        // 消息处理失败,拒绝消息并重新入队
        channel.basicNack(deliveryTag, false, true);
    }
}

查看异常队列

  • 检查异常消息是否被投递到指定的异常队列。
九、总结

通过以上配置和实践,可以实现基于 Spring Boot 2.2.1.RELEASE 的 RabbitMQ 可靠性投递,确保消息在生产者、RabbitMQ 和消费者之间的可靠传输。关键点包括:

  • 生产者确认机制(Confirm 和 Return 回调)
  • 消息持久化(交换机、队列、消息)
  • 消费者手动确认(ACK)
  • 消息失败重试机制(本地重试和异常队列)

希望这篇教程能帮助你更好地理解和实现 RabbitMQ 的可靠性。

推荐,腾讯云代码助手平台:

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-05-01,如有侵权请联系 cloudcommunity@tencent 删除实践rabbitmq队列教程配置

本文标签: SpringBoot系列之RabbitMQ可靠性投递实践教程