集成Spring AMQP
1.可通过如下 Docker 命令 安装 RabbiMQ:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
2.登录 RabbiMQ 的 web 管理界面,创建虚拟主机novel:
3.项目中加入如下的 maven 依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.在 application.yml 配置文件中加入 RabbitMQ 的连接配置:
spring:
rabbitmq:
addresses: "amqp://guest:guest@47.106.243.172"
virtual-host: novel
template:
retry:
# 开启重试
enabled: true
# 最大重试次数
max-attempts: 3
# 第一次和第二次重试之间的持续时间
initial-interval: "3s"
相关配置
1.建立相关常量:
/**
* @author canace
* @version 1.0
* @description AMQP 相关常量
* @date 2024/2/23 10:15
*/
public class AmqpConsts {
/**
* 小说信息改变MQ
*/
public static class BookChangeMQ {
/**
* 小说信息改变交换机
*/
public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE";
/**
* Elasticsearch book 索引更新的队列
*/
public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE";
/**
* Redis book 缓存更新的队列
*/
public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE";
}
}
2.创建 AMQP 配置类,配置各个交换机、队列以及绑定关系:
/**
* @author canace
* @version 1.0
* @description AMPQ 配置类
* @date 2024/2/23 10:25
*/
@Configuration
public class AmqpConfig {
/**
* 小说信息改变交换机(广播模式)
*/
@Bean
public FanoutExchange bookChangeExchange() {
return new FanoutExchange(AmqpConsts.BookChangeMQ.EXCHANGE_NAME);
}
/**
* Elasticsearch book 索引更新队列
*/
@Bean
public Queue esBookUpdateQueue() {
return new Queue(AmqpConsts.BookChangeMQ.QUEUE_ES_UPDATE);
}
/**
* Elasticsearch book 索引更新队列绑定到小说信息改变交换机
*/
@Bean
public Binding esBookUpdateQueueBinding() {
return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange());
}
}
3.配置 AMQP 消息管理类:
/**
* @author canace
* @version 1.0
* @description AMQP 消息管理器
* @date 2024/2/23 10:32
*/
@Component
@RequiredArgsConstructor
public class AmqpMsgManager {
private final AmqpTemplate amqpTemplate;
// 判断配置是否启动 RabbitMq
@Value("${spring.amqp.enabled:false}")
private boolean amqpEnabled;
// 在小说进行信息更新或者新建时,只需调用这个接口即可实现
// 这里没有写更新书籍信息的接口,不做演示
// amqpMsgManager.sendBookChangeMsg(dto.getBookId());
public void sendBookChangeMsg(Long bookId){
if (amqpEnabled) {
sendAmqpMessage(amqpTemplate, bookId);
}
}
private void sendAmqpMessage(AmqpTemplate amqpTemplate, Object message) {
// 如果在事务中则在事务执行完成后再发送,否则可以直接发送
if (TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
amqpTemplate.convertAndSend(AmqpConsts.BookChangeMQ.EXCHANGE_NAME, null, message);
}
});
return;
}
// 将Java对象转为Amqp,并发送消息
amqpTemplate.convertAndSend(AmqpConsts.BookChangeMQ.EXCHANGE_NAME, null, message);
}
}
4.配置队列监听器
/**
* @author canace
* @version 1.0
* @description Rabbit 队列监听器
* @date 2024/2/23 10:58
*/
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "spring", name = {"elasticsearch.enabled",
"amqp.enabled"}, havingValue = "true")
@Slf4j
public class RabbitQueueListener {
private final BookInfoMapper bookInfoMapper;
private final ElasticsearchClient esClient;
/**
* 监听小说信息改变的 ES 更新队列,更新最新小说信息到 ES
* 即如果调用了amqpMsgManager.sendBookChangeMsg(dto.getBookId());就在这里可以监听到,随后进行相关操作,这里是更新书籍信息到ES
*/
@RabbitListener(queues = AmqpConsts.BookChangeMQ.QUEUE_ES_UPDATE)
@SneakyThrows
public void updateEsBook(Long bookId) {
log.info("监听到书籍信息更新");
BookInfo bookInfo = bookInfoMapper.selectById(bookId);
IndexResponse response = esClient.index(i -> i
.index(EsConsts.BookIndex.INDEX_NAME)
.id(bookInfo.getId().toString())
.document(EsBookDto.build(bookInfo))
);
log.info("Indexed with version " + response.version());
}
}
评论