MySQL特定表全量、增量数据同步到消息队列-解决方案

 更新时间:2021年11月27日 17:28:48   作者:李雷  
mysql要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应,下面就为大家分享一下

1、原始需求

既要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应。

数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

应用场景:数据ETL同步、降低业务服务器压力。

2、解决方案

3、canal介绍、安装

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

工作原理:mysql主备复制实现

从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1..n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

安装

1、mysql、kafka环境准备

2、canal下载:wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz

4、对目录conf里文件参数配置

对canal.properties配置:

进入conf/example里,对instance.properties配置:

5、启动:bin/startup.sh

6、日志查看:

4、验证

1、开发对应的kafka消费者

package org.kafka;

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;


/**
 *
 * Title: KafkaConsumerTest
 * Description:
 *  kafka消费者 demo
 * Version:1.0.0
 * @author pancm
 * @date 2018年1月26日
 */
public class KafkaConsumerTest implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.7.193:9092");
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (; ; ) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        //消费100条就打印 ,但打印的数据不一定是这个规律的

                            System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());


//                            String v = decodeUnicode(record.value());

//                            System.out.println(v);

                        //当消费了1000条就退出
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(11);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }


    /*
     * 中文转unicode编码
     */
    public static String gbEncoding(final String gbString) {
        char[] utfBytes = gbString.toCharArray();
        String unicodeBytes = "";
        for (int i = 0; i < utfBytes.length; i++) {
            String hexB = Integer.toHexString(utfBytes[i]);
            if (hexB.length() <= 2) {
                hexB = "00" + hexB;
            }
            unicodeBytes = unicodeBytes + "\\u" + hexB;
        }
        return unicodeBytes;
    }

    /*
     * unicode编码转中文
     */
    public static String decodeUnicode(final String dataStr) {
        int start = 0;
        int end = 0;
        final StringBuffer buffer = new StringBuffer();
        while (start > -1) {
            end = dataStr.indexOf("\\u", start + 2);
            String charStr = "";
            if (end == -1) {
                charStr = dataStr.substring(start + 2, dataStr.length());
            } else {
                charStr = dataStr.substring(start + 2, end);
            }
            char letter = (char) Integer.parseInt(charStr, 16); // 16进制parse整形字符串。
            buffer.append(new Character(letter).toString());
            start = end;
        }
        return buffer.toString();

    }
}

2、对表bak1进行增加数据

CREATE TABLE `bak1` (
  `vin` varchar(20) NOT NULL,
  `p1` double DEFAULT NULL,
  `p2` double DEFAULT NULL,
  `p3` double DEFAULT NULL,
  `p4` double DEFAULT NULL,
  `p5` double DEFAULT NULL,
  `p6` double DEFAULT NULL,
  `p7` double DEFAULT NULL,
  `p8` double DEFAULT NULL,
  `p9` double DEFAULT NULL,
  `p0` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

show create table bak1;

insert into bak1 select '李雷abcv',
  `p1` ,
  `p2` ,
  `p3` ,
  `p4` ,
  `p5` ,
  `p6` ,
  `p7` ,
  `p8` ,
  `p9` ,
  `p0`  from moci limit 10

3、查看输出结果:

到此这篇关于MySQL特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关MySQL特定表数据同步内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 本地windows安装两个mysql服务器,配置主从同步

    本地windows安装两个mysql服务器,配置主从同步

    大型网站为了缓解大量的并发访问,除了在网站实现分布式负载均衡,还会搭建服务器mysql集群技术,来分担主数据库的压力。在本地电脑能实现这样的技术吗,本地windows安装两个mysql服务器,配置主从同步也是可以实现的,快来跟着教程测试一下吧。
    2022-12-12
  • mysql如何根据.frm和.ibd文件恢复数据表

    mysql如何根据.frm和.ibd文件恢复数据表

    这篇文章主要介绍了mysql根据.frm和.ibd文件恢复数据表的操作方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-07-07
  • MySQL安装过程报starting the server报错详细解决方案(附MySQL安装程序)

    MySQL安装过程报starting the server报错详细解决方案(附MySQL安装程序)

    如果电脑是第一次安装MySQL,一般不会出现这样的报错,starting the server失败通常是因为上次安装的该软件未清除干净,这篇文章主要给大家介绍了关于MySQL安装过程报starting the server报错的详细解决方案,文中还附MySQL安装程序,需要的朋友可以参考下
    2024-03-03
  • MySQL触发器使用详解

    MySQL触发器使用详解

    本文主要详细介绍了mysql数据库的触发器的相关知识,非常的全面,有需要的小伙伴参考下吧。
    2015-01-01
  • mysql存储引擎和数据类型(二)

    mysql存储引擎和数据类型(二)

    这篇文章主要为大家详细介绍了mysql存储引擎和数据类型第二篇,感兴趣的小伙伴们可以参考一下
    2016-08-08
  • 使用MySQL实现高效的用户昵称模糊搜索

    使用MySQL实现高效的用户昵称模糊搜索

    在大型系统中,用户表中的昵称字段需要支持高效的模糊搜索,并且必须处理包含特殊字符的查询,本文将介绍一种在MySQL中实现高效模糊搜索的解决方案,能够支持特殊字符,并且利用MySQL自身的全文索引机制来优化搜索性能,需要的朋友可以参考下
    2024-05-05
  • MySQL转换Oracle的需要注意的七个事项

    MySQL转换Oracle的需要注意的七个事项

    有很多应用项目, 刚起步的时候用MySQL数据库基本上能实现各种功能需求,随着应用用户的增多,数据量的增加,MySQL渐渐地出现不堪重负的情况:连接很慢甚至宕机,于是就有MySQL转换Oracle的需求,应用程序也要相应做一些修改。
    2010-12-12
  • mysql数据库插入速度和读取速度的调整记录

    mysql数据库插入速度和读取速度的调整记录

    由于项目变态需求;需要在一个比较短时间段急剧增加数据库记录(两三天内,由于0增加至4亿)。在整个过程调优过程非常艰辛
    2012-07-07
  • Mysql中批量替换某个字段的部分数据(推荐)

    Mysql中批量替换某个字段的部分数据(推荐)

    这篇文章主要介绍了Mysql中批量替换某个字段的部分数据,通过实例代码给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-02-02
  • 修改mysql5.5默认编码(图文步骤修改为utf-8编码)

    修改mysql5.5默认编码(图文步骤修改为utf-8编码)

    安装mysql后,启动服务并登陆,使用show variables命令可查看mysql数据库的默认编码;mysql数据库的默认编码并不是utf-8如何修改呢,本文将详细介绍,感兴趣的朋友可以了解下
    2013-01-01

最新评论