编程不止是一份工作,还是一种乐趣!!!
Storm是一个简单的分布式流式计算系统,它提供了对数据流进行实时处理的能力。Storm之于流式处理就像Hadoop之于批处理计算一样,区别在于Hadoop Job最终会运行结束,而Storm会一直持续下去。在设计上Storm是非常简单的,搭建一个简单的入门Sample通常不会很难,但要深入掌握Storm的各个方面,还是需要下一翻功夫的。本文会比较长,我会由浅入深的介绍Storm的各个方面,耐心看完相信您一定会有所收获。
Topology
在Storm中,一个分布式计算结构被称为topology(拓扑),由stream(流)、spout(流生成器)和bolt(处理器)组成,如下图所示:
Stream与Tuple
Stream是一个数据流,流中的每个元素被称为一个Tuple。Tuple是Storm中的核心数据结构,是一个由一个或多个键值对组成的列表对象。
Spout
Spout代表了Storm Topology中的数据入口,充当一个数据采集器的角色,连接到数据源,读取并将数据转化为一个个的tuple发送给Bolt。
Bolt
在Storm Topology中像过滤、转换、运算等操作都是由Bolt来完成的,可以把Bolt看作一个数据处理器。Bolt从一个或多个Spout那接收数据流进行处理,然后选择性的输出出去,形成一个复杂的网络。
Storm集群中,Topology由四个部分组成:
Node(服务器)
配置在集群中的服务器,执行一个topology的一部分运算。一个集群包含多个Nodes。
Worker Process
一个Worker是指服务器中一个独立运行的JVM虚拟机进程,每个服务器中可以启动多个Worker,这取决于服务器的配置的负载情况。
Executor
Executor是Worker中执行Task的线程。同一个Spout或Bolt的多个Task实例可以指派给一个Executor来执行,默认情况下,Storm会给每个Executor分配一个Task。
Task
Task是Spout和Bolt的实例。运行时Task的数量是不可变的,但Executor的数量可以动态调整,这是Storm提供的灵活性之一。
假设我们有一个Topology,由一个Spout和两个Bolt组成,部署在一个Worker上,这时Topology的结构如下图所示:
默认情况下,Storm为每一个组件设置的并行度是1,在上面的Topology中,每个Spout和Bolt都只是分配了一个Executor和一个Task。当负载上来后,我们没有办法通过增加Executor或服务器的方式来提高这个Topology的吞吐,因为每个组件的Task实例数是不可变的。我们可以在定义Topology的时候,预先设置好各个组件的并行度(即多少个Executor来执行该组件)和Task实例数量。
例如把前面的Topology做一个调整,Spout的Task实例数设为2,red Bolt的Task实例数设为4,green Bolt的实例数设为6,Executor的配置不动,那Topology的结构变为下图所示:
因为Executor的数量未作调整,所以这个Topology的吞吐与前面的那个差别不大,甚至会有小幅下降。但是这个Topology为后续的伸缩提供了更好的支持。Storm支持在不重启Topology的情况下,动态的改变(增减)Worker的数量和Executor的数量,这个过程称为Rebalance。通过Storm web UI,或者CLI tool storm rebalance命令我们可以实现Rebalance。
$ storm rebalance topology -n 2 -e spout=2 -e red-bolt=2 -e green-bolt=6
# -n 调整Topology worker数量
# -e 调整Spout或Bolt的Executor数量,最大不能超过Task实例数
例如我们可以通过上面的命令,把Topology的Worker数量改为2个,Spout和red Bolt的Executor改为2个,green Bolt的Executor改为6个。修改之后,Topology的结构所下所示:
Topology中Executor的总量是由各个组件的并行度之和决定的,再除以Worker的数量,就是每个Worker中Executor的数量了,Storm的并发机制是非常简单的。有一点需要单独指出,在本地模式下增加Worker的数量不会达至预期的效果,因为本地模式本质上是同一个JVM。
现在我们了解了Storm的并发机制,你可能会有疑问,一个Bolt可能有多个Task实例,当一个tuple从Spout中发送出去时,会发送给哪个Task实例呢?这个问题就涉及到Storm的数据流分组了,Storm定义了几种分组的方式:
Shuffle grouping(随机)
这种方式会随机分发tuple给Bolt的各个Task实例,每个实例接收到相同数量的tuple。
Fields grouping
根据指定字段的值进行分组。
All grouping
每个tuple都复制分发给所有的Bolt Task实例。
Global grouping
这种分组方式将所有的tuple都路由到唯一一个Task实例上。
None grouping
在功能上与Shuffle grouping一样,为将来扩展预留的。
Direct grouping
这是一种特殊的路由方式,由发送者(emitDirect
方法)来决定tuple由哪个Task实例来接收。
Local or shuffle grouping
与Shuffle grouping类似,但是会优先选择同一个Worker内的Task实例。由于减少了网络传输,性能上更好。
单词计数的例子在很多介绍Storm的文章里都是作为Tutorial,因为它够简单,而且非常适合Storm的入门。这里我们使用同样的例子,但我会介绍每个组件实现时的细节。
单词计数Topology包含了一个Spout与三个Bolt组件,如:
例子比较简单,Sentence Spout的作用是随机发送一些英文句子;Split Bolt负责将句子拆分成单词;Count Bolt负责统计每个单词的出现次数;最后由Report Bolt将结果输出。
实现Sentence Spout
首先,Storm要求所有的组件(Spout与Bolt)都实现IComponent
接口,该接口定义了两个方法。其中,declareOutputFields
方法用于告诉Storm该组件会发送哪些数据流,以及流中的tuple包含哪些字段Fields;getComponentConfiguration
方法用于覆盖、指定一些组件级别的配置,IComponent
接口的源码如下:
public interface IComponent extends Serializable {
/**
* Declare the output schema for all the streams of this topology.
*
* @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
/**
* Declare configuration specific to this component. Only a subset of the "topology.*" configs can
* be overridden. The component configuration can be further overridden when constructing the
* topology using {@link TopologyBuilder}
*
*/
Map<String, Object> getComponentConfiguration();
}
其次,Storm要求所有的Spout组件实现ISpout
接口。ISpout
接口定义的方法相对要多一些,大部分时候我们不需要全部实现,为此Storm API提供了一些辅助类,降低Storm的开发工作量,相关的类图如下:
BaseComponent
类与BaseRichSpout
类,提供了大部分方法的默认实现,所以我们在开发Spout组件的时候,只需要承继BaseRichSpout
类即可。剩下的工作就是实现declareOutputFields
、open
和nextTuple
三个方法了。declareOutputFields
方法前面我们提到过,它的作用是告诉Storm该Spout组件会发送哪些数据流,以及流中的tuple包含哪些字段Fields。该示例中,SentenceBolt发送的tuple中只有一个sentence字段,它的实现如下:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
open
方法在该组件的Task实例初始化的时候被调用,它提供了三个参数:
Map类型的conf
关于该Spout组件的Storm配置信息。
TopologyContext
关于该Spout组件当前Task实例的一些信息,比如task id。
SpoutOutputCollector
SpoutOutputCollector
对象用于发送tuple,它是线程安全的。通常会把该参数作为Spout的实例属性保存下来。
由于我们会在nextTuple
方法中使用SpoutOutputCollector
对象发送tuple,所以在open
方法中我们需要把它保存起来:
private SpoutOutputCollector collector;
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
nextTuple
方法是Spout组件最关键的方法之一,Storm通过调用这个方法来发送tuple。值得注意的是,nextTuple
方法的内部实现最好不要有阻塞式的操作,因为Storm是在一个专门的线程中不断循环调用该方法的。如果没有数据需要发送,可以睡眠几毫秒再返回,防止空转消耗过多的CPU资源。本例中我们随机发送一些英文句子,nextTuple
方法实现如下:
private int index = 0;
private String[] sentences = {
"how are you", "nice to meet you", "what a good day"
};
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index == sentences.length)
index = 0;
Utils.sleep(5);
}
至此,我们完成了SentenceSpout的实现,关于ISpout
接口中的其它方法,我们后面再详细介绍。SentenceSpout完整代码如下:
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private int index = 0;
private String[] sentences = {
"how are you", "nice to meet you", "what a good day"
};
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
this.collector.emit(new Values(sentences[index]));
index++;
if (index == sentences.length)
index = 0;
Utils.sleep(5);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
实现Split Bolt
实现Bolt与Spout类似,Storm要求所有的Bolt都实现IBolt接口,该接口定义了三个方法。prepare
方法与ISpout.open
方法的作用类似,在Bolt Task实例初始化的时候会被调用;Bolt中最重要的方法就是execute
,对于每一个接收到的tuple,Bolt会调用一次该方法,它是Bolt实现主要逻辑的地方。如果Bolt还需要发送数据流的话,一般也是在该方法中实现的;cleanup
方法一般用在本地模式,集群环境下Storm并不保证该方法一定会被执行。
Storm API同样提供了一些辅助类,降低Bolt的开发工作量,相关的类图如下:
从上面的类图结构看,Split Bolt需要实现declareOutputFields
、prepare
与execute
三个方法。declareOutputFields
与prepare
方法的作用与Spout类似,我们不再重复介绍;Split Bolt的主要职责是接收Sentence Spout发出的句子流,拆分成一个个的单词并发送给后续的Bolt处理,这些逻辑都应该实现在execute
方法中:
public class SplitBolt extends BaseRichBolt {
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public void execute(Tuple input) {
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
collector.emit(new Values(word));
}
}
}
从SplitBolt
的实现我们可以发现,一个Tuple经过一个Bolt处理后,变成了多个Tuple发送出去,数量发生了变化。
实现Count Bolt
掌握了Split Bolt的开发,后续的其它Bolt也是类似的。Count Bolt的职责是从Split Bolt接收单词流统计出现次数并持久化,这里我们简单些保存在JVM内存中,但生产环境中我们不应该这么做,集群环境下任意一个节点都是不可靠的,Storm会进行故障监控和转移,所以Spout和Bolt都应该是无状态化的。CountBolt
的实现如下:
public class CountBolt extends BaseRichBolt {
private OutputCollector collector;
private HashMap<String, Long> counts;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
counts = new HashMap<>();
}
public void execute(Tuple input) {
String word = input.getStringByField("word");
Long count = counts.get(word);
if (null == count)
count = 0l;
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
counts
是在prepqre
方法中实例化的,这么做是由topology的部署方式决定的。当topology发布时,所有的Bolt和Spout组件会首先进行序列化,然后通过网络发送到集群中。如果Spout或Bolt组件在序列化之前初始化了任何无法序列化的对象,在进行序列化时就会抛出NotSerializableException
异常,topology的部署就会失败。当然,Count Bolt中的counts是可序列化的,所以不放在prepare
方法中初始化也没问题。
实现Report Bolt
Report Bolt在本例子只是简单的每个单词的出现次数,没有什么特别的,这里不作过多讲解了。
public class ReportBolt extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void execute(Tuple input) {
String word = input.getString(0);
Long count = input.getLong(1);
System.out.println("Word: " + word + ", Count: " + count);
}
}
在本地执行Counting Topology
现在我们已经实现了全部的组件:Spout与Bolt,现在来看看如何在本地模式下运行这个Topology:
public class CountingTopology {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("Sentence Spout", new SentenceSpout(), 1);
builder.setBolt("Split Bolt", new SplitBolt(), 1).shuffleGrouping("Sentence Spout");
builder.setBolt("Count Bolt", new CountBolt(), 1).fieldsGrouping("Split Bolt", new Fields("word"));
builder.setBolt("Report Bolt", new ReportBolt(), 1).globalGrouping("Count Bolt");
Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("CountingTopology", conf, builder.createTopology());
Utils.sleep(10000); //sleep 10秒后kill掉
cluster.killTopology("CountingTopology");
cluster.shutdown();
}
}
TopologyBuilder
提供了流式接口的API来宝义topology组件之间的数据流。setSpout
方法注册了一个Spout,赋于组件id为”Sentence Spout”,并指定其并行度为1(即Executor的数量)。
setBolt
方法注册”Split Bolt”组件时,通过shuffleGrouping
指示Sentence Spout以随机的方式向该Bolt组件发送数据流。注意,在注册”Count Bolt”时,fieldsGrouping
指示”Split Bolt”向”Count Bolt”发送数据流时,根据word字段的值来分组路由,对于本例来说这是必要的,因为Count Bolt可能会有多个Task实例。
后面的代码都比较简单了,LocalCluster
用于在本地模式下执行一个topology,submitTopology
方法在本地启动和执行topology,10秒后结束。这里单独说明一下,执行过程中会有大量日志输出,大量日志会影响我们观察程序结果,可以在classpath下创建一个log4j2.xml文件(Storm使用了log4j 2)关掉日志输出。
<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="60">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n" />
</Console>
</Appenders>
<Loggers>
<Logger name="org.apache.zookeeper" level="WARN" />
<Root level="warn">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</configuration>
对于前面的示例来说,可靠性显得不是那么重要,但如果是生产环境中的关键数据,某个Bolt处理失败了怎么办?Storm提供了At least once的可靠性保证(事实上Storm现在也做到了exactly once的保证,后面我们会介绍)。在Storm中,可靠性的处理机制要从Spout说起,Spout需要记录它发送出去的tuple,当下游的Bolt处理tuple或者子tuple失败时,Spout才能重新发送这些失败的原始tuple。
在有保障的数据处理过程中,Bolt每接收和处理一个tuple都需要向上游组件应答(ack或者fail)。如果一个tuple在整个tuple树中都被成功处理(即相关的Bolt都进行了ack应答),Storm会调用Spout的ack
方法告之;如果tuple在tuple树中某个环节处理失败(某个Bolt进行了fail应答)或者tuple处理超时,Strom会调用Spout的fail
方法重新发送该tuple。Storm并没有提供像最多重试N次尝试这样的配置,如果某个tuple天生就会处理失败的话,会导致一个死循环,开发的时候我们应该避免这样的情况发生。
还记得ISpout
接口中的方法吗?
ack
与fail
两个方法就是跟可靠性相关的,它们都有一个Object
类型的参数:msgId,Storm通过这个id来告之Spout哪个tuple处理成功或者失败,这说明每个从Spout发出去的tuple都应该有一个与之对应的id。SpoutOutputCollector
类有重载版本的omit
方法可以在发送tuple的时候指定id:
this.collector.emit(new Values(sentences[index]), msgId);
当然,Spout除了在发送tuple的时候指定id外,还需要记录下每个id和tuple的对应关系,这样才能在应答之后做出响应,下面是增加了可靠机制后Sentence Spout的实现:
public class SentenceSpout extends BaseRichSpout {
private static Long count = 1l;
private Map<Long, Values> pending;
private SpoutOutputCollector collector;
private int index = 0;
private String[] sentences = {
"how are you", "nice to meet you", "what a good day"
};
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
pending = new HashMap<>();
}
public void nextTuple() {
Long msgId = count;
Values value = new Values(sentences[index]);
pending.put(msgId, value);
this.collector.emit(value, msgId);
count++;
index++;
if (index == sentences.length)
index = 0;
Utils.sleep(5);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
public void ack(Object msgId) {
this.pending.remove(msgId);
}
public void fail(Object msgId) {
this.collector.emit(this.pending.get(msgId), msgId);
}
}
在nextTuple
方法中我们为每个tuple设置了一个唯一的id,并将id与tuple的关系保存在一个map中。这么做其实不好,因为它会导致Spout状态化,作为示例讲解问题不大。在ack
和fail
方法中分别实现了Spout的可靠性保证,代码很容易理解,不做过多解释。
Bolt要实现可靠的处理机制需要做好两件事情:
execute
方法中处理tuple之后需要应答。当Bolt成功处理一个tuple后,需要使用OutputCollector
类的ack
方法进行应答:
this.collector.ack(input);
当Bolt处理tuple出现异常时,需要使用OutputCollector
的fail
方法进行报错,要求Spout重新发送tuple:
this.collector.fail(input);
当Bolt发送tuple时,必须锚定子tuple和原tuple的关系,这样下游的Bolt在应答或者报错时,Storm才能跟踪到原始的tuple并通知Spout进行相应的处理。可以使用OutputCollector
类重载版本的omit
方法来锚定子tuple和原tuple。下面是增加了可靠机制后SplitBolt
类中execute
方法的实现:
@Override
public void execute(Tuple input) {
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for (String word : words) {
collector.emit(input, new Values(word));
}
this.collector.ack(input);
}
其中input是原始tuple,new Values(word)是由原始tuple衍生出来的子tuple。Storm的可靠性机制提供At least once的保证,比如我们示例中Count Bolt对某个tuple的处理失败了,Spout会重新发送该tuple,意味着同一个tuple会被Split Bolt处理两次。后续文章我们会介绍Storm如何实现Exactly once的可靠保证。