首页 > 代码库 > Storm 中drpc调用

Storm 中drpc调用

package storm.starter;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.StormSubmitter;import backtype.storm.drpc.DRPCSpout;import backtype.storm.task.ShellBolt;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import storm.starter.spout.RandomSentenceSpout;import java.lang.management.ManagementFactory;import java.util.HashMap;import java.util.Map;import org.apache.log4j.Logger;import org.apache.log4j.PropertyConfigurator;/** * This topology demonstrates Storm‘s stream groupings and multilang * capabilities. */public class Drpctest {	public static final Logger logger = Logger.getLogger(Drpctest.class);	public static class WordCount extends BaseBasicBolt {		Map<String, Integer> counts = new HashMap<String, Integer>();		@Override		public void execute(Tuple tuple, BasicOutputCollector collector) {			String word = tuple.getString(0);			logger.error(this.toString() + "word = " + word);			Integer count = counts.get(word);			if (count == null)				count = 0;			count++;			counts.put(word, count);			logger.error(this.toString() + "count = " + count);			collector.emit(new Values(word, count));		}		String str = Thread.currentThread().getName();		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			logger.error("declareOutputFields :");			declarer.declare(new Fields("result", "count"));		}	}	public static class DrpcBolt extends BaseBasicBolt {		Map<String, Integer> counts = new HashMap<String, Integer>();		@Override		public void execute(Tuple tuple, BasicOutputCollector collector) {			String logString = tuple.getString(0);			logger.error("DrpcBolt recve :" + logString);		}		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			// 暂时没用			declarer.declare(new Fields("word1", "count1"));		}	}	public static void main(String[] args) throws Exception {		TopologyBuilder builder = new TopologyBuilder();		// drpc		LocalDRPC drpc = new LocalDRPC();		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);		builder.setSpout("drpcspout", drpc_spout, 3);		PropertyConfigurator				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");		// 接入drpc		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(				"drpcspout");		Config conf = new Config();		conf.setDebug(true);		if (args != null && args.length > 0) {			conf.setNumWorkers(3);			StormSubmitter.submitTopology(args[0], conf,					builder.createTopology());		} else {			conf.setMaxTaskParallelism(3);			conf.setDebug(true);			LocalCluster cluster = new LocalCluster();			cluster.submitTopology("word-count", conf, builder.createTopology());			String str = "send test drpc"; // 和 DRPCSpout 名字对应			drpc.execute("testdrpc", str);			Thread.sleep(10000);			cluster.shutdown();		}	}}

 

Storm 中drpc调用