Canal 工作原理
- Canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- Canal 解析 binary log 对象(原始为 byte 流)
环境准备
本项目所有的环境都是在 Docker 环境下进行了,需要自行在安装 Docker
Mysql安装
# 拉取镜像
docker pull mysql
# 创建文件并写入内容
vim /usr/local/mysql/conf/my.cnf
[client]
default-character-set=utf8mb4
[mysql]
default-character-set=utf8mb4
[mysqld]
# 设置东八区时区
default-time_zone = '+8:00'
# 设置密码验证规则,default_authentication_plugin参数已被废弃
# 改为authentication_policy
#default_authentication_plugin=mysql_native_password
authentication_policy=mysql_native_password
# 限制导入和导出的数据目录
# 为空,不限制导入到处的数据目录;
# 指定目录,必须从该目录导入到处,且MySQL不会自动创建该目录;
# 为NULL,禁止导入与导出功能
#secure_file_priv=/var/lib/mysql
secure_file_priv=
init_connect='SET collation_connection = utf8mb4_0900_ai_ci'
init_connect='SET NAMES utf8mb4'
character-set-server=utf8mb4
collation-server=utf8mb4_0900_ai_ci
skip-character-set-client-handshake
skip-name-resolve
[mysqld]
log-bin=mysql-bin # 开启binlog
binlog-format=ROW # 选择ROW模式
server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
# 运行mysql容器
docker run -p 3306:3306 --name mysql --restart=always --privileged=true \
-v /usr/local/mysql/log:/var/log/mysql \
-v /usr/local/mysql/data:/var/lib/mysql \
-v /usr/local/mysql/conf:/etc/mysql \
-v /etc/localtime:/etc/localtime:ro \
-e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
Canal安装
这里建议使用v1.1.6版本,v1.1.7版本使用时报错
# mysql创建授权账号
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
# 先创建一个临时容器
docker run --name canal -p 11111:11111 \
-d canal/canal-server:v1.1.6
# 复制配置文件
docker cp canal:/home/admin/canal-server/conf/canal.properties /mydata/canal/conf
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /mydata/canal/conf
# 删除该临时容器
docker rm -f canal
# 新建canal容器
docker run --name canal -p 11111:11111 \
-v /mydata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /mydata/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
--privileged=true \
--restart=always \
-d canal/canal-server:v1.1.6
# 修改/mydata/canal/conf/canal.properties
canal.serverMode = rabbitMQ
canal.destinations = canal.shortlink # 可自定义
rabbitmq.host = 192.168.10.107 # rabbitmq地址
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange # 可自定义
rabbitmq.username = guest
rabbitmq.password = guest
rabbitmq.deliveryMode =
# 修改/mydata/canal/conf/instance.properties
canal.instance.mysql.slaveId=111 # 和mysql的server_id不一样即可
canal.instance.master.address= 192.168.10.107:3306 # mysql地址
canal.instance.dbUsername= canal # 数据库用户名
canal.instance.dbPassword= canal # 数据库密码
canal.instance.filter.regex=link\\..* # 白名单,这里link是表示link的数据库,其余内容可至canal官网查看配置
canal.instance.filter.black.regex=mysql\\.slave_.*,link\\.BASE.* # link\\.BASE.*得添加,否则会报错
canal.mq.topic= canal.shortlink # 和canal.destinations一致
Springboot
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
新建RabbitMQ配置类
package com.study.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author canace
* @version 1.0
* @description rabbitmq配置类
* @date 2024/1/30 14:21
*/
@Configuration
@Slf4j
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
/**
* template.setMessageConverter(new Jackson2JsonMessageConverter());
* 这段和上面这行代码解决RabbitListener循环报错的问题
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
新建消息实体类
package com.study.config;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author canace
* @version 1.0
* @description canal消息实体类
* @date 2024/1/30 14:06
*/
@NoArgsConstructor
@Data
public class CanalMessage<T> {
private String type;
private String table;
private List<T> data;
private String database;
private Long es;
private Integer id;
private Boolean isDdl;
private List<T> old;
private List<String> pkNames;
private String sql;
private Long ts;
}
新建监听类
package com.study.config;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.study.cacheManager.GroupCacheManager;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
/**
* @author canace
* @version 1.0
* @description Canal + RabbitMQ 监听数据库变化
* @date 2024/1/30 13:59
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {
private final GroupCacheManager groupCacheManager;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.shortlink"
)
})
public void handleDataChange(@Payload CanalMessage<?> message) {
String tableName = message.getTable();
if ("t_group".equals(tableName)) {
if ("INSERT".equals(message.getType())) {
// 新增只需要清空用户名的分组缓存即可
String username = JSONUtil.parseObj(message.getData().get(0)).getStr("username");
log.info("新增分组,清空用户名为{}的分组缓存", username);
groupCacheManager.delCacheGroupListByUsername(username);
} else {
// 其他情况需要清空当前gid的缓存和用户名的缓存
String username = JSONUtil.parseObj(message.getData().get(0)).getStr("username");
String gid = JSONUtil.parseObj(message.getData().get(0)).getStr("gid");
log.info("修改或删除分组,清空用户名为{}的分组缓存和gid为{}的分组缓存", username, gid);
groupCacheManager.delCacheGroupByGid(gid);
groupCacheManager.delCacheGroupListByUsername(username);
}
}
}
}
综上即可完成canal监听后由rabbitmq通知后,再对缓存进行处理
评论