首页 > 代码库 > Curator源码解析(五)Curator的连接和重试机制

Curator源码解析(五)Curator的连接和重试机制

转载请注明出处: jiq?钦‘s technical Blog

本文将主要关注Curator是如何处理连接丢失和会话终止这两个关键问题的。

1.   连接丢失的处理

Curator中利用类ConnectionState来管理客户端到ZooKeeper集群的连接状态,其中用到原子布尔型变量来标识当前连接是否已经建立:

private finalAtomicBoolean isConnected=newAtomicBoolean(false);

在事件处理函数中(ConnectionState实现了Watcher接口)修改isConnected的值:

@Override

   publicvoidprocess(WatchedEvent event)

   {

       //逐个调用parentWatchers容器中的Watcherprocess函数

        for ( Watcher parentWatcher :parentWatchers )

        {

            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());

            parentWatcher.process(event);

            timeTrace.commit();

        }

 

        //记录旧连接状态

        boolean wasConnected =isConnected.get();

        boolean newIsConnected = wasConnected;

        if ( event.getType() ==Watcher.Event.EventType.None )

        {

            //获取新连接状态

            newIsConnected =checkState(event.getState(), wasConnected);

        }

 

        //若状态发生变化,则修改

        if ( newIsConnected != wasConnected )

        {

           isConnected.set(newIsConnected);

            connectionStartMs = System.currentTimeMillis();

        }

   }

 

其中checkState函数获取当前连接状态是否为已连接:

private booleancheckState(Event.KeeperState state, boolean wasConnected)

   {

        boolean isConnected = wasConnected;

        boolean checkNewConnectionString =true;

        switch ( state )

        {

        default:

        //连接丢失

        case Disconnected:

        {

            isConnected = false;

            break;

        }

 

        //连接建立

        case SyncConnected:

        case ConnectedReadOnly:

        {

            isConnected = true;

            break;

        }

 

        //验证失败

        case AuthFailed:

        {

            isConnected = false;

            log.error("Authentication failed");

            break;

        }

 

        //会话终止

        case Expired:

        {

            isConnected = false;

            checkNewConnectionString = false;

            handleExpiredSession();

            break;

        }

 

        case SaslAuthenticated:

        {

            // NOP

            break;

        }

        }

 

        if ( checkNewConnectionString &&zooKeeper.hasNewConnectionString())

        {

            handleNewConnectionString();

        }

 

        return isConnected;

}

 

若平时发生连接丢失isConnected(标识当前连接状态)被置为false,ZooKeeper自动重连回来之后isConnected被置为true,所以在平时连接与否无关紧要,但是当发起ZooKeeper操作(like getChildren,get/setData, create, delete)时,若发生连接丢失的情况,则会抛出ConnectionLossexception,那么Curator这个时候是如何处理的呢?

 

下面以SetData操作来看,下面是Curator执行SetData操作的代码:

TimeTrace   trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Foreground");

        Stat resultStat =RetryLoop.callWithRetry

        (

            client.getZookeeperClient(),

            new Callable<Stat>()

            {

                @Override

                public Stat call()throws Exception

                {

                    returnclient.getZooKeeper().setData(path, data, version);

                }

            }

        );

 

可以看到真正的setData操作被包装到了callWithRetry函数中:

public static<T>T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)throws Exception

   {

        T result = null;

        RetryLoop retryLoop =client.newRetryLoop();

        while ( retryLoop.shouldContinue() )

        {

            try

           {

                //检测当前连接状态,若未连接则等待一定时间直到连接完成

               client.internalBlockUntilConnectedOrTimedOut();

               

                //调用带返回值的Callable方法

                result = proc.call();

                retryLoop.markComplete();

            }

            catch ( Exception e )

            {

                retryLoop.takeException(e);

            }

        }

        return result;

}

 

这个函数其实很简单,步骤如下:

(1)     检测当前是否已连接,若已连接则执行下一句代码,否则等待一定时间;

(2)     执行真正的ZooKeeper操作;

(3)     执行成功,标记为执行完成。

 

若执行ZooKeeper操作的时候发生任何异常,将会执行takeException函数:

public voidtakeException(Exception exception) throws Exception

   {

        boolean rethrow =true;

        if ( isRetryException(exception) )

        {

            //是否允许继续重试

            if (retryPolicy.allowRetry(retryCount++,System.currentTimeMillis() -startTimeMs, sleeper))

            {

                rethrow = false;

            }          

        }

 

        if ( rethrow )

        {

            throw exception;

        }

}

 

