现在Storm学习有没有什么可靠的机构啊?

Storm 的可靠性保证测试 - 知乎专栏
{"debug":false,"apiRoot":"","paySDK":"/api/js","wechatConfigAPI":"/api/wechat/jssdkconfig","name":"production","instance":"column","tokens":{"X-XSRF-TOKEN":null,"X-UDID":null,"Authorization":"oauth c3cef7c66aa9e6a1e3160e20"}}
{"database":{"Post":{"":{"title":"Storm 的可靠性保证测试","author":"mei-tuan-dian-ping-ji-shu-tuan-dui","content":" 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。在美团点评公司内部,实时计算主要应用场景包括实时日志解析、用户行为分析、实时消息推送、消费趋势展示、实时新客判断、实时活跃用户数统计等。这些数据提供给各事业群,并作为他们实时决策的有力依据,弥补了离线计算“T+1”的不足。在实时计算中,用户不仅仅关心时效性的问题,同时也关心消息处理的成功率。本文将通过实验验证 Storm 的消息可靠性保证机制,文章分为消息保证机制、测试目的、测试环境、测试场景以及总结等五节。Storm 的消息保证机制Storm 提供了三种不同层次的消息保证机制,分别是 At Most Once、At Least Once 以及 Exactly Once。消息保证机制依赖于消息是否被完全处理。消息完全处理每个从 Spout(Storm 中数据源节点)发出的 Tuple(Storm 中的最小消息单元)可能会生成成千上万个新的 Tuple,形成一棵 Tuple 树,当整棵 Tuple 树的节点都被成功处理了,我们就说从 Spout 发出的 Tuple 被完全处理了。 我们可以通过下面的例子来更好地诠释消息被完全处理这个概念:TopologyBuilder builder = new TopologyBuilder();\nbuilder.setSpout(\"sentences\", new KafkaSpout(spoutConfig), spoutNum);\nbuilder.setBolt(\"split\", new SplitSentence(), 10)\n
.shuffleGrouping(\"sentences\");\nbuilder.setBolt(\"count\", new WordCount(), 20)\n
.fieldsGrouping(\"split\", new Fields(\"word\"));\n这个 Topology 从 Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的 Bolt 将收到的句子分割成单独的单词,并进行计数。每一个从 Spout 发送出来的 Tuple 会衍生出多个新的 Tuple,从 Spout 发送出来的 Tuple 以及后续衍生出来的 Tuple 形成一棵 Tuple 树,下图是一棵 Tuple 树示例:上图中所有的 Tuple 都被成功处理了,我们才认为 Spout 发出的 Tuple 被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为 30 秒),有至少一个 Tuple 处理失败或超时,则认为整棵 Tuple 树处理失败,即从 Spout 发出的 Tuple 处理失败。如何实现不同层次的消息保证机制Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。Tuple 的完全处理需要 Spout、Bolt 以及 Acker(Storm 中用来记录某棵 Tuple 树是否被完全处理的节点)协同完成,如上图所示。从 Spout 发送 Tuple 到下游,并把相应信息通知给 Acker,整棵 Tuple 树中某个 Tuple 被成功处理了都会通知 Acker,待整棵 Tuple 树都被处理完成之后,Acker 将成功处理信息返回给 Spout;如果某个 Tuple 处理失败,或者超时,Acker 将会给 Spout 发送一个处理失败的消息,Spout 根据 Acker 的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传。Storm 提供的三种不同消息保证机制中。利用 Spout、Bolt 以及 Acker 的组合我们可以实现 At Most Once 以及 At Least Once 语义,Storm 在 At Least Once 的基础上进行了一次封装(Trident),从而实现 Exactly Once 语义。Storm 的消息保证机制中,如果需要实现 At Most Once 语义,只需要满足下面任何一条即可:关闭 ACK 机制,即 Acker 数目设置为 0Spout 不实现可靠性传输Spout 发送消息是使用不带 message ID 的 API不实现 fail 函数Bolt 不把处理成功或失败的消息发送给 Acker如果需要实现 At Least Once 语义,则需要同时保证如下几条:开启 ACK 机制,即 Acker 数目大于 0Spout 实现可靠性传输保证Spout 发送消息时附带 message 的 ID如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数Bolt 在处理成功或失败后需要调用相应的方法通知 Acker实现 Exactly Once 语义,则需要在 At Least Once 的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现。下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游接收到的消息。从图中可以知道,At Most Once 中,消息可能会丢失(上游发送了两个 A,下游只收到一个 A);At Least Once 中,消息不会丢失,可能重复(上游只发送了一个 B ,下游收到两个 B);Exactly Once 中,消息不丢失、不重复,因此需要在 At Least Once 的基础上保存相应的状态,表示上游的哪些消息已经成功发送到下游,防止同一条消息发送多次给下游的情况。测试目的Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三种不同层次的消息保证机制,我们希望通过相关测试,达到如下目的:三种消息保证机制的表现,是否与官方的描述相符;At Most Once 语义下,消息的丢失率和什么有关系、关系如何;At Least Once 语义下,消息的重复率和什么有关系、关系如何。测试环境本文的测试环境如下: 每个 worker(worker 为一个 物理 JVM 进程,用于运行实际的 Storm 作业)分配 1 CPU 以及 1.6G 内存。Spout、Bolt、Acker 分别跑在单独的 worker 上。并通过在程序中控制抛出异常以及人工 Kill Spout/Bolt/Acker 的方式来模拟实际情况中的异常情况。三种消息保证机制的测试均由 Spout 从 Kafka 读取测试数据,经由相应 Bolt 进行处理,然后发送到 Kafka,并将 Kafka 上的数据同步到 MySQL 方便最终结果的统计,如下图所示:测试数据为 Kafka 上顺序保存的一系列纯数字,数据量分别有十万、五十万、一百万等,每个数字在每个测试样例中出现且仅出现一次。测试场景对于三种不同的消息保证机制,我们分别设置了不同的测试场景,来进行充分的测试。其中为了保证 Spout/Bolt/Acker 发生异常的情况下不影响其他节点,在下面的测试中,所有的节点单独运行在独立的 Worker 上。At Most Once从背景中可以得知,如果希望实现 At Most Once 语义,将 Acker 的数目设置为 0 即可,本文的测试过程中通过把设置 Acker 为 0 来进行 At Most Once 的测试。输入数据保存在 Kafka 上的一系列纯数字,数据量从十万到五百万不等,每个测试样例中,同一个数字在 Kafka 中出现且仅出现一次。测试结果异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量000%000%异常次数测试数据总量结果集中不同 Tuple 的总量丢失的 Tuple 数据量Tuple 的丢失百分比Tuple 的重复量.50%.09%.57%2.64%0结论不发生异常的情况下,消息能够不丢不重;Bolt 发生异常的情况下,消息会丢失,不会重复,其中消息的丢失数目与异常次数正相关。与官方文档描述相符,符合预期。At Least Once为了实现 At Least Once 语义,需要 Spout、Bolt、Acker 进行配合。我们使用 Kafka-Spout 并通过自己管理 offset 的方式来实现可靠的 Spout;Bolt 通过继承 BaseBasicBolt,自动帮我们建立 Tuple 树以及消息处理之后通知 Acker;将 Acker 的数目设置为 1,即打开 ACK 机制,这样整个 Topology 即可提供 At Least Once 的语义。测试数据Kafka 上保存的十万到五十万不等的纯数字,其中每个测试样例中,每个数字在 Kafka 中出现且仅出现一次。测试结果Acker 发生异常的情况异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量最大积压量0--02000(默认值)0--异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量最大积压量Spout 发生异常的情况异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量0--00--00--00--0异常的次数测试数据总量结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量Bolt 发生异常的情况调用 emit 函数之前发生异常异常次数结果集中不重复的 Tuple 数数据重复的次数 (&1)出现重复的 Tuple 数数据丢失量异常次数结果集中不重复的 Tuple 数数据重复的次数 (&1)出现重复的 Tuple 数数据丢失量调用 emit 函数之后发生异常异常次数结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量异常次数结果集中不重复的 Tuple 数数据重复的次数(&1)出现重复的 Tuple 数数据丢失数量2110结论从上面的表格中可以得到,消息不会丢失,可能发生重复,重复的数目与异常的情况相关。不发生任何异常的情况下,消息不会重复不会丢失。Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。Bolt 发生异常的情况:emit 之前发生异常,消息不会重复。emit 之后发生异常,消息重复的次数等于异常的次数。结论与官方文档所述相符,每条消息至少发送一次,保证数据不会丢失,但可能重复,符合预期。Exactly Once对于 Exactly Once 的语义,利用 Storm 中的 Trident 来实现。测试数据Kafka 上保存的一万到一百万不等的数字,每个数字在每次测试样例中出现且仅出现一次。测试结果Spout 发生异常情况异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和Acker 发生异常的情况异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和Bolt 发生异常的情况异常数测试数据量结果集中不重复的 Tuple 数结果集中所有 Tuple 的总和结论在所有情况下,最终结果集中的消息不会丢失,不会重复,与官方文档中的描述相符,符合预期。总结对 Storm 提供的三种不同消息保证机制,用户可以根据自己的需求选择不同的消息保证机制。不同消息可靠性保证的使用场景对于 Storm 提供的三种消息可靠性保证,优缺点以及使用场景如下所示:可靠性保证层次优点缺点使用场景At most once处理速度快数据可能丢失都处理速度要求高,且对数据丢失容忍度高的场景At least once数据不会丢失数据可能重复不能容忍数据丢失,可以容忍数据重复的场景Exactly once数据不会丢失,不会重复处理速度慢对数据不丢不重性质要求非常高,且处理速度要求没那么高,比如支付金额如何实现不同层次的消息可靠性保证对于 At Least Once 的保证需要做如下几步:需要开启 ACK 机制,即 Topology 中的 Acker 数量大于零;Spout 是可靠的。即 Spout 发送消息的时候需要附带 msgId,并且实现失败消息重传功能(fail 函数 ,可以参考下面的 Spout 代码);Bolt 在发送消息时,需要调用 emit(inputTuple, outputTuple)进行建立 anchor 树(参考下面建立 anchor 树的代码),并且在成功处理之后调用 ack ,处理失败时调用 fail 函数,通知 Acker。不满足以上三条中任意一条的都只提供 At Most Once 的消息可靠性保证,如果希望得到 Exactly Once 的消息可靠性保证,可以使用 Trident 进行实现。不同层测的可靠性保证如何实现如何实现可靠的 Spout实现可靠的 Spout 需要在 nextTuple 函数中发送消息时,调用带 msgID 的 emit 方法,然后实现失败消息的重传(fail 函数),参考如下示例:/**\n
* 想实现可靠的 Spout,需要实现如下两点\n
* 1. 在 nextTuple 函数中调用 emit 函数时需要带一个
msgId,用来表示当前的消息(如果消息发送失败会用 msgId 作为参数回调 fail 函数)\n
* 2. 自己实现 fail 函数,进行重发(注意,在 storm 中没有 msgId 和消息的对应关系,需要自己进行维护)\n
*/\npublic void nextTuple() {\n
//设置 msgId 和 Value 一样,方便 fail 之后重发\n
collector.emit(new Values(curNum + \"\", round +
\"\"), curNum + \":\" + round);\n}\n\n@Override\npublic void fail(Object msgId) {//消息发送失败时的回调函数\nString tmp = (String)msgId;
//上面我们设置了 msgId 和消息相同,这里通过 msgId 解析出具体的消息\nString[] args = tmp.split(\":\");\n\n//消息进行重发\ncollector.emit(new Values(args[0], args[1]), msgId);\n}\n如何实现可靠的 BoltStorm 提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt,都可以实现可靠性消息传递,不过 BaseRichBolt 需要自己做很多周边的事情(建立 anchor 树,以及手动 ACK/FAIL 通知 Acker),使用场景更广泛,而 BaseBasicBolt 则由 Storm 帮忙实现了很多周边的事情,实现起来方便简单,但是使用场景单一。如何用这两个 Bolt 实现(不)可靠的消息传递如下所示://BaseRichBolt 实现不可靠消息传递\npublic class SplitSentence extends BaseRichBolt {//不建立 anchor 树的例子\n
OutputCollector _\n\n
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {\n
_collector =\n
public void execute(Tuple tuple) {\n
String sentence = tuple.getString(0);\n
for(String word: sentence.split(\" \")) {\n
_collector.emit(new Values(word));
// 不建立 anchor 树\n
_collector.ack(tuple);
//手动 ack,如果不建立 anchor 树,是否 ack 是没有区别的,这句可以进行注释\n
public void declareOutputFields(OutputFieldsDeclarer declarer) {\n
declarer.declare(new Fields(\"word\"));\n
\n}\n\n//BaseRichBolt 实现可靠的 Bolt\npublic class SplitSentence extends BaseRichBolt {//建立 anchor 树以及手动 ack 的例子\n
OutputCollector _\n\n
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {\n
_collector =\n
public void execute(Tuple tuple) {\n
String sentence = tuple.getString(0);\n
for(String word: sentence.split(\" \")) {\n
_collector.emit(tuple, new Values(word));
// 建立 anchor 树\n
_collector.ack(tuple);
//手动 ack,如果想让 Spout 重发该 Tuple,则调用 _collector.fail(tuple);\n
public void declareOutputFields(OutputFieldsDeclarer declarer) {\n
declarer.declare(new Fields(\"word\"));\n
\n}\n\n下面的示例会可以建立 Multi-anchoring\nList&Tuple& anchors = new ArrayList&Tuple&();\nanchors.add(tuple1);\nanchors.add(tuple2);\n_collector.emit(anchors, new Values(1, 2, 3));\n\n//BaseBasicBolt 是吸纳可靠的消息传递\npublic class SplitSentence extends BaseBasicBolt {//自动建立 anchor,自动 ack\n
public void execute(Tuple tuple, BasicOutputCollector collector) {\n
String sentence = tuple.getString(0);\n
for(String word: sentence.split(\" \")) {\n
collector.emit(new Values(word));\n
public void declareOutputFields(OutputFieldsDeclarer declarer) {\n
declarer.declare(new Fields(\"word\"));\n
\n}\nTrident在 Trident 中,Spout 和 State 分别有三种状态,如下图所示:其中表格中的 Yes 表示相应的 Spout 和 State 组合可以实现 Exactly Once 语义,No 表示相应的 Spout 和 State 组合不保证 Exactly Once 语义。下面的代码是一个 Trident 示例:
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
//Opaque Spout\n
//TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf);
//Transaction Spout\n\n
TridentTopology topology = new TridentTopology();\n
String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);\n
Stream stream = topology.newStream(spoutTxid, spout)\n
.name(\"new stream\")\n
.parallelismHint(1);\n\n
// kafka config\n
KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
//KafkaProducerConfig 仅对 kafka 相关配置进行了封装,具体可以参考 TridentKafkaStateFactory2(Map&String, String& config)\n
Map&String, String& kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);\n
TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper();
//TridentToKafkaMapper 继承自 TridentTupleToKafkaMapper&String, String&,实现 getMessageFromTuple 接口,该接口中返回 tridentTuple.getString(0);\n\n
dstTopic = \"test__topic_for_all\";\n\n
TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);\n
stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);\n
stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));\n\n
stream.each(new Fields(\"bytes\"), new AddMarkFunction(), new Fields(\"word\")) //从spout 出来数据是一个 bytes 类型的数据,第二个是参数是自己的处理函数,第三个参数是处理函数的输出字段\n
.name(\"write2kafka\")\n
.partitionPersist(stateFactory
//将数据写入到 Kafka 中,可以保证写入到 Kafka 的数据是 exactly once 的\n
, new Fields(\"word\")\n
, new TridentKafkaUpdater())\n
.parallelismHint(1);\n不想错过技术博客更新?想给文章评论、和作者互动?第一时间获取技术沙龙信息?请关注我们的官方微信公众号“美团点评技术团队”。","updated":"T03:31:47.000Z","canComment":false,"commentPermission":"anyone","commentCount":4,"collapsedCount":0,"likeCount":41,"state":"published","isLiked":false,"slug":"","isTitleImageFullScreen":false,"rating":"none","titleImage":"","links":{"comments":"/api/posts//comments"},"reviewers":[],"topics":[{"url":"/topic/","id":"","name":"分布式计算"},{"url":"/topic/","id":"","name":"实时计算"},{"url":"/topic/","id":"","name":"数据分析工具"}],"adminClosedComment":false,"titleImageSize":{"width":0,"height":0},"href":"/api/posts/","excerptTitle":"","tipjarState":"closed","annotationAction":[],"sourceUrl":"","pageCommentsCount":4,"snapshotUrl":"","publishedTime":"T11:31:47+08:00","url":"/p/","lastestLikers":[{"bio":"科技宅男","isFollowing":false,"hash":"cccda70aaee7d6f","uid":96,"isOrg":false,"slug":"xinggev5","isFollowed":false,"description":"双子分裂男","name":"行哥","profileUrl":"/people/xinggev5","avatar":{"id":"f05faeda2a65","template":"/{id}_{size}.jpg"},"isOrgWhiteList":false},{"bio":"呵呵","isFollowing":false,"hash":"1272dad85ddb8cfd3967c8a","uid":04,"isOrg":false,"slug":"lu-yang-11-44","isFollowed":false,"description":"","name":"逯阳","profileUrl":"/people/lu-yang-11-44","avatar":{"id":"da8e974dc","template":"/{id}_{size}.jpg"},"isOrgWhiteList":false},{"bio":"发明当饭吃 / 完美主义系统工程师","isFollowing":false,"hash":"9a9fc22dd0acc7c25107","uid":00,"isOrg":false,"slug":"quxiaofeng","isFollowed":false,"description":"智能硬件 / 智能家居 / 生物特征识别 / 机器视觉 / 嵌入式设备设计与制作 / 排球 / 游泳 / 外语(英法俄德日西意韩)/喜欢玩 LaTeX, Julia, Python, Matlab, Ruby。香港理工大学哲学博士。会写论文和专利,会做系统和网站,也会焊接 BGA 的算法工程师。知乎 14225 号用户。","name":"曲晓峰","profileUrl":"/people/quxiaofeng","avatar":{"id":"v2-2ed2a38ff50ea6f8fc2e3ae6c239ff68","template":"/{id}_{size}.jpg"},"isOrgWhiteList":false},{"bio":"天下为公","isFollowing":false,"hash":"c2d32c05f526a3cb6657fb41bfaf0c57","uid":08,"isOrg":false,"slug":"chym","isFollowed":false,"description":"文不能测字,武不能防身","name":"chym","profileUrl":"/people/chym","avatar":{"id":"3d08b49b7","template":"/{id}_{size}.jpg"},"isOrgWhiteList":false},{"bio":null,"isFollowing":false,"hash":"0dbf69ffe10ef","uid":96,"isOrg":false,"slug":"dai-si-meng-89","isFollowed":false,"description":"","name":"戴斯孟","profileUrl":"/people/dai-si-meng-89","avatar":{"id":"da8e974dc","template":"/{id}_{size}.jpg"},"isOrgWhiteList":false}],"summary":" 是一个分布式的实时计算框架,可以很方便地对流式数据进行实时处理和分析,能运用在实时分析、在线数据挖掘、持续计算以及分布式 RPC 等场景下。Storm 的实时性可以使得数据从收集到处理展示在秒级别内完成,从而为业务方决策提供实时的数据支持。在…","reviewingCommentsCount":0,"meta":{"previous":null,"next":null},"annotationDetail":null,"commentsCount":4,"likesCount":41,"FULLINFO":true}},"User":{"mei-tuan-dian-ping-ji-shu-tuan-dui":{"isFollowed":false,"name":"美团点评技术团队","headline":"从最初只有几位工程师的技术组,到如今几千优秀工程师,美团·大众点评技术团队支撑并驱动着中国最大生活服务O2O业务的高速增长。人是美团·大众点评最重要的产品,打造最优秀的互联网研发团队是我们的愿景。","avatarUrl":"/9fee8de728e8_s.jpg","isFollowing":false,"type":"org","slug":"mei-tuan-dian-ping-ji-shu-tuan-dui","bio":"用技术连接人和服务,点亮精彩生活","hash":"ee7e749bb9ecbeb446a0","uid":040900,"isOrg":true,"description":"从最初只有几位工程师的技术组,到如今几千优秀工程师,美团·大众点评技术团队支撑并驱动着中国最大生活服务O2O业务的高速增长。人是美团·大众点评最重要的产品,打造最优秀的互联网研发团队是我们的愿景。","profileUrl":"/org/mei-tuan-dian-ping-ji-shu-tuan-dui","avatar":{"id":"9fee8de728e8","template":"/{id}_{size}.jpg"},"isOrgWhiteList":true,"badge":{"identity":null,"bestAnswerer":null}}},"Comment":{},"favlists":{}},"me":{},"global":{},"columns":{},"columnPosts":{},"postComments":{},"postReviewComments":{"comments":[],"newComments":[],"hasMore":true},"favlistsByUser":{},"favlistRelations":{},"promotions":{},"switches":{"couldAddVideo":false},"draft":{"titleImage":"","titleImageSize":{},"isTitleImageFullScreen":false,"canTitleImageFullScreen":false,"title":"","titleImageUploading":false,"error":"","content":"","draftLoading":false,"globalLoading":false,"pendingVideo":{"resource":null,"error":null}},"drafts":{"draftsList":[]},"config":{"userNotBindPhoneTipString":{}},"recommendPosts":{"articleRecommendations":[],"columnRecommendations":[]},"env":{"isAppView":false,"appViewConfig":{"content_padding_top":128,"content_padding_bottom":56,"content_padding_left":16,"content_padding_right":16,"title_font_size":22,"body_font_size":16,"is_dark_theme":false,"can_auto_load_image":true,"app_info":"OS=iOS"},"isApp":false},"sys":{}}您所在的位置: &
Storm如何保证可靠性
Storm如何保证可靠性
赵必厦/程丽明
清华大学出版社
《从零开始学Storm》第2章Storm的基本知识,本章将学习Storm的基本知识,包括Storm的基本概念、配置、序列化、容错机制、可靠性机制、消息机制、开发环境与生产环境、并行度和命令行客户端等内容。本节为大家介绍Storm如何保证可靠性。
2.5.3& Storm如何保证可靠性
作为受益于Storm可靠性功能的用户,有两件事情必须完成:一是当在树的元组上创建一个新链接时需要通知Storm,二是处理完成一个单一元组时需要通知Storm。通过这两件事情,当树上的元组被完全处理时Storm可以检测到,并且可以相应地对Spout的元组进行确认。Storm的API提供了一个简洁的方法来完成这些任务。
在元组树中指定一个链接,此链接被称为锚定(Anchoring)。Anchoring在发射一个新的元组的同一时间完成。让我们使用以下Bolt为例进行介绍,这个Bolt将包含一个句子的元组划分为一个包含每个单词的锚定:
public&class&SplitSentence&extends&BaseRichBolt&{ &&&&&OutputCollector&_ &&&&& &&&&&public&void&prepare(Map&conf,&TopologyContext&context,&OutputCollector& &collector)&{ &&&&&&&&&_collector&=& &&&&&} &&&&&&public&void&execute(Tuple&tuple)&{ &&&&&&&&&String&sentence&=&tuple.getString(0); &&&&&&&&&for(String&word:&sentence.split(&&&))&{ &&&&&&&&&&&&&_collector.emit(tuple,&new&Values(word)); &&&&&&&&&} &&&&&&&&&_collector.ack(tuple); &&&&&} &&&&&&public&void&declareOutputFields(OutputFieldsDeclarer&declarer)&{ &&&&&&&&&declarer.declare(new&Fields(&word&)); &&&&&} &}&
通过指定输入元组作为第一个参数来发射,每个单词元组被锚定(anchored)。因为这个单词元组是被锚定的,如果单词元组未能被下游处理,树的根的Spout元组将在稍后重发。相反,如果单词元组的发射操作如下,让我们看看会发生什么:
_collector.emit(new&Values(word));&
这种方式发射的单词元组导致未被锚定(unanchored)。如果元组未被下游处理,根元组将不会重发。这取决于你需要的Topology的容错保证,有时候需要相应地发射一个未被锚定的元组。
一个输出元组可以被锚定到多个输入元组。在做流媒体连接或聚合时,这是有用的。一个复合锚定(multi-anchore)元组未能被处理将导致来自Spout的多个元组重发。复合锚定是通过指定一个元组列表而不仅仅是一个元组来完成的。例如:
List&anchors&=&new&ArrayList(); &anchors.add(tuple1); &anchors.add(tuple2); &_collector.emit(anchors,&new&Values(1,&2,&3));&
复合锚定添加输出元组到多个元组树。请注意,对于复合锚定还可以打破树结构,创建元组的有向无环图,如图2.4所示。
Storm的实现适用于有向无环图以及元组树。
锚定是你如何指定元组树。当完成处理元组树中的单个元组时,Storm的可靠API将指定下一个元组和最后一个元组。这是通过使用OutputCollector类的ack和fail方法完成的。如果回顾SplitSentence示例,可以看到输入元组处于acked状态,毕竟这个单词元组已经被发射。
可以使用OutputCollector类的fail()方法立即失败在元组树的根部的Spout的元组。例如,应用程序可以选择从数据库客户端捕获到一个异常,明确失败输入元组。由于没有明确的元组,Spout元组回放的速度比等待tuple超时的速度更快。
你处理的每一个元组必须ack或者fail。Storm使用内存来追踪每个元组,所以如果不ack/fail每个元组,任务最终会耗尽内存。
很多Bolt遵循一个读取一个输入元组,发射元组,在execute方法确认元组的通用模式。这些Bolt具有类别过滤器和简单的功能。Storm有一个接口称为BasicBolt,为你封装这个模式。SplitSentence的例子可以使用BasicBolt写成:
public&class&SplitSentence&extends&BaseBasicBolt&{ &&&&&public&void&execute(Tuple&tuple,&BasicOutputCollector&collector)&{ &&&&&&&&&String&sentence&=&tuple.getString(0); &&&&&&&&&for(String&word:&sentence.split(&&&))&{ &&&&&&&&&&&&&collector.emit(new&Values(word)); &&&&&&&&&} &&&&&} &&&&&&public&void&declareOutputFields(OutputFieldsDeclarer&declarer)&{ &&&&&&&&&declarer.declare(new&Fields(&word&)); &&&&&}&&&&&&&& &}&
这个实现与之前的实现相比,语义上是相同的,但是更简单。元组发射到BasicOutputCollectorare是自动Anchoring到输入元组,当execute方法完成时,输入元组自动为你确认。
与此相反,做聚合或者连接操作的Bolt可能会延迟确认一个元组,直到它基于一群元组计算完结果。聚合和连接一般也会multi-anchore它们输出元组。这些东西不属于简单的IBasicBolt模式。
喜欢的朋友可以添加我们的微信账号:
51CTO读书频道二维码
51CTO读书频道活动讨论群:
【责任编辑: TEL:(010)】&&&&&&
关于&&的更多文章
Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大
本书描述了黑客用默默无闻的行动为数字世界照亮了一条道路的故事。
讲师: 54人学习过讲师: 91人学习过讲师: 103人学习过
从早期的B2B、B2C发展到现在的O2O,电子商务是商业发
《Linux 系统运维》以CentOS 6 为蓝本,主要介绍了Lin
《Learning Android中文版(第2版)》是一本介绍Andro
本书针对初级网管朋友所需掌握的网络组建和网络管理技能,以示例方式编写而成,其主要特点就是实用性和可操作性非常强。
51CTO旗下网站

我要回帖

 

随机推荐