Storm源码阅读之SpoutOutputCollector

不得不说storm是一个特别棒的实时计算框架。为了对后文理解的方便,先说几个storm中的术语:

Topology:拓扑图或者拓扑结构。在storm中它通过消息分组的分式连接Spout和Bolt节点定义了运算处理的拓扑结构。如下图:

那什么是Spout呢?

在计算任务需要的数据其实就是由Spout提供的,所以它可以说是Storm中的消息源,一般是从外部数据源(日志文件、数据库、消息队列等等)不间断地读取数据然后发送给tuple元组的。

那它是通过谁发送的呢?又是如何发送的呢?

这里我们先回答第一个问题,第二个问题以后解答。

好了上面说了那么多就是为了引出今天的任务:阅读SpoutOutputCollector源码。

在阅读之前,我们先明确一下SpoutOutputCollector到底是什么?其实从类名就能说出大概(不得不说老外写的代码的可读性真是好的没法说。这里啰嗦一句,

个人觉得这也是他们分享精神的体现,时刻记住方便给别人看。),它就是Spout输出收集器。

那它到底能干些啥呢?请看代码:

1.ISpoutOutputCollector:是SpoutOutputCollector的接口

 1 public interface ISpoutOutputCollector {
 2     /**
 3         发送tuple消息,并返回起发送任务的task的序列号集合
 4     */
 5     List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
 6     /**
 7     *与上述发送方法类似,只不过emitDirect方法是要指定接收端的task,让接收端特定的task接收消息。
 8     */
 9     void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId);
10     /**
11     *处理异常
12     */
13     void reportError(Throwable error);
14 }

从上述接口ISpoutOutputCollector源码可以看出ISpoutOutputCollector中声明了3个方法,两个属于发送tuple元组的方法,他们之间的差异在上述注释中已说的很清楚,还有一个处理异常的方法。

2.SpoutOutputCollector:它实现了接口ISpoutOutputCollector

 1 public class SpoutOutputCollector implements ISpoutOutputCollector {
 2     ISpoutOutputCollector _delegate;
 3
 4     public SpoutOutputCollector(ISpoutOutputCollector delegate) {
 5         _delegate = delegate;
 6     }
 7
 8     /**
 9      * 指定一个streamid和message发射tuple消息并返回起发送消息的task的序号。当tuple消息完全处理了,就会回调ack方法,否则会回调fail方法。
10      */
11     public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
12         return _delegate.emit(streamId, tuple, messageId);
13     }
14
15     /**
16      * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,这没有指定streamid,故采用默认的streamid
17      */
18     public List<Integer> emit(List<Object> tuple, Object messageId) {
19         return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
20     }
21
22     /**
23      * emit(String streamId, List<Object> tuple, Object messageId)
24      *的重载方法,这没有指定streamid,故采用默认的streamid,因为没有messageid,故ack方法和fail方法不会被调用
25      */
26     public List<Integer> emit(List<Object> tuple) {
27         return emit(tuple, null);
28     }
29
30     /**
31      * emit(String streamId, List<Object> tuple, Object messageId)的重载方法,因为没有messageid,故ack方法和fail方法不会被调用
32      */
33     public List<Integer> emit(String streamId, List<Object> tuple) {
34         return emit(streamId, tuple, null);
35     }
36
37     /**
38      * 发射tuple消息,不过需要指定接收端的task来接收,并且输出必须声明为直接流,同时指定用来接收消息的task必须采用直接分组的方式来接收消息.
39      *
40      */
41     public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
42         _delegate.emitDirect(taskId, streamId, tuple, messageId);
43     }
44
45     /**
46      * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,采用默认的streamid
47      */
48     public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
49         emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
50     }
51
52     /**
53      * emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)的重载方法,因为没有指定的消息id,所以ack和fail方法就不会调用.
54      */
55     public void emitDirect(int taskId, String streamId, List<Object> tuple) {
56         emitDirect(taskId, streamId, tuple, null);
57     }
58
59     /**
60      * 该类提供的重载方法,因为没有指定的消息id,所以ack和fail方法就不会调用.
61      */
62     public void emitDirect(int taskId, List<Object> tuple) {
63         emitDirect(taskId, tuple, null);
64     }
65     /**
66      * 接口ISpoutOutputCollector中reportError的实现.
67      */
68     @Override
69     public void reportError(Throwable error) {
70         _delegate.reportError(error);
71     }
72 }

