springboot3+canal+rabbitMQ实现对缓存进行清除

springboot3+canal+rabbitMQ实现对缓存进行清除

canace
2024-01-30 / 0 评论 / 113 阅读 / 正在检测是否收录...

Canal 工作原理

  • Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • Canal 解析 binary log 对象(原始为 byte 流)

环境准备

本项目所有的环境都是在 Docker 环境下搭建的,需要自行安装 Docker

Mysql安装

# 拉取镜像
docker pull mysql

# 创建文件并写入内容
vim /usr/local/mysql/conf/my.cnf
[client]

default-character-set=utf8mb4

[mysql]

default-character-set=utf8mb4

[mysqld]

# 设置东八区时区
default-time_zone = '+8:00'

# 设置密码验证规则,default_authentication_plugin参数已被废弃

# 改为authentication_policy

#default_authentication_plugin=mysql_native_password
authentication_policy=mysql_native_password

# 限制导入和导出的数据目录
# 为空,不限制导入导出的数据目录;
# 指定目录,必须从该目录导入导出,且MySQL不会自动创建该目录;
# 为NULL,禁止导入与导出功能
#secure_file_priv=/var/lib/mysql
secure_file_priv=

init_connect='SET collation_connection = utf8mb4_0900_ai_ci'

init_connect='SET NAMES utf8mb4'

character-set-server=utf8mb4

collation-server=utf8mb4_0900_ai_ci

skip-character-set-client-handshake

skip-name-resolve

