首页 > 代码库 > Curator源码解析(五)Curator的连接和重试机制
Curator源码解析(五)Curator的连接和重试机制
转载请注明出处: jiq?钦‘s technical Blog
本文将主要关注Curator是如何处理连接丢失和会话终止这两个关键问题的。
1. 连接丢失的处理
Curator中利用类ConnectionState来管理客户端到ZooKeeper集群的连接状态,其中用到原子布尔型变量来标识当前连接是否已经建立:
private finalAtomicBoolean isConnected=newAtomicBoolean(false);
在事件处理函数中(ConnectionState实现了Watcher接口)修改isConnected的值:
@Override
publicvoidprocess(WatchedEvent event)
{
//逐个调用parentWatchers容器中的Watcher的process函数
for ( Watcher parentWatcher :parentWatchers )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());
parentWatcher.process(event);
timeTrace.commit();
}
//记录旧连接状态
boolean wasConnected =isConnected.get();
boolean newIsConnected = wasConnected;
if ( event.getType() ==Watcher.Event.EventType.None )
{
//获取新连接状态
newIsConnected =checkState(event.getState(), wasConnected);
}
//若状态发生变化,则修改
if ( newIsConnected != wasConnected )
{
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}
}
其中checkState函数获取当前连接状态是否为已连接:
private booleancheckState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString =true;
switch ( state )
{
default:
//连接丢失
case Disconnected:
{
isConnected = false;
break;
}
//连接建立
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
//验证失败
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
//会话终止
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
if ( checkNewConnectionString &&zooKeeper.hasNewConnectionString())
{
handleNewConnectionString();
}
return isConnected;
}
若平时发生连接丢失,isConnected(标识当前连接状态)被置为false,ZooKeeper自动重连回来之后isConnected被置为true,所以在平时连接与否无关紧要,但是当发起ZooKeeper操作(like getChildren,get/setData, create, delete)时,若发生连接丢失的情况,则会抛出ConnectionLossexception,那么Curator这个时候是如何处理的呢?
下面以SetData操作来看,下面是Curator执行SetData操作的代码:
TimeTrace trace = client.getZookeeperClient().startTracer("SetDataBuilderImpl-Foreground");
Stat resultStat =RetryLoop.callWithRetry
(
client.getZookeeperClient(),
new Callable<Stat>()
{
@Override
public Stat call()throws Exception
{
returnclient.getZooKeeper().setData(path, data, version);
}
}
);
可以看到真正的setData操作被包装到了callWithRetry函数中:
public static<T>T callWithRetry(CuratorZookeeperClient client, Callable<T> proc)throws Exception
{
T result = null;
RetryLoop retryLoop =client.newRetryLoop();
while ( retryLoop.shouldContinue() )
{
try
{
//检测当前连接状态,若未连接则等待一定时间直到连接完成
client.internalBlockUntilConnectedOrTimedOut();
//调用带返回值的Callable方法
result = proc.call();
retryLoop.markComplete();
}
catch ( Exception e )
{
retryLoop.takeException(e);
}
}
return result;
}
这个函数其实很简单,步骤如下:
(1) 检测当前是否已连接,若已连接则执行下一句代码,否则等待一定时间;
(2) 执行真正的ZooKeeper操作;
(3) 执行成功,标记为执行完成。
若执行ZooKeeper操作的时候发生任何异常,将会执行takeException函数:
public voidtakeException(Exception exception) throws Exception
{
boolean rethrow =true;
if ( isRetryException(exception) )
{
//是否允许继续重试
if (retryPolicy.allowRetry(retryCount++,System.currentTimeMillis() -startTimeMs, sleeper))
{
rethrow = false;
}
}
if ( rethrow )
{
throw exception;
}
}
---如果isRetryException函数判断抛出的异常是否是“连接丢失异常/会话终止异常”,如果是则判断是否允许重试(其实传递进来的重试策略就是简单地进行三次重试),允许重试的话就不抛出异常,返回,继续下一轮循环。
---如果isRetryException函数判断不属于“连接丢失异常/会话终止异常”,比如是其他的一些其他异常(create操作可能引起NodeExists 异常, delete操作可能引起NoNode异常),那么将继续把异常抛出,callWithRetry函数将因为异常而结束返回。
这就是Curator处理连接丢失的策略,平时仅仅是通过在watch事件响应函数中记录连接状态isConnected,执行ZooKeeper操作的时候,先等待连接状态isConnected变为true再执行操作,若执行期间若发生异常,仅仅在当异常类型为“连接丢失/会话终止”时进行重试,反复几次。
这种机制个人认为已经足够应付所有场景。
2. 会话终止的处理
和连接丢失一样,我们需要分别来分析平时和执行ZooKeeper操作时发生“会话终止”异常Curator怎么来处理。
还是看ConnectionState类中watch事件响应函数,其中有这么一段代码:
//会话终止
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
关键是看handleExpiredSession函数:
private voidhandleExpiredSession()
{
try
{
reset();
}
catch ( Exception e )
{
queueBackgroundException(e);
}
}
就是一个reset函数:
private synchronizedvoidreset() throwsException
{
log.debug("reset");
instanceIndex.incrementAndGet();
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
zooKeeper.closeAndReset();
zooKeeper.getZooKeeper(); // initiateconnection
}
关键是看最后两句代码,先是执行HandleHolder的closeAndReset函数:
void closeAndReset()throws Exception
{
internalClose();
helper = new Helper()
{
private volatile ZooKeeper zooKeeperHandle =null;
private volatile String connectionString =null;
@Override
public ZooKeeper getZooKeeper()throws Exception
{
synchronized(this)
{
if (zooKeeperHandle == null)
{
connectionString =ensembleProvider.getConnectionString();
zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout, watcher,canBeReadOnly);
}
helper = new Helper()
{
@Override
public ZooKeepergetZooKeeper()throwsException
{
returnzooKeeperHandle;
}
@Override
public StringgetConnectionString()
{
returnconnectionString;
}
};
returnzooKeeperHandle;
}
}
@Override
public String getConnectionString()
{
returnconnectionString;
}
};
}
如果对这个函数不清楚,可以回过头看看之前文章讲的Curator的初始化和启动的源码分析,HandleHolder是原生ZooKeeper对象的持有者,维护ZooKeeper对象的单例就是通过这个函数。
一旦发生会话终止异常,ZooKeeper句柄会被自动关闭,所以之前初始化的helper对象中的zooKeeperHandle变量将会变得不可用,所以需要调用这个closeAndReset函数重新初始化helper对象,然后再调用一次getZooKeeper函数执行zookeeperFactory.newZooKeeper初始化好ZooKeeper句柄。
注意:会话终止后ZooKeeper句柄会被自动关闭,但并不是被置为null了,所以在用原来的helper对象的getZooKeeper方法返回的句柄是不可用的。
再看看当执行ZooKeeper操作时发生了会话终止时怎么处理。如果执行ZooKeeper时发生了会话终止,watch事件响应函数中会中心构建ZooKeeper句柄,callWithRetry函数中不但判断当前发生的是连接丢失异常时会进行重试,判断是会话终止异常也会进行重试。
所以说Curator处理会话终止的方法,就是在收到Expired事件时候重新构建HandleHolder中维护的ZooKeeper句柄。
关于临时节点和watch事件
特别注意,有的应用程序创建的临时节点和注册的watch事件至关重要,无法忍受丢失的情况,若发生会话终止,它们必定会被ZooKeeper服务器端删除掉,并且Curator无法帮助你重新还原回来。
这个时候就需要应用程序自己处理,在收到会话终止异常之后,重新注册关键的watch事件,以及重新创建关键的临时节点。
Curator源码解析(五)Curator的连接和重试机制