Storm入门教程第五章一致性事务

上传人:tia****g98 文档编号:167455203 上传时间:2022-11-03 格式:DOCX 页数:7 大小:35.26KB
收藏 版权申诉 举报 下载
Storm入门教程第五章一致性事务_第1页
第1页 / 共7页
Storm入门教程第五章一致性事务_第2页
第2页 / 共7页
Storm入门教程第五章一致性事务_第3页
第3页 / 共7页
资源描述:

《Storm入门教程第五章一致性事务》由会员分享,可在线阅读,更多相关《Storm入门教程第五章一致性事务(7页珍藏版)》请在装配图网上搜索。

1、第五章 一致性事务Storm是一个分布式的流处理系统,利用anchor和ack机制保证所有tuple都被成功处理。如果tuple出错,则可以被重传,但是如何保证出错的tuple只被处理一次呢?Storm提供了一套事务性组件Transaction Topology,用来解决这个问题。Transactional Topology目前已经不再维护,由Trident来实现事务性topology,但是原理相同。5.1一致性事务的设计Storm如何实现即对tuple并行处理,又保证事务性。本节从简单的事务性实现方法入手,逐步引出Transactional Topology的原理。5.1.1 简单设计一:强

2、顺序流保证tuple只被处理一次,最简单的方法就是将tuple流变成强顺序的,并且每次只处理一个tuple。从1开始,给每个tuple都顺序加上一个id。在处理tuple的时候,将处理成功的tuple id和计算结果存在数据库中。下一个tuple到来的时候,将其id与数据库中的id做比较。如果相同,则说明这个tuple已经被成功处理过了,忽略它;如果不同,根据强顺序性,说明这个tuple没有被处理过,将它的id及计算结果更新到数据库中。以统计消息总数为例。每来一个tuple,如果数据库中存储的id 与当前tuple id不同,则数据库中的消息总数加1,同时更新数据库中的当前tuple id值。

3、如图: 但是这种机制使得系统一次只能处理一个tuple,无法实现分布式计算。5.1.2 简单设计二:强顺序batch流为了实现分布式,我们可以每次处理一批tuple,称为一个batch。一个batch中的tuple可以被并行处理。我们要保证一个batch只被处理一次,机制和上一节类似。只不过数据库中存储的是batch id。batch的中间计算结果先存在局部变量中,当一个batch中的所有tuple都被处理完之后,判断batch id,如果跟数据库中的id不同,则将中间计算结果更新到数据库中。如何确保一个batch里面的所有tuple都被处理完了呢?可以利用Storm提供的Coordinate

4、Bolt。如图:但是强顺序batch流也有局限,每次只能处理一个batch,batch之间无法并行。要想实现真正的分布式事务处理,可以使用storm提供的Transactional Topology。在此之前,我们先详细介绍一下CoordinateBolt的原理。5.1.3 CoordinateBolt原理CoordinateBolt具体原理如下: 真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为real bolt。 每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要

5、给哪些tuple发送信息(同样根据groping信息) Real bolt发出一个tuple后,其外层的CoordinateBolt会记录下这个tuple发送给哪个task了。 等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理完了所有的tuple。 下游CoordinateBolt会重复上面的步骤,通知其下游。整个过程如图所示:CoordinateBolt主要用于两

6、个场景: DRPC Transactional TopologyCoordinatedBolt对于业务是有侵入的,要使用CoordinatedBolt提供的功能,你必须要保证你的每个bolt发送的每个tuple的第一个field是request-id。 所谓的“我已经处理完我的上游”的意思是说当前这个bolt对于当前这个request-id所需要做的工作做完了。这个request-id在DRPC里面代表一个DRPC请求;在Transactional Topology里面代表一个batch。5.1.4 Trasactional TopologyStorm提供的Transactional Topo

7、logy将batch计算分为process和commit两个阶段。Process阶段可以同时处理多个batch,不用保证顺序性;commit阶段保证batch的强顺序性,并且一次只能处理一个batch,第1个batch成功提交之前,第2个batch不能被提交。还是以统计消息总数为例,以下代码来自storm-starter里面的TransactionalGlobalCount。MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA,new Fields(“word“),PARTITION_TAKE_PER_BATCH)

