首页 > 代码库 > storm trident 示例

storm trident 示例

Storm Trident的核心数据模型是一批一批被处理的“流”,“流”在集群的分区在集群的节点上,对“流”的操作也是并行的在每个分区上进行。

Trident有五种对“流”的操作:

1.      不需要网络传输的本地批次运算

2.      需要网络传输的“重分布”操作,不改变数据的内容

3.      聚合操作,网络传输是该操作的一部分

4.      “流”分组(grouby)操作

5.      合并和关联操作

批次本地操作:

批次本地操作不需要网络传输,本格分区(partion)的运算是互相独立的

Functions(函数)

         函数接收一些字段并发射(emit)零个或更多“元组”。输出的字段附加在原始的元组上面。如果一个函数不发射数据,原始的数据被过滤。如果发射(emit)多个元组,原始的元组被重复的冗余到输出元组上。例如:

public class MyFunction extends BaseFunction {    public void execute(TridentTuple tuple, TridentCollector collector) {        for(int i=0; i < tuple.getInteger(0); i++) {            collector.emit(new Values(i));        }    }}

假设有一个叫“mystream”输入流有[“a”,“b”,“c“]三个字段

[1, 2, 3]
[4, 1, 6]
[3, 0, 8]

如果运行下面的代码:

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

运行的结果将会有4个字段[“a”,“b”,“c”,“d”],如下:

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

 

Filters(过滤)

Filters接收一个元组(tuple),决定是否需要继续保留这个元组。,比如:

public class MyFilter extends BaseFunction {    public booleanisKeep(TridentTuple tuple) {        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;    }}

假设有如下输入:

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

运行下面的代码:

mystream.each(new Fields("b","a"), new MyFilter())

结果将会如下:

[2, 1, 1]

 

分区汇聚

         分区汇总是在每个批次的分区上运行的函数,和函数不同的是,分区汇总发射(emit)的数据覆盖了原始的tuple。考虑下面的例子:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

假设输入的“流”包含【“a”,“b”】两个字段,并且按照如下分区

Partition 0:
["a", 1]
["b", 2]
 
Partition 1:
["a", 3]
["c", 8]
 
Partition 2:
["e", 1]
["d", 9]
["d", 10

 

输出的“流”将只包含一个叫“sum”的字段:

Partition 0:
[3]
 
Partition 1:
[11]
 
Partition 2:
[20]

这里有定义了三种不同的聚合接口:CombinerAggreator,ReduceAggregator和Aggregate。

CombinerAggregator:

public interface CombinerAggregator<T> extends Serializable {    T init(TridentTuple tuple);    T combine(T val1, T val2);    T zero();}

一个CombinerAggregator返回一个单独的元组,这个元组值有一个字段。CombinerAggregator在每个tuple上运行init,使用combine去联合结果直到只有一个tuple剩余。如果批次没有数据,运行zero函数。比如,下面实现了一个Count

public class Count implements CombinerAggregator<Long> {    public Long init(TridentTuple tuple) {        return 1L;    }     public Long combine(Long val1, Long val2) {        return val1 + val2;    }     public Long zero() {        return 0L;    }}

 

         RducerAggregator接口如下:可以看到,CombinerAggregator的优势使用聚合函数代替分区聚合。在这种情况下,trident自动优化成,在网络传输前合一做局部的汇总。(类似于mapreduce的combine)。

public interface ReducerAggregator<T> extends Serializable {    T init();    T reduce(T curr, TridentTuple tuple);}


         RducerAggregator在初始化的时候产生一个值,每个输入的元组在这个值的基础上进行迭代并输出一个单独的值,例如下面定义了一个Count的reduceAggegator:

public class Count implements ReducerAggregator<Long> {    public Long init() {        return 0L;    }        public Long reduce(Long curr, TridentTuple tuple) {        return curr + 1;    }}

 


 
ReducerAggregator可以和persistentAggregate一起使用,后面会讲到。

         更加通用聚合接口是Aggregator,如下:

public interface Aggregator<T> extends Operation {    T init(Object batchId, TridentCollector collector);    voidaggregate(T state, TridentTuple tuple, TridentCollector collector);    voidcomplete(T state, TridentCollector collector);}

 

1.      Init函数在执行批次操作之前被调用,并返回一个state对象,这个额对象将会会传入到aggregate和complete函数中。         Aggregator可以发射任何数量的输出tuple,这些tuple可以包含多个字段(fields)。可以在执行过程的任何点发射输出。聚合按照下面的方式执行:

2.      Aggregate会对批次中每个tuple调用,这个方法可以跟新state也可以发射(emit)tuple。

3.      当这个批次分区的数据执行结束后调用complete函数。

下面是一个使用Aggregate事项的Count

public class CountAgg extends BaseAggregator<CountState> {    static class CountState {        long count = 0;    }     public CountState init(Object batchId, TridentCollector collector) {        return new CountState();    }     public voidaggregate(CountState state, TridentTuple tuple, TridentCollector collector) {        state.count+=1;    }     public voidcomplete(CountState state, TridentCollector collector) {        collector.emit(new Values(state.count));    }}


如果想同事执行多个聚合,可以使用如下的调用链

mystream.chainedAgg()        .partitionAggregate(new Count(), new Fields("count"))        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))        .chainEnd()

 


 
这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。

