首页 > 代码库 > 测试Storm的多源头锚定

测试Storm的多源头锚定

过程,

Spout 依次发送 msgid 1-9(代码中 `if (num < 10)` 在自增之后判断,实际发出 9 个 tuple,与日志中 fail =1..9 一致)

一级Bolt: 把 msgid1 的 tuple 作为基础锚定 tuple, 其余 8 个 tuple 各自与它组合锚定后发送给二级Bolt; 同时每个 msgid 对应的 tuple 都 ack 一次. 由于 msgid1 的 tuple 被多次锚定, acker 将跟踪这 8 个二级 bolt 的处理情况.

二级Bolt,发送ack fail(模拟处理失败)

结果:二级Bolt 的 fail 经多源头锚定传播后,spout 的 fail 回调中 msgid 1-9 全部失败。

拓扑代码

package storm.starter;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.StormSubmitter;import backtype.storm.drpc.DRPCSpout;import backtype.storm.task.OutputCollector;import backtype.storm.task.ShellBolt;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.TopologyBuilder;import backtype.storm.topology.base.BaseBasicBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import storm.starter.spout.RandomSentenceSpout;import java.lang.management.ManagementFactory;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.log4j.Logger;import org.apache.log4j.PropertyConfigurator;/** * This topology demonstrates Storm‘s stream groupings and multilang * capabilities. 
*/public class WordCountTopology {	public static String GetThreadName() {		Thread thread = Thread.currentThread();		return thread.getName();	}	public static final Logger logger = Logger			.getLogger(WordCountTopology.class);	// 切分单词 一级bolt	/*	 * public static class SplitSentence extends ShellBolt implements IRichBolt	 * { public SplitSentence() { super("python", "splitsentence.py");	 * logger.error(GetThreadName() + "SplitSentence create"); }	 * 	 * // 定义字段发送	 * 	 * @Override public void declareOutputFields(OutputFieldsDeclarer declarer)	 * { declarer.declare(new Fields("word")); logger.error(GetThreadName() +	 * "declarer.declare(new Fields(word))"); }	 * 	 * @Override public Map<String, Object> getComponentConfiguration() {	 * logger.error("getComponentConfiguration"); return null; } }	 */	public static class SplitSentence implements IRichBolt {		private OutputCollector _collector;				int num = 0;		@Override		public void prepare(Map stormConf, TopologyContext context,				OutputCollector collector) {			_collector = collector;		}				private Tuple tuple1;		@Override		public void execute(Tuple tuple) {			String sentence = tuple.getString(0);		    if(sentence.equals("a")) {		    	tuple1 = tuple;		    }		    else{		    	List<Tuple> anchors = new ArrayList<Tuple>();		    	anchors.add(tuple1);		    	anchors.add(tuple);		    	_collector.emit(anchors, new Values(sentence + "a"));		    	_collector.ack(tuple);		    	_collector.ack(tuple1);		    }			//			for (String word : sentence.split(" ")){//				_collector.emit(tuple, new Values(word));//			}//			num++;						System.out.println("Bolt Thread " + Thread.currentThread().getName() + "recve : " + sentence);				System.out.println( num + " bolt recev:" + tuple.getMessageId().getAnchorsToIds());					}		@Override		public void cleanup() {		}		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			declarer.declare(new Fields("word"));		}		@Override		public Map<String, Object> getComponentConfiguration() {			// 
TODO Auto-generated method stub			return null;		}	}		public static class CountCount1 implements IRichBolt {		Map<String, Integer> counts = new HashMap<String, Integer>();		private OutputCollector _collector;		int num = 0;		@Override		public void execute(Tuple tuple) {			String word = tuple.getString(0);			//logger.error(this.toString() + "word = " + word);			Integer count = counts.get(word);			if (count == null)				count = 0;						count++;			counts.put(word, count); 			num++;						_collector.fail(tuple);			//_collector.ack(tuple);					   //_collector.emit(tuple, new Values(word, count));		}		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			// logger.error("declareOutputFields :");			declarer.declare(new Fields("result", "count"));		}		@Override		public void prepare(Map stormConf, TopologyContext context,				OutputCollector collector) {			// TODO Auto-generated method stub			_collector = collector;		}		@Override		public void cleanup() {			// TODO Auto-generated method stub					}		@Override		public Map<String, Object> getComponentConfiguration() {			// TODO Auto-generated method stub			return null;		}	}			public static class WordCount extends BaseBasicBolt {		private OutputCollector _collector;		Map<String, Integer> counts = new HashMap<String, Integer>();		@Override		public void execute(Tuple tuple, BasicOutputCollector collector) {			String word = tuple.getString(0);			//logger.error(this.toString() + "word = " + word);			Integer count = counts.get(word);			if (count == null)				count = 0;			count++;			counts.put(word, count); // <key, list<value, count> >			//logger.error(this.toString() + "count = " + count);			collector.emit(new Values(word, count));		}		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			// logger.error("declareOutputFields :");			declarer.declare(new Fields("result", "count"));		}	}	public static class WordCount1 extends BaseBasicBolt {		Map<String, Integer> counts = new HashMap<String, 
Integer>();		@Override		public void execute(Tuple tuple, BasicOutputCollector collector) {			// logger.error("WordCount1");			// tuple.getFields()[0];			if (tuple.getFields().contains("result")) {				String count = (String) tuple.getValueByField("result");				// tuple.getValueByField(field)				long countl = -0;// = Long.valueOf(count);				// logger.error(this.toString() + " key  = resultkey " + count);			}			if (tuple.getFields().contains("count")) {				Integer count = (Integer) tuple.getValueByField("count");				// tuple.getValueByField(field)				long countl = -0;// = Long.valueOf(count);				//logger.error(this.toString() + " key  = count " + count);			}			// String word = tuple.getString(0);			// logger.error(this.toString() +"word = " + word);			// Integer count = counts.get(word);			// if (count == null)			// count = 0;			// count++;			// counts.put(word, count);			// logger.error(this.toString() + "count = " + count);			// collector.emit(new Values(word, count));		}		@Override		public void declareOutputFields(OutputFieldsDeclarer declarer) {			// logger.error("declareOutputFields :");			declarer.declare(new Fields("word1", "count1"));		}	}	public static void main(String[] args) throws Exception {		TopologyBuilder builder = new TopologyBuilder();		PropertyConfigurator				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");			    // parallelism_hint 代表是executor数量, setNumTasks 代表Tasks数量		builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(2);        builder.setBolt("split", new SplitSentence(), 8).setNumTasks(1).shuffleGrouping("spout");		builder.setBolt("count", new CountCount1(), 12).fieldsGrouping("split",				new Fields("word"));//		builder.setBolt("WordCount1", new WordCount1(), 1).fieldsGrouping(//				"count", new Fields("result", "count"));		Config conf = new Config();		conf.setDebug(true);
                //  这个设置一个spout task上面最多有多少个没有处理(ack/fail)的tuple,防止tuple队列过大, 只对可靠任务起作用 conf.setMaxSpoutPending(2); conf.setMessageTimeoutSecs(5); // 消息处理延时 conf.setNumAckers(2); // 消息处理acker System.out.println("args.length = " + args.length); if (args != null && args.length > 0) { conf.setNumWorkers(5); // 设置工作进程 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { // 每个组件的最大executor数 conf.setMaxTaskParallelism(1); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); String str = "testdrpc"; // drpc.execute("testdrpc", str); Thread.sleep(1088000); cluster.shutdown(); } }}

 spout代码

package storm.starter.spout;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.utils.Utils;import java.util.Map;import java.util.Random;import org.apache.log4j.Logger;import storm.starter.WordCountTopology;// IRichSpout public class RandomSentenceSpout extends BaseRichSpout { 	SpoutOutputCollector _collector;	Random _rand;	public static final Logger logger = Logger			.getLogger(RandomSentenceSpout.class);	@Override	public void open(Map conf, TopologyContext context,			SpoutOutputCollector collector) {		_collector = collector;		_rand = new Random();		WordCountTopology.logger.error(this.toString()				+ "RandomSentenceSpout is create");	}	private int num = 0;	private String gettmstr() {		StringBuilder tmp = new StringBuilder();		for (int i = 0; i <= num; i++)			tmp.append("a");		num++;		return tmp.toString();	}	@Override	public void nextTuple() {		Utils.sleep(200);		// String[] sentences = new String[]{ "the cow jumped over the moon",		// "an apple a day keeps the doctor away",		// "four score and seven years ago", "snow white and the seven dwarfs",		// "i am at two with nature" };		String[] sentences = new String[] { "A" };		String sentence = gettmstr(); // sentences[_rand.nextInt(sentences.length)];		if (num < 10) {			_collector.emit(new Values(sentence), new Integer(num));			// logger.error(this.toString() + "send sentence = " + sentence);		   // System.out.println(Thread.currentThread().getName() + " Spout ");		}	}	@Override	public void ack(Object id) {		logger.error(this.toString() + "spout ack =" + (Integer)id);	}	@Override	public void fail(Object id) {		logger.error("spout fail =" + (Integer)id);	}	@Override	public void declareOutputFields(OutputFieldsDeclarer declarer) {		declarer.declare(new Fields("word"));	}}

 运行结果

2014-10-03 21:17:31,149 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =12014-10-03 21:17:31,351 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =2Bolt Thread Thread-22recve : aaa0 bolt recev:{-3139141336114052337=7131499433188364504}2014-10-03 21:17:31,552 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =3Bolt Thread Thread-22recve : aaaa0 bolt recev:{-4497680640148241887=-615828348570847097}2014-10-03 21:17:31,754 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =4Bolt Thread Thread-22recve : aaaaa0 bolt recev:{-8878772617767839827=-7708082520013359311}2014-10-03 21:17:31,957 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =5Bolt Thread Thread-22recve : aaaaaa0 bolt recev:{-3995020874692495577=-5070846720162762196}2014-10-03 21:17:32,160 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =6Bolt Thread Thread-22recve : aaaaaaa0 bolt recev:{-5994700617723404155=-3738685841476816404}2014-10-03 21:17:32,362 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =7Bolt Thread Thread-22recve : aaaaaaaa0 bolt recev:{-2308734827213127682=-5719708045753233056}2014-10-03 21:17:32,563 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =8Bolt Thread Thread-22recve : aaaaaaaaa0 bolt recev:{-3718844156917119468=-6359724009048981605}2014-10-03 21:17:32,766 ERROR (storm.starter.spout.RandomSentenceSpout:71) - spout fail =9

 

测试Storm的多源头锚定