Hope is a good thing, and maybe the best thing of all

编程不止是一份工作,还是一种乐趣!!!

Storm深入浅出之高级篇

前面一篇文章《Storm深入浅出之入门篇》我们介绍了Storm的基本概念、并发性。另外我们还实现了一个简单的单词计数的例子,虽然例子很简单,但还是需要为每一个组件(Spout与Bolt)都开发一个类。对于比较复杂的业务场景来说,Storm的开发工作量可能会变得很高,另外At least once的语义保证级别对于一些关键业务场景来说是不够的。为此Storm提供了更高级别的抽象:Trident,它实现了Exactly Once的语义保证级别,另外它还提供了流式风格的API,极大的简化了开发工作量。下面我们一步步解开它神秘的面纱。

Trident是Storm的一种高度抽象的实时计算模型,它可以将高吞吐量数据输入、状态化的流式处理与低延时的分布式查询无缝的结合起来。如果你了解Pig或者Cascading这样的高级批处理工具,你就会发现Trident和他们的概念非常相似。Trident同样也提供了像联结(join)、聚合(aggregation)、分组(grouping)、函数(function)以及过滤器(filter)这些流式风格的功能。此外,Trident在持久化存储之上抽象了状态(State)的概念,并提供了增量式处理的基础原语。


初识Trident


最直接的方式不外乎代码了,下面是用Trident重新实现单词计数的例子:

public static void main(String[] args) {
    @SuppressWarnings("unchecked")
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values("how are you"),
            new Values("nice to meet you"), new Values("what a good day"));

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word"))
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
            .parallelismHint(1);

    LocalDRPC drpc = new LocalDRPC();
    topology.newDRPCStream("word", drpc).groupBy(new Fields("args"))
            .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"));

    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", conf, topology.build());

    Utils.sleep(20000);
    System.out.println("DRPC RESULT: " + drpc.execute("word", "how"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "are"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "you"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "nice"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "to"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "meet"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "what"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "a"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "good"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "day"));

    cluster.shutdown();
    drpc.shutdown();
}


惊呀吗,这一小段代码实现的功能与前文的例子完全相同,包括SentenceSpoutSplitBoltCountBoltReportBolt。 首先,我们创建了一个Spout,与Storm一样,Trident也需要一个Spout作为数据流的产生源。不同的是这里的FixedBatchSpout实现的是IBatchSpout而不是ISpout接口,Trident对Spout的要求与传统的Spout有很大的不同,Trident Spout引入了批次(Batch)的概念,每个Batch包含若干个tuple,同时Trident会为每个Batch赋于一个标识txid。Batch引入的原因是为了配合State的更新,实现Exactly Once的语义保证的,后面会详细介绍。有了Spout对象,就可以通过TridentTopology.newStream方法创建一个流对象Stream,它是我们可以使用流式风格开发的关键所在。


Stream.each(new Fields("sentence"), new Split(), new Fields("word"))的作用是使用Split对流中的每个tuple进行处理,这里实际上是把sentence字段进行拆分,并把拆分的结果存放到word字段中。

public class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(String word: tuple.getString(0).split(" ")) {
            if(word.length() > 0) {
                collector.emit(new Values(word));
            }
        }
    }
}

Split的源码来看,一个tuple经过Function处理过后,有可能产生多个tuple。比如我们例子中的第一个tuple是:

{"sentence":"how are you"}

经过Split处理后变成了三个tuple:

{"sentence":"how are you", "word":"how"}
{"sentence":"how are you", "word":"are"}
{"sentence":"how are you", "word":"you"}


groupBy(new Fields("word"))的作用是根据字段word的值对tuple进行分组,这意味着字段word相同的tuple都会被路由到相同的分区Partition。groupBy方法的返回的结果是一个GroupedStream对象,与Stream对象类似,但多出一个分组字段的概念。另外,将GroupedStream持久到State时,Storm要求我们提供一个MapState的实现,而Stream只需要提供一个State的实现。

persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")的作用是将每个单词的次数进行统计,然后持久化到MemoryMapState对象中。MemoryMapStateMapState的一个实现,它将数据保存到内存中。persistentAggregate方法的返回结果是一个TridentState对象,后面我们需要借助它来查询MemoryMapState中的内容。


DRPC是Trident提供的另一个特性,从客户端的角度来看它就像普通的RPC调用那样,你给出入参,它返回你结果。但在服务器端,Trident把你的请求转化成Steam,并以分布式的方式在Storm集群中进行并行处理。

topology.newDRPCStream("word", drpc).groupBy(new Fields("args"))
                .stateQuery(wordCounts, new Fields("args"), new MapGet(), new Fields("count"));


TridentTopology.newDRPCStream方法定义了一个DRPC方法:word,并且返回一个Stream对象,这个Stream中的tuple只包含一个字段args,字段的值是客户端的请求参数。例如:drpc.execute("word", "how"),表示客户端请求word这个DRPC方法,并传入参数值how,即args字段的值。

通过newDRPCStream方法得到Stream对象后,后续的处理与普通的Trident完全一样,这也使得我们可以使用一套通用的API来管理数据的加工处理和访问。stateQuery方法的作用是查询之前保存在MemoryMapState中的结果的,他接受一个TridentState对象,通过MapGet方法对每个tuple(在本例中只有一个tuple)进行查询,并将结果输出到count字段中,最后Trident会将count字段的值返回给客户端。

第一次接触Trident,或流式风格的编程,你可能会非常不习惯。但其实它真的不难,你熟悉后会发现它有很多的优点:开发效率的提升,代码可读性方面的提高等等。关于Trident的API,在Storm官网上已经有很好的文档了,这里我们不再介绍。使用Trident的另一个重要的原因是它支持Exactly Once的语义保证级别,这也是本文的重点,后面我会详细的进行讲解。


Exactly Once语义


状态(State)是Trident引入的一个新概念,通俗一点可以把State理解为一切可持久化设备的抽象,比如内存、数据库、Redis或者HDFS,Trident提供了对状态化数据源进行读取和写入操作的一级抽象封装工具。

要实现Exactly Once总是困难的,数据在处理过程中总是可能出错、或者超时,所以重试机制是无法避免的。问题在于重试过程中如何避免重复处理。例如前面单词计数的例子,失败重试的时候,我们没有办法知道之前的错误发生在什么位置,可能在State更新前,也可能在State更新后,所以在重试过程中我们无法确定是否需要继续更新State。仅仅在State中保留业务信息(次数)是不够的。


Trident使用乐观锁的原理,以一种容错的方式实现对State的管理,即使在发生操作失败或者重试的情况下状态的更新操作仍然是幂等的。为了实现Exactly Once,Trident制定了一系列的约定:

  1. 引入了批次(Batch),一个Batch包含若干个tuple。
  2. 为每个Batch提供一个唯一的txid。失败重试是基于Batch而不是tuple的,如果需要对Batch重新处理,这个Batch仍然会有相同的txid。
  3. State的更新操作是按照Batch的顺序进行的。也就是说在Batch 2完成处理之前,Batch 3的状态更新操作不会进行。

这些约定是实现Exactly Once的基础,此外,Trident定义了三种类型的Spout与State,分别是Transactional、Opaque Transactional与Non-Transactional。我们先来看看Transactional。


Transactional Spout

Transactional类型的Spout具有以下的约定:

  1. 每个Batch的txid永远不会改变。对于某个特定的txid,Batch在失败重试时的tuple集合和前一次完全相同。
  2. 不同Batch中的tuple不会出现重复的情况(某个tuple只会出现在一个Batch中,而不会同时出现在多个Batch中)。
  3. 每个tuple都会放入一个Batch中(不会遗漏任何的tuple)。


