Java使用Canal同步MySQL数据到Redis

 更新时间:2024年11月08日 09:59:10   作者:binbinxyz  
在现代微服务架构中,数据同步是一个常见的需求,特别是将 MySQL 数据实时同步到 Redis,下面我们就来看看Java如何使用Canal同步MySQL数据到Redis吧

一、引言

在现代微服务架构中,数据同步是一个常见的需求。特别是将 MySQL 数据实时同步到 Redis,可以显著提升应用的性能和响应速度。本文将详细介绍如何使用 Canal 实现这一目标。Canal 是阿里巴巴开源的一个数据库 Binlog 同步工具,可以实时捕获 MySQL 的 Binlog 日志并将其同步到其他存储系统。

项目地址:alibaba/canal

二、工作原理

1. MySQL主备复制原理

MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2. canal 工作原理

canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)

三、环境准备

1. 安装和配置 MySQL

Canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,并且配置binlog模式为row。编辑 MySQL 配置文件 my.cnf 或 my.ini,添加或修改以下内容:

[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=ROW

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

重启 MySQL 服务以使配置生效:

sudo service mysql restart

2. 安装和配置 Canal

下载并解压 Canal 服务端:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz -C /opt/canal
cd /opt/canal

编辑 Canal 配置文件 conf/example/instance.properties,配置 MySQL 服务器的相关信息:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex=.*\\..*

启动 Canal 服务:

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log</pre>
2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志

vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

如果启动失败,注意检查配置文件conf/example/instance.properties的内容,还要注意JDK版本及配置。建议使用1.6.25。我用openjdk 21启动报错,改回JDK8u421启动成功。

3. 安装和配置 Redis

确保 Redis 服务已经安装并启动。可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

四、开发 Java 应用

1. 添加依赖

在你的 pom.xml 文件中添加 Canal 客户端和 Redis 客户端的依赖。以下是一个示例:

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
</dependencies>

2. 编写 Canal 客户端代码

创建一个 Java 类来连接 Canal 服务并处理 Binlog 事件,将数据同步到 Redis:

package org.hbin.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalToRedisSync {

    public static void main(String[] args) {
        // 创建 Canal 连接
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 11111);
        CanalConnector connector = CanalConnectors.newSingleConnector(address, "example", "", "");

        // 连接到 Canal 服务
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();

        // 创建 Redis 客户端
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            Message message = connector.getWithoutAck(100); // 获取最多 100 条记录
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                handleEntry(message.getEntries(), jedis);
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }
    }

    private static void handleEntry(List<CanalEntry.Entry> entries, Jedis jedis) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    syncDelete(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    syncInsert(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                } else {
                    System.out.println("-------> before");
                    syncUpdate(rowData.getBeforeColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                    System.out.println("-------> after");
                    syncUpdate(rowData.getAfterColumnsList(), jedis, entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
                }
            }
        }
    }

    private static void syncInsert(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Insert: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncUpdate(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            } else {
                value.append(column.getName()).append(":").append(column.getValue()).append(",");
            }
        }
        System.out.println("Update: " + key.toString() + " -> " + value.toString());
        jedis.hset(schema + ":" + table, key.toString(), value.toString());
    }

    private static void syncDelete(List<CanalEntry.Column> columns, Jedis jedis, String schema, String table) {
        StringBuilder key = new StringBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getName().equals("id")) {
                key.append(column.getValue());
            }
        }
        System.out.println("Delete: " + key.toString());
        jedis.hdel(schema + ":" + table, key.toString());
    }
}

3. 运行和测试

3.1 启动 Canal 服务:

sh /opt/canal/bin/startup.sh

3.2 启动 Redis 服务:

确保 Redis 服务已经启动,可以在 Redis 客户端中执行以下命令检查:

redis-cli
ping

3.3 启动 Java 应用:

编译并运行上述 Java 应用,确保 Canal 服务和 MySQL 服务器正常运行。

3.4 测试数据同步:

在 MySQL 中插入、更新或删除数据,观察 Java 应用是否能够实时捕获这些变化并将数据同步到 Redis。

相关SQL如下:

drop database if exists canal;
create database canal;
use canal;

drop table if exists user;
create table user(
  `id` bigint AUTO_INCREMENT primary key,
  `name` varchar(20) NOT NULL,
  `age` tinyint DEFAULT 0,
  `detail` varchar(100) DEFAULT '',
  `create_time` date,
  `update_time` date
);