[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replication需要定义,不和Canal的slaveId重复即可

# 运行mysql容器
docker run -p 3306:3306 --name mysql --restart=always --privileged=true \
-v /usr/local/mysql/log:/var/log/mysql \
-v /usr/local/mysql/data:/var/lib/mysql \
-v /usr/local/mysql/conf:/etc/mysql \
-v /etc/localtime:/etc/localtime:ro \
-e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest

Canal安装

这里建议使用v1.1.6版本,v1.1.7版本使用时报错

# mysql创建授权账号
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

# 先创建一个临时容器
docker run --name canal -p 11111:11111 \
-d canal/canal-server:v1.1.6

# 复制配置文件(先确保宿主机目录存在,否则 docker cp 会把文件直接写成 /mydata/canal/conf)
mkdir -p /mydata/canal/conf
docker cp canal:/home/admin/canal-server/conf/canal.properties /mydata/canal/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf

# 删除该临时容器
docker rm -f canal

# 新建canal容器
docker run --name canal -p 11111:11111 \
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /mydata/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
--privileged=true \
--restart=always \
-d canal/canal-server:v1.1.6


# 修改/mydata/canal/conf/canal.properties
# 注意:properties 文件不支持行尾注释,说明必须单独成行,否则会被当作配置值的一部分
canal.serverMode = rabbitMQ
# 可自定义
canal.destinations = canal.shortlink
# rabbitmq地址
rabbitmq.host = 192.168.10.107
rabbitmq.virtual.host = /
# 可自定义
rabbitmq.exchange = canal.exchange
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode =

# 修改/mydata/canal/conf/instance.properties
# 注意:properties 文件不支持行尾注释,说明必须单独成行,否则会被当作配置值的一部分
# 和mysql的server_id不一样即可
canal.instance.mysql.slaveId=111
# mysql地址
canal.instance.master.address= 192.168.10.107:3306
# 数据库用户名
canal.instance.dbUsername= canal
# 数据库密码
canal.instance.dbPassword= canal
# 白名单,这里link是表示link的数据库,其余内容可至canal官网查看配置
canal.instance.filter.regex=link\\..*
# link\\.BASE.*得添加,否则会报错
canal.instance.filter.black.regex=mysql\\.slave_.*,link\\.BASE.*
# 和canal.destinations一致
canal.mq.topic= canal.shortlink

Springboot

引入依赖

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

新建RabbitMQ配置类

package com.study.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration: registers a {@link RabbitTemplate} and a listener container
 * factory that both use a Jackson JSON message converter.
 *
 * @author canace
 * @version 1.0
 * @date 2024/1/30 14:21
 */
@Configuration
@Slf4j
public class RabbitConfig {

    /**
     * Builds the template used for publishing messages.
     *
     * @param connectionFactory the RabbitMQ connection factory supplied by the container
     * @return a template bound to {@code connectionFactory} with a JSON converter
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setMessageConverter(newJsonConverter());
        rabbitTemplate.setConnectionFactory(connectionFactory);
        return rabbitTemplate;
    }

    /**
     * Builds the container factory used by {@code @RabbitListener} methods.
     *
     * <p>Per the original author's note, configuring the JSON converter both here and on
     * the template is what resolved the repeated-error loop seen with RabbitListener.
     *
     * @param connectionFactory the RabbitMQ connection factory supplied by the container
     * @return a listener container factory with a JSON converter
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setMessageConverter(newJsonConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }

    /** Creates a fresh converter per caller, so the two beans do not share an instance. */
    private static Jackson2JsonMessageConverter newJsonConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

新建消息实体类

package com.study.config;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * Message entity that the RabbitMQ listener in this project receives for a Canal event.
 *
 * <p>{@code T} is the element type of the {@code data} and {@code old} lists. Lombok
 * generates the no-args constructor and the accessors.
 *
 * @author canace
 * @version 1.0
 * @description Canal message entity class
 * @date 2024/1/30 14:06
 */

@NoArgsConstructor
@Data
public class CanalMessage<T> {
    // Event type; the listener in this project compares it against "INSERT".
    private String type;
    // Table name; the listener in this project compares it against "t_group".
    private String table;
    // Row payloads; the listener parses an element as JSON and reads "username" / "gid".
    private List<T> data;
    // NOTE(review): presumably the database (schema) the event came from - confirm.
    private String database;
    // NOTE(review): presumably a timestamp in epoch form - confirm against Canal's message format.
    private Long es;
    // NOTE(review): presumably a message/batch identifier - confirm.
    private Integer id;
    // NOTE(review): presumably marks DDL statements - confirm.
    private Boolean isDdl;
    // NOTE(review): presumably the previous values of changed rows - confirm.
    private List<T> old;
    // NOTE(review): presumably the primary-key column names - confirm.
    private List<String> pkNames;
    // NOTE(review): presumably the SQL text associated with the event - confirm.
    private String sql;
    // NOTE(review): presumably a timestamp in epoch form - confirm against Canal's message format.
    private Long ts;

}

新建监听类

package com.study.config;

import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.study.cacheManager.GroupCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Listens on RabbitMQ for Canal change messages and evicts the matching group caches.
 *
 * @author canace
 * @version 1.0
 * @description Canal + RabbitMQ listener for database changes
 * @date 2024/1/30 13:59
 */

@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {

    private final GroupCacheManager groupCacheManager;

    /**
     * Handles one change message from queue {@code canal.queue}.
     *
     * <p>Only messages for table {@code t_group} are processed. For every row in the message:
     * <ul>
     *   <li>{@code INSERT}: evict the group-list cache of the row's {@code username};</li>
     *   <li>any other type: evict the cache of the row's {@code gid} and the group-list
     *       cache of the row's {@code username}.</li>
     * </ul>
     *
     * <p>A message without row data is skipped instead of throwing, because an exception
     * thrown from a listener method may cause the message to be redelivered repeatedly,
     * depending on the container's settings.
     *
     * @param message the converted Canal message
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.shortlink"
            )
    })
    public void handleDataChange(@Payload CanalMessage<?> message) {
        if (!"t_group".equals(message.getTable())) {
            return;
        }
        // Guard against messages that carry no rows; get(0) would throw on them.
        if (message.getData() == null || message.getData().isEmpty()) {
            log.warn("表{}收到类型为{}的消息但没有行数据,跳过缓存清理", message.getTable(), message.getType());
            return;
        }

        boolean inserted = "INSERT".equals(message.getType());
        // One message can carry several rows; evict for each of them, not only the first.
        for (Object row : message.getData()) {
            // Parse each row once and read both fields from the same object.
            JSONObject rowJson = JSONUtil.parseObj(row);
            String username = rowJson.getStr("username");
            if (inserted) {
                // A new group only invalidates the owner's group-list cache.
                log.info("新增分组,清空用户名为{}的分组缓存", username);
            } else {
                // Other changes invalidate both the single-group cache and the owner's list cache.
                String gid = rowJson.getStr("gid");
                log.info("修改或删除分组,清空用户名为{}的分组缓存和gid为{}的分组缓存", username, gid);
                groupCacheManager.delCacheGroupByGid(gid);
            }
            groupCacheManager.delCacheGroupListByUsername(username);
        }
    }
}

综上,即可实现由 canal 监听数据库变更、经 RabbitMQ 通知应用,再由应用清理对应缓存

0

评论

博主关闭了所有页面的评论