首页 > 代码库 > Python网络编程学习_day11
Python网络编程学习_day11
一、协程
1.理论知识
协程,又称伪线程,是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈,协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
优点:
- 无需线程上下文切换的开销
- 无需原子操作锁定及同步的开销
- "原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
- 方便切换控制流,简化编程模型
- 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
缺点:
- 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
- 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
协程满足条件:
- 必须在只有一个单线程里实现并发
- 修改共享数据不需加锁
- 用户程序里自己保存多个控制流的上下文栈
- 一个协程遇到IO操作自动切换到其它协程
2.代码实例
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
1 import gevent 2 def func1(): 3 print(‘\033[31;1m李闯在跟海涛搞...\033[0m‘) 4 gevent.sleep(2) 5 print(‘\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m‘) 6 7 8 def func2(): 9 print(‘\033[32;1m李闯切换到了跟海龙搞...\033[0m‘) 10 gevent.sleep(1) 11 print(‘\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m‘) 12 13 def func3(): 14 print("33333") 15 gevent.sleep(1) 16 print("4444") 17 18 gevent.joinall([ 19 gevent.spawn(func1), 20 gevent.spawn(func2), 21 gevent.spawn(func3), 22 ])
输出结果: 李闯在跟海涛搞... 李闯切换到了跟海龙搞... 33333 李闯搞完了海涛,回来继续跟海龙搞... 4444 李闯又回去跟继续跟海涛搞...
3.同步与异步的性能区别
import gevent def task(pid): """ Some non-deterministic task """ gevent.sleep(0.5) print(‘Task %s done‘ % pid) def synchronous(): for i in range(1,10): task(i) def asynchronous(): threads = [gevent.spawn(task, i) for i in range(10)] gevent.joinall(threads) print(‘Synchronous:‘) synchronous() print(‘Asynchronous:‘) asynchronous()
4.遇到IO阻塞自动切换任务(爬虫实例)
1 import gevent 2 from gevent import monkey 3 monkey.patch_all() 4 from urllib.request import urlopen 5 import time 6 7 def pa_web_page(url): 8 print("GET url",url) 9 req = urlopen(url) 10 data =http://www.mamicode.com/req.read() 11 print(data) 12 print(‘%d bytes received from %s.‘ % (len(data), url)) 13 14 t_start = time.time() 15 pa_web_page("http://www.autohome.com.cn/beijing/") 16 pa_web_page("http://www.xiaohuar.com/") 17 print("time cost:",time.time()-t_start) 18 19 t2_start = time.time() 20 gevent.joinall([ 21 #gevent.spawn(pa_web_page, ‘https://www.python.org/‘), 22 gevent.spawn(pa_web_page, ‘http://www.autohome.com.cn/beijing/‘), 23 gevent.spawn(pa_web_page, ‘http://www.xiaohuar.com/‘), 24 #gevent.spawn(pa_web_page, ‘https://github.com/‘), 25 ]) 26 print("time cost t2:",time.time()-t2_start)
二、事件驱动与异步IO
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。
当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
- 程序中有许多任务,而且…
- 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
- 在等待事件到来时,某些任务会阻塞。
当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。
1.select多并发socket例子
1 #_*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 4 import select 5 import socket 6 import sys 7 import queue 8 9 10 server = socket.socket() 11 server.setblocking(0) 12 13 server_addr = (‘localhost‘,10000) 14 15 print(‘starting up on %s port %s‘ % server_addr) 16 server.bind(server_addr) 17 18 server.listen(5) 19 20 21 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd 22 outputs = [] 23 24 message_queues = {} 25 26 while True: 27 print("waiting for next event...") 28 29 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里 30 31 for s in readable: #每个s就是一个socket 32 33 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了, 34 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀 35 #新连接进来了,接受这个连接 36 conn, client_addr = s.accept() 37 print("new connection from",client_addr) 38 conn.setblocking(0) 39 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接 40 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到 41 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的 42 43 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送 44 45 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了 46 #客户端的数据过来了,在这接收 47 data = http://www.mamicode.com/s.recv(1024) 48 if data: 49 print("收到来自[%s]的数据:" % s.getpeername()[0], data) 50 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端 51 if s not in outputs: 52 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端 53 54 55 else:#如果收不到data代表什么呢? 代表客户端断开了呀 56 print("客户端断开了",s) 57 58 if s in outputs: 59 outputs.remove(s) #清理已断开的连接 60 61 inputs.remove(s) #清理已断开的连接 62 63 del message_queues[s] ##清理已断开的连接 64 65 66 for s in writeable: 67 try : 68 next_msg = message_queues[s].get_nowait() 69 70 except queue.Empty: 71 print("client [%s]" %s.getpeername()[0], "queue is empty..") 72 outputs.remove(s) 73 74 else: 75 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 76 s.send(next_msg.upper()) 77 78 79 for s in exeptional: 80 print("handling exception for ",s.getpeername()) 81 inputs.remove(s) 82 if s in outputs: 83 outputs.remove(s) 84 s.close() 85 86 del message_queues[s]
1 import socket 2 import sys 3 4 messages = [ b‘This is the message. ‘, 5 b‘It will be sent ‘, 6 b‘in parts.‘, 7 ] 8 server_address = (‘localhost‘, 10000) 9 10 # Create a TCP/IP socket 11 socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM), 12 socket.socket(socket.AF_INET, socket.SOCK_STREAM), 13 ] 14 15 # Connect the socket to the port where the server is listening 16 print(‘connecting to %s port %s‘ % server_address) 17 for s in socks: 18 s.connect(server_address) 19 20 for message in messages: 21 22 # Send messages on both sockets 23 for s in socks: 24 print(‘%s: sending "%s"‘ % (s.getsockname(), message) ) 25 s.send(message) 26 27 # Read responses on both sockets 28 for s in socks: 29 data = http://www.mamicode.com/s.recv(1024) 30 print( ‘%s: received "%s"‘ % (s.getsockname(), data) ) 31 if not data: 32 print(sys.stderr, ‘closing socket‘, s.getsockname() ) 33 34 select socket client
2.selectors模块
1 import selectors 2 import socket 3 4 def accept(sock, mask): 5 conn, addr = sock.accept() # Should be ready 6 print(‘accepted‘, conn, ‘from‘, addr) 7 conn.setblocking(False)#非阻塞,或者设置为0 8 sel.register(conn, selectors.EVENT_READ, read) 9 def read(conn, mask): 10 try: 11 data = http://www.mamicode.com/conn.recv(1000) # Should be ready 12 if data: 13 print(‘echoing‘, repr(data), ‘to‘, conn) 14 conn.send(data) # Hope it won‘t block 15 else: 16 print(‘closing‘, conn) 17 sel.unregister(conn) 18 conn.close() 19 except ConnectionResetError as e: 20 sel.unregister(conn) 21 sock = socket.socket() 22 sock.bind((‘localhost‘, 10000)) 23 sock.listen(100) 24 sock.setblocking(False) 25 26 sel = selectors.DefaultSelector()#生成实例 27 sel.register(sock, selectors.EVENT_READ, accept)#注册sock连接,读事件,如果有请求调用accept 28 #select.select(inputs,outputs...) 29 while True: 30 events = sel.select() #如果没有事件,一直等待,返回列表 31 for key, mask in events: #有事件,循环events列表 32 callback = key.data #accept内存地址,发送数据后变成read内存地址 33 print("--->",key,mask) 34 callback(key.fileobj, mask)#fileobj是conn, 35 #fileobj=<socket.socket fd=220, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 10000)>,
三、RabbitMQ队列
1.安装
安装python rabbitMQ module
pip install pika or easy_install pika or 源码 https://pypi.python.org/pypi/pika
2.send端
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 ‘localhost‘)) 6 channel = connection.channel() 7 8 #声明queue 9 channel.queue_declare(queue=‘hello‘) 10 11 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘hello‘, 14 body=‘Hello World!‘) 15 print(" [x] Sent ‘Hello World!‘") 16 connection.close()
3.receive端
1 #_*_coding:utf-8_*_ 2 __author__ = ‘Alex Li‘ 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 ‘localhost‘)) 7 channel = connection.channel() 8 9 10 #You may ask why we declare the queue again ? we have already declared it in our previous code. 11 # We could avoid that if we were sure that the queue already exists. For example if send.py program 12 #was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 13 # practice to repeat declaring the queue in both programs. 14 channel.queue_declare(queue=‘hello‘) 15 16 def callback(ch, method, properties, body): 17 print(" [x] Received %r" % body) 18 19 channel.basic_consume(callback, 20 queue=‘hello‘, 21 no_ack=True) 22 23 print(‘ [*] Waiting for messages. To exit press CTRL+C‘) 24 channel.start_consuming()
Python网络编程学习_day11