首页 > 代码库 > 数据汇总计算和分析的反思
数据汇总计算和分析的反思
以下内容,都经过本人实践验证过。
若转发,请在标题上标记[转],并注明原文链接:http://www.cnblogs.com/robinjava77/p/6285747.html,作者名称:robin。
并在文章首行附上本段话。否则,作者保留追究的权利。
术语定义:
1.片:本周、本月、本年、近两月、近三月、近半年、近一年和至今八个维度
诉求:基于**年的日数据,进行计算汇总,分别以本周、本月、本年、近两月、近三月、近半年、近一年和至今八个维度进行统计精准性修复,时间消耗越低越好。
场景:
1.日数据是每日都会进行各类综合计算,形成的业务基础数据。
2.每日只会保留当日的片数据。
3.当日片数据是根据当日日数据、昨日片数据和需要去掉的日数据,综合计算而保存下来。
4.日数据,在未发现错误前,都是可靠的。
5.日数据,少量错误,修复少量key的片数据。
6.某日的日数据,出现大量甚至是全量key的数据错误,则需进行全量精准性修复。
应对第六个场景,就出现了我们上文提到的诉求。
两年的基础日数据大约在***亿多条,由于系统现在仍然处于第一代架构:单进程java+oracle的原始架构。无法使用后续升级的架构:①zk+多进程java去中心化分布式计算;②mongo+spark;③hadoop+spark等等来更优地去解决现在的诉求。
因此本文仅针对:单进程java+oracle解决上述诉求来进行说明。如果有同类诉求的场景,最好是在项目架构时,根据实际情况,直接选择更好的架构去进行开发。这样能避免一些让人十分尴尬的场景。本人所负责的项目,启动之初,由于没有业务大神参与,领导只能选择简单易掌控的架构,好让所有开发人员,都能将主要精力投入业务规则的摸索和熟悉。
废话说太多,直接上主菜。
针对这个诉求,自己前前后后花了一定的时间才解决。现在按实践的时间前后顺序,分别是:①集团作战方案;②一锅乱炖方案;③分而治之方案。
所有日数据,都是基于key进行唯一性标识,片的日数据量=key数量*8。
Key的数量仅是***万级。
以下是一些基础类定义和变量说明
1 // List<String> keys集团线程的1万个key 2 //List<Unit> units 片定义数据集合 3 Class Unit{ 4 String name;//片标识 5 Integer startDay;//片起始日期 6 Integer endDay;//片截止日期 7 Integer indexDay;//片的索引日期 8 } 9 Class DayData{ 10 Integer day;//日期 11 Number data;//日数据 12 String key;// 13 }
①集团作战方案:遍历key,将1万个key划为1个集团单位,建立一个集团线程,将这个集团线程加入业务线程池。每个集团线程,仅负责计算本线程所分配的key的片数据。
下面贴出集团线程的核心伪代码:
1 //读取该集团起止key范围所有天的日数据 2 Map<String,List<DayData>> groupDataWarehouse = getDayDataByDBOnSE(startKey,endKey); 3 //key 片 计算汇总临时存储对象 key unit dayData 4 HashBasedTable<String,String,Number> result= HashBasedTable.create(); 5 6 for(String key:keys){//遍历集团所属的key 7 List<DayData> dayDatas = groupDataWarehouse.get(key);//获取该key的所有天的日数据 8 for(DayData daydata: dayDatas){//遍历该key的日数据 9 for(Unit unit:units){//遍历需要计算的片 10 if(daydata.getDay().before(unit.getEndDay()) 11 && daydata.getDay().after(unit.getStartDay())){//该key该日的日数据是否在片的计算范围 12 Number temp = result.get(key,unit.getName());//获取临时存储对象之前存储的结果 13 if(temp == null){ 14 temp = new Number(); 15 } 16 temp = ArithUtil.collect(temp,dayData.getData());//前述结果与该日日数据进行汇总计算 17 result.put(key,unit.getName(),temp);//计算后的数据,保存进临时存储对象 18 } 19 } 20 } 21 } 22 23 saveDataIntoDB(result);//保存数据进数据库
集团线程的好处:
1.将大量的key分散成少量的集团key;
2.少量的key,读取数据库数据量少,内存占用量少;
3.一个集团的数据计算出错,不影响其他的集团key,只需重新执行出错的集团key即可;
4.控制业务线程池同时可执行线程数目,就能降低服务器负载或者提高计算的性能。
但是这种方案实现后,在实测中,被DB强烈要求禁止使用。
因为在日常计算场景里,日数据在数据库存储的分区是按天来进行分区。
但集团线程则是根据key来进行分区,每个集团线程查询该片key的日数据时,需要在数据库查询600多个数据分区。
因此数据库吃不消,数据库的CPU、内存和其他各种指标,都被打得非常高,严重影响其他系统的数据服务。
基于上述原因,DB建议我们按天来获取数据,将每天的数据计算完成后,直接丢弃。基于DB的建议,就出现了第二种方案:一锅乱炖方案。
②一锅乱炖方案:又分为两个阶段:(1)全部数据取出来计算;(2)取单日数据,计算完再获取下一个单日数据。
(1)遍历日期,读取数据,放入Map<String,List<DayData>> dataWarehouse中,然后替代方案①中的getDayDataByDB(startKey,endKey)方法,按照集团作战方案,进行数据计算。
这里的伪代码就不写了,源代码更不想贴。总而言之,这是一个非常糟糕的方案。
第一,所有基础数据,都加载到内存,消耗几十G的内存,放这些临时数据,服务器吃不消;
第二,只能单线程同步执行,无法异步执行,即使异步执行,也需对dataWarehouse用关键字“synchronized”上锁,导致效率低下。如果给每个key一个单独的队列来进行处理,整个代码实现冗长又难看;
第三,可能公司不缺服务器,DB说几十G的内存搞不定,就申请几百G的服务器呗,作为一个有追求的程序员,真是欲哭无泪。
(2)按日期顺序,创建单日读数据线程,放入读数据线程池。单日读数据线程读取指定日所有key的日数据,写入LinkedBlockingQueue队列中。另外创建一个单独的计算线程,从队列中获取单日的数据,遍历日数据,按key保存中间结果。
这个方案,将其转化为生产者-消费者模型,注意:读数据线程和计算线程不能放在同一个工作线程池内,否则容易造成死锁。
LinkedBlockingQueue(10) queue;这个队列是支持阻塞方式。
List<DayData> data
生产者:put(data);
消费者:data = http://www.mamicode.com/take();
放在同一个线程池内,线程池内所有的工作线程被生产者占据,队列被塞满后,所有工作线程都阻塞在put方法,消费者无法获取工作资源。
这是最容易犯的一个错误之一。
但这种方案有明显的弊端:由于受按key汇总计算值的限制,消费者只能一个,即便生产者是多线程,队列设置得足够大,因为消费者的效率低下,导致所需时间无法预估,经实验,是达不到预期最低要求的。
简单贴下伪代码:
1 BlockingQueue<Vector<DayData>> queue = new LinkedBlockingQueue(100);//阻塞队列 2 3 生产者(多线程) 4 Vector<DayData> dayData =http://www.mamicode.com/ getDayDataByDBOnDay(day); 5 queue.put(dayData); 6 7 消费者(单线程) 8 Thread cal = new Thread(new Runnable() { 9 @Override 10 public void run() { 11 HashBasedTable<String,String,Number> result= HashBasedTable.create(); 12 int calDayCount = 0; 13 while(calDayCount < totalDayCount){ 14 Vector<DayData> dayDatas = queue.take(); 15 for (DayData dayData:dayDatas){ 16 for (Unit unit:units){ 17 if(daydata.getDay().before(unit.getEndDay()) 18 && daydata.getDay().after(unit.getStartDay())){//该key该日的日数据是否在片的计算范围 19 Number temp = result.get(dayData.getKey(),unit.getName()); 20 if(temp == null){ 21 temp = new Number(); 22 } 23 temp = ArithUtil.collect(temp,dayData.getData());//前述结果与该日日数据进行汇总计算 24 result.put(dayData.getKey(),unit.getName(),temp);//计算后的数据,保存进临时存储对象 25 } 26 } 27 } 28 calDayCount++; 29 } 30 saveDataIntoDB(result);//保存数据进数据库 31 } 32 33 }); 34 cal.start();
生产者线程和消费者线程,都完成相应的任务后,用CountDownLatch downLatch = new CountDownLatch(int num);倒数计数器锁来进行异步等待控制
③分而治之方案:根据这个特定的业务场景,(1)将日数据汇总为月月数据,保存下来;(2)根据各个片,计算片的开始日期月1到startDay前一天的阶段去掉数据;(3)取片起止时间包含的月数据文件和阶段去掉数据,月数据进一步汇总,最后消去阶段去掉数据。
分而治之方案优点:
(1)大功能拆分为三个小功能,各个小功能独立实现,每个小功能可使用多线程快速完成,整体程序开发可控,代码不会太冗长;
(2)各个功能互不影响,可以异步进行,通过CountDownLatch在主线程中,达到异步等待;
(3)基础日数据按月汇总保留文件,下次再次进行修复,无需再次读取,若历史日数据发现有错,则删除错误日的月汇总文件,仅重新生成该日的月汇总文件即可。
下面贴伪代码:
1 getModulo(String key){//key 按Constant.MODULO返回取模的余数 2 //具体实现略 3 } 4 5 void precise(Integer today,List<Unit> units,ExecutorService BOSS_EXEC){ 6 createMonthFile(today,BOSS_EXEC);//创建以月为汇总的中间值文件 7 createUnitRemoveFile(units,BOSS_EXEC);//创建本周、近两月、近三月、近半年、近一年需被去掉的中间值文件 8 calPreciseAll(today,units,BOSS_EXEC);//基于月汇总和除掉的收益率中间值文件进行精确的片收益率计算 9 }
1 void createMonthFile(Integer today,ExecutorService BOSS_EXEC){ 2 Integer localMonth = getMonth(today); 3 String rootFilePath = getPath(); 4 String monthFilePath = FileUtil.getCompleteFilePath(rootFilePath,localMonth+"");//获取今月汇总文件夹 5 FileUtil.deleteFile(monthFilePath);//删除今月汇总文件 6 Map<Integer,List<Integer>> months = getDayByMonth(today);//按月分散每个月包含的日期 7 CountDownLatch monthFileDownLatch = new CountDownLatch(months.size()); 8 for(Map.Entry<Integer,List<Integer>> entry:months.entrySet()){ 9 Runnable run = createMonthFileRun(entry.getValue(),monthFileDownLatch,entry.getKey(),rootFilePath); 10 BOSS_EXEC.submit(run); 11 } 12 monthFileDownLatch.await();//月汇总数据功能完成后继续向下执行,否则阻塞 13 }
1 Runnable createMonthFileRun(final List<Integer> todays,final CountDownLatch downLatch,final Integer month,final String rootFilePath){ 2 Runnable run = new Runnable() { 3 @Override 4 public void run() { 5 try{ 6 String monthFilePath = FileUtil.getCompleteFilePath(rootFilePath,month+""); 7 if(!FileUtil.isExist(rootFilePath) || !FileUtil.isExist(monthFilePath)){//根目录不存在或者月汇总文件不存在 则进行汇总计算 8 Map<String,Number> monthData = http://www.mamicode.com/new HashMap<>(); 9 //以key 为标记,汇总各个key的月中间值数据 10 for (Integer day:todays){ 11 List<DayData> dayDatas = getDayDataByDBOnDay(day); 12 for (DayData dayData:dayDatas){ 13 String key = dayData.getKey(); 14 Number temp = monthData.get(key); 15 if(temp == null){ 16 temp = new Number(); 17 } 18 temp = ArithUtil.collect(temp,dayData.getData()); 19 monthData.put(key,temp); 20 } 21 } 22 //将本月各个key的中间值数据,保存为文件 23 for(Map.Entry<String,Number> entry:monthData.entrySet()){ 24 String aimFilePath = FileUtil.getCompleteFilePath(monthFilePath,getModulo(entry.getKey()),FileUtil.txtFileSuffix); 25 File aimFile = new File(aimFilePath); 26 String contentTxt = entry.getKey() + FILE_CONTENT_SEPARATOR + entry.getValue();//每个key的数据,保存为一行,key和汇总数据,用特定符号分隔 27 FileUtil.apppendContentToFileNewLine(aimFile,contentTxt); 28 } 29 } 30 }catch (Exception e){ 31 LOG.error(e); 32 }finally { 33 downLatch.countDown(); 34 } 35 } 36 }; 37 return run; 38 }
1 void createUnitRemoveFile(List<Unit> units,ExecutorService BOSS_EXEC){ 2 CountDownLatch needRemoveDownLatch = new CountDownLatch(units.size()); 3 for (Unit unit:units){ 4 if(isNeedRemove(unit)){//本周、近两月、近三月、近半年、近一年需要保存阶段去掉汇总数据 5 String rootFilePath = getPath(); 6 Runnable run = createUnitRemoveFileRun(rootFilePath,unit,needRemoveDownLatch); 7 BOSS_EXEC.submit(run); 8 }else{ 9 needRemoveDownLatch.countDown(); 10 } 11 } 12 needRemoveDownLatch.await();//阶段去掉数据功能完成后继续向下执行,否则阻塞 13 }
1 Runnable createUnitRemoveFileRun(final String rootFilePath,final Unit unit,final CountDownLatch downLatch){ 2 Runnable run = new Runnable() { 3 @Override 4 public void run() { 5 try{ 6 String removeFilePath = FileUtil.getCompleteFilePath(rootFilePath,unit.getName()); 7 FileUtil.deleteFile(removeFilePath); 8 Integer startDay = getMonthFirstDay(unit.getStartDay()); 9 Integer endDay = getLastDay(unit.getStartDay()); 10 List<Integer> takeOutDays = getDays(startDay,endDay); 11 Map<String,Number> removeData = http://www.mamicode.com/new HashMap<>(); 12 for (Integer day:takeOutDays){ 13 List<DayData> dayDatas = getDayDataByDBOnDay(day); 14 for (DayData dayData:dayDatas){ 15 String key = dayData.getKey(); 16 Number temp = removeData.get(key); 17 if(temp == null){ 18 temp = new Number(); 19 } 20 temp = ArithUtil.collect(temp,dayData.getData()); 21 removeData.put(key, temp); 22 } 23 } 24 for (Map.Entry<String,Number> entry:removeData.entrySet()){ 25 String aimFilePath = FileUtil.getCompleteFilePath(removeFilePath,(getModulo(entry.getKey())),FileUtil.txtFileSuffix); 26 File aimFile = new File(aimFilePath); 27 String contentTxt = entry.getKey() + FILE_CONTENT_SEPARATOR + entry.getValue(); 28 FileUtil.apppendContentToFileNewLine(aimFile,contentTxt); 29 } 30 }catch (Exception e){ 31 LOG.error(e); 32 }finally { 33 downLatch.countDown(); 34 } 35 } 36 }; 37 return run; 38 }
1 void calPreciseAll(Integer today,List<Unit> units,ExecutorService BOSS_EXEC){ 2 List<String> keys = getKeysOnDay(today); //获取今日需要计算的key范围 3 Map<String,KeyDetail> keyDetails = getKeyDetailOnDay(today);//获取key详情,主要是为了获取key的开始使用日期,使用日期前,key并没有日数据 4 HashBasedTable<String,String,Number> existMap = getExistUnitData(today);//表中存在的片数据 5 Map<String,DayData> todayDataMap = getTodayDataMap(bizDate);//今天日收率 6 Map<Integer,List<String>> keyModuloMap = keyZone(fundIds);//被计算的key取模分片 7 CountDownLatch downLatch = new CountDownLatch(keyModuloMap.size());//倒数计数器锁 8 List<Integer> months = DateUtil.getMonths(START_DAY,today);//统计开始日期到现在日期的各个月份数据 START_DAY 产品统计数据开始日期 9 for(Map.Entry<Integer,List<String>> entry:keyModuloMap.entrySet()){//分片计算 10 Runnable run = calDataSectionRun(keyDetails,today,downLatch,existMap,entry.getValue(),months,entry.getKey(),todayDataMap,units); 11 BOSS_EXEC.submit(run); 12 } 13 downLatch.await(); 14 }
1 Map<Integer,List<Stirng>> keyZone(List<String> keys){ 2 Map<Integer,List<String>> keyModuloMap = new HashMap<>(); 3 for(int i=0;i<Constant.MODULO;i++){ 4 keyModuloMap.put(i,new Vector<String>()); 5 } 6 for(String key:keys){ 7 int remainder = getModulo(key); 8 List<String> temp = keyModuloMap.get(remainder); 9 temp.add(key); 10 } 11 return keyModuloMap; 12 }
1 Runnable calDataSectionRun(final Map<String,KeyDetail> keyDetails,final Integer today,final CountDownLatch downLatch, 2 final HashBasedTable<String,String,Number> existMap,final List<String> keys,final List<Integer> months, 3 final Integer remainder,final Map<String, DayData> todayDataMap,final List<Unit> units){ 4 Runnable run = new Runnable() { 5 @Override 6 public void run() { 7 try{ 8 HashBasedTable<String,Integer,Number> keyMonthData =http://www.mamicode.com/ HashBasedTable.create(); 9 HashBasedTable<String,String,Number> keyRemoveData =http://www.mamicode.com/ HashBasedTable.create(); 10 for (Integer month:months){ 11 readTxtToTable(month,remainder,keyMonthData); 12 } 13 for(Unit unit:units){ 14 if(unit.isNeedRemove()){ 15 readTxtToTable(unit.getName(),remainder,keyRemoveData); 16 } 17 } 18 19 Vector<Section> save = new Vector<>(); 20 Vector<Section> mod = new Vector<>(); 21 for (String key:keys){ 22 Map<Integer,Number> monthMap = keyMonthData.row(key); 23 KeyDetail keyDetail = keyDetails.get(key); 24 if(keyDetail == null){ 25 LOG.error("key[{}],日期[{}]没有详情,请检查",key,today); 26 continue; 27 } 28 for(Unit unit:units){ 29 Number val = new Number(); 30 Integer keyStartDay = keyDetail.getStartDay(); 31 Integer monthStart = keyStartDay>unit.getStartDay()?keyStartDay:unit.getStartDay(); 32 List<Integer> unitMonths = DateUtil.getMonths(monthStart,unitBO.getEndDay()); 33 for(Integer month:unitMonths){ 34 Number monthDouble = monthMap.get(month); 35 val = ArithUtil.collect(val,monthDouble); 36 } 37 Number removeData =http://www.mamicode.com/ keyRemoveData.get(key,unit.getName()); 38 val = ArithUtil.remove(val,removeData); 39 Number todayDayData = http://www.mamicode.com/new Number(); 40 DayData todayDayData =http://www.mamicode.com/ todayDataMap.get(key); 41 if(todayDayData != null && todayDayData.getData() != null){ 42 todayDayData =http://www.mamicode.com/ todayDayData.getData(); 43 }else{ 44 LOG.error("key[{}],片[{}],今日[{}]的值为null或者0,请检查",fundId,unit.getName(),today); 45 } 46 Seciton section = new Section(key,unitBO.getName(),unit.getIndexDay(),ArithUtil.remove(val,FIX_VAL)); 47 Boolean flag = existMap.get(key,unitBO.getName()); 48 if(flag == null || !flag){ 49 save.add(section); 50 }else{ 51 mod.add(section); 52 } 53 } 54 } 55 saveBatchManyThread(save,Constant.batchNum,Constant.maxThreadNum); 56 updateBatchManyThread(mod,Constant.batchNum,Constant.maxThreadNum); 57 }catch (Exception e){ 58 LOG.error("所有key,模为{}的片数据计算出现异常",remainder,e); 59 }finally { 60 downLatch.countDown(); 61 } 62 } 63 }; 64 return run; 65 }
1 <C> void readTxtToTable(C folder,Integer remainder,HashBasedTable<String,C,Number> table){ 2 if(table == null){ 3 table = HashBasedTable.create(); 4 } 5 String rootFilePath = getPath(); 6 String aimFilePath = FileUtil.getCompleteFilePath(rootFilePath,folder+"",remainder+"",FileUtil.txtFileSuffix); 7 List<String> txtContent = FileUtil.readTxtFile(aimFilePath, FileUtil.ENCODE_UTF8); 8 for (String content:txtContent){ 9 String[] arr = content.split(FILE_CONTENT_SEPARATOR); 10 String key = arr[0]; 11 Number val = getNumber(arr[1]);//将字符串转化为Number类型 12 table.put(key,folder,val); 13 } 14 }
以上就是基于原始架构,实现的较多数据的汇总分析。这种架构,迟早是要被淘汰的。目前四个月后,就会升级为mongo+spark架构,届时看看新架构实现这个诉求和现在的方案相比,到底哪里方便了许多。
但是经过这样一步步迭代升级,对大量数据的汇总分析,有一个很好地策略:分而治之,化整为零,逐个击破,确认各自的边界和交互,一步步调试调优独立功能的性能。
之前的程序跑完这个诉求,最少也需要8个小时,现在基于分而治之方案,第一次执行需要50min,第二次及以后执行,只需30min以内。
本文主要对近期做的一点事情,小结。同时,也方便自己日后回顾。若其他小伙伴有什么文中没出现的更好的解决方案,欢迎留言沟通交流。
数据汇总计算和分析的反思