最近正在迭代公司的一个项目,由于业务调整,需要用到阿里开源的canal中间件,业余时间整理了一下相关的知识点,有需要的小伙伴可以参考一下!
也就是说,在数据库更新后,依赖这些数据的服务都需要做相应的变化,需要在相应的服务中写相应的逻辑,对原有代码侵入量比较大,也不利于后期的维护
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件,canal通过binlog同步拿到变更数据,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等多源同步
系统:ubuntu:21
以下服务都在docker中运行
mysql:8.0
canal-server:latest
1. 一定要开启binlog 日志
2. 设置binglog 行模式为row
3.设置一个全局唯一的server_id
创建canal用户,并授予权限
创建用户
create user canal identified by 'canal';
授予权限
grant select ,replication slave ,replication client on *.* to 'canal'@'%';
刷新权限
flush privileges;
//如果是mysql8.0以上的版本,需要修改一下加密方式,否则同步时会有异常,我这里以前设置过,所以省略了
下载canal-sever镜像
docker pull canal/canal-server:latest
启动方式一,直接使用 docker run在命令行运行
说明:
mysql 和canal-server必须可以通信,由于我的mysql和canal-server在同一台机器上,但是mysql并没有默认的网络,这里使用的是自定义网络wwwdata_frontend,所以也需要把canal-server加入到这个网络中
docker run -d --name canal-server --network wwwdata_frontend -p 11111:11111 canal/canal-server:latest
启动方式二,使用docker-compose启动,方便多个服务管理
docker-compose.yml部分配置如下
canal-server:
image: canal/canal-server:latest
container_name: canal-server
restart: always
ports:
- "9100:9100"
- "11111:11111"
- "11110:11110"
- "11112:11112"
networks:
- frontend
volumes:
- /wwwdata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
- /wwwdata/canal/conf/canal.properties:/home/admin/canal-server/conf/example/canal.properties
需要完整的docker-compose.yml(测试服务器上的服务只包含nginx,redis,php,mysql,canal)可以私信我哈,有需要帮忙制作dockerfile或docker-compose.yml文件的,都可以私信我哈
我这里由于测试,大部分使用了默认的参数,有以下几个参数需要注意
instance.properties文件中
canal.instance.master.address=mysql80:3306 我这里的mysql80是我的docker中运行的mysql服务名,当然也可以mysql对应的ip,(两个服务必须可以ping 通)
这里是在mysql中创建的用户名
canal.instance.dbUsername=canal canal.instance.dbPassword=canal
canal.properties文件中
canal端口 canal.port = 11111 canal.metrics.pull.port = 11112
canal.destinations = example //我这里用的是默认的
我这里使用go客户端来实现
package mainimport ( "fmt" "github.com/golang/protobuf/proto" "github.com/withlin/canal-go/client" pbe "github.com/withlin/canal-go/protocol/entry" "log" "os" "time")func main() { //idletimeout 设置为0 表示不限制 connector := client.NewSimpleCanalConnector("canal-serve地址", 端口号默认11111, "用户名", "密码", "destination默认是example", sotimeOut, idletimeout设置为0表示不退出) err := connector.Connect() if err != nil { log.Println(err) os.Exit(1) } //订阅表,所有表 err = connector.Subscribe(".*\..*") if err != nil { fmt.Println(err) return } for { message, err := connector.Get(100, nil, nil) if err != nil { log.Println(err) os.Exit(1) } batchId := message.Id if batchId == -1 || len(message.Entries) <= 0 { time.Sleep(3000 * time.Millisecond) fmt.Println("===没有数据了===") continue } printEntry(message.Entries) }}func printEntry(entrys []pbe.Entry) { for _, entry := range entrys { if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND { continue } rowChange := new(pbe.RowChange) err := proto.Unmarshal(entry.GetStoreValue(), rowChange) checkError(err) if rowChange != nil { eventType := rowChange.GetEventType() header := entry.GetHeader() fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType())) for _, rowData := range rowChange.GetRowDatas() { if eventType == pbe.EventType_DELETE { printColumn(rowData.GetBeforeColumns()) } else if eventType == pbe.EventType_INSERT { printColumn(rowData.GetAfterColumns()) } else { fmt.Println("-------> before") printColumn(rowData.GetBeforeColumns()) fmt.Println("-------> after") printColumn(rowData.GetAfterColumns()) } } } }}func printColumn(columns []*pbe.Column) { for _, col := range columns { fmt.Println(fmt.Sprintf("%s : %s update= %t", col.GetName(), col.GetValue(), col.GetUpdated())) }}func checkError(err error) { if err != nil { fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error()) os.Exit(1) }}
mysql添加数据
//创建表
CREATE TABLE `canal` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
//添加数据
insert INTO canal (name) VALUE("abcccc")
在客户端可以看到,实时更新了
本次测试并没有实现HA,只有单机实例做的测试,生产中遇到啥坑,后面再填吧