Integrating Siddhi with Storm

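In "A First Look at Siddhi" (《Siddhi初探》) we covered the basic usage of Siddhi and said we would integrate it into Storm as the stream-processing engine. This article uses the example from "A First Look at Storm" (《Storm初探》) to show how to do that.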

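The example in "A First Look at Storm" splits a string of names and prints each name. Here we add a Siddhi bolt, NamesFilterSiddhiBolt, that filters the names and keeps only those of people younger than 50.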

Given the splitter output 刘备 49, 关羽 50, 张飞 51 and 曹操 49, 郭嘉 50, 荀彧 51, the filter keeps only two names: 刘备 and 曹操. The code is as follows:

package com.coshaho.learn.storm;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.query.output.callback.QueryCallback;
import org.wso2.siddhi.core.stream.input.InputHandler;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * 
 * NamesFilterSiddhiBolt.java Created on 2017-06-26 23:08:45
 *    
 * Class description: filters names by age
 *
 * Copyright: Copyright(c) 2013 
 * Company: COSHAHO
 * @Version 1.0
 * @Author coshaho
 */
public class NamesFilterSiddhiBolt implements IRichBolt
{
    private static final long serialVersionUID = 1L;
    
    private OutputCollector collector;
    
    private InputHandler inputHandler;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) 
    {
        this.collector = collector;
        init();
    }
    
    private void init()
    {
        SiddhiManager siddhiManager = new SiddhiManager();

        String siddhiApp = "" +
                "define stream namesStream (name string, age int, streamid String); " +
                "" +
                "@info(name = 'namefilter') " +
                "from namesStream[age < 50] " +
                "select name,streamid,age " +
                "insert into outputStream ;";
        
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        siddhiAppRuntime.addCallback("namefilter", new QueryCallback() 
        {
            @Override
            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) 
            {
                for(Event event : inEvents)
                {
                    String name = event.getData(0) + "";
                    String streamId = event.getData(1) + "";
                    String age = event.getData(2) + "";
                    List<Object> splitList = new ArrayList<Object>();
                    splitList.add(name);
                    System.out.println(name + " is aged " + age);
                    collector.emit(streamId, splitList);
                }
            }
        });
        
        inputHandler = siddhiAppRuntime.getInputHandler("namesStream");
        siddhiAppRuntime.start();
    }

    public void execute(Tuple input) 
    {
        String name = input.getString(0);
        int age = input.getInteger(1);
        String inputStream = input.getSourceStreamId();
        try 
        {
            inputHandler.send(new Object[]{name, age, inputStream});
        } 
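        catch (InterruptedException e) 
        {
            // Restore the interrupt flag so the calling thread can still see the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }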
        
        collector.ack(input);
    }

    public void cleanup() 
    {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) 
    {
        declarer.declare(new Fields("name"));
    }

    public Map<String, Object> getComponentConfiguration() 
    {
        return null;
    }
}
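
To make the effect of the `age < 50` condition concrete, here is a minimal sketch of the same rule in plain Java, without Storm or Siddhi. The class name AgeFilterSketch and the method namesYoungerThan are hypothetical and exist only for this illustration; in the bolt above the filtering is done by the Siddhi query, not by code like this.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: applies the rule "age < limit" in plain Java.
class AgeFilterSketch
{
    // Returns the names whose age is strictly below the limit, in input order.
    public static List<String> namesYoungerThan(Map<String, Integer> ages, int limit)
    {
        List<String> result = new ArrayList<String>();
        for (Map.Entry<String, Integer> entry : ages.entrySet())
        {
            if (entry.getValue() < limit)
            {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args)
    {
        // LinkedHashMap keeps insertion order, so the output order is predictable.
        Map<String, Integer> ages = new LinkedHashMap<String, Integer>();
        ages.put("刘备", 49);
        ages.put("关羽", 50);
        ages.put("张飞", 51);
        System.out.println(namesYoungerThan(ages, 50));
    }
}
```

Note that the comparison is strict, so a person aged exactly 50 is dropped, which matches the example where only the 49-year-olds pass.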

The name-splitting bolt needs a small change so that it also emits an age field:

    public void execute(Tuple input) 
    {
        // Print the thread ID to trace how Storm distributes tuples across tasks
        Thread current = Thread.currentThread();
        String names = input.getString(0);
        System.out.println("Splitting " + names + ". Current thread ID: " + current.getId() + ".");
        List<Tuple> inputList = new ArrayList<Tuple>();
        inputList.add(input);
        String[] nameArray = names.split(" ");
        int age = 49;
        for(String name : nameArray)
        {
            List<Object> splitList = new ArrayList<Object>();
            splitList.add(name);
            splitList.add(age);
            collector.emit(inputList, splitList);
            age++;
        }
        collector.ack(input);
    }
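
The ages are not part of the input; the splitter makes them up, starting at 49 and adding one per name. A minimal sketch of just that logic, outside Storm, may make this clearer. The class SplitWithAgeSketch and its split method are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: the splitting and age assignment without Storm.
class SplitWithAgeSketch
{
    // Splits a space-separated name list and pairs each name with an age,
    // starting at 49 and increasing by one per name.
    public static List<Object[]> split(String names)
    {
        List<Object[]> pairs = new ArrayList<Object[]>();
        int age = 49;
        for (String name : names.split(" "))
        {
            pairs.add(new Object[]{name, age});
            age++;
        }
        return pairs;
    }

    public static void main(String[] args)
    {
        for (Object[] pair : split("刘备 关羽 张飞"))
        {
            System.out.println(pair[0] + " " + pair[1]);
        }
    }
}
```

Because the counter restarts at 49 for every input string, only the first name of each list ends up below 50.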

When building the topology, add the Siddhi filter node:

    public static void main(String[] args) throws InterruptedException
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("names-reader", new NamesReaderSpout());
        // Start two name-splitting tasks; each name list is assigned to one of them at random
        builder.setBolt("names-spliter", new NamesSpliterBolt(), 2)
            .shuffleGrouping("names-reader");
        builder.setBolt("names-filter", new NamesFilterSiddhiBolt(), 1)
            .shuffleGrouping("names-spliter");
        // Start two Hello World tasks; identical names are sent to the same task
        builder.setBolt("hello-world", new HelloWorldBolt(), 2)
            .fieldsGrouping("names-filter", new Fields("name"));
        
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("storm-test", conf, builder.createTopology());
    }
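
The topology uses fieldsGrouping on the "name" field so that identical names reach the same hello-world task. The idea can be sketched as hashing the grouping values and taking the result modulo the number of tasks. This is a simplified illustration under that assumption, not Storm's actual implementation; the class FieldsGroupingSketch and the method chooseTask are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: hash-based routing in the spirit of fields grouping.
class FieldsGroupingSketch
{
    // Maps the grouping field values to a task index in [0, numTasks).
    // Equal values always map to the same index.
    public static int chooseTask(List<Object> groupingValues, int numTasks)
    {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args)
    {
        int numTasks = 2;
        for (String name : new String[]{"刘备", "曹操", "刘备"})
        {
            List<Object> key = Arrays.<Object>asList(name);
            System.out.println(name + " -> task " + chooseTask(key, numTasks));
        }
    }
}
```

Both occurrences of 刘备 print the same task index, while different names may or may not share a task.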

Running the topology produces the following output:

[Image: screenshot of the console output]
