[toc]
canal是什么
canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。
中文文档 https://www.wenjiangs.com/doc/canal-introduction
官网 https://github.com/alibaba/canal
工作原理
1.canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
2.mysql master收到dump请求,开始推送binary log给slave(也就是canal)
3.canal解析binary log对象(原始为byte流)
架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store连接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
安装
1.下载安装包
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -xvf canal.deployer-1.1.5.tar.gz
2.mysql开启binlog
修改my.cnf
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
log-bin=mysql-bin
binlog-format=ROW
server_id=1
binlog-format=ROW 表示binlog采用row模式(canal官方建议使用row模式,这样才能拿到每一行变更的完整数据)
重启mysql后,执行sql指令 show variables like '%log_bin%',若 log_bin 的值为 ON 则表示binlog已开启
3.创建mysql的canal用户
mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'localhost' WITH GRANT OPTION;
Query OK, 0 rows affected (0.01 sec)
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
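Query OK, 0 rows affected (0.00 sec)
注:以上为了演示方便授予了全部权限;canal官方QuickStart只需要 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 生产环境建议按最小权限授权。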
4.修改canal配置文件
文件夹/root/canal/conf/有一个example文件夹,一个example就代表一个instance实例
vi /root/canal/conf/example/instance.properties
#################################################
# 定义mysql slave的id
canal.instance.mysql.slaveId=1234
# 填写数据库ip:端口
canal.instance.master.address=192.168.10.27:3306
# 填写数据库username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#################################################
5.启动、关闭、重启canal
cd /root/canal/bin
sh startup.sh
sh stop.sh
sh restart.sh
6.相关日志
/root/canal/logs/canal/canal.log
/root/canal/logs/example/example.log
java代码读取binlog同步到redis
1.添加依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
2.RedisUtil
package com.wangyue.study.canal;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
    // Redis server host.
    private static String ADDR = "192.168.10.27";
    // Redis server port.
    private static int PORT = 6379;
    // Redis password.
    // NOTE(review): credential is hard-coded in source; consider loading it from configuration.
    private static String AUTH = "hxcx123!@#";
    // Maximum number of connections the pool may hand out (passed to setMaxTotal below).
    // -1 means unlimited; once the pool is exhausted, borrowers wait up to MAX_WAIT.
    private static int MAX_ACTIVE = 1024;
    // Maximum number of idle connections kept in the pool.
    private static int MAX_IDLE = 200;
    // Maximum time in milliseconds to wait for a free connection before borrowing fails.
    private static int MAX_WAIT = 10000;
    // TTL in seconds applied by stringGet, stringSet and hashSet.
    // NOTE(review): 660 * 660 * 24 = 10,454,400 s (about 121 days). This looks like a typo for
    // 60 * 60 * 24 (one day); confirm the intended TTL before changing it.
    protected static int expireTime = 660 * 660 * 24;
    // Shared connection pool, created once in the static initializer.
    protected static JedisPool pool;

    /**
     * Builds the shared pool once, when the class is first loaded.
     */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(MAX_ACTIVE);
        config.setMaxIdle(MAX_IDLE);
        config.setMaxWaitMillis(MAX_WAIT);
        // Connections are not validated when borrowed.
        config.setTestOnBorrow(false);
        // Arguments: config, host, port, timeout (ms), password, database index.
        // NOTE(review): the pool is configured for database 3, but every helper below calls
        // select(0), so all reads and writes actually go to database 0.
        pool = new JedisPool(config, ADDR, PORT, 1000, AUTH, 3);
    }

    /**
     * Borrows a connection from the pool.
     *
     * @return a pooled Jedis instance, or {@code null} if none could be obtained
     *         (the failure is printed)
     */
    protected static synchronized Jedis getJedis() {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return jedis;
    }

    /**
     * Returns a borrowed connection to the pool.
     *
     * <p>Previously this method did nothing, so every borrowed connection leaked and the pool
     * was exhausted after MAX_ACTIVE calls. {@code Jedis.close()} on a pooled instance hands it
     * back to its pool; Jedis itself tracks whether the connection is broken, so the
     * {@code isBroken} flag is kept only for signature compatibility.
     *
     * @param jedis    the connection to release; {@code null} is ignored
     * @param isBroken whether the caller hit an error while using the connection (unused)
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        if (jedis == null) {
            return;
        }
        jedis.close();
    }

    /**
     * Checks whether a key exists in database 0.
     *
     * @param key the key to look up
     * @return {@code true} if the key exists; {@code false} if it does not or on any error
     */
    public static boolean existKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return false;
    }

    /**
     * Deletes a key from database 0. Errors are printed and otherwise ignored.
     *
     * @param key the key to delete
     */
    public static void delKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Reads a string value from database 0 and refreshes the key's TTL to {@code expireTime}.
     *
     * @param key the key to read
     * @return the stored value, or {@code null} if absent or on error
     */
    public static String stringGet(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.get(key);
            // Sliding expiry: every read extends the key's lifetime.
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Writes a string value to database 0 and sets its TTL to {@code expireTime}.
     *
     * @param key   the key to write
     * @param value the value to store
     * @return the reply of the SET command, or {@code null} on error
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Sets one field of a hash in database 0 and sets the hash's TTL to {@code expireTime}.
     * Does nothing if no connection could be obtained.
     *
     * @param key   the hash key
     * @param field the field within the hash
     * @param value the value to store
     */
    public static void hashSet(String key, String field, String value) {
        boolean isBroken = false;
        Jedis jedis = null;
        try {
            jedis = getJedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expireTime);
            }
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }
}
3.CanalTest
package com.wangyue.study.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalTest {
    /**
     * Connects to a single canal server, subscribes to every schema/table and mirrors row
     * changes into Redis. Stops after 120 consecutive empty polls (about two minutes of
     * inactivity), or when the thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect to canal server 192.168.10.27:11111, destination "example", user/password canal/canal.
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.27", 11111),
                "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            // Regex filter "<schema>.<table>": every table of every schema.
            connector.subscribe(".*\\..*");
            // Roll back to the last acknowledged position so unacknowledged batches are redelivered.
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status and stop polling; finally disconnects.
                        Thread.currentThread().interrupt();
                        return;
                    }
                } else {
                    emptyCount = 0;
                    try {
                        printEntry(message.getEntries());
                    } catch (RuntimeException e) {
                        // Processing failed: hand the batch back to canal so it is redelivered.
                        connector.rollback(batchId);
                        throw e;
                    }
                }
                // Confirm the batch so canal advances past it.
                connector.ack(batchId);
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Logs each row-change entry and applies it to Redis. Transaction begin/end markers are
     * skipped.
     *
     * @param entrys entries of one canal batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a RowChange
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // For a delete only the "before" image exists.
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // For an insert only the "after" image exists.
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /** Prints each column as "name : value update=flag". */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    /** Builds a JSON object mapping each column name to its string value. */
    private static JSONObject toJson(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        return json;
    }

    /**
     * Redis key for a row: {@code "user:" + <first column value>}.
     * NOTE(review): assumes the first column is the primary key and that every subscribed
     * table is a "user" table — confirm, since the subscription above matches all tables.
     */
    private static String rowKey(List<Column> columns) {
        return "user:" + columns.get(0).getValue();
    }

    /** Stores the inserted row as JSON; ignores an empty column list. */
    private static void redisInsert(List<Column> columns) {
        if (!columns.isEmpty()) {
            RedisUtil.stringSet(rowKey(columns), toJson(columns).toJSONString());
        }
    }

    /** Overwrites the stored row with its new values; ignores an empty column list. */
    private static void redisUpdate(List<Column> columns) {
        if (!columns.isEmpty()) {
            RedisUtil.stringSet(rowKey(columns), toJson(columns).toJSONString());
        }
    }

    /** Removes the deleted row's key; ignores an empty column list. */
    private static void redisDelete(List<Column> columns) {
        if (!columns.isEmpty()) {
            RedisUtil.delKey(rowKey(columns));
        }
    }
}
运行后,在mysql数据库里修改数据保存
控制台结果:
canal集群搭建
安装zookeeper
⚠️ 这里注意下,不要使用zookeeper的高版本,可能会出现启动失败的情况:Starting zookeeper ... FAILED TO START
1.下载解压
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar -xvf zookeeper-3.4.9.tar.gz
2.修改配置
cd zookeeper-3.4.9/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
############################
#设置数据存储位置
dataDir=/root/zookeeper/data
############################
- 启动/重启/关闭
./zkServer.sh start
./zkServer.sh restart
./zkServer.sh stop
- 查看状态
./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /root/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
Mode: standalone
- 客户端连接
# 2181 是zk默认端口
./zkCli.sh -server localhost:2181
集群部署
目前canal的集群部署仅支持HA形式,使用zookeeper来实现抢占式HA,一个active,多个standby。
修改canal配置文件
vi canal/conf/canal.properties
# register ip to zookeeper
canal.register.ip = 192.168.10.27
# zk地址,如果多个zk用逗号隔开且不留空格,例如10.105.10.123:2181,10.105.10.124:2181,10.105.10.125:2181
canal.zkServers = 192.168.10.27:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
部署从节点canal
拷贝主节点的canal到另一台机器,修改instance配置
vi /root/canal/conf/example/instance.properties
# 设置slaveid,和master不同即可
canal.instance.mysql.slaveId=1235
修改canal配置
vi /root/canal/conf/canal.properties
# register ip to zookeeper
canal.register.ip = 192.168.10.26
其他配置项都跟主节点一致,然后两个节点canal启动
查看canal在zk中的状态
./zkCli.sh -server localhost:2181
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.26:11111"}
cZxid = 0x86
ctime = Fri Mar 04 01:14:55 EST 2022
mZxid = 0x86
mtime = Fri Mar 04 01:14:55 EST 2022
pZxid = 0x86
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000a
dataLength = 47
numChildren = 0
部署情况:
- 部署了192.168.10.27:11111和192.168.10.26:11111
- 当前192.168.10.26:11111节点是active,192.168.10.27:11111是standby
- 如果去192.168.10.26关闭canal,可以看到192.168.10.27成为active
[zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
{"active":true,"address":"192.168.10.27:11111"}
cZxid = 0x9f
ctime = Fri Mar 04 03:06:18 EST 2022
mZxid = 0x9f
mtime = Fri Mar 04 03:06:18 EST 2022
pZxid = 0x9f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x17f53863740000b
dataLength = 47
numChildren = 0
java客户端代码
修改CanalConnector即可,连接地址改成zookeeper的ip:端口
CanalConnector connector = CanalConnectors.newClusterConnector("192.168.10.27:2181","example", "canal", "canal");
Canal Admin
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
部署
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
mkdir /tmp/canal-admin
tar zxvf canal.admin-1.1.4.tar.gz -C /tmp/canal-admin
修改配置文件 conf/application.yml
设置mysql地址
在mysql执行conf/canal_manager.sql
初始化sql
启动
sh bin/startup.sh
启动成功,可以通过 http://127.0.0.1:8089/ 访问,默认账号/密码:admin/123456
canal+Kafka进行数据库同步
为了高可用和更高的性能,我们会创建多个canal-client构成一个集群,来进行解析并同步到新的数据库。这里就出现了一个比较重要的问题,如何保证canal-client集群解析消费binlog的顺序性呢?
我们使用的binlog是row模式。每一个写操作都会产生一条binlog日志。 举个简单的例子:插入了一条a记录,并且立马修改a记录。这样会有两个消息发送给canal-client,如果由于网络等原因,更新的消息早于插入的消息被处理了,还没有插入记录,更新操作的最后效果是失败的。
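canal可以和消息队列组合,支持kafka,rabbitmq,rocketmq多种选择,在消息队列这层来实现消息的顺序性。需要注意kafka只保证单个partition内的消息有序:可以只使用一个partition,或者在instance.properties中配置canal.mq.partitionsNum和canal.mq.partitionHash按表主键做hash,使同一行数据的变更始终进入同一个partition。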
安装kafka
部署
wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
修改配置文件
vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties 修改参数
# zookeeper地址
zookeeper.connect=192.168.10.27:2181
listeners=PLAINTEXT://:9092
# 对客户端暴露的broker地址
advertised.listeners=PLAINTEXT://192.168.10.27:9092
启动server
start脚本
# bin/kafka-server-start.sh -daemon config/server.properties &
查看所有topic
# bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
查看指定topic 下面的数据
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.27:9092 --from-beginning --topic example_t
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
修改canal配置
修改配置文件canal.properties
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
# kafka地址
kafka.bootstrap.servers = 192.168.10.27:9092
然后重启
[root@bogon kafka_2.11-1.1.1]# sh bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
example
java代码
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
package com.wangyue.study.canal;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class CanalKafkaConsumer {
    /** Primary-key column used when a message carries no {@code pkNames}. */
    private static final String DEFAULT_PK = "id";

    /**
     * Consumes canal flat messages from the Kafka topic "example" and mirrors
     * insert/update/delete row changes into Redis under keys {@code <table>:<primary key>}.
     * Runs until the process is stopped.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // A consumer needs at least the broker list and the key/value deserializers.
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.10.27:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        // group.id is required when using subscribe(). Within one consumer group, each partition
        // is consumed by exactly one member.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group1");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        try {
            // A consumer may subscribe to several topics.
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s", record.topic(), record.partition(),
                            record.offset(), record.key(), record.value()));
                    handleMessage(record.value());
                }
            }
            // To stop the loop, call consumer.wakeup() from another thread.
        } finally {
            consumer.close();
        }
    }

    /**
     * Applies one canal flat message to Redis. Messages without row data (for example DDL
     * statements or empty/tombstone values) are skipped instead of throwing.
     *
     * @param value the record value, expected to be a canal flat-message JSON object
     */
    private static void handleMessage(String value) {
        JSONObject valueJson = JSONObject.parseObject(value);
        if (valueJson == null) {
            return;
        }
        JSONArray data = valueJson.getJSONArray("data");
        if (data == null || data.isEmpty()) {
            return;
        }
        String type = valueJson.getString("type");
        String table = valueJson.getString("table");
        String pkName = primaryKeyName(valueJson);
        if (StringUtils.equalsIgnoreCase(type, "delete")) {
            redisDelete(data, table, pkName);
        } else if (StringUtils.equalsIgnoreCase(type, "insert")) {
            redisInsert(data, table, pkName);
        } else if (StringUtils.equalsIgnoreCase(type, "update")) {
            redisUpdate(data, valueJson.getJSONArray("old"), table, pkName);
        }
    }

    /**
     * Returns the first entry of the message's {@code pkNames}, or {@link #DEFAULT_PK} if absent.
     * For composite keys only the first column is used.
     */
    private static String primaryKeyName(JSONObject message) {
        JSONArray pkNames = message.getJSONArray("pkNames");
        if (pkNames == null || pkNames.isEmpty()) {
            return DEFAULT_PK;
        }
        String first = pkNames.getString(0);
        return first == null ? DEFAULT_PK : first;
    }

    /** Builds the Redis key {@code <table>:<primary key value>} for one row. */
    private static String rowKey(String tableName, String pkValue) {
        return tableName + ":" + pkValue;
    }

    /** Stores every inserted row as JSON. */
    private static void redisInsert(JSONArray data, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            RedisUtil.stringSet(rowKey(tableName, rowData.getString(pkName)), rowData.toJSONString());
        }
    }

    /**
     * Overwrites every updated row. {@code old} holds, per row, only the previous values of the
     * columns that changed; if the primary key itself changed, the key for the old value is
     * deleted so no stale entry remains.
     */
    private static void redisUpdate(JSONArray data, JSONArray old, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            String newId = rowData.getString(pkName);
            JSONObject oldRow = (old != null && i < old.size()) ? old.getJSONObject(i) : null;
            if (oldRow != null && oldRow.containsKey(pkName)) {
                String oldId = oldRow.getString(pkName);
                if (!StringUtils.equals(oldId, newId)) {
                    RedisUtil.delKey(rowKey(tableName, oldId));
                }
            }
            RedisUtil.stringSet(rowKey(tableName, newId), rowData.toJSONString());
        }
    }

    /** Removes the key of every deleted row. */
    private static void redisDelete(JSONArray data, String tableName, String pkName) {
        for (int i = 0; i < data.size(); i++) {
            JSONObject rowData = data.getJSONObject(i);
            RedisUtil.delKey(rowKey(tableName, rowData.getString(pkName)));
        }
    }
}
canal存到kafka里的数据内容范例:
{
"data": [{
"id": "17",
"doctorId": "15",
"name": "来二楼3",
"birthday": "2013-02-28",
"sex": "0",
"telephone": "15632554566",
"province": "重庆市",
"city": "重庆城区",
"area": "万州区",
"address": "AP库珀热热蓉蓉",
"createTime": "2022-02-28 17:29:43",
"updateTime": "2022-03-03 14:48:31",
"createBy": null,
"updateBy": "18960862122",
"isRemove": "1"
}],
"database": "tcm",
"es": 1646631848000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "int",
"doctorId": "int",
"name": "varchar(200)",
"birthday": "varchar(100)",
"sex": "int",
"telephone": "varchar(20)",
"province": "varchar(50)",
"city": "varchar(50)",
"area": "varchar(50)",
"address": "varchar(255)",
"createTime": "varchar(50)",
"updateTime": "varchar(50)",
"createBy": "varchar(50)",
"updateBy": "varchar(50)",
"isRemove": "bit(1)"
},
"old": [{
"name": "来二楼"
}],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"doctorId": 4,
"name": 12,
"birthday": 12,
"sex": 4,
"telephone": 12,
"province": 12,
"city": 12,
"area": 12,
"address": 12,
"createTime": 12,
"updateTime": 12,
"createBy": 12,
"updateBy": 12,
"isRemove": -7
},
"table": "t_patient",
"ts": 1646631848288,
"type": "UPDATE"
}
redis 存取结果: