首页 > 代码库 > python之路day11【RabbitMQ、Redis、Mysql】
python之路day11【RabbitMQ、Redis、Mysql】
大纲
1.RabbitMQ
2.Redis
3.Mysql
1.RabbitMQ消息队列
1.1 RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
下面将重点介绍RabbitMQ中的一些基础概念,了解了这些概念,是使用好RabbitMQ的基础。
1.2 安装RabbitMQ和python的pika模块
1.2.1 安装RabbitMQ
(1)安装erlang平台(RabbitMQ的依赖平台)
1.安装依赖文件
yum install ncurses-devel
2.下载源文件
wget http://www.erlang.org/download/ otp_src_19.1.tar.gz
若失败,到地址:http://erlang.org/download/去手动下载
3.解压源文件压缩包
tar -xvf otp_src_17.1.tar.gz
(tar 参数含义: bz2格式用j;gz格式用z;c是创建;x是解压缩;v是详细信息;f是指定文件)
4.进入解压后的目录
cd otp_src_19.1
5.依次执行以下命令:
./configure -prefix=/opt/erlang 就会开始编译安装 会编译到 /opt/erlang 下
make && make install
6.修改/etc/profile文件,增加下面的环境变量:
cd /etc/
#set erlang environment
export PATH=$PATH:/opt/erlang/bin
source profile使得文件生效(用export 查看path中是否有刚刚添加的环境变量)
7.安装完成后执行erl看是否能打开eshell,用’halt().’退出,注意:“.”是erlang的结束符
(2)安装RabbitMQ
wget -c http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm
rpm -ivh --nodeps rabbitmq-server-3.6.0-1.noarch.rpm
1.2.2 安装pika
pip install pika 或者easy_install pika
1.3 最简单的发送/接收消息队列模型
producer:
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 4 import pika 5 6 connection = pika.BlockingConnection(pika.ConnectionParameters( 7 ‘localhost‘)) # 相当于建立一个socket连接 8 channel = connection.channel() 9 # 声明queue 10 channel.queue_declare(queue=‘hello‘) 11 # RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 12 channel.basic_publish(exchange=‘‘, 13 routing_key=‘hello‘, 14 body=‘你好!‘.encode("utf-8")) 15 print(" 发送 ‘你好!‘") 16 connection.close()
consumer
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 ‘localhost‘)) 7 channel = connection.channel() 8 9 # You may ask why we declare the queue again ? we have already declared it in our previous code. 10 # We could avoid that if we were sure that the queue already exists. For example if send.py program 11 # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good 12 # practice to repeat declaring the queue in both programs. 13 channel.queue_declare(queue=‘hello‘) 14 15 16 def callback(ch, method, properties, body): 17 print(" 收到: %r" % body.decode("utf-8")) 18 19 channel.basic_consume(callback, 20 queue=‘hello‘, 21 no_ack=True) 22 print(‘ 等待。。。‘) 23 channel.start_consuming()
注意代码中的英文注释,特别是为什么又一次声明queue。。。
1.4 轮询原理
1.3中如果依次运行两个consumer,分别记consumer1、consumer2,那么producer第一次发消息是consumer1收到,第二次发是consumer2收到,第三次发又是consumer1收到......也就是说,rabbitMQ是依次把消息发给consumer端。
1.5 消息持久化
producer
1 #! /usr/bin/env python3 2 # -*- coding:utf-8 -*- 3 import pika 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) # 相当于建立一个socket连接 6 channel = connection.channel() # 定义一个管道 7 # 声明Queue 8 channel.queue_declare(queue="hello2",durable=True) # durable=True 是把这个队列持久化,如果rabbitMQ挂掉,队列还在;如果 9 # 队列中的消息没有持久化,则消息会丢失 10 channel.basic_publish(exchange="", 11 routing_key="hello2", 12 body="Hi,how are you?", 13 properties=pika.BasicProperties( 14 delivery_mode=2,)) # properties=pika.BasicProperties(delivery_mode=2,) 这是队列中的消息持久化 15 print("发送了一句话。。。") 16 connection.close()
上述代码中,第8行只是队列持久化,如果rabbitMQ挂掉,队列还在;但如果队列中的消息没有持久化,则消息会丢失。
1.6 消息公平分发
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
1.7 消息发布/订阅(Publish/Subscribe)
1.8 有选择地接收消息(exchange type = direct)
python之路day11【RabbitMQ、Redis、Mysql】