首页 > 代码库 > 近期工作总结

近期工作总结

cdn注入流程详细分解:主要包括cdnadapter、ci、cl、cpm、mysql、sqlite


一、cdnadapter
1.cdnadapter接收到AutoTest发来的消息:以下是非标准的A3消息
<span style="font-size:14px;"><span style="font-size:12px;">POST /TransferContent HTTP/1.1
User-Agent: Java/1.6.0_21
Host: 172.30.25.245:8071
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive
Content-type: application/x-www-form-urlencoded
Content-Length: 430


<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<TransferContent interval="6" contentType="VOD" startNext="false" responseURL="http://172.30.25.111:60000/TransferStatus" volumeName="example" assetID="222222222222222" providerID="example">
   <Input format="ts" codec="h264" isEncrypted="false" sourceURL="http://172.30.25.246:8090/home/dengxiaobing/ad1.ts" transferBitRate="800" transferCoder="false"/>
</TransferContent></span></span>


2.解析接收到的消息:
2.1以 "媒资文件_码率" 的方式作为注入任务的assetid插入到sqlite数据库中
2.2生成一级m3u8文件,注意一级m3u8文件的格式以及一级m3u8文件和二级的区别

3.在第2步中,往数据库中插入了记录之后,就意味着cdnadapter真正的开始向ci发消息了,
消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">ReqMsg  =
POST /TransferContent HTTP/1.1
User-Agent: CDNAdapter
Host: 172.30.25.246:8886
Content-Type: text/xml
Content-Length: 411
Date: 2014-08-30 11:20:34
Connection: close


<?xml version ="1.0" encoding="UTF-8"?>
<TransferContent providerID="example" assetID="222222222222222_800" volumeName="example" transferBitRate="800" responseURL="http://172.30.25.246:8888">
	<Input sourceURL="http://172.30.25.246:8090/home/dengxiaobing/ad1.ts" sourceIP="" codec="h264" transferCoder="false"/>
	<ExtraFile>
		<file type="top"/>
	</ExtraFile>
	<ContentAsset>
		<!-- metadata -->
	</ContentAsset>
</TransferContent></span></span>




当发送完这条消息后,注入工作就真正的开始了。


4.cdnadapter不断向ci发送查询状态的消息,消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;"><TransferContent providerID="example" assetID="S001" contentType=”VOD” volumeName="example" responseURL="http://192.168.1.1:8001/" interval="6">
	<Input transferBitRate="500" isEncrypted="true" codec=” h264” format =”ts” sourceURL="ftp://192.168.1.12:6000/disk2/500K.ts" userName="FtpClient1" password="H47G113NN4"/>
	<Input transferBitRate="800" isEncrypted="true" codec=” h264” format =”ts” sourceURL="ftp://192.168.1.44:6000/disk2/800K.ts" userName="FtpClient1" password="H47G113NN4" transferCoder="true" audioBitrate=32 resolution="1280x720" frameRate=10/>
</TransferContent>
</span></span>



5.与注入相类似的还有一个流程就是分发
5.1分发的时候,在cdnadapter中,会从数据库中查找state为Pending的任务;
5.2所谓分发,就是分发到另一个cdn系统,入口还是cdnadapter。
5.3如果存在这样的任务,则发送一条这样的消息:注意这个消息是发送到另一个cdnadapter的;
<span style="font-size:14px;"><span style="font-size:12px;">cdnadapter ==> cdnadapter
<?xml version="1.0" encoding="UTF-8"?>
<TransferContent interval="6" contentType="VOD" startNext="false" responseURL="http://172.30.25.111:60000/TransferStatus" volumeName="example" assetID="1111111112220" providerID="mp4">
	<Input format="mp4" codec="h264" isEncrypted="false" sourceURL="http://172.30.25.246:8099/vod/example_001_800" transferBitRate="1000" transferCoder="false"/>
	<Input format="mp4" codec="h264" isEncrypted="false" sourceURL="http://172.30.25.246:8099/vod/example_001_800" transferBitRate="800" transferCoder="false"/>
</TransferContent></span></span>


5.4这里的另一个cdnadapter地址就是对应cdnadapter配置文件中的<cdnadapter_addrs>字段
5.5配置文件中的这个third_cdn_addrs字段表示从第三方cdn拉取内容,这是一个源url,最终会形成一个这样的串http://172.30.25.246:8099/vod/example_001_800
6.分发是在注入完成之后才进行的,注入完成后,加入我们打开了分发配置选项,那么,将会在数据库的分发那张表中,插入分发任务,此时state为Pending
7.和注入一样,这里也需要查询分发任务的状态,通过源代码可以发现是这样一条消息:


<span style="font-size:14px;"><span style="font-size:12px;"><?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<GetTransferStatus contentType="VOD" volumeName="" assetID="" providerID="">
	 <Input format="" codec="h264" transferBitRate=""/>
</GetTransferStatus></span></span>


和转码不同的是这里需要指定具体的编码格式以及波特率等。


至此cdnadapter的功能就分析完毕了。


二、ci
1.首先应该启动ci模块,完成一些初始化任务,初始化主要做的事情如下
1.1初始化监听的线程,这是各个模块必须做的事情
1.2加载动态库streamres.so,该库用于生成索引文件
1.3初始化一个线程用于获取注入任务,初始化一个用于删除过期任务的线程,初始化一个用于上报md5的线程
2.接下来同样看看,是怎样解析cdnadapter传过来的消息,过上面我们知道接收到的消息是这样的:
<span style="font-size:14px;"><span style="font-size:12px;"><?xml version ="1.0" encoding="UTF-8"?>
<TransferContent providerID="example" assetID="222222222222222_800" volumeName="example" transferBitRate="800" responseURL="http://172.30.25.246:8888">
	<Input sourceURL="http://172.30.25.246:8090/home/dengxiaobing/ad1.ts" sourceIP="" codec="h264" transferCoder="false"/>
	<ExtraFile>
		<file type="top"/>
	</ExtraFile>
	<ContentAsset>
		<!-- metadata -->
	</ContentAsset>
</TransferContent></span></span>


注意这里的Input只有一个,同时解析也只实现对一个Input进行处理
3.providerID以及assetID的大小不能超过128个字节,否则将会报错。
3.1解析完上述消息后就是将任务插入到数据库中去了
3.2在插入数据库之前,还要检查该任务是否已经注入或者注入失败
3.3插入任务的时候会将任务state设置为Pending
4.对应着config配置文件中的内容总结几点
4.1 索引类型分为两种top和idx
4.2 top类型的索引文件是在OTT中使用的,而idx类型的索引文件是在DVB中使用的
4.3 这里的倍速参数在OTT中没有使用到,只是在DVB中才会用到
4.4 m3u8文件也只在OTT中使用到
5.我们知道监听线程在解析监听到的消息后就会往数据库中A3_message中添加Pending任务,而我们在初始化的时候
已经开启了一个线程专门用从A3_message中获取Pending任务,一旦获取到Pending任务就会往另外一个表ingest_task中
插入一条状态为Init的任务,从这里开始才真正进入注入流程


6.注入流程
6.1 状态为Init,开始进行流传输(文件下载),更新当前任务状态为DownLoad,当下载完成后会将这里的下载进度置为100,
这一步是在HttpSource.cpp中实现的,并且在数据库A3_message中将任务状态更新为DOWNLOADING
6.2 在数据库ingest_task中将任务状态更新为DownLoad,检查下载状态,当下载状态为DownLoad,并且进度为100的时候
会将这里的任务state置为Transcode
6.3 状态为Transcode的时候,会开始进行转码,是否需要转码则根据传入的这个transferCoder="flase"字段进行判断
6.4 转码成功则将当前任务状态设置为indexing,生成索引文件,注意MP4只能创建top索引文件
6.5 创建索引文件:
1.调用这个函数
bool CIngestProcedure::AVDealWith( const char * szFileName, int& ierrcode)
szFileName = "/diskb/dxb/ci_dist/example_0000000011_800", ierrcode = 0
主要分析TS文件的索引文件生成规则
1.1在生成索引文件之前会加载动态库m_vMediaIndxLib = LoadLib("streamres",true,true);
1.2调用streamres.so中这个LoadStreamIndexCreator函数返回一个IStreamIndexCreator*类型指针
1.3调用打开方法 pSI = m_pIStreamIndexCreator->Open(m_eIdxVer),返回一个IStreamIndex*类型指针
1.4 填充FileStreamParam这个结构体,其中包括:
1.4.1文件名filename = "/diskb/dxb/ci_dist/example_0000000011_800"
1.4.2媒资assetId = 
1.4.3提供商providerId = 
1.4.4如果是NGOD的话还会生成szScale倍速文件
1.4.5码率byteRate
1.4.6二级m3u8文件分片time_slice
1.4.7是否将索引保存到索引文件save_items_to_indexfile
1.4.8每次写多少条索引,once_write_items=0 时在解析完整个文件后再一次性写入所有索引
1.4.9最多解析数据大小,用在快速获取流的关键信息,此时不必分析整个流文件max_parse_data_size=0 时解析完整个流文件
主要填充的信息就是上面的,具体情况可以在StreamIndex.h中看到
1.5 pSI->ConstructIndexFromFileStream(pFSPBuf)这个函数的调用将会真正开始分析TS文件,并且构造索引表生成top文件。
1.6分析构造完索引表之后,再读取里面的信息出来:容器 TS/MP4 、最大码率、时长、视频格式、音频格式等,这是分析的结果,
分析的算法在streamres.so中实现。
2.生成了top文件后,就开始解析这个文件了,调用这个函数
topfilename = "/diskb/dxb/ci_dist/example_0000000010_800.top"
ParserSingleTopFile(topfilename)
2.1首先从topfilename中把内容全部读取出来
2.2然后将top文件转化为m3u8文件
分析一下m3u8文件的生成规则
2.2.1利用cTopIndex->GetGopInf(fMap);函数将top文件的Packet信息读取出来,全部存放到fMap中
2.2.2然后对fMap里面读取到的东西进行切片处理,在这里是按照10s一个进行切片的,值得注意的是:
这里的切片并不是准确的说:每一个切片的长度其实是大于10s的,而m3u8文件中有的会显示11s的切片
实际上这里是没有11s的切片的,只是二者做了一个互补。
2.2.3切片处理获取切片长度实际上是(itr->second.lStartTime - timeflag)/90/1000实现的。
6.6假如创建索引文件成功,则会进入注入流程,将state设置为ingest_TS,注意:虽然文件是注入到cl中去的,但是这个注入消息是发送到cpm中的
6.6.1所谓注入一个媒资文件,就是把该媒资生成m3u8文件、top文件、源文件一起发送,发送的消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">	ReqMsg  =
POST /TransferContent HTTP/1.1
User-Agent: CIServer
Host: 172.30.26.246:8880
Content-Type: text/xml
Content-Length: 314
Date: 2014-10-15 11:09:08
Connection: close


<?xml version ="1.0" encoding="utf-8"?>
<TransferContent fileName="example_0000000015_800.top" transferBitRate="1500000" responseURL="http://172.30.26.246:8886/" startNext="false">
	<Input sourceURL="http://172.30.26.246:9999/diskb/dxb/ci_dist/example_0000000015_800.top" userName="ci" password="ci"/>
</TransferContent></span></span>


6.7一直有线程在监听cl/cg的消息,cl/cg会隔一段时间通过cpm向ci上报注入的进度,该功能是在
ErrCode TransferStatusHandle ::ParseReqPara(const std::string& strReqData)中实现的


三、cpm
1.cpm会开启线程专门用于监听ci发过来的消息,如果是注入消息的话,格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">POST /TransferContent HTTP/1.1
User-Agent: CIServer
Host: 172.30.26.246:8880
Content-Type: text/xml
Content-Length: 314
Date: 2014-10-16 09:35:02
Connection: close


<?xml version ="1.0" encoding="utf-8"?>
<TransferContent fileName="example_0000000017_800.top" transferBitRate="1500000" responseURL="http://172.30.26.246:8886/" startNext="false">
	<Input sourceURL="http://172.30.26.246:9999/diskb/dxb/ci_dist/example_0000000017_800.top" userName="ci" password="ci"/>
</TransferContent> </span></span>


解析完消息之后,还需要将解析出来的消息,组成一条记录插入到mysql数据库中去,该数据库会携带TransferContent命令字的字段消息
2.而任务注入线程中,XHandleTask()这个函数会一直向数据库查询是否有带TransferContent的任务,没有查询到则直接返回,进行下一轮询
而一旦收到ci发来的消息,从mysql中获取到任务
2.1获取到任务后,首先会获取到当前的mysql数据库的策略,函数是GetStrategyFromDb,这个函数获取的是StrategyType=0的数据库策略
获取数据库策略主要是用于获取到区域码的列表stratagy_areaCodeList、策略strategy以及状态
下面结合代码来分析具体是如何实现选出最优cl节点的
2.1.1首先判断是否指定了cl-cg节点,如果指定了则根据节点名直接从node_info表中获取节点信息
2.2.2如果没有指定的话,就按照数据库中的策略来选:
当strategy=0的时候,按照空间大小来选cl-cg
当strategy=1的时候,按照所有区域来选cl-cg
当strategy=2的时候,按照指定的一些区域来选cl-cg
其实按区域来选择cl-cg也都是建立在空间来选的基础之上的
值得注意的是如果按照空间大小来选cl-cg,那么已经被选作分发的节点,将不能用来注入,这是合理的。
3.获取到cl-cg节点之后,也就是选择到合适的cl-cg之后,cpm就开始把任务发送到cl-cg去,让cl到ci那里去下载
举例如下:
假如现在,我们选择到了5个cl节点,那么这个时候就会把一个消息同时发送到5个cl上去,让这5个cl都从ci那里去下载文件。
4.一旦发送全部成功,那么就会把这些任务插入到分发列表中,并且会更新task_list数据库
5.现在有一个问题,刚刚一直提到从node_info中选择cl-cg,那个这个cl-cg的节点是如何来的呢?
每一个cl-cg里面都会配置cpm的地址,当cl-cg启动之后,就会向cpm进行注册,那么在cpm中实现接收
注册消息的类就是
GetStorageInfoHandle和KeepAliveHandle
实际上,真正步骤应该是这样的:
5.1.cl-cg向cpm注册cl节点,通过心跳上报
5.2.cpm获取到节点之后,通过节点的ip地址向cl-cg发送获取存储信息,这个步骤是在CSpaceManager这个类中实现的
6.cpm还会向ci上报注入任务的进度,这个功能是通过CReportManager这个类实现的
7.还有一个比较重要的就是分发这个消息,分发到cg之前,同样需要注册cg节点
注册cg节点的功能是通过CPMRegister这个类来实现的
关于文件的分发,这个函数没有看明白: CDistributeThread::CreateDistributeTask
8.cpm中有这样一个类TransferStatusHandle,这个类是用于解析cl上报的状态消息
关于cpm部分就到这里结束
四、cl
1.开启缓存
2.开启会话管理
3.开启热点统计
3.1所谓热点统计,就是指统计点播的时候,播放的一些热点文件,具体步骤如下:
3.1.1热点统计是通过读取nginx的日志文件来实现的,首先调用函数access->AccessDowork
3.1.2读取access_log中的内容,一行一行解析日志中的内容从而获取到每个文件对应的点播次数,节目名存放到m_lstHotVod,
3.1.3热点统计的最大数量和间隔时间在配置文件中配置
3.1.4热点统计的结果会发送给cpm,消息格式为
<span style="font-size:14px;"><span style="font-size:12px;">	<?xml version="1.0" encoding="UTF-8"?>
	<ManualDistribute>
	<FileName>example_CDNMS00001</FileName>
	<NodeName>cl_cdnms</NodeName>
	<NodeAddr>172.20.15.10:55001</NodeAddr>
	</ManualDistribute></span></span>