在SpoutOutputCollector类中,实现了消息发射的方法,并且还提供了多个重载方法方便用户使用。

时间: 08-18

Storm源码阅读之SpoutOutputCollector的相关文章

Apache Storm源码阅读笔记

欢迎转载,转载请注明出处. 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两位的指点,非常感谢.

storm源码阅读笔记之任务调度算法

3种Scheduler概述 EventScheduler:将系统中的可用资源均匀地分配给需要资源的topology,其实也不是绝对均匀,后续会详细说明 DefaultScheduler:和EvenetScheduler差不多,只不过会先将其它topology不需要的资源重新收集起来,再进行EventScheduler IsolationScheduler:用户可定义这个topology的机器资源,storm分配的时候会优先分配这些topology,以保证分配给该topology的机器只为这一个t

storm源码之storm代码结构【译】【转】

[原]storm源码之storm代码结构[译] 说明:本文翻译自Storm在GitHub上的官方Wiki中提供的Storm代码结构描述一节Structure of the codebase,希望对正在基于Storm进行源码级学习和研究的朋友有所帮助. Storm的源码共分为三个不同的层次. 首先,Storm在设计之初就考虑到了兼容多语言开发.Nimbus是一个thrift服务,topologies被定义为Thrift结构体.Thrift的运用使得Storm可以被任意开发语言使用. 其次,Stor

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划

body, td { font-family: tahoma; font-size: 10pt; } 淘宝数据库OceanBase SQL编译器部分 源码阅读--生成逻辑计划 SQL编译解析三部曲分为:构建语法树,生成逻辑计划,指定物理执行计划.第一步骤,在我的上一篇博客淘宝数据库OceanBase SQL编译器部分 源码阅读--解析SQL语法树里做了介绍,这篇博客主要研究第二步,生成逻辑计划. 一. 什么是逻辑计划?我们已经知道,语法树就是一个树状的结构组织,每个节点代表一种类型的语法含义.如

JDK部分源码阅读与理解

本文为博主原创,允许转载,但请声明原文地址:http://www.coselding.cn/article/2016/05/31/JDK部分源码阅读与理解/ 不喜欢重复造轮子,不喜欢贴各种东西.JDK代码什么的,让整篇文章很乱...JDK源码谁都有,没什么好贴的...如果你没看过JDK源码,建议打开Eclipse边看源码边看这篇文章,看过的可以把这篇文章当成是知识点备忘录... JDK容器类中有大量的空指针.数组越界.状态异常等异常处理,这些不是重点,我们关注的应该是它的一些底层的具体实现,这篇

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

Memcache-Java-Client-Release源码阅读(之七)

一.主要内容 本章节的主要内容是介绍Memcache Client的Native,Old_Compat,New_Compat三个Hash算法的应用及实现. 二.准备工作 1.服务器启动192.168.0.106:11211,192.168.0.106:11212两个服务端实例. 2.示例代码: String[] servers = { "192.168.0.106:11211", "192.168.0.106:11212" }; SockIOPool pool =

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in

JDK 源码 阅读 - 2 - 设计模式 - 创建型模式

A.创建型模式 抽象工厂(Abstract Factory) javax.xml.parsers.DocumentBuilderFactory DocumentBuilderFactory通过FactoryFinder实例化具体的Factory. 使用例子: DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); DocumentBuilder docBuilder = docBuilder