首页 > 代码库 > zmq笔记四: tcp的connect操作
zmq笔记四: tcp的connect操作
int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0
void *context = zmq_ctx_new();
void *requester = zmq_socket(context, ZMQ_REQ);
zmq_connect(requester, "tcp://localhost:6666"); //如果是进程间通信,改为"ipc://xxx"
int zmq_connect (void *s_, const char *addr_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; return -1; } zmq::socket_base_t *s = (zmq::socket_base_t *) s_; int result = s->connect (addr_); return result; }
int zmq::socket_base_t::connect (const char *addr_) { ENTER_MUTEX (); // Process pending commands, if any. int rc = process_commands (0, false); //不忘先处理一下命令队列.. if (unlikely (rc != 0)) { EXIT_MUTEX (); return -1; } // Parse addr_ string. std::string protocol; std::string address; if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) { EXIT_MUTEX (); return -1; } .... bool is_single_connect = (options.type == ZMQ_DEALER || options.type == ZMQ_SUB || options.type == ZMQ_REQ); //这三种类型的socket只能有一个连接 if (unlikely (is_single_connect)) { const endpoints_t::iterator it = endpoints.find (addr_); if (it != endpoints.end ()) { EXIT_MUTEX (); return 0; } } // Choose the I/O thread to run the session in. io_thread_t *io_thread = choose_io_thread (options.affinity); //查找负载最小的I/O线程 if (!io_thread) { errno = EMTHREAD; EXIT_MUTEX (); return -1; } // Create session. 创建session对象,以后发送给它的命令消息就会放进这个I/O线程的邮箱里 session_base_t *session = session_base_t::create (io_thread, true, this, options, paddr); errno_assert (session); // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. (same for NORM, currently?) bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp"; pipe_t *newpipe = NULL; if (options.immediate != 1 || subscribe_to_all) { // Create a bi-directional pipe. object_t *parents [2] = {this, session}; pipe_t *new_pipes [2] = {NULL, NULL}; bool conflate = options.conflate && (options.type == ZMQ_DEALER || options.type == ZMQ_PULL || options.type == ZMQ_PUSH || options.type == ZMQ_PUB || options.type == ZMQ_SUB); int hwms [2] = {conflate? -1 : options.sndhwm, conflate? -1 : options.rcvhwm}; bool conflates [2] = {conflate, conflate};
rc = pipepair (parents, new_pipes, hwms, conflates); //创建一对双向"管道",一个pipe_t对象有两个ypipe_t,分别是作为inpipe/outpipe队列,其中inpipe->read,outpipe->write errno_assert (rc == 0); // Attach local end of the pipe to the socket object. attach_pipe (new_pipes [0], subscribe_to_all); //第一个pipe_t放进socket对象的pipes集合里 newpipe = new_pipes [0]; // Attach remote end of the pipe to the session object later on. session->attach_pipe (new_pipes [1]); //第二个pipe_t放到session的pipes集合里,注意,new_pipes的两个pipe_t在pipepairs()生成时已经互为peer
} // Save last endpoint URI paddr->to_string (last_endpoint); add_endpoint (addr_, (own_t *) session, newpipe); //开始进行connect EXIT_MUTEX (); return 0; }
首先来看一下attach_pipe()做了什么: (socket和session各有不同的attach_pipe()函数,但功能差不多)
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); //只会被设置一次,当这个pipe有消息要处理时,实际上是由这个this对象来处理的. pipes.push_back (pipe_); // Let the derived socket type know about new pipe. xattach_pipe (pipe_, subscribe_to_all_); //还会加入到socket的fair-queue和load-balance-queue,这个先不在这里分析 // If the socket is already being closed, ask any new pipes to terminate // straight away. if (is_terminating ()) { register_term_acks (1); pipe_->terminate (false); } }
再看下add_endpoint (addr_, (own_t *) session, newpipe);
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe) { // Activate the session. Make it a child of this socket. launch_child (endpoint_); //激活session,把它加入到socket的owned集合 endpoints.insert (endpoints_t::value_type (std::string (addr_), endpoint_pipe_t (endpoint_, pipe))); //当前endpoints表示socket包含所有对端的信息,endpoints是以目标地址为key的multimap }
void zmq::own_t::launch_child (own_t *object_)
// Specify the owner of the object.
object_->set_owner (this);
// Plug the object into the I/O thread.
send_plug (object_); //把session对象plug到socket,给socket一个plug类型的命令消息,目标对象是session
// Take ownership of the object.
send_own (this, object_);
void zmq::object_t::send_plug (own_t *destination_, bool inc_seqnum_) { if (inc_seqnum_) destination_->inc_seqnum (); command_t cmd; cmd.destination = destination_; cmd.type = command_t::plug; send_command (cmd); }
void zmq::object_t::send_command (command_t &cmd_)
ctx->send_command (cmd_.destination->get_tid (), cmd_); //请注意这里的tid
void zmq::ctx_t::send_command (uint32_t tid_, const command_t &command_) { slots [tid_]->send (command_); }
void zmq::mailbox_t::send (const command_t &cmd_) { sync.lock (); cpipe.write (cmd_, false); const bool ok = cpipe.flush (); sync.unlock (); if (!ok) signaler.send (); }
inline void write (const T &value_, bool incomplete_) //ypipe_t { // Place the value to the queue, add new terminator element. queue.back () = value_; queue.push (); // Move the "flush up to here" poiter. if (!incomplete_) f = &queue.back (); }
session的get_tid()返回的其实就是创建session时选择的I/O thread的tid. 回顾一下笔记一的create_socket, context创建的I/O线程都有一个tid, 并且tid是作为context->slot邮箱管理数组的下标. session的基类object_t的构造函数:
zmq::object_t::object_t (object_t *parent_) : //parent正是I/O thread
ctx (parent_->ctx),
tid (parent_->tid)
通过笔记二,三可知道,I/O thread的邮箱有消息处理时,是通过邮箱个fd通知的,而这个fd刚好就是mailbox的signaler的r句柄,也就是说, I/O thread的轮询select会在mailbox->send()的siangler->send()之后激活邮箱消息可读.消息读出来后,经过void zmq::object_t::process_command (command_t &cmd_),这是由cmd.destination.process_command (cmd)调用的,所以处理函数还是根据destination来定义:
void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { ...... case command_t::plug: process_plug (); process_seqnum (); break; ...... }
void zmq::session_base_t::process_plug () { if (active) start_connecting (false); }
void zmq::session_base_t::start_connecting (bool wait_) { zmq_assert (active); // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. io_thread_t *io_thread = choose_io_thread (options.affinity); //到这里,上一个plug消息算完成了,新建connecter对象相当于一个新的消息需求.首先寻找一个负载小的I/O线程. zmq_assert (io_thread); // Create the connecter object. if (addr->protocol == "tcp") { if (!options.socks_proxy_address.empty()) { ...... } else { tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (io_thread, this, options, addr, wait_); alloc_assert (connecter); launch_child (connecter); //这一次launch_child调用的是在session对象里调用,过程和上面一样,命令消息发送到io_thread的邮箱去了 } return; } ...... }
当I/O thread收到处理消息时,调用的是tcp_connecter_t的函数了:
void zmq::tcp_connecter_t::process_plug () { if (delayed_start) add_reconnect_timer (); else start_connecting (); }
当delayed_start为true时,只是加了个timer延迟connect操作,最终还是调用start_connecting ():
void zmq::tcp_connecter_t::timer_event (int id_) { zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); if (id_ == connect_timer_id) { //connecter的timer只有两个timer id, connect_timer_started = false; rm_fd (handle); handle_valid = false; close (); add_reconnect_timer (); } else if (id_ == reconnect_timer_id) { reconnect_timer_started = false; start_connecting (); //最终的调用还是这个函数入口 } }
步骤1: 设置非阻塞,启动连接
实现非阻塞 connect ,首先把 sockfd 设置成非阻塞的。这样调用
connect 可以立刻返回,根据返回值和 errno 处理三种情况:
(1) 如果返回 0,表示 connect 成功。
(2) 如果返回值小于 0, errno 为 EINPROGRESS, 表示连接
(3) 如果返回值小于0,errno 不是 EINPROGRESS,则连接出错了。
然后把 sockfd 加入 select 的读写监听集合,通过 select 判断 sockfd
(1) 如果连接建立好了,对方没有数据到达,那么 sockfd 是可写的
(2) 如果在 select 之前,连接就建立好了,而且对方的数据已到达,
那么 sockfd 是可读和可写的。
(3) 如果连接发生错误,sockfd 也是可读和可写的。
判断 connect 是否成功,就得区别 (2) 和 (3),这两种情况下 sockfd 都是
可读和可写的,区分的方法是,调用 getsockopt 检查是否出错。
步骤3:使用 getsockopt 函数检查错误
getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &error, &len)
在 sockfd 都是可读和可写的情况下,我们使用 getsockopt 来检查连接
如果发生错误,getsockopt 源自 Berkeley 的实现将在变量 error 中
返回错误,getsockopt 本身返回0;然而 Solaris 却让 getsockopt 返回 -1,
并把错误保存在 errno 变量中。所以在判断是否有错误的时候,要处理
void zmq::tcp_connecter_t::start_connecting () { // Open the connecting socket. const int rc = open (); // Connect may succeed in synchronous manner. if (rc == 0) { //条件1 handle = add_fd (s); handle_valid = true; out_event (); } // Connection establishment may be delayed. Poll for its completion. else if (rc == -1 && errno == EINPROGRESS) { //条件2 handle = add_fd (s); handle_valid = true; set_pollout (handle); socket->event_connect_delayed (endpoint, zmq_errno()); // add userspace connect timeout add_connect_timer (); } // Handle any other error condition by eventual reconnect. else { //条件3 if (s != retired_fd) close (); add_reconnect_timer (); } }
open()函数的主要工作是创建新套接字句柄s,并设置为noblock,然后调用 ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); 由于是非阻塞的,所以connect()调用立即返回-1,并且设置errno错误代码为EINPROGRESS表示连接操作还在进行中,而同时三次握手还是在进行中的,握手是否完成可以在poller的select()调用里知道结果.
#ifdef ZMQ_HAVE_WINDOWS const int last_error = WSAGetLastError(); if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) errno = EINPROGRESS; else errno = wsa_error_to_errno (last_error); #else if (errno == EINTR) errno = EINPROGRESS; #endif
假如client发起连接时,对端还没启动listen.那么进入start_connecting()的条件2,把s加入到connecter的poller里error( add_fd(s)只是加入到error集合,见笔记二)和write的fd集合,s还没完成三次握手.如果options.connect_timeout >0的话,再给它加一个connect_timer_id的timer. 然后等待I/O线程poller轮询select(). 由于对端还没有listen,套接字s会发生错误,导致触发tcp_connecter_t:in_event()(如果是连接成功,则触发tcp_connecter_t:out_event()). 然而对于tcp_connecter_t来说,in_event()调用的还是out_event(), 所以s的可写或出错都是会调用同一个函数.
void zmq::tcp_connecter_t::out_event () { if (connect_timer_started) { //如果存在connect timer就去掉 cancel_timer (connect_timer_id); connect_timer_started = false; } rm_fd (handle); //从poller里去掉s handle_valid = false; const fd_t fd = connect (); //查看s的状态,判断三次握手是否成功,返回适当的fd值 // Handle the error condition by attempt to reconnect. if (fd == retired_fd) { //从这次结果看来s三次握手失败了 close ();//关闭套接字s add_reconnect_timer (); //并加一个reconnect timer return; } //到达这里说明s三次握手成功了,连接完成 tune_tcp_socket (fd); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_maxrt (fd, options.tcp_maxrt); // remember our fd for ZMQ_SRCFD in messages socket->set_fd (fd); // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); alloc_assert (engine); // Attach the engine to the corresponding session object. send_attach (session, engine); // Shut the connecter down. terminate (); socket->event_connected (endpoint, (int) fd); }
三次握手是否成功是由tcp_connecter_t::connect ()判断并返回fd,如果成功了,就执行后面的代码; 如果失败就加一个重连的timer,这个timer的处理函数上文已经给出了,最终还是调用start_connecting (),不断循环,直到连接成功为止.
zmq笔记四: tcp的connect操作