首页 > 代码库 > Day12 线程池、RabbitMQ和SQLAlchemy

Day12 线程池、RabbitMQ和SQLAlchemy

1、with实现上下文管理

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng


#with实现上下文管理
import contextlib
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
:param self:
:param state_list:
:param worker_thread:
:return:
"""
#第二步
state_list.append(worker_thread)
try:
#第三步
yield
finally:
#执行第五步
state_list.remove(worker_thread)

free_list = []
current_thread = "wang"
#第一步执行with,调用worker_state方法
with worker_state(free_list, current_thread):
#第四步
print(123)
print(456)
#执行完成出去的瞬间

2、with实现socket上下文

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng


import contextlib
import socket

@contextlib.contextmanager
def context_socket(host, port):
sk = socket.socket()
sk.bind((host, port))
sk.listen(5)
try:
yield sk
finally:
sk.close()

with context_socket(‘127.0.0.1‘, 8888) as sock:
print(sock)

3、Redis连接池

import redis
pool = redis.ConnectionPool(host=‘192.168.195.128‘, port=6379)
r = redis.Redis(connection_pool=pool)
pipe = r.pipeline(transaction=True)
r.set(‘name‘, ‘wanghuafeng‘)
r.set(‘role‘, ‘haha‘)
t = pipe.execute()
print(r.get(‘role‘))

4、Redis发布、订阅

import redis

class RedisHelper:
def __init__(self):
self.__conn = redis.Redis(host=‘192.168.195.128‘)
self.chan_sub = ‘FM98.8‘
self.chan_pub = ‘FM98.8‘

def public(self, msg):
self.__conn.publish(self.chan_pub, msg)
return True

def subscribe(self):
pub = self.__conn.pubsub()
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub
 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng


from monitor.s22 import RedisHelper

obj = RedisHelper()
redis_sub = obj.subscribe()

while True:
msg = redis_sub.parse_response()
print(msg)
 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng


from monitor.s22 import RedisHelper

obj = RedisHelper()
obj.public(‘helo‘)

5、RabbitMQ

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng

import pika

# ######################### 生产者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,
routing_key=‘hello‘,
body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
 
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: wanghuafeng


import pika

# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(
host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘hello‘)


def callback(ch, method, properties, body):
print(" [x] Received %r" % body)


channel.basic_consume(callback,
queue=‘hello‘,
no_ack=True)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()

Day12 线程池、RabbitMQ和SQLAlchemy