A Rough Walkthrough of the ZooKeeper 3.4.6 Client Flow

Start with the ZooKeeper class. The call chain is ZooKeeper --> ClientCnxn --> SendThread/EventThread.

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}

By default, getClientCnxnSocket() returns the NIO-based implementation, ClientCnxnSocketNIO.
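The "default implementation unless something else is configured" idea can be sketched without any ZooKeeper classes. This is only an illustration: Transport, NioTransport, TransportSelector and the property name sketch.transport are all hypothetical names, and the sketch assumes the override arrives as a class name in a system property.

```java
/** Hypothetical stand-in for the ClientCnxnSocket abstraction. */
interface Transport {
    String name();
}

/** Hypothetical stand-in for the NIO implementation used by default. */
final class NioTransport implements Transport {
    @Override
    public String name() {
        return "nio";
    }
}

final class TransportSelector {
    /** Made-up property name for this sketch; it is not a ZooKeeper setting. */
    static final String PROPERTY = "sketch.transport";

    /** Returns the configured implementation, or NioTransport when none is configured. */
    static Transport select() {
        String className = System.getProperty(PROPERTY);
        if (className == null) {
            // Nothing configured: fall back to the default implementation.
            return new NioTransport();
        }
        try {
            // An override is given as a class name and instantiated reflectively.
            return (Transport) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(select().name());
    }
}
```

Keeping the choice behind one factory method means the rest of the client only ever sees the abstract type.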

cnxn.start() then starts the two background threads:

public void start() {
    sendThread.start();
    eventThread.start();
}

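The ZooKeeper API methods talk to the server by submitting requests through cnxn.submitRequest. Take getData as an example; every operation has its own xxRequest and xxResponse pair.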

public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException
 {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // the watch contains the un-chroot path
    WatchRegistration wcb = null;
    if (watcher != null) {
        wcb = new DataWatchRegistration(watcher, clientPath);
    }

    final String serverPath = prependChroot(clientPath);

    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.getData);
    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null);
    GetDataResponse response = new GetDataResponse();
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
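getData keeps two paths: clientPath, which the watch is registered under, and serverPath, which is sent to the server after prependChroot. The listing does not show prependChroot itself, so the following is a sketch of what such a helper plausibly does, under the assumption that the chroot is simply prefixed to the client path and that "/" maps to the chroot itself. ChrootSketch and its method signature are hypothetical.

```java
final class ChrootSketch {
    /**
     * Maps a client-visible path to a server path.
     * Assumption: a null chrootPath means no chroot is configured.
     */
    static String prependChroot(String chrootPath, String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        if (clientPath.length() == 1) {
            // clientPath is "/", so the server path is the chroot itself.
            return chrootPath;
        }
        return chrootPath + clientPath;
    }

    public static void main(String[] args) {
        System.out.println(prependChroot("/app", "/config"));
        System.out.println(prependChroot("/app", "/"));
        System.out.println(prependChroot(null, "/config"));
    }
}
```

Registering the watch under the un-chrooted path means the application sees the same path in watch events that it passed to getData.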

cnxn.submitRequest first calls queuePacket to add the request to outgoingQueue. A synchronous request then blocks in packet.wait() until it is woken up in SendThread.readResponse() --> finishPacket(packet).

public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}
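The blocking handoff in submitRequest can be reproduced in isolation. The following is a minimal sketch, not ZooKeeper code: Packet is a trimmed-down, hypothetical stand-in, and finish() plays the role that finishPacket plays on the SendThread side.

```java
final class PacketHandoff {
    /** Hypothetical, trimmed-down stand-in for ClientCnxn.Packet. */
    static final class Packet {
        boolean finished; // guarded by the packet's own monitor
        String reply;     // guarded by the packet's own monitor
    }

    /** Caller side: block until another thread marks the packet finished. */
    static String awaitReply(Packet packet) {
        synchronized (packet) {
            try {
                // The loop re-checks the flag, which guards against spurious wakeups.
                while (!packet.finished) {
                    packet.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
            return packet.reply;
        }
    }

    /** I/O thread side: publish the reply, then wake every waiter. */
    static void finish(Packet packet, String reply) {
        synchronized (packet) {
            packet.reply = reply;
            packet.finished = true;
            packet.notifyAll();
        }
    }

    /** Runs one round trip with a background thread playing the I/O thread. */
    static String roundTrip(String reply) {
        Packet packet = new Packet();
        Thread io = new Thread(() -> finish(packet, reply), "sketch-io");
        io.start();
        return awaitReply(packet);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("data"));
    }
}
```

Because the flag is set and checked under the same monitor, the caller cannot miss the notification even if finish() runs before awaitReply() starts waiting.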

queuePacket adds the packet to outgoingQueue:

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            // add the packet to the outgoingQueue send queue
            outgoingQueue.add(packet);
        }
    }
    // wake up the Selector
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
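The wakeup at the end of queuePacket matters because the sending thread may be blocked in a select call when a new packet arrives. Assuming the NIO socket blocks on a java.nio.channels.Selector, as the comment in the listing suggests, the effect can be sketched as follows. SelectorWakeupSketch and blockedMillis are hypothetical names used only for this illustration.

```java
import java.io.IOException;
import java.nio.channels.Selector;

final class SelectorWakeupSketch {
    /**
     * Blocks in select() with a 10 second timeout while another thread calls
     * wakeup() after wakeAfterMillis. Returns how long select() blocked, in ms.
     */
    static long blockedMillis(long wakeAfterMillis) {
        try (Selector selector = Selector.open()) {
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(wakeAfterMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // Plays the role of wakeupCnxn(): unblock the selecting thread.
                selector.wakeup();
            }, "sketch-producer");

            long start = System.nanoTime();
            producer.start();
            selector.select(10_000); // returns early once wakeup() is called
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            producer.join();
            return elapsedMillis;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("select() blocked for " + blockedMillis(50) + " ms");
    }
}
```

Without the wakeup, a freshly queued packet would wait until the select timeout expires before being written.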

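SendThread, the background thread of ClientCnxn, sends the requests in outgoingQueue, receives the data returned by the server, and also sends heartbeats. All of this happens in its run() method, listed at the end of this section.

First, the main I/O flow: clientCnxnSocket.doTransport --> doIO(pendingQueue, outgoingQueue, cnxn) --> sockKey.isWritable() --> sock.write(p.bb) --> the server processes the request --> sockKey.isReadable() --> sendThread.readResponse(incomingBuffer) --> finishPacket(packet) --> p.notifyAll();

When sock.write(p.bb) sends a Packet, the Packet is added to pendingQueue.

Once its response has been read, the Packet is removed from pendingQueue.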
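The movement of a packet between the two queues can be sketched as a small state machine. This is a simplified model rather than the client's actual code: TwoQueueSketch and its methods are hypothetical, no real socket is involved, and it assumes replies arrive in the same order the requests were sent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class TwoQueueSketch {
    /** Hypothetical, trimmed-down packet. */
    static final class Packet {
        final String payload;
        int xid;          // assigned at send time, not at queue time
        boolean finished;

        Packet(String payload) {
            this.payload = payload;
        }
    }

    private final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();
    private int nextXid = 1;

    /** Queue a request for sending. */
    void submit(Packet p) {
        outgoingQueue.addLast(p);
    }

    /** "Write" step: send the head of outgoingQueue and park it in pendingQueue. */
    int sendOne() {
        Packet p = outgoingQueue.pollFirst();
        if (p == null) {
            return -1; // nothing to send
        }
        p.xid = nextXid++;
        pendingQueue.addLast(p);
        return p.xid;
    }

    /** "Read" step: the reply must belong to the oldest pending packet. */
    Packet onReply(int xid) {
        Packet p = pendingQueue.pollFirst();
        if (p == null || p.xid != xid) {
            throw new IllegalStateException("reply " + xid + " is out of order");
        }
        p.finished = true;
        return p;
    }

    int pendingCount() {
        return pendingQueue.size();
    }

    public static void main(String[] args) {
        TwoQueueSketch q = new TwoQueueSketch();
        q.submit(new Packet("getData /a"));
        int xid = q.sendOne();
        System.out.println("pending after send: " + q.pendingCount());
        q.onReply(xid);
        System.out.println("pending after reply: " + q.pendingCount());
    }
}
```

Keeping sent-but-unanswered packets in their own queue lets the reader match each reply to a request just by looking at the head of that queue.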


clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = System.currentTimeMillis();
while (state.isAlive()) {
    try {
        if (!clientCnxnSocket.isConnected()) {
            if(!isFirstConnect){
                try {
                    Thread.sleep(r.nextInt(1000));
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected exception", e);
                }
            }
            // don't re-establish connection if we are closing
            if (closing || !state.isAlive()) {
                break;
            }
            startConnect();
            clientCnxnSocket.updateLastSendAndHeard();
        }

        if (state.isConnected()) {
            // determine whether we need to send an AuthFailed event.
            if (zooKeeperSaslClient != null) {
                boolean sendAuthEvent = false;
                if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                    try {
                        zooKeeperSaslClient.initialize(ClientCnxn.this);
                    } catch (SaslException e) {
                       LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                        state = States.AUTH_FAILED;
                        sendAuthEvent = true;
                    }
                }
                KeeperState authState = zooKeeperSaslClient.getKeeperState();
                if (authState != null) {
                    if (authState == KeeperState.AuthFailed) {
                        // An authentication error occurred during authentication with the Zookeeper Server.
                        state = States.AUTH_FAILED;
                        sendAuthEvent = true;
                    } else {
                        if (authState == KeeperState.SaslAuthenticated) {
                            sendAuthEvent = true;
                        }
                    }
                }

                if (sendAuthEvent == true) {
                    eventThread.queueEvent(new WatchedEvent(
                          Watcher.Event.EventType.None,
                          authState,null));
                }
            }
            to = readTimeout - clientCnxnSocket.getIdleRecv();
        } else {
            to = connectTimeout - clientCnxnSocket.getIdleRecv();
        }

        if (to <= 0) {
            throw new SessionTimeoutException(
                    "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv() + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId));
        }
        if (state.isConnected()) {
            int timeToNextPing = readTimeout / 2
                    - clientCnxnSocket.getIdleSend();
            if (timeToNextPing <= 0) {
                sendPing();
                clientCnxnSocket.updateLastSend();
            } else {
                if (timeToNextPing < to) {
                    to = timeToNextPing;
                }
            }
        }

        // If we are in read-only mode, seek for read/write server
        if (state == States.CONNECTEDREADONLY) {
            long now = System.currentTimeMillis();
            int idlePingRwServer = (int) (now - lastPingRwServer);
            if (idlePingRwServer >= pingRwTimeout) {
                lastPingRwServer = now;
                idlePingRwServer = 0;
                pingRwTimeout =
                    Math.min(2*pingRwTimeout, maxPingRwTimeout);
                pingRwServer();
            }
            to = Math.min(to, pingRwTimeout - idlePingRwServer);
        }

        clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
    } catch (Throwable e) {
        if (closing) {
            if (LOG.isDebugEnabled()) {
                // closing so this is expected
                LOG.debug("An exception was thrown while closing send thread for session 0x"
                        + Long.toHexString(getSessionId())
                        + " : " + e.getMessage());
            }
            break;
        } else {
            // this is ugly, you have a better way speak up
            if (e instanceof SessionExpiredException) {
                LOG.info(e.getMessage() + ", closing socket connection");
            } else if (e instanceof SessionTimeoutException) {
                LOG.info(e.getMessage() + RETRY_CONN_MSG);
            } else if (e instanceof EndOfStreamException) {
                LOG.info(e.getMessage() + RETRY_CONN_MSG);
            } else if (e instanceof RWServerFoundException) {
                LOG.info(e.getMessage());
            } else {
                LOG.warn(
                        "Session 0x"
                                + Long.toHexString(getSessionId())
                                + " for server "
                                + clientCnxnSocket.getRemoteSocketAddress()
                                + ", unexpected error"
                                + RETRY_CONN_MSG, e);
            }
            cleanup();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(
                        Event.EventType.None,
                        Event.KeeperState.Disconnected,
                        null));
            }
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
        }
    }
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
            Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                         "SendThread exitedloop.");

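The timeout arithmetic inside the run() loop is easier to follow when pulled out on its own. The sketch below restates the connected-state branch only: the select timeout is the time left before the read timeout expires, shortened to the time until the next ping when one is not yet due, and the read/write-server probe interval doubles up to a cap. SelectTimeoutSketch and its method names are hypothetical, and the unchecked exception stands in for SessionTimeoutException.

```java
final class SelectTimeoutSketch {
    /** A ping is due once half of readTimeout has passed since the last send. */
    static boolean pingDue(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend <= 0;
    }

    /**
     * How long the next select may block, in ms, while connected.
     * Throws when nothing has been heard from the server for readTimeout ms.
     */
    static int selectTimeout(int readTimeout, int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv;
        if (to <= 0) {
            // Stand-in for the SessionTimeoutException thrown in the loop.
            throw new IllegalStateException(
                    "have not heard from server in " + idleRecv + "ms");
        }
        int timeToNextPing = readTimeout / 2 - idleSend;
        if (timeToNextPing > 0 && timeToNextPing < to) {
            // Wake up in time to send the next ping.
            to = timeToNextPing;
        }
        return to;
    }

    /** Exponential backoff for probing a read/write server, capped at max. */
    static int nextPingRwTimeout(int current, int max) {
        return Math.min(2 * current, max);
    }

    public static void main(String[] args) {
        System.out.println(selectTimeout(20_000, 1_000, 4_000));
        System.out.println(pingDue(20_000, 10_000));
        System.out.println(nextPingRwTimeout(100, 60_000));
    }
}
```

Pinging at half the read timeout leaves the other half for the reply to arrive before the session is considered timed out.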