我们继续单次计数的例子,前面我们已经讲过了,仅仅在State中保留次数是不够的。我们应该将txid作为一种原子化的值与次数一起保存。随后,在更新次数的时候,就可以将State中的txid与当前Batch的txid进行比对。如果两者相同,说明当前的更新已经处理过,可以直接跳过——由于 Trident的强有序性处理机制,可以确定State中的值是对应于当前的Batch的。如果两者不同,可以放心地更新次数值。由于一个Batch的txid永远不会改变,而且Trident能够保证State的更新完全是按照Batch的顺序进行的,所以这样的处理逻辑是完全可行的。这种将txid与业务信息一起保存的State就是Transactional类型的State。


下面来看一个例子。假如你正在处理txid 3,其中包含有以下几个 tuple:

["man"]
["man"]
["dog"]

假如State中有数据如下:

man => [count=3, txid=1]
dog => [count=4, txid=3]
apple => [count=6, txid=2]

其中与man相关联的txid为1,由于当前处理的txid为3,可以确定当前处理的Batch与State中的值无关,可以放心地将man的次数值加上2并更新txid为3;另一方面,由于dog的txid与当前的txid相同,所以dog的次数是之前已经处理过的,现在不能再进行更新操作。这样,在结束txid3的更新操作之后,State中的结果就会变成这样:

man => [count=5, txid=3]
dog => [count=4, txid=3]
apple => [count=6, txid=2]


非常简单吧,你可能会问,Transactional类型的Spout与State够简单啊,已经实现了Exactly Once了,为什么还会有其它类型的Spout呢?原因之一是Transactional Spout的容错性并不是非常高。比如一个Transactional Spout从kafka消费数据,如果一个Batch失败的同时,kafka中某个partition(包括主分片与副分片)也故障了,那这个Spout就无法重新发送这个Batch,因为故障partition中的数据拿不到了。除此之外,这个Batch之后的其它Batch也不能得到处理,从而阻塞了整个Trident。正是为了解决此类问题的发生,Trident设计了另一种类型的Spout与State:Opaque Transactional。


Opaque Transactional

Opaque Transactional类型的Spout具有以下的约定:每个tuple会确保在某个Batch处理完成。在处理失败的时候,tuple可能继续在另一个Batch中完成处理,而不一定是在原先的Batch中完成处理。相比较于Transactional类型的Spout存在的问题来说,Opaque Transactional类型的Spout在失败重新发送Batch时需要kafka某个分区发生故障,那重新发布的Batch将不会包含故障分区的数据。稍后等故障分区恢复以后,分区内的数据会在另一个Batch中发送。

聪明的你也许已经发现,仅仅在State中保留txid已经行不通了,因为失败重试的情况下Batch的内容可能会发生变化。在这种情况下,我们需要在State中存储更多的信息。除了业务信息和txid之外,还需要存入业务信息的前一个结果值。我们再以上面的计数值的例子来分析以下这个问题。假如你的Batch的部分计数值是2,现在你需要应用一个更新操作。假定现在State中的值是这样的:

{ value = 4, prevValue = 1, txid = 2 }

假如当前处理的Batch的txid为3,与State中的txid不同,这时可以将prevValue的值设为value的值,再为value的值加上部分计数的结果并更新txid。执行完这一系列操作之后的State中的值就会变成这样:

{ value = 6, prevValue = 4, txid = 3 }

假如当前处理的Batch的txid与State中相同呢?此时意味着State中的更新操作是由上一个拥有相同txid的Batch做出的。不过那个Batch有可能与当前的Batch并不相同,所以你需要忽略它的操作。这个时候,你应该将prevValue加上batch中的部分计数值来计算新的value。在这个操作之后State中的值就会变成这样:

{ value = 3, prevValue = 1, txid = 2 }

这种方法之所以可行是因为Trident具有强顺序性处理的特性。一旦Trident开始处理一个新的Batch的状态更新操作,它永远不会回到过去的Batch的处理上。同时,Opaque Transactional类型的Spout保证Batch之间不会存在重复,每个tuple只会被某一个Batch完成处理,所以你可以放心地使用prevValue来更新value。这种将txid与业务信息当前值与前一次历史值都保存的State就是Opaque Transactional类型的State。


Non-Transactional

