首页 > 代码库 > Python httpsqs封装类
Python httpsqs封装类
‘‘‘httpsqs?????@author xiaopenga = HttpsqsClient(‘192.168.0.218‘,‘1218‘,‘httpsqsmmall.com‘)print a.put(‘logtest‘,‘sdfsfsf‘)print a.get(‘logtest‘)‘‘‘import urllib,urllib2,jsonclass HttpsqsClient(object): def __init__(self,host= ‘127.0.0.1‘,port=‘1218‘,auth = ‘‘, charset = ‘utf-8‘): self.httpsqs_url = ‘http://‘ + host + ‘:‘ + port + ‘/?‘ self.httpsqs_auth = auth self.httpsqs_charset = charset def __http_get(self,params): ‘‘‘ http get?? params = {"wd":"a","b":"2"} ‘‘‘ params[‘auth‘] = self.httpsqs_auth params[‘charset‘] = self.httpsqs_charset try: url_params = urllib.urlencode(params) final_url = self.httpsqs_url + url_params page = urllib2.urlopen(final_url,timeout=3).read() return page except urllib2.HTTPError, e: print "Error Code:", e.code except urllib2.URLError, e: print "Error Reason:", e.reason def __http_post(self,params,post_data): """ http post?? @param params: params = {"wd":"a","b":"2"} @param post_data: json or string @type post_data: string @return:string """ params[‘auth‘] = self.httpsqs_auth params[‘charset‘] = self.httpsqs_charset url_params = urllib.urlencode(params) final_url = self.httpsqs_url + url_params post_data = post_data.encode(‘utf8‘) try: req = urllib2.Request(final_url, post_data) page = urllib2.urlopen(req,timeout=3).read() return page except urllib2.HTTPError, e: print "Error Code:", e.code except urllib2.URLError, e: print "Error Reason:", e.reason def put(self,queue_name,queue_data): params = {‘opt‘:‘put‘,‘name‘:queue_name} r = self.__http_post(params,queue_data) if r == ‘HTTPSQS_PUT_OK‘: return True return False def get(self,queue_name): params = {‘opt‘:‘get‘,‘name‘:queue_name} r = self.__http_get(params) if r == ‘HTTPSQS_GET_END‘: return None return r def status(self,queue_name): params = {‘opt‘:‘status‘,‘name‘:queue_name} return self.__http_get(params) def status_json(self,queue_name): params = {‘opt‘:‘status_json‘,‘name‘:queue_name} return self.__http_get(params) def reset(self,queue_name): params = {‘opt‘:‘reset‘,‘name‘:queue_name} return self.__http_get(params) def maxqueue(self,queue_name,num): params = {‘opt‘:‘maxqueue‘,‘name‘:queue_name,‘num‘:str(num)} return self.__http_get(params) def synctime(self,queue_name,num): params = {‘opt‘:‘synctime‘,‘name‘:queue_name,‘num‘:str(num)} return self.__http_get(params)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。