Canal中间件入门指南:使用教程与最佳实践

发表时间: 2022-05-03 07:33

最近正在迭代公司的一个项目,由于业务调整,需要用到阿里开源的canal中间件,业余时间整理了一下相关的知识点,有需要的小伙伴可以参考一下!

生产环境遇到的问题

  • 数据库更新数据后,缓存也要相应的更新
  • 数据库更新后,elasticsearch,hbase中的数据也要及时更新
  • 数据库更新后,kafaka消息队列中也要及时更新

也就是说,在数据库更新后,依赖这些数据的服务都需要做相应的变化,需要在相应的服务中写相应的逻辑,对原有代码侵入量比较大,也不利于后期的维护

什么canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件,canal通过binlog同步拿到变更数据,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等多源同步

测试环境

系统:ubuntu:21

以下服务都在docker中运行

mysql:8.0

canal-server:latest

安装mysql步骤省略

  • 注意点如下
  • 1. 一定要开启binlog 日志

    2. 设置binglog 行模式为row

    3.设置一个全局唯一的server_id

    创建canal用户,并授予权限

    创建用户

    create user canal identified by 'canal';

    授予权限

    grant select ,replication slave ,replication client on *.* to 'canal'@'%';

    刷新权限

    flush privileges;

    //如果是mysql8.0以上的版本,需要修改一下加密方式,否则同步时会有异常,我这里以前设置过,所以省略了

    安装canal-server服务端

    下载canal-sever镜像

    docker pull canal/canal-server:latest

    启动方式一,直接使用 docker run在命令行运行

    说明:

    mysql 和canal-server必须可以通信,由于我的mysql和canal-server在同一台机器上,但是mysql并没有默认的网络,这里使用的是自定义网络wwwdata_frontend,所以也需要把canal-server加入到这个网络中


    docker run -d --name canal-server --network wwwdata_frontend -p 11111:11111 canal/canal-server:latest

    启动方式二,使用docker-compose启动,方便多个服务管理

    docker-compose.yml部分配置如下

    canal-server:

    image: canal/canal-server:latest

    container_name: canal-server

    restart: always

    ports:

    - "9100:9100"

    - "11111:11111"

    - "11110:11110"

    - "11112:11112"

    networks:

    - frontend

    volumes:

    - /wwwdata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties

    - /wwwdata/canal/conf/canal.properties:/home/admin/canal-server/conf/example/canal.properties

    需要完整的docker-compose.yml(测试服务器上的服务只包含nginx,redis,php,mysql,canal)可以私信我哈,有需要帮忙制作dockerfiledocker-compose.yml文件的,都可以私信我哈

    修改配置文件instance.properties和canal.properties

    我这里由于测试,大部分使用了默认的参数,有以下几个参数需要注意

    instance.properties文件中

    canal.instance.master.address=mysql80:3306 我这里的mysql80是我的docker中运行的mysql服务名,当然也可以mysql对应的ip,(两个服务必须可以ping)

    这里是在mysql中创建的用户名

    canal.instance.dbUsername=canal canal.instance.dbPassword=canal

    canal.properties文件中

    canal端口 canal.port = 11111 canal.metrics.pull.port = 11112

    canal.destinations = example //我这里用的是默认的

    再重新启动canal-server,查看日志,如下则说明执行成功

    编写客户端来测试一下

    我这里使用go客户端来实现

    package mainimport (	"fmt"	"github.com/golang/protobuf/proto"	"github.com/withlin/canal-go/client"	pbe "github.com/withlin/canal-go/protocol/entry"	"log"	"os"	"time")func main() {	//idletimeout 设置为0 表示不限制	connector := client.NewSimpleCanalConnector("canal-serve地址", 端口号默认11111, "用户名", "密码", "destination默认是example", sotimeOut, idletimeout设置为0表示不退出)	err := connector.Connect()	if err != nil {		log.Println(err)		os.Exit(1)	}	//订阅表,所有表	err = connector.Subscribe(".*\..*")	if err != nil {		fmt.Println(err)		return	}	for {		message, err := connector.Get(100, nil, nil)		if err != nil {			log.Println(err)			os.Exit(1)		}		batchId := message.Id		if batchId == -1 || len(message.Entries) <= 0 {			time.Sleep(3000 * time.Millisecond)			fmt.Println("===没有数据了===")			continue		}		printEntry(message.Entries)	}}func printEntry(entrys []pbe.Entry) {	for _, entry := range entrys {		if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {			continue		}		rowChange := new(pbe.RowChange)		err := proto.Unmarshal(entry.GetStoreValue(), rowChange)		checkError(err)		if rowChange != nil {			eventType := rowChange.GetEventType()			header := entry.GetHeader()			fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.GetSchemaName(), header.GetTableName(), header.GetEventType()))			for _, rowData := range rowChange.GetRowDatas() {				if eventType == pbe.EventType_DELETE {					printColumn(rowData.GetBeforeColumns())				} else if eventType == pbe.EventType_INSERT {					printColumn(rowData.GetAfterColumns())				} else {					fmt.Println("-------> before")					printColumn(rowData.GetBeforeColumns())					fmt.Println("-------> after")					printColumn(rowData.GetAfterColumns())				}			}		}	}}func printColumn(columns []*pbe.Column) {	for _, col := range columns {		fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))	}}func checkError(err error) {	if err != nil {		fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())		os.Exit(1)	}}

    测试

    mysql添加数据

    //创建表

    CREATE TABLE `canal` (

    `id` int NOT NULL AUTO_INCREMENT,

    `name` varchar(255) DEFAULT NULL,

    PRIMARY KEY (`id`)

    ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

    //添加数据

    insert INTO canal (name) VALUE("abcccc")

    在客户端可以看到,实时更新了

    本次测试并没有实现HA,只有单机实例做的测试,生产中遇到啥坑,后面再填吧