非事务型Spout不能提供任何的安全性保证。非事务型Spout有可能提供At most once的语义保证,在这种情况下Batch处理失败后Spout并不会重新处理;也有可能提供At least once的语义保证,在这种情况下可能会有多个Batch分别处理某个tuple。


下图说明了不同类型的Spout与State搭配时,哪些组合可以提供Exactly Once的语义保证:

Combination of spout & state

Opaque Transactional类型的State具有最好的容错性特征,不过这是以存储更多的内容为代价的(一个txid和两个value)。Transactional类型State要求的存储空间相对较小,但是它的缺点是只对Transactional类型的Spout有效,容错性有限。Non-Transactional类型的State要求的存储空间最少,但是它不能提供任何的Exactly Once的保证。

本文开篇那个单词计数的例子能提供Exactly Once的保证吗?答案是否定的。虽然MemoryMapState是Opaque Transactional类型的State,但是FixedBatchSpout是Non-Transactional类型的Spout,他们搭配起来并不能提供Exactly Once的语义保证。下面我们自己来实现Spout与State,深入Trident内部看看如何实现Exactly Once。


State的实现


Trident要求所有的State必须实现State接口:

public interface State {
    void beginCommit(Long txid);
    void commit(Long txid);
}


从接口的定义就能看出Trident对State怎么工作,使用什么样的方法执行更新操作,或者怎样从State中读取数据都没有做任何的约束,它只是通过两个方法告诉我们State的更新什么时候开始和结束。好吧,现在我们要自己实现一个State来保存我们单词的次数,这个State看上去应该是这样的:

public class WordCountDB implements State {
    private Map<String, Long> inner = new HashMap<>();

    @Override
    public void beginCommit(Long txid) {
        System.out.println("State update begin, for batch: " + txid);
    }

    @Override
    public void commit(Long txid) {
        System.out.println("State update committed, for batch: " + txid);
    }

    public void incrCount(String word, Long count) {
        Long prev = inner.get(word);
        if (null == prev) {
            inner.put(word, 1l);
        } else {
            inner.put(word, prev + count);
        }
    }

    public long getCount(String word) {
        return null == inner.get(word) ? 0 : inner.get(word);
    }
}


作为一个示例,我们简单的把信息保存到一个map对象中,在实际场景中你可以把在这里把信息持久化到数据库或Redis缓存中去。另外,你可能还需要考虑Storm的并行性,State的实现最好是非状态化的。

接着,我们还需要一个工厂类来创建我们的WordCountDB

public class WordCountDBFactory implements StateFactory {
    @Override
    public State makeState(@SuppressWarnings("rawtypes") Map conf, IMetricsContext metrics, int partitionIndex,
            int numPartitions) {
        return new WordCountDB();
    }
}


partitionIndex是当前Task实例的标识,numPartitions是当前组件的Task实例总数。对于一些复杂的场景来说,在创建State时这两个参数可能会有用。

因为Trident对State怎么工作没有做任何的约束,所以它并不知道如何使用我们的WordCountDB,哪个方法负责更新,哪个方法负责查询,参数怎么传等等,所以我们还需要告诉Trident怎么查询和更新我们的State。

对于查询来说,Trident提供了QueryFunction,它是一个模板接口,声明了两个方法:

public interface QueryFunction<S extends State, T> extends EachOperation {
    List<T> batchRetrieve(S state, List<TridentTuple> args);
    void execute(TridentTuple tuple, T result, TridentCollector collector);
}


其中参数S是State的实现类,即WordCountDB,而T表示我们从State查询出来的结果类,这里我们的结果是单词的出现次数,所以我们使用Long类型。在查询State的时候,对于流中的一批tuple,Trident会调用batchRetrieve方法查询得到这一批tuple对应的结果;然后为每个结果调用一次execute方法。

我们的WordCountDB的查询类看上去是这样的:

public class QueryWordCountDB extends BaseQueryFunction<WordCountDB, Long> {
    @Override
    public List<Long> batchRetrieve(WordCountDB state, List<TridentTuple> args) {
        List<Long> list = new LinkedList<>();
        for (TridentTuple tuple : args) {
            list.add(state.getCount(tuple.getString(0)));
        }
        return list;
    }

    @Override
    public void execute(TridentTuple tuple, Long result, TridentCollector collector) {
        collector.emit(new Values(result));
    }
}


你也许会对batchRetrieve方法的签名有些疑问,为什么入参args和返回结果都是集合?这么设计API的原因是为了追求更高的吞吐,在查询或更新State时,不论一个Batch包含多少个tuple,我们可以只访问State一次。

比如我们可以把WordCountDB中的incrCount方法与getCount方法稍作修改,使之支持批量操作:

public void incrCount(List<String> words, List<Long> counts) {
    int size = words.size();
    for (int i = 0; i < size; i++) {
        String word = words.get(i);
        Long count = counts.get(i);
        Long prev = inner.get(word);
        if (null == prev) {
            inner.put(word, 1l);
        } else {
            inner.put(word, prev + count);
        }
    }
}

public List<Long> getCount(List<String> words) {
    List<Long> list = new LinkedList<>();
    for (String word : words) {
        list.add(null == inner.get(word) ? 0 : inner.get(word));
    }
    return list;
}


现在我们可以优化QueryWordCountDB.batchRetrieve方法,只访问State一次了:

public List<Long> batchRetrieve(WordCountDB state, List<TridentTuple> args) {
    List<String> words = new LinkedList<>();
    for (TridentTuple tuple : args) {
        words.add(tuple.getString(0));
    }
    return state.getCount(words);
}


对于State的更新来说,Trident提供了StateUpdater接口,该接口比较简单,不就做过多介绍了,这里直接给出实现:

public interface StateUpdater<S extends State> extends Operation {
    void updateState(S state, List<TridentTuple> tuples, TridentCollector collector);
}

public class WordCountDBUpdater extends BaseStateUpdater<WordCountDB> {
    @Override
    public void updateState(WordCountDB state, List<TridentTuple> tuples, TridentCollector collector) {
        List<String> words = new LinkedList<>();
        List<Long> counts  = new LinkedList<>();
        for (TridentTuple tuple : tuples) {
            words.add(tuple.getString(0));
            counts.add(tuple.getLong(1));
        }
        state.incrCount(words, counts);
    }
}


可以看到updateState方法的签名设计也同样是为了支持批量操作的,这里对State的更新也是仅仅访问了一次。

现在我们可以使用WordCountDB来重写单词计数的示例了:

public static void main(String[] args) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values("how are you"),
            new Values("nice to meet you"), new Values("what a good day"));
    spout.setCycle(false);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .aggregate(new Count(), new Fields("count"))
            .toStream()
            .partitionPersist(new WordCountDBFactory(), new Fields("word", "count"), new WordCountDBUpdater())
            .parallelismHint(1);

    LocalDRPC drpc = new LocalDRPC();
    topology.newDRPCStream("word", drpc)
            .stateQuery(wordCounts, new Fields("args"), new QueryWordCountDB(), new Fields("count"));

    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", conf, topology.build());

    Utils.sleep(10000);
    System.out.println("DRPC RESULT: " + drpc.execute("word", "how"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "are"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "you"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "nice"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "to"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "meet"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "what"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "a"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "good"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "day"));

    cluster.shutdown();
    drpc.shutdown();
}

我们已经完全实现了一个自定义的State,只不过它是Non-Transaction类型的,下面我们看看如何修改它,让他成为Transactional类型和Opaque Transactional类型的State。


Transaction State

前面我们已经了解到,Transactional类型的State需要将txid与业务信息一起保存,同时在更新的时候需要用当前Batch的txid与State中的txid作对比。为了方便我们实现Transactional类型的State,Trident提供了一个TransactionalValue类:

public class TransactionalValue<T> {
    T val;
    Long txid;

    public TransactionalValue(Long txid, T val) {
        this.val = val;
        this.txid = txid;
    }

    public T getVal() {
        return val;
    }

    public Long getTxid() {
        return txid;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}


借助于TransactionalValue类,实现Transactional版本的WordCountDB将变得非常容易:

public class TransactionalWordCountDB extends WordCountDB implements State {
    private Long currTxid;