insert into user value(1, 'Tom1', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(2, 'Tom2', 25, 'canal', '2024-11-07', '2024-11-07');
insert into user value(3, 'Tom3', 25, 'canal', '2024-11-07', '2024-11-07');
update user set age=26 where id=2;
delete from user where id=3;

输出信息:

================> binlog[binlog.000008:6390] , name[canal,user] , eventType : CREATE
================> binlog[binlog.000008:6899] , name[canal,user] , eventType : INSERT
Insert: 1 -> name:Tom1,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7213] , name[canal,user] , eventType : INSERT
Insert: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7527] , name[canal,user] , eventType : INSERT
Insert: 3 -> name:Tom3,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:7850] , name[canal,user] , eventType : UPDATE
-------> before
Update: 2 -> name:Tom2,age:25,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
-------> after
Update: 2 -> name:Tom2,age:26,detail:canal,create_time:2024-11-07,update_time:2024-11-07,
================> binlog[binlog.000008:8193] , name[canal,user] , eventType : DELETE
Delete: 3

五、注意事项

性能优化:根据实际需求调整 Canal 和 Redis 的配置,以优化性能。

错误处理:在生产环境中,需要增加错误处理和重试机制,确保数据同步的可靠性。

安全性:确保 Canal 和 Redis 的连接是安全的,使用适当的认证和授权机制。

六、结论

通过使用 Canal,我们可以轻松地将 MySQL 数据实时同步到 Redis、Kafka 或其他系统。这不仅提高了数据的一致性和实时性,还为应用提供了更高的性能和响应速度。

以上就是Java使用Canal同步MySQL数据到Redis的详细内容,更多关于Java Canal同步MySQL数据的资料请关注脚本之家其它相关文章!

相关文章

  • 将本地SpringBoot项目发布到云服务器的方法

    将本地SpringBoot项目发布到云服务器的方法

    这篇文章主要介绍了如何将本地SpringBoot项目发布到云服务器,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-12-12
  • java数据库开发之JDBC的完整封装兼容多种数据库

    java数据库开发之JDBC的完整封装兼容多种数据库

    这篇文章主要介绍了java数据库开发之JDBC的完整封装兼容多种数据库,需要的朋友可以参考下
    2020-02-02
  • Java锁机制Lock用法示例

    Java锁机制Lock用法示例

    这篇文章主要介绍了Java锁机制Lock用法,结合具体实例形式分析了Java锁机制的相关上锁、释放锁、隐式锁、显式锁等概念与使用技巧,需要的朋友可以参考下
    2018-08-08
  • Java两大工具库Commons和Guava使用示例详解

    Java两大工具库Commons和Guava使用示例详解

    这篇文章主要为大家介绍了Java两大工具库Commons和Guava使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-02-02
  • SpringBoot+thymeleaf+ajax实现局部刷新详情

    SpringBoot+thymeleaf+ajax实现局部刷新详情

    这篇文章主要介绍了SpringBoot+thymeleaf+ajax实现局部刷新详情,文章围绕主题展开详细的内容介绍具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • SpringBoot使用Apache Tika实现多种文档的内容解析

    SpringBoot使用Apache Tika实现多种文档的内容解析

    在日常开发中,我们经常需要解析不同类型的文档,如PDF、Word、Excel、HTML、TXT等,Apache Tika是一个强大的内容解析工具,可以轻松地提取文档中的内容和元数据信息,本文将通过SpringBoot和Apache Tika的结合,介绍如何实现对多种文档格式的内容解析
    2024-12-12
  • java Unicode和UTF-8之间转换实例

    java Unicode和UTF-8之间转换实例

    这篇文章主要介绍了java Unicode和UTF-8之间转换实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • 详解如何全注解方式构建SpringMVC项目

    详解如何全注解方式构建SpringMVC项目

    这篇文章主要介绍了详解如何全注解方式构建SpringMVC项目,利用Eclipse构建SpringMVC项目,非常具有实用价值,需要的朋友可以参考下
    2018-10-10
  • JAVA StringBuffer类与StringTokenizer类代码解析

    JAVA StringBuffer类与StringTokenizer类代码解析

    这篇文章主要介绍了JAVA StringBuffer类与StringTokenizer类代码解析,具有一定借鉴价值,需要的朋友可以参考下
    2018-01-01
  • SpringBoot+STOMP协议实现私聊、群聊

    SpringBoot+STOMP协议实现私聊、群聊

    本文将结合实例代码,介绍SpringBoot+STOMP协议实现私聊、群聊,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-06-06

最新评论