首页 > 代码库 > [python测试框架学习篇] 分享一个和adb相关的测试框架

[python测试框架学习篇] 分享一个和adb相关的测试框架

https://testerhome.com/topics/7106   (user: zteandallwinner     password: same to qq )

264768502 · #1 · 2017年01月13日
小小的建议
没缩进不能看,不如贴gist
单纯的adb的封装有很多人写了
比如我(#厚脸皮) https://github.com/264768502/adb_wrapper
比如这贴: https://testerhome.com/topics/6938

如果要处理UI的话,其实有现成的,比如pyuiautomator或者Appium
96
yaboandriod · #2 · 2017年01月16日
controller代码

import xml.etree.ElementTree as ET
import os
import sys
import subprocess as sp
import time
import logging
import re
import codecs
import datetime
import ui_hooks
import socket
import threading


# Attribute names carried by every <node> element of a uiautomator window dump.
WINDOW_FACTOR = ['index', 'text', 'resource-id', 'class', 'package', 'visible',
                 'content-desc', 'checkable', 'checked', 'clickable', 'enabled',
                 'focusable', 'focused', 'scrollable', 'long-clickable',
                 'password', 'bottom', 'selected', 'bounds']

# Maximum number of attempts made when pulling a window dump from the device.
RETRY_MAX_TIME = 3

def get_data(address):
    """Read the file at *address* and return the Python literal it contains.

    address -- path of a text file holding a Python expression (the callers in
               this file pass a configuration file holding a dict).

    Returns whatever object the expression evaluates to.
    """
    # The context manager closes the file even when reading or evaluating fails.
    with open(address, 'r') as source:
        content = source.read()
    # NOTE(review): eval() executes arbitrary code found in the file.  Only use
    # this on trusted, project-owned files; consider ast.literal_eval if the
    # content is always a plain literal.
    return eval(content)




class Operator:
    """Drive one Android device through ``adb`` and locate views in UI dumps.

    Every action shells out to ``adb``.  ``uiautomator`` window dumps are
    pulled into ``<log_path>/temp`` and search criteria are resolved to the
    ``bounds`` attribute of the matching node.  The path of the current dump
    and the current search criteria live in the module-level globals
    ``xml_path`` and ``paras_dictory`` so that the copies created by
    ``get_instance`` read the same state.
    """

    def __init__(self, device_id='', log_class=None, log_path='', **kwargs):
        """Remember the target device and create the log directory layout.

        device_id -- serial handed to ``adb -s``.
        log_class -- object whose ``info`` method receives every executed
                     command; may be None, in which case nothing is logged.
        log_path  -- base directory; ``screenshots`` and ``temp`` are created
                     below it.
        kwargs    -- accepted and ignored.
        """
        self.device_id = device_id
        self.log_class = log_class
        self.log_path = log_path
        self.current_path = os.path.dirname(__file__)
        self.home_dir = os.path.expanduser('~')
        self._ensure_dir(self.log_path)
        self.failure_pic_location = os.path.join(self.log_path, 'screenshots')
        self._ensure_dir(self.failure_pic_location)
        self.temporary = os.path.join(self.log_path, 'temp')
        self._ensure_dir(self.temporary)
        self.hooks = ui_hooks.Hooks(self.device_id)

    @staticmethod
    def _ensure_dir(path):
        """Create *path* if it is missing; creation failures are ignored."""
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except OSError:
                # Best effort: a later file operation reports the real problem.
                pass

    def _screen_size(self):
        """Return ``(width, height)`` as ints, or exit when it is unknown."""
        resolution = self.get_max_resolution()
        # An empty list means dumpsys printed nothing usable; without this
        # length check the indexing below would raise IndexError.
        if len(resolution) < 2 or 'unknow' in resolution[:2]:
            print("Error : dumpsys display failed")
            sys.exit(-1)
        return int(resolution[0]), int(resolution[1])

    def _swipe(self, x1, y1, x2, y2):
        """Run ``input swipe`` between two points and wait for it to finish."""
        points = ' '.join(str(value) for value in (x1, y1, x2, y2))
        self.execute_command(self.adb_shell() + 'input swipe ' + points).wait()

    def get_instance(self):
        """Return a new Operator built with this one's device and log settings."""
        return Operator(self.device_id, self.log_class, self.log_path)

    def decode_utf(self, var):
        """Return *var* as text, decoding byte strings as UTF-8.

        Values that are not byte strings (already-decoded text, numbers) are
        returned unchanged.
        """
        if isinstance(var, bytes):
            return codecs.lookup("utf-8").decode(var)[0]
        return var

    def update_dictory(self, dictory):
        """Return a copy of *dictory* whose values went through ``decode_utf``."""
        return dict((key, self.decode_utf(value))
                    for key, value in dictory.items())

    def search(self, **kwargs):
        """Look for the view described by *kwargs*.

        Recognised criteria are ``id``, ``text``, ``description``, ``index``,
        ``package`` and ``enabled``; ``retry_time`` bounds the number of
        swipe-and-retry attempts.  Returns a ``Panel`` or None.
        """
        global paras_dictory, retry_time
        paras_dictory = self.update_dictory(kwargs)
        retry_time = paras_dictory.get("retry_time")
        return self.judge(paras_dictory, retry_time)

    def judge(self, dic, retry_time):
        """Dump the current window and resolve *dic* to a ``Panel``.

        dic        -- decoded search criteria (see ``search``).
        retry_time -- None to consult the hooks library once when nothing
                      matches, otherwise the number of attempts; between
                      attempts the screen is swiped and dumped again.

        Returns a ``Panel`` when exactly one view matches, else None.
        """
        global xml_path, paras_dictory
        # The Panel resolves its coordinates from the global criteria later,
        # so keep them in step with what is searched for here.
        paras_dictory = dic
        xml_path = self.get_xml()
        if xml_path is None:
            print("Error : can not generate window dump file.")
            return None
        self.native_unlock_screen()

        if retry_time is not None:
            for attempt in range(int(retry_time)):
                if attempt:
                    # Refresh the dump after the previous swipe.  This is done
                    # in the loop (not by recursion) so the attempt count
                    # stays bounded by retry_time.
                    xml_path = self.get_xml()
                    if xml_path is None:
                        print("Error : can not generate window dump file.")
                        return None
                    self.native_unlock_screen()
                if self.parse_window_dump(dic) is not None:
                    return Panel(self.get_instance())
                print("Warnning : there is no view or many views match your require, retry again!")
                self.little_swipe()
                time.sleep(1)
            return None

        if self.parse_window_dump(dic) is not None:
            return Panel(self.get_instance())
        print("Warnning : there is no view or many views match your require, start to search hooks library...!")
        self.hooks.get_generators(self.get_all_nodes())
        # A true status means a known pop-up frame was found and dismissed.
        if self.hooks.parse():
            time.sleep(1)
            # Hand the re-searched panel back to the caller so that
            # ``search(...).click()`` keeps working after a pop-up.
            # NOTE(review): this recurses for as long as the hooks keep
            # reporting success - confirm the hooks cannot loop forever.
            return self.judge(dic, retry_time)
        return None

    def get_intents_dictory(self):
        """Load ``repository/intent_list.cfg`` from the parent directory."""
        bluesea_dir = os.path.abspath(os.path.join(self.current_path, os.pardir))
        app_intents_file_path = os.path.join(bluesea_dir, 'repository/intent_list.cfg')
        return get_data(app_intents_file_path)

    def hardware(self, **kwargs):
        """Return a ``Device`` bound to a fresh copy of this operator."""
        return Device(self.get_instance())

    def get_all_nodes(self):
        """Return an iterator over every ``node`` element of the current dump."""
        return ET.parse(xml_path).getroot().iter('node')

    def get_scrollable_view(self):
        """Return the bounds of the first scrollable node, or None."""
        for node in self.get_all_nodes():
            if node.attrib["scrollable"] == "true":
                return node.get("bounds")
        return None

    def native_is_screen_lock(self):
        """Return True when the current dump contains a ``lock_icon`` view."""
        return any("id/lock_icon" in node.attrib['resource-id'].strip('\n')
                   for node in self.get_all_nodes())

    def parse_window_dump(self, parametres=None):
        """Resolve search criteria to the bounds string of one view.

        Each given value is compared with the id, text, index, content-desc,
        package and enabled attribute of every node, regardless of the key it
        was passed under.  Bounds are kept for attributes matched by exactly
        one node.  The result is returned only if at least as many attributes
        matched as values were given and all kept bounds agree.

        Exits the process when no recognised criterion is supplied.
        Returns the bounds string or None.
        """
        id_pattern = r'[A-Za-z]+:id/(\w+)'
        parametres = parametres or {}
        wanted = [parametres[key]
                  for key in ('id', 'text', 'description', 'index', 'package', 'enabled')
                  if key in parametres]
        if not wanted:
            print("Error : invalid input, there is no useful parameter given!!!")
            sys.exit(-1)

        attributes = ('id', 'text', 'index', 'desc', 'pack', 'enabled')
        hits = dict.fromkeys(attributes, 0)
        last_bounds = {}
        for node in self.get_all_nodes():
            raw_id = node.attrib['resource-id']
            short_ids = re.findall(id_pattern, raw_id.strip('\n'))
            candidates = {
                # "pkg:id/name" is compared by its short name when it has one.
                'id': short_ids[0] if short_ids else raw_id.strip('\n\r'),
                'text': node.attrib['text'].strip('\n'),
                'index': node.attrib['index'].strip('\n'),
                'desc': node.attrib['content-desc'].strip('\n'),
                'pack': node.attrib['package'].strip('\n'),
                'enabled': node.attrib['enabled'].strip('\n'),
            }
            for value in wanted:
                for name in attributes:
                    if value == candidates[name]:
                        hits[name] += 1
                        last_bounds[name] = node.get("bounds")

        matched = sum(1 for name in attributes if hits[name])
        if matched < len(wanted):
            return None
        unique = [last_bounds[name] for name in attributes if hits[name] == 1]
        if unique and all(bound == unique[0] for bound in unique):
            return unique[0]
        return None

    def get_xml(self):
        """Dump the window hierarchy and pull it to the host.

        Makes up to ``RETRY_MAX_TIME`` attempts.  Returns the path of the
        pulled file (``<temp>/<device_id>_window_dump.xml``) or None.
        """
        device_dump = '/sdcard/window_dump.xml'
        pulled = os.path.join(self.temporary, 'window_dump.xml')
        target = os.path.join(self.temporary, self.device_id + '_' + 'window_dump.xml')
        dump_xml_command = self.adb_shell() + 'uiautomator dump /sdcard/window_dump.xml'
        get_xml_command = 'adb -s ' + self.device_id + ' pull ' + device_dump + ' ' + self.temporary
        for _ in range(RETRY_MAX_TIME):
            self.adb_root()
            self.execute_command(dump_xml_command).wait()
            self.execute_command(get_xml_command).wait()
            try:
                # os.rename cannot overwrite on every platform, so drop the
                # previous dump first - but only once a fresh one has arrived.
                if os.path.isfile(pulled) and os.path.isfile(target):
                    os.remove(target)
                os.rename(pulled, target)
            except OSError as oer:
                # Count the failure as one attempt; retrying by recursion
                # would never reach the retry limit.
                print(oer)
                continue
            if os.path.isfile(target):
                return target
        print("Unkown issue of adb, uiautomator dump file failed!")
        return None

    def adb_root(self):
        """Run ``adb root`` for this device and wait for it to finish."""
        root_command = 'adb -s ' + self.device_id + ' root'
        self.execute_command(root_command).wait()

    def generate_folder_locate_picture(self):
        """Return the screenshot directory, creating it when missing."""
        self._ensure_dir(self.failure_pic_location)
        return self.failure_pic_location

    def generate_screenshot_name_format(self):
        """Return ``screenshot_<YYYY_mm_dd_HH_MM_SS>.png`` for the current time."""
        stamp = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        return 'screenshot_' + stamp + '.png'

    def adb_command(self):
        """Return the ``adb -s <device> `` prefix (with trailing space)."""
        return 'adb -s ' + self.device_id + ' '

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` prefix (with trailing space)."""
        return 'adb -s ' + self.device_id + ' shell '

    def execute_command(self, cmd, ignore_print=True):
        """Start *cmd* (split on single spaces) and return the ``Popen`` object.

        cmd          -- command line as one string.
        ignore_print -- when true the command is logged through ``log_class``
                        (the flag name is historical).

        The caller waits on, or reads from, the returned process.
        """
        if ignore_print and self.log_class is not None:
            self.log_class.info("Execute command : {0}".format(cmd))
        # NOTE(review): stdout is a pipe; callers that wait() without reading
        # may block if a command prints more than the pipe buffer holds.
        return sp.Popen(cmd.split(' '), stdout=sp.PIPE)

    def check_adb_works(self, proc):
        """Placeholder; currently does nothing."""
        pass

    def get_dumpsys_display(self):
        """Return the output of ``dumpsys display`` as a list of lines."""
        proc = self.execute_command(self.adb_shell() + 'dumpsys display')
        output = proc.stdout.read()
        if not isinstance(output, str):
            # Python 3 pipes deliver bytes; the callers search for text.
            output = output.decode('utf-8', 'replace')
        return output.split('\n')

    def bound_to_list(self):
        """Return the current view's bounds ``[x1,y1][x2,y2]`` as four strings."""
        bounds = self.parse_window_dump(paras_dictory)
        parts = bounds.replace('[', '').replace(']', ',').split(',')
        parts.remove('')
        return parts

    def get_centre_coordinate(self):
        """Return the centre of the current view as ``[x, y]`` strings."""
        x1, y1, x2, y2 = [int(value) for value in self.bound_to_list()[:4]]
        # Floor division keeps the coordinates integral on Python 3 as well.
        return [str((x1 + x2) // 2), str((y1 + y2) // 2)]

    def native_power_on(self):
        """Press the power key when the screen is reported off."""
        if not self.native_check_screen_on():
            power_on_command = self.adb_shell() + 'input keyevent 26'
            self.execute_command(power_on_command).wait()

    def native_unlock_screen(self):
        """Wake the screen and swipe up when the lock icon is present."""
        self.native_power_on()
        if self.native_is_screen_lock():
            self.native_push_up()

    def native_check_screen_on(self):
        """Return True when ``dumpsys display`` reports ``mScreenState`` ON."""
        return any('mScreenState' in line and 'ON' in line
                   for line in self.get_dumpsys_display())

    def native_push_up(self):
        """Swipe from 4/5 to 1/5 of the screen height along the centre line."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, height // 5)

    def get_max_resolution(self):
        """Return the ``mDisplayWidth`` / ``mDisplayHeight`` values as strings.

        The list holds one entry per matching dumpsys line, in output order;
        it is empty when dumpsys produced neither.
        """
        max_resolution = []
        for line in self.get_dumpsys_display():
            if 'mDisplayWidth' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
            if 'mDisplayHeight' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
        return max_resolution

    def take_snapshot(self):
        """Save a screenshot and a ``.uix`` window dump in the screenshot folder."""
        global xml_path
        xml_path = self.get_xml()
        file_path = self.generate_folder_locate_picture()
        picture_name = self.generate_screenshot_name_format()
        uiautomator_dump_file = os.path.join(file_path, picture_name.split('.')[0] + '.uix')
        file_path_device = '/sdcard/' + picture_name
        take_snapshot_command = self.adb_shell() + '/system/bin/screencap -p ' + file_path_device
        pull_snapshot_command = 'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path
        get_uiautomator_dump_command = self.adb_shell() + 'uiautomator dump'
        pull_uiautomator_dump_command = ('adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml'
                                         + ' ' + uiautomator_dump_file)
        self.native_unlock_screen()
        time.sleep(0.5)
        for command in (take_snapshot_command, pull_snapshot_command,
                        get_uiautomator_dump_command, pull_uiautomator_dump_command):
            self.execute_command(command).wait()

    def little_swipe(self):
        """Swipe from 4/5 to 2/5 of the screen height along the centre line."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, (height * 2) // 5)


class Device(object):
    """Hardware-level actions (keys, swipes, boot state) for one device.

    All commands are built from ``executor.adb_shell()`` /
    ``executor.adb_command()`` and run through ``executor.execute_command``.
    """

    def __init__(self, executor):
        """executor -- object providing the adb helpers named in the class doc."""
        self.executor = executor

    def _run_shell(self, command, counter=1, pause=0):
        """Run ``adb shell <command>`` *counter* times, sleeping *pause* after each."""
        full_command = self.executor.adb_shell() + command
        for _ in range(int(counter)):
            self.executor.execute_command(full_command).wait()
            if pause:
                time.sleep(pause)

    def _screen_size(self):
        """Return ``(width, height)`` as ints, or exit when it is unknown."""
        resolution = self.executor.get_max_resolution()
        # An empty list means dumpsys printed nothing usable; without this
        # length check the indexing below would raise IndexError.
        if len(resolution) < 2 or 'unknow' in resolution[:2]:
            print("Error : dumpsys display failed")
            sys.exit(-1)
        return int(resolution[0]), int(resolution[1])

    def _swipe(self, x1, y1, x2, y2):
        """Run ``input swipe`` between two points."""
        self._run_shell('input swipe ' + ' '.join(str(v) for v in (x1, y1, x2, y2)))

    def return_home(self):
        """Press HOME (keyevent 3)."""
        self._run_shell('input keyevent 3')

    def press_enter(self):
        """Press ENTER (keyevent 66)."""
        self._run_shell('input keyevent 66')

    def press_menu(self):
        """Press MENU (keyevent 82)."""
        self._run_shell('input keyevent 82')

    def shift_tab(self, counter=1):
        """Send keyevent 61 *counter* times in a single command."""
        self._run_shell('input keyevent ' + ('61 ' * int(counter)).strip())

    def shift_down(self, counter=1):
        """Send keyevent 20 *counter* times in a single command."""
        self._run_shell('input keyevent ' + ('20 ' * int(counter)).strip())

    def shift_up(self, counter=1):
        """Send keyevent 19 *counter* times in a single command."""
        self._run_shell('input keyevent ' + ('19 ' * int(counter)).strip())

    def shift_left(self, counter=1):
        """Send keyevent 21 *counter* times, one second apart."""
        self._run_shell('input keyevent 21', counter, 1)

    def volume_up(self, counter=1):
        """Send keyevent 24 *counter* times, half a second apart."""
        self._run_shell('input keyevent 24', counter, 0.5)

    def volume_down(self, counter=1):
        """Send keyevent 25 *counter* times, half a second apart."""
        self._run_shell('input keyevent 25', counter, 0.5)

    def shift_right(self, counter=1):
        """Send keyevent 22 *counter* times, one second apart."""
        self._run_shell('input keyevent 22', counter, 1)

    def push_up(self):
        """Swipe up along the centre line, from 4/5 to 1/5 of the height."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, height // 5)

    def swipe_right(self):
        """Swipe right along the middle row, from 1/5 to 4/5 of the width."""
        width, height = self._screen_size()
        self._swipe(width // 5, height // 2, (width * 4) // 5, height // 2)

    def check_boot_status(self):
        """Return True when getprop's bootanim lines contain ``stopped`` and ``1``."""
        getprop_command = self.executor.adb_shell() + 'getprop'
        proc = sp.Popen(getprop_command.split(' '), stdout=sp.PIPE)
        output = proc.communicate()[0]
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        # Filter on the host side instead of piping through an external grep.
        out = '\n'.join(line for line in output.splitlines() if 'bootanim' in line)
        # Both markers are required; ``"stopped" and x in out`` only tested x.
        return "stopped" in out and '1' in out

    def check_reboot_progress(self):
        """Return True while the device does not report a finished boot."""
        return not self.check_boot_status()

    def reboot_device(self):
        """Run ``adb reboot`` and wait for the command to return."""
        reboot_device_command = self.executor.adb_command() + 'reboot'
        self.executor.execute_command(reboot_device_command).wait()

    def input_text(self, string='%s'):
        """Type *string*; spaces are sent as ``%s`` as ``input text`` expects."""
        self._run_shell('input text ' + string.strip().replace(' ', '%s'))

    def swipe_left(self):
        """Swipe left along the middle row, from 4/5 to 1/5 of the width."""
        width, height = self._screen_size()
        self._swipe((width * 4) // 5, height // 2, width // 5, height // 2)

    def pull_down(self):
        """Swipe down along the centre line, from 1/5 to 4/5 of the height."""
        width, height = self._screen_size()
        self._swipe(width // 2, height // 5, width // 2, (height * 4) // 5)

    def swipe_pull_notification(self):
        """Swipe from the top edge down to 1/5 of the height."""
        width, height = self._screen_size()
        self._swipe(width // 2, "0", width // 2, height // 5)

    def get_through_oobe(self):
        """Tap the four points at 20% / 80% of width and height, clockwise."""
        max_w, max_h = self._screen_size()
        print("[debug] max_w and max_h is  %s %s" % (max_w, max_h))
        near_x, far_x = (max_w * 2) // 10, (max_w * 8) // 10
        near_y, far_y = (max_h * 2) // 10, (max_h * 8) // 10
        for x, y in ((near_x, near_y), (far_x, near_y), (far_x, far_y), (near_x, far_y)):
            self._run_shell('input tap ' + str(x) + ' ' + str(y))

    def power_on(self):
        """Press the power key when the screen is reported off."""
        if not self.check_screen_on():
            self._run_shell('input keyevent 26')

    def press_back(self, counter=1):
        """Send keyevent 4 *counter* times, one second apart."""
        self._run_shell('input keyevent 4', counter, 1)

    def check_screen_on(self):
        """Return True when ``dumpsys display`` reports ``mScreenState`` ON."""
        return any('mScreenState' in line and 'ON' in line
                   for line in self.executor.get_dumpsys_display())

    def launch_app_with_intent(self, app="Native_HomePage", **kwargs):
        """Start the activity registered for *app* in the intents table.

        Exits the process when *app* has no entry.  *kwargs* is ignored.
        """
        intent_dictory = self.executor.get_intents_dictory()
        if app not in intent_dictory:
            print("[Error] there is no intent match your given application, please double check or contact author to add!!!")
            sys.exit(-1)
        app_intent = intent_dictory[app]['intent']
        self._run_shell('am start -a android.intent.action.MAIN '
                        '-c android.intent.category.LAUNCHER -f 0x10200000 -n ' + app_intent)


class Panel(object):
    """Actions on the view most recently located by an operator.

    ``panel`` is the operator object; it supplies the view's coordinates
    (``get_centre_coordinate`` / ``bound_to_list``) and runs adb commands.
    Dragging is done by sending touch events to a ``monkey`` server on
    TCP port 15555.
    """

    def __init__(self, panel):
        """panel -- operator providing coordinates and adb helpers."""
        self.panel = panel

    @staticmethod
    def _to_wire(data):
        """Return *data* as bytes so it can be written to a socket."""
        return data if isinstance(data, bytes) else data.encode('utf-8')

    def click(self):
        """Tap the centre of the view."""
        x, y = self.panel.get_centre_coordinate()[:2]
        click_command = self.panel.adb_shell() + 'input tap ' + x + ' ' + y
        self.panel.execute_command(click_command).wait()

    def swipe_up(self, times=1):
        """Swipe upwards over the view *times* times.

        The vertical start and end are 4/5 and 1/5 of the sum of the view's
        top and bottom coordinates; the horizontal position is its centre.
        """
        bounds = [int(value) for value in self.panel.bound_to_list()[:4]]
        x = str((bounds[0] + bounds[2]) // 2)
        y1 = str(((bounds[1] + bounds[3]) * 4) // 5)
        y2 = str(((bounds[1] + bounds[3]) * 1) // 5)
        push_up_command = self.panel.adb_shell() + 'input swipe ' + x + ' ' + y1 + ' ' + x + ' ' + y2
        for _ in range(int(times)):
            self.panel.execute_command(push_up_command).wait()

    def long_click(self):
        """Hold the centre of the view for two seconds (a one-pixel swipe)."""
        x, y = self.panel.get_centre_coordinate()[:2]
        px = str(int(x) + 1)
        py = str(int(y) + 1)
        # Kept as text because it is concatenated into the command line.
        during_time = '2000'
        long_click_command = (self.panel.adb_shell() + 'input swipe ' + x + ' ' + y
                              + ' ' + px + ' ' + py + ' ' + during_time)
        self.panel.execute_command(long_click_command).wait()

    def drag_to(self, dx, dy):
        """Drag the view from its centre to the point (*dx*, *dy*)."""
        start_x, start_y = self.panel.get_centre_coordinate()[:2]
        drag_to_command = [
            self.touch_down(str(int(start_x)), str(int(start_y))),
            self.move(str(dx), str(dy)),
            self.touch_up(str(dx), str(dy)),
        ]
        if self.get_pid() is None:
            self.generate_monkey_process()
        monkey_pid_num = self.get_pid()
        if monkey_pid_num is not None:
            self.open_socket(drag_to_command)
            self.kill_monkey_process(monkey_pid_num)
        # Kill whatever monkey process is running now; the pid read before
        # the drag may be None or no longer current.
        leftover_pid = self.get_pid()
        if leftover_pid is not None:
            self.kill_monkey_process(leftover_pid)

    def touch_down(self, dx, dy):
        """Return the monkey ``touch down`` command line for (*dx*, *dy*)."""
        return 'touch down {0} {1}\n'.format(dx, dy)

    def touch_up(self, dx, dy):
        """Return the monkey ``touch up`` command line for (*dx*, *dy*)."""
        return 'touch up {0} {1}\n'.format(dx, dy)

    def move(self, dx, dy):
        """Return the monkey ``touch move`` command line for (*dx*, *dy*)."""
        return 'touch move {0} {1}\n'.format(dx, dy)

    def get_pid(self):
        """Return the pid (string) of the monkey process from ``ps``, or None."""
        r2 = r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
        grep_monkey_command = self.panel.adb_shell() + 'ps'
        output = sp.Popen(grep_monkey_command.split(' '), stdout=sp.PIPE).stdout.read()
        if not isinstance(output, str):
            # Python 3 pipes deliver bytes; the pattern above is text.
            output = output.decode('utf-8', 'replace')
        re_result = re.findall(r2, output.strip('\n'))
        return re_result[0] if re_result else None

    def kill_monkey_process(self, monkey_pid):
        """Kill the process *monkey_pid* on the device; None is ignored."""
        if monkey_pid is not None:
            kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
            self.panel.execute_command(kill_monkey_process_command).wait()

    def generate_monkey_process(self):
        """Start ``monkey --port 15555`` on the device and forward the port.

        The local adb process is killed after five seconds; the port is
        forwarded only if the monkey process is then found on the device.
        """
        generate_monkey_process_command = (self.panel.adb_shell()
                                           + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port 15555')
        p2 = sp.Popen(generate_monkey_process_command.split(' '), stdout=sp.PIPE)
        t1 = threading.Timer(5, lambda: self.kill(p2))
        t1.daemon = True
        t1.start()
        time.sleep(5)
        if self.get_pid() is not None:
            forwad_command = 'adb -s ' + self.panel.device_id + ' forward tcp:15555 tcp:15555'
            self.panel.execute_command(forwad_command).wait()

    def open_socket(self, cmd_list):
        """Send each command in *cmd_list*, then ``quit``, to 127.0.0.1:15555."""
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sk.settimeout(10)
            sk.connect(('127.0.0.1', 15555))
            for cmd in cmd_list:
                self.panel.log_class.info("Execute command : {0}".format(cmd))
                sk.sendall(self._to_wire(cmd))
                time.sleep(1.5)
            sk.sendall(self._to_wire('quit\n'))
        finally:
            # Release the socket even when connecting or sending fails.
            sk.close()

    def kill(self, process):
        """Terminate, then kill, the local *process*."""
        process.terminate()
        process.kill()







96
yaboandriod · #3 · 2017年01月16日
#1楼 @264768502 
第一次发,不懂怎么发代码,现在改过来了。
96
yaboandriod · #4 · 2017年01月16日
logger代码如下:

#!/usr/bin/python
import logging
import os

class Logger():
    """Configure a named ``logging`` logger writing to ``<log_path>/log.txt``."""

    def __init__(self, name, log_path):
        """Record the settings and create *log_path* when it is missing.

        name     -- name handed to ``logging.getLogger``.
        log_path -- directory that receives ``log.txt``.
        """
        self.log_path = log_path
        self.name = name
        self.log_file = os.path.join(log_path, 'log.txt')
        if not os.path.isdir(log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                # Best effort: opening the log file reports the real problem.
                pass

    def modlogger(self):
        """Return the named logger with a file and a console handler attached.

        ``logging.getLogger`` hands back the same object for the same name,
        so handlers are attached only when an equivalent one is not already
        present; otherwise each call would duplicate every log line.
        """
        mylogger = logging.getLogger(self.name)
        mylogger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s  %(levelname)s\t%(message)s")

        target = os.path.abspath(self.log_file)
        has_file = any(isinstance(handler, logging.FileHandler)
                       and handler.baseFilename == target
                       for handler in mylogger.handlers)
        # FileHandler subclasses StreamHandler, hence the exact type check.
        has_stream = any(type(handler) is logging.StreamHandler
                         for handler in mylogger.handlers)
        if not has_file:
            fh = logging.FileHandler(self.log_file)
            fh.setFormatter(formatter)
            mylogger.addHandler(fh)
        if not has_stream:
            sh = logging.StreamHandler()
            sh.setFormatter(formatter)
            mylogger.addHandler(sh)
        return mylogger

    def get_instance(self):
        """Return a new Logger with the same name and log path."""
        return Logger(self.name, self.log_path)

    def logger(self):
        """Return a ``Methods`` facade bound to a copy of this Logger."""
        return Methods(self.get_instance())

class Methods(object):
    """Thin facade exposing the four log levels of a configured logger."""

    def __init__(self, obj):
        """obj -- object whose ``modlogger()`` returns the logger to wrap."""
        self.ml = obj.modlogger()

    def _emit(self, level, string):
        """Forward *string* to the wrapped logger's method named *level*."""
        getattr(self.ml, level)(string)

    def info(self, string):
        """Log *string* at INFO level."""
        self._emit('info', string)

    def debug(self, string):
        """Log *string* at DEBUG level."""
        self._emit('debug', string)

    def warning(self, string):
        """Log *string* at WARNING level."""
        self._emit('warning', string)

    def error(self, string):
        """Log *string* at ERROR level."""
        self._emit('error', string)


if __name__=="__main__":
    # Manual smoke test: emit one message per level to the console and to
    # log.txt under the directory given below.
    l = Logger('test0', '/home/buildbot/bluesea/reporter')
    m = l.logger()
    m.info('------this is info message-------')
    m.error('tap ok button fail')
    m.debug('-----this is debug message-------')
    m.warning('you have not set timeout')

96
yaboandriod · #5 · 2017年01月16日
ui_hooks模块如下:

import controler
import time
import inspect
import subprocess as sp

"""
It provides a way to get past the various pop-up dialogs that appear unpredictably.
"""


class Hooks:
    """Dismiss known pop-up dialogs by tapping one of their buttons through adb.

    Typical use: call ``get_generators`` with the nodes of a window dump to
    fill the module-level ``frame`` table, then ``parse`` to run the hook
    whose package matches the first node and tap the button it selects.
    """

    def __init__(self,device_id):
        # Serial number passed to ``adb -s``.
        self.device_id=device_id

    def get_generators(self,generator=None):
        """Rebuild the module-level ``frame`` table from *generator*.

        Every item of *generator* must offer ``get(name)``. Entries are
        keyed 1..N in iteration order. ``None`` leaves the table empty
        instead of raising.
        """
        global frame
        frame={}
        if generator is None:
            return
        for number,element in enumerate(generator,1):
            frame[number]={
                'package':element.get('package'),
                'index':element.get('index'),
                'text':element.get('text'),
                'description':element.get('content-desc'),
                'id':element.get('resource-id'),
                'bounds':element.get('bounds'),
                'enabled':element.get('enabled'),
            }

    def _nodes(self):
        """Return the current ``frame`` table, or ``{}`` when none was built yet."""
        return globals().get('frame') or {}

    def _dialog_shown(self,titles):
        """Tell whether any node's text (newlines stripped) equals one of *titles*."""
        nodes=self._nodes()
        for j in range(1,len(nodes)+1):
            if (nodes[j]['text'] or '').strip('\n') in titles:
                return True
        return False

    def _last_bounds(self,key,needles):
        """Return the bounds of the last node whose *key* field contains a needle.

        Later matches overwrite earlier ones; ``None`` means no node matched.
        A missing (``None``) field is treated as an empty string.
        """
        nodes=self._nodes()
        coodinate=None
        for j in range(1,len(nodes)+1):
            field=(nodes[j][key] or '').strip('\n')
            if any(needle in field for needle in needles):
                coodinate=nodes[j]['bounds']
        return coodinate

    def parse(self):
        """Run the matching hook; return True when a button was tapped."""
        return self.divider()

    def divider(self):
        """Dispatch on the package of node 1 and tap what the hook selects.

        Each hook declares the package it serves as the default value of its
        ``seducer`` parameter, which is read from ``__defaults__``. Returns
        False when the table is empty, no hook matches, or the matching hook
        finds nothing to tap.
        """
        nodes=self._nodes()
        if 1 not in nodes:
            return False
        main_key=nodes[1]['package']
        flag=False
        hooks=(self.hook_maps,self.hook_contact,self.hook_gms,
               self.hook_settings,self.hook_stk,self.hook_android)
        for hook in hooks:
            seducer=hook.__defaults__[0]
            if main_key==seducer:
                flag=self.executer(seducer,hook())
        return flag

    def executer(self,sed,func):
        """Tap the centre of bounds string *func*; return False when it is None.

        *sed* is only used for the progress message.
        """
        print("this app is %s"% sed)
        if func is None:
            return False
        self.tap(self.get_point(func))
        return True

    def hook_maps(self,seducer='com.google.android.apps.maps'):
        """Placeholder hook for Google Maps: never selects a button."""
        print("hook_maps method is invoked")
        return None

    def hook_stk(self,seducer='com.android.stk'):
        """Select the node whose resource id contains ``button_ok``."""
        print("hook_stk method is invoked")
        return self._last_bounds('id',['button_ok'])

    def hook_android(self,seducer='android'):
        """Select the node whose resource id contains ``button1``."""
        print("hook_android method is invoked")
        return self._last_bounds('id',['button1'])

    def hook_settings(self,seducer='com.android.settings'):
        """On the preferred-SIM dialog, select the node whose text contains ``NO``."""
        print("hook_settings method is invoked")
        if not self._dialog_shown(["Update preferred SIM card?"]):
            return None
        return self._last_bounds('text',['NO'])

    def hook_gms(self,seducer='com.google.android.gms'):
        """On the sign-in failure dialog, select the node whose text contains ``Next``."""
        print("hook_gms method is invoked")
        if not self._dialog_shown(["Couldn't sign in"]):
            return None
        return self._last_bounds('text',['Next'])

    def hook_contact(self,seducer='com.google.android.packageinstaller'):
        """On the Contacts permission dialog, select the ``Allow``/``Next`` node."""
        print("hook_contact method is invoked")
        if not self._dialog_shown(["Allow Contacts to access photos, media, and files on your device?"]):
            return None
        return self._last_bounds('text',['Allow','Next'])

    def get_point(self,coodinate):
        """Convert a ``[x1,y1][x2,y2]`` bounds string to its centre point.

        Returns ``[x, y]`` as decimal strings, using floor division.
        """
        temp=coodinate.replace('[','').replace(']',',').split(',')
        # The trailing ']' leaves one empty field behind.
        temp.remove('')
        x1,y1,x2,y2=[int(value) for value in temp[:4]]
        return [str((x1+x2)//2),str((y1+y2)//2)]

    def tap(self,point_list):
        """Run ``input tap x y`` on the device and wait for the command to end."""
        tap_command=self.adb_shell()+'input tap '+point_list[0]+' '+point_list[1]
        self.execute_command(tap_command).wait()

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` command prefix (trailing space included)."""
        return 'adb -s '+self.device_id+' shell '

    def execute_command(self,cmd,ignore_print=True):
        """Start *cmd* (split on single spaces) and return the ``Popen`` object.

        NOTE(review): despite its name, ``ignore_print=True`` *prints* the
        command. The behaviour is kept as-is because callers rely on the default.
        """
        if ignore_print:
            print(cmd)
        return sp.Popen(cmd.split(' '),stdout=sp.PIPE)
96
yaboandriod · #6 · 2017年01月16日
某个库文件如下:

 # coding: utf-8 
import os
import time
import sys
import subprocess as sp
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from engine import controler
max_retry = 30

class Commons:
    """Shared device procedures (reboot, Google sign-in) built on ``controler.Operator``.

    ``conf_dic`` is the settings dictionary; the keys read here are ``TC``,
    ``work_dir``, ``device_id``, ``log_path``, ``google_account`` and
    ``google_password``. ``ins_logger`` receives all progress messages.
    """

    def __init__(self,conf_dic,ins_logger=None):
        self.conf_dic = conf_dic
        # The case name is the test-case file name without its extension.
        name = os.path.basename(conf_dic['TC']).split('.')[0]
        workpath = conf_dic['work_dir']
        dev = conf_dic['device_id']
        logpath = conf_dic['log_path']
        log_path = os.path.join(workpath,logpath,name)
        self.mylogger = ins_logger
        self.op = controler.Operator(device_id=dev,log_class=self.mylogger,log_path=log_path)

    def _click(self,label,criteria,missing_label=None):
        """Log, search for and click one element; return False when it is missing.

        *criteria* is passed to ``self.op.search`` as keyword arguments. A
        failed search surfaces as ``AttributeError`` on ``.click()``, which is
        logged using *missing_label* (defaults to *label*).
        """
        try:
            self.mylogger.info("[Common] click {0} ".format(label))
            self.op.search(**criteria).click()
        except AttributeError as e:
            self.mylogger.error("[Common] Can't find {0} . Got Exception {1}".format(missing_label or label,e))
            return False
        return True

    def _click_with_retry(self,info_msg,retry_msg,criteria,delay):
        """Like ``_click`` but retry every *delay* seconds.

        The search is retried while the retry count is ``<= max_retry``;
        after that the failure is logged and False is returned.
        """
        retry_count = 0
        while 1:
            try:
                self.mylogger.info(info_msg)
                self.op.search(**criteria).click()
                return True
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug(retry_msg)
                    time.sleep(delay)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False

    def warm_reboot(self,boot_timeout=None):
        """Reboot the device and wait until it reports a finished boot.

        *boot_timeout* (seconds) bounds the final wait; ``None`` waits
        forever, as before. Returns True on success, False when the device
        was not ready, the reboot did not start, or the timeout expired.
        """
        if not self.op.hardware().check_boot_status():
            self.mylogger.error("[Common] Device is not ready for test reboot case. ")
            return False
        self.mylogger.info("[Common] Device is ready. ")
        self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
        self.get_boot_infomation()
        self.op.hardware().reboot_device()
        time.sleep(15)
        self.mylogger.info("[Common] Start to check if execute reboot command? ")
        if not self.op.hardware().check_reboot_progress():
            self.mylogger.error("[Common] unknow error, reboot fail. ")
            return False
        self.mylogger.info("[Common] Device start booting...")
        deadline = None if boot_timeout is None else time.time() + boot_timeout
        while not self.op.hardware().check_boot_status():
            if deadline is not None and time.time() > deadline:
                self.mylogger.error("[Common] Device did not finish booting within {0} seconds. ".format(boot_timeout))
                return False
            # Pause between polls instead of spinning on the status check.
            time.sleep(1)
        self.mylogger.info("[Common] Device reboot successfully. ")
        return True

    def get_boot_infomation(self):
        """Log the boot-related ``getprop`` lines of the device."""
        # No host shell is involved: '|', 'grep' and 'boot' are handed to adb
        # as arguments (presumably evaluated by the device-side shell).
        get_boot_infomation_command = 'adb -s ' + self.op.device_id + ' shell getprop | grep boot'
        proc = sp.Popen(get_boot_infomation_command.split(' '),stdout=sp.PIPE)
        # communicate() reads the output and reaps the child process.
        out = proc.communicate()[0].strip('\n\r')
        self.mylogger.info(out)

    def log_on_google_account(self):
        """Add the configured Google account through the Settings UI.

        Returns True when every step succeeded and False at the first
        element that cannot be found.
        """
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("Accounts item",{'text':"Accounts",'retry_time':"3"}):
            return False
        if not self._click("Add account item",{'text':"Add account"}):
            return False
        if not self._click("Google item",{'text':"Google"}):
            return False
        if not self._click_with_retry(
                "[Common] click enter your email input edittext ",
                "[Common] Can't find Enter your email input edittext . try again",
                {'description':"Enter your email "},2):
            return False
        self.mylogger.info("[Common] input google account : {0}".format(self.conf_dic['google_account']))
        self.op.hardware().input_text(self.conf_dic['google_account'])
        if not self._click("NEXT button",{'description':"NEXT"}):
            return False
        time.sleep(5)
        if not self._click("password edittext",{'id':"password"}):
            return False
        # NOTE(review): this writes the account password to the log in clear text.
        self.mylogger.info("[Common] input google password : {0}".format(self.conf_dic['google_password']))
        self.op.hardware().input_text(self.conf_dic['google_password'])
        if not self._click("NEXT button",{'description':"NEXT"}):
            return False
        if not self._click_with_retry(
                "[Common] click ACCEPT button ",
                "[Common] Can't find ACCEPT button . try again",
                {'description':"ACCEPT"},2):
            return False
        if not self._click_with_retry(
                "[Common] check Google services shown??? ",
                "[Common] Can't Google services keywords. try again",
                {'text':"Google services"},5):
            return False
        if not self._click("Next button",{'text':"Next",'enabled':'true','retry_time':'3'},"NEXT button"):
            return False
        if not self._click_with_retry(
                "[Common] click No thanks button ",
                "[Common] Can't find No thanks button . try again",
                {'text':"No thanks"},2):
            return False
        if not self._click("Continue button",{'text':"Continue"}):
            return False
        time.sleep(2)
        return True

    def check_google_account_added(self):
        """Open Settings > Accounts and report whether a Google entry is listed."""
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("Accounts item",{'text':"Accounts",'retry_time':"3"}):
            return False
        if self.op.search(text='Google') is not None:
            self.mylogger.info("[Common] add google account successfully. ")
            return True
        self.mylogger.error("[Common] add account fail. ")
        return False

















96
yaboandriod · #7 · 2017年01月16日
入口程序:

import xml.etree.ElementTree as ET
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir)))
import argparse
import random
import subprocess as sp
import pickle
import datetime
import re
import time
from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt
import generate_report


class Configuration:
    """Run settings, filled in from a configuration file written in Python syntax."""

    def __init__(self):
        self.log_path = ''
        self.execute_main_campaign = ''
        self.device_id = ''
        # Parent of the current working directory.
        self.work_dir = os.path.abspath(os.path.join(os.getcwd(),os.pardir))

    def load(self,filepath):
        """Execute *filepath* and copy every name it defines onto this object.

        Exits the process with status -1 when no path was given (``None``)
        or the file cannot be read. Errors raised by the configuration code
        itself propagate unchanged instead of being reported as a path
        problem.
        """
        temp_dict = {}
        try:
            with open(filepath) as config_file:
                source = config_file.read()
        except TypeError:
            print("Error : No file path given!")
            print("you can add -h option to view more info")
            sys.exit(-1)
        except IOError as msg1:
            print(msg1)
            print("Error : Please specify a path which contain a config file!")
            sys.exit(-1)
        # SECURITY: the configuration file is executed as Python code, so it
        # must come from a trusted source.
        exec(compile(source,filepath,'exec'),{},temp_dict)
        for key,value in temp_dict.items():
            setattr(self,key,value)


def get_testcase_from_xml(config):
    """Return an iterator over every ``item`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``.
    """
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    root = ET.parse(xml_path).getroot()
    return root.iter('item')

def get_testsuite_from_xml(config):
    """Return an iterator over every ``suite`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``.
    """
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    root = ET.parse(xml_path).getroot()
    return root.iter('suite')

def get_full_path(config,string):
    """Return *string* resolved against ``config.work_dir`` with ``.py`` appended."""
    return os.path.join(config.work_dir,string) + '.py'


def generate_file(config,dictory):
    """Pickle *dictory* to ``<work_dir>/temporary/<device_id>/settings_temp.pkl``.

    The directory is created when missing. Returns the path of the written
    file.
    """
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    try:
        os.makedirs(temporary_path)
    except OSError:
        # Already existing is fine; any other failure must not be hidden.
        if not os.path.isdir(temporary_path):
            raise
    pickle_dump_path = os.path.join(temporary_path,'settings_temp.pkl')
    with open(pickle_dump_path,'wb') as f1:
        # Protocol 1 is what the former ``True`` argument selected.
        pickle.dump(dictory,f1,1)
    return pickle_dump_path


def save_data_for_html(config,dictory):
    """Pickle *dictory* to ``<work_dir>/temporary/<device_id>/html_raw_data.pkl``.

    The directory is created when missing. Returns the path of the written
    file.
    """
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    try:
        os.makedirs(temporary_path)
    except OSError:
        # Already existing is fine; any other failure must not be hidden.
        if not os.path.isdir(temporary_path):
            raise
    pickle_dump_path = os.path.join(temporary_path,'html_raw_data.pkl')
    with open(pickle_dump_path,'wb') as f1:
        # Protocol 1 is what the former ``True`` argument selected.
        pickle.dump(dictory,f1,1)
    return pickle_dump_path


def main(config):
    """Run the campaign described by *config* and produce the HTML report.

    Steps:
      1. Copy the public, non-method attributes of *config* into a settings
         dictionary and give ``log_path`` a per-device, per-run subfolder.
      2. Read the suite (name, loop, mode) and its cases from the campaign XML.
      3. For each suite loop and each case loop, run the case script in a
         child ``python`` process, read its ``Pass``/``Fail`` verdict and
         collect the crashes that appeared in the device history meanwhile.
      4. After each case, pickle the accumulated results and regenerate the
         report.

    Exits with status 0 when the suite loop count is 0 or there are no cases.
    """
    r1 = r'Fail|Pass'
    crash_patterns = (
        ('crash_data0',r'DATA0=(\S+\s)[\s\S]*'),
        ('crash_data1',r'DATA1=(\S+\s)[\s\S]*'),
        ('crash_data2',r'DATA2=(\S+\s)[\s\S]*'),
    )
    # Build the settings from a filtered view; the old code removed entries
    # from the list it was iterating over, which skips elements.
    settings_dictory = {}
    for attr in dir(config):
        if attr.startswith('__') or attr == 'load':
            continue
        settings_dictory[attr] = getattr(config,attr)
    rs = 'run_set_'+datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    settings_dictory['log_path'] = os.path.join(settings_dictory['log_path'],config.device_id,rs)
    execue_case_sequence = {}
    for testsuite in get_testsuite_from_xml(config):
        execue_case_sequence['suite_name'] = testsuite.get('name')
        execue_case_sequence['suite_loop'] = testsuite.get('loop')
        execue_case_sequence['suite_mode'] = testsuite.get('mode')
        execue_case_sequence['suite_content'] = {}
    execue_case_sequence['device_id'] = settings_dictory['device_id']
    for seq_num,testcase in enumerate(get_testcase_from_xml(config),1):
        execue_case_sequence['suite_content'][seq_num] = [testcase.text,testcase.get('loop')]
    suite_loop = int(execue_case_sequence['suite_loop'])
    suite_mode = execue_case_sequence['suite_mode']
    suite_content = execue_case_sequence['suite_content']
    # Fix the per-case loop counts now. Results are appended to the same
    # lists later, so deriving the count from the list length gave 1 for
    # every suite loop after the first.
    item_loops = {}
    for key,entry in suite_content.items():
        item_loops[key] = int(entry[1]) if entry[1] is not None else 1
    # Sorted so that 'regular' mode always follows the XML order.
    tc_symbol_list = sorted(suite_content)
    if suite_loop == 0 or not tc_symbol_list:
        # Explain before exiting; the message used to sit after sys.exit().
        print("there is no loop specify for running testsuite or no cases need to be executed!")
        sys.exit(0)
    # Any mode other than 'random' keeps the listed order.
    need_to_change_sequence = suite_mode == 'random'
    html_result_data = execue_case_sequence
    original_history = parse_history(settings_dictory['device_id'])
    for sl in range(1,suite_loop+1):
        print("Start to run testsuite : {0} , loop {1}".format(execue_case_sequence['suite_name'],sl))
        if need_to_change_sequence:
            random.shuffle(tc_symbol_list)
        for case_key in tc_symbol_list:
            item_loop = item_loops[case_key]
            item_path = get_full_path(config,suite_content[case_key][0])
            case_name = os.path.basename(item_path).split('.')[0]
            report_path = os.path.join(config.work_dir,settings_dictory['log_path'],case_name)
            if item_loop == 0:
                continue
            for j in range(1,item_loop+1):
                print("Start to run case : {0} , loop {1}".format(case_name,j))
                start_time = time.time()
                start_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                settings_dictory['TC'] = item_path
                dir_path = generate_file(config,settings_dictory)
                # Argument list rather than a split string, so paths that
                # contain spaces survive.
                proc=sp.Popen(['python',item_path,dir_path],stdout=sp.PIPE)
                out = proc.communicate()[0].strip('\n')
                current_history = parse_history(settings_dictory['device_id'])
                stop_time = time.time()
                stop_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                time_taken = stop_time - start_time
                crash_list = iter_event(get_different_list(original_history,current_history))
                crashes = {}
                for c_count,crash in enumerate(crash_list,1):
                    # Start every crash with empty content and a dict of its
                    # own; the shared dict made all entries show the last crash.
                    print_crashfile = ''
                    crashlog_path = get_crash_path(crash)
                    if crashlog_path is not None:
                        print_crashfile = get_crashfile_conetnt(settings_dictory['device_id'],crashlog_path)
                    crash_dic = {'crash_info':crash.strip('\r')}
                    for label,pattern in crash_patterns:
                        found = re.findall(pattern,print_crashfile)
                        crash_dic[label] = found[0] if found else ''
                    crashes[c_count] = crash_dic
                crash_number = len(crash_list)
                original_history = current_history
                # A case that printed no verdict (e.g. it died) counts as failed.
                verdict = re.findall(r1,out)
                # Record under the case's own key; the loop position is the
                # wrong slot once the order has been shuffled.
                result_row = html_result_data['suite_content'][case_key]
                result_row.append(verdict[0] if verdict else 'Fail')
                result_row.append(start_time_dateformat)
                result_row.append(stop_time_dateformat)
                result_row.append(round(time_taken,3))
                result_row.append(crash_number)
                result_row.append(crashes)
            address = save_data_for_html(config,html_result_data)
            generate_report.main(address,report_path)


if __name__ == '__main__':
    # Command line: one optional positional argument, the configuration file.
    # When it is omitted, Configuration.load() reports the missing path and exits.
    parse=argparse.ArgumentParser(usage='%(prog)s [options]')
    parse.add_argument('configuration_file',type=str,nargs='?',help="A path of config file")
    args=parse.parse_args()
    config_file = args.configuration_file
    configuration = Configuration()
    configuration.load(config_file)
    main(configuration)
96
yaboandriod · #8 · 2017年01月16日
自定义生成测试报告:

import os
import sys
from pyh import *
import pickle

def main(address,report_path):
    """Render ``report.html`` from the pickled result data stored at *address*.

    The report is written into the parent directory of *report_path*. For
    every case with at least one crash, a ``crash.html`` page is generated
    inside *report_path* and linked from the crash count.
    """
    # The pickle is expected to be the file written by the runner itself;
    # never point this at untrusted data.
    with open(address,'rb') as f:
        database1 = pickle.load(f)
    report_path_base = os.path.abspath(os.path.join(report_path,os.pardir))
    report_file_path = os.path.join(report_path_base,'report.html')
    content_dic = database1['suite_content']
    page = PyH('test report')
    page << h1('this is report of yath project')
    order_list = page << ul(type='square')
    order_list << li('device id : %s'%database1['device_id'])
    order_list << li('suite mode : %s'%database1['suite_mode'])
    order_list << li('suite loop : %s'%database1['suite_loop'])
    order_list << li('suite name : %s'%database1['suite_name'])
    page << hr()
    p1_content = 'Loop_'+database1['suite_loop']+':'
    page << p(i(p1_content))
    t = page << table(border='1',bgcolor='#bbbb99')
    t << tr(th('test case',align='left')+th('test loop',align='center')+th('test result',align='center')+th('start time',align='center')+th('stop time',align='center')+th('duration(s)',align='center')+th('crashes',align='center'))
    result_colours = {'Pass':'green','Fail':'red','NA':'grey'}
    # Sorted so the rows follow the case sequence numbers.
    for k in sorted(content_dic):
        item = content_dic[k]
        # Only [path, loop] means the case has not produced a result yet.
        if len(item) == 2:
            continue
        test_case = item[0]
        test_loop = item[1]
        if item[1] == 0:
            test_result,start_time,stop_time,duration,crashes = 'NA','NA','NA','NA',0
        else:
            test_result = item[2]
            start_time = item[3]
            stop_time = item[4]
            duration = item[5]
            crashes = item[6]
        # Unknown verdicts fall back to grey instead of leaving the colour unbound.
        attr_co_test_result = result_colours.get(test_result,'grey')
        if int(crashes) > 0:
            crashes_dic = item[7]
            generate_crashes_page(crashes_dic,test_case,report_path)
            link_crash = 'file://'+os.path.join(report_path,'crash.html')
            t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(a(crashes,href=link_crash),align='center'))
        else:
            t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(crashes,align='center'))

    page.printOut(report_file_path)


def generate_crashes_page(dictory,case_name,crash_path):
    """Write ``crash.html`` into *crash_path* with one table per crash.

    *dictory* maps a crash number to a dict holding ``crash_info``,
    ``crash_data0``, ``crash_data1`` and ``crash_data2``. *case_name* is
    shown in each section heading.
    """
    crash_file_path = os.path.join(crash_path,'crash.html')
    crash_page = PyH('Crash Page')
    crash_page << h1('this is a page of crash.')
    crash_page << p('detail info : ')
    for j in dictory:
        div1 = crash_page << div()
        h2_content = 'Crash %s occurs when execute %s : '%(str(j),case_name)
        div1 << h2(h2_content)
        crash_table = div1 << table(frame='box')
        crash_table << tr(th('key',align='left')+th('value'),align='left')
        crash_table << tr(td('crash info',align='left')+td(dictory[j]['crash_info']),align='left')
        crash_table << tr(td('data 0',align='left')+td(dictory[j]['crash_data0']),align='left')
        crash_table << tr(td('data 1',align='left')+td(dictory[j]['crash_data1']),align='left')
        crash_table << tr(td('data 2',align='left')+td(dictory[j]['crash_data2']),align='left')
    crash_page.printOut(crash_file_path)

if __name__ == "__main__":
    # main() needs the pickled result file and the per-case report directory;
    # calling it without arguments always raised TypeError.
    if len(sys.argv) != 3:
        sys.exit("usage: %s <pickled_result_file> <report_path>" % sys.argv[0])
    main(sys.argv[1],sys.argv[2])


96
yaboandriod · #9 · 2017年01月16日  
测试用例:

import os
import unittest
import sys
import pickle
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from container.base import settings
from engine import log


# Module setup: the runner passes the path of the pickled settings dictionary
# as the first command-line argument.
dir_path = sys.argv[1]
with open(dir_path,'rb') as f:
    config_dictory = pickle.load(f)
# The case name is the test-case file name without its extension; it also
# names the log directory of this case.
name = os.path.basename(config_dictory['TC']).split('.')[0]
workpath = config_dictory['work_dir']
logpath = config_dictory['log_path']
log_path = os.path.join(workpath,logpath,name)
l = log.Logger(name,log_path)
mylogger = l.logger()

class test_setup_wifi_class(unittest.TestCase):
    """Wi-Fi setup case driven through ``settings.Settings``."""
    def setUp(self):
        # Fresh helper object for every test, built from the shared settings.
        self.test_class = settings.Settings(config_dictory,mylogger)
    def tearDown(self):
        # Leave the device on the home screen for the next case.
        self.test_class.op.hardware().press_back(3)
        self.test_class.op.hardware().return_home()
    def test_skip_wizard(self):
        self.assertEqual(self.test_class.setup_wifi(),True,'test setup_wifi case fail')




if __name__ == "__main__":
    my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class)
    result = unittest.TextTestRunner(verbosity=0).run(my_suite)
    if len(result.failures) != 0 or len(result.errors) != 0:
        flag = 'Fail'
    else:
        flag = 'Pass'
    # The runner reads the verdict word from stdout; the number after it is
    # the count of failures plus errors.
    print("%s %s" % (flag,len(result.failures)+len(result.errors)))
96
yaboandriod · #10 · 2017年02月03日
自己顶一下??
Db42eb
mads · #11 · 2017年02月05日
你这个代码干嘛用的,陈述一下
D38bdc
cloudhuan · #12 · 2017年02月05日
???干嘛用的???封装了哪里?有什么优势?
96
yaboandriod · #13 · 2017年02月06日
针对android操作系统UI进行自动化测试,可以进行绝大多数的操作,多个app之间的交互。还可以在多台设备之间进行交互,譬如互相打电话,接电话等。
96
yaboandriod · #14 · 2017年02月06日
这是我写的一个小测试框架,case和lib可以自己定义,提供了log和controler接口。log用来打印日志,controler用来控制设备。
回帖

 

264768502 · #1 · 2017年01月13日

小小的建议
没缩进不能看,不如贴gist
单纯的adb的封装有很多人写了
比如我(#厚脸皮) https://github.com/264768502/adb_wrapper
比如这贴: https://testerhome.com/topics/6938

如果要处理UI的话,其实有现成的,比如pyuiautomator或者Appium

技术分享
yaboandriod · #2 · 2017年01月16日

controller代码

import xml.etree.ElementTree as ET
import os
import sys
import subprocess as sp
import time
import logging
import re
import codecs
import datetime
import ui_hooks
import socket
import threading


# Attribute names looked at on the nodes of a window dump.
WINDOW_FACTOR = ['index', 'text', 'resource-id', 'class', 'package', 'visible', 'content-desc', 'checkable',
              'checked', 'clickable', 'enabled', 'focusable', 'focused', 'scrollable', 'long-clickable',
              'password', 'bottom', 'selected', 'bounds']

RETRY_MAX_TIME = 3

def get_data(address):
    """Read the file at *address* and return its content evaluated as a Python expression."""
    with open(address,'r') as f1:
        temp = f1.read()
    # SECURITY: eval() runs whatever expression the file contains, so only
    # use this on files the framework wrote itself.
    return eval(temp)




class Operator:
    """Drive one Android device through ``adb`` and locate views on screen.

    Views are located by pulling a ``uiautomator dump`` XML file from the
    device and matching node attributes against the search criteria.

    Module-level globals shared with ``Panel`` (kept for compatibility):

    * ``xml_path``      - path of the most recently pulled window dump.
    * ``paras_dictory`` - criteria of the most recent ``search`` call.
    * ``retry_time``    - retry count of the most recent ``search`` call.
    """

    def __init__(self, device_id='', log_class=None, log_path='', **kwargs):
        """Remember the device/log settings and create the working folders.

        :param device_id: serial passed to ``adb -s``.
        :param log_class: object with an ``info(message)`` method, or None.
        :param log_path: folder under which ``screenshots`` and ``temp``
            sub-folders are created.
        :param kwargs: accepted and ignored.
        """
        self.device_id = device_id
        self.log_class = log_class
        self.log_path = log_path
        self.current_path = os.path.dirname(__file__)
        self.home_dir = os.path.expanduser('~')
        self._ensure_dir(self.log_path)
        self.failure_pic_location = os.path.join(self.log_path, 'screenshots')
        self._ensure_dir(self.failure_pic_location)
        self.temporary = os.path.join(self.log_path, 'temp')
        self._ensure_dir(self.temporary)
        self.hooks = ui_hooks.Hooks(self.device_id)

    @staticmethod
    def _ensure_dir(path):
        """Create *path* if it is not already a directory (best effort)."""
        if not os.path.isdir(path):
            try:
                os.makedirs(path)
            except OSError:
                # Deliberately ignored, as in the original: an empty path or
                # a concurrent creation must not abort construction.
                pass

    def get_instance(self):
        """Return a new Operator built from this one's device and log settings."""
        return Operator(self.device_id, self.log_class, self.log_path)

    def decode_utf(self, var):
        """Decode a UTF-8 byte string to text; return other values unchanged."""
        if isinstance(var, bytes):
            return codecs.lookup("utf-8").decode(var)[0]
        return var

    def update_dictory(self, dictory):
        """Return a copy of *dictory* whose values went through ``decode_utf``."""
        return dict((key, self.decode_utf(value))
                    for key, value in dictory.items())

    def search(self, **kwargs):
        """Look for the single view matching the given criteria.

        Recognised criteria keys are ``id``, ``text``, ``description``,
        ``index``, ``package`` and ``enabled``; ``retry_time`` optionally
        sets how many times to swipe and look again.

        :returns: a ``Panel`` for the matched view, or None.
        """
        global paras_dictory, retry_time
        paras_dictory = self.update_dictory(kwargs)
        retry_time = paras_dictory.get("retry_time")
        return self.judge(paras_dictory, retry_time)

    def judge(self, dic, retry_time):
        """Pull a fresh dump and resolve *dic* to a ``Panel`` or None.

        With *retry_time* set, a failed lookup is followed by a small swipe
        and a new dump, at most ``int(retry_time)`` times. Without it, a
        failed lookup hands the current screen to the hooks library; if a
        hook reports success the lookup is repeated.
        """
        global xml_path
        xml_path = self.get_xml()
        if xml_path is None:
            print("Error : can not generate window dump file.")
            return None
        self.native_unlock_screen()

        if retry_time is not None:
            # Bounded loop. The original recursed from inside its retry loop
            # with a fresh counter each time and dropped the result, so a
            # view that never appeared meant unbounded recursion.
            for _attempt in range(int(retry_time)):
                if self.parse_window_dump(dic) is not None:
                    return Panel(self.get_instance())
                print("Warnning : there is no view or many views match your require, retry again!")
                self.little_swipe()
                time.sleep(1)
                xml_path = self.get_xml()
                if xml_path is None:
                    print("Error : can not generate window dump file.")
                    return None
            return None

        if self.parse_window_dump(dic) is not None:
            return Panel(self.get_instance())
        print("Warnning : there is no view or many views match your require, start to search hooks library...!")
        self.hooks.get_generators(self.get_all_nodes())
        # A true status means a hook recognised the current frame and
        # clicked through it, so the lookup is worth repeating.
        if self.hooks.parse():
            time.sleep(1)
            # Return the panel to the caller. The original clicked it here
            # and returned None, which also crashed when nothing was found.
            return self.judge(dic, retry_time)
        return None

    def get_intents_dictory(self):
        """Load ``repository/intent_list.cfg`` from this module's parent folder."""
        bluesea_dir = os.path.abspath(os.path.join(self.current_path, os.pardir))
        app_intents_file_path = os.path.join(bluesea_dir, 'repository/intent_list.cfg')
        return get_data(app_intents_file_path)

    def hardware(self, **kwargs):
        """Return a ``Device`` wrapping a fresh Operator; *kwargs* are ignored."""
        return Device(self.get_instance())

    def get_all_nodes(self):
        """Iterate over every ``node`` element of the dump at global ``xml_path``."""
        return ET.parse(xml_path).getroot().iter('node')

    def get_scrollable_view(self):
        """Return the bounds of the first scrollable node, or None."""
        for node in self.get_all_nodes():
            if node.attrib["scrollable"] == "true":
                return node.get("bounds")
        # Also reached for a dump without nodes, where the original raised
        # UnboundLocalError.
        return None

    def native_is_screen_lock(self):
        """Return True if any node's resource-id contains ``id/lock_icon``."""
        for node in self.get_all_nodes():
            if "id/lock_icon" in node.attrib['resource-id'].strip('\n'):
                return True
        return False

    def parse_window_dump(self, parametres=None):
        """Return the bounds string of the one view matching *parametres*.

        Each supplied value is compared with all six attributes of every
        node, not only the attribute it was supplied for, as in the
        original. A result is returned only when at least as many attribute
        kinds matched as values were supplied, and every attribute kind
        that matched exactly one node agrees on that node's bounds.
        Otherwise None is returned.

        Exits the process with status -1 when no usable criterion is given.
        """
        criteria = parametres or {}
        wanted = [criteria[key]
                  for key in ('id', 'text', 'description', 'index', 'package', 'enabled')
                  if key in criteria]
        if not wanted:
            print("Error : invalid input, there is no useful parameter given!!!")
            sys.exit(-1)

        # Captures the short name that follows "<package>:id/".
        id_pattern = r'[A-Za-z]+:id/(\w+)'
        kinds = ('id', 'text', 'description', 'package', 'index', 'enabled')
        hit_count = dict.fromkeys(kinds, 0)
        last_bounds = {}

        for node in self.get_all_nodes():
            attrs = node.attrib
            raw_id = attrs['resource-id']
            short_ids = re.findall(id_pattern, raw_id.strip('\n'))
            seen = {
                'id': short_ids[0] if short_ids else raw_id.strip('\n\r'),
                'text': attrs['text'].strip('\n'),
                'index': attrs['index'].strip('\n'),
                'description': attrs['content-desc'].strip('\n'),
                'package': attrs['package'].strip('\n'),
                'enabled': attrs['enabled'].strip('\n'),
            }
            for value in wanted:
                for kind in kinds:
                    if value == seen[kind]:
                        hit_count[kind] += 1
                        last_bounds[kind] = node.get("bounds")

        # Fewer matching attribute kinds than supplied values: no match.
        kinds_matched = sum(1 for kind in kinds if hit_count[kind])
        if kinds_matched < len(wanted):
            return None

        # Only attribute kinds that matched exactly one node may vote.
        candidates = [last_bounds[kind] for kind in kinds if hit_count[kind] == 1]
        if not candidates:
            return None
        first = candidates[0]
        if all(bounds == first for bounds in candidates):
            return first
        return None

    def get_xml(self):
        """Dump the current window on the device and pull it to the temp folder.

        :returns: local path ``<temp>/<device_id>_window_dump.xml``, or None
            after ``RETRY_MAX_TIME`` failed attempts.
        """
        device_dump = '/sdcard/window_dump.xml'
        pulled_dump = os.path.join(self.temporary, 'window_dump.xml')
        xml_file = os.path.join(self.temporary, self.device_id + '_' + 'window_dump.xml')
        for _attempt in range(RETRY_MAX_TIME):
            self.adb_root()
            dump_xml_command = self.adb_shell() + 'uiautomator dump /sdcard/window_dump.xml'
            self.execute_command(dump_xml_command).wait()
            get_xml_command = 'adb -s ' + self.device_id + ' pull ' + device_dump + ' ' + self.temporary
            self.execute_command(get_xml_command).wait()
            try:
                os.rename(pulled_dump, xml_file)
            except OSError as oer:
                # Nothing was pulled. Count it as a failed attempt; the
                # original recursed here, unbounded if adb kept failing.
                print(oer)
                continue
            if os.path.isfile(xml_file):
                return xml_file
        print("Unkown issue of adb, uiautomator dump file failed!")
        return None

    def adb_root(self):
        """Run ``adb root`` for this device and wait for it to finish."""
        root_command = 'adb -s ' + self.device_id + ' root'
        self.execute_command(root_command).wait()

    def generate_folder_locate_picture(self):
        """Return the screenshots folder, creating it if necessary."""
        self._ensure_dir(self.failure_pic_location)
        return self.failure_pic_location

    def generate_screenshot_name_format(self):
        """Return ``screenshot_<YYYY_mm_dd_HH_MM_SS>.png`` for the current time."""
        stamp = datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
        return 'screenshot_' + stamp + '.png'

    def adb_command(self):
        """Return the ``adb -s <device> `` prefix (with trailing space)."""
        return 'adb -s ' + self.device_id + ' '

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` prefix (with trailing space)."""
        return 'adb -s ' + self.device_id + ' shell '

    def execute_command(self, cmd, ignore_print=True):
        """Start *cmd* as a subprocess and return the ``Popen`` object.

        The command string is split on single spaces, so arguments cannot
        contain spaces.

        :param ignore_print: despite its name, a true value logs the command
            through ``log_class.info``.
        """
        if ignore_print and self.log_class is not None:
            self.log_class.info("Execute command : {0}".format(cmd))
        return sp.Popen(cmd.split(' '), stdout=sp.PIPE)

    def check_adb_works(self, proc):
        """Placeholder; does nothing."""
        pass

    def get_dumpsys_display(self):
        """Return the output of ``dumpsys display`` as a list of lines."""
        proc = self.execute_command(self.adb_shell() + 'dumpsys display')
        output = proc.communicate()[0]
        if not isinstance(output, str):
            # Python 3 pipes yield bytes; Python 2 already gives str.
            output = output.decode('utf-8', 'replace')
        return output.split('\n')

    def bound_to_list(self):
        """Return the last searched view's bounds as ``[x1, y1, x2, y2]`` strings.

        The criteria come from the global ``paras_dictory`` set by ``search``.
        """
        bounds = self.parse_window_dump(paras_dictory)
        # "[x1,y1][x2,y2]" -> "x1,y1,x2,y2," -> four numbers and one empty tail.
        parts = bounds.replace('[', '').replace(']', ',').split(',')
        parts.remove('')
        return parts

    def get_centre_coordinate(self):
        """Return ``[x, y]`` (strings) of the centre of the last searched view."""
        x1, y1, x2, y2 = [int(part) for part in self.bound_to_list()[:4]]
        # // keeps integer pixel coordinates on Python 3 as well.
        return [str((x1 + x2) // 2), str((y1 + y2) // 2)]

    def native_power_on(self):
        """Press the power key (keyevent 26) if the screen is not on."""
        if not self.native_check_screen_on():
            power_on_command = self.adb_shell() + 'input keyevent 26'
            self.execute_command(power_on_command).wait()

    def native_unlock_screen(self):
        """Wake the screen and swipe up if the lock icon is in the current dump."""
        self.native_power_on()
        if self.native_is_screen_lock():
            self.native_push_up()

    def native_check_screen_on(self):
        """Return True if a dumpsys line has both ``mScreenState`` and ``ON``."""
        for line in self.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                return True
        return False

    def _screen_size(self):
        """Return ``(width, height)`` as ints, or exit if dumpsys gave none."""
        resolution = self.get_max_resolution()
        if len(resolution) < 2 or 'unknow' in (resolution[0], resolution[1]):
            print("Error : dumpsys display failed")
            sys.exit(-1)
        return int(resolution[0]), int(resolution[1])

    def _swipe(self, x1, y1, x2, y2):
        """Run ``input swipe`` between the two points and wait for it."""
        command = self.adb_shell() + 'input swipe {0} {1} {2} {3}'.format(x1, y1, x2, y2)
        self.execute_command(command).wait()

    def native_push_up(self):
        """Swipe up the middle of the screen from 4/5 to 1/5 of its height."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, height // 5)

    def get_max_resolution(self):
        """Return the values after ``=`` on the mDisplayWidth/Height dumpsys lines."""
        max_resolution = []
        for line in self.get_dumpsys_display():
            if 'mDisplayWidth' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
            if 'mDisplayHeight' in line:
                max_resolution.append(line.strip('\r').split('=')[-1])
        return max_resolution

    def take_snapshot(self):
        """Save a screenshot and a matching ``.uix`` dump in the screenshots folder."""
        global xml_path
        xml_path = self.get_xml()
        file_path = self.generate_folder_locate_picture()
        picture_name = self.generate_screenshot_name_format()
        uiautomator_dump_file = os.path.join(file_path, picture_name.split('.')[0] + '.uix')
        file_path_device = '/sdcard/' + picture_name
        commands = [
            self.adb_shell() + '/system/bin/screencap -p ' + file_path_device,
            'adb -s ' + self.device_id + ' pull ' + file_path_device + ' ' + file_path,
            self.adb_shell() + 'uiautomator dump',
            'adb -s ' + self.device_id + ' pull /sdcard/window_dump.xml' + ' ' + uiautomator_dump_file,
        ]
        self.native_unlock_screen()
        time.sleep(0.5)
        for command in commands:
            self.execute_command(command).wait()

    def little_swipe(self):
        """Swipe up the middle of the screen from 4/5 to 2/5 of its height."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, (height * 2) // 5)


class Device(object):
    """Hardware-style actions (keys, swipes, taps, reboot) sent through adb.

    *executor* must provide ``adb_shell()``, ``adb_command()``,
    ``execute_command(cmd)``, ``get_max_resolution()``,
    ``get_dumpsys_display()`` and ``get_intents_dictory()``; in this module
    it is an ``Operator``.
    """

    def __init__(self, executor):
        self.executor = executor

    # ----- private helpers -------------------------------------------------

    def _run_shell(self, arguments):
        """Run ``adb shell <arguments>`` and wait for it to finish."""
        self.executor.execute_command(self.executor.adb_shell() + arguments).wait()

    def _keyevent(self, codes):
        """Send one ``input keyevent`` command carrying *codes*."""
        self._run_shell('input keyevent ' + codes)

    def _repeat_keyevent(self, code, counter, pause):
        """Send *code* ``int(counter)`` times, sleeping *pause* seconds after each."""
        for _ in range(int(counter)):
            self._keyevent(code)
            time.sleep(pause)

    def _screen_size(self):
        """Return ``(width, height)`` as ints, or exit if dumpsys gave none."""
        resolution = self.executor.get_max_resolution()
        # A missing entry used to raise IndexError before the error message
        # could be printed; treat it like the 'unknow' marker.
        if len(resolution) < 2 or 'unknow' in (resolution[0], resolution[1]):
            print("Error : dumpsys display failed")
            sys.exit(-1)
        return int(resolution[0]), int(resolution[1])

    def _swipe(self, x1, y1, x2, y2):
        """Run ``input swipe`` between the two points."""
        self._run_shell('input swipe {0} {1} {2} {3}'.format(x1, y1, x2, y2))

    # ----- keys ------------------------------------------------------------

    def return_home(self):
        """Send keyevent 3."""
        self._keyevent('3')

    def press_enter(self):
        """Send keyevent 66."""
        self._keyevent('66')

    def press_menu(self):
        """Send keyevent 82."""
        self._keyevent('82')

    def shift_tab(self, counter=1):
        """Send keyevent 61 *counter* times in a single command."""
        self._keyevent(' '.join(['61'] * int(counter)))

    def shift_down(self, counter=1):
        """Send keyevent 20 *counter* times in a single command."""
        self._keyevent(' '.join(['20'] * int(counter)))

    def shift_up(self, counter=1):
        """Send keyevent 19 *counter* times in a single command."""
        self._keyevent(' '.join(['19'] * int(counter)))

    def shift_left(self, counter=1):
        """Send keyevent 21 *counter* times, one second apart."""
        self._repeat_keyevent('21', counter, 1)

    def volume_up(self, counter=1):
        """Send keyevent 24 *counter* times, half a second apart."""
        self._repeat_keyevent('24', counter, 0.5)

    def volume_down(self, counter=1):
        """Send keyevent 25 *counter* times, half a second apart."""
        self._repeat_keyevent('25', counter, 0.5)

    def shift_right(self, counter=1):
        """Send keyevent 22 *counter* times, one second apart."""
        self._repeat_keyevent('22', counter, 1)

    def press_back(self, counter=1):
        """Send keyevent 4 *counter* times, one second apart."""
        self._repeat_keyevent('4', counter, 1)

    def power_on(self):
        """Send keyevent 26 if the screen is not reported as on."""
        if not self.check_screen_on():
            self._keyevent('26')

    # ----- swipes and taps -------------------------------------------------
    # // keeps integer pixel coordinates on Python 3 as well.

    def push_up(self):
        """Swipe up the middle of the screen from 4/5 to 1/5 of its height."""
        width, height = self._screen_size()
        self._swipe(width // 2, (height * 4) // 5, width // 2, height // 5)

    def pull_down(self):
        """Swipe down the middle of the screen from 1/5 to 4/5 of its height."""
        width, height = self._screen_size()
        self._swipe(width // 2, height // 5, width // 2, (height * 4) // 5)

    def swipe_right(self):
        """Swipe at mid-height from 1/5 to 4/5 of the screen width."""
        width, height = self._screen_size()
        self._swipe(width // 5, height // 2, (width * 4) // 5, height // 2)

    def swipe_left(self):
        """Swipe at mid-height from 4/5 to 1/5 of the screen width."""
        width, height = self._screen_size()
        self._swipe((width * 4) // 5, height // 2, width // 5, height // 2)

    def swipe_pull_notification(self):
        """Swipe down from the top edge to 1/5 of the screen height."""
        width, height = self._screen_size()
        self._swipe(width // 2, "0", width // 2, height // 5)

    def get_through_oobe(self):
        """Tap four points at 2/10 and 8/10 of the screen, clockwise from top-left."""
        width, height = self._screen_size()
        print("[debug] max_w and max_h is  {0} {1}".format(width, height))
        left, right = (width * 2) // 10, (width * 8) // 10
        top, bottom = (height * 2) // 10, (height * 8) // 10
        for x, y in ((left, top), (right, top), (right, bottom), (left, bottom)):
            self._run_shell('input tap {0} {1}'.format(x, y))

    # ----- state and lifecycle ----------------------------------------------

    def check_boot_status(self):
        """Return True if the bootanim properties contain ``stopped`` and ``1``.

        Runs ``getprop`` on the device and filters it through a local
        ``grep bootanim``.
        """
        check_boot_status_command = self.executor.adb_shell() + 'getprop'
        grep_command = 'grep bootanim'
        getprop = sp.Popen(check_boot_status_command.split(' '), stdout=sp.PIPE)
        grep = sp.Popen(grep_command.split(' '), stdin=getprop.stdout,
                        stdout=sp.PIPE, universal_newlines=True)
        # Drop our copy of the pipe so grep sees EOF, then reap both children.
        getprop.stdout.close()
        out = grep.communicate()[0].strip('\n')
        getprop.wait()
        # The original `"stopped" and '1' in out` only ever tested '1'.
        return "stopped" in out and '1' in out

    def check_reboot_progress(self):
        """Return True while ``check_boot_status`` reports not booted."""
        return not self.check_boot_status()

    def reboot_device(self):
        """Run ``adb reboot`` for this device."""
        reboot_device_command = self.executor.adb_command() + 'reboot'
        self.executor.execute_command(reboot_device_command).wait()

    def input_text(self, string='%s'):
        """Type *string* with ``input text``; spaces are sent as ``%s``."""
        string = string.strip().replace(' ', '%s')
        self._run_shell('input text ' + string)

    def check_screen_on(self):
        """Return True if a dumpsys line has both ``mScreenState`` and ``ON``."""
        for line in self.executor.get_dumpsys_display():
            if 'mScreenState' in line and 'ON' in line:
                return True
        return False

    def launch_app_with_intent(self, app="Native_HomePage", **kwargs):
        """Start the component registered for *app* in the intents dictionary.

        Exits the process with status -1 if *app* has no entry.
        """
        intent_dictory = self.executor.get_intents_dictory()
        if app not in intent_dictory:
            print("[Error] there is no intent match your given application, please double check or contact author to add!!!")
            sys.exit(-1)
        app_intent = intent_dictory[app]['intent']
        self._run_shell('am start -a android.intent.action.MAIN -c android.intent.category.LAUNCHER -f 0x10200000 -n ' + app_intent)


class Panel(object):
    """Actions on the view most recently located by ``Operator.search``.

    *panel* is the object the commands are sent through. It must provide
    ``get_centre_coordinate()``, ``bound_to_list()``, ``adb_shell()``,
    ``execute_command(cmd)``, ``device_id`` and ``log_class``; in this
    module it is an ``Operator``.
    """

    def __init__(self, panel):
        self.panel = panel

    def click(self):
        """Tap the centre of the view."""
        x, y = self.panel.get_centre_coordinate()[:2]
        click_command = self.panel.adb_shell() + 'input tap ' + x + ' ' + y
        self.panel.execute_command(click_command).wait()

    def swipe_up(self, times=1):
        """Swipe up inside the view, from 4/5 to 1/5 of its height, *times* times."""
        left, top, right, bottom = [int(v) for v in self.panel.bound_to_list()[:4]]
        x = str((left + right) // 2)
        height = bottom - top
        # Measure from the view's own top edge. The original scaled
        # (top + bottom), which falls outside any view not starting at y=0.
        y_start = str(top + (height * 4) // 5)
        y_end = str(top + height // 5)
        push_up_command = self.panel.adb_shell() + 'input swipe ' + x + ' ' + y_start + ' ' + x + ' ' + y_end
        for _ in range(int(times)):
            self.panel.execute_command(push_up_command).wait()

    def long_click(self):
        """Long-press the view with a 2000 ms, one-pixel swipe."""
        x, y = self.panel.get_centre_coordinate()[:2]
        px = str(int(x) + 1)
        py = str(int(y) + 1)
        during_time = '2000'
        long_click_command = (self.panel.adb_shell() + 'input swipe ' + x + ' ' + y
                              + ' ' + px + ' ' + py + ' ' + during_time)
        self.panel.execute_command(long_click_command).wait()

    def drag_to(self, dx, dy):
        """Drag the view from its centre to ``(dx, dy)`` via a monkey session.

        Starts ``monkey --port 15555`` on the device if it is not already
        running, sends touch down/move/up through the forwarded port, and
        kills the monkey process afterwards.
        """
        centre = self.panel.get_centre_coordinate()
        gesture = [
            self.touch_down(str(int(centre[0])), str(int(centre[1]))),
            self.move(str(dx), str(dy)),
            self.touch_up(str(dx), str(dy)),
        ]
        if self.get_pid() is None:
            self.generate_monkey_process()
        monkey_pid_num = self.get_pid()
        if monkey_pid_num is not None:
            self.open_socket(gesture)
            self.kill_monkey_process(monkey_pid_num)
        # Kill whatever monkey process is still listed; the original reused
        # the earlier pid here, which could be None or stale.
        leftover_pid = self.get_pid()
        if leftover_pid is not None:
            self.kill_monkey_process(leftover_pid)

    def touch_down(self, dx, dy):
        """Return the monkey ``touch down`` line for the given point."""
        return 'touch down {0} {1}\n'.format(dx, dy)

    def touch_up(self, dx, dy):
        """Return the monkey ``touch up`` line for the given point."""
        return 'touch up {0} {1}\n'.format(dx, dy)

    def move(self, dx, dy):
        """Return the monkey ``touch move`` line for the given point."""
        return 'touch move {0} {1}\n'.format(dx, dy)

    def get_pid(self):
        """Return the pid (string) of the monkey process from ``ps``, or None."""
        r2 = r'\S+\s+(\d+)\s+\d+\s+\d+\s+\d+\s+\S+\s+\S+\s.\scom.android.commands.monkey'
        grep_monkey_command = self.panel.adb_shell() + 'ps'
        # universal_newlines gives text on Python 3; communicate() reaps
        # the child process.
        proc = sp.Popen(grep_monkey_command.split(' '), stdout=sp.PIPE,
                        universal_newlines=True)
        listing = proc.communicate()[0].strip('\n')
        found = re.findall(r2, listing)
        return found[0] if found else None

    def kill_monkey_process(self, monkey_pid):
        """Run ``kill <monkey_pid>`` on the device; no-op for None."""
        if monkey_pid is not None:
            kill_monkey_process_command = self.panel.adb_shell() + 'kill ' + monkey_pid
            self.panel.execute_command(kill_monkey_process_command).wait()

    def generate_monkey_process(self):
        """Start monkey in network mode on port 15555 and forward the port.

        The local adb process that launched monkey is terminated by a timer
        after five seconds; the port is forwarded only if monkey then shows
        up in ``ps``.
        """
        generate_monkey_process_command = self.panel.adb_shell() + 'LD_LIBRARY_PATH=/vendor/lib:/system/lib monkey --port 15555'
        launcher = sp.Popen(generate_monkey_process_command.split(' '), stdout=sp.PIPE)
        reaper = threading.Timer(5, lambda: self.kill(launcher))
        reaper.daemon = True
        reaper.start()
        time.sleep(5)
        if self.get_pid() is not None:
            forwad_command = 'adb -s ' + self.panel.device_id + ' forward tcp:15555 tcp:15555'
            self.panel.execute_command(forwad_command).wait()

    def open_socket(self, cmd_list):
        """Send each command line to the monkey port, then ``quit``.

        Waits 1.5 seconds after each command. The socket is closed even if
        connecting or sending fails.
        """
        sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sk.settimeout(10)
            sk.connect(('127.0.0.1', 15555))
            for cmd in cmd_list:
                self.panel.log_class.info("Execute command : {0}".format(cmd))
                sk.sendall(self._as_bytes(cmd))
                time.sleep(1.5)
            sk.sendall(self._as_bytes('quit\n'))
        finally:
            sk.close()

    @staticmethod
    def _as_bytes(text):
        """Return *text* as bytes (sockets need bytes on Python 3)."""
        return text if isinstance(text, bytes) else text.encode('utf-8')

    def kill(self, process):
        """Terminate and then kill a local ``Popen`` process."""
        process.terminate()
        process.kill()







技术分享
yaboandriod · #3 · 2017年01月16日

#1楼 @264768502 
第一次发,不懂怎么发代码,现在改过来了。

技术分享
yaboandriod · #4 · 2017年01月16日

logger代码如下:

#!/usr/bin/python
import logging
import os

class Logger():
    """Configure a named ``logging`` logger writing to ``<log_path>/log.txt``
    and to the default stream handler."""

    def __init__(self, name, log_path):
        """Store the settings and create *log_path* if it does not exist.

        :param name: name passed to ``logging.getLogger``.
        :param log_path: folder that receives ``log.txt``.
        """
        self.log_path = log_path
        self.name = name
        self.log_file = os.path.join(log_path, 'log.txt')
        if not os.path.isdir(log_path):
            try:
                os.makedirs(log_path)
            except OSError:
                # Best effort, as in the original; a folder that is really
                # missing will make FileHandler fail later in modlogger().
                pass

    def modlogger(self):
        """Return the named logger at DEBUG level with file and stream handlers.

        Handlers are attached only once per logger and log file.
        ``logging.getLogger`` returns the same object for the same name, so
        the original added another pair on every call and each message was
        then written several times.
        """
        mylogger = logging.getLogger(self.name)
        mylogger.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s  %(levelname)s\t%(message)s")

        target = os.path.abspath(self.log_file)
        has_file = any(isinstance(h, logging.FileHandler) and h.baseFilename == target
                       for h in mylogger.handlers)
        # FileHandler subclasses StreamHandler, so exclude it explicitly.
        has_stream = any(isinstance(h, logging.StreamHandler)
                         and not isinstance(h, logging.FileHandler)
                         for h in mylogger.handlers)
        if not has_file:
            fh = logging.FileHandler(self.log_file)
            fh.setFormatter(formatter)
            mylogger.addHandler(fh)
        if not has_stream:
            sh = logging.StreamHandler()
            sh.setFormatter(formatter)
            mylogger.addHandler(sh)
        return mylogger

    def get_instance(self):
        """Return a new Logger with the same name and log path."""
        return Logger(self.name, self.log_path)

    def logger(self):
        """Return a ``Methods`` wrapper around a fresh Logger's configured logger."""
        return Methods(self.get_instance())

class Methods(object):
    """Thin facade exposing info/debug/warning/error of a configured logger."""

    def __init__(self, obj):
        # obj only needs a modlogger() method returning a logging.Logger.
        self.ml = obj.modlogger()

    def info(self, string):
        """Log *string* at INFO level."""
        self.ml.log(logging.INFO, string)

    def debug(self, string):
        """Log *string* at DEBUG level."""
        self.ml.log(logging.DEBUG, string)

    def warning(self, string):
        """Log *string* at WARNING level."""
        self.ml.log(logging.WARNING, string)

    def error(self, string):
        """Log *string* at ERROR level."""
        self.ml.log(logging.ERROR, string)


if __name__=="__main__":
    # Manual smoke test: emit one message per level to the console and to
    # /home/buildbot/bluesea/reporter/log.txt.
    l = Logger('test0','/home/buildbot/bluesea/reporter')
    m = l.logger()
    m.info('------this is info message-------')
    m.error('tap ok button fail')
    m.debug('-----this is debug message-------')
    m.warning('you have not set timeout')

技术分享
yaboandriod · #5 · 2017年01月16日

ui_hooks模块如下:

import controler
import time
import inspect
import subprocess as sp

"""
It provides a way to get through various unexpected pop-up dialogs.
"""


class Hooks:
    """Dismiss known pop-up dialogs that get in the way of a UI test.

    ``get_generators`` snapshots the current UI nodes into the module-level
    ``frame`` dict.  ``parse`` then selects the hook whose default ``seducer``
    package equals the package of the first node, and taps the centre of the
    bounds that hook returns.
    """

    def __init__(self,device_id):
        # Serial passed to ``adb -s`` for every command this object issues.
        self.device_id=device_id

    def get_generators(self,generator=None):
        """Rebuild the module-level ``frame`` from an iterable of UI nodes.

        Each node only needs a ``get(attribute)`` method.  Entries are keyed
        1..n in iteration order.  ``None`` is treated as "no nodes".
        """
        global frame
        frame={}
        if generator is None:
            generator=()
        for position,element in enumerate(generator,1):
            frame[position]={
                'package':element.get('package'),
                'index':element.get('index'),
                'text':element.get('text'),
                'description':element.get('content-desc'),
                'id':element.get('resource-id'),
                'bounds':element.get('bounds'),
                'enabled':element.get('enabled'),
            }

    def _nodes(self):
        """Return the snapshot entries in capture order ([] before any snapshot)."""
        snapshot=globals().get('frame') or {}
        return [snapshot[key] for key in sorted(snapshot)]

    @staticmethod
    def _clean(value):
        """Return *value* without surrounding newlines; a missing attribute becomes ''."""
        return (value or '').strip('\n')

    def _dialog_shown(self,titles):
        """Tell whether any node's text equals one of *titles* exactly."""
        texts=[self._clean(node['text']) for node in self._nodes()]
        return any(title in texts for title in titles)

    def _last_bounds(self,field,needles):
        """Return the bounds of the last node whose *field* contains any of *needles*."""
        coodinate=None
        for node in self._nodes():
            value=self._clean(node[field])
            if any(needle in value for needle in needles):
                coodinate=node['bounds']
        return coodinate

    def parse(self):
        """Run ``divider`` and return its result (True when a dialog was tapped away)."""
        return self.divider()

    def divider(self):
        """Dispatch to the hook registered for the package of the first node.

        Returns False when the snapshot is empty, when no hook handles the
        package, or when the matching hook found nothing to tap.
        """
        nodes=self._nodes()
        if not nodes:
            return False
        main_key=nodes[0]['package']
        flag=False
        hooks=(self.hook_maps,self.hook_contact,self.hook_gms,
               self.hook_settings,self.hook_stk,self.hook_android)
        for hook in hooks:
            # Each hook advertises the package it handles as the default of
            # its ``seducer`` parameter.
            seducer=hook.__defaults__[0]
            if main_key==seducer:
                flag=self.executer(seducer,hook())
        return flag

    def executer(self,sed,func):
        """Tap the centre of the bounds in *func*; return whether a tap was sent.

        sed  -- package name, only printed.
        func -- bounds string returned by a hook, or None when it found nothing.
        """
        print("this app is %s"% sed)
        coodinate=func
        if coodinate is None:
            return False
        self.tap(self.get_point(coodinate))
        return True

    def hook_maps(self,seducer='com.google.android.apps.maps'):
        """Placeholder for Google Maps dialogs; always returns None."""
        print("hook_maps method is invoked")
        return None

    def hook_stk(self,seducer='com.android.stk'):
        """Return the bounds of the last node whose id contains 'button_ok'."""
        print("hook_stk method is invoked")
        return self._last_bounds('id',['button_ok'])

    def hook_android(self,seducer='android'):
        """Return the bounds of the last node whose id contains 'button1'."""
        print("hook_android method is invoked")
        return self._last_bounds('id',['button1'])

    def hook_settings(self,seducer='com.android.settings'):
        """Return the bounds of the 'NO' button of the preferred-SIM dialog, if shown."""
        zipers=["Update preferred SIM card?"]
        print("hook_settings method is invoked")
        if not self._dialog_shown(zipers):
            return None
        return self._last_bounds('text',['NO'])

    def hook_gms(self,seducer='com.google.android.gms'):
        """Return the bounds of the 'Next' button of the sign-in failure dialog, if shown."""
        zipers=["Couldn't sign in"]
        print("hook_gms method is invoked")
        if not self._dialog_shown(zipers):
            return None
        return self._last_bounds('text',['Next'])

    def hook_contact(self,seducer='com.google.android.packageinstaller'):
        """Return the bounds of the 'Allow'/'Next' button of the Contacts permission dialog."""
        zipers = ["Allow Contacts to access photos, media, and files on your device?"]
        print("hook_contact method is invoked")
        if not self._dialog_shown(zipers):
            return None
        return self._last_bounds('text',['Allow','Next'])

    def get_point(self,coodinate):
        """Convert a bounds string '[x1,y1][x2,y2]' to its centre ['x','y'] as strings."""
        parts=[piece for piece in
               coodinate.replace('[','').replace(']',',').split(',') if piece]
        x1,y1,x2,y2=[int(piece) for piece in parts[:4]]
        # Floor division keeps integer pixel coordinates on Python 2 and 3 alike.
        return [str((x1+x2)//2),str((y1+y2)//2)]

    def tap(self,point_list):
        """Send ``input tap x y`` to the device and wait for the command to finish."""
        tap_command = self.adb_shell() + 'input tap ' + point_list[0] + ' ' + point_list[1]
        self.execute_command(tap_command).wait()

    def adb_shell(self):
        """Return the ``adb -s <device> shell `` command prefix (with trailing space)."""
        return 'adb -s ' + self.device_id + ' shell '

    def execute_command(self,cmd,ignore_print=True):
        """Start *cmd* (split on single spaces) and return the ``Popen`` object.

        Despite its name, ``ignore_print=True`` echoes the command before
        running it; that existing behaviour is kept for callers.
        """
        if ignore_print:
            print(cmd)
        proc = sp.Popen(cmd.split(' '),stdout=sp.PIPE)
        return proc
技术分享
yaboandriod · #6 · 2017年01月16日

某个库文件如下:

 # coding: utf-8 
import os
import time
import sys
import subprocess as sp
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from engine import controler
max_retry = 30

class Commons:
    """Reusable device-level test steps driven through ``controler.Operator``.

    conf_dic   -- settings dict; the keys 'TC', 'work_dir', 'device_id' and
                  'log_path' are read here, 'google_account' and
                  'google_password' by ``log_on_google_account``.
    ins_logger -- object offering info/debug/error used for all reporting.
    """

    def __init__(self,conf_dic,ins_logger=None):
        self.conf_dic = conf_dic
        name = os.path.basename(conf_dic['TC']).split('.')[0]
        workpath = conf_dic['work_dir']
        dev = conf_dic['device_id']
        logpath = conf_dic['log_path']
        log_path = os.path.join(workpath,logpath,name)
        self.mylogger = ins_logger
        self.op = controler.Operator(device_id=dev,log_class=self.mylogger,log_path=log_path)

    def _click(self,info_msg,error_msg,**criteria):
        """Search once with *criteria* and click the hit; log and return False on a miss."""
        try:
            self.mylogger.info(info_msg)
            # A failed search surfaces as AttributeError on the ``.click`` access.
            self.op.search(**criteria).click()
        except AttributeError as e:
            self.mylogger.error(error_msg.format(e))
            return False
        return True

    def _click_until_found(self,info_msg,retry_msg,delay,**criteria):
        """Like ``_click`` but retry every *delay* seconds, giving up after ``max_retry`` retries."""
        retry_count = 0
        while 1:
            try:
                self.mylogger.info(info_msg)
                self.op.search(**criteria).click()
                return True
            except AttributeError as e:
                if retry_count <= max_retry:
                    self.mylogger.debug(retry_msg)
                    time.sleep(delay)
                    retry_count+=1
                else:
                    self.mylogger.error("[Common] poor network course can't register google account . Got Exception {0}".format(e))
                    return False

    def warm_reboot(self):
        """Reboot the device and block until it reports booted; False if the reboot did not start."""
        if not self.op.hardware().check_boot_status():
            self.mylogger.error("[Common] Device is not ready for test reboot case. ")
            return False
        self.mylogger.info("[Common] Device is ready. ")
        self.mylogger.info("[Common] Start to reboot device {0}".format(self.op.device_id))
        self.get_boot_infomation()
        self.op.hardware().reboot_device()
        time.sleep(15)
        self.mylogger.info("[Common] Start to check if execute reboot command? ")
        if not self.op.hardware().check_reboot_progress():
            self.mylogger.error("[Common] unknow error, reboot fail. ")
            return False
        self.mylogger.info("[Common] Device start booting...")
        while not self.op.hardware().check_boot_status():
            # Pause between polls instead of hammering the device in a tight loop.
            time.sleep(1)
        self.mylogger.info("[Common] Device reboot successfully. ")
        return True

    def get_boot_infomation(self):
        """Log the output of ``getprop | grep boot`` run on the device."""
        command = ['adb','-s',self.op.device_id,'shell','getprop','|','grep','boot']
        proc = sp.Popen(command,stdout=sp.PIPE,universal_newlines=True)
        # communicate() drains the pipe and reaps the child process.
        out = proc.communicate()[0].strip('\n\r')
        self.mylogger.info(out)

    def log_on_google_account(self):
        """Walk the Settings UI to add the configured Google account.

        Returns True after the final 'Continue' click, False as soon as an
        expected UI element cannot be found.
        """
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("[Common] click Accounts item ",
                           "[Common] Can't find Accounts item . Got Exception {0}",
                           text="Accounts",retry_time="3"):
            return False
        if not self._click("[Common] click Add account item ",
                           "[Common] Can't find Add account item . Got Exception {0}",
                           text="Add account"):
            return False
        if not self._click("[Common] click Google item ",
                           "[Common] Can't find Google item . Got Exception {0}",
                           text="Google"):
            return False
        if not self._click_until_found("[Common] click enter your email input edittext ",
                                       "[Common] Can't find Enter your email input edittext . try again",
                                       2,description="Enter your email "):
            return False
        self.mylogger.info("[Common] input google account : {0}".format(self.conf_dic['google_account']))
        self.op.hardware().input_text(self.conf_dic['google_account'])
        if not self._click("[Common] click NEXT button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           description="NEXT"):
            return False
        time.sleep(5)
        if not self._click("[Common] click password edittext ",
                           "[Common] Can't find password edittext . Got Exception {0}",
                           id="password"):
            return False
        # The password itself must never reach the log file.
        self.mylogger.info("[Common] input google password : {0}".format('********'))
        self.op.hardware().input_text(self.conf_dic['google_password'])
        if not self._click("[Common] click NEXT button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           description="NEXT"):
            return False
        if not self._click_until_found("[Common] click ACCEPT button ",
                                       "[Common] Can't find ACCEPT button . try again",
                                       2,description="ACCEPT"):
            return False
        if not self._click_until_found("[Common] check Google services shown??? ",
                                       "[Common] Can't Google services keywords. try again",
                                       5,text="Google services"):
            return False
        if not self._click("[Common] click Next button ",
                           "[Common] Can't find NEXT button . Got Exception {0}",
                           text="Next",enabled='true',retry_time='3'):
            return False
        if not self._click_until_found("[Common] click No thanks button ",
                                       "[Common] Can't find No thanks button . try again",
                                       2,text="No thanks"):
            return False
        if not self._click("[Common] click Continue button ",
                           "[Common] Can't find Continue button . Got Exception {0}",
                           text="Continue"):
            return False
        time.sleep(2)
        return True

    def check_google_account_added(self):
        """Open Settings > Accounts and report whether a 'Google' entry is listed."""
        self.mylogger.info("[Common] launch Settings app ")
        self.op.hardware().launch_app_with_intent(app='Settings')
        if not self._click("[Common] click Accounts item ",
                           "[Common] Can't find Accounts item . Got Exception {0}",
                           text="Accounts",retry_time="3"):
            return False
        if self.op.search(text='Google') is not None:
            self.mylogger.info("[Common] add google account successfully. ")
            return True
        self.mylogger.error("[Common] add account fail. ")
        return False

















技术分享
yaboandriod · #7 · 2017年01月16日

入口程序:

import xml.etree.ElementTree as ET
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir,os.pardir)))
import argparse
import random
import subprocess as sp
import pickle
import datetime
import re
import time
from gramary.runner import iter_event,get_different_list,parse_history,get_crash_path,get_crashfile_conetnt
import generate_report


class Configuration:
    """Run settings: built-in defaults, overridden by a Python config file."""

    def __init__(self):
        self.log_path = ''
        self.execute_main_campaign = ''
        self.device_id = ''
        # Default working directory is the parent of the current directory.
        self.work_dir = os.path.abspath(os.path.join(os.getcwd(),os.pardir))

    def load(self,filepath):
        """Execute the config file at *filepath* and copy its names onto self.

        Exits the process with status -1 when *filepath* is None or the file
        cannot be read.

        NOTE(review): the file is executed as Python code, so it must come
        from a trusted source.
        """
        try:
            with open(filepath) as handle:
                source = handle.read()
        except TypeError:
            # open(None): the positional command-line argument was omitted.
            print("Error : No file path given!")
            print("you can add -h option to view more info")
            sys.exit(-1)
        except IOError as msg1:
            print(msg1)
            print("Error : Please specify a path which contain a config file!")
            sys.exit(-1)
        temp_dict = {}
        # Equivalent of Python 2's execfile(filepath, {}, temp_dict); errors
        # raised by the config file itself now propagate unchanged.
        exec(compile(source,filepath,'exec'),{},temp_dict)
        for key in temp_dict:
            setattr(self,key,temp_dict[key])


def get_testcase_from_xml(config):
    """Return an iterator over every ``<item>`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``.
    """
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    root = ET.parse(xml_path).getroot()
    return root.iter('item')

def get_testsuite_from_xml(config):
    """Return an iterator over every ``<suite>`` element of the campaign XML.

    The file is ``config.execute_main_campaign`` resolved against
    ``config.work_dir``; a ``<suite>`` root element is included.
    """
    xml_path = os.path.join(config.work_dir,config.execute_main_campaign)
    root = ET.parse(xml_path).getroot()
    return root.iter('suite')

def get_full_path(config,string):
    """Return the script path for case *string*: ``<work_dir>/<string>.py``."""
    return os.path.join(config.work_dir,string) + '.py'


def generate_file(config,dictory):
    """Pickle *dictory* to ``<work_dir>/temporary/<device_id>/settings_temp.pkl``.

    Returns the path of the written file; the directory is created if needed.
    """
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            # Best effort: a real failure shows up when the file is opened.
            pass
    pickle_dump_path = os.path.join(temporary_path,'settings_temp.pkl')
    with open(pickle_dump_path,'wb') as f1:
        # Protocol 1 is what the former ``True`` argument selected.
        pickle.dump(dictory,f1,1)
    return pickle_dump_path


def save_data_for_html(config,dictory):
    """Pickle *dictory* to ``<work_dir>/temporary/<device_id>/html_raw_data.pkl``.

    Returns the path of the written file; the directory is created if needed.
    """
    temporary_path = os.path.join(config.work_dir,'temporary',config.device_id)
    if not os.path.isdir(temporary_path):
        try:
            os.makedirs(temporary_path)
        except OSError:
            # Best effort: a real failure shows up when the file is opened.
            pass
    pickle_dump_path = os.path.join(temporary_path,'html_raw_data.pkl')
    with open(pickle_dump_path,'wb') as f1:
        # Protocol 1 is what the former ``True`` argument selected.
        pickle.dump(dictory,f1,1)
    return pickle_dump_path


def main(config):
    """Run every case of the campaign XML and regenerate the HTML report.

    For each suite loop and each case loop a child ``python <case> <pickle>``
    process is started, its 'Pass'/'Fail' verdict is read from stdout, new
    crash events are collected, and six result fields are appended to the
    case's entry: verdict, start time, stop time, duration, crash count and
    the crash details dict.
    """
    r1=r'Fail|Pass'
    crash_patterns = (
        ('crash_data0',r'DATA0=(\S+\s)[\s\S]*'),
        ('crash_data1',r'DATA1=(\S+\s)[\s\S]*'),
        ('crash_data2',r'DATA2=(\S+\s)[\s\S]*'),
    )
    # Every public attribute of the configuration except its load() method
    # becomes a setting handed to the test case.
    settings_dictory = {}
    for attr in dir(config):
        if attr.startswith('__') or attr == 'load':
            continue
        settings_dictory[attr] = getattr(config,attr)
    rs = 'run_set_'+datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S')
    settings_dictory['log_path'] = os.path.join(settings_dictory['log_path'],config.device_id,rs)
    execue_case_sequence = {}
    for testsuite in get_testsuite_from_xml(config):
        execue_case_sequence['suite_name'] = testsuite.get('name')
        execue_case_sequence['suite_loop'] = testsuite.get('loop')
        execue_case_sequence['suite_mode'] = testsuite.get('mode')
        execue_case_sequence['suite_content'] = {}
    execue_case_sequence['device_id'] = settings_dictory['device_id']
    suite_content = execue_case_sequence.setdefault('suite_content',{})
    # Loop counts are fixed here: the per-case lists grow as results are
    # appended, so their length cannot be used to recover the count later.
    item_loops = {}
    for seq_num,testcase in enumerate(get_testcase_from_xml(config),1):
        loop_attr = testcase.get('loop')
        suite_content[seq_num] = [testcase.text,loop_attr]
        item_loops[seq_num] = int(loop_attr) if loop_attr is not None else 1
    suite_loop = int(execue_case_sequence.get('suite_loop') or 0)
    suite_mode = execue_case_sequence.get('suite_mode')
    tc_symbol_list = sorted(suite_content)
    if suite_loop == 0 or not tc_symbol_list:
        print("there is no loop specify for running testsuite or no cases need to be executed!")
        sys.exit(0)
    # Only 'random' shuffles; 'regular' and anything else keep XML order.
    need_to_change_sequence = suite_mode == 'random'
    html_result_data = execue_case_sequence
    device_id = settings_dictory['device_id']
    original_history = parse_history(device_id)
    for sl in range(1,suite_loop+1):
        print("Start to run testsuite : {0} , loop {1}".format(execue_case_sequence['suite_name'],sl))
        if need_to_change_sequence:
            random.shuffle(tc_symbol_list)
        for case_key in tc_symbol_list:
            item_loop = item_loops[case_key]
            item_path = get_full_path(config,suite_content[case_key][0])
            case_name = os.path.basename(item_path).split('.')[0]
            report_path = os.path.join(config.work_dir,settings_dictory['log_path'],case_name)
            if item_loop == 0:
                continue
            for j in range(1,item_loop+1):
                print("Start to run case : {0} , loop {1}".format(case_name,j))
                start_time = time.time()
                start_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                settings_dictory['TC'] = item_path
                dir_path = generate_file(config,settings_dictory)
                # An argument list keeps paths containing spaces intact.
                proc=sp.Popen(['python',item_path,dir_path],stdout=sp.PIPE,universal_newlines=True)
                out = proc.communicate()[0].strip('\n')
                current_history = parse_history(device_id)
                stop_time = time.time()
                stop_time_dateformat = datetime.datetime.now().strftime('%Y/%m/%d_%H:%M:%S')
                time_taken = stop_time - start_time
                crash_list = iter_event(get_different_list(original_history,current_history))
                crashes = {}
                for c_count,crash in enumerate(crash_list,1):
                    # Fresh content and a fresh dict per crash, so one crash
                    # never inherits or overwrites the data of another.
                    print_crashfile = ''
                    crashlog_path = get_crash_path(crash)
                    if crashlog_path is not None:
                        print_crashfile = get_crashfile_conetnt(device_id,crashlog_path)
                    crash_dic = {'crash_info':crash.strip('\r')}
                    for key,pattern in crash_patterns:
                        found = re.findall(pattern,print_crashfile)
                        crash_dic[key] = found[0] if found else ''
                    crashes[c_count] = crash_dic
                crash_number = len(crashes)
                original_history = current_history
                verdicts = re.findall(r1,out)
                # A child that printed no verdict (e.g. it crashed) counts as failed.
                verdict = verdicts[0] if verdicts else 'Fail'
                # Index by the case key, not the loop position, so results
                # stay attached to the right case after shuffling.
                html_result_data['suite_content'][case_key].extend([
                    verdict,
                    start_time_dateformat,
                    stop_time_dateformat,
                    round(time_taken,3),
                    crash_number,
                    crashes,
                ])
            address = save_data_for_html(config,html_result_data)
            generate_report.main(address,report_path)


if __name__ == '__main__':
    # The single optional positional argument is the config file path;
    # Configuration.load() reports and exits when it is missing.
    parse=argparse.ArgumentParser(usage='%(prog)s [options]')
    parse.add_argument('configuration_file',type=str,nargs='?',help="A path of config file")
    args=parse.parse_args()
    config_file = args.configuration_file
    configuration = Configuration()
    configuration.load(config_file)
    main(configuration)
技术分享
yaboandriod · #8 · 2017年01月16日

自定义生成测试报告:

import os
import sys
from pyh import *
import pickle

def main(address,report_path):
    """Render ``report.html`` from the pickled result data at *address*.

    address     -- path of the pickle file holding the result dict.
    report_path -- log directory of one case; the report is written to its
                   parent directory, crash pages into the directory itself.

    NOTE(review): the pickle file is loaded as-is, so it must come from a
    trusted run.
    """
    with open(address,'rb') as f:
        database1 = pickle.load(f)
    report_path_base = os.path.abspath(os.path.join(report_path,os.pardir))
    report_file_path = os.path.join(report_path_base,'report.html')
    content_dic = database1['suite_content']
    page = PyH('test report')
    page << h1('this is report of yath project')
    order_list = page << ul(type='square')
    order_list << li('device id : %s'%database1['device_id'])
    order_list << li('suite mode : %s'%database1['suite_mode'])
    order_list << li('suite loop : %s'%database1['suite_loop'])
    order_list << li('suite name : %s'%database1['suite_name'])
    page << hr()
    p1_content = 'Loop_%s:'%database1['suite_loop']
    page << p(i(p1_content))
    t = page << table(border='1',bgcolor='#bbbb99')
    t << tr(th('test case',align='left')+th('test loop',align='center')+th('test result',align='center')+th('start time',align='center')+th('stop time',align='center')+th('duration(s)',align='center')+th('crashes',align='center'))
    result_colours = {'Pass':'green','Fail':'red','NA':'grey'}
    # Sorted keys give the rows a stable order.
    for k in sorted(content_dic):
        item = content_dic[k]
        if len(item) == 2:
            # Only [case, loop] present: the case has no results yet.
            continue
        test_case = item[0]
        test_loop = item[1]
        if item[1] == '0':
            test_result,start_time,stop_time,duration,crashes = 'NA','NA','NA','NA','0'
        else:
            test_result = item[2]
            start_time = item[3]
            stop_time = item[4]
            duration = item[5]
            crashes = item[6]
        # An unexpected verdict gets the neutral colour instead of reusing
        # the previous row's colour or failing on an unbound name.
        attr_co_test_result = result_colours.get(test_result,'grey')
        crash_cell = crashes
        if int(crashes) > 0:
            crashes_dic = item[7]
            generate_crashes_page(crashes_dic,test_case,report_path)
            link_crash = 'file://'+os.path.join(report_path,'crash.html')
            crash_cell = a(crashes,href=link_crash)
        t << tr(td(test_case,align='left')+td(test_loop,align='center')+td(test_result,align='center',bgcolor=attr_co_test_result)+td(start_time,align='center')+td(stop_time,align='center')+td(duration,align='center')+td(crash_cell,align='center'))

    page.printOut(report_file_path)


def generate_crashes_page(dictory,case_name,crash_path):
    """Write ``<crash_path>/crash.html`` with one table per crash in *dictory*.

    dictory   -- maps a crash number to a dict with the keys 'crash_info',
                 'crash_data0', 'crash_data1' and 'crash_data2'.
    case_name -- shown in each crash heading.
    """
    crash_file_path = os.path.join(crash_path,'crash.html')
    crash_page = PyH('Crash Page')
    crash_page << h1('this is a page of crash.')
    crash_page << p('detail info : ')
    # Sorted keys list the crashes in numeric order.
    for j in sorted(dictory):
        entry = dictory[j]
        div1 = crash_page << div()
        h2_content = 'Crash %s occurs when execute %s : '%(str(j),case_name)
        div1 << h2(h2_content)
        crash_table = div1 << table(frame='box')
        crash_table << tr(th('key',align='left')+th('value'),align='left')
        crash_table << tr(td('crash info',align='left')+td(entry['crash_info']),align='left')
        crash_table << tr(td('data 0',align='left')+td(entry['crash_data0']),align='left')
        crash_table << tr(td('data 1',align='left')+td(entry['crash_data1']),align='left')
        crash_table << tr(td('data 2',align='left')+td(entry['crash_data2']),align='left')
    crash_page.printOut(crash_file_path)

if __name__ == "__main__":
    # main() needs the pickle path and the case log directory; calling it
    # without arguments could only fail with a TypeError.
    if len(sys.argv) != 3:
        sys.exit("usage: %s <pickle_file> <report_path>" % sys.argv[0])
    main(sys.argv[1],sys.argv[2])


技术分享
yaboandriod · #9 · 2017年01月16日 

测试用例:

import os
import unittest
import sys
import pickle
sys.path.append(os.path.abspath(os.path.join(os.getcwd(),os.pardir)))
from container.base import settings
from engine import log


# The first command-line argument is the path of the pickled settings dict.
# NOTE(review): the pickle is loaded as-is, so it must come from a trusted runner.
dir_path = sys.argv[1]
with open(dir_path,'rb') as f:
    config_dictory = pickle.load(f)
# Logs go to <work_dir>/<log_path>/<case file name without extension>.
name = os.path.basename(config_dictory['TC']).split('.')[0]
workpath = config_dictory['work_dir']
logpath = config_dictory['log_path']
log_path = os.path.join(workpath,logpath,name)
l = log.Logger(name,log_path)
mylogger = l.logger()

class test_setup_wifi_class(unittest.TestCase):
    """Runs ``Settings.setup_wifi`` and expects it to return True."""

    def setUp(self):
        # Built from the module-level settings dict and logger.
        self.test_class = settings.Settings(config_dictory,mylogger)

    def tearDown(self):
        # Press back three times, then return to the home screen.
        self.test_class.op.hardware().press_back(3)
        self.test_class.op.hardware().return_home()

    def test_skip_wizard(self):
        self.assertEqual(self.test_class.setup_wifi(),True,'test setup_wifi case fail')




if __name__ == "__main__":
    my_suite = unittest.TestLoader().loadTestsFromTestCase(test_setup_wifi_class)
    result = unittest.TextTestRunner(verbosity=0).run(my_suite)
    problem_count = len(result.failures)+len(result.errors)
    flag = 'Fail' if problem_count != 0 else 'Pass'
    # Prints "<Pass|Fail> <count>" on stdout.
    print("%s %d" % (flag,problem_count))
技术分享
yaboandriod · #10 · 2017年02月03日

自己顶一下技术分享

技术分享
mads · #11 · 2017年02月05日

你这个代码干嘛用的,陈述一下

技术分享
cloudhuan · #12 · 2017年02月05日

???干嘛用的???封装了哪里?有什么优势?

技术分享
yaboandriod · #13 · 2017年02月06日

针对android操作系统UI进行自动化测试,可以进行绝大多数的操作,多个app之间的交互。还可以在多台设备之间进行交互,譬如互相打电话,接电话等。

技术分享
yaboandriod · #14 · 2017年02月06日

这是我写的一个小测试框架,case和lib可以自己定义,提供了log和controler接口。log用来打印日志,controler用来控制设备。

回帖

[python测试框架学习篇] 分享一个和adb相关的测试框架