    @Override
    public void beginCommit(Long txid) {
        currTxid = txid;
        System.out.println("State update begin, for batch: " + txid);
    }

    @Override
    public void commit(Long txid) {
        currTxid = null;
        System.out.println("State update committed, for batch: " + txid);
    }

    private Map<String, TransactionalValue<Long>> inner = new HashMap<>();
    public void incrCount(List<String> words, List<Long> counts) {
        int size = words.size();
        for (int i = 0; i < size; i++) {
            String word = words.get(i);
            Long count = counts.get(i);
            TransactionalValue<Long> transVal = inner.get(word);
            if (null == transVal) {
                inner.put(word, new TransactionalValue<>(currTxid.longValue(), count));
            } else if (currTxid > transVal.getTxid()) {
                Long prev = transVal.getVal();
                Long curr = prev + count;
                inner.put(word, new TransactionalValue<>(currTxid.longValue(), curr));
            } else if (currTxid == transVal.getTxid()) {
                System.out.println("Iginre...");
            } else if (currTxid < transVal.getTxid()) {
                throw new RuntimeException("impossible case: currTxid < transVal.getTxid()...");
            }
        }
    }

    public List<Long> getCount(List<String> words) {
        List<Long> list = new LinkedList<>();
        for (String word : words) {
            TransactionalValue<Long> transVal = inner.get(word);
            if (null == transVal) {
                list.add(Long.valueOf(0l));
            }
            list.add(transVal.getVal());
        }
        return list;
    }
}


首先,beginCommit方法中我们需要记下当前Banch的txid;其次,inner的value类型也从Long改为了TransactinoalValue<Long>;最后是incrCount方法中增加了Transational State的处理逻辑,比较简单,我想你一定能看明白!!!

另外你可能会注意到TransactionalWordCountDB类继承了WordCountDB,原因是这样可以复用之前的QueryWordCountDBWordCountDBUpdater


Opaque Transactional State

Opaque Transactional类型的State需要将txid与业务信息的当前值和前一次的历史值一起保存。Trident同样提供了OpaqueValue类方便我们实现Opaque Transactional类型的State。

public class OpaqueWordCountDB extends WordCountDB implements State {

    private Long currTxid;

    @Override
    public void beginCommit(Long txid) {
        currTxid = txid;
        System.out.println("State update begin, for batch: " + txid);
    }

    @Override
    public void commit(Long txid) {
        currTxid = null;
        System.out.println("State update committed, for batch: " + txid);
    }

    private Map<String, OpaqueValue<Long>> inner = new HashMap<>();
    public void incrCount(List<String> words, List<Long> counts) {
        int size = words.size();
        for (int i = 0; i < size; i++) {
            String word = words.get(i);
            Long count = counts.get(i);
            OpaqueValue<Long> transVal = inner.get(word);
            if (null == transVal) {
                inner.put(word, new OpaqueValue<>(currTxid.longValue(), count));
            } else if (currTxid > transVal.getCurrTxid()) {
                Long prev = transVal.getCurr();
                Long curr = prev + count;
                inner.put(word, new OpaqueValue<>(currTxid.longValue(), curr, prev));
            } else if (currTxid == transVal.getCurrTxid()) {
                Long prev = transVal.getPrev();
                Long curr = prev + count;
                inner.put(word, new OpaqueValue<>(currTxid.longValue(), curr, prev));
            } else if (currTxid < transVal.getCurrTxid()) {
                throw new RuntimeException("impossible case: currTxid < transVal.getCurrTxid()...");
            }
        }
    }