StateQuery和partitionPersist

         stateQuery和partitionPersistent查询和跟新状态。可以参考trident state doc。https://github.com/nathanmarz/storm/wiki/Trident-state
投影(projection)

投影操作是对数据上进行列裁剪,如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:

mystream.project(new Fields("b","d"))

输出流将只有【“b”,“d”】两个字段。

 

重分区(repartition)操作

         重分区操作是通过一个函数改变元组(tuple)在task之间的分布。也可以调整分区数量(比如,如果并发的hint在repartition之后变大)重分区(repatition)需要网络传输。,线面是重分区函数:

1.      Shuffle:使用随机算法在目标分区中选一个分区发送数据

2.      Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。如果你想做在每个分区上做一个statequery。

3.      paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发字段(fields)分发到相同的目标分区。

4.      global:所有的tuple分发到相同的分区。这个分区所有的批次相同。

5.      batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分区。

6.      patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。

 

聚合操作

         Trident有aggregate和persistentAggregate函数对流做聚合。Aggregate在每个批次上独立运行,persistentAggregate聚合流的所有的批次并将结果存储下来。

         在一个流上做全局的聚合,可以使用reducecerAggregator或者aggretator,这个流先被分成一个分区,然后聚合函数在这个分区上运行。如果使用CombinerAggreator,Trident贤惠在每个分区上做一个局部的汇总,然后重分区冲为一个分区,在网络传输结束后完成聚合。CombinerAggreator非常有效,在尽可能的情况下多使用。

 

下面是一个做批次内聚合的例子:

mystream.aggregate(new Count(), new Fields("count"))

和partitionAggregate一样,聚合的aggregate也可以串联。如果将CombinerAggreator和非CombinerAggreator串联,trident就不能做局部汇总的优化。

 

流分组操作

         GroupBy操作根据特殊的字段对流进行重分区,分组字段相同的元组(tuple)被分到同一个分区,下面是个GroupBy的例子:

 

如果对分组的流进行聚合,聚会会对每个组聚合而不是这个批次聚合。(和关系型数据库的groupby相同)。PersistentAggregate也可以在分组的流哈桑运行,这种情况下结果将会存储在MapState里面,key是分组字段。可以查看https://github.com/nathanmarz/storm/wiki/Trident-state。

和普通聚合一样,分组流的聚合也可以串联。

 

合并和关联

         最后一部分API是将不通的流合并,最简单的方式就是合并(meger)多个流成为一个流。可以使用tridentTopology#meger,如下:

topology.merge(stream1, stream2, stream3);

Trident合并的流字段会一第一个流的字段命名。

         另一个中合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。Trident中的join只会在spout发出的每个批次见进行。

 

下面是个join的例子,一个流包含字段【“key”,“val1”,“val2”】,另一个流包含字段【“x”,“val1”】:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));

Stream1的“key”和stream2的“x”关联。Trident要求所有的字段要被名字,因为原来的名字将会会覆盖。Join的输入会包含:

1.      首先是join字段。例子中stream1中的“key”对应stream2中的“x”。

2.      接下来,会吧非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。

当join的流分别来自不通的spout,这些spout会同步发射的批次,也就是说,批次处理会包含每个spout发射的tuple。

         有人可能会问怎么做“windowedjoin”,join的一边和另一边最近一个小时的数据做join运算。

         为了实现这个,你可以使用patitionPersist和stateQuery。最近一个小时的数据可以按照join字段做key存储下改变,在join的过程中可以查询存储的额数据完成join操作。

 

 

    FixedBatchSpout spout=new FixedBatchSpout(new Fields("sentence"), 3,              new Values("The cow jumped over the moon"),                new Values("The man went to the store and bought some candy"),                new Values("Four score and seven years ago"),                new Values("How many apples can you eat"),                new Values("To be or not to be the person"));       spout.setCycle(true);

 

 

或者:

   

 BrokerHosts brokerHosts =  new ZkHosts(ConfigFactory.getConfigProps().getString(ConfigProps.KEY_ZOOKEEPER_KAFKA));    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, ConfigProps.TOPIC_USER, "pv");     TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);         TridentTopology topology=new TridentTopology();       TridentState tridentState = topology.newStream("spout1", spout)           .parallelismHint(16)           .each(new Fields("sentence"), new Split(), new Fields("item"))           .each(new Fields("item"), new LowerCase(), new Fields("word"))           //输入item  lowcase 操作   输出word           .groupBy(new Fields("word"))//根据上一步输出word聚合           .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))           //聚合并且持久化           .parallelismHint(6);       topology.newDRPCStream("words", localDRPC)           .each(new Fields("args"), new Split(), new Fields("word"))           .groupBy(new Fields("word"))           .stateQuery(tridentState, new Fields("word"), new MapGet(), new Fields("count"))           //tridentState的输出结果作为输入源           .each(new Fields("count"), new FilterNull())           .aggregate(new Fields("count"), new Sum(), new Fields("sum"));       return topology.build();

 

storm trident 示例