首页 > 代码库 > api安全认证
api安全认证
三、auth自定义授权
客户端代码:
import requestsimport hashlibimport timecurrent_time = time.time()#自意义的字符串app_id,和服务端ck一致app_id =‘8kasoimnasodn8687asdfkmasdf‘app_id_time = "%s|%s" % (app_id,current_time,)m = hashlib.md5()m.update(bytes(app_id_time,encoding=‘utf-8‘))authkey = m.hexdigest()authkey_time ="%s|%s" % (authkey,current_time,)print(authkey_time)host_data = { ‘status‘:True, ‘data‘:{ ‘hostname‘:‘c1.com‘, ‘disk‘:{‘status‘:True,‘data‘:‘xxx‘}, ‘mem‘:{‘status‘:True,‘data‘:‘xxx‘}, ‘nic‘:{‘status‘:True,‘data‘:‘xxx‘}, },}response=requests.post( url=‘http://127.0.0.1:8000/api/asset/‘, json=host_data, headers={‘authkey‘:authkey_time},)print(response.text)# requests.get(url=‘http://127.0.0.1:8000/api/asset/?k1=123‘)# requests.get(url=‘http://127.0.0.1:8000/api/asset/‘,params={‘k1‘:‘v1‘,‘k2‘:‘v2‘})# requests.post(# url=‘http://127.0.0.1:8000/api/asset/‘,# params={‘k1‘:‘v1‘,‘k2‘:‘v2‘}, # GET形式传值# data=http://www.mamicode.com/{‘username‘:‘1123‘,‘pwd‘: ‘666‘}, # POST形式传值# headers={‘a‘:‘123‘} # 请求头数据# )
服务端代码:
from django.shortcuts import render,HttpResponsefrom django.views.decorators.csrf import csrf_exempt,csrf_protectimport hashlibimport time#自定义的字符串ck=‘8kasoimnasodn8687asdfkmasdf‘#5秒钟的授权列表auth_list = []@csrf_exemptdef asset(request): auth_key_time = request.META.get(‘HTTP_AUTHKEY‘) auth_key_client,client_ctime = auth_key_time.split(‘|‘) server_current_time = time.time() if server_current_time - 5 > float(client_ctime): #太久远了 return HttpResponse(‘时间太久远了‘) if auth_key_time in auth_list: #已经访问过了 return HttpResponse(‘你来晚了‘) key_time = ‘%s|%s‘ %(ck,client_ctime) m = hashlib.md5() m.update(bytes(key_time,encoding=‘utf-8‘)) authkey = m.hexdigest() if authkey != auth_key_client: return HttpResponse(‘授权失败‘) auth_list.append(auth_key_time) if request.method == ‘POST‘: import json print(authkey) host_info = json.loads(str(request.body,encoding=‘utf-8‘)) print(host_info) return HttpResponse(‘授权成功‘)def test(request): # print(request.POST,type(request.POST)) # from django.http.request import QueryDict response = render(request,‘index.html‘) response.set_signed_cookie(‘kkkk‘,‘vvvv‘,salt=‘asdf‘) return response
线程池和进程池
#!/usr/bin/env python# -*- coding:utf8 -*-####### 编写方式一 ##########from concurrent.futures import ThreadPoolExecutorfrom concurrent.futures import ProcessPoolExecutorimport requestsimport timedef task(url): response = requests.get(url) print(url,response)pool = ThreadPoolExecutor(5)url_list = [ ‘https://www.baidu.com‘, ‘http://www.sina.com.cn‘, ‘http://cn.bing.com‘, ‘https://home.cnblogs.com/u/liaoboshi/‘, ‘https://www.linkedin.com‘, ‘http://mil.news.baidu.com‘,]for url in url_list: pool.submit(task,url)pool.shutdown(wait=True)####### 编写方式一 ################################from concurrent.futures import ThreadPoolExecutorimport requestsimport timedef task(url): ‘‘‘ 下载页面 :param url: :return: ‘‘‘ response = requests.get(url) return responsedef done(future,*args,**kwargs): response = future.result() print(response.status_code,response.content)pool = ThreadPoolExecutor(5)url_list = [ ‘https://www.baidu.com‘, ‘http://www.sina.com.cn‘, ‘http://cn.bing.com‘, ‘https://home.cnblogs.com/u/liaoboshi/‘, ‘https://www.linkedin.com‘, ‘http://mil.news.baidu.com‘,]for url in url_list: v = pool.submit(task,url) # 每一个线程函数走完,再走下面的另一个回调函数 v.add_done_callback(done)pool.shutdown(wait=True)
自定义异步IO框架
#!/usr/bin/env python# -*- coding:utf8 -*-# IO多路复用: 监听多个socket对象,感知变化,利用其特性可以并发出异步IO模块# 异步IO: 异步是非阻塞 非阻塞 + IO多路复用 # setblocking(False)import selectimport socketclass HttpRequest: def __init__(self,sk,host,callback): self.socket = sk self.host = host self.callback = callback def fileno(self): return self.socket.fileno()class HttpResponse: def __init__(self,recv_data): self.recv_data = recv_data self.header_dict = {} self.body = None self.initialize() def initialize(self): headers, body = self.recv_data.split(b‘\r\n\r\n‘, 1) self.body = body header_list = headers.split(b‘\r\n‘) for h in header_list: h_str = str(h, encoding=‘utf-8‘) v = h_str.split(‘:‘, 1) if len(v) == 2: self.header_dict[v[0]] = v[1]class AsyncRequest: def __init__(self): self.conn = [] self.connection = [] # 用于检测是否已经连接成功 def add_request(self,host,callback): try: sk = socket.socket() # 创建 socket 对象 sk.setblocking(False) # 设置socket为非阻塞 sk.connect((host,80,)) # 连接 主机 except BlockingIOError as e: # 设置socket为非阻塞后,会报错,要抓住异常 pass request = HttpRequest(sk,host,callback) # 创建一个socket对象 要返回self.socket.fileno() self.conn.append(request) # 把对象加到列表里 self.connection.append(request) # 把对象加到列表里 def run(self): while True: rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05) # 创建select对象 for w in wlist: print(w.host,‘连接成功...‘) # 只要能循环到,表示socket和服务器端已经连接成功 tpl = ‘GET / HTTP/1.0\r\nHost:%s\r\n\r\n‘ % w.host w.socket.send(bytes(tpl,encoding=‘utf8‘)) # 给发服务器送消息 self.connection.remove(w) # 发送完消息后,删除对象 for r in rlist: # r,是HttpRequest recv_data =http://www.mamicode.com/ bytes() while True: try: chunck = r.socket.recv(8096) # 接收服务器返回消息 recv_data += chunck except Exception as e: break response = HttpResponse(recv_data) # 返回的消息包装成字典(请求头和请求体) r.callback(response) # 执行回调函数 r.socket.close() # 关闭连接 self.conn.remove(r) #删除对象 if len(self.conn) == 0: breakdef f1(response): # 回调函数拿到返回的请求头和请求体 print(‘保存到文件‘, response.header_dict)def f2(response): print(‘保存到数据库‘, response.header_dict)url_list = [ {‘host‘: ‘www.baidu.com‘, ‘callback‘: f1}, {‘host‘: ‘cn.bing.com‘, ‘callback‘: f2}, {‘host‘: ‘www.cnblogs.com‘, ‘callback‘: f2},]req = AsyncRequest() # 创建一个对象for item in url_list: req.add_request(item[‘host‘],item[‘callback‘]) # 运行类的add_request方法,把 主机名 和 回调函数 传进去req.run() # 运行类的run方法
api安全认证
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。