SpringBoot集成RabbitMq,实现监听数据变化后更新内容

SpringBoot集成RabbitMq,实现监听数据变化后更新内容

canace
2024-02-23 / 0 评论 / 116 阅读 / 正在检测是否收录...

集成Spring AMQP

1.可通过如下 Docker 命令 安装 RabbiMQ:

 docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management

2.登录 RabbiMQ 的 web 管理界面,创建虚拟主机novel:
rabbitmq

3.项目中加入如下的 maven 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.在 application.yml 配置文件中加入 RabbitMQ 的连接配置:

spring:
  rabbitmq:
    addresses: "amqp://guest:guest@47.106.243.172"
    virtual-host: novel
    template:
      retry:
        # 开启重试
        enabled: true
        # 最大重试次数
        max-attempts: 3
        # 第一次和第二次重试之间的持续时间
        initial-interval: "3s"

相关配置

1.建立相关常量:

/**
 * @author canace
 * @version 1.0
 * @description AMQP 相关常量
 * @date 2024/2/23 10:15
 */
public class AmqpConsts {

    /**
     * 小说信息改变MQ
     */

    public static class BookChangeMQ {
        /**
         * 小说信息改变交换机
         */
        public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE";

        /**
         * Elasticsearch book 索引更新的队列
         */
        public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE";

        /**
         * Redis book 缓存更新的队列
         */
        public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE";
    }
}

2.创建 AMQP 配置类,配置各个交换机、队列以及绑定关系:

/**
 * @author canace
 * @version 1.0
 * @description AMPQ 配置类
 * @date 2024/2/23 10:25
 */

@Configuration
public class AmqpConfig {

    /**
     * 小说信息改变交换机(广播模式)
     */
    @Bean
    public FanoutExchange bookChangeExchange() {
        return new FanoutExchange(AmqpConsts.BookChangeMQ.EXCHANGE_NAME);
    }

    /**
     * Elasticsearch book 索引更新队列
     */
    @Bean
    public Queue esBookUpdateQueue() {
        return new Queue(AmqpConsts.BookChangeMQ.QUEUE_ES_UPDATE);
    }

    /**
     * Elasticsearch book 索引更新队列绑定到小说信息改变交换机
     */
    @Bean
    public Binding esBookUpdateQueueBinding() {
        return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange());
    }
}

3.配置 AMQP 消息管理类:

/**
 * @author canace
 * @version 1.0
 * @description AMQP 消息管理器
 * @date 2024/2/23 10:32
 */


@Component
@RequiredArgsConstructor
public class AmqpMsgManager {

    private final AmqpTemplate amqpTemplate;

    // 判断配置是否启动 RabbitMq
    @Value("${spring.amqp.enabled:false}")
    private boolean amqpEnabled;

    // 在小说进行信息更新或者新建时,只需调用这个接口即可实现
    // 这里没有写更新书籍信息的接口,不做演示
    // amqpMsgManager.sendBookChangeMsg(dto.getBookId());
    public void sendBookChangeMsg(Long bookId){
        if (amqpEnabled) {
            sendAmqpMessage(amqpTemplate, bookId);
        }
    }

    private void sendAmqpMessage(AmqpTemplate amqpTemplate, Object message) {

        // 如果在事务中则在事务执行完成后再发送,否则可以直接发送
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization(
                    new TransactionSynchronization() {
                        @Override
                        public void afterCommit() {
                            amqpTemplate.convertAndSend(AmqpConsts.BookChangeMQ.EXCHANGE_NAME, null, message);
                        }
                    });
            return;
        }
        // 将Java对象转为Amqp,并发送消息
        amqpTemplate.convertAndSend(AmqpConsts.BookChangeMQ.EXCHANGE_NAME, null, message);
    }
}

4.配置队列监听器

/**
 * @author canace
 * @version 1.0
 * @description Rabbit 队列监听器
 * @date 2024/2/23 10:58
 */

@Component
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "spring", name = {"elasticsearch.enabled",
        "amqp.enabled"}, havingValue = "true")
@Slf4j
public class RabbitQueueListener {

    private final BookInfoMapper bookInfoMapper;
    private final ElasticsearchClient esClient;


    /**
     * 监听小说信息改变的 ES 更新队列,更新最新小说信息到 ES
     * 即如果调用了amqpMsgManager.sendBookChangeMsg(dto.getBookId());就在这里可以监听到,随后进行相关操作,这里是更新书籍信息到ES
     */
    @RabbitListener(queues = AmqpConsts.BookChangeMQ.QUEUE_ES_UPDATE)
    @SneakyThrows
    public void updateEsBook(Long bookId) {
        log.info("监听到书籍信息更新");
        BookInfo bookInfo = bookInfoMapper.selectById(bookId);
        IndexResponse response = esClient.index(i -> i
                .index(EsConsts.BookIndex.INDEX_NAME)
                .id(bookInfo.getId().toString())
                .document(EsBookDto.build(bookInfo))
        );
        log.info("Indexed with version " + response.version());
    }
}

注:当服务集群部署时,由于多个消费者绑定同一个队列是无法同时消费的,一个消息只能被一个消费者消费,所以刷新本地缓存的 MQ 队列命名应该使用固定名 + 唯一随机值这种动态形式。这样每次启动会生成一个新的队列,我们需要设置该队列的 autoDelete = true,让所有消费客户端连接断开时自动删除该队列

1

评论

博主关闭了所有页面的评论