首页 > 代码库 > storm学习

storm学习

1.HADOOP与STORM比较
数据来源:HADOOP处理的是HDFS上TB级别的数据(历史数据),STORM是处理的是实时新增的某一笔数据(实时数据),处理一些简单的业务逻辑;
处理过程:HADOOP是分MAP阶段到REDUCE阶段,STORM是由用户定义处理流程,流程中可以包含多个步骤,每个步骤可以是数据源(SPOUT)或处理逻辑(BOLT);
是否结束:HADOOP最后是要结束的,STORM是没有结束状态,到最后一步时,就停在那,直到有新数据进入时再从头开始,(SPOUT一直循环nextTuple()方法,BOLT当有接受到消息就调用execute(Tuple input)方法);
处理速度:HADOOP是以处理HDFS上TB级别数据为目的,处理速度慢,STORM是只要处理新增的某一笔数据即可,可以做到很快;
适用场景:HADOOP是在要处理批量数据时用的,不讲究时效性,STORM是要处理某一新增数据时用的,要讲时效性;


2.Storm的设计思想
Storm是对流Stream的抽象,流是一个不间断的无界的连续tuple,注意Storm在建模事件流时,把流中的事件抽象为tuple即元组。
Storm将流中元素抽象为Tuple,一个tuple就是一个值列表value list,list中的每个value都有一个name,并且该value可以是基本类型,字符类型,字节数组等,当然也可以是其他可序列化的类型。
Storm认为每个stream都有一个stream源,也就是原始元组的源头,所以它将这个源头称为Spout。
有了源头即spout也就是有了stream,那么该如何处理stream内的tuple呢。将流的状态转换称为Bolt,bolt可以消费任意数量的输入流,只要将流方向导向该bolt,同时它也可以发送新的流给其他bolt使用,这样一来,只要打开特定的spout(管口)再将spout中流出的tuple导向特定的bolt,又bolt对导入的流做处理后再导向其他bolt或者目的地。   

以上处理过程统称为Topology即拓扑。拓扑是storm中最高层次的一个抽象概念,它可以被提交到storm集群执行,一个拓扑就是一个流转换图,图中每个节点是一个spout或者bolt,图中的边表示bolt订阅了哪些流,当spout或者bolt发送元组到流时,它就发送元组到每个订阅了该流的bolt(这就意味着不需要我们手工拉管道,只要预先订阅,spout就会将流发到适当bolt上)。

拓扑的每个节点都要说明它所发射出的元组的字段的name,其他节点只需要订阅该name就可以接收处理。

 

3.流处理过程


技术分享

技术分享

 

4.Storm的基础概念

Topology : 相当于一个业务流程项目,相当于hadoop中MapperReduce中的job

Stream:消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理

tuple:就是数据的单位,需要每一个需要处理的数据的封装在tuple中

Spouts 消息源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple

Bolts 消息处理者,所有的消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作

Task 每一个Spout和Bolt会被当作很多task在整个集群里面执行,每一个task对应到一个线程.

Stream groupings 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们.

5.本地测试代码

 

package stormNew;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalTopology {
	
	public static void main(String[] args) {
		
		//组装Topology
		TopologyBuilder build =  new TopologyBuilder();
		//定义spout的id
		build.setSpout("spout", new Spout());
		//定义bolt的id,使用new Fields("field")字段进行分组
		build.setBolt("bolt", new Bolt()).fieldsGrouping("spout", new Fields("field"));
		
		try {
			Config conf = new Config();
			LocalCluster cluster = new LocalCluster();
			cluster.submitTopology("LocalTopology", conf, build.createTopology());
//			Config stormConf = new Config();
//			stormConf.setNumWorkers(2);
//			StormSubmitter.submitTopology("luluTology", stormConf,build.createTopology());
		} catch (Exception e) {
			e.printStackTrace();
		} 
	}
	
	public static class Spout extends BaseRichSpout{
      
		private Map conf;
		private TopologyContext context;
		private SpoutOutputCollector collector;
		
		private int i;
		private HashMap<Integer,Integer> map = new HashMap<Integer,Integer>();
		
		//spout中循环这个方法,进行消息获取与发送
		public void nextTuple() {
		    // System.err.println("Spout:"+i);
			//作为每一条消息的唯一标识,用于ack的消息确认机制
		    int mgsid = i;
		    //发送tuple消息到bolt中处理
		    this.collector.emit(new Values(i++,i%3),mgsid);
		    //spout自身维护着消息与标识之间的关系
		    map.put(mgsid, i);
		    try {
		    	//休眠一下,清晰看出效果
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

		public void open(Map conf, TopologyContext context,
				SpoutOutputCollector collector) {
			this.conf = conf;
			this.context = context;
			this.collector = collector;
		}

		
		public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
			//定义发送字段的名称
			outputFieldsDeclarer.declare(new Fields("num","field"));
		}

		@Override
		//当bolt调用的ack方面时,回调spout中的ack方法
		public void ack(Object msgId) {
			System.out.println("确认信息-----------");
		}

		@Override
		//当bolt调用的fail方面时,回调spout中的fail方法
		public void fail(Object msgId) {
			System.out.println("消息失败-----------"+map.get(msgId));
		}
		
		
		
	}
	
	
	public static class Bolt extends BaseRichBolt{

		
		private Map conf;
		private TopologyContext context;
		private OutputCollector collector;
		
		private int sum = 0;
		
		//bolt接受动spout发送过来的消息就调execute
		public void execute(Tuple input) {
			// TODO Auto-generated method stub
			//通过字段名称来获取数据
			int num = input.getIntegerByField("num");
			System.err.println("--------------------num:"+(num));
			
			/**
			 * 为了更好的看书ack消息确认机制的效果,所以直接调用ack与fail方法
			 * 
			 * 默认该方法是这样进行ack的调用
			 * try{
			 *		业务逻辑
			 *		this.collector.ack(input);
			 *}catch(){
			 *		this.collector.fail(input);
			 *}
			 */
			if(num >=10 && num <=20){
				//System.err.println("sum:"+(sum+=num));
				this.collector.ack(input);
			}else{
				this.collector.fail(input);
			}
		}

		public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
			// TODO Auto-generated method stub
			this.conf = conf;
			this.context = context;
			this.collector = collector;
		}

		public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
			// TODO Auto-generated method stub
			
		}
		
	}
	
}

 

 

 

6.Storm集群结构
主节点(Nimbus):Nimbus负责在集群范围内分发代码、为worker分配任务和故障监测

从节点(Supervisor):Supervisor监听分配给它所在机器的工作,基于Nimbus分配给它的事情来决定启动或停止工作者进程。每个工作者进程执行一个topology的子集(也就是一个子拓扑结构);一个运行中的topology由许多跨多个机器的工作者进程组成。

启动集群

在nimbus节点执行"nohup bin/storm nimbus >/dev/null 2>&1 &"启动Nimbus后台程序,并放到后台执行

在supervisor节点执行"nohup bin/storm supervisor >/dev/null 2>&1 &"启动Supervisor后台程序,并放到后台执行;

在nimbus节点执行"nohup bin/storm ui >/dev/null 2>&1 &"启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbus host}:8080观察集群的worker资源使用情况、Topologies的运行状态等信息。

在所有节点执行"nohup bin/storm logviewer >/dev/null 2>&1 &"启动log后台程序,并放到后台执行,启动后可以通过http://{host}:8000观察日志信息。(nimbus节点可以不用启动logviewer进程,因为logviewer进程主要是为了方便查看任务的执行日志,这些执行日志都在supervisor节点上。)

停止作业

先查询作业列表storm list
命令行下执行storm kill TopologyName
在storm ui上点击kill按钮

 

7.并行度

技术分享

 

一个节点上最多能够运行四个worker,是四个slots

技术分享

 

1个worker进程执行的是1个topology的子集(注:不会出现1个worker为多个topology服务)。1个worker进程会启动1个或多个executor线程来执行1个topology的component(spout或bolt)。因此,1个运行中的topology就是由集群中多台物理机上的多个worker进程组成的。

executor是1个被worker进程启动的单独线程。每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程会在每次循环里顺序调用所有task实例)。

task是最终运行spout或bolt中代码的执行单元(注:1个task即为spout或bolt的1个实例,executor线程在执行期间会调用该task的nextTuple或execute方法)。topology启动后,1个component(spout或bolt)的task数目是固定不变的,但该component使用的executor线程数可以动态调整(例如:1个executor线程可以执行该component的1个或多个task实例)。这意味着,对于1个component存在这样的条件:#threads<=#tasks(即:线程数小于等于task数目)。默认情况下task的数目等于executor线程数目,即1个executor线程只运行1个task。
默认情况下,一个supervisor节点会启动4个worker进程。每个worker进程会启动1个executor,每个executor启动1个task

 

提高并行度
worker(slots)
默认一个从节点上面可以启动4个worker进程,参数是supervisor.slots.port。在storm配置文件中已经配置过了,默认是在strom-core.jar包中的defaults.yaml中配置的有。
默认一个strom项目只使用一个worker进程,可以通过代码来设置使用多个worker进程。
通过config.setNumWorkers(workers)设置
通过conf.setNumAckers(0);可以取消acker任务
最好一台机器上的一个topology只使用一个worker,主要原因是减少了worker之间的数据传输
如果worker使用完的话再提交topology就不会执行,会处于等待状态
executor
默认情况下一个executor运行一个task,可以通过在代码中设置
builder.setSpout(id, spout, parallelism_hint);
builder.setBolt(id, bolt, parallelism_hint);
task
通过boltDeclarer.setNumTasks(num);来设置实例的个数
executor的数量会小于等于task的数量(为了rebalance)

弹性计算

通过代码调整
topologyBuilder.setBolt("green-bolt", new GreenBolt(),2)
.setNumTasks(4).shuffleGrouping("blue-spout);
通过shell调整
# 10秒之后开始调整
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10
-w 代表几秒后开始执行

-n 代表几个worker

-e 代表几个excutor

 

stream grouping分类
Shuffle Grouping: 随机分组, 随机派发stream里面的tuple, 保证bolt中的每个任务接收到的tuple数目相同.(它能实现较好的负载均衡)
Fields Grouping:按字段分组, 比如按userid来分组, 具有同样userid的tuple会被分到同一任务, 而不同的userid则会被分配到不同的任务
All Grouping: 广播发送,对于每一个tuple,Bolts中的所有任务都会收到.


 

storm学习