Twitter Storm如何保证消息不丢失

转自:http://xumingming.sinaapp.com/127/twitter-storm如何保证消息不丢失/

storm保证从spout发出的每个tuple都会被完全处理。这篇文章介绍storm是怎么做到这个保证的,以及我们使用者怎么做才能充分利用storm的可靠性特点。

一个tuple被”完全处理”是什么意思?

就如同蝴蝶效应一样,从spout发射的一个tuple可以引起其它成千上万个tuple因它而产生, 想想那个计算一篇文章中每个单词出现次数的topology.

帮助


1

2

3

4

5

6

7

8

9

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

                                     22133,

                                     "sentence_queue",

                                     new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

        .shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

        .fieldsGrouping(2, new Fields("word"));

这个topology从一个Kestrel队列读取句子, 把每个句子分割成一个个单词, 然后发射这一个个单词: 一个源tuple(一个句子)引起后面很多tuple的产生(一个个单词), 这个消息流大概是这样的:

统计单词出现次数的tuple树

在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。 而这个timetout可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来指定。

如果一个消息处理成功了或者失败了会发生什么?

FYI。 下面这个是spout要实现的接口:

帮助


1

2

3

4

5

6

7

8

public interface ISpout extends Serializable {

    void open(Map conf, TopologyContext context,

              SpoutOutputCollector collector);

    void close();

    void nextTuple();

    void ack(Object msgId);

    void fail(Object msgId);

}

首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法参数里面提供的SpoutOutputCollector来发射新tuple到它的其中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面我们通过这个message-id来追踪这个tuple。举例来说, KestrelSpout从kestrel队列里面读取一个消息,并且把kestrel提供的消息id作为message-id, 看例子:

帮助


1

_collector.emit(new Values("field1","field2", 3),msgId);

接下来, 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。如果storm检测到一个tuple被完全处理了, 那么storm会以最开始的那个message-id作为参数去调用消息源的ack方法;反之storm会调用spout的fail方法。值得注意的一点是, storm调用ack或者fail的task始终是产生这个tuple的那个task。所以如果一个spout被分成很多个task来执行, 消息执行的成功失败与否始终会通知最开始发出tuple的那个task。

我们再以KestrelSpout为例来看看spout需要做些什么才能保证“一个消息始终被完全处理”, 当KestrelSpout从Kestrel里面读出一条消息, 首先它“打开”这条消息, 这意味着这条消息还在kestrel队列里面, 不过这条消息会被标示成“处理中”直到ack或者fail被调用。处于“处理中“状态的消息不会被发给其他消息处理者了;并且如果这个spout“断线”了, 那么所有处于“处理中”状态的消息会被重新标示成“等待处理”.

Storm的可靠性API

作为storm的使用者,有两件事情要做以更好的利用storm的可靠性特征。 首先,在你生成一个新的tuple的时候要通知storm; 其次,完成处理一个tuple之后要通知storm。 这样storm就可以检测整个tuple树有没有完成处理,并且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

由一个tuple产生一个新的tuple称为: anchoring。你发射一个新tuple的同时也就完成了一次anchoring。看下面这个例子: 这个bolt把一个包含一个句子的tuple分割成每个单词一个tuple。

帮助


01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

public class SplitSentence implements IRichBolt {

        OutputCollector _collector;

        public void prepare(Map conf,

                            TopologyContext context,

                            OutputCollector collector) {

            _collector = collector;

        }

        public void execute(Tuple tuple) {

            String sentence = tuple.getString(0);

            for(String word: sentence.split(" ")) {

                _collector.emit(tuple, new Values(word));

            }

            _collector.ack(tuple);

        }

        public void cleanup() {

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields("word"));

        }

    }

看一下这个execute方法, emit的第一个参数是输入tuple, 第二个参数则是输出tuple, 这其实就是通过输入tuple anchoring了一个新的输出tuple。因为这个“单词tuple”被anchoring在“句子tuple”一起, 如果其中一个单词处理出错,那么这整个句子会被重新处理。作为对比, 我们看看如果通过下面这行代码来发射一个新的tuple的话会有什么结果。

帮助


1

_collector.emit(new Values(word));

用这种方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), 如果这个tuple处理失败了, 整个句子不会被重新处理。到底要anchoring还是要 unanchoring则完全取决于你的业务需求。

一个输出tuple可以被anchoring到多个输入tuple。这种方式在stream合并或者stream聚合的时候很有用。一个多入口tuple处理失败的话,那么它对应的所有输入tuple都要重新执行。看看下面演示怎么指定多个输入tuple:

帮助


1

2

3

4

List<Tuple> anchors = new ArrayList<Tuple>();

anchors.add(tuple1);

anchors.add(tuple2);

_collector.emit(anchors, new Values(1, 2, 3));

多入口tuple把这个新tuple加到了多个tuple树里面去了。

