首页 > 代码库 > storm学习
storm学习
1.HADOOP与STORM比较
数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据),处理一些简单的业务逻辑;
处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);
是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始,(SPOUT一直循环nextTuple()方法,BOLT当有接受到消息就调用execute(Tuple input)方法);
处理速度:HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;
适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;
2.Storm的设计思想
Storm是对流Stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。
Storm将流中元素抽象为Tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。
Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。
以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。
拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。
3.流处理过程
4.Storm的基础概念
Topology : 相当于一个业务流程项目,相当于hadoop中MapperReduce中的job
Stream:消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理
tuple:就是数据的单位,需要每一个需要处理的数据的封装在tuple中
Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple
Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作
Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.
Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们.
5.本地测试代码
package stormNew; import java.util.HashMap; import java.util.Map; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class LocalTopology { public static void main(String[] args) { //组装Topology TopologyBuilder build = new TopologyBuilder(); //定义spout的id build.setSpout("spout", new Spout()); //定义bolt的id,使用new Fields("field")字段进行分组 build.setBolt("bolt", new Bolt()).fieldsGrouping("spout", new Fields("field")); try { Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LocalTopology", conf, build.createTopology()); // Config stormConf = new Config(); // stormConf.setNumWorkers(2); // StormSubmitter.submitTopology("luluTology", stormConf,build.createTopology()); } catch (Exception e) { e.printStackTrace(); } } public static class Spout extends BaseRichSpout{ private Map conf; private TopologyContext context; private SpoutOutputCollector collector; private int i; private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>(); //spout中循环这个方法,进行消息获取与发送 public void nextTuple() { // System.err.println("Spout:"+i); //作为每一条消息的唯一标识,用于ack的消息确认机制 int mgsid = i; //发送tuple消息到bolt中处理 this.collector.emit(new Values(i++,i%3),mgsid); //spout自身维护着消息与标识之间的关系 map.put(mgsid, i); try { //休眠一下,清晰看出效果 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //定义发送字段的名称 outputFieldsDeclarer.declare(new Fields("num","field")); } @Override //当bolt调用的ack方面时,回调spout中的ack方法 public void ack(Object msgId) { System.out.println("确认信息-----------"); } @Override //当bolt调用的fail方面时,回调spout中的fail方法 public void fail(Object msgId) { System.out.println("消息失败-----------"+map.get(msgId)); } } public static class Bolt extends BaseRichBolt{ private Map conf; private TopologyContext context; private OutputCollector collector; private int sum = 0; //bolt接受动spout发送过来的消息就调execute public void execute(Tuple input) { // TODO Auto-generated method stub //通过字段名称来获取数据 int num = input.getIntegerByField("num"); System.err.println("--------------------num:"+(num)); /** * 为了更好的看书ack消息确认机制的效果,所以直接调用ack与fail方法 * * 默认该方法是这样进行ack的调用 * try{ * 业务逻辑 * this.collector.ack(input); *}catch(){ * this.collector.fail(input); *} */ if(num >=10 && num <=20){ //System.err.println("sum:"+(sum+=num)); this.collector.ack(input); }else{ this.collector.fail(input); } } public void prepare(Map conf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub this.conf = conf; this.context = context; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { // TODO Auto-generated method stub } } }
6.Storm集群结构
主节点(Nimbus):Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测
从节点(Supervisor):Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。
启动集群
在nimbus节点执行"nohup bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行
在supervisor节点执行"nohup bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;
在nimbus节点执行"nohup bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。
在所有节点执行"nohup bin/storm logviewer >/dev/null 2>&1 &"启动log后台程序,并放到后台执行,启动后可以通过http://{host}:8000观察日志信息。(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上。)
停止作业
先查询作业列表storm list
命令行下执行storm kill TopologyName
在storm ui上点击kill按钮
7.并行度
一个节点上最多能够运行四个worker,是四个slots
1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。
executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程会在每次循环里顺序调用所有task实例)。
task是最终运行spout或bolt中代码的执行单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。
默认情况下,一个supervisor节点会启动4个worker进程。每个worker进程会启动1个executor,每个executor启动1个task
提高并行度
worker(slots)
默认一个从节点上面可以启动4个worker进程,参数是supervisor.slots.port。在storm配置文件中已经配置过了,默认是在strom-core.jar包中的defaults.yaml中配置的有。
默认一个strom项目只使用一个worker进程,可以通过代码来设置使用多个worker进程。
通过config.setNumWorkers(workers)设置
通过conf.setNumAckers(0);可以取消acker任务
最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker之间的数据传输
如果worker使用完的话再提交topology就不会执行,会处于等待状态
executor
默认情况下一个executor运行一个task,可以通过在代码中设置
builder.setSpout(id, spout, parallelism_hint);
builder.setBolt(id, bolt, parallelism_hint);
task
通过boltDeclarer.setNumTasks(num);来设置实例的个数
executor的数量会小于等于task的数量(为了rebalance)
弹性计算
通过代码调整
topologyBuilder.setBolt("green-bolt", new GreenBolt(),2)
.setNumTasks(4).shuffleGrouping("blue-spout);
通过shell调整
# 10秒之后开始调整
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10
-w 代表几秒后开始执行
-n 代表几个worker
-e 代表几个excutor
stream grouping分类
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务
All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.
storm学习