首页 > 代码库 > homework做了些什么?

homework做了些什么?

第一步:get_new_guid_uid_pairs_{$ymd}

参数是日期(date),以及100机器上的文件(rawdata)。

那么100上的文件是从哪里来的呢?

我们进入到100机器上,打开root权限下的cron,看到如下内容:

### add by kamilzhou for clickflow system
8 * * * * /data/clickflow/hourly_data/hourlyGetLogDataStore.sh >> /data1/clickflow/hourly_data/get.log &
20 * * * * /data/clickflow/filters/correct-page-id-filter/clickflowlog-filter/bin/hourly_refer_filter.sh

所以我猜测是不是100上的数据是从这两个脚本拉过来的呢?

查看这两个脚本的内容。

 #!/bin/sh
 #
 # Fetch the previous hour's raw clickflow log files from 10.206.8.75 into
 # /data1/clickflow/by-time/<day>/ by driving recv_file.exp with expect.
 #
 # Usage: hourlyGetLogDataStore.sh [YYYYMMDDHH [YYYYMMDD]]
 #   $1  hour stamp used to build the remote file pattern (default: one hour ago)
 #   $2  name of the local day directory (default: the date of one hour ago)
 #
 # NOTE(review): when only $1 is given, the day directory still defaults to
 # "one hour ago" instead of the day contained in $1 -- confirm this is intended.
 # Any other argument count leaves both defaults untouched.

 # recv_file.exp is referenced relative to this script, so resolve the script
 # directory and abort if we cannot change into it.
 currdir=$(cd "$(dirname "$0")" && pwd) || exit 1
 cd "$currdir" || exit 1

 # Defaults: the hour that just ended (relies on GNU date's -d option).
 log_file_time=$(date +%Y%m%d%H -d"-1 hours")
 log_file_dir=$(date +%Y%m%d -d"-1 hours")

 case $# in
     1)
         log_file_time=$1
         ;;
     2)
         log_file_time=$1
         log_file_dir=$2
         ;;
 esac

 dest_dir="/data1/clickflow/by-time/${log_file_dir}"

 # -p also creates missing parents and is a no-op when the directory exists.
 mkdir -p "$dest_dir" || exit 1

 # The trailing '*' is a pattern for the REMOTE side. It is kept quoted so the
 # local shell never expands it; a local match would turn into several words
 # and shift every following argument of recv_file.exp.
 src_log_filename="/data/clickflow/data_today/row_data_${log_file_time}*"

 # NOTE(review): SECURITY -- the account and password are hard-coded here and
 # are visible in the process list while expect runs. They can be overridden
 # through the environment; the literal defaults only preserve the existing
 # behaviour and should be replaced by key-based authentication.
 ssh_user=${CLICKFLOW_SSH_USER:-kamilzhou}
 ssh_pass=${CLICKFLOW_SSH_PASS:-Zhou2013@tencent}

 # Argument order as used by the original call: host, port, remote pattern,
 # local directory, user, password (meaning inferred from the values -- the
 # expect script itself is not visible here).
 /usr/bin/expect recv_file.exp 10.206.8.75 36000 \
     "$src_log_filename" "$dest_dir" "$ssh_user" "$ssh_pass"
hourlyGetLogDataStore.sh
 #!/bin/sh
 #
 # Run correct-page-id-filter.sh over every raw log file of one hour found in
 # /data1/clickflow/by-time/<day>/ and place the result, under the same file
 # name, in /data1/clickflow/refer-filtered/<day>/.
 #
 # Usage: hourly_refer_filter.sh [YYYYMMDDHH [YYYYMMDD]]
 #   $1  hour stamp selecting the input files (default: one hour ago)
 #   $2  name of the day directory (default: the date of one hour ago)
 # Any other argument count leaves both defaults untouched.

 orig_log_dir="/data1/clickflow/by-time/"
 refer_filter_dir="/data1/clickflow/refer-filtered/"

 refer_filter_bin="/data/clickflow/filters/correct-page-id-filter/clickflowlog-filter/bin/correct-page-id-filter.sh"

 # Defaults: the hour that just ended (relies on GNU date's -d option).
 log_file_time=$(date +%Y%m%d%H -d"-1 hours")
 log_file_dir=$(date +%Y%m%d -d"-1 hours")

 case $# in
     1)
         log_file_time=$1
         ;;
     2)
         log_file_time=$1
         log_file_dir=$2
         ;;
 esac

 src_dir="${orig_log_dir}${log_file_dir}/"
 dest_dir="${refer_filter_dir}${log_file_dir}/"

 # -p also creates missing parents and is a no-op when the directory exists.
 mkdir -p "$dest_dir" || exit 1

 # Iterate over the glob directly instead of parsing `ls` output, so file
 # names containing whitespace or glob characters are handled correctly.
 found=0
 for file in "${src_dir}row_data_${log_file_time}"*; do
     # When nothing matches, the pattern is left as a literal word; skip it.
     [ -e "$file" ] || continue
     found=1

     filename=$(basename "$file")

     echo "$filename"
     # The filter is called with the input file and the destination path
     # (presumably it writes the filtered data there -- script not visible).
     "$refer_filter_bin" "$file" "${dest_dir}${filename}"
 done

 # Keep a diagnostic on stderr when there was nothing to process, as the
 # previous `ls`-based loop did through ls's own error message.
 if [ "$found" -eq 0 ]; then
     echo "no input files matching ${src_dir}row_data_${log_file_time}*" >&2
 fi
hourly_refer_filter.sh

hourlyGetLogDataStore.sh 这个脚本在每小时的第8分钟执行,负责把75机器上前一个小时的原始数据拉到100上。

hourly_refer_filter.sh 这个脚本在每小时的第20分钟执行,代码里调用了很多其他脚本,目的是过滤不合法的数据。

详细了解请参考:

100上的数据过滤

