Hope is a good thing, and maybe the best thing of all

编程不止是一份工作,还是一种乐趣!!!

Storm深入浅出之入门篇

Storm是一个简单的分布式流式计算系统,它提供了对数据流进行实时处理的能力。Storm之于流式处理就像Hadoop之于批处理计算一样,区别在于Hadoop Job最终会运行结束,而Storm会一直持续下去。在设计上Storm是非常简单的,搭建一个简单的入门Sample通常不会很难,但要深入掌握Storm的各个方面,还是需要下一翻功夫的。本文会比较长,我会由浅入深的介绍Storm的各个方面,耐心看完相信您一定会有所收获。

基本概念


Topology

在Storm中,一个分布式计算结构被称为topology(拓扑),由stream(流)、spout(流生成器)和bolt(处理器)组成,如下图所示:

storm topology

Stream与Tuple

Stream是一个数据流,流中的每个元素被称为一个Tuple。Tuple是Storm中的核心数据结构,是一个由一个或多个键值对组成的列表对象。


Spout

Spout代表了Storm Topology中的数据入口,充当一个数据采集器的角色,连接到数据源,读取并将数据转化为一个个的tuple发送给Bolt。


Bolt

在Storm Topology中像过滤、转换、运算等操作都是由Bolt来完成的,可以把Bolt看作一个数据处理器。Bolt从一个或多个Spout那接收数据流进行处理,然后选择性的输出出去,形成一个复杂的网络。


Storm的并发机制


Storm集群中,Topology由四个部分组成:

  • Node(服务器)

    配置在集群中的服务器,执行一个topology的一部分运算。一个集群包含多个Nodes。

  • Worker Process

    一个Worker是指服务器中一个独立运行的JVM虚拟机进程,每个服务器中可以启动多个Worker,这取决于服务器的配置的负载情况。

  • Executor

    Executor是Worker中执行Task的线程。同一个Spout或Bolt的多个Task实例可以指派给一个Executor来执行,默认情况下,Storm会给每个Executor分配一个Task。

  • Task

    Task是Spout和Bolt的实例。运行时Task的数量是不可变的,但Executor的数量可以动态调整,这是Storm提供的灵活性之一。


假设我们有一个Topology,由一个Spout和两个Bolt组成,部署在一个Worker上,这时Topology的结构如下图所示:

topology structure

默认情况下,Storm为每一个组件设置的并行度是1,在上面的Topology中,每个Spout和Bolt都只是分配了一个Executor和一个Task。当负载上来后,我们没有办法通过增加Executor或服务器的方式来提高这个Topology的吞吐,因为每个组件的Task实例数是不可变的。我们可以在定义Topology的时候,预先设置好各个组件的并行度(即多少个Executor来执行该组件)和Task实例数量。

例如把前面的Topology做一个调整,Spout的Task实例数设为2,red Bolt的Task实例数设为4,green Bolt的实例数设为6,Executor的配置不动,那Topology的结构变为下图所示:

topology structure multi tasks

因为Executor的数量未作调整,所以这个Topology的吞吐与前面的那个差别不大,甚至会有小幅下降。但是这个Topology为后续的伸缩提供了更好的支持。Storm支持在不重启Topology的情况下,动态的改变(增减)Worker的数量和Executor的数量,这个过程称为Rebalance。通过Storm web UI,或者CLI tool storm rebalance命令我们可以实现Rebalance。

$ storm rebalance topology -n 2 -e spout=2 -e red-bolt=2 -e green-bolt=6
# -n 调整Topology worker数量
# -e 调整Spout或Bolt的Executor数量,最大不能超过Task实例数

例如我们可以通过上面的命令,把Topology的Worker数量改为2个,Spout和red Bolt的Executor改为2个,green Bolt的Executor改为6个。修改之后,Topology的结构所下所示:

topology structure multi workers

Topology中Executor的总量是由各个组件的并行度之和决定的,再除以Worker的数量,就是每个Worker中Executor的数量了,Storm的并发机制是非常简单的。有一点需要单独指出,在本地模式下增加Worker的数量不会达至预期的效果,因为本地模式本质上是同一个JVM。


数据流分组