我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完当个tuple的时候告诉storm,  通过OutputCollector类的ack和fail方法来做,如果你回过头来看看SplitSentence的例子, 你可以看到“句子tuple”在所有“单词tuple”被发出之后调用了ack。

你可以调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail, 比如你查询了数据库,发现一个错误,你可以马上fail那个输入tuple, 这样可以让这个tuple被快速的重新处理, 因为你不需要等那个timeout时间来让它自动fail。

每个你处理的tuple, 必须被ack或者fail。因为storm追踪每个tuple要占用内存。所以如果你不ack/fail每一个tuple, 那么最终你会看到OutOfMemory错误。

大多数Bolt遵循这样的规律:读取一个tuple;发射一些新的tuple;在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。如果用BasicBolt来做, 上面那个SplitSentence可以改写成这样:

帮助


01

02

03

04

05

06

07

08

09

10

11

12

13

14

15

16

17

18

19

20

21

public class SplitSentence implements IBasicBolt {

        public void prepare(Map conf,

                            TopologyContext context) {

        }

        public void execute(Tuple tuple,

                            BasicOutputCollector collector) {

            String sentence = tuple.getString(0);

            for(String word: sentence.split(" ")) {

                collector.emit(new Values(word));

            }

        }

        public void cleanup() {

        }

        public void declareOutputFields(

                        OutputFieldsDeclarer declarer) {

            declarer.declare(new Fields("word"));

        }

    }

这个实现比之前的实现简单多了, 但是功能上是一样的。发送到BasicOutputCollector的tuple会自动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack的。
作为对比,处理聚合和合并的bolt往往要处理一大堆的tuple之后才能被ack, 而这类tuple通常都是多输入的tuple, 所以这个已经不是IBasicBolt可以罩得住的了。

storm是怎么实现高效率的可靠性的?

storm里面有一类特殊的task称为:acker, 他们负责跟踪spout发出的每一个tuple的tuple树。当acker发现一个tuple树已经处理完成了。它会发送一个消息给产生这个tuple的那个task。你可以通过Config.TOPOLOGY_ACKERS来设置一个topology里面的acker的数量, 默认值是一。 如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。

理解storm的可靠性的最好的方法是来看看tuple和tuple树的生命周期, 当一个tuple被创建, 不管是spout还是bolt创建的, 它会被赋予一个64位的id,而acker就是利用这个id去跟踪所有的tuple的。每个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。所以当一个tuple被ack的时候,它会发一个消息给acker,告诉它这个tuple树发生了怎么样的变化。具体来说就是:它告诉acker: 我呢已经完成了, 我有这些儿子tuple, 你跟踪一下他们吧。下面这个图演示了C被ack了之后,这个tuple树所发生的变化。

tuple ack示例

关于storm怎么跟踪tuple还有一些细节, 前面已经提到过了, 你可以自己设定你的topology里面有多少个acker。而这又给我们带来一个问题, 当一个tuple需要ack的时候,它到底选择哪个acker来发送这个信息呢?

storm使用一致性哈希来把一个spout-tuple-id对应到acker, 因为每一个tuple知道它所有的祖宗的tuple-id, 所以它自然可以算出要通知哪个acker来ack。(这里所有的祖宗是指这个tuple所对应的所有的根tuple。这里注意因为一个tuple可能存在于多个tuple树,所以才有所有一说)。

storm的另一个细节是acker是怎么知道每一个spout tuple应该交给哪个task来处理。当一个spout发射一个新的tuple, 它会简单的发一个消息给一个合适的acker,并且告诉acker它自己的id(taskid), 这样storm就有了taskid-tupleid的对应关系。 当acker发现一个树完成处理了, 它知道给哪个task发送成功的消息。

acker task并不显式的跟踪tuple树。对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反, acker用了一种不同的方式, 使得对于每个spout tuple所需要的内存量是恒定的(20 bytes) .  这个跟踪算法是storm如何工作的关键,并且也是它的主要突破。

一个acker task存储了一个spout-tuple-id到一对值的一个mapping。这个对子的第一个值是创建这个tuple的taskid, 这个是用来在完成处理tuple的时候发送消息用的。 第二个值是一个64位的数字称作:”ack val”, ack val是整个tuple树的状态的一个表示,不管这棵树多大。它只是简单地把这棵树上的所有创建的tupleid/ack的tupleid一起异或(XOR)。

当一个acker task 发现一个 ack val变成0了, 它知道这棵树已经处理完成了。 因为tupleid是随机的64位数字, 所以, ack val碰巧变成0(而不是因为所有创建的tuple都完成了)的几率极小。算一下就知道了, 就算每秒发生10000个ack, 那么需要50000000万年才可能碰到一个错误。而且就算碰到了一个错误, 也只有在这个tuple失败的时候才会造成数据丢失。 关于Acker的详细工作流程的分析可以看看这篇文章: Twitter Storm源代码分析之acker工作流程

