您当前的位置 >首页 > 曝光台

10G mysql binlog重放并传输到另一台服务器执行阿里中间件大赛

更新时间:2019-07-27 20:00:42 点击数:81

  给定一批固定的增量数据变更信息(存放在Server端),程序需要单线程顺序读取文件,进行数据重放计算,然后将最终结果输出到给定的目标文件(在Client端)中。

  增量数据的变更主要包含数据库的Insert/Update/Delete三种类型的数据。主键可能发生变更。

  为了降低数据在网络中的传输开销,我们的设计是在Server端完成数据的重放计算,再将结果发送到Client端,写入结果文件中。

  由于数据的重放计算在Server端,Client主要负责接收结果和写入文件,因此核心的算法都在Server端。本节将首先给出Server端算法的系统架构,之后具体介绍多线程算法的细节与实现。

  1)Reader按照赛题要求,负责单线程读取文件。事实上,由于单线程的内存拷贝速度已经跟不上流水线速度,我们用 MappedByteBuffer 将文件按照16M的大小切分成多个的分段(Segment)映射到内存,交由Parser处理。

  容易误解的一点:Parser 并不是 Parse 全部的 key-value,这不够高效。Parser 只负责第一列也就是主键,剩下的部分,通过把当前的 offset 传给 Worker,从而交给 Worker 来处理。

  当所有 Worker 处理完最后的 task 时,意味着回放完成,可以准备输出了。输出其实是 merge K 个有序 stream 的经典问题,可以用堆来高效的解决。

  这里我们用了Disruptor框架,它是一个高性能的线程间消息通讯的库,底层用 RingBuffer 实现。它不仅能取代 ArrayBlockingQueue,功能上还要丰富的多。对于本题的架构,只需要一个 RingBuffer 就能完成。

  为了发挥并行性能,在每个Parser中我们将数据表按 hash(PK) 切分成 N 个 Bucket, 每个Bucket都由一个独立的Worker线程完成重放计算。

  对于Insert,Delete 和一般的 Update 事件,只要分配到对应的 Bucket 去做就可以了;唯独 UpdatePK(更新主键)事件例外,必须要 Bucket 间协作才能保证正确地把数据移动过去。

  一行数据被“拿走”的时候,如果还存在对该行的操作没完成,那这些修改就丢失了!所以,必须要保证一行数据被“拿走”前,所有的修改都已经 apply 到上面。反之同理,必须要“拿到”数据以后,才能把后续的操作 apply 上去。

  很自然的想到,可以用 CountDownLatch 来阻塞 UpdatePK 的接收方(数据被移到此 Bucket),直到 UpdatePK 的发送方发出这行数据,它才拿到数据、接着运行。然而,当 UpdatePK 操作较为密集的时候,这个解决方案非常低效!

  另一种思路是内存中维护一张主键变更表,记录主键的变更历史,将所有 UpdatePK 后新主键的所有操作都分配到旧主键所在的Bucket中。然而每个task分配时都需要从主键变更表中查找对应的Bucket,并且Parser也无法并行执行,同样十分低效。

  针对这一问题, 最终设计出下文的并行算法,本算法的核心在于通过 Promise 对象,解决了 Update 主键这一难题,从而使得数据表的各个 Bucket 的线程能够无锁、高效地协作。

  所谓 Promise,是借鉴 Future/Promise 异步编程范式设计的一个数据结构。实现很简单,只要封装一个 volatile 的变量,如下所示(实际代码实现更复杂,仅为示例):

  相比上一小节提到的 Latch 解决方案,Promise 不是阻塞接收方,而是告诉他:你要的数据还没准备好。明智的接收方将会先“搁置”这个消息,并把后来遇到的所有对这个 Key 的操作都暂存起来(放在blockedTable中,如图所示)。一旦某一时刻 Promise.ready() 为真,就可以把这个 data 放到对应的 Key 上了!暂存的操作也可以那时候再做。

  对于被 block 的 PK 的操作,会以一个链表保存下来。如果不巧操作很多,这个链表就会变的很长。一个简单的改进:如果来的是一个普通 Update 操作,其实可以直接 apply 到上一个操作上的 data 上,例如 A=5 B=6 可以叠加到 PK=3 A=4上, 就变成 PK=3 A=5 B=6,从而避免把 Update 操作追加到链表上。

  如果阻塞的 Tasks 中包含一个 Delete,后面又来了一个 UpdatePKDst,要注意,可能会再次阻塞。

  您可能担心 blockedTable 的查询增加了单个 Bucket 的计算负担。实验表明,由于各个 Bucket 的工作进度差异相差不会很大,blockedTable 的最大 size 也在 25000 以内,远小于数据表大小,所以这个代价是完全可接受的。

  可以从理论上证明:本算法可以处理以任何顺序出现的 UpdatePK / Update / Delete / Insert 操作,保证重放结束后一定查询到正确的结果。

  其实很简单,算法保证了所有的操作都在它们可以执行的时候被执行,换句话说,对于一切有互相依赖关系的操作,算法不会破坏它们的先后关系。算法的并行性,是在保证了该前提的情况下做到的。

  关于表结构的健壮性, 程序会根据第一次遇到的 Insert log 来确定表结构,包括各个列的名字、类型、主键信息等。

  程序严格按照比赛要求。对于数字,支持 long 型正数范围;对于文本,最长支持 65536 个字符。具体实现参考下文“数据存储”一小节。

  *完全无锁(Lock-free),无阻塞(Non-blocking)。在16核CPU的测试场景下,锁竞争将会导致不小的开销;而阻塞更不用说,极端情况下可能多线程会退化成协程。(例如 Latch 的解决方案,连续 UpdatePK 就会导致这样的情况)。本算法完全摈弃了wait()或lock(),而是用 代价极低 的 volatile 实现同步,这是最大的创新点。

  *可伸缩(Scalability)。除了 Reader 根据题意必须单线程,算法中没有任何不可伸缩的数据结构,理论上为线性加速比。若 CPU 核数增加,只要提升 Parser 和 Worker/Bucket 的线程数即可。(一些解决方案用到全局的 KeyMap,导致无法伸缩)

  *流处理(Streaming)。本算法是一个真正的流处理系统,在真实场景中可以不断灌入新数据并提供查询(保证最终一致性)。这也与比赛的初衷一致。

  Java 的范型对于 primitive type 的数据是严重的浪费。比如 Map 是非常低效的,不仅浪费了大量内存,还产生了大量冗余的 boxing/unboxing。

  由于列值的类型为 long 或者 String。对于 long 类型值, 将其解析成 long 数据存储即可。而对于String 类型的数据, 如果通过将其转换成 String 存储, 至少有两个问题:

  一个优化是:如果字符串的 bytes 的长度小于等于 7: 那么直接利用 long 里面的 7 个字节存储,剩下一个字节存长度,避免了磁盘写入。附上StringStore类的核心代码:

  如果为每行数据都创建一个 long[],需要频繁地 new 出大量对象。为此,我们实现了一个 LongArrayPool 来管理所有的行数据,用 offset 来查找所需的数据。

  使用对象池可以解决一半的问题。通过复用 Task,减轻了 new Task() 的压力。但这还不够好!让我们看看 Task 的结构:

  可见,Task 本身结构很简单,相比之下对象头的代价显得很浪费。如果不用对象,其实可以用数组来代替:

  做了上述数组池和对象池优化后,程序启动时间大大增加,这是因为创建 Pool 需要大量分配内存,如果发生在类加载期间,就会阻塞 main 函数的运行。

  解决方案是适当延迟部分 Pool 的分配,对它们采用 lazy 的初始化策略,即第一次使用时才分配所需的内存空间。

  JVM 会在新生代不够分配时触发 GC。考虑到我们有 1G 的新生代内存,而事实上要动态 new 的对象很少,通过调节 Pool 的初始化时机,可以做到只发生一次 ParNew GC。

  对于老年代的 CMS GC 代价很大,我们在比赛中尽可能避免触发 CMS GC。而这就要求尽可能节约内存,上文提到的对象池和数组池发挥了重要作用。

  根据比赛数据集选取最合适的 Parser 和 Worker 线程数,对榨干最后一点 CPU 性能至关重要。

  由于题目规定为单库单表,变更类型(U/I/D)之前的字符(下称 header)没有必要解析,可直接跳过。不过这部分的字符长度并不确定, 因此我们尝试预测这个跳过的长度。对于每一行,如果推测正确,则可以直接跳过这部分字符;否则,从行首开始解析直到到达变更类型, 同时更新预估的 header 长度。

  由于大部分的 log 的 header 长度是一样的, 这个技巧有效地避免大量不必要字符的解析。

  同理,对于 Parser 来说除了主键以外剩下 key-value 并没有用。用类似的思路也可以预测长度并直接跳过。附上 skipHeader的代码:

  实验发现 Netty 和 logback 都比较重量级,拖慢了启动速度。因此自己实现了网络传输和 Logger,减少启动时间。

  大赛过程竞争非常激烈,可谓高手云集。通过这场比赛,我们的技术得到了锻炼,收获了解决问题的成就感。同时也真诚感谢大赛的主办方,让我们有机会在赛场上证明自己的能力。

  背景  早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的...博文来自:FeelTouch

  之前因为懒,没有针对otter做更多的解释和说明,在使用过程中,也发现了一些问题,此次补上一个完整的文档,方便大家使用。Otter是基于cannal开源的,canal又是基于mysqlbinlog的产...博文来自:frog4的专栏

  MySQL的binlog日志是MySQL日志中非常重要的一种日志,记录了数据库所有的DML操作。通过binlog日志我们可以进行数据库的读写分离、数据增量备份以及服务器宕机时的数据恢复。定期备份固然可...博文来自:happyfly的博客

  这两天线上数据被误删除为了回滚折腾了两天终于搞定,其中用到了mysql的binlog以此记录一下。之前没有搞过mysql的mysqlbinlog查看了很多文档但是多数为简单的教程,虽然有用但是还是相对...博文来自:flower的博客

  转载地址- 背景  早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,...博文来自:RodJohnsonDoctor的专栏

  MySQL的主从复制都是单线程的操作,主库对所有DDL和DML产生的日志写进binlog,由于binlog是顺序写,所以效率很高。Slave的SQLThread线程将主库的DDL和DML操作事件在sl...博文来自:疯狂老司机的博客

  Otter1.Otter是什么Ottter是由阿里爸爸开源的一个数据同步产品,它的最初的目的是为了解决跨国异地机房双A架构,两边可写的场景,开发时间从2011年7月份一直持续到现在,目前阿里巴巴B2B...博文来自:FeelTouch

  最近一直在找装修公司,自己辛苦买的房子不住进去确实心有不甘呐。。。所以,比赛完了好久才开始写这个比赛总结。写总结的原因是这次比赛还是学到了很多东西。想要总结下。一开始看到有这个比赛的时候我是犹豫之拒绝...博文来自:yuhaiyang457288的专栏

  本次只分享初赛,赛题介绍见链接:第四届阿里中间件性能挑战赛成绩目前是第16名:总结下,这次比赛收获特别大,从搭建环境都需要两周的一个小白到能走到复赛还能名次这么靠前,这其中靠的是每日每夜的对知识的渴望...博文来自:聆风散的博客

  抱大腿参加了一次中间件比赛,受益匪浅。学到了很多东西,更重要的是认识到了差距在哪。...博文来自:dwt0317的专栏

  目录目录前言正文目录初赛题目介绍Tair环境配置RocketMQ环境配置Jstorm环境配置后记前言跳过废线,阿里巴巴集团在天池大数据平台上举行了阿里中间件性能挑...博文来自:clayanddev的博客

  初赛总结赛题说明      赛题:原来的dubbo服务调用是consumer去注册中心...博文来自:jerryq的博客

  因为脚本中用到SCP命令,需要先实现scp不用输入密码(通过证书实现)。在数据库服务器执行:ssh-keygen按四五次回车证书就生成了,将id_rsa.pub拷贝到另外一台文件服务器的/root/....博文来自:jiaoshenmo的博客

  本次只分享复赛,赛题介绍见链接:第四届阿里中间件性能挑战赛复赛成绩是第8名,得到了去阿里总部答辩的机会,至少1W元的奖金到手。总结下,复赛是另外一番的体验,队友的合作,一起通宵奋斗,最终得到了一个比较...博文来自:聆风散的博客

  初赛:《基于Open-Messaging实现进程内消息引擎》我们手里有程序demo,有大部分的接口,主要需要实现,创建消息,发送消息,创建消费的信息,消费对象。本地有一个单线程和多线程程序,在线测评程...博文来自:u011299745的博客

  先贴一下赛题:实现一个基于发布-订阅模型的消息中间件(broker+client)必选特性:提供可靠消息服务,broker要保证数据同步落盘才能向生产者返回发送成功的ack,并保证投递给所有的消费者,...博文来自:凡凡好疲惫的专栏

  1天池中间件大赛dubboMesh优化总结(qps从1000到6850)原文链接:天池中间件大赛dubboMesh优化总结2 天池中间件大赛-单机百万消息队列存储实现分享原文链接: 天池中间件大赛-单...博文来自:ice-wee的专栏

  第三届阿里中间件性能挑战赛总结喜讯香港科技大学的rapids团队(右二:王立鹏,右三:车煜林)获得季军。thirdplacephotorapids团队(左一:王立鹏,左二:车煜林)与评测环境负责人(左...博文来自:weixin_33721344的博客

  目录目录前言正文题目分析索引设计代码展示关键优化后记前言跳过废话,直接看正文经过初赛的筛选后,我们就进入了复赛(名副其实的废话)。接下来我简单介绍下我们队伍参加复赛一些情况吧。复赛相对初赛而言题目针对...博文来自:clayanddev的博客

  大数据监控binlog组件之maxwell组件:Maxwell是什么玩意?下面我给大家简单介绍一下,并和canal做一个简单的对比: Maxwell:(Zendesk公司)--(服务端+客户端)官方地...博文来自:阿尼古

  1,查看两台服务器是不是可以ping通的例:ping10.22.137.10ping通之后:2,从已有文件的服务器传输到另一台服务器上格式:scp目录/文件名.文件格式目标服务器用户名@ip地址:目标...博文来自:的博客

  数据抽取是ETL流程的第一步,我们常会需要从多个不同的MySQL实例中抽取数据,存入一个中心节点,或直接进入Hive。借助Canal项目,我们能够通过MySQLBinlog进行数据抽取。...博文来自:张吉的博客

  维护mysql的时候,总会遇到数据库恢复的例子。如果把备份集恢复出来相对比较简单。然而如果遇到恢复到时间点的例子,把一个MySQL实例恢复出来之后,需要执行binlog做增量恢复。 常见的办法是用my...博文来自:wzy0623的专栏

  1、在一台新服务器上安装Oracle10g,安装的时候会设置一个密码,请记住,安装快好的时候,会有相关账号锁住的提示,请注意system账号不要被锁住。2、安装好后,用system和设置的密码登陆(P...博文来自:布里渊区

  声明:如果您有更好的技术与作者分享,或者商业合作;请访问作者个人网站留言给作者。如果该案例触犯您的专利,请在这里:http...博文来自:esqabc的博客

  Canal学习记录canal启动过程在canal中一个server中可以包含多个instance,每个instance对应着不同数据库中的不同表格的数据变更。举例说明就是:你可以启动一个server(...博文来自:CWeeYii的专栏

  工程师65533号:您好,已经被上传的binlog日志,可以在控制台-备份恢复里下载博文来自:goldDaNiu的博客

  准备条件:1.已开通阿里云账户并开通了物联网平台。账号的开通利用支付宝、手机号、淘宝号都行的,开通成功后为了不影响后续的使用,最好来个实名认证。2.这次使用的将物联网数据平台上报到云端MySql上,所...博文来自:的博客

  Canal是阿里开源的binlog同步工具。可以解析binlog,并将解析后的数据同步到任何目标存储中。Canal工作原理1、mysqlmaster节点将改变记录保存到二进制binlog文件中。2、c...博文来自:王佩的CSDN博客

  写这篇博文时,自己一定是含着误删数据库的眼泪写完的,文中的后续部分会谈到这个“从删库到**”的悲惨故事,这个故事深刻地教训了我,我也想以此来警示大家,注意数据安全和数据备份。1.可行方案回归正题:我们...博文来自:华仔的逆袭的专栏

上一篇:台海网 首页 -- 台海局势新闻福建新闻厦门新闻漳州新闻泉州新闻厦门微公益

下一篇:组图:网曝范冰冰与妈妈拜访恒大文化 戴大墨镜纤瘦靓丽吸睛_高清图集_新浪网