MySQL特定表全量、增量数据同步到消息队列-解决方案

mysql要同步原始全量数据,也要实时同步MySQL特定库的特定表增量数据,同时对应的修改、删除也要对应,下面就为大家分享一下

目录

  • 1、原始需求
  • 2、解决方案
  • 3、canal介绍、安装

    • canal的工作原理
    • 架构
    • 安装
  • 4、验证

1、原始需求

既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。

数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。

应用场景:数据etl同步、降低业务服务器压力。

2、解决方案

MySQL特定表全量、增量数据同步到消息队列-解决方案

3、canal介绍、安装

canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariadb)。

工作原理:mysql主备复制实现

MySQL特定表全量、增量数据同步到消息队列-解决方案

从上层来看,复制分成三步:

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。
  • canal的工作原理

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)
  • 架构

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    说明:

    • server代表一个canal运行实例,对应于一个jvm
    • instance对应于一个数据队列 (1个server对应1..n个instance)

    instance模块:

    • eventparser (数据源接入,模拟slave协议和master进行交互,协议解析)
    • eventsink (parser和store链接器,进行数据过滤,加工,分发的工作)
    • eventstore (数据存储)
    • metamanager (增量订阅&消费信息管理器)

    安装

    1、mysql、kafka环境准备

    2、canal下载:wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

    3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz

    4、对目录conf里文件参数配置

    对canal.properties配置:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    进入conf/example里,对instance.properties配置:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    5、启动:bin/startup.sh

    6、日志查看:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    4、验证

    1、开发对应的kafka消费者

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    95

    96

    97

    98

    99

    100

    101

    102

    103

    104

    105

    106

    107

    108

    109

    110

    111

    112

    113

    114

    115

    116

    117

    118

    119

    120

    121

    122

    package org.kafka;

    import java.util.arrays;

    import java.util.properties;

    import org.apache.kafka.clients.consumer.consumerrecord;

    import org.apache.kafka.clients.consumer.consumerrecords;

    import org.apache.kafka.clients.consumer.kafkaconsumer;

    import org.apache.kafka.common.serialization.stringdeserializer;

    /**

    *

    * title: kafkaconsumertest

    * description:

    * kafka消费者 demo

    * version:1.0.0

    * @author pancm

    * @date 2018年1月26日

    */

    public class kafkaconsumertest implements runnable {

    private final kafkaconsumer<string, string> consumer;

    private consumerrecords<string, string> msglist;

    private final string topic;

    private static final string groupid = "groupa";

    public kafkaconsumertest(string topicname) {

    properties props = new properties();

    props.put("bootstrap.servers", "192.168.7.193:9092");

    props.put("group.id", groupid);

    props.put("enable.auto.commit", "true");

    props.put("auto.commit.interval.ms", "1000");

    props.put("session.timeout.ms", "30000");

    props.put("auto.offset.reset", "latest");

    props.put("key.deserializer", stringdeserializer.class.getname());

    props.put("value.deserializer", stringdeserializer.class.getname());

    this.consumer = new kafkaconsumer<string, string>(props);

    this.topic = topicname;

    this.consumer.subscribe(arrays.aslist(topic));

    }

    @override

    public void run() {

    int messageno = 1;

    system.out.println("———开始消费———");

    try {

    for (; ; ) {

    msglist = consumer.poll(1000);

    if (null != msglist && msglist.count() > 0) {

    for (consumerrecord<string, string> record : msglist) {

    //消费100条就打印 ,但打印的数据不一定是这个规律的

    system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());

    // string v = decodeunicode(record.value());

    // system.out.println(v);

    //当消费了1000条就退出

    if (messageno % 1000 == 0) {

    break;

    }

    messageno++;

    }

    } else {

    thread.sleep(11);

    }

    }

    } catch (interruptedexception e) {

    e.printstacktrace();

    } finally {

    consumer.close();

    }

    }

    public static void main(string args[]) {

    kafkaconsumertest test1 = new kafkaconsumertest("sample-data");

    thread thread1 = new thread(test1);

    thread1.start();

    }

    /*

    * 中文转unicode编码

    */

    public static string gbencoding(final string gbstring) {

    char[] utfbytes = gbstring.tochararray();

    string unicodebytes = "";

    for (int i = 0; i < utfbytes.length; i++) {

    string hexb = integer.tohexstring(utfbytes[i]);

    if (hexb.length() <= 2) {

    hexb = "00" + hexb;

    }

    unicodebytes = unicodebytes + "\\\\u" + hexb;

    }

    return unicodebytes;

    }

    /*

    * unicode编码转中文

    */

    public static string decodeunicode(final string datastr) {

    int start = 0;

    int end = 0;

    final stringbuffer buffer = new stringbuffer();

    while (start > -1) {

    end = datastr.indexof("\\\\u", start + 2);

    string charstr = "";

    if (end == -1) {

    charstr = datastr.substring(start + 2, datastr.length());

    } else {

    charstr = datastr.substring(start + 2, end);

    }

    char letter = (char) integer.parseint(charstr, 16); // 16进制parse整形字符串

    buffer.append(new character(letter).tostring());

    start = end;

    }

    return buffer.tostring();

    }

    }

    2、对表bak1进行增加数据

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    create table `bak1` (

    `vin` varchar(20) not null,

    `p1` double default null,

    `p2` double default null,

    `p3` double default null,

    `p4` double default null,

    `p5` double default null,

    `p6` double default null,

    `p7` double default null,

    `p8` double default null,

    `p9` double default null,

    `p0` double default null

    ) engine=innodb default charset=utf8mb4

    show create table bak1;

    insert into bak1 select '李雷abcv',

    `p1` ,

    `p2` ,

    `p3` ,

    `p4` ,

    `p5` ,

    `p6` ,

    `p7` ,

    `p8` ,

    `p9` ,

    `p0` from moci limit 10

    3、查看输出结果:

    MySQL特定表全量、增量数据同步到消息队列-解决方案

    到此这篇关于mysql特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关mysql特定表数据同步内容请搜索钦钦技术栈以前的文章或继续浏览下面的相关文章希望大家以后多多支持钦钦技术栈!

    原文链接:https://www.cnblogs.com/lilei2blog/p/15608206.html

    版权声明:本文(即:原文链接:https://www.qin1qin.com/catagory/3893/)内容由互联网用户自发投稿贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 630367839@qq.com 举报,一经查实,本站将立刻删除。

    (0)
    上一篇 2022-07-20 10:15:46
    下一篇 2022-07-20 10:15:55

    软件定制开发公司

    相关阅读

    发表回复

    登录后才能评论
    通知:禁止投稿所有关于虚拟货币,币圈类相关文章,发现立即永久封锁账户ID!