---如果isRetryException函数判断抛出的异常是否是“连接丢失异常/会话终止异常”,如果是则判断是否允许重试(其实传递进来的重试策略就是简单地进行三次重试),允许重试的话就不抛出异常,返回,继续下一轮循环。

---如果isRetryException函数判断不属于“连接丢失异常/会话终止异常”,比如是其他的一些其他异常(create操作可能引起NodeExists 异常, delete操作可能引起NoNode异常),那么将继续把异常抛出,callWithRetry函数将因为异常而结束返回。

 

这就是Curator处理连接丢失的策略,平时仅仅是通过在watch事件响应函数中记录连接状态isConnected,执行ZooKeeper操作的时候,先等待连接状态isConnected变为true再执行操作,若执行期间若发生异常,仅仅在当异常类型为“连接丢失/会话终止”时进行重试,反复几次。

 

这种机制个人认为已经足够应付所有场景。

 

2.   会话终止的处理

和连接丢失一样,我们需要分别来分析平时和执行ZooKeeper操作时发生“会话终止”异常Curator怎么来处理。

还是看ConnectionState类中watch事件响应函数,其中有这么一段代码:

//会话终止

case Expired:

{

   isConnected = false;

   checkNewConnectionString = false;

   handleExpiredSession();

   break;

}

关键是看handleExpiredSession函数:

private voidhandleExpiredSession()

   {

        try

        {

            reset();

        }

        catch ( Exception e )

        {

            queueBackgroundException(e);

        }

}

 

就是一个reset函数:

private synchronizedvoidreset() throwsException

   {

        log.debug("reset");

 

        instanceIndex.incrementAndGet();

 

        isConnected.set(false);

        connectionStartMs = System.currentTimeMillis();

        zooKeeper.closeAndReset();

        zooKeeper.getZooKeeper();  // initiateconnection

   }

 

关键是看最后两句代码,先是执行HandleHolder的closeAndReset函数:

void closeAndReset()throws Exception

   {

        internalClose();

 

        helper = new Helper()

        {

            private volatile ZooKeeper zooKeeperHandle =null;

            private volatile String connectionString =null;

 

            @Override

            public ZooKeeper getZooKeeper()throws Exception

            {

                synchronized(this)

                {

                    if (zooKeeperHandle == null)

                    {

                        connectionString =ensembleProvider.getConnectionString();

                        zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout, watcher,canBeReadOnly);

                    }

 

                    helper = new Helper()

                    {

                        @Override

                        public ZooKeepergetZooKeeper()throwsException

                        {

                            returnzooKeeperHandle;

                        }

 

                        @Override

                        public StringgetConnectionString()

                        {

                            returnconnectionString;

                        }

                    };

 

                   returnzooKeeperHandle;

                }

            }

 

            @Override

            public String getConnectionString()

            {

                returnconnectionString;

            }

        };

   }

 

如果对这个函数不清楚,可以回过头看看之前文章讲的Curator的初始化和启动的源码分析,HandleHolder是原生ZooKeeper对象的持有者,维护ZooKeeper对象的单例就是通过这个函数。

 

一旦发生会话终止异常,ZooKeeper句柄会被自动关闭,所以之前初始化的helper对象中的zooKeeperHandle变量将会变得不可用,所以需要调用这个closeAndReset函数重新初始化helper对象,然后再调用一次getZooKeeper函数执行zookeeperFactory.newZooKeeper初始化好ZooKeeper句柄。

注意:会话终止后ZooKeeper句柄会被自动关闭,但并不是被置为null了,所以在用原来的helper对象的getZooKeeper方法返回的句柄是不可用的。

 

再看看当执行ZooKeeper操作时发生了会话终止时怎么处理。如果执行ZooKeeper时发生了会话终止,watch事件响应函数中会中心构建ZooKeeper句柄,callWithRetry函数中不但判断当前发生的是连接丢失异常时会进行重试,判断是会话终止异常也会进行重试。

 

所以说Curator处理会话终止的方法,就是在收到Expired事件时候重新构建HandleHolder中维护的ZooKeeper句柄。

 

关于临时节点和watch事件

特别注意,有的应用程序创建的临时节点和注册的watch事件至关重要,无法忍受丢失的情况,若发生会话终止,它们必定会被ZooKeeper服务器端删除掉,并且Curator无法帮助你重新还原回来。

 

这个时候就需要应用程序自己处理,在收到会话终止异常之后,重新注册关键的watch事件,以及重新创建关键的临时节点。

Curator源码解析(五)Curator的连接和重试机制