8、;TransactionalTopologyBuilder builder =newTransactionalTopologyBuilder(“global-count“, “spout“, spout, 3);builder.setBolt(“partial-count“,newBatchCount(), 5).noneGrouping(“spout“);builder.setBolt(“sum“,newUpdateGlobalCount().globalGrouping(“partial-count“);TransactionalTopologyBuilder共接收四个参数。 这个Tran

9、sactional Topology的id。Id用来在Zookeeper中保存当前topology的进度,如果这个topology重启,可以继续之前的进度执行。 Spout在这个topology中的id 一个TransactionalSpout。一个Trasactional Topology中只能有一个TrasactionalSpout.在本例中是一个MemoryTransactionalSpout,从一个内存变量(DATA)中读取数据。 TransactionalSpout的并行度(可选)。下面是BatchCount的定义:publicstaticclassBatchCountextends

10、BaseBatchBolt Object _id; BatchOutputCollector _collector;int_count = 0; Overridepublicvoidprepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) _collector = collector; _id = id; Overridepublicvoidexecute(Tuple tuple) _count+; OverridepublicvoidfinishBatch() _collecto

11、r.emit(newValues(_id, _count); OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(newFields(“id“, “count“); BatchCount的prepare方法的最后一个参数是batch id,在Transactional Tolpoloyg里面这id是一个TransactionAttempt对象。Transactional Topology里发送的tuple都必须以TransactionAttempt作为第一个field,sto

12、rm根据这个field来判断tuple属于哪一个batch。TransactionAttempt包含两个值:一个transaction id,一个attempt id。transaction id的作用就是我们上面介绍的对于每个batch中的tuple是唯一的,而且不管这个batch replay多少次都是一样的。attempt id是对于每个batch唯一的一个id, 但是对于同一个batch,它replay之后的attempt id跟replay之前就不一样了, 我们可以把attempt id理解成replay-times, storm利用这个id来区别一个batch发射的tuple的不同

13、版本。execute方法会为batch里面的每个tuple执行一次,你应该把这个batch里面的计算状态保持在一个本地变量里面。对于这个例子来说, 它在execute方法里面递增tuple的个数。最后, 当这个bolt接收到某个batch的所有的tuple之后, finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它的输出流里面去。下面是UpdateGlobalCount类的定义:publicstaticclassUpdateGlobalCountextendsBaseTransactionalBoltimplementsICommitter

14、TransactionAttempt _attempt; BatchOutputCollector _collector;int_sum = 0; Overridepublicvoidprepare(Mapconf, TopologyContext context,BatchOutputCollector collector, TransactionAttempt attempt) _collector = collector; _attempt = attempt; Overridepublicvoidexecute(Tuple tuple) _sum+=tuple.getInteger(1

15、); OverridepublicvoidfinishBatch() Value val =DATABASE.get(GLOBAL_COUNT_KEY); Value newval;if(val =null| !val.txid.equals(_attempt.getTransactionId() newval =newValue(); newval.txid = _attempt.getTransactionId();if(val=null) newval.count = _sum; else newval.count = _sum + val.count; DATABASE.put(GLO

16、BAL_COUNT_KEY, newval); else newval = val; _collector.emit(newValues(_attempt, newval.count); OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(newFields(“id“, “sum“); UpdateGlobalCount实现了ICommitter接口,所以storm只会在commit阶段执行finishBatch方法。而execute方法可以在任何阶段完成。在UpdateGl

17、obalCount的finishBatch方法中,将当前的transaction id与数据库中存储的id做比较。如果相同,则忽略这个batch;如果不同,则把这个batch的计算结果加到总结果中,并更新数据库。Transactional Topolgy运行示意图如下:下面总结一下Transactional Topology的一些特性 Transactional Topology将事务性机制都封装好了,其内部使用CoordinateBolt来保证一个batch中的tuple被处理完。 TransactionalSpout只能有一个,它将所有tuple分为一个一个的batch,而且保证同一个ba

18、tch的transaction id始终一样。 BatchBolt处理batch在一起的tuples。对于每一个tuple调用execute方法,而在整个batch处理完成的时候调用finishBatch方法。 如果BatchBolt被标记成Committer,则只能在commit阶段调用finishBolt方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。 Transactional Topology隐藏了anchor/ack框架,它提供一个不同的机制来fail一个batch,从而使得这个batch被replay。5.2 Trident介绍Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。如果你使用过Pig或Cascading,对这些接口就不会陌生。Trident将stream中的tuples分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。Trident事务性原理这里不详细介绍,有兴趣的读者请自行查阅资料。参考:

展开阅读全文
温馨提示:
1: 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
2: 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
3.本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
5. 装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
关于我们 - 网站声明 - 网站地图 - 资源地图 - 友情链接 - 网站客服 - 联系我们

copyright@ 2023-2025  zhuangpeitu.com 装配图网版权所有   联系电话:18123376007

备案号:ICP2024067431-1 川公网安备51140202000466号


本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有。装配图网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知装配图网,我们立即给予删除!