首页 > 代码库 > [python测试框架学习篇] 分享一个和adb相关的测试框架
[python测试框架学习篇] 分享一个和adb相关的测试框架
https://testerhome.com/topics/7106 (user: zteandallwinner, password: same as QQ)
def get_data(address):
    """Load a Python-literal configuration file and return its value.

    The file is expected to contain one literal expression; in this project
    it is the dictionary that maps application names to their intents.

    Args:
        address: Path of the file to read.

    Returns:
        The object described by the file content (normally a dict).

    Raises:
        IOError / OSError: If the file cannot be opened or read.
        ValueError, SyntaxError: If the content is not a plain literal.
    """
    # Imported locally so the function does not rely on a file-level import.
    import ast

    # ``with`` closes the handle even when read() raises.
    with open(address, 'r') as config_file:
        content = config_file.read()

    # NOTE(review): the original called eval(), which executes any code found
    # in the file. literal_eval accepts only literals (dict, list, str,
    # numbers, ...); confirm that no configuration file relies on expressions.
    return ast.literal_eval(content.strip())
def judge(self, dic, retry_time):
    """Dump the current UI hierarchy and look for the view described by *dic*.

    Args:
        dic: Search criteria, as prepared by ``search``.
        retry_time: Number of lookup attempts (int or numeric string), or
            None. When given, the screen is swiped a little between
            attempts. When None, a single lookup is made and, on failure,
            the hooks library is asked to dismiss a known pop-up.

    Returns:
        A ``Panel`` wrapping a fresh operator when exactly one view matched,
        otherwise None.
    """
    global xml_path
    xml_path = self.get_xml()
    if xml_path is None:
        print("Error : can not generate window dump file.")
        return None
    self.native_unlock_screen()

    if retry_time is not None:
        # The original recursed into judge() from inside this loop and threw
        # the result away; every recursion restarted its own counter, so a
        # view that never appeared ended in unbounded recursion. A plain
        # bounded loop keeps the "check, swipe, check again" behaviour.
        for _ in range(int(retry_time)):
            if self.parse_window_dump(dic) is not None:
                return Panel(self.get_instance())
            print("Warnning : there is no view or many views match your require, retry again!")
            self.little_swipe()
            time.sleep(1)
            # The swipe changed the screen, so the dump must be refreshed.
            xml_path = self.get_xml()
            if xml_path is None:
                print("Error : can not generate window dump file.")
                return None
        return None

    if self.parse_window_dump(dic) is not None:
        return Panel(self.get_instance())

    print("Warnning : there is no view or many views match your require, start to search hooks library...!")
    self.hooks.get_generators(self.get_all_nodes())
    # parse() returns True when a known pop-up was found and dismissed.
    if not self.hooks.parse():
        return None
    time.sleep(1)
    # The original clicked the view here and then returned None, so callers
    # doing ``search(...).click()`` failed with AttributeError even though
    # the lookup had succeeded. Hand the panel back to the caller instead.
    return self.judge(dic, retry_time)
def parse_window_dump(self, parametres=None):
    """Find the bounds of the single view that matches the given criteria.

    The recognised keys of *parametres* are ``id``, ``text``,
    ``description``, ``index``, ``package`` and ``enabled``; any other key
    (for example ``retry_time``) is ignored.

    Args:
        parametres: Mapping of criterion name to the wanted value.

    Returns:
        The ``bounds`` attribute string of the matching node, or None when
        nothing matches or the match is ambiguous.

    Raises:
        SystemExit: When none of the recognised keys is supplied (kept from
            the original behaviour).
    """
    # The original default of None crashed on ``None.has_key``; treat it as
    # "no criteria". ``in`` replaces has_key(), which Python 3 removed.
    parametres = parametres or {}
    wanted = [parametres[key]
              for key in ('id', 'text', 'description', 'index', 'package', 'enabled')
              if key in parametres]
    if not wanted:
        print("Error : invalid input, there is no useful parameter given!!!")
        sys.exit(-1)

    # Compiled once instead of being re-evaluated for every criterion.
    id_pattern = re.compile(r'[A-Za-z]+:id/(\w+)')
    attributes = ('resource-id', 'text', 'index', 'content-desc', 'package', 'enabled')
    hits = dict((name, 0) for name in attributes)
    last_bounds = {}

    for node in self.get_all_nodes():
        raw_id = node.get('resource-id', '')
        short_ids = id_pattern.findall(raw_id.strip('\n'))
        # Compare against the short id ("title"), not "pkg:id/title".
        values = {'resource-id': short_ids[0] if short_ids else raw_id.strip('\n\r')}
        for name in attributes[1:]:
            values[name] = node.get(name, '').strip('\n')

        # NOTE(review): as in the original, every wanted value is compared
        # with every attribute, not only with the attribute it was given
        # for. Confirm whether per-attribute matching was intended.
        for candidate in wanted:
            for name in attributes:
                if candidate == values[name]:
                    hits[name] += 1
                    last_bounds[name] = node.get('bounds')

    # At least as many attribute kinds must have matched as criteria given.
    matched_kinds = sum(1 for name in attributes if hits[name])
    if matched_kinds < len(wanted):
        return None

    # Only attribute kinds hit exactly once identify a view unambiguously,
    # and all of them must point at the same view.
    unique = [last_bounds[name] for name in attributes if hits[name] == 1]
    if unique and all(bounds == unique[0] for bounds in unique):
        return unique[0]
    return None
def get_max_resolution(self):
    """Return the display size reported by ``dumpsys display``.

    Returns:
        A two-item list ``[width, height]`` of strings. An item is the
        string ``'unknow'`` when the corresponding ``mDisplayWidth`` /
        ``mDisplayHeight`` entry is missing from the output.
    """
    # Callers compare the result with 'unknow', but the original never
    # produced that value: missing output gave a short list and an
    # IndexError in the caller. It also relied on the width line coming
    # before the height line.
    width = height = 'unknow'
    for line in self.get_dumpsys_display():
        if width == 'unknow':
            found = re.search(r'mDisplayWidth\s*=\s*(\d+)', line)
            if found:
                width = found.group(1)
        if height == 'unknow':
            found = re.search(r'mDisplayHeight\s*=\s*(\d+)', line)
            if found:
                height = found.group(1)
    return [width, height]