现在我们了解了Storm的并发机制,你可能会有疑问,一个Bolt可能有多个Task实例,当一个tuple从Spout中发送出去时,会发送给哪个Task实例呢?这个问题就涉及到Storm的数据流分组了,Storm定义了几种分组的方式:

  • Shuffle grouping(随机)

    这种方式会随机分发tuple给Bolt的各个Task实例,每个实例接收到相同数量的tuple。

  • Fields grouping

    根据指定字段的值进行分组。

  • All grouping

    每个tuple都复制分发给所有的Bolt Task实例。

  • Global grouping

    这种分组方式将所有的tuple都路由到唯一一个Task实例上。

  • None grouping

    在功能上与Shuffle grouping一样,为将来扩展预留的。

  • Direct grouping

    这是一种特殊的路由方式,由发送者(emitDirect方法)来决定tuple由哪个Task实例来接收。

  • Local or shuffle grouping

    与Shuffle grouping类似,但是会优先选择同一个Worker内的Task实例。由于减少了网络传输,性能上更好。


一个示例


单词计数的例子在很多介绍Storm的文章里都是作为Tutorial,因为它够简单,而且非常适合Storm的入门。这里我们使用同样的例子,但我会介绍每个组件实现时的细节。

单词计数Topology包含了一个Spout与三个Bolt组件,如:

sample topology

例子比较简单,Sentence Spout的作用是随机发送一些英文句子;Split Bolt负责将句子拆分成单词;Count Bolt负责统计每个单词的出现次数;最后由Report Bolt将结果输出。


实现Sentence Spout

首先,Storm要求所有的组件(Spout与Bolt)都实现IComponent接口,该接口定义了两个方法。其中,declareOutputFields方法用于告诉Storm该组件会发送哪些数据流,以及流中的tuple包含哪些字段Fields;getComponentConfiguration方法用于覆盖、指定一些组件级别的配置,IComponent接口的源码如下:

public interface IComponent extends Serializable {
    /**
     * Declare the output schema for all the streams of this topology.
     *
     * @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
     */
    void declareOutputFields(OutputFieldsDeclarer declarer);

    /**
     * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
     * be overridden. The component configuration can be further overridden when constructing the
     * topology using {@link TopologyBuilder}
     *
     */
    Map<String, Object> getComponentConfiguration();
}


其次,Storm要求所有的Spout组件实现ISpout接口。ISpout接口定义的方法相对要多一些,大部分时候我们不需要全部实现,为此Storm API提供了一些辅助类,降低Storm的开发工作量,相关的类图如下:

spout类图

BaseComponent类与BaseRichSpout类,提供了大部分方法的默认实现,所以我们在开发Spout组件的时候,只需要承继BaseRichSpout类即可。剩下的工作就是实现declareOutputFieldsopennextTuple三个方法了。declareOutputFields方法前面我们提到过,它的作用是告诉Storm该Spout组件会发送哪些数据流,以及流中的tuple包含哪些字段Fields。该示例中,SentenceBolt发送的tuple中只有一个sentence字段,它的实现如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("sentence"));
}


open方法在该组件的Task实例初始化的时候被调用,它提供了三个参数:

  • Map类型的conf

    关于该Spout组件的Storm配置信息。

  • TopologyContext

    关于该Spout组件当前Task实例的一些信息,比如task id。

  • SpoutOutputCollector

    SpoutOutputCollector对象用于发送tuple,它是线程安全的。通常会把该参数作为Spout的实例属性保存下来。

由于我们会在nextTuple方法中使用SpoutOutputCollector对象发送tuple,所以在open方法中我们需要把它保存起来:

private SpoutOutputCollector collector;

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
}


nextTuple方法是Spout组件最关键的方法之一,Storm通过调用这个方法来发送tuple。值得注意的是,nextTuple方法的内部实现最好不要有阻塞式的操作,因为Storm是在一个专门的线程中不断循环调用该方法的。如果没有数据需要发送,可以睡眠几毫秒再返回,防止空转消耗过多的CPU资源。本例中我们随机发送一些英文句子,nextTuple方法实现如下:

private int index = 0;
private String[] sentences = {
    "how are you", "nice to meet you", "what a good day"
};

public void nextTuple() {
    this.collector.emit(new Values(sentences[index]));
    index++;
    if (index == sentences.length)
        index = 0;
    Utils.sleep(5);
}


至此,我们完成了SentenceSpout的实现,关于ISpout接口中的其它方法,我们后面再详细介绍。SentenceSpout完整代码如下:

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
        "how are you", "nice to meet you", "what a good day"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index == sentences.length)
            index = 0;
        Utils.sleep(5);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}


实现Split Bolt

实现Bolt与Spout类似,Storm要求所有的Bolt都实现IBolt接口,该接口定义了三个方法。prepare方法与ISpout.open方法的作用类似,在Bolt Task实例初始化的时候会被调用;Bolt中最重要的方法就是execute,对于每一个接收到的tuple,Bolt会调用一次该方法,它是Bolt实现主要逻辑的地方。如果Bolt还需要发送数据流的话,一般也是在该方法中实现的;cleanup方法一般用在本地模式,集群环境下Storm并不保证该方法一定会被执行。

Storm API同样提供了一些辅助类,降低Bolt的开发工作量,相关的类图如下:

bolt类图

从上面的类图结构看,Split Bolt需要实现declareOutputFieldsprepareexecute三个方法。declareOutputFieldsprepare方法的作用与Spout类似,我们不再重复介绍;Split Bolt的主要职责是接收Sentence Spout发出的句子流,拆分成一个个的单词并发送给后续的Bolt处理,这些逻辑都应该实现在execute方法中:

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input) {
        String sentence = input.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }
}

SplitBolt的实现我们可以发现,一个Tuple经过一个Bolt处理后,变成了多个Tuple发送出去,数量发生了变化。


实现Count Bolt

掌握了Split Bolt的开发,后续的其它Bolt也是类似的。Count Bolt的职责是从Split Bolt接收单词流统计出现次数并持久化,这里我们简单些保存在JVM内存中,但生产环境中我们不应该这么做,集群环境下任意一个节点都是不可靠的,Storm会进行故障监控和转移,所以Spout和Bolt都应该是无状态化的。CountBolt的实现如下:

public class CountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts;

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        counts = new HashMap<>();
    }

    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Long count = counts.get(word);
        if (null == count)
            count = 0l;
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

counts是在prepqre方法中实例化的,这么做是由topology的部署方式决定的。当topology发布时,所有的Bolt和Spout组件会首先进行序列化,然后通过网络发送到集群中。如果Spout或Bolt组件在序列化之前初始化了任何无法序列化的对象,在进行序列化时就会抛出NotSerializableException异常,topology的部署就会失败。当然,Count Bolt中的counts是可序列化的,所以不放在prepare方法中初始化也没问题。


实现Report Bolt

Report Bolt在本例子只是简单的每个单词的出现次数,没有什么特别的,这里不作过多讲解了。

public class ReportBolt extends BaseRichBolt {
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Long count  = input.getLong(1);
        System.out.println("Word: " + word + ", Count: " + count);
    }
}


在本地执行Counting Topology

现在我们已经实现了全部的组件:Spout与Bolt,现在来看看如何在本地模式下运行这个Topology:

public class CountingTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Sentence Spout", new SentenceSpout(), 1);
        builder.setBolt("Split Bolt", new SplitBolt(), 1).shuffleGrouping("Sentence Spout");
        builder.setBolt("Count Bolt", new CountBolt(), 1).fieldsGrouping("Split Bolt", new Fields("word"));
        builder.setBolt("Report Bolt", new ReportBolt(), 1).globalGrouping("Count Bolt");

        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology("CountingTopology", conf, builder.createTopology());

        Utils.sleep(10000); //sleep 10秒后kill掉
        cluster.killTopology("CountingTopology");
        cluster.shutdown();
    }
}

TopologyBuilder提供了流式接口的API来宝义topology组件之间的数据流。setSpout方法注册了一个Spout,赋于组件id为”Sentence Spout”,并指定其并行度为1(即Executor的数量)。

setBolt方法注册”Split Bolt”组件时,通过shuffleGrouping指示Sentence Spout以随机的方式向该Bolt组件发送数据流。注意,在注册”Count Bolt”时,fieldsGrouping指示”Split Bolt”向”Count Bolt”发送数据流时,根据word字段的值来分组路由,对于本例来说这是必要的,因为Count Bolt可能会有多个Task实例。

后面的代码都比较简单了,LocalCluster用于在本地模式下执行一个topology,submitTopology方法在本地启动和执行topology,10秒后结束。这里单独说明一下,执行过程中会有大量日志输出,大量日志会影响我们观察程序结果,可以在classpath下创建一个log4j2.xml文件(Storm使用了log4j 2)关掉日志输出。

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="60">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n" />
    </Console>
  </Appenders>
  <Loggers>
    <Logger name="org.apache.zookeeper" level="WARN" />
    <Root level="warn">
      <AppenderRef ref="Console" />
    </Root>
  </Loggers>
</configuration>


Storm的可靠性


对于前面的示例来说,可靠性显得不是那么重要,但如果是生产环境中的关键数据,某个Bolt处理失败了怎么办?Storm提供了At least once的可靠性保证(事实上Storm现在也做到了exactly once的保证,后面我们会介绍)。在Storm中,可靠性的处理机制要从Spout说起,Spout需要记录它发送出去的tuple,当下游的Bolt处理tuple或者子tuple失败时,Spout才能重新发送这些失败的原始tuple。

在有保障的数据处理过程中,Bolt每接收和处理一个tuple都需要向上游组件应答(ack或者fail)。如果一个tuple在整个tuple树中都被成功处理(即相关的Bolt都进行了ack应答),Storm会调用Spout的ack方法告之;如果tuple在tuple树中某个环节处理失败(某个Bolt进行了fail应答)或者tuple处理超时,Strom会调用Spout的fail方法重新发送该tuple。Storm并没有提供像最多重试N次尝试这样的配置,如果某个tuple天生就会处理失败的话,会导致一个死循环,开发的时候我们应该避免这样的情况发生。

还记得ISpout接口中的方法吗?

spout类图

ackfail两个方法就是跟可靠性相关的,它们都有一个Object类型的参数:msgId,Storm通过这个id来告之Spout哪个tuple处理成功或者失败,这说明每个从Spout发出去的tuple都应该有一个与之对应的id。SpoutOutputCollector类有重载版本的omit方法可以在发送tuple的时候指定id:

this.collector.emit(new Values(sentences[index]), msgId);


当然,Spout除了在发送tuple的时候指定id外,还需要记录下每个id和tuple的对应关系,这样才能在应答之后做出响应,下面是增加了可靠机制后Sentence Spout的实现:

public class SentenceSpout extends BaseRichSpout {
    private static Long count = 1l;
    private Map<Long, Values> pending;
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
        "how are you", "nice to meet you", "what a good day"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        pending = new HashMap<>();
    }

    public void nextTuple() {
        Long msgId = count;
        Values value = new Values(sentences[index]);
        pending.put(msgId, value);
        this.collector.emit(value, msgId);
        count++;
        index++;
        if (index == sentences.length)
            index = 0;
        Utils.sleep(5);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}

nextTuple方法中我们为每个tuple设置了一个唯一的id,并将id与tuple的关系保存在一个map中。这么做其实不好,因为它会导致Spout状态化,作为示例讲解问题不大。在ackfail方法中分别实现了Spout的可靠性保证,代码很容易理解,不做过多解释。


Bolt要实现可靠的处理机制需要做好两件事情:

  1. execute方法中处理tuple之后需要应答。
  2. 如果Bolt需要向后续的组件发送tuple,需要锚定子tuple和原tuple的关系。

当Bolt成功处理一个tuple后,需要使用OutputCollector类的ack方法进行应答:

this.collector.ack(input);

当Bolt处理tuple出现异常时,需要使用OutputCollectorfail方法进行报错,要求Spout重新发送tuple:

this.collector.fail(input);

当Bolt发送tuple时,必须锚定子tuple和原tuple的关系,这样下游的Bolt在应答或者报错时,Storm才能跟踪到原始的tuple并通知Spout进行相应的处理。可以使用OutputCollector类重载版本的omit方法来锚定子tuple和原tuple。下面是增加了可靠机制后SplitBolt类中execute方法的实现:

@Override
public void execute(Tuple input) {
    String sentence = input.getStringByField("sentence");
    String[] words = sentence.split(" ");
    for (String word : words) {
        collector.emit(input, new Values(word));
    }
    this.collector.ack(input);
}

其中input是原始tuple,new Values(word)是由原始tuple衍生出来的子tuple。Storm的可靠性机制提供At least once的保证,比如我们示例中Count Bolt对某个tuple的处理失败了,Spout会重新发送该tuple,意味着同一个tuple会被Split Bolt处理两次。后续文章我们会介绍Storm如何实现Exactly once的可靠保证。