首页 > 代码库 > Storm的BaseBasicBolt源码解析ack机制
Storm的BaseBasicBolt源码解析ack机制
我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。
在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。
在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
那么我们来看看BasicBolt的源码是不是这样的,不能因为看到别人的帖子说是这样的,我们就这样任务,以讹传讹,我们要To see is to believe。
为了方便看源代码,我先上我们的继承类:
public class SplitSentenceBolt extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { super.prepare(stormConf, context); }
//5:执行我们自己的逻辑处理方法,接收传入的参数。
public void execute(Tuple input, BasicOutputCollector collector) { String sentence = (String)input.getValueByField("sentence"); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); word = word.toLowerCase(); collector.emit(new Values(word,1));//这个地方就是调用OutputCollector的包装类,来发消息 } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","num")); }}
通过打断点,我们发现,bolt的task会创建这个类下面会标准执行顺序
public class BasicBoltExecutor implements IRichBolt { public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class); private IBasicBolt _bolt; private transient BasicOutputCollector _collector; //1:创建该对象,然后把我们写的SplitSentenceBolt对象赋给父类IBasicBolt。 public BasicBoltExecutor(IBasicBolt bolt) { _bolt = bolt; } public void declareOutputFields(OutputFieldsDeclarer declarer) { _bolt.declareOutputFields(declarer);//这里就是调用SplitSentenceBolt对象的方法了。 } //2:给BasicOutputCollector _collector字段赋值,BasicOutputCollector就是对OutputCollector类的包装。 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _bolt.prepare(stormConf, context); _collector = new BasicOutputCollector(collector); } //3:然后程序执行该方法,input的值source: spout1:4, stream: default, id: {}, [+ - * % /] public void execute(Tuple input) { _collector.setContext(input);//把接收到的tuple值设置给BasicOutputCollector中inputTuple字段。 try { _bolt.execute(input, _collector);//这个地方是调用我们实现类SplitSentenceBolt的ececute方法。 _collector.getOutputter().ack(input);//这个地方就是响应 } catch(FailedException e) { if(e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input);//这个地方就是响应 } } public void cleanup() { _bolt.cleanup(); } public Map<String, Object> getComponentConfiguration() { return _bolt.getComponentConfiguration(); }}
public class BasicOutputCollector implements IBasicOutputCollector { private OutputCollector out; private Tuple inputTuple; public BasicOutputCollector(OutputCollector out) { this.out = out; }
//4:把收到的tuple数据赋值给inputTuple,这个时候BasicOutputCollector对象的字段都具有值了。
public void setContext(Tuple inputTuple) { this.inputTuple = inputTuple; }
//6:这里我们发送新的(转换后的)tuple数据,看他内部的调用,其实他也会发送一个anchor tuple来保持tracker链路,
而这个anchor tuple就是bolt接收到转换前的源tuple数据。
public List<Integer> emit(List<Object> tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
} public List<Integer> emit(String streamId, List<Object> tuple) { return out.emit(streamId, inputTuple, tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { out.emitDirect(taskId, streamId, inputTuple, tuple); } public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } protected IOutputCollector getOutputter() { return out; } public void reportError(Throwable t) { out.reportError(t); }}
这里大家不要纠结bolt的启动时从哪里开始的,我后面会讲的,这里我们关注的是,BasicBoltExecutor对象创建后的执行过程,以这我们来看执行的过程。在BasicBoltExecutor的execute方法中,我们看到了ack和fail方法会被自动调用的,当我们的程序抛出异常则会执行fail方法的。
Storm的BaseBasicBolt源码解析ack机制
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。