4.在上面已经提到cl会通过心跳向cpm上报节点信息,那么这个功能就是通过CKeepAlive这个类实现的,这个类实现两部分功能
4.1第一个是注册节点信息,上报的消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">ReqMsg  =
POST /CPMRegister HTTP/1.1
User-Agent: cl_246_dxb
Host: 172.30.26.246:8880
Content-Type: text/xml
Content-Length: 191
Date: 2014-10-11 18:17:23
Connection: close


<?xml version ="1.0" encoding="UTF-8"?>
<CPMRegister nodeName="cl_246_dxb" nodeType="1" areaCode="11.11" nodeIP="172.30.26.246" nodePort="8882" storageSize="1228800000" freeSize="1223495408"/>
</span></span>

4.2节点信息只会注册一次,第二个就是保持心跳,这里设置的心跳是20s,消息格式如下
<span style="font-size:14px;"><span style="font-size:12px;">ReqMsg  =
POST /KeepAlive HTTP/1.1
User-Agent: cl_246_dxb
Host: 172.30.26.246:8880
Content-Type: text/xml
Content-Length: 73
Date: 2014-10-11 18:17:43
Connection: close


<?xml version ="1.0" encoding="UTF-8"?><KeepAlive nodeName="cl_246_dxb"/>
</span></span>


5.之前说到ci向cl注入,是经过cpm来转发消息的,那么cl将接收来自于cpm的注入消息,消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">POST /TransferContent HTTP/1.1
User-Agent: cpm_server
Host: 172.30.26.246:8882
Content-Type: text/xml
Content-Length: 317   
Date: 2014-10-16 09:35:03
Connection: close


<?xml version ="1.0" encoding="utf-8"?>
<TransferContent fileName="example_0000000017_800.m3u8" transferBitRate="50000000" responseURL="http://172.30.26.246:8880/" startNext="false">
	<Input sourceURL="http://172.30.26.246:9999/diskb/dxb/ci_dist/example_0000000017_800.m3u8" userName="ci" password="ci"/>
</TransferContent>   </span></span>


6.cl对该消息进行解析,解析的函数是A3Protocol::ParseTransferContentReqPara()
解析完后,就会往数据库中添加一个任务,记录了cpm传过来的消息
7假如发了取消任务,那么就停止该任务的注入,并且删除该任务;当cpm向cl发了一个消息的话,那么cl就应该创建一个A3的任务了,
该任务会开始下载文件,当下载完成之后,会更新状态到数据库,同时,会不断的检查下载的状态,当检查到下载完成后就,也会把任务状态
更新到数据库中去。
8值得注意的是当下载文件完成更新到数据库后,就会把下载完成的进度上报到cpm,上报的消息格式如下:
<span style="font-size:14px;"><span style="font-size:12px;">ReqMsg  =
POST /TransferStatus HTTP/1.1
User-Agent: CL
Host: 172.30.26.246:8880
Content-Type: text/xml
Content-Length: 155
Date: 2014-10-13 11:09:02
Connection: close


<?xml version ="1.0" encoding="UTF-8"?>
<TransferStatus nodeName="cl_246_dxb" fileName="example_0000000000003_800.top" state="Transfer" percentComplete="0"/></span></span>