    public List<Long> getCount(List<String> words) {
        List<Long> list = new LinkedList<>();
        for (String word : words) {
            OpaqueValue<Long> transVal = inner.get(word);
            if (null == transVal) {
                list.add(Long.valueOf(0l));
            }
            list.add(transVal.getCurr());
        }
        return list;
    }
}


OpaqueWordCountDBTransactionalWordCountDB的不同点主要体现在incrCount方法的实现逻辑不同,前面已经讨论过Opaque Transaction的实现原理,这里就不重复介绍了。

至此,关于单词计数的各个类型的State我们都有了,但是要实现Exactly Once,我们还需要Transactional或Opaque Transactional类型的Spout。


Spout的实现


与Storm API一样,Spout也是Trident的数据来源。不过,为了实现更复杂的功能服务,Trident Spout在普通的Storm Spout之上另外提供了一些API接口。Trident中所有的Spout都必须有一个标识,并且这个标识必须在整个Storm集群中都是唯一的,因为Trident需要使用这个标识来将Spout中的元数据(metadata)保存在ZooKeeper中。


在Trident中可以使用普通的IRichSpout接口来创建数据流,但这是Non-Transactional类型的Spout。Trident专门提供了一套API来支持Trident Spout的开发:

trident spout

所有的Trident Spout都继承了ITridentDataSource接口:

  1. ITridentSpout:这是最通用的API,支持Transactional和Opaque Transactional的语义实现。不过一般会根据需要使用它的某个已有的实现,而不是直接实现该接口。
  2. IBatchSpout:Non-Transactinoal类型的Spout,每次会输出一个Batch的tuple。
  3. IPartitionedTridentSpout:可以从分布式数据源(比如一个集群或者Kafka服务器)读取数据的Transactional类型的Spout。
  4. OpaquePartitionedTridentSpout:可以从分布式数据源读取数据的Opaque Transactional类型的Spout。


我们之前在示例中使用的FixedBatchSpout实现了IBatchSpout接口,所以它是Non-Transactinoal类型的。接下来我们重点介绍一下ITridentSpout这个接口。我们先看看相关的类图:

ITridentSpout

ITridentSpout接口声明了4个方法,其中getComponentConfigurationgetOutputFields《Storm深入浅出之入门篇》中已经介绍过。这里我们重点介绍一下另外两个方法。

从类图可以看到ITridentSpout接口中并没有声明任何发送tuple的方法,而是把这项工作委托给了Emitter接口。当有新的Batch需要发送的时候,Trident会调用Emitter.emitBatch方法。

  1. 参数tx中提供了两个信息:一个是当前Batch的txid,另一个信息是attemptId。每个Batch第一次处理时attemptId的初始值是0,如果Batch处理过程中出错,重试时attemptId的值会递增。
  2. coordinatorMeta是最重要的一个参数,它不是Batch,也不是tuple,但是它包含了获取这些tuple所需要的元信息。
  3. collector没什么好说的,我们使用它来发送tuple。

emitBatch方法中最重要的逻辑就是根据coordinatorMeta参数中提供了元信息得到tuple,再通过collector把它们发送出去。现在我们明白了Emitter发送的原理,但是这个coordinatorMeta参数Trident是从哪里得到的呢?


这个要看BatchCoordinator接口了,Trident每次开始一个新的Batch时,会调用BatchCoordinator.initializeTransaction方法。这个方法第一个参数是当前Batch的txid,由Trident负责生成和维护。prevMetadata与currMetadata分别是生成前一个Batch的元信息和当前Batch对应的元信息。

这里有一点点绕,我们来看一个例子。在一个Spout发送第一个Batch的时候,Trident会调用initializeTransaction方法,这时候txid应该是1(第一个Batch嘛);prevMetadata是null(发送第一次Batch,所以不存在前一个Batch);currMetadata也是null(为什么是null后面会讲)。这时候initializeTransaction方法要做的事情就是为txid = 1这个Batch生成一个元信息对象,并且返回这个对象。

  • 如果这个txid = 1的Batch在随后的处理过程中失败了,Trident会要求Spout重新生成这个Batch。这时Trident会再次调用initializeTransaction方法,这时候txid还是1;prevMetadata还是null;但是currMetadata的值是前一次initializeTransaction方法的返回结果,所以通过判断currMetadata是否为空我们可以知道这个元信息对应的Batch是不是发生过异常,通常情况下我们可以直接返回它。

  • 如果这个txid = 1的Batch最终被成功的处理了,Trident会要求Spout开始生成新的Batch。这时Trident会调用initializeTransaction方法,这时候txid应该是2;prevMetadata的值就是前一个Batch的元信息(txid = 2的Batch的元信息);而currMetadata的值又重新变成了null。

明白了吗?Trident Spout的原理是通过BatchCoordinator获取和跟踪元信息,并把元信息交给Emitter;再由Emitter根据元信息得到最终的tuple,最后把这些tuple发送出去。在失败的时候,通过元信息的跟踪来保证Batch的不变性,很容易可以实现Transactional Spout和Opaque Transactional Spout的约定。


最后,我们来实战一下,用ITridentSpout来实现我们开篇示例中的Spout。开篇中我们的Spout是这样的:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 1, new Values("how are you"),
        new Values("nice to meet you"), new Values("what a good day"));


可以看到它的数据是一个String列表,每个Batch对应一个String。要实现这样一个Spout,我们需要一个String类型的集合来存放我们的数据。另外,每个Batch只包含一个tuple,就简化了我们元信息对象的设计,我们可以使用一个Integer类型来作为我们的元信息对象,记录下集合的位置下标即可。BatchCoordinator的实现:

public class TheBatchCoordinator implements ITridentSpout.BatchCoordinator<Integer> {
    @Override
    public Integer initializeTransaction(long txid, Integer prevMetadata, Integer currMetadata) {
        System.out.println("Init transaction: " + txid);
        if (txid == 1l) {
            return 0;
        }
        return prevMetadata + 1;
    }

    @Override
    public void success(long txid) {

    }

    @Override
    public boolean isReady(long txid) {
        if (txid <= TransactionalSpout.list.size()) {
            return true;
        }
        return false;
    }

    @Override
    public void close() {

    }
}


如果txid = 1,第一个Batch的话,直接返回0,对应集合的第一个下标位置。再看看Emitter的实现:

public class TheEmitter implements ITridentSpout.Emitter<Integer> {
    @Override
    public void emitBatch(TransactionAttempt tx, Integer coordinatorMeta, TridentCollector collector) {
        String sentence = TransactionalSpout.list.get(coordinatorMeta);
        collector.emit(new Values(sentence));
    }

    @Override
    public void success(TransactionAttempt tx) {

    }

    @Override
    public void close() {

    }
}


coordinatorMeta是我们的元信息对象,在本例中对应集合的下标位置,emitBatch方法中根据它直接从集合中获取tuple并发送。最后看看Spout的实现:

public class TransactionalSpout implements ITridentSpout<Integer> {
    static List<String> list = new ArrayList<>();
    static {
        list.add("how are you");
        list.add("nice to meet you");
        list.add("what a good day");
    }

    @Override
    public BatchCoordinator<Integer> getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new TheBatchCoordinator();
    }

    @Override
    public Emitter<Integer> getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new TheEmitter();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("sentence");
    }
}


注意我们在Spout内部维护了一个String集合。最后我们把开篇的示例换成我们自己的Spout看看:

public static void main(String[] args) {
    TransactionalSpout spout = new TransactionalSpout();

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .aggregate(new Count(), new Fields("count"))
            .toStream()
            .partitionPersist(WordCountDBFactory.opaque_transactional, new Fields("word", "count"), new WordCountDBUpdater()).parallelismHint(1);

    LocalDRPC drpc = new LocalDRPC();
    topology.newDRPCStream("word", drpc)
            .stateQuery(wordCounts, new Fields("args"), new QueryWordCountDB(), new Fields("count"));

    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", conf, topology.build());

    Utils.sleep(15000);
    System.out.println("DRPC RESULT: " + drpc.execute("word", "how"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "are"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "you"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "nice"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "to"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "meet"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "what"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "a"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "good"));
    System.out.println("DRPC RESULT: " + drpc.execute("word", "day"));

    cluster.shutdown();
    drpc.shutdown();
}


现在,Transactional类型的Spout,配合Opaque Transactional的State,我们终于实现了Exactly Once的语义。

最后附上源码地址https://github.com/OuYangLiang/code-example/tree/master/storm