既然你已经理解了storm的可靠性算法, 让我们一起过一遍所有可能的失败场景,并看看storm在每种情况下是怎么避免数据丢失的。

1. 由于对应的task挂掉了,一个tuple没有被ack: storm的超时机制在超时之后会把这个tuple标记为失败,从而可以重新处理。

2. Acker挂掉了: 这种情况下由这个acker所跟踪的所有spout tuple都会超时,也就会被重新处理。

3. Spout挂掉了: 在这种情况下给spout发送消息的消息源负责重新发送这些消息。比如Kestrel和RabbitMQ在一个客户端断开之后会把所有”处理中“的消息放回队列。

就像你看到的那样, storm的可靠性机制是完全分布式的, 可伸缩的并且是高度容错的。

调整可靠性 (Tuning Reliability)

acker task是非常轻量级的, 所以一个topology里面不需要很多acker。你可以通过Strom UI(id: -1)来跟踪它的性能。 如果它的吞吐量看起来不正常,那么你就需要多加点acker了。

如果可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据, 那么你可以通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量减少一半, 因为对于每一个tuple都要发送一个ack消息。并且它需要更少的id来保存下游的tuple, 减少带宽占用。

有三种方法可以去掉可靠性。第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这种情况下, storm会在spout发射一个tuple之后马上调用spout的ack方法。也就是说这个tuple树不会被跟踪。

第二个方法是在tuple层面去掉可靠性。 你可以在发射tuple的时候不指定messageid来达到不跟粽某个特定的spout tuple的目的。

最后一个方法是如果你对于一个tuple树里面的某一部分到底成不成功不是很关心,那么可以在发射这些tuple的时候unanchor它们。 这样这些tuple就不在tuple树里面, 也就不会被跟踪了。

时间: 01-15

Twitter Storm如何保证消息不丢失的相关文章

【转】Twitter Storm如何保证消息不丢失

Twitter Storm如何保证消息不丢失 发表于 2011 年 09 月 30 日 由 xumingming 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: http://xumingming.sinaapp.com/127/twitter-storm如何保证消息不丢失/ 本文翻译自: https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing s

关于MQ的几件小事(四)如何保证消息不丢失

1.mq原则 数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决:不能少,就是说不能丢失数据.如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的. 2.丢失数据场景 丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了.下面从rabbitmq和kafka分别说一下,丢失数据的场景, (1)rabbitmq A:生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了. B:rabbit

Strom 消息处理机制 中英对照翻译 (Storm如何保证消息被完全处理)

官方链接: http://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html What does it mean for a message to be “fully processed”? A tuple coming off a spout can trigger thousands of tuples to be created based on it. Consider, for ex

【转】Twitter Storm: 在生产集群上运行topology

Twitter Storm: 在生产集群上运行topology 发表于 2011 年 10 月 07 日 由 xumingming 作者: xumingming | 可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明网址: http://xumingming.sinaapp.com/185/twitter-storm-在生产集群上运行topology/ 本文翻译自: https://github.com/nathanmarz/storm/wiki/Running-topologi

Twitter Storm: storm的一些常见模式

这篇文章列举出了storm topology里面的一些常见模式: 流聚合(stream join) 批处理(Batching) BasicBolt 内存内缓存 + fields grouping 组合 计算top N 用TimeCacheMap来高效地保存一个最近被更新的对象的缓存 分布式RPC: CoordinatedBolt和KeyedFairBolt 流聚合(stream join) 流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段.流聚合和SQL里面table

RabbitMQ如何保证消息99.99%被发送成功?

1. 本篇概要 RabbitMQ针对这个问题,提供了以下几个机制来解决: 生产者确认 持久化 手动Ack 本篇博客我们先讲解下生产者确认机制,剩余的机制后续单独写博客进行讲解. 2. 生产者确认 要想保证消息不丢失,首先我们得保证生产者能成功的将消息发送到RabbitMQ服务器. 但在之前的示例中,当生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何消息给生产者的,也就是默认情况下生产者是不知道消息有没有正确的到达服务器. 从b

解决RabbitMQ消息丢失问题和保证消息可靠性(一)

原文链接(作者一个人):https://juejin.im/post/5d468591f265da03b810427e 工作中经常用到消息中间件来解决系统间的解耦问题或者高并发消峰问题,但是消息的可靠性如何保证一直是个很大的问题,什么情况下消息就不见了?如何防止消息丢失?下面通过这篇文章,我们就聊聊RabbitMQ 消息可靠性如何解决的? 本文分三部分说明 RabbitMQ 消息丢失场景有哪些? 如何避免消息丢失? 如何设计部署消息中间件保证消息可靠性? RabbitMQ 消息丢失场景有哪些?

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

apache Storm学习之三-消息可靠性

4.1 简介 storm可以确保spout发送出来的每个消息都会被完整的处理.本章将会描述storm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理. 4.2 理解消息被完整处理 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KestrelSpout("kestrel.backtype.com", 22