9.MD5是在http下载的时候生成的,具体在httpsource.cpp中实现
<==================================================================================================================>
以下知识介绍如何加载动态库:
dlopen
基本定义
功能:打开一个动态链接库 
包含头文件: #include <dlfcn.h> 
函数定义: void * dlopen( const char * pathname, int mode ); 
函数描述: 
在dlopen的()函数以指定模式打开指定的动态连接库文件,并返回一个句柄给调用进程。使用dlclose()来卸载打开的库。 
mode:分为这两种 
RTLD_LAZY 暂缓决定,等有需要时再解出符号 
RTLD_NOW 立即决定,返回前解除所有未决定的符号。 
RTLD_LOCAL 
RTLD_GLOBAL 允许导出符号 
RTLD_GROUP 
RTLD_WORLD 
返回值: 
打开错误返回NULL 
成功,返回库引用 
编译时候要加入 -ldl (指定dl库) 
dlsym()
功能:根据动态链接库操作句柄与符号,返回符号对应的地址。
包含头文件:#include <dlfcn.h>
函数定义:void*dlsym(void* handle,const char* symbol)
函数描述:dlsym根据动态链接库操作句柄(handle)与符号(symbol),返回符号对应的地址。使用这个函数不但可以获取函数地址,也可以获取变量地址。
比如,假设在so中定义了一个void mytest()函数,那在使用so时先声明一个函数指针:void (*pMytest)(),然后使用dlsym函数将函数指针pMytest指向
mytest函数,pMytest = (void (*)())dlsym(pHandle, "mytest");


handle是由dlopen打开动态链接库后返回的指针,symbol就是要求获取的函数或全局变量的名称。


dlclose()
dlclose用于关闭指定句柄的动态链接库,只有当此动态链接库的使用计数为0时,才会真正被系统卸载。
<==================================================================================================================>
1.调用这个函数
bool CIngestProcedure::AVDealWith( const char * szFileName, int& ierrcode)
szFileName = "/diskb/dxb/ci_dist/example_0000000011_800", ierrcode = 0
主要分析TS文件的索引文件生成规则
1.1在生成索引文件之前会加载动态库m_vMediaIndxLib = LoadLib("streamres",true,true);
1.2调用streamres.so中这个LoadStreamIndexCreator函数返回一个IStreamIndexCreator*类型指针
1.3调用打开方法 pSI = m_pIStreamIndexCreator->Open(m_eIdxVer),返回一个IStreamIndex*类型指针
1.4 填充FileStreamParam这个结构体,其中包括:
1.4.1文件名filename = "/diskb/dxb/ci_dist/example_0000000011_800"
1.4.2媒资assetId = 
1.4.3提供商providerId = 
1.4.4如果是NGOD的话还会生成szScale倍速文件
1.4.5码率byteRate
1.4.6二级m3u8文件分片time_slice
1.4.7是否将索引保存到索引文件save_items_to_indexfile
1.4.8每次写多少条索引,once_write_items=0 时在解析完整个文件后再一次性写入所有索引
1.4.9最多解析数据大小,用在快速获取流的关键信息,此时不必分析整个流文件max_parse_data_size=0 时解析完整个流文件
主要填充的信息就是上面的,具体情况可以在StreamIndex.h中看到
1.5 pSI->ConstructIndexFromFileStream(pFSPBuf)这个函数的调用将会真正开始分析TS文件,并且构造索引表生成top文件。
1.6分析构造完索引表之后,再读取里面的信息出来:容器 TS/MP4 、最大码率、时长、视频格式、音频格式等,这是分析的结果,
分析的算法在streamres.so中实现。
2.生成了top文件后,就开始解析这个文件了,调用这个函数
topfilename = "/diskb/dxb/ci_dist/example_0000000010_800.top"
ParserSingleTopFile(topfilename)
2.1首先从topfilename中把内容全部读取出来
2.2然后将top文件转化为m3u8文件
分析一下m3u8文件的生成规则
2.2.1利用cTopIndex->GetGopInf(fMap);函数将top文件的Packet信息读取出来,全部存放到fMap中
2.2.2然后对fMap里面读取到的东西进行切片处理,在这里是按照10s一个进行切片的,值得注意的是:
这里的切片并不是准确的说:每一个切片的长度其实是大于10s的,而m3u8文件中有的会显示11s的切片
实际上这里是没有11s的切片的,只是二者做了一个互补。
2.2.3切片处理获取切片长度实际上是(itr->second.lStartTime - timeflag)/90/1000实现的。
<==================================================================================================================>
几个函数
atoi
dlopen
SGetExePath

近期工作总结