目录
- 1、原始需求
- 2、解决方案
-
3、canal介绍、安装
- canal的工作原理
- 架构
- 安装
-
4、验证
1、原始需求
既要同步原始全量数据,也要实时同步mysql特定库的特定表增量数据,同时对应的修改、删除也要对应。
数据同步不能有侵入性:不能更改业务程序,并且不能对业务侧有太大性能压力。
应用场景:数据etl同步、降低业务服务器压力。
2、解决方案
3、canal介绍、安装
canal是阿里巴巴旗下的一款开源项目,纯java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql(也支持mariadb)。
工作原理:mysql主备复制实现
从上层来看,复制分成三步:
canal的工作原理
原理相对比较简单:
架构
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance模块:
- eventparser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventsink (parser和store链接器,进行数据过滤,加工,分发的工作)
- eventstore (数据存储)
- metamanager (增量订阅&消费信息管理器)
安装
1、mysql、kafka环境准备
2、canal下载:wgethttps://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解压:tar -zxvf canal.deployer-1.1.3.tar.gz
4、对目录conf里文件参数配置
对canal.properties配置:
进入conf/example里,对instance.properties配置:
5、启动:bin/startup.sh
6、日志查看:
4、验证
1、开发对应的kafka消费者
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | package org.kafka; import java.util.arrays; import java.util.properties; import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer; import org.apache.kafka.common.serialization.stringdeserializer; /** * * title: kafkaconsumertest * description: * kafka消费者 demo * version:1.0.0 * @author pancm * @date 2018年1月26日 */ public class kafkaconsumertest implements runnable { private final kafkaconsumer<string, string> consumer; private consumerrecords<string, string> msglist; private final string topic; private static final string groupid = "groupa"; public kafkaconsumertest(string topicname) { properties props = new properties(); props.put("bootstrap.servers", "192.168.7.193:9092"); props.put("group.id", groupid); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer", stringdeserializer.class.getname()); props.put("value.deserializer", stringdeserializer.class.getname()); this.consumer = new kafkaconsumer<string, string>(props); this.topic = topicname; this.consumer.subscribe(arrays.aslist(topic)); } @override public void run() { int messageno = 1; system.out.println("———开始消费———"); try { for (; ; ) { msglist = consumer.poll(1000); if (null != msglist && msglist.count() > 0) { for (consumerrecord<string, string> record : msglist) { //消费100条就打印 ,但打印的数据不一定是这个规律的 system.out.println(messageno + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); // string v = decodeunicode(record.value()); // system.out.println(v); //当消费了1000条就退出 if (messageno % 1000 == 0) { break; } messageno++; } } else { thread.sleep(11); } } } catch (interruptedexception e) { e.printstacktrace(); } finally { consumer.close(); } } public static void main(string args[]) { kafkaconsumertest test1 = new kafkaconsumertest("sample-data"); thread thread1 = new thread(test1); thread1.start(); } /* * 中文转unicode编码 */ public static string gbencoding(final string gbstring) { char[] utfbytes = gbstring.tochararray(); string unicodebytes = ""; for (int i = 0; i < utfbytes.length; i++) { string hexb = integer.tohexstring(utfbytes[i]); if (hexb.length() <= 2) { hexb = "00" + hexb; } unicodebytes = unicodebytes + "\\\\u" + hexb; } return unicodebytes; } /* * unicode编码转中文 */ public static string decodeunicode(final string datastr) { int start = 0; int end = 0; final stringbuffer buffer = new stringbuffer(); while (start > -1) { end = datastr.indexof("\\\\u", start + 2); string charstr = ""; if (end == -1) { charstr = datastr.substring(start + 2, datastr.length()); } else { charstr = datastr.substring(start + 2, end); } char letter = (char) integer.parseint(charstr, 16); // 16进制parse整形字符串。 buffer.append(new character(letter).tostring()); start = end; } return buffer.tostring(); } } |
2、对表bak1进行增加数据
?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | create table `bak1` ( `vin` varchar(20) not null, `p1` double default null, `p2` double default null, `p3` double default null, `p4` double default null, `p5` double default null, `p6` double default null, `p7` double default null, `p8` double default null, `p9` double default null, `p0` double default null ) engine=innodb default charset=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10 |
3、查看输出结果:
到此这篇关于mysql特定表全量、增量数据同步到消息队列-解决方案的文章就介绍到这了,更多相关mysql特定表数据同步内容请搜索钦钦技术栈以前的文章或继续浏览下面的相关文章希望大家以后多多支持钦钦技术栈!
原文链接:https://www.cnblogs.com/lilei2blog/p/15608206.html
版权声明:本文(即:原文链接:https://www.qin1qin.com/catagory/3893/)内容由互联网用户自发投稿贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 630367839@qq.com 举报,一经查实,本站将立刻删除。