首页 > 代码库 > homework做了些什么?
homework做了些什么?
第一步:get_new_guid_uid_pairs_{$ymd}
参数是时间和100上的文件.
那么100上的文件是从哪里来的呢?
我们进入到100机器上,打开root权限下的cron,看到如下内容:
### add by kamilzhou for clickflow system
8 * * * * /data/clickflow/hourly_data/hourlyGetLogDataStore.sh >> /data1/clickflow/hourly_data/get.log &
20 * * * * /data/clickflow/filters/correct-page-id-filter/clickflowlog-filter/bin/hourly_refer_filter.sh
所以我猜测是不是100上的数据是从这两个脚本拉过来的呢?
查看这两个脚本的内容.
#!/bin/sh
#
# Pull the previous hour's raw clickflow log files from 10.206.8.75 into
# /data1/clickflow/by-time/<YYYYMMDD>/ using the recv_file.exp expect script
# that lives next to this script.
#
# Usage:
#   (no arguments)         fetch the files of the previous hour
#   <YYYYMMDDHH>           fetch the given hour; the day directory still
#                          defaults to the day of the previous hour
#   <YYYYMMDDHH> <YYYYMMDD> fetch the given hour into the given day directory

# Resolve the directory this script lives in.
currdir=$(cd "$(dirname "$0")"; pwd)

# recv_file.exp is referenced by a relative path, so running from any other
# directory would pick up the wrong file (or none): stop if the cd fails.
cd "$currdir" || exit 1

log_file_time=$(date +%Y%m%d%H -d"-1 hours")
log_file_dir=$(date +%Y%m%d -d"-1 hours")

if [ $# -eq 1 ]; then
    log_file_time=$1
fi

if [ $# -eq 2 ]; then
    log_file_time=$1
    log_file_dir=$2
fi

dest_dir="/data1/clickflow/by-time/${log_file_dir}"

# -p: also succeeds when the parent directory is missing or when another run
# has created the day directory in the meantime.
mkdir -p "$dest_dir" || exit 1

# The wildcard is meant for the remote side; it is kept quoted below so the
# local shell never expands it against local files.
log_filename="row_data_${log_file_time}*"
src_log_filename="/data/clickflow/data_today/${log_filename}"

# NOTE(review): the account name and password are hard-coded and passed on the
# command line, where they are visible to other local users (e.g. via ps).
# Kept unchanged for compatibility; consider key-based authentication.
# Argument order is presumably: host, port, remote source, local destination,
# user, password -- confirm against recv_file.exp.
/usr/bin/expect recv_file.exp 10.206.8.75 36000 "$src_log_filename" "$dest_dir" kamilzhou Zhou2013@tencent
#!/bin/sh
#
# Run correct-page-id-filter.sh over every raw clickflow file of one hour,
# reading from /data1/clickflow/by-time/<day>/ and writing a file of the same
# name to /data1/clickflow/refer-filtered/<day>/.
#
# Usage:
#   (no arguments)         filter the files of the previous hour
#   <YYYYMMDDHH>           filter the given hour; the day directory still
#                          defaults to the day of the previous hour
#   <YYYYMMDDHH> <YYYYMMDD> filter the given hour inside the given day directory

orig_log_dir="/data1/clickflow/by-time/"
refer_filter_dir="/data1/clickflow/refer-filtered/"

refer_filter_bin="/data/clickflow/filters/correct-page-id-filter/clickflowlog-filter/bin/correct-page-id-filter.sh"

log_file_time=$(date +%Y%m%d%H -d"-1 hours")
log_file_dir=$(date +%Y%m%d -d"-1 hours")

if [ $# -eq 1 ]; then
    log_file_time=$1
fi

if [ $# -eq 2 ]; then
    log_file_time=$1
    log_file_dir=$2
fi

src_dir="${orig_log_dir}${log_file_dir}/"
dest_dir="${refer_filter_dir}${log_file_dir}/"

# -p: also succeeds when the parent directory is missing or the day directory
# already exists.
mkdir -p "$dest_dir" || exit 1

# Iterate over the glob directly instead of parsing `ls` output, so file names
# are never word-split and a missing hour does not produce an `ls` error.
for file in "${src_dir}row_data_${log_file_time}"*; do
    # With no match the pattern stays literal; skip it.
    [ -e "$file" ] || continue

    filename=$(basename "$file")

    echo "$filename"
    "$refer_filter_bin" "$file" "${dest_dir}${filename}"
done
hourlyGetLogDataStore.sh 这个脚本在每小时的第8分钟执行,负责把75上前一个小时的原始数据拉过来.
hourly_refer_filter.sh 这个脚本在每小时的第20分钟执行,代码里调用了很多其他脚本,目的是过滤不合法的数据.
详细了解请参考:
100上的数据过滤
参数是有了,那么输出是什么呢?
我们来看下这个任务的handler.
/**
 * Get new guid/uid pairs and partition the raw (unprocessed) clickflow data
 * of one day, using multiple processes.
 *
 * Steps, as they appear in the body:
 *   1. validate $in and $out;
 *   2. download the day's raw data to local temporary files, either as the
 *      already split remote files or as one file that is split locally into
 *      144 parts;
 *   3. run getNewGuidUidPairsNPartitionRawDataTask once per input file, in
 *      batches of at most 40 tasks per TaskDivider run;
 *   4. run uniq_file() on the pairs file, then upload it and the 100
 *      partitioned raw-data files.
 *
 * @param array   $in         must contain 'date' (e.g. 20140806) and
 *                            'rawdata' (remote directory of the raw files)
 * @param array   $out        must contain 'upairs' (remote target of the
 *                            pairs file) and 'rawdata' (range pattern of the
 *                            remote partition files)
 * @param boolean &$continued whether the process should be continued; it is
 *                            handed to the remote-access helpers and to
 *                            TaskDivider::run() and checked after each call
 *
 * @return boolean true when success, otherwise false
 * @author baronyuan
 */
function getNewGuidUidPairsNPartitionRawDataSMP($in, $out, &$continued)
{
    // Parameter check.
    $request_ok = is_array($in)
        && isset($in['date'])
        && isset($in['rawdata'])
        && is_array($out)
        && isset($out['upairs'])
        && isset($out['rawdata']);

    if (!$request_ok) {
        Logger::err(sprintf(
            'invalid request with in[%s] or out[%s]',
            json_encode($in), json_encode($out)
        ));

        return false;
    }

    // e.g. 20140806
    $day = trim($in['date']);
    // e.g. sftp://10.191.152.100:36000/data1/clickflow/refer-filtered/20140806/
    $source_dir = trim($in['rawdata']);
    // e.g. sftp://10.206.8.75:36000/data/clickflow/tmp/guid_uid-map/new/20140806/data
    $pairs_destination = $out['upairs'];
    // e.g. sftp://10.206.8.75:36000/data/clickflow/tmp/rawdata/20140806/by-guid/data_0 .. data_99
    $rawdata_destinations = pattern_range2array($out['rawdata']);

    $unikey = get_unikey();

    // Local temporary files are named <tmp path>tasks.<unikey>_input_* / _output_*.
    $input_prefix = HOMEWORK_CLICKFLOW_TMP_PATH . 'tasks.' . $unikey . '_input_';
    $output_prefix = HOMEWORK_CLICKFLOW_TMP_PATH . 'tasks.' . $unikey . '_output_';

    $tasks_input = array();

    if (RemoteResourceAccessExists($source_dir . 'row_data_' . $day . '000')) {
        // Normal case: the day is already split remotely (row_data_<day>000 ...).
        foreach (getSplitedRawDataNames($day) as $index => $suffix) {
            $local_copy = $input_prefix . $index;
            $remote_file = $source_dir . 'row_data_' . $suffix;

            if (!RemoteResourceAccessExists($remote_file)) {
                Logger::err(sprintf('%s not found', $remote_file));
                return false;
            }

            // Copy the remote file to the local temporary file. The return
            // value is not inspected; only $continued decides whether to go on.
            RemoteResourceAccessGetUntil($remote_file, $local_copy, $continued);

            if (!$continued) {
                Logger::err('stop!');
                return false;
            }

            FileAutoCleaner::add($local_copy);
            $tasks_input[] = $local_copy;
        }
    } else {
        // Fallback: a single row_data_<day> file that is split locally.
        $whole_day_remote = $source_dir . 'row_data_' . $day;

        if (!RemoteResourceAccessExists($whole_day_remote)) {
            Logger::err(sprintf('%s not found', $whole_day_remote));
            return false;
        }

        $whole_day_local = $input_prefix . 'inall';
        RemoteResourceAccessGetUntil($whole_day_remote, $whole_day_local, $continued);

        if (!$continued) {
            Logger::err('stop!');
            return false;
        }

        FileAutoCleaner::add($whole_day_local);

        if (false === split_file($whole_day_local, 144, $input_prefix)) {
            Logger::err(sprintf(
                'failed to split the file[%s]', $input_prefix
            ));

            return false;
        }

        // The parts are expected at <input prefix>0 .. <input prefix>143.
        foreach (range(0, 143) as $part) {
            $tasks_input[] = $input_prefix . $part;
        }

        FileAutoCleaner::add($tasks_input);
    }

    // Local outputs shared by all tasks: one pairs file and the partition files.
    $pairs_local = $output_prefix . 'pairs.output';
    $rawdata_locals = pattern_range2array($output_prefix . 'rawdata_[0-99]');

    // Create / truncate every output file before the tasks append to them.
    file_put_contents($pairs_local, '');

    foreach ($rawdata_locals as $rawdata_local) {
        file_put_contents($rawdata_local, '');
    }

    FileAutoCleaner::add($pairs_local, $rawdata_locals);

    $divider = new TaskDivider();

    // One task per downloaded input file, at most 40 tasks per run.
    foreach (array_chunk($tasks_input, 40) as $batch) {
        $divider->clear();

        foreach ($batch as $input_file) {
            $divider->add(
                'getNewGuidUidPairsNPartitionRawDataTask',
                array($input_file, $pairs_local, $rawdata_locals)
            );
        }

        $failed = $divider->run($continued);

        if (false === $failed) {
            Logger::err('fail to run tasks');
            return false;
        }

        if (0 !== $failed) {
            Logger::err(sprintf('fail to run %d tasks', $failed));
            return false;
        }
    }

    if (false === uniq_file($pairs_local)) {
        Logger::err(sprintf('fail to uniq the file[%s]', $pairs_local));
        return false;
    }

    RemoteResourceAccessPutUntil($pairs_local, $pairs_destination, $continued);

    if (!$continued) {
        Logger::err('stop!');
        return false;
    }

    // Upload each local partition to the remote path with the same index.
    foreach ($rawdata_locals as $index => $rawdata_local) {
        RemoteResourceAccessPutUntil(
            $rawdata_local,
            $rawdata_destinations[$index],
            $continued
        );

        if (!$continued) {
            Logger::err('stop!');
            return false;
        }
    }

    return true;
}
/**
 * Sub task handled by a single process: read one raw clickflow file, append
 * every accepted line to one of 100 partition files and append the guid/uid
 * pairs found in it to the pairs file.
 *
 * A line is accepted when it has at least LOG_FIELD_MAX_IDX + 1 tab-separated
 * fields, is a PV record with a non-negative page id, passes the page filter
 * below and belongs to one of the whitelisted warehouse ids.
 *
 * @param string $input_file           file to handle
 * @param string $output_pairs_file    output of pairs, opened in append mode
 * @param array  $output_rawdata_files output of rawdata partitioned; indexes
 *                                     0..99 are opened in append mode
 *
 * @return boolean true when success, otherwise false
 * @author baronyuan
 */
function getNewGuidUidPairsNPartitionRawDataTask(
    $input_file,
    $output_pairs_file,
    $output_rawdata_files
) {
    // Replace the global logger with one that writes to the worker log.
    global $logger;
    $logger->uninit();
    $logger = new Logger(HOMEWORK_WORKER_LOG_NAME);

    $fp_input = @fopen($input_file, 'r');

    if (false === $fp_input) {
        Logger::err(sprintf('failed to open file[%s]', $input_file));
        return false;
    }

    $partition_count = 100;
    $upairs = array();
    $fps_rawdata_output = array();

    // $ok is cleared on the first failure so that every handle opened so far
    // is still closed before returning.
    $ok = true;

    // Append mode: existing content of the partition files is kept.
    for ($i = 0; $i < $partition_count; ++$i) {
        $fp_rawdata_output = @fopen($output_rawdata_files[$i], 'a');

        if (false === $fp_rawdata_output) {
            Logger::err(
                sprintf('fail to open file[%s]', $output_rawdata_files[$i])
            );

            $ok = false;
            break;
        }

        $fps_rawdata_output[] = $fp_rawdata_output;
    }

    // Page ids that are skipped for every warehouse except 1971. The original
    // only labels this filter "iframe"; the meaning of the ids is not visible
    // here.
    $skipped_page_ids = array(
        12376, 1237017,
        63786, 6378017,
        63796, 6379017,
        63806, 6380017,
        63816, 6381017,
        63826, 6382017,
        5020, 11718320,
    );

    // "tmp hack" in the original: only these warehouse ids are processed.
    $allowed_wh_ids = array(1, 1001, 2001, 3001, 4001, 5001, 1969, 1971);

    $special_key = -1;

    // NOTE(review): fgets() returns at most 10239 bytes per call, so a longer
    // line is handled as several separate lines.
    while ($ok && ($line = fgets($fp_input, 10240)) !== false) {
        $cells = explode("\t", $line);

        if (!isset($cells[LOG_FIELD_MAX_IDX])
            || $cells[LOG_FIELD_IDX_LOG_TYPE] != LOG_TYPE_PV
            || $cells[LOG_FIELD_IDX_PAGE_ID] < 0
        ) {
            continue;
        }

        // iframe
        $page_id = (int)$cells[LOG_FIELD_IDX_PAGE_ID];
        $wh_id = (int)$cells[LOG_FIELD_IDX_WH_ID];

        if ($wh_id != 1971) {
            if (in_array($page_id, $skipped_page_ids, true)
                || (!(int)$cells[LOG_FIELD_IDX_PAGE_LEVEL]
                    && !preg_match('#buy\.(?:51buy|yixun)\.com#', $cells[LOG_FIELD_IDX_PAGE_URL]))
            ) {
                continue;
            }
        }

        $uid = (int)$cells[LOG_FIELD_IDX_USER_ID];
        $guid = trim($cells[LOG_FIELD_IDX_GUID]);

        if (!in_array($wh_id, $allowed_wh_ids, true)) {
            continue;
        }

        if (1970 == $wh_id) {
            // NOTE(review): unreachable, because 1970 is not in the whitelist
            // above. Kept as in the original; confirm which one is intended.
            $hash_key = $special_key = (++$special_key) % $partition_count;
        } else {
            $hash_key = crc32($guid) % $partition_count;

            if ($uid && $guid && $guid != 'null' && $guid != 'Unknown') {
                $upairs[$guid] = $uid;
            }
        }

        if ($hash_key < 0 || $hash_key >= $partition_count) {
            // crc32() can be negative on 32-bit builds. The original logged
            // this and then still used the key as an index; fail explicitly.
            Logger::err(sprintf('invalid hash key %d', $hash_key));
            $ok = false;
            break;
        }

        if (false === @fwrite($fps_rawdata_output[$hash_key], $line)) {
            Logger::err('failed to write a line into a file');
            $ok = false;
            break;
        }
    }

    // fgets() returning false before end of file means the read failed.
    if ($ok && !feof($fp_input)) {
        Logger::err(sprintf('failed to read file[%s]', $input_file));
        $ok = false;
    }

    fclose($fp_input);

    foreach ($fps_rawdata_output as $fp_rawdata_output) {
        @fclose($fp_rawdata_output);
    }

    if (!$ok) {
        return false;
    }

    $fp_pairs_output = @fopen($output_pairs_file, 'a');

    if (false === $fp_pairs_output) {
        Logger::err(sprintf('failed to open file[%s]', $output_pairs_file));
        return false;
    }

    foreach ($upairs as $guid => $uid) {
        $write_result = fwrite($fp_pairs_output, "{$guid}\t{$uid}\n");

        if (false === $write_result) {
            // The original read "Logger:err(", which PHP parses as a goto
            // label followed by a call to an undefined function err().
            Logger::err(
                sprintf(
                    'failed to write a line into file[%s]',
                    $output_pairs_file
                )
            );

            fclose($fp_pairs_output);
            return false;
        }
    }

    fclose($fp_pairs_output);
    return true;
}
homework做了些什么?
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。