def little_swipe(self):
    """Swipe up along the vertical centre line of the screen.

    The gesture starts at 4/5 of the screen height and ends at 2/5 of it,
    using the size returned by ``get_max_resolution``.
    """
    resolution = self.get_max_resolution()
    width = int(resolution[0])
    height = int(resolution[1])
    # ``//`` keeps the coordinates integral; with ``/`` they become
    # "540.0"-style strings whenever true division is in effect.
    x = str(width // 2)
    start_y = str(height * 4 // 5)
    end_y = str(height * 2 // 5)
    command = self.adb_shell() + 'input swipe ' + ' '.join([x, start_y, x, end_y])
    self.execute_command(command).wait()
(down_key*int(counter)).strip() self.executor.execute_command(press_down_command).wait() def shift_up(self,counter=1): up_key = ‘19 ‘ press_up_command = self.executor.adb_shell() + ‘input keyevent ‘ + (up_key*int(counter)).strip() self.executor.execute_command(press_up_command).wait() def shift_left(self,counter=1): press_left_command = self.executor.adb_shell() + ‘input keyevent 21‘ for j in range(int(counter)): self.executor.execute_command(press_left_command).wait() time.sleep(1) def volume_up(self,counter=1): volume_up_command = self.executor.adb_shell() + ‘input keyevent 24‘ for j in range(int(counter)): self.executor.execute_command(volume_up_command).wait() time.sleep(0.5) def volume_down(self,counter=1): volume_down_command = self.executor.adb_shell() + ‘input keyevent 25‘ for j in range(int(counter)): self.executor.execute_command(volume_down_command).wait() time.sleep(0.5) def shift_right(self,counter=1): press_right_command = self.executor.adb_shell() + ‘input keyevent 22‘ for k in range(int(counter)): self.executor.execute_command(press_right_command).wait() time.sleep(1) def push_up(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==‘unknow‘ or max_h ==‘unknow‘: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/2) x2 = str(int(max_w)/2) y1 = str((int(max_h)*4)/5) y2 = str(int(max_h)/5) push_up_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2 self.executor.execute_command(push_up_command).wait() def swipe_right(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==‘unknow‘ or max_h ==‘unknow‘: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/5) x2 = str((int(max_w)*4)/5) y1 = str(int(max_h)/2) y2 = str(int(max_h)/2) swipe_right_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ 
def check_boot_status(self):
    """Report whether the device has finished booting.

    Runs ``getprop`` through the executor's adb shell prefix and looks at
    the property lines that mention ``bootanim``.

    Returns:
        True when those lines contain both ``stopped`` and ``1``, otherwise
        False.
    """
    getprop_command = self.executor.adb_shell() + 'getprop'
    # universal_newlines=True yields text on Python 3 as well, and
    # communicate() reaps the child, which the original never did.
    proc = sp.Popen(getprop_command.split(' '), stdout=sp.PIPE,
                    universal_newlines=True)
    output, _ = proc.communicate()
    # Filtered here instead of piping through an external ``grep``.
    bootanim = '\n'.join(line for line in output.splitlines()
                         if 'bootanim' in line)
    # The original tested ``"stopped" and '1' in out``; the non-empty literal
    # is always true, so only '1' was really checked.
    return 'stopped' in bootanim and '1' in bootanim
swipe_pull_notification(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] if max_w ==‘unknow‘ or max_h ==‘unknow‘: print "Error : dumpsys display failed" sys.exit(-1) else: x1 = str(int(max_w)/2) x2 = str(int(max_w)/2) y1 = "0" y2 = str(int(max_h)/5) swip_noticfi_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2 self.executor.execute_command(swip_noticfi_command).wait() def get_through_oobe(self): device_resolution = self.executor.get_max_resolution() max_w = device_resolution[0] max_h = device_resolution[1] print "[debug] max_w and max_h is ",max_w,max_h if max_w ==‘unknow‘ or max_h ==‘unknow‘: print "Error : dumpsys display failed" sys.exit(-1) else: p1 = [str((int(max_w)*2)/10),str((int(max_h)*2)/10)] p2 = [str((int(max_w)*8)/10),str((int(max_h)*2)/10)] p3 = [str((int(max_w)*8)/10),str((int(max_h)*8)/10)] p4 = [str((int(max_w)*2)/10),str((int(max_h)*8)/10)] tap_command1 = self.executor.adb_shell() + ‘input tap ‘ + p1[0] + ‘ ‘ + p1[1] tap_command2 = self.executor.adb_shell() + ‘input tap ‘ + p2[0] + ‘ ‘ + p2[1] tap_command3 = self.executor.adb_shell() + ‘input tap ‘ + p3[0] + ‘ ‘ + p3[1] tap_command4 = self.executor.adb_shell() + ‘input tap ‘ + p4[0] + ‘ ‘ + p4[1] self.executor.execute_command(tap_command1).wait() self.executor.execute_command(tap_command2).wait() self.executor.execute_command(tap_command3).wait() self.executor.execute_command(tap_command4).wait() def power_on(self): if not self.check_screen_on(): power_on_command = self.executor.adb_shell() + ‘input keyevent 26‘ self.executor.execute_command(power_on_command).wait() def press_back(self,counter=1): press_back_command = self.executor.adb_shell() + ‘input keyevent 4‘ for i in range(int(counter)): self.executor.execute_command(press_back_command).wait() time.sleep(1) def check_screen_on(self): is_screen_on = False for line in self.executor.get_dumpsys_display(): if ‘mScreenState‘ in line 
def swipe_up(self, times=1):
    """Swipe upwards inside the matched view.

    The gesture runs along the horizontal centre of the view, from 4/5 down
    its height to 1/5 down its height.

    Args:
        times: How many times to repeat the swipe (int or numeric string).
    """
    left, top, right, bottom = [int(v) for v in self.panel.bound_to_list()[:4]]
    x = str((left + right) // 2)
    height = bottom - top
    # The original used (top + bottom) * k / 5, which is only inside the
    # view when top == 0; for lower views it pointed outside the bounds.
    start_y = str(top + height * 4 // 5)
    end_y = str(top + height // 5)
    command = self.panel.adb_shell() + 'input swipe ' + ' '.join([x, start_y, x, end_y])
    for _ in range(int(times)):
        self.panel.execute_command(command).wait()
def get_pid(self):
    """Return the pid of the monkey process running on the device.

    Runs ``ps`` through the panel's adb shell prefix and searches the
    listing for ``com.android.commands.monkey``.

    Returns:
        The pid as a string, or None when no such process is listed.
    """
    ps_command = self.panel.adb_shell() + 'ps'
    # Text mode keeps the str regex usable on Python 3; communicate() reaps
    # the child and closes the pipe, which the original left open.
    proc = sp.Popen(ps_command.split(' '), stdout=sp.PIPE,
                    universal_newlines=True)
    listing, _ = proc.communicate()
    pattern = r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
    found = re.search(pattern, listing)
    if found is None:
        return None
    return found.group(1)
class Logger(object):
    """Factory for a named logger that writes to ``<log_path>/log.txt`` and
    to the console."""

    def __init__(self, name, log_path):
        """Remember the logger name and make sure the log directory exists.

        Args:
            name: Name passed to ``logging.getLogger``.
            log_path: Directory that will contain ``log.txt``.
        """
        self.log_path = log_path
        self.name = name
        self.log_file = os.path.join(log_path, 'log.txt')
        if not os.path.isdir(log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                # Best effort, as in the original; a real problem surfaces
                # when the file handler is opened.
                pass

    def modlogger(self):
        """Return the configured ``logging.Logger`` for this name.

        Handlers are attached only once per logger. The original added a
        new file and stream handler on every call, so each further call
        duplicated every log line and leaked an open file handle.
        """
        mylogger = logging.getLogger(self.name)
        mylogger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s %(levelname)s\t%(message)s")

        target = os.path.abspath(self.log_file)
        has_file = any(isinstance(h, logging.FileHandler) and h.baseFilename == target
                       for h in mylogger.handlers)
        # FileHandler subclasses StreamHandler, hence the exact type check.
        has_stream = any(type(h) is logging.StreamHandler
                         for h in mylogger.handlers)

        if not has_file:
            fh = logging.FileHandler(self.log_file)
            fh.setFormatter(formatter)
            mylogger.addHandler(fh)
        if not has_stream:
            sh = logging.StreamHandler()
            sh.setFormatter(formatter)
            mylogger.addHandler(sh)
        return mylogger

    def get_instance(self):
        """Return a new ``Logger`` with the same name and log directory."""
        return Logger(self.name, self.log_path)

    def logger(self):
        """Return a ``Methods`` facade bound to this logger's configuration."""
        return Methods(self.get_instance())


class Methods(object):
    """Thin facade exposing info/debug/warning/error of a ``Logger``."""

    def __init__(self, obj):
        """Args:
            obj: Object providing ``modlogger()``, normally a ``Logger``.
        """
        self.ml = obj.modlogger()

    def info(self, string):
        """Log *string* at INFO level."""
        self.ml.info(string)

    def debug(self, string):
        """Log *string* at DEBUG level."""
        self.ml.debug(string)

    def warning(self, string):
        """Log *string* at WARNING level."""
        self.ml.warning(string)

    def error(self, string):
        """Log *string* at ERROR level."""
        self.ml.error(string)
def divider(self):
    """Run the hook registered for the package of the first dumped node.

    Each ``hook_*`` method declares the package it handles as the default
    value of its ``seducer`` parameter. The hook whose package equals
    ``frame[1]['package']`` is called and its result is passed to
    ``executer``.

    Returns:
        The value returned by ``executer`` for the matching hook, or False
        when no hook matches or no node was collected.
    """
    # ``frame`` is the module-level dict filled by get_generators().
    first_node = frame.get(1)
    if first_node is None:
        # Empty dump: the original raised KeyError here.
        return False
    main_key = first_node['package']

    hooks = (self.hook_maps, self.hook_contact, self.hook_gms,
             self.hook_settings, self.hook_stk, self.hook_android)
    flag = False
    for hook in hooks:
        # __defaults__ replaces inspect.getargspec(), which was deprecated
        # for years and removed in Python 3.11.
        package = hook.__defaults__[0]
        if main_key == package:
            flag = self.executer(package, hook())
    return flag
def get_point(self, coodinate):
    """Return the centre of a uiautomator bounds string.

    Args:
        coodinate: Bounds in the form ``"[x1,y1][x2,y2]"``.

    Returns:
        ``[x, y]`` as a list of two integer strings.
    """
    # "[0,100][200,300]" -> "0,100,200,300," -> ['0', '100', '200', '300']
    flat = coodinate.replace('[', '').replace(']', ',')
    parts = [piece for piece in flat.split(',') if piece]
    left, top, right, bottom = [int(piece) for piece in parts[:4]]
    # ``//`` keeps the result integral; ``/`` yields "100.0"-style strings
    # whenever true division is in effect.
    return [str((left + right) // 2), str((top + bottom) // 2)]
def warm_reboot(self, settle_time=15, boot_timeout=600, poll_interval=2):
    """Reboot the device and wait until it reports a completed boot.

    Args:
        settle_time: Seconds to wait after issuing the reboot before
            checking that the device really went down (originally a
            hard-coded 15).
        boot_timeout: Maximum seconds to wait for the boot to complete.
        poll_interval: Seconds between two boot-status checks.

    Returns:
        True when the device rebooted and came back. False when it was not
        ready, did not go down, or did not come back within *boot_timeout*.
    """
    device = self.op.hardware()
    if not device.check_boot_status():
        self.mylogger.error("[Common] Device is not ready for test reboot case. ")
        return False
    self.mylogger.info("[Common] Device is ready. ")
    self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
    self.get_boot_infomation()
    device.reboot_device()
    time.sleep(settle_time)

    self.mylogger.info("[Common] Start to check if execute reboot command? ")
    if not device.check_reboot_progress():
        self.mylogger.error("[Common] unknow error, reboot fail. ")
        return False
    self.mylogger.info("[Common] Device start booting...")

    # The original polled in a tight ``while 1`` loop with no pause and no
    # upper bound, so a device that never came back hung the run forever
    # while spawning adb processes as fast as possible.
    deadline = time.time() + boot_timeout
    while True:
        if device.check_boot_status():
            self.mylogger.info("[Common] Device reboot successfully. ")
            return True
        if time.time() >= deadline:
            self.mylogger.error(
                "[Common] Device did not finish booting within {0} seconds. ".format(boot_timeout))
            return False
        time.sleep(poll_interval)
Got Exception {0}".format(e)) return False time.sleep(5) try: self.mylogger.info("[Common] click password edittext ") self.op.search(id="password").click() except AttributeError as e: self.mylogger.error("[Common] Can‘t find password edittext . Got Exception {0}".format(e)) return False self.mylogger.info("[Common] input google password : {0}".format(self.conf_dic[‘google_password‘])) self.op.hardware().input_text(self.conf_dic[‘google_password‘]) try: self.mylogger.info("[Common] click NEXT button ") self.op.search(description="NEXT").click() except AttributeError as e: self.mylogger.error("[Common] Can‘t find NEXT button . Got Exception {0}".format(e)) return False while 1: try: self.mylogger.info("[Common] click ACCEPT button ") self.op.search(description="ACCEPT").click() retry_count = 0 break except AttributeError as e: if retry_count <= max_retry: self.mylogger.debug("[Common] Can‘t find ACCEPT button . try again") time.sleep(2) retry_count+=1 else: self.mylogger.error("[Common] poor network course can‘t register google account . Got Exception {0}".format(e)) return False while 1: try: self.mylogger.info("[Common] check Google services shown??? ") self.op.search(text="Google services").click() retry_count = 0 break except AttributeError as e: if retry_count <= max_retry: self.mylogger.debug("[Common] Can‘t Google services keywords. try again") time.sleep(5) retry_count+=1 else: self.mylogger.error("[Common] poor network course can‘t register google account . Got Exception {0}".format(e)) return False try: self.mylogger.info("[Common] click Next button ") self.op.search(text="Next",enabled=‘true‘,retry_time=‘3‘).click() except AttributeError as e: self.mylogger.error("[Common] Can‘t find NEXT button . 
Got Exception {0}".format(e)) return False while 1: try: self.mylogger.info("[Common] click No thanks button ") self.op.search(text="No thanks").click() retry_count = 0 break except AttributeError as e: if retry_count <= max_retry: self.mylogger.debug("[Common] Can‘t find No thanks button . try again") time.sleep(2) retry_count+=1 else: self.mylogger.error("[Common] poor network course can‘t register google account . Got Exception {0}".format(e)) return False try: self.mylogger.info("[Common] click Continue button ") self.op.search(text="Continue").click() except AttributeError as e: self.mylogger.error("[Common] Can‘t find Continue button . Got Exception {0}".format(e)) return False time.sleep(2) return True def check_google_account_added(self): self.mylogger.info("[Common] launch Settings app ") self.op.hardware().launch_app_with_intent(app=‘Settings‘) try: self.mylogger.info("[Common] click Accounts item ") self.op.search(text="Accounts",retry_time="3").click() except AttributeError as e: self.mylogger.error("[Common] Can‘t find Accounts item . Got Exception {0}".format(e)) return False if self.op.search(text=‘Google‘) is not None: self.mylogger.info("[Common] add google account successfully. ") return True else: self.mylogger.error("[Common] add account fail. 
") return False 96 yaboandriod · #7 · 2017年01月16日 入口程序: import xml.etree.ElementTree as ET import os import sys sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir))) import argparse import random import subprocess as sp import pickle import datetime import re import time from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt import generate_report class Configuration: def __init__(self): self.log_path = ‘‘ self.execute_main_campaign = ‘‘ self.device_id = ‘‘ self.work_dir = os.path.abspath(os.path.join(os.getcwd(),os.pardir)) def load(self,filepath): temp_dict = {} try: execfile(filepath,{},temp_dict) for key in temp_dict.keys(): setattr(self,key,temp_dict[key]) except TypeError,errormsg: print "Error : No file path given!" print "you can add -h option to view more info" sys.exit(-1) except IOError,msg1: print msg1 print "Error : Please specify a path which contain a config file!" sys.exit(-1) def get_testcase_from_xml(config): xml_path = os.path.join(config.work_dir,config.execute_main_campaign) parser = ET.parse(xml_path) root = parser.getroot() item_instance_list = root.iter(‘item‘) return item_instance_list def get_testsuite_from_xml(config): xml_path = os.path.join(config.work_dir,config.execute_main_campaign) parser = ET.parse(xml_path) root = parser.getroot() suite_instance_list = root.iter(‘suite‘) return suite_instance_list def get_full_path(config,string): return os.path.join(config.work_dir,string) + ‘.py‘ def generate_file(config,dictory): temporary_path = os.path.join(config.work_dir,‘temporary‘,config.device_id) if not os.path.isdir(temporary_path): try: os.makedirs(temporary_path) except OSError: pass pickle_dump_path = os.path.join(temporary_path,‘settings_temp.pkl‘) f1 = open(pickle_dump_path,‘wb‘) pickle.dump(dictory,f1,True) f1.close() return pickle_dump_path def save_data_for_html(config,dictory): temporary_path = 
os.path.join(config.work_dir,‘temporary‘,config.device_id) if not os.path.isdir(temporary_path): try: os.makedirs(temporary_path) except OSError: pass pickle_dump_path = os.path.join(temporary_path,‘html_raw_data.pkl‘) f1 = open(pickle_dump_path,‘wb‘) pickle.dump(dictory,f1,True) f1.close() return pickle_dump_path def main(config): r1=r‘Fail|Pass‘ r2=r‘(\d+)‘ r_crash_data0 = r‘DATA0=(\S+\s)[\s\S]*‘ r_crash_data1 = r‘DATA1=(\S+\s)[\s\S]*‘ r_crash_data2 = r‘DATA2=(\S+\s)[\s\S]*‘ seq_num = 1 html_result_data = {} settings_list = dir(config) for _ in settings_list: if _ == ‘__doc__‘: settings_list.remove(_) if _ == ‘__module__‘: settings_list.remove(_) if _ == ‘load‘: settings_list.remove(_) settings_list.remove(‘__init__‘) settings_dictory = {} for _ in settings_list: settings_dictory[_] = getattr(config,_) rs = ‘run_set_‘+datetime.datetime.now().strftime(‘%Y_%m_%d_%H_%M_%S‘) settings_dictory[‘log_path‘] = os.path.join(settings_dictory[‘log_path‘],config.device_id,rs) execue_case_sequence = {} for testsuite in get_testsuite_from_xml(config): execue_case_sequence[‘suite_name‘] = testsuite.get(‘name‘) execue_case_sequence[‘suite_loop‘] = testsuite.get(‘loop‘) execue_case_sequence[‘suite_mode‘] = testsuite.get(‘mode‘) execue_case_sequence[‘suite_content‘] = {} execue_case_sequence[‘device_id‘] = settings_dictory[‘device_id‘] for testcase in get_testcase_from_xml(config): execue_case_sequence[‘suite_content‘][seq_num] = [testcase.text,testcase.get(‘loop‘)] seq_num += 1 suite_loop = int(execue_case_sequence[‘suite_loop‘]) suite_mode = execue_case_sequence[‘suite_mode‘] tc_symbol_list = execue_case_sequence[‘suite_content‘].keys() suite_content = execue_case_sequence[‘suite_content‘] if suite_loop == 0 or tc_symbol_list == []: sys.exit(0) print "there is no loop specify for running testsuite or no cases need to be executed!" 
need_to_change_sequence = False if suite_mode == ‘regular‘: need_to_change_sequence = False elif suite_mode == ‘random‘: need_to_change_sequence = True else: need_to_change_sequence = False html_result_data = execue_case_sequence original_history = parse_history(settings_dictory[‘device_id‘]) for sl in range(1,suite_loop+1): print "Start to run testsuite : {0} , loop {1}".format(execue_case_sequence[‘suite_name‘],sl) if need_to_change_sequence: random.shuffle(tc_symbol_list) for i in range(len(tc_symbol_list)): item_loop = int(suite_content[tc_symbol_list[i]][1]) if len(suite_content[tc_symbol_list[i]]) == 2 else 1 item_path = get_full_path(config,suite_content[tc_symbol_list[i]][0]) case_name = os.path.basename(item_path).split(‘.‘)[0] report_path = os.path.join(config.work_dir,settings_dictory[‘log_path‘],case_name) if item_loop == 0: continue for j in range(1,item_loop+1): print "Start to run case : {0} , loop {1}".format(case_name,j) start_time = time.time() start_time_dateformat = datetime.datetime.now().strftime(‘%Y/%m/%d_%H:%M:%S‘) settings_dictory[‘TC‘] = item_path dir_path = generate_file(config,settings_dictory) execute_command = ‘python ‘ + item_path + ‘ ‘ + dir_path proc=sp.Popen(execute_command.split(‘ ‘),stdout=sp.PIPE) out = proc.stdout.read().strip(‘\n‘) proc.wait() current_history = parse_history(settings_dictory[‘device_id‘]) stop_time = time.time() stop_time_dateformat = datetime.datetime.now().strftime(‘%Y/%m/%d_%H:%M:%S‘) time_taken = stop_time - start_time crash_list = iter_event(get_different_list(original_history,current_history)) print_crashfile = ‘‘ crashes = {} crash_dic = {} c_count = 1 for crash in crash_list: crashlog_path = get_crash_path(crash) if crashlog_path is not None: print_crashfile = get_crashfile_conetnt(settings_dictory[‘device_id‘],crashlog_path) match_data0 = re.findall(r_crash_data0,print_crashfile) match_data1 = re.findall(r_crash_data1,print_crashfile) match_data2 = re.findall(r_crash_data2,print_crashfile) 
crash_dic[‘crash_info‘] = crash.strip(‘\r‘) crash_dic[‘crash_data0‘] = match_data0[0] if match_data0 != [] else ‘‘ crash_dic[‘crash_data1‘] = match_data1[0] if match_data1 != [] else ‘‘ crash_dic[‘crash_data2‘] = match_data2[0] if match_data2 != [] else ‘‘ crashes[c_count] = crash_dic c_count+=1 crash_number = len(crash_list) original_history = current_history html_result_data[‘suite_content‘][i+1].append(re.findall(r1,out)[0]) html_result_data[‘suite_content‘][i+1].append(start_time_dateformat) html_result_data[‘suite_content‘][i+1].append(stop_time_dateformat) html_result_data[‘suite_content‘][i+1].append(round(time_taken,3)) html_result_data[‘suite_content‘][i+1].append(crash_number) html_result_data[‘suite_content‘][i+1].append(crashes) address = save_data_for_html(config,html_result_data) generate_report.main(address,report_path) if __name__ == ‘__main__‘: parse=argparse.ArgumentParser(usage=‘%(prog)s [options]‘) parse.add_argument(‘configuration_file‘,type=str,nargs=‘?‘,help="A path of config file") args=parse.parse_args() config_file = args.configuration_file configuration = Configuration() configuration.load(config_file) main(configuration) 96 yaboandriod · #8 · 2017年01月16日 自定义生成测试报告: import os import sys from pyh import * import pickle def main(address,report_path): f = open(address,‘rb‘) database1 = pickle.load(f) f.close() case_name_pre = report_path.split(‘/‘)[-1] report_path_base = os.path.abspath(os.path.join(report_path,os.pardir)) report_file_path = os.path.join(report_path_base,‘report.html‘) content_dic = database1[‘suite_content‘] page = PyH(‘test report‘) page << h1(‘this is report of yath project‘) order_list = page << ul(type=‘square‘) order_list << li(‘device id : %s‘%database1[‘device_id‘]) order_list << li(‘suite mode : %s‘%database1[‘suite_mode‘]) order_list << li(‘suite loop : %s‘%database1[‘suite_loop‘]) order_list << li(‘suite name : %s‘%database1[‘suite_name‘]) page << hr() p1_content = ‘Loop_‘+database1[‘suite_loop‘]+‘:‘ page << 
p(i(p1_content)) t = page << table(border=‘1‘,bgcolor=‘#bbbb99‘) t << tr(th(‘test case‘,align=‘left‘)+th(‘test loop‘,align=‘center‘)+th(‘test result‘,align=‘center‘)+th(‘start time‘,align=‘center‘)+th(‘stop time‘,align=‘center‘)+th(‘duration(s)‘,align=‘center‘)+th(‘crashes‘,align=‘center‘)) for k in content_dic: item = content_dic[k] if len(item) == 2: continue test_case = item[0] test_loop = item[1] if item[1] == ‘0‘: test_result,start_time,stop_time,duration,crashes = ‘NA‘,‘NA‘,‘NA‘,‘NA‘,‘0‘ else: test_result = item[2] start_time = item[3] stop_time = item[4] duration = item[5] crashes = item[6] if test_result == ‘Pass‘: attr_co_test_result = ‘green‘ elif test_result == ‘Fail‘: attr_co_test_result = ‘red‘ elif test_result == ‘NA‘: attr_co_test_result = ‘grey‘ if int(crashes) > 0: crashes_dic = item[7] generate_crashes_page(crashes_dic,test_case,report_path) link_crash = ‘file://‘+os.path.join(report_path,‘crash.html‘) t << tr(td(test_case,align=‘left‘)+td(test_loop,align=‘center‘)+td(test_result,align=‘center‘,bgcolor=attr_co_test_result)+td(start_time,align=‘center‘)+td(stop_time,align=‘center‘)+td(duration,align=‘center‘)+td(a(crashes,href=http://www.mamicode.com/link_crash),align=‘center‘)) else: t << tr(td(test_case,align=‘left‘)+td(test_loop,align=‘center‘)+td(test_result,align=‘center‘,bgcolor=attr_co_test_result)+td(start_time,align=‘center‘)+td(stop_time,align=‘center‘)+td(duration,align=‘center‘)+td(crashes,align=‘center‘)) page.printOut(report_file_path) def generate_crashes_page(dictory,case_name,crash_path): crash_file_path = os.path.join(crash_path,‘crash.html‘) crash_page = PyH(‘Crash Page‘) crash_page << h1(‘this is a page of crash.‘) crash_page << p(‘detail info : ‘) for j in dictory: div1 = crash_page << div() h2_content = ‘Crash %s occurs when execute %s : ‘%(str(j),case_name) div1 << h2(h2_content) crash_table = div1 << table(frame=‘box‘) crash_table << tr(th(‘key‘,align=‘left‘)+th(‘value‘),align=‘left‘) crash_table << tr(td(‘crash 
info‘,align=‘left‘)+td(dictory[j][‘crash_info‘]),align=‘left‘) crash_table << tr(td(‘data 0‘,align=‘left‘)+td(dictory[j][‘crash_data0‘]),align=‘left‘) crash_table << tr(td(‘data 1‘,align=‘left‘)+td(dictory[j][‘crash_data1‘]),align=‘left‘) crash_table << tr(td(‘data 2‘,align=‘left‘)+td(dictory[j][‘crash_data2‘]),align=‘left‘) crash_page.printOut(crash_file_path) if __name__ == "__main__": main() 96 yaboandriod · #9 · 2017年01月16日 测试用例: import os import unittest import sys import pickle sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir))) from container.base import settings from engine import log dir_path = sys.argv[1] f = open(dir_path,‘rb‘) config_dictory = pickle.load(f) f.close() name = os.path.basename(config_dictory[‘TC‘]).split(‘.‘)[0] workpath = config_dictory[‘work_dir‘] logpath = config_dictory[‘log_path‘] log_path = os.path.join(workpath,logpath,name) l = log.Logger(name,log_path) mylogger = l.logger() class test_setup_wifi_class(unittest.TestCase): def setUp(self): self.test_class = settings.Settings(config_dictory,mylogger) def tearDown(self): self.test_class.op.hardware().press_back(3) self.test_class.op.hardware().return_home() def test_skip_wizard(self): self.assertEqual(self.test_class.setup_wifi(),True,‘test setup_wifi case fail‘) if __name__ == "__main__": my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class) result = unittest.TextTestRunner(verbosity=0).run(my_suite) if len(result.failures) != 0 or len(result.errors) != 0: flag = ‘Fail‘ else: flag = ‘Pass‘ print flag,(len(result.failures)+len(result.errors)) 96 yaboandriod · #10 · 2017年02月03日 自己顶一下?? Db42eb mads · #11 · 2017年02月05日 你这个代码干嘛用的,陈述一下 D38bdc cloudhuan · #12 · 2017年02月05日 ???干嘛用的???封装了哪里?有什么优势? 96 yaboandriod · #13 · 2017年02月06日 针对android操作系统UI进行自动化测试,可以进行绝大多数的操作,多个app之间的交互。还可以在多台设备之间进行交互,譬如互相打电话,接电话等。 96 yaboandriod · #14 · 2017年02月06日 这是我写的一个小测试框架,case和lib可以自己定义,提供了log和controler接口。log用来打印日志,controler用来控制设备。 回帖
264768502 · #1 · 2017年01月13日
小小的建议
没缩进不能看,不如贴gist
单纯的adb的封装有很多人写了
比如我(#厚脸皮) https://github.com/264768502/adb_wrapper
比如这贴: https://testerhome.com/topics/6938
如果要处理UI的话,其实有现成的,比如pyuiautomator或者Appium
yaboandriod · #2 · 2017年01月16日
controller代码
import xml.etree.ElementTree as ET
import os
import sys
import subprocess as sp
import time
import logging
import re
import codecs
import datetime
import ui_hooks
import socket
import threading
# Attribute names of interest on a window-dump node.
# NOTE(review): not referenced by the code visible in this part of the file;
# presumably consumed by another module -- confirm before removing.
WINDOW_FACTOR = [
    'index', 'text', 'resource-id', 'class', 'package', 'visible',
    'content-desc', 'checkable', 'checked', 'clickable', 'enabled',
    'focusable', 'focused', 'scrollable', 'long-clickable', 'password',
    'bottom', 'selected', 'bounds',
]
# Upper bound on dump/pull attempts made by Operator.get_xml().
RETRY_MAX_TIME = 3
def get_data(address):
    """Return the Python object obtained by evaluating the text of a file.

    :param address: path of the file to read.
    :returns: whatever the file content evaluates to.
    """
    # The context manager closes the handle even when read() raises.
    with open(address, 'r') as f1:
        temp = f1.read()
    # NOTE(security): eval() runs arbitrary expressions found in the file.
    # Only point this at trusted configuration files; ast.literal_eval would
    # be the safe choice if the files contain nothing but literals.
    return eval(temp)
class Operator:
    """Controller for one Android device, built on ``adb`` and uiautomator dumps.

    Search state is shared through the module globals ``paras_dictory`` (the
    last search criteria), ``retry_time`` and ``xml_path`` (the last pulled
    window dump): ``get_instance()`` hands a *new* Operator to every
    ``Panel``/``Device``, so instance attributes would not survive the hand-over.
    """

    def __init__(self, device_id='', log_class=None, log_path='', **kwargs):
        """Remember the device serial and logger and create the log folders.

        :param device_id: serial passed to ``adb -s``.
        :param log_class: logger-like object with an ``info`` method, or None.
        :param log_path: folder that receives ``screenshots`` and ``temp``.
        :param kwargs: accepted and ignored.
        """
        self.device_id = device_id
        self.log_class = log_class
        self.log_path = log_path
        self.current_path = os.path.dirname(__file__)
        self.home_dir = os.path.expanduser('~')
        self._ensure_dir(self.log_path)
        self.failure_pic_location = os.path.join(self.log_path, 'screenshots')
        self._ensure_dir(self.failure_pic_location)
        self.temporary = os.path.join(self.log_path, 'temp')
        self._ensure_dir(self.temporary)
        self.hooks = ui_hooks.Hooks(self.device_id)

    @staticmethod
    def _ensure_dir(path):
        """Create *path* if it is not a directory; creation errors are ignored."""
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except OSError:
                # Best effort, as before: a missing folder surfaces later
                # when something tries to write into it.
                pass

    def get_instance(self):
        """Return a new Operator built from this one's device id, logger and log path."""
        return Operator(self.device_id, self.log_class, self.log_path)

    def decode_utf(self, var):
        """Decode a UTF-8 byte string to text; any other value is returned unchanged."""
        if isinstance(var, bytes):
            return codecs.lookup("utf-8").decode(var)[0]
        return var

    def update_dictory(self, dictory):
        """Return a copy of *dictory* whose values went through ``decode_utf``."""
        decoded = {}
        # Iterating items() instead of indexing keys()/values() also works
        # where those calls return views rather than lists.
        for key, value in dictory.items():
            decoded[key] = self.decode_utf(value)
        return decoded

    def search(self, **kwargs):
        """Look for a view matching the keyword criteria.

        Recognised criteria are ``id``, ``text``, ``description``, ``index``,
        ``package`` and ``enabled``; ``retry_time`` selects the retry mode of
        ``judge``. The criteria are published in the ``paras_dictory`` global.

        :returns: what ``judge`` returns (a ``Panel`` or None).
        """
        global paras_dictory, retry_time
        paras_dictory = self.update_dictory(kwargs)
        retry_time = paras_dictory.get("retry_time")
        return self.judge(paras_dictory, retry_time)

    def judge(self, dic, retry_time):
        """Pull a window dump and decide whether *dic* identifies one view.

        With *retry_time* set, the dump is parsed up to that many times; after
        each miss the screen is swiped, one second is waited and the dump is
        pulled again. Without it, a miss consults the hooks library and, when a
        hook reports success, the search is repeated.

        :param dic: decoded search criteria.
        :param retry_time: number of attempts (anything ``int()`` accepts) or None.
        :returns: a ``Panel`` wrapping a fresh Operator, or None.
        """
        global xml_path, paras_dictory
        # Keep the shared criteria in step with what is searched for:
        # bound_to_list() reads the global, not this argument.
        paras_dictory = dic
        xml_path = self.get_xml()
        if xml_path is None:
            print("Error : can not generate window dump file.")
            return None
        self.native_unlock_screen()

        if retry_time is not None:
            # The dump is refreshed inside the loop. The previous code
            # recursed instead, which reset the counter on every miss and
            # never terminated when the view did not show up.
            for _ in range(int(retry_time)):
                if self.parse_window_dump(dic) is not None:
                    return Panel(self.get_instance())
                print("Warnning : there is no view or many views match your require, retry again!")
                self.little_swipe()
                time.sleep(1)
                xml_path = self.get_xml()
                if xml_path is None:
                    print("Error : can not generate window dump file.")
                    return None
            return None

        if self.parse_window_dump(dic) is not None:
            return Panel(self.get_instance())
        print("Warnning : there is no view or many views match your require, start to search hooks library...!")
        self.hooks.get_generators(self.get_all_nodes())
        # A true status means a hook recognised the current frame and
        # dismissed it, so the original search is worth repeating.
        if self.hooks.parse():
            time.sleep(1)
            # Return the result instead of clicking here: the caller clicks
            # the returned panel, and used to receive None after the click.
            return self.judge(dic, retry_time)
        return None

    def get_intents_dictory(self):
        """Load ``repository/intent_list.cfg`` from the parent of this module's folder."""
        bluesea_dir = os.path.abspath(os.path.join(self.current_path, os.pardir))
        return get_data(os.path.join(bluesea_dir, 'repository/intent_list.cfg'))

    def hardware(self, **kwargs):
        """Return a ``Device`` bound to a fresh Operator; *kwargs* are ignored."""
        return Device(self.get_instance())

    def get_all_nodes(self):
        """Return an iterator over every ``node`` element of the dump at ``xml_path``."""
        return ET.parse(xml_path).getroot().iter('node')

    def get_scrollable_view(self):
        """Return the bounds of the first node whose ``scrollable`` is "true", or None."""
        for node in self.get_all_nodes():
            if node.attrib["scrollable"] == "true":
                return node.get("bounds")
        return None

    def native_is_screen_lock(self):
        """Return True when a node's resource-id contains ``id/lock_icon``."""
        for node in self.get_all_nodes():
            if "id/lock_icon" in node.attrib['resource-id'].strip('\n'):
                return True
        return False

    def parse_window_dump(self, parametres=None):
        """Return the bounds of the single view selected by *parametres*, or None.

        NOTE: every supplied value is compared against *all* six node
        attributes (short resource id, text, index, content-desc, package,
        enabled), not only the one it was supplied for. That long-standing
        behaviour is kept as is because existing test cases depend on it.

        An attribute kind contributes a candidate bound only when exactly one
        comparison against it succeeded; the result is that bound when all
        candidates agree and at least as many attribute kinds matched as
        values were supplied.

        :param parametres: criteria dict; None is treated as empty.
        :returns: a bounds string such as ``[0,0][100,200]``, or None.
        """
        id_pattern = r'[A-Za-z]+:id/(\w+)'
        criteria = parametres or {}
        wanted = [criteria[key]
                  for key in ('id', 'text', 'description', 'index', 'package', 'enabled')
                  if key in criteria]
        if not wanted:
            print("Error : invalid input, there is no useful parameter given!!!")
            sys.exit(-1)

        # Order matters: it is the order in which candidate bounds are collected.
        kinds = ('id', 'text', 'desc', 'pack', 'index', 'enabled')
        hits = dict.fromkeys(kinds, 0)
        last_bound = {}
        for node in self.get_all_nodes():
            attrs = node.attrib
            raw_id = attrs['resource-id']
            found = re.findall(id_pattern, raw_id.strip('\n'))
            # "pkg:id/name" is reduced to "name"; anything else is used verbatim.
            short_id = found[0] if found else raw_id.strip('\n\r')
            actual = {
                'id': short_id,
                'text': attrs['text'].strip('\n'),
                'index': attrs['index'].strip('\n'),
                'desc': attrs['content-desc'].strip('\n'),
                'pack': attrs['package'].strip('\n'),
                'enabled': attrs['enabled'].strip('\n'),
            }
            for value in wanted:
                for kind in kinds:
                    if value == actual[kind]:
                        hits[kind] += 1
                        last_bound[kind] = node.get("bounds")

        matched_kinds = sum(1 for kind in kinds if hits[kind] != 0)
        if matched_kinds < len(wanted):
            return None
        candidates = [last_bound[kind] for kind in kinds if hits[kind] == 1]
        if candidates and all(item == candidates[0] for item in candidates):
            return candidates[0]
        return None

    def get_xml(self):
        """Dump the current window on the device and pull it into ``self.temporary``.

        The pulled ``window_dump.xml`` is renamed to
        ``<device_id>_window_dump.xml``. Up to ``RETRY_MAX_TIME`` attempts are made.

        :returns: path of the renamed dump, or None when every attempt failed.
        """
        device_dump = '/sdcard/window_dump.xml'
        pull_dir = self.temporary
        pulled = os.path.join(pull_dir, 'window_dump.xml')
        xml_file = os.path.join(pull_dir, self.device_id + '_' + 'window_dump.xml')
        for _ in range(RETRY_MAX_TIME):
            self.adb_root()
            self.execute_command(self.adb_shell() + 'uiautomator dump /sdcard/window_dump.xml').wait()
            self.execute_command('adb -s ' + self.device_id + ' pull ' + device_dump + ' ' + pull_dir).wait()
            try:
                # Drop the previous dump first so a failed pull can never be
                # mistaken for a fresh one, and so the rename also succeeds on
                # platforms where os.rename refuses to overwrite.
                if os.path.isfile(xml_file):
                    os.remove(xml_file)
                os.rename(pulled, xml_file)
            except OSError as oer:
                # Counts as one failed attempt; the old code recursed here
                # without a bound and discarded the recursive result.
                print(oer)
                continue
            if os.path.isfile(xml_file):
                return xml_file
        print("Unkown issue of adb, uiautomator dump file failed!")
        return None

    def adb_root(self):
        """Run ``adb -s <device> root`` and wait for it to finish."""
        self.execute_command('adb -s ' + self.device_id + ' root').wait()

    def generate_folder_locate_picture(self):
        """Make sure the screenshot folder exists and return its path."""
        self._ensure_dir(self.failure_pic_location)
        return self.failure_pic_location

    def generate_screenshot_name_format(self):
        """Return ``screenshot_<YYYY_mm_dd_HH_MM_SS>.png`` for the current local time."""
        stamp = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        return 'screenshot_' + stamp + '.png'

    def adb_command(self):
        """Return the ``adb -s <device> `` prefix, trailing space included."""
        return 'adb -s ' + self.device_id + ' '

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` prefix, trailing space included."""
        return 'adb -s ' + self.device_id + ' shell '

    def execute_command(self, cmd, ignore_print=True):
        """Start *cmd* (split on single spaces) and return the ``Popen`` object.

        :param cmd: command line; arguments must not contain spaces.
        :param ignore_print: when true the command is written to the logger
            (the name is historical; true means "do log").
        :returns: the running process with stdout attached to a pipe. Callers
            that only ``wait()`` on it rely on the child printing little: a
            child that fills the pipe blocks until someone reads it.
        """
        # log_class defaults to None, in which case there is nothing to log to.
        if ignore_print and self.log_class is not None:
            self.log_class.info("Execute command : {0}".format(cmd))
        return sp.Popen(cmd.split(' '), stdout=sp.PIPE)

    def check_adb_works(self, proc):
        """Placeholder; does nothing."""
        pass

    def get_dumpsys_display(self):
        """Return the output of ``dumpsys display`` split on line feeds."""
        proc = self.execute_command(self.adb_shell() + 'dumpsys display')
        # communicate() drains the pipe and reaps the child in one step.
        output = proc.communicate()[0]
        if not isinstance(output, str):
            # Byte output is decoded so the substring tests made by callers
            # work on text.
            output = output.decode('utf-8', 'replace')
        return output.split('\n')

    def bound_to_list(self):
        """Return the current search's bounds as ``[x1, y1, x2, y2]`` strings.

        The criteria come from the ``paras_dictory`` global. Raises
        AttributeError when no view matches.
        """
        bounds = self.parse_window_dump(paras_dictory)
        # "[x1,y1][x2,y2]" -> "x1,y1,x2,y2," -> four fields plus one empty tail.
        fields = bounds.replace('[', '').replace(']', ',').split(',')
        fields.remove('')
        return fields

    def get_centre_coordinate(self):
        """Return ``[x, y]`` (as strings) of the centre of the searched view."""
        x1, y1, x2, y2 = [int(value) for value in self.bound_to_list()[:4]]
        # Floor division keeps the coordinates integral on every interpreter.
        return [str((x1 + x2) // 2), str((y1 + y2) // 2)]

    def native_power_on(self):
        """Send key event 26 when the display is not reported as ON."""
        if not self.native_check_screen_on():
            self.execute_command(self.adb_shell() + 'input keyevent 26').wait()

    def native_unlock_screen(self):
        """Wake the display and swipe up when the lock icon is in the dump."""
        self.native_power_on()
        if self.native_is_screen_lock():
            self.native_push_up()

    def native_check_screen_on(self):
        """Return True when a dumpsys line contains both ``mScreenState`` and ``ON``."""
        return any('mScreenState' in line and 'ON' in line
                   for line in self.get_dumpsys_display())

    def _swipe_centre_line(self, start_fifths, end_fifths):
        """Swipe along the vertical centre line between two heights given in fifths."""
        max_w, max_h = self.get_max_resolution()[:2]
        if max_w == 'unknow' or max_h == 'unknow':
            print("Error : dumpsys display failed")
            sys.exit(-1)
        x = str(int(max_w) // 2)
        y1 = str((int(max_h) * start_fifths) // 5)
        y2 = str((int(max_h) * end_fifths) // 5)
        command = self.adb_shell() + 'input swipe ' + x + ' ' + y1 + ' ' + x + ' ' + y2
        self.execute_command(command).wait()

    def native_push_up(self):
        """Swipe from 4/5 to 1/5 of the screen height; exits with -1 without a resolution."""
        self._swipe_centre_line(4, 1)

    def get_max_resolution(self):
        """Return the display size parsed from ``dumpsys display``.

        Every ``mDisplayWidth``/``mDisplayHeight`` value is appended in the
        order encountered. When fewer than two values are found the list is
        padded with ``'unknow'`` so that callers' ``== 'unknow'`` checks
        trigger instead of an IndexError.
        """
        max_resolution = []
        for line in self.get_dumpsys_display():
            if 'mDisplayWidth' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
            if 'mDisplayHeight' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
        while len(max_resolution) < 2:
            max_resolution.append('unknow')
        return max_resolution

    def take_snapshot(self):
        """Save a screenshot and a uiautomator dump into the screenshot folder.

        Both files share the ``screenshot_<timestamp>`` stem; the dump gets the
        ``.uix`` extension. The ``xml_path`` global is refreshed first.
        """
        global xml_path
        xml_path = self.get_xml()
        file_path = self.generate_folder_locate_picture()
        picture_name = self.generate_screenshot_name_format()
        uiautomator_dump_file = os.path.join(file_path, picture_name.split('.')[0] + '.uix')
        file_path_device = os.path.join('/sdcard/' + picture_name)
        commands = [
            self.adb_shell() + '/system/bin/screencap -p ' + file_path_device,
            'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path,
            self.adb_shell() + 'uiautomator dump',
            'adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml' + ' ' + uiautomator_dump_file,
        ]
        self.native_unlock_screen()
        time.sleep(0.5)
        for command in commands:
            self.execute_command(command).wait()

    def little_swipe(self):
        """Swipe from 4/5 to 2/5 of the screen height; exits with -1 without a resolution."""
        self._swipe_centre_line(4, 2)
class Device(object):
def __init__(self,executor):
self.executor = executor
def return_home(self):
go_home_command = self.executor.adb_shell() + ‘input keyevent 3‘
self.executor.execute_command(go_home_command).wait()
def press_enter(self):
press_enter_command = self.executor.adb_shell() + ‘input keyevent 66‘
self.executor.execute_command(press_enter_command).wait()
def press_menu(self):
press_menu_command = self.executor.adb_shell() + ‘input keyevent 82‘
self.executor.execute_command(press_menu_command).wait()
def shift_tab(self,counter=1):
tab_key = ‘61 ‘
shift_tab_command = self.executor.adb_shell() + ‘input keyevent ‘ + (tab_key*int(counter)).strip()
self.executor.execute_command(shift_tab_command).wait()
def shift_down(self,counter=1):
down_key = ‘20 ‘
press_down_command = self.executor.adb_shell() + ‘input keyevent ‘ + (down_key*int(counter)).strip()
self.executor.execute_command(press_down_command).wait()
def shift_up(self,counter=1):
up_key = ‘19 ‘
press_up_command = self.executor.adb_shell() + ‘input keyevent ‘ + (up_key*int(counter)).strip()
self.executor.execute_command(press_up_command).wait()
def shift_left(self,counter=1):
press_left_command = self.executor.adb_shell() + ‘input keyevent 21‘
for j in range(int(counter)):
self.executor.execute_command(press_left_command).wait()
time.sleep(1)
def volume_up(self,counter=1):
volume_up_command = self.executor.adb_shell() + ‘input keyevent 24‘
for j in range(int(counter)):
self.executor.execute_command(volume_up_command).wait()
time.sleep(0.5)
def volume_down(self,counter=1):
volume_down_command = self.executor.adb_shell() + ‘input keyevent 25‘
for j in range(int(counter)):
self.executor.execute_command(volume_down_command).wait()
time.sleep(0.5)
def shift_right(self,counter=1):
press_right_command = self.executor.adb_shell() + ‘input keyevent 22‘
for k in range(int(counter)):
self.executor.execute_command(press_right_command).wait()
time.sleep(1)
def push_up(self):
device_resolution = self.executor.get_max_resolution()
max_w = device_resolution[0]
max_h = device_resolution[1]
if max_w ==‘unknow‘ or max_h ==‘unknow‘:
print "Error : dumpsys display failed"
sys.exit(-1)
else:
x1 = str(int(max_w)/2)
x2 = str(int(max_w)/2)
y1 = str((int(max_h)*4)/5)
y2 = str(int(max_h)/5)
push_up_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2
self.executor.execute_command(push_up_command).wait()
def swipe_right(self):
device_resolution = self.executor.get_max_resolution()
max_w = device_resolution[0]
max_h = device_resolution[1]
if max_w ==‘unknow‘ or max_h ==‘unknow‘:
print "Error : dumpsys display failed"
sys.exit(-1)
else:
x1 = str(int(max_w)/5)
x2 = str((int(max_w)*4)/5)
y1 = str(int(max_h)/2)
y2 = str(int(max_h)/2)
swipe_right_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2
self.executor.execute_command(swipe_right_command).wait()
def check_boot_status(self):
    """Report whether the device's boot animation has finished.

    Runs ``getprop`` on the device, keeps the lines mentioning ``bootanim``
    and requires both ``stopped`` and ``1`` to appear in them.

    :returns: True when both markers are present, otherwise False.
    """
    check_boot_status_command = self.executor.adb_shell() + 'getprop'
    proc = sp.Popen(check_boot_status_command.split(' '),
                    stdout=sp.PIPE, universal_newlines=True)
    # communicate() drains the pipe and reaps the child; the filtering that
    # used to be delegated to an external grep is done here.
    output = proc.communicate()[0]
    bootanim = '\n'.join(line for line in output.splitlines() if 'bootanim' in line)
    # Both markers must be tested explicitly: the former
    # `"stopped" and '1' in out` only ever evaluated `'1' in out`.
    return 'stopped' in bootanim and '1' in bootanim
def check_reboot_progress(self):
reboot_execute_success = False
if not self.check_boot_status():
reboot_execute_success = True
return reboot_execute_success
def reboot_device(self):
reboot_device_command = self.executor.adb_command() + ‘reboot‘
self.executor.execute_command(reboot_device_command).wait()
def input_text(self,string=‘%s‘):
string = string.strip().replace(‘ ‘,‘%s‘)
input_text_command = self.executor.adb_shell() + ‘input text ‘ + string
self.executor.execute_command(input_text_command).wait()
def swipe_left(self):
device_resolution = self.executor.get_max_resolution()
max_w = device_resolution[0]
max_h = device_resolution[1]
if max_w ==‘unknow‘ or max_h ==‘unknow‘:
print "Error : dumpsys display failed"
sys.exit(-1)
else:
x1 = str((int(max_w)*4)/5)
x2 = str(int(max_w)/5)
y1 = str(int(max_h)/2)
y2 = str(int(max_h)/2)
swipe_left_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2
self.executor.execute_command(swipe_left_command).wait()
def pull_down(self):
device_resolution_1 = self.executor.get_max_resolution()
max_w_1 = device_resolution_1[0]
max_h_1 = device_resolution_1[1]
if max_w_1 ==‘unknow‘ or max_h_1 ==‘unknow‘:
print "Error : dumpsys display failed"
sys.exit(-1)
else:
x1_1 = str(int(max_w_1)/2)
x2_1 = str(int(max_w_1)/2)
y1_1 = str(int(max_h_1)/5)
y2_1 = str((int(max_h_1)*4)/5)
push_up_command_1 = self.executor.adb_shell() + ‘input swipe ‘ + x1_1 + ‘ ‘ + y1_1 + ‘ ‘ + x2_1 + ‘ ‘ + y2_1
self.executor.execute_command(push_up_command_1).wait()
def swipe_pull_notification(self):
device_resolution = self.executor.get_max_resolution()
max_w = device_resolution[0]
max_h = device_resolution[1]
if max_w ==‘unknow‘ or max_h ==‘unknow‘:
print "Error : dumpsys display failed"
sys.exit(-1)
else:
x1 = str(int(max_w)/2)
x2 = str(int(max_w)/2)
y1 = "0"
y2 = str(int(max_h)/5)
swip_noticfi_command = self.executor.adb_shell() + ‘input swipe ‘ + x1 + ‘ ‘ + y1 + ‘ ‘ + x2 + ‘ ‘ + y2
self.executor.execute_command(swip_noticfi_command).wait()
def get_through_oobe(self):
    """Tap four points near the screen corners, one after another.

    The points sit at 20 % / 80 % of the width and height and are tapped in
    the order top-left, top-right, bottom-right, bottom-left.  Exits the
    process with status -1 when the resolution is reported as ``'unknow'``.
    """
    max_w, max_h = self.executor.get_max_resolution()[:2]
    print("[debug] max_w and max_h is  {0} {1}".format(max_w, max_h))
    if max_w == 'unknow' or max_h == 'unknow':
        print("Error : dumpsys display failed")
        sys.exit(-1)
    width, height = int(max_w), int(max_h)
    # Floor division keeps the coordinates integral on Python 3 as well.
    near_x, far_x = (width * 2) // 10, (width * 8) // 10
    near_y, far_y = (height * 2) // 10, (height * 8) // 10
    corners = [(near_x, near_y), (far_x, near_y), (far_x, far_y), (near_x, far_y)]
    # Build every command before sending the first one, as the original did.
    tap_commands = [
        self.executor.adb_shell() + 'input tap ' + str(px) + ' ' + str(py)
        for px, py in corners
    ]
    for tap_command in tap_commands:
        self.executor.execute_command(tap_command).wait()
def power_on(self):
    """Send key event 26 when ``check_screen_on()`` reports the screen is off.

    Does nothing when the screen is already on, so the key press cannot
    toggle it back off.
    """
    if self.check_screen_on():
        return
    power_on_command = self.executor.adb_shell() + 'input keyevent 26'
    self.executor.execute_command(power_on_command).wait()
def press_back(self, counter=1):
    """Send key event 4 ``counter`` times, pausing one second after each press.

    Args:
        counter: Number of presses; anything ``int()`` accepts.
    """
    press_back_command = self.executor.adb_shell() + 'input keyevent 4'
    for _ in range(int(counter)):
        self.executor.execute_command(press_back_command).wait()
        time.sleep(1)
def check_screen_on(self):
    """Return True when any dumpsys-display line contains both ``mScreenState`` and ``ON``."""
    return any(
        'mScreenState' in line and 'ON' in line
        for line in self.executor.get_dumpsys_display()
    )
def launch_app_with_intent(self, app="Native_HomePage", **kwargs):
    """Start an application through ``am start`` using its registered intent.

    Args:
        app: Key into the executor's intent dictionary; the entry's
            ``'intent'`` value is passed to ``am start -n``.
        **kwargs: Accepted for call compatibility; not used here.

    Exits the process with status -1 when ``app`` has no entry.
    """
    intent_dictory = self.executor.get_intents_dictory()
    if app not in intent_dictory:
        print("[Error] there is no intent match your given application, please double check or contact author to add!!!")
        sys.exit(-1)
    app_intent = intent_dictory[app]['intent']
    launch_app_command = (
        self.executor.adb_shell()
        + 'am start -a android.intent.action.MAIN -c android.intent.category.LAUNCHER -f 0x10200000 -n '
        + app_intent
    )
    self.executor.execute_command(launch_app_command).wait()
class Panel(object):
    """Gestures on a single UI element, driven through adb.

    ``panel`` is the wrapped element object.  This class relies on it
    providing ``adb_shell()``, ``execute_command()``,
    ``get_centre_coordinate()``, ``bound_to_list()``, ``device_id`` and
    ``log_class``.
    """

    # TCP port the on-device monkey server listens on and adb forwards to.
    MONKEY_PORT = 15555

    def __init__(self, panel):
        self.panel = panel

    def click(self):
        """Tap the centre of the element."""
        centre = self.panel.get_centre_coordinate()
        click_command = self.panel.adb_shell() + 'input tap ' + str(centre[0]) + ' ' + str(centre[1])
        self.panel.execute_command(click_command).wait()

    def swipe_up(self, times=1):
        """Swipe upwards inside the element ``times`` times.

        The gesture runs along the element's vertical centre line from 4/5
        down to 1/5 of its height.  The original computed
        ``(top + bottom) * 4 / 5``, which is only inside the element when
        its top edge is at y=0; the offsets are now relative to the top.
        """
        left, top, right, bottom = [int(v) for v in self.panel.bound_to_list()[:4]]
        x = str((left + right) // 2)
        height = bottom - top
        y_start = str(top + (height * 4) // 5)
        y_end = str(top + height // 5)
        push_up_command = self.panel.adb_shell() + 'input swipe ' + ' '.join([x, y_start, x, y_end])
        for _ in range(int(times)):
            self.panel.execute_command(push_up_command).wait()

    def long_click(self):
        """Long-press the element: a 2000 ms swipe that moves by one pixel."""
        centre = self.panel.get_centre_coordinate()
        px = str(int(centre[0]) + 1)
        py = str(int(centre[1]) + 1)
        during_time = '2000'
        long_click_command = (
            self.panel.adb_shell() + 'input swipe '
            + ' '.join([str(centre[0]), str(centre[1]), px, py, during_time])
        )
        self.panel.execute_command(long_click_command).wait()

    def drag_to(self, dx, dy):
        """Drag the element from its centre to ``(dx, dy)`` via the monkey server.

        Starts a monkey server when none is running, sends touch down / move
        / up over the forwarded socket, and always kills the server
        afterwards, even when the socket step raises.  Returns silently when
        no monkey process could be found.
        """
        centre = self.panel.get_centre_coordinate()
        start_x = str(int(centre[0]))
        start_y = str(int(centre[1]))
        drag_to_command = [
            self.touch_down(start_x, start_y),
            self.move(str(dx), str(dy)),
            self.touch_up(str(dx), str(dy)),
        ]
        if self.get_pid() is None:
            self.generate_monkey_process()
        monkey_pid_num = self.get_pid()
        if monkey_pid_num is None:
            return
        try:
            self.open_socket(drag_to_command)
        finally:
            self.kill_monkey_process(monkey_pid_num)
            # A monkey process may still be listed; kill whatever ps reports now.
            self.kill_monkey_process(self.get_pid())

    def touch_down(self, dx, dy):
        """Return the monkey-protocol line for a touch-down at ``(dx, dy)``."""
        return 'touch down {0} {1}\n'.format(dx, dy)

    def touch_up(self, dx, dy):
        """Return the monkey-protocol line for a touch-up at ``(dx, dy)``."""
        return 'touch up {0} {1}\n'.format(dx, dy)

    def move(self, dx, dy):
        """Return the monkey-protocol line for a touch-move to ``(dx, dy)``."""
        return 'touch move {0} {1}\n'.format(dx, dy)

    def get_pid(self):
        """Return the PID (as a string) of the on-device monkey process, or None."""
        r2 = r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
        grep_monkey_command = self.panel.adb_shell() + 'ps'
        # communicate() drains the pipe and reaps the child process.
        output = sp.Popen(grep_monkey_command.split(' '), stdout=sp.PIPE).communicate()[0]
        if not isinstance(output, str):
            # Python 3 returns bytes from the pipe.
            output = output.decode('utf-8', 'replace')
        re_result = re.findall(r2, output.strip('\n'))
        return re_result[0] if re_result else None

    def kill_monkey_process(self, monkey_pid):
        """Kill the on-device process ``monkey_pid``; no-op when it is None."""
        if monkey_pid is None:
            return
        kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
        self.panel.execute_command(kill_monkey_process_command).wait()

    def generate_monkey_process(self):
        """Start the monkey server on the device and forward its port to the host.

        The launching ``adb shell`` client is killed after five seconds by a
        timer thread; this method sleeps for the same five seconds and then
        sets up the port forward if a monkey process is listed.
        """
        generate_monkey_process_command = (
            self.panel.adb_shell()
            + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port {0}'.format(self.MONKEY_PORT)
        )
        launcher = sp.Popen(generate_monkey_process_command.split(' '), stdout=sp.PIPE)
        reaper = threading.Timer(5, lambda: self.kill(launcher))
        reaper.daemon = True
        reaper.start()
        time.sleep(5)
        if self.get_pid() is not None:
            forward_command = 'adb -s ' + self.panel.device_id + ' forward tcp:{0} tcp:{0}'.format(self.MONKEY_PORT)
            self.panel.execute_command(forward_command).wait()

    def open_socket(self, cmd_list):
        """Send each monkey command over the forwarded port, then ``quit``.

        Waits 1.5 s after every command.  The socket is closed even when
        connecting or sending fails.
        """
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sk.settimeout(10)
            sk.connect(('127.0.0.1', self.MONKEY_PORT))
            for cmd in cmd_list:
                self.panel.log_class.info("Execute command : {0}".format(cmd))
                sk.sendall(self._to_bytes(cmd))
                time.sleep(1.5)
            sk.sendall(self._to_bytes('quit\n'))
        finally:
            sk.close()

    def kill(self, process):
        """Terminate and kill a local subprocess, tolerating one that already exited."""
        if process.poll() is not None:
            return
        try:
            process.terminate()
            process.kill()
        except OSError:
            # The process exited between poll() and the signal.
            pass

    @staticmethod
    def _to_bytes(data):
        """Encode text for the socket; byte strings pass through unchanged."""
        return data if isinstance(data, bytes) else data.encode('utf-8')
yaboandriod · #3 · 2017年01月16日
#1楼 @264768502
第一次发,不懂怎么发代码,现在改过来了。
yaboandriod · #4 · 2017年01月16日
logger代码如下:
#!/usr/bin/python
import logging
import os
class Logger(object):
    """Factory for a named logger that writes to ``<log_path>/log.txt`` and the console."""

    def __init__(self, name, log_path):
        """Remember the logger name and make sure ``log_path`` exists.

        Raises:
            OSError: when the directory cannot be created and does not
                already exist.
        """
        self.log_path = log_path
        self.name = name
        self.log_file = os.path.join(log_path, 'log.txt')
        if not os.path.isdir(log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                # Losing a creation race is fine; any other failure is not.
                if not os.path.isdir(log_path):
                    raise

    def modlogger(self):
        """Return the DEBUG-level ``logging`` logger for ``self.name``.

        A file handler and a stream handler are attached only the first
        time; ``logging.getLogger`` returns the same object for a given
        name, so re-adding them on each call duplicated every log line.
        """
        mylogger = logging.getLogger(self.name)
        mylogger.setLevel(logging.DEBUG)
        if not mylogger.handlers:
            formatter = logging.Formatter("%(asctime)s %(levelname)s\t%(message)s")
            for handler in (logging.FileHandler(self.log_file), logging.StreamHandler()):
                handler.setFormatter(formatter)
                mylogger.addHandler(handler)
        return mylogger

    def get_instance(self):
        """Return a new ``Logger`` with the same name and path."""
        return Logger(self.name, self.log_path)

    def logger(self):
        """Return a ``Methods`` facade built from a fresh ``Logger`` copy."""
        return Methods(self.get_instance())
class Methods(object):
    """Thin facade exposing info/debug/warning/error of a configured logger.

    ``obj`` must provide ``modlogger()``; its return value receives every
    call made on this object.
    """

    def __init__(self, obj):
        self.ml = obj.modlogger()

    def info(self, string):
        """Log ``string`` at INFO level."""
        self.ml.info(string)

    def debug(self, string):
        """Log ``string`` at DEBUG level."""
        self.ml.debug(string)

    def warning(self, string):
        """Log ``string`` at WARNING level."""
        self.ml.warning(string)

    def error(self, string):
        """Log ``string`` at ERROR level."""
        self.ml.error(string)
if __name__ == "__main__":
    # Manual smoke test: emit one message at each level.
    l = Logger('test0', '/home/buildbot/bluesea/reporter')
    m = l.logger()
    m.info('------this is info message-------')
    m.error('tap ok button fail')
    m.debug('-----this is debug message-------')
    m.warning('you have not set timeout')
yaboandriod · #5 · 2017年01月16日
ui_hooks模块如下:
import controler
import time
import inspect
import subprocess as sp
"""
It provides a way to get past various kinds of unexpected pop-up dialogs
"""
class Hooks:
    """Dismiss known pop-up dialogs by tapping one of their buttons.

    ``get_generators`` stores a snapshot of UI nodes in the module-level
    ``frame`` dict (keys 1..n).  ``parse`` / ``divider`` pick a hook from
    the package of node 1; the hook returns the ``bounds`` string of the
    button to tap, or None when the dialog is not one it recognises.
    """

    # (package of the first node, name of the hook method that handles it)
    _PACKAGE_HOOKS = (
        ('com.google.android.apps.maps', 'hook_maps'),
        ('com.google.android.packageinstaller', 'hook_contact'),
        ('com.google.android.gms', 'hook_gms'),
        ('com.android.settings', 'hook_settings'),
        ('com.android.stk', 'hook_stk'),
        ('android', 'hook_android'),
    )

    def __init__(self, device_id):
        self.device_id = device_id

    def get_generators(self, generator=None):
        """Rebuild the module-level ``frame`` snapshot from an iterable of nodes.

        Each node must offer ``get(attribute_name)``.  Passing None leaves
        an empty snapshot instead of raising.
        """
        global frame
        frame = {}
        if generator is None:
            return
        for position, element in enumerate(generator, 1):
            frame[position] = {
                'package': element.get('package'),
                'index': element.get('index'),
                'text': element.get('text'),
                'description': element.get('content-desc'),
                'id': element.get('resource-id'),
                'bounds': element.get('bounds'),
                'enabled': element.get('enabled'),
            }

    def parse(self):
        """Run ``divider`` and return its result."""
        return self.divider()

    def divider(self):
        """Dispatch to the hook registered for the package of node 1.

        Returns:
            bool: True when a hook found a button and it was tapped; False
            when the snapshot is empty, no hook is registered for the
            package, or the hook found nothing to tap.
        """
        snapshot = self._snapshot()
        if 1 not in snapshot:
            return False
        main_key = snapshot[1]['package']
        # A lookup table replaces inspect.getargspec, removed in Python 3.11.
        for package, hook_name in self._PACKAGE_HOOKS:
            if main_key == package:
                return self.executer(package, getattr(self, hook_name)())
        return False

    def executer(self, sed, func):
        """Tap the centre of ``func`` (a bounds string) when it is not None.

        Args:
            sed: Package name, used only for the progress message.
            func: Bounds string returned by a hook, or None.

        Returns:
            bool: True when a tap was sent, otherwise False.
        """
        print("this app is %s" % sed)
        if func is None:
            return False
        self.tap(self.get_point(func))
        return True

    def hook_maps(self, seducer='com.google.android.apps.maps'):
        """Placeholder hook for Maps; never finds anything to tap."""
        print("hook_maps method is invoked")
        return None

    def hook_stk(self, seducer='com.android.stk'):
        """Return the bounds of the last node whose id contains ``button_ok``."""
        print("hook_stk method is invoked")
        return self._last_bounds('id', ('button_ok',))

    def hook_android(self, seducer='android'):
        """Return the bounds of the last node whose id contains ``button1``."""
        print("hook_android method is invoked")
        return self._last_bounds('id', ('button1',))

    def hook_settings(self, seducer='com.android.settings'):
        """Handle the preferred-SIM dialog by locating its ``NO`` button."""
        zipers = ["Update preferred SIM card?"]
        print("hook_settings method is invoked")
        if not self._has_text(zipers):
            return None
        return self._last_bounds('text', ('NO',))

    def hook_gms(self, seducer='com.google.android.gms'):
        """Handle the sign-in failure dialog by locating its ``Next`` button."""
        # The apostrophe was mangled to a typographic quote in the extracted
        # text; ASCII restored -- verify against the on-device string.
        zipers = ["Couldn't sign in"]
        print("hook_gms method is invoked")
        if not self._has_text(zipers):
            return None
        return self._last_bounds('text', ('Next',))

    def hook_contact(self, seducer='com.google.android.packageinstaller'):
        """Handle the Contacts permission dialog (``Allow`` or ``Next`` button)."""
        zipers = ["Allow Contacts to access photos, media, and files on your device?"]
        print("hook_contact method is invoked")
        if not self._has_text(zipers):
            return None
        return self._last_bounds('text', ('Allow', 'Next'))

    def get_point(self, coodinate):
        """Convert a bounds string such as ``[x1,y1][x2,y2]`` to its centre.

        Returns:
            list: ``[x, y]`` as decimal strings, using floor division.
        """
        parts = [p for p in coodinate.replace('[', '').replace(']', ',').split(',') if p]
        co_x1, co_y1, co_x2, co_y2 = [int(p) for p in parts[:4]]
        return [str((co_x1 + co_x2) // 2), str((co_y1 + co_y2) // 2)]

    def tap(self, point_list):
        """Tap the point ``[x, y]`` (strings) and wait for adb to exit."""
        tap_command = self.adb_shell() + 'input tap ' + point_list[0] + ' ' + point_list[1]
        self.execute_command(tap_command).wait()

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` command prefix."""
        return 'adb -s ' + self.device_id + ' shell '

    def execute_command(self, cmd, ignore_print=True):
        """Start ``cmd`` (split on single spaces) and return the Popen object.

        Note that, despite its name, ``ignore_print=True`` echoes the
        command to stdout; False suppresses the echo.
        """
        if ignore_print:
            print(cmd)
        return sp.Popen(cmd.split(' '), stdout=sp.PIPE)

    @staticmethod
    def _snapshot():
        """Return the module-level ``frame`` dict, or an empty dict when unset."""
        return globals().get('frame') or {}

    @staticmethod
    def _field(entry, key):
        """Return ``entry[key]`` without newlines at either end; None becomes ''."""
        return (entry.get(key) or '').strip('\n')

    def _has_text(self, titles):
        """Tell whether any node's text equals one of ``titles``."""
        snapshot = self._snapshot()
        return any(self._field(snapshot[pos], 'text') in titles for pos in snapshot)

    def _last_bounds(self, key, markers):
        """Return the bounds of the last node whose ``key`` contains any marker."""
        snapshot = self._snapshot()
        found = None
        for pos in sorted(snapshot):
            value = self._field(snapshot[pos], key)
            if any(marker in value for marker in markers):
                found = snapshot[pos]['bounds']
        return found
yaboandriod · #6 · 2017年01月16日
某个库文件如下:
# coding: utf-8
import os
import time
import sys
import subprocess as sp
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from engine import controler
max_retry = 30
class Commons:
    """Reusable test steps (reboot, Google account sign-in) built on ``controler.Operator``."""

    def __init__(self, conf_dic, ins_logger=None):
        """Create the operator for the device named in ``conf_dic``.

        Args:
            conf_dic: Settings dict; reads ``TC``, ``work_dir``,
                ``device_id`` and ``log_path`` here, and
                ``google_account`` / ``google_password`` during sign-in.
            ins_logger: Object with info/debug/error methods.
        """
        self.conf_dic = conf_dic
        name = os.path.basename(conf_dic['TC']).split('.')[0]
        workpath = conf_dic['work_dir']
        dev = conf_dic['device_id']
        logpath = conf_dic['log_path']
        log_path = os.path.join(workpath, logpath, name)
        self.mylogger = ins_logger
        self.op = controler.Operator(device_id=dev, log_class=self.mylogger, log_path=log_path)

    def warm_reboot(self, timeout=None, poll_interval=2):
        """Reboot the device and wait until it reports booted again.

        Args:
            timeout: Seconds to wait for the boot to finish; None (default)
                waits indefinitely, as the original did.
            poll_interval: Seconds to sleep between boot-status polls; the
                original polled in a tight loop.

        Returns:
            bool: True on a completed reboot; False when the device was not
            ready, the reboot did not start, or ``timeout`` expired.
        """
        if not self.op.hardware().check_boot_status():
            self.mylogger.error("[Common] Device is not ready for test reboot case. ")
            return False
        self.mylogger.info("[Common] Device is ready. ")
        self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
        self.get_boot_infomation()
        self.op.hardware().reboot_device()
        time.sleep(15)
        self.mylogger.info("[Common] Start to check if execute reboot command? ")
        if not self.op.hardware().check_reboot_progress():
            self.mylogger.error("[Common] unknow error, reboot fail. ")
            return False
        self.mylogger.info("[Common] Device start booting...")
        deadline = None if timeout is None else time.time() + timeout
        while not self.op.hardware().check_boot_status():
            if deadline is not None and time.time() > deadline:
                self.mylogger.error("[Common] Device did not finish booting within {0} seconds. ".format(timeout))
                return False
            time.sleep(poll_interval)
        self.mylogger.info("[Common] Device reboot successfully. ")
        return True

    def get_boot_infomation(self):
        """Log the output of ``getprop | grep boot`` run through adb shell."""
        get_boot_infomation_command = 'adb -s ' + self.op.device_id + ' shell getprop | grep boot'
        # communicate() drains the pipe and reaps the child process.
        out = sp.Popen(get_boot_infomation_command.split(' '), stdout=sp.PIPE).communicate()[0]
        if not isinstance(out, str):
            # Python 3 returns bytes from the pipe.
            out = out.decode('utf-8', 'replace')
        self.mylogger.info(out.strip('\n\r'))

    def _click(self, info_msg, error_msg, **criteria):
        """Log ``info_msg``, search with ``criteria`` and click the match once.

        Returns False, after logging ``error_msg`` formatted with the
        exception, when the click raises AttributeError.
        """
        self.mylogger.info(info_msg)
        try:
            self.op.search(**criteria).click()
        except AttributeError as e:
            self.mylogger.error(error_msg.format(e))
            return False
        return True

    def _click_with_retry(self, info_msg, retry_msg, delay, **criteria):
        """Like ``_click`` but retries, sleeping ``delay`` seconds, up to ``max_retry`` times."""
        retry_count = 0
        while True:
            self.mylogger.info(info_msg)
            try:
                self.op.search(**criteria).click()
                return True
            except AttributeError as e:
                if retry_count > max_retry:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False
                self.mylogger.debug(retry_msg)
                time.sleep(delay)
                retry_count += 1

    def log_on_google_account(self):
        """Walk the Settings UI to add the configured Google account.

        Returns:
            bool: True when every step succeeded, False at the first UI
            element that could not be found.
        """
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("[Common] click Accounts item ",
                           "[Common] Can't find Accounts item . Got Exception {0}",
                           text="Accounts", retry_time="3"):
            return False
        if not self._click("[Common] click Add account item ",
                           "[Common] Can't find Add account item . Got Exception {0}",
                           text="Add account"):
            return False
        if not self._click("[Common] click Google item ",
                           "[Common] Can't find Google item . Got Exception {0}",
                           text="Google"):
            return False
        if not self._click_with_retry("[Common] click enter your email input edittext ",
                                      "[Common] Can't find Enter your email input edittext . try again",
                                      2, description="Enter your email "):
            return False
        self.mylogger.info("[Common] input google account : {0}".format(self.conf_dic['google_account']))
        self.op.hardware().input_text(self.conf_dic['google_account'])
        if not self._click("[Common] click NEXT button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           description="NEXT"):
            return False
        time.sleep(5)
        if not self._click("[Common] click password edittext ",
                           "[Common] Can't find password edittext . Got Exception {0}",
                           id="password"):
            return False
        # The password is masked: the original wrote it to the log in clear text.
        self.mylogger.info("[Common] input google password : {0}".format('*' * len(self.conf_dic['google_password'])))
        self.op.hardware().input_text(self.conf_dic['google_password'])
        if not self._click("[Common] click NEXT button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           description="NEXT"):
            return False
        if not self._click_with_retry("[Common] click ACCEPT button ",
                                      "[Common] Can't find ACCEPT button . try again",
                                      2, description="ACCEPT"):
            return False
        if not self._click_with_retry("[Common] check Google services shown??? ",
                                      "[Common] Can't Google services keywords. try again",
                                      5, text="Google services"):
            return False
        if not self._click("[Common] click Next button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           text="Next", enabled='true', retry_time='3'):
            return False
        if not self._click_with_retry("[Common] click No thanks button ",
                                      "[Common] Can't find No thanks button . try again",
                                      2, text="No thanks"):
            return False
        if not self._click("[Common] click Continue button ",
                           "[Common] Can't find Continue button . Got Exception {0}",
                           text="Continue"):
            return False
        time.sleep(2)
        return True

    def check_google_account_added(self):
        """Open Settings > Accounts and report whether a ``Google`` entry exists."""
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("[Common] click Accounts item ",
                           "[Common] Can't find Accounts item . Got Exception {0}",
                           text="Accounts", retry_time="3"):
            return False
        if self.op.search(text='Google') is not None:
            self.mylogger.info("[Common] add google account successfully. ")
            return True
        self.mylogger.error("[Common] add account fail. ")
        return False
yaboandriod · #7 · 2017年01月16日
入口程序:
import xml.etree.ElementTree as ET
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir)))
import argparse
import random
import subprocess as sp
import pickle
import datetime
import re
import time
from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt
import generate_report
class Configuration:
    """Run settings, loaded by executing a Python-syntax config file."""

    def __init__(self):
        self.log_path = ''
        self.execute_main_campaign = ''
        self.device_id = ''
        self.work_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))

    def load(self, filepath):
        """Execute ``filepath`` and copy every name it defines onto this object.

        The file is run as Python code, so only trusted config files may be
        loaded.  Prints a message and exits with status -1 when no path is
        given or the file cannot be read.
        """
        if filepath is None:
            print("Error : No file path given!")
            print("you can add -h option to view more info")
            sys.exit(-1)
        try:
            with open(filepath) as handle:
                source = handle.read()
        except IOError as msg1:
            print(msg1)
            print("Error : Please specify a path which contain a config file!")
            sys.exit(-1)
        temp_dict = {}
        # exec(compile(...)) replaces the Python-2-only execfile().
        exec(compile(source, filepath, 'exec'), {}, temp_dict)
        for key, value in temp_dict.items():
            setattr(self, key, value)
def get_testcase_from_xml(config):
    """Return an iterator over every ``<item>`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``.
    """
    xml_path = os.path.join(config.work_dir, config.execute_main_campaign)
    return ET.parse(xml_path).getroot().iter('item')
def get_testsuite_from_xml(config):
    """Return an iterator over every ``<suite>`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``.
    """
    xml_path = os.path.join(config.work_dir, config.execute_main_campaign)
    return ET.parse(xml_path).getroot().iter('suite')
def get_full_path(config, string):
    """Return ``string`` resolved against ``config.work_dir`` with ``.py`` appended."""
    return os.path.join(config.work_dir, string) + '.py'
def generate_file(config, dictory):
    """Pickle ``dictory`` to ``<work_dir>/temporary/<device_id>/settings_temp.pkl``.

    Returns:
        str: Path of the pickle file that was written.

    Raises:
        OSError: when the target directory cannot be created.
    """
    temporary_path = os.path.join(config.work_dir, 'temporary', config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            # Losing a creation race is fine; any other failure is not.
            if not os.path.isdir(temporary_path):
                raise
    pickle_dump_path = os.path.join(temporary_path, 'settings_temp.pkl')
    with open(pickle_dump_path, 'wb') as f1:
        # Protocol 1, which is what the original's ``True`` evaluated to.
        pickle.dump(dictory, f1, 1)
    return pickle_dump_path
def save_data_for_html(config, dictory):
    """Pickle ``dictory`` to ``<work_dir>/temporary/<device_id>/html_raw_data.pkl``.

    Returns:
        str: Path of the pickle file that was written.

    Raises:
        OSError: when the target directory cannot be created.
    """
    temporary_path = os.path.join(config.work_dir, 'temporary', config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            # Losing a creation race is fine; any other failure is not.
            if not os.path.isdir(temporary_path):
                raise
    pickle_dump_path = os.path.join(temporary_path, 'html_raw_data.pkl')
    with open(pickle_dump_path, 'wb') as f1:
        # Protocol 1, which is what the original's ``True`` evaluated to.
        pickle.dump(dictory, f1, 1)
    return pickle_dump_path
def _collect_settings(config):
    """Copy the non-dunder, non-callable attributes of ``config`` into a dict."""
    settings = {}
    for attr in dir(config):
        if attr.startswith('__'):
            continue
        value = getattr(config, attr)
        if not callable(value):
            settings[attr] = value
    return settings


def _run_case(item_path, settings_file):
    """Run one test-case script in a child interpreter and return its stdout text."""
    # An argument list (not a split string) keeps paths with spaces intact.
    proc = sp.Popen(['python', item_path, settings_file], stdout=sp.PIPE)
    out = proc.communicate()[0]
    if not isinstance(out, str):
        # Python 3 returns bytes from the pipe.
        out = out.decode('utf-8', 'replace')
    return out.strip('\n')


def _collect_crashes(device_id, crash_list):
    """Build ``{n: {'crash_info', 'crash_data0..2'}}`` with one fresh dict per crash."""
    crashes = {}
    for c_count, crash in enumerate(crash_list, 1):
        crash_dic = {
            'crash_info': crash.strip('\r'),
            'crash_data0': '',
            'crash_data1': '',
            'crash_data2': '',
        }
        crashlog_path = get_crash_path(crash)
        if crashlog_path is not None:
            print_crashfile = get_crashfile_conetnt(device_id, crashlog_path)
            for index in range(3):
                matched = re.findall(r'DATA%d=(\S+\s)[\s\S]*' % index, print_crashfile)
                if matched:
                    crash_dic['crash_data%d' % index] = matched[0]
        crashes[c_count] = crash_dic
    return crashes


def main(config):
    """Run the campaign described by ``config`` and write the HTML report.

    Reads the suite and its items from the campaign XML, runs every case
    script as ``python <case>.py <pickled settings>``, takes the first
    ``Pass`` / ``Fail`` found on the script's stdout as the verdict (``Fail``
    when neither is printed), collects crashes that appeared in the device
    history during the run, and regenerates the report after each run.

    The extracted source had lost its indentation, so nesting was
    reconstructed: items are read once after the suite attributes, and the
    report is refreshed after every case run.
    """
    settings_dictory = _collect_settings(config)
    rs = 'run_set_' + datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    settings_dictory['log_path'] = os.path.join(settings_dictory['log_path'], config.device_id, rs)

    execue_case_sequence = {
        'suite_name': None,
        'suite_loop': None,
        'suite_mode': None,
        'suite_content': {},
        'device_id': settings_dictory['device_id'],
    }
    for testsuite in get_testsuite_from_xml(config):
        execue_case_sequence['suite_name'] = testsuite.get('name')
        execue_case_sequence['suite_loop'] = testsuite.get('loop')
        execue_case_sequence['suite_mode'] = testsuite.get('mode')
    for seq_num, testcase in enumerate(get_testcase_from_xml(config), 1):
        execue_case_sequence['suite_content'][seq_num] = [testcase.text, testcase.get('loop')]

    suite_loop = int(execue_case_sequence['suite_loop'] or 0)
    suite_content = execue_case_sequence['suite_content']
    # A real list: dict views cannot be shuffled on Python 3.
    tc_symbol_list = list(suite_content.keys())
    if suite_loop == 0 or not tc_symbol_list:
        # Print before exiting; the original exited first and never printed.
        print("there is no loop specify for running testsuite or no cases need to be executed!")
        sys.exit(0)
    need_to_change_sequence = execue_case_sequence['suite_mode'] == 'random'
    html_result_data = execue_case_sequence
    original_history = parse_history(settings_dictory['device_id'])

    for sl in range(1, suite_loop + 1):
        print("Start to run testsuite : {0} , loop {1}".format(execue_case_sequence['suite_name'], sl))
        if need_to_change_sequence:
            random.shuffle(tc_symbol_list)
        for symbol in tc_symbol_list:
            record = suite_content[symbol]
            # The item's own loop attribute, defaulting to one run when absent.
            item_loop = int(record[1]) if record[1] is not None else 1
            if item_loop == 0:
                continue
            item_path = get_full_path(config, record[0])
            case_name = os.path.basename(item_path).split('.')[0]
            report_path = os.path.join(config.work_dir, settings_dictory['log_path'], case_name)
            for j in range(1, item_loop + 1):
                print("Start to run case : {0} , loop {1}".format(case_name, j))
                start_time = time.time()
                start_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                settings_dictory['TC'] = item_path
                dir_path = generate_file(config, settings_dictory)
                out = _run_case(item_path, dir_path)
                current_history = parse_history(settings_dictory['device_id'])
                stop_time = time.time()
                stop_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                time_taken = stop_time - start_time
                crash_list = list(iter_event(get_different_list(original_history, current_history)))
                crashes = _collect_crashes(settings_dictory['device_id'], crash_list)
                original_history = current_history
                verdicts = re.findall(r'Fail|Pass', out)
                # Keyed by the case's own symbol: position-based keys put
                # results on the wrong case after a shuffle.
                record.extend([
                    verdicts[0] if verdicts else 'Fail',
                    start_time_dateformat,
                    stop_time_dateformat,
                    round(time_taken, 3),
                    len(crash_list),
                    crashes,
                ])
                address = save_data_for_html(config, html_result_data)
                generate_report.main(address, report_path)
if __name__ == '__main__':
    # Usage: <this script> [configuration_file]
    parse = argparse.ArgumentParser(usage='%(prog)s [options]')
    parse.add_argument('configuration_file', type=str, nargs='?', help="A path of config file")
    args = parse.parse_args()
    config_file = args.configuration_file
    configuration = Configuration()
    # load() reports a missing path itself and exits.
    configuration.load(config_file)
    main(configuration)
yaboandriod · #8 · 2017年01月16日
自定义生成测试报告:
import os
import sys
from pyh import *
import pickle
def main(address, report_path):
    """Render ``report.html`` from the pickled run data at ``address``.

    Args:
        address: Path of the pickle written by the runner.  It is loaded
            with ``pickle.load``, so it must come from a trusted run.
        report_path: Log directory of a test case.  ``report.html`` goes
            into its parent directory; crash pages go into ``report_path``.
    """
    with open(address, 'rb') as f:
        database1 = pickle.load(f)
    report_path_base = os.path.abspath(os.path.join(report_path, os.pardir))
    report_file_path = os.path.join(report_path_base, 'report.html')
    content_dic = database1['suite_content']
    result_colours = {'Pass': 'green', 'Fail': 'red', 'NA': 'grey'}

    page = PyH('test report')
    page << h1('this is report of yath project')
    order_list = page << ul(type='square')
    order_list << li('device id : %s' % database1['device_id'])
    order_list << li('suite mode : %s' % database1['suite_mode'])
    order_list << li('suite loop : %s' % database1['suite_loop'])
    order_list << li('suite name : %s' % database1['suite_name'])
    page << hr()
    # Formatting (not +) so a missing loop value cannot raise TypeError.
    p1_content = 'Loop_{0}:'.format(database1['suite_loop'])
    page << p(i(p1_content))
    t = page << table(border='1', bgcolor='#bbbb99')
    t << tr(th('test case', align='left') + th('test loop', align='center') + th('test result', align='center') + th('start time', align='center') + th('stop time', align='center') + th('duration(s)', align='center') + th('crashes', align='center'))
    # Sorted keys give the rows a stable order.
    for k in sorted(content_dic):
        item = content_dic[k]
        if len(item) == 2:
            # Only name and loop are present: the case never ran.
            continue
        test_case = item[0]
        test_loop = item[1]
        if item[1] == '0':
            test_result, start_time, stop_time, duration, crashes = 'NA', 'NA', 'NA', 'NA', '0'
        else:
            test_result, start_time, stop_time, duration, crashes = item[2:7]
        # Unknown verdicts fall back to grey instead of raising NameError.
        attr_co_test_result = result_colours.get(test_result, 'grey')
        if int(crashes) > 0:
            generate_crashes_page(item[7], test_case, report_path)
            link_crash = 'file://' + os.path.join(report_path, 'crash.html')
            crash_cell = td(a(crashes, href=link_crash), align='center')
        else:
            crash_cell = td(crashes, align='center')
        t << tr(td(test_case, align='left') + td(test_loop, align='center') + td(test_result, align='center', bgcolor=attr_co_test_result) + td(start_time, align='center') + td(stop_time, align='center') + td(duration, align='center') + crash_cell)
    page.printOut(report_file_path)
def generate_crashes_page(dictory, case_name, crash_path):
    """Write ``crash.html`` into ``crash_path`` with one table per crash.

    Args:
        dictory: Mapping of crash number to a dict holding ``crash_info``,
            ``crash_data0``, ``crash_data1`` and ``crash_data2``.
        case_name: Test case named in each section heading.
        crash_path: Directory that receives ``crash.html``.
    """
    crash_file_path = os.path.join(crash_path, 'crash.html')
    crash_page = PyH('Crash Page')
    crash_page << h1('this is a page of crash.')
    crash_page << p('detail info : ')
    rows = (
        ('crash info', 'crash_info'),
        ('data 0', 'crash_data0'),
        ('data 1', 'crash_data1'),
        ('data 2', 'crash_data2'),
    )
    for j in dictory:
        div1 = crash_page << div()
        h2_content = 'Crash %s occurs when execute %s : ' % (str(j), case_name)
        div1 << h2(h2_content)
        crash_table = div1 << table(frame='box')
        crash_table << tr(th('key', align='left') + th('value'), align='left')
        for label, key in rows:
            crash_table << tr(td(label, align='left') + td(dictory[j][key]), align='left')
    crash_page.printOut(crash_file_path)
if __name__ == "__main__":
    # main() needs both paths; calling it bare always raised TypeError.
    if len(sys.argv) != 3:
        print("usage: {0} <pickled result data> <case report directory>".format(sys.argv[0]))
        sys.exit(-1)
    main(sys.argv[1], sys.argv[2])
yaboandriod · #9 · 2017年01月16日
测试用例:
import os
import unittest
import sys
import pickle
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from container.base import settings
from engine import log
# The runner passes the path of the pickled settings as the only argument.
if len(sys.argv) < 2:
    sys.exit("usage: {0} <pickled settings file>".format(sys.argv[0]))
dir_path = sys.argv[1]
# Written by the runner for this run; never unpickle files from untrusted sources.
with open(dir_path, 'rb') as f:
    config_dictory = pickle.load(f)
name = os.path.basename(config_dictory['TC']).split('.')[0]
workpath = config_dictory['work_dir']
logpath = config_dictory['log_path']
log_path = os.path.join(workpath, logpath, name)
l = log.Logger(name, log_path)
mylogger = l.logger()
class test_setup_wifi_class(unittest.TestCase):
    """Runs the Wi-Fi setup flow from ``settings.Settings`` as a single test."""

    def setUp(self):
        self.test_class = settings.Settings(config_dictory, mylogger)

    def tearDown(self):
        # Press back three times and return home so the next case starts clean.
        self.test_class.op.hardware().press_back(3)
        self.test_class.op.hardware().return_home()

    def test_skip_wizard(self):
        self.assertEqual(self.test_class.setup_wifi(), True, 'test setup_wifi case fail')
if __name__ == "__main__":
    my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class)
    result = unittest.TextTestRunner(verbosity=0).run(my_suite)
    problem_count = len(result.failures) + len(result.errors)
    flag = 'Fail' if problem_count else 'Pass'
    # One "<verdict> <count>" line on stdout, same text as the Python 2 print statement.
    print("{0} {1}".format(flag, problem_count))
yaboandriod · #10 · 2017年02月03日
自己顶一下
mads · #11 · 2017年02月05日
你这个代码干嘛用的,陈述一下
cloudhuan · #12 · 2017年02月05日
???干嘛用的???封装了哪里?有什么优势?
yaboandriod · #13 · 2017年02月06日
针对 Android 操作系统的 UI 进行自动化测试,可以完成绝大多数操作,并支持多个 app 之间的交互。还可以在多台设备之间进行交互,譬如互相打电话、接电话等。
yaboandriod · #14 · 2017年02月06日
这是我写的一个小测试框架,case和lib可以自己定义,提供了log和controler接口。log用来打印日志,controler用来控制设备。
回帖
[python测试框架学习篇] 分享一个和adb相关的测试框架
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。