参数是有了,那么输出是什么呢?

 我们来看下这个任务的handler.

  /**
   * Collect the new guid/uid pairs of one day and partition that day's raw
   * clickflow data into 100 files, spreading the work over several processes.
   *
   * Steps:
   *   1. download the day's raw data (either 144 pre-split files or one file
   *      that is split locally into 144 parts),
   *   2. run getNewGuidUidPairsNPartitionRawDataTask on every part, at most
   *      40 parts per TaskDivider run,
   *   3. de-duplicate the collected pairs file,
   *   4. upload the pairs file and the 100 partition files.
   *
   * @param array   $in         input description, must contain:
   *                            'date'    day to handle, e.g. 20140806
   *                            'rawdata' remote directory holding the raw data, e.g.
   *                                      sftp://10.191.152.100:36000/data1/clickflow/refer-filtered/20140806/
   * @param array   $out        output description, must contain:
   *                            'upairs'  remote target of the pairs file
   *                            'rawdata' range pattern of the remote partition files
   * @param boolean &$continued whether the process should be continued; it is
   *                            handed to the transfer/run helpers and checked
   *                            after each of them
   *
   * @return boolean true when success, otherwise false
   * @author baronyuan
   */
  function getNewGuidUidPairsNPartitionRawDataSMP($in, $out, &$continued)
  {
      // Parameter check.
      if (!is_array($in)
          || !isset($in['date'])
          || !isset($in['rawdata'])
          || !is_array($out)
          || !isset($out['upairs'])
          || !isset($out['rawdata'])
      ) {
          Logger::err(sprintf(
              'invalid request with in[%s] or out[%s]',
              json_encode($in), json_encode($out)
          ));

          return false;
      }

      $date = trim($in['date']);          // e.g. 20140806
      $remote_dir = trim($in['rawdata']); // filtered raw data directory on the .100 machine
      // e.g. sftp://10.206.8.75:36000/data/clickflow/tmp/guid_uid-map/new/20140806/data
      // (presumably a temporary location -- unconfirmed)
      $result_pairs = $out['upairs'];
      // e.g. sftp://10.206.8.75:36000/data/clickflow/tmp/rawdata/20140806/by-guid/data_0 ... data_99
      $results_rawdata = pattern_range2array($out['rawdata']);

      $unikey = get_unikey();

      // Local scratch files are named <tmp path>tasks.<unikey>_input_* / _output_*.
      $resource_prefix =
          HOMEWORK_CLICKFLOW_TMP_PATH . 'tasks.' . $unikey . '_input_';
      $output_prefix =
          HOMEWORK_CLICKFLOW_TMP_PATH . 'tasks.' . $unikey . '_output_';

      // Initialised up front so both branches below append to a defined array.
      $tasks_input = array();

      if (RemoteResourceAccessExists($remote_dir . 'row_data_' . $date . '000')) {
          // The day is already stored as separate files (row_data_<date>000 ...).
          $remote_resource_names = getSplitedRawDataNames($date);

          foreach ($remote_resource_names as $key => $name) {
              $resource_local = $resource_prefix . $key;
              $remote = $remote_dir . 'row_data_' . $name;

              if (!RemoteResourceAccessExists($remote)) {
                  Logger::err(sprintf('%s not found', $remote));
                  return false;
              }

              // NOTE(review): the return value of RemoteResourceAccessGetUntil
              // has never been checked here; its failure semantics are not
              // visible in this file -- confirm whether it needs handling.
              RemoteResourceAccessGetUntil($remote, $resource_local, $continued);

              if (!$continued) {
                  Logger::err('stop!');
                  return false;
              }

              FileAutoCleaner::add($resource_local);
              $tasks_input[] = $resource_local;
          }
      } else {
          // Fallback: a single file for the whole day, split locally into 144 parts.
          $remote_resource_name = $remote_dir . 'row_data_' . $date;

          if (!RemoteResourceAccessExists($remote_resource_name)) {
              Logger::err(sprintf('%s not found', $remote_resource_name));
              return false;
          }

          $resource_local = $resource_prefix . 'inall';

          // NOTE(review): return value unchecked, see the note above.
          RemoteResourceAccessGetUntil(
              $remote_resource_name,
              $resource_local,
              $continued
          );

          if (!$continued) {
              Logger::err('stop!');
              return false;
          }

          FileAutoCleaner::add($resource_local);

          if (false === split_file($resource_local, 144, $resource_prefix)) {
              // Report the file that could not be split, not the name prefix
              // of the parts.
              Logger::err(sprintf(
                  'failed to split the file[%s]', $resource_local
              ));

              return false;
          }

          for ($i = 0; $i < 144; ++$i) {
              $tasks_input[] = $resource_prefix . $i;
          }

          FileAutoCleaner::add($tasks_input);
      }

      // [0] => local pairs file, [1] => the 100 local partition files.
      $tasks_output = array(
          $output_prefix . 'pairs.output',
          pattern_range2array(
              $output_prefix . 'rawdata_[0-99]'
          )
      );

      // Create/truncate every output file before the workers append to them.
      // Remember the first failure but still register the files for cleanup
      // afterwards, exactly where they were registered before.
      $failed_output = null;

      if (false === file_put_contents($tasks_output[0], '')) {
          $failed_output = $tasks_output[0];
      }

      foreach ($tasks_output[1] as $task_rawdata_output) {
          if (false === file_put_contents($task_rawdata_output, '')
              && null === $failed_output
          ) {
              $failed_output = $task_rawdata_output;
          }
      }

      FileAutoCleaner::add($tasks_output[0], $tasks_output[1]);

      if (null !== $failed_output) {
          Logger::err(sprintf('fail to create the file[%s]', $failed_output));
          return false;
      }

      // One sub task per input file, run in batches of at most 40.
      $num_of_processes = 40;
      $num_of_tasks = count($tasks_input);
      $counter = -1;

      $task_divider = new TaskDivider();

      while ($num_of_tasks > 0) {
          // Only the last batch can be smaller than the batch size.
          if ($num_of_tasks < $num_of_processes) {
              $num_of_processes = $num_of_tasks;
          }

          $task_divider->clear();

          for ($i = 0; $i < $num_of_processes; ++$i) {
              ++$counter;

              $task_divider->add(
                  'getNewGuidUidPairsNPartitionRawDataTask',
                  array(
                      $tasks_input[$counter],
                      $tasks_output[0],
                      $tasks_output[1]
                  )
              );
          }

          $run_result = $task_divider->run($continued);

          if (false === $run_result) {
              Logger::err('fail to run tasks');
              return false;
          }

          // Any result other than integer 0 is reported as a number of failed tasks.
          if ($run_result !== 0) {
              Logger::err(sprintf('fail to run %d tasks', $run_result));
              return false;
          }

          $num_of_tasks -= $num_of_processes;
      }

      if (false === uniq_file($tasks_output[0])) {
          Logger::err(sprintf('fail to uniq the file[%s]', $tasks_output[0]));
          return false;
      }

      // NOTE(review): the return values of RemoteResourceAccessPutUntil have
      // never been checked either -- confirm whether a failed upload can
      // leave $continued true.
      RemoteResourceAccessPutUntil(
          $tasks_output[0],
          $result_pairs,
          $continued
      );

      if (!$continued) {
          Logger::err('stop!');
          return false;
      }

      // Local partition file N is uploaded to remote partition file N.
      foreach ($tasks_output[1] as $key => $local_path) {
          RemoteResourceAccessPutUntil(
              $local_path,
              $results_rawdata[$key],
              $continued
          );

          if (!$continued) {
              Logger::err('stop!');
              return false;
          }
      }

      return true;
  }
getNewGuidUidPairsNPartitionRawDataSMP
  /**
   * Sub task handled by a single process: read one raw clickflow log file,
   * keep the page-view lines that pass the filters below, append every kept
   * line to one of 100 partition files and append the guid/uid pairs seen in
   * those lines to the pairs file.
   *
   * @param string $input_file           local file to handle
   * @param string $output_pairs_file    file the "guid<TAB>uid" lines are appended to
   * @param array  $output_rawdata_files the 100 partition files (index 0..99),
   *                                     opened in append mode
   *
   * @return boolean true when success, otherwise false
   * @author baronyuan
   */
  function getNewGuidUidPairsNPartitionRawDataTask(
      $input_file,
      $output_pairs_file,
      $output_rawdata_files
  ) {
      // Replace the global logger with a fresh one using the worker log name.
      global $logger;
      $logger->uninit();
      $logger = new Logger(HOMEWORK_WORKER_LOG_NAME);

      $fp_input = @fopen($input_file, 'r');

      if (false === $fp_input) {
          Logger::err(sprintf('failed to open file[%s]', $input_file));
          return false;
      }

      $upairs = array();
      $fps_rawdata_output = array();

      // From here on failures only clear $ok, so that every handle opened so
      // far is closed on all paths before returning.
      $ok = true;

      // 'a': append, creating the file when it is missing.
      for ($i = 0; $i < 100; ++$i) {
          $fp_rawdata_output = @fopen($output_rawdata_files[$i], 'a');

          if (false === $fp_rawdata_output) {
              Logger::err(
                  sprintf('fail to open file[%s]', $output_rawdata_files[$i])
              );

              $ok = false;
              break;
          }

          $fps_rawdata_output[] = $fp_rawdata_output;
      }

      // Page ids dropped for every warehouse except 1971 (the original code
      // labels this filter "iframe").
      $iframe_page_ids = array(
          12376, 1237017,
          63786, 6378017,
          63796, 6379017,
          63806, 6380017,
          63816, 6381017,
          63826, 6382017,
          5020, 11718320,
      );

      // "tmp hack" in the original code: only these warehouse ids are processed.
      $allowed_wh_ids = array(1, 1001, 2001, 3001, 4001, 5001, 1969, 1971);

      $special_key = -1;

      while ($ok && ($line = fgets($fp_input, 10240)) !== false) {
          $cells = explode("\t", $line);

          // Skip short lines, non page-view records and negative page ids.
          if (!isset($cells[LOG_FIELD_MAX_IDX])
              || $cells[LOG_FIELD_IDX_LOG_TYPE] != LOG_TYPE_PV
              || $cells[LOG_FIELD_IDX_PAGE_ID] < 0
          ) {
              continue;
          }

          $page_id = (int)$cells[LOG_FIELD_IDX_PAGE_ID];
          $wh_id = (int)$cells[LOG_FIELD_IDX_WH_ID];

          if ($wh_id != 1971) {
              // Drop the listed page ids, and pages whose level is 0/empty
              // unless their URL is on buy.51buy.com / buy.yixun.com.
              if (in_array($page_id, $iframe_page_ids, true)
                  || (!(int)$cells[LOG_FIELD_IDX_PAGE_LEVEL]
                      && !preg_match('#buy\.(?:51buy|yixun)\.com#', $cells[LOG_FIELD_IDX_PAGE_URL]))
              ) {
                  continue;
              }
          }

          $uid = (int)$cells[LOG_FIELD_IDX_USER_ID];
          $guid = trim($cells[LOG_FIELD_IDX_GUID]);

          if (!in_array($wh_id, $allowed_wh_ids, true)) {
              continue;
          }

          if (1970 == $wh_id) {
              // NOTE(review): unreachable while 1970 is missing from the
              // allowed list above; kept so re-enabling 1970 restores the
              // round-robin distribution.
              $hash_key = $special_key = (++$special_key) % 100;
          } else {
              // crc32() may return a negative integer on 32-bit builds, which
              // would give a negative bucket. Map it to the unsigned value
              // first; fmod() avoids integer overflow and yields the same
              // bucket a 64-bit build computes with "% 100".
              $crc = crc32($guid);

              if ($crc < 0) {
                  $crc += 4294967296;
              }

              $hash_key = (int)fmod($crc, 100);

              if ($uid && $guid && $guid != 'null' && $guid != 'Unknown') {
                  $upairs[$guid] = $uid;
              }
          }

          if ($hash_key < 0 || $hash_key > 99) {
              // Defensive: never index the handle list with an invalid key.
              // The line is skipped instead of being written through a
              // non-existent handle.
              Logger::err(sprintf('invalid hash key %d', $hash_key));
              continue;
          }

          if (false === @fwrite($fps_rawdata_output[$hash_key], $line)) {
              Logger::err('failed to write a line into a file');
              $ok = false;
          }
      }

      // fgets() returning false before end-of-file means a read error.
      if ($ok && !feof($fp_input)) {
          Logger::err(sprintf('failed to read file[%s]', $input_file));
          $ok = false;
      }

      fclose($fp_input);

      foreach ($fps_rawdata_output as $fp_rawdata_output) {
          @fclose($fp_rawdata_output);
      }

      if (!$ok) {
          return false;
      }

      $fp_pairs_output = @fopen($output_pairs_file, 'a');

      if (false === $fp_pairs_output) {
          Logger::err(sprintf('failed to open file[%s]', $output_pairs_file));
          return false;
      }

      foreach ($upairs as $guid => $uid) {
          if (false === fwrite($fp_pairs_output, "{$guid}\t{$uid}\n")) {
              // Was "Logger:err(...)": a goto label followed by a call to an
              // undefined function err(), i.e. a fatal error on this path.
              Logger::err(
                  sprintf(
                      'failed to write a line into file[%s]',
                      $output_pairs_file
                  )
              );

              $ok = false;
              break;
          }
      }

      fclose($fp_pairs_output);
      return $ok;
  }
getNewGuidUidPairsNPartitionRawDataTask

 

homework做了些什么?