首页 > 代码库 > Curator源码解析(二)初始化和启动分析

Curator源码解析(二)初始化和启动分析

上一篇文章这里已经列出了Curator的一个使用的例子,这篇文章将详细分析其初始化和启动部分。

测试程序分析

1      初始化和启动

(1) newClient方法返回CuratorFramework接口对象:

 

public staticCuratorFramework newClient(String connectString, int sessionTimeoutMs,int connectionTimeoutMs,RetryPolicy retryPolicy)
   {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
           connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }

 

看看builder()方法:

   //返回用于构建CuratorFramework的新的builder对象
   publicstaticBuilder builder()
   {
        return new Builder();
}

 

可以看到这个方法返回一个构建CuratorFramework的Builder。Builder类就在CuratorFrameworkFactory.java文件中。

        //设置连接到的ZooKeeper集群的地址列表
        public BuilderconnectString(String connectString)
        {
            ensembleProvider =newFixedEnsembleProvider(connectString);
            return this;
        }

 

前面方法都是设置当前对象的属性,然后将当前Builder对象返回,设置的属性可以看到包括这些:

private EnsembleProvider   ensembleProvider;
        private int                 sessionTimeoutMs =DEFAULT_SESSION_TIMEOUT_MS;
        private int                 connectionTimeoutMs =DEFAULT_CONNECTION_TIMEOUT_MS;
        private int                 maxCloseWaitMs =DEFAULT_CLOSE_WAIT_MS;
        private RetryPolicy        retryPolicy;
        private ThreadFactory      threadFactory =null;
        private String             namespace;
        private String             authScheme =null;
        private byte[]              authValue =http://www.mamicode.com/null;>
 

主要看最后一个build()方法:

//使用当前的Builder对象构建一个CuratorFramework接口对象
        public CuratorFramework build()
        {
            return new CuratorFrameworkImpl(this);
    }

 

可以看到创建一个CuratorFrameworkImpl实例,将当前Builder对象传递进去。

CuratorFrameworkImpl类是CuratorFramework接口的实现类,看看其构造函数:

public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
   {
        ZookeeperFactory localZookeeperFactory= makeZookeeperFactory(builder.getZookeeperFactory());
        this.client =new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(),builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(),new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null);
               processEvent(event);
            }
        }, builder.getRetryPolicy(),builder.canBeReadOnly());
 
        listeners = new ListenerContainer<CuratorListener>();
        unhandledErrorListeners =newListenerContainer<UnhandledErrorListener>();
       backgroundOperations = newDelayQueue<OperationAndData<?>>();
        namespace = new NamespaceImpl(this, builder.getNamespace());
        threadFactory = getThreadFactory(builder);
        maxCloseWaitMs = builder.getMaxCloseWaitMs();
        connectionStateManager =new ConnectionStateManager(this,builder.getThreadFactory());
        compressionProvider =builder.getCompressionProvider();
        aclProvider = builder.getAclProvider();
        state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
 
        byte[] builderDefaultData =http://www.mamicode.com/builder.getDefaultData();>
 

CuratorFrameworkImpl主要是对CuratorZookeeperClient的封装,所以我们主要看构造函数中第二句代码是如何构建CuratorZookeeperClient对象的。构造函数中除了参数1和参数5,其他参数都是来自builder对象,参数1是localZookeeperFactory,通过下面方法构造:

ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());

下面是makeZookeeperFactory()方法的实现代码:

private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
   {
        return new ZookeeperFactory()
        {
            @Override
            public ZooKeeper newZooKeeper(StringconnectString,intsessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception
            {
                ZooKeeper zooKeeper =actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher,canBeReadOnly);
                AuthInfo auth = authInfo.get();
                if ( auth !=null )
                {
                    zooKeeper.addAuthInfo(auth.scheme, auth.auth);
                }
 
                return zooKeeper;
            }
        };
}

 

传递进来的是builder中定义的ZookeeperFactory对象,实际上就是Curator提供的DefaultZookeeperFactory类,定义如下:

public classDefaultZookeeperFactory implements ZookeeperFactory
{
   @Override
   publicZooKeeper newZooKeeper(String connectString,int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws Exception
   {
        return new ZooKeeper(connectString, sessionTimeout,watcher, canBeReadOnly);
   }
}

 

仅仅是简单地new出一个原生的ZooKeeper对象,所以传递到CuratorZookeeperClient构造函数中的ZookeeperFactory类的newZooKeeper返回的是原生的ZooKeeper对象。

 

参数5是一个Watcher对象,其中事件响应函数process()又调用了processEvent()方法:

private void processEvent(finalCuratorEvent curatorEvent)
   {
        if ( curatorEvent.getType() ==CuratorEventType.WATCHED )
        {
           validateConnection(curatorEvent.getWatchedEvent().getState());
        }
 
        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListenerlistener)
            {
                try
                {
                    TimeTrace trace = client.startTracer("EventListener");
                   listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();
                }
                catch ( Exception e )
                {
                    logError("Event listener threw exception", e);
                }
                returnnull;
            }
       });
}

这个watcher什么时候被调用?作用是什么?这个问题稍后再解答。

 

接着深入进去看的CuratorZookeeperClient类的构造函数:

   publicCuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProviderensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs, Watcher watcher, RetryPolicy retryPolicy,boolean canBeReadOnly)
   {
        this.connectionTimeoutMs = connectionTimeoutMs;
        state = new ConnectionState(zookeeperFactory,ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher,tracer, canBeReadOnly);
        setRetryPolicy(retryPolicy);
    }

 

主要在构造函数的最后两行初始化下面两个成员变量:

private finalConnectionState state;

private finalAtomicReference<RetryPolicy> retryPolicy =newAtomicReference<RetryPolicy>();

 

继续看ConnectionState的构造函数:

ConnectionState(ZookeeperFactoryzookeeperFactory, EnsembleProvider ensembleProvider,int sessionTimeoutMs,int connectionTimeoutMs,Watcher parentWatcher, AtomicReference<TracerDriver> tracer,boolean canBeReadOnly)
   {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        if ( parentWatcher !=null )
        {
            parentWatchers.offer(parentWatcher);
        }
 
        zooKeeper = new HandleHolder(zookeeperFactory,this, ensembleProvider,sessionTimeoutMs, canBeReadOnly);
}

做了两件事情,一个是把传递进来的watcher对象放入parentWatchers容器中,一个是new出HandleHolder对象,注意这里讲当前ConnectionState对象作为watcher参数传递到HandleHolder构造函数。继续看HandleHolder构造函数:

HandleHolder(ZookeeperFactoryzookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider,int sessionTimeout,boolean canBeReadOnly)
   {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
}

 

简单字段赋值,这个类是ZooKeeper对象的持有者,其中包含两个关键函数:

ZooKeeper getZooKeeper() throws Exception
   {
        return (helper !=null) ?helper.getZooKeeper() : null;
}
 
void closeAndReset()throws Exception
   {
        internalClose();
 
        // first helper is synchronized when getZooKeeper is called. Subsequentcalls
        // are not synchronized.
        helper = new Helper()
        {
            private volatile ZooKeeper zooKeeperHandle =null;
            private volatile String connectionString =null;
 
            @Override
            public ZooKeeper getZooKeeper()throws Exception
            {
                synchronized(this)
                {
                    if (zooKeeperHandle ==null)
                    {
                        connectionString =ensembleProvider.getConnectionString();
                        zooKeeperHandle =zookeeperFactory.newZooKeeper(connectionString,sessionTimeout,watcher,canBeReadOnly);
                    }
 
                    helper = newHelper()
                    {
                        @Override
                        public ZooKeepergetZooKeeper()throwsException
                        {
                            returnzooKeeperHandle;
                        }
 
                        @Override
                        public StringgetConnectionString()
                        {
                            returnconnectionString;
                        }
                    };
 
                    returnzooKeeperHandle;
                }
            }
 
            @Override
            public String getConnectionString()
            {
                returnconnectionString;
            }
        };
}

 

可以看到这个类提供了一个getZooKeeper()方法,返回ZooKeeper对象,closeAndReset()方法是对helper对象的初始化,Helper中的getZooKeeper()方法返回的是ZooKeeper对象的单例,保障一个HandleHolder只会持有一个ZooKeeper对象。

总结一下,通过CuratorFrameworkFactory类的newClient()方法将会返回一个实现了CuratorFramework接口的实现类CuratorFrameworkImpl的对象,这个对象中包含一个CuratorZookeeperClient对象,里面又包含一个ConnectionState对象,再里面又包含一个HandleHolder对象,这个对象通过从最外层逐层传递进来的DefaultZookeeperFactory对象获取原生ZooKeeper对象,并以单例进行维护,每一层都有一个getZooKeeper()方法,在外面调用会最终到HandleHolder这里来取得一个ZooKeeper对象。

这里面HandleHolderZooKeeper对象的持有者,外层封装的ConnectionState类是核心,管理ZooKeeper的连接状态,响应ZooKeeperwatch回调事件。这个回调函数是创建HandleHolder对象时将自己传递进去注册的。

初始化这一步骤注册的真正的原生ZooKeeper对象的watcher响应事件是ConnectionState类中的process()函数,我们看看这个函数:

@Override
   publicvoidprocess(WatchedEvent event)
   {
       //逐个调用parentWatchers容器中的Watcher的process函数
        for ( Watcher parentWatcher :parentWatchers )
        {
            TimeTrace timeTrace = new TimeTrace("connection-state-parent-process",tracer.get());
            parentWatcher.process(event);
            timeTrace.commit();
        }
 
        boolean wasConnected =isConnected.get();
        boolean newIsConnected = wasConnected;
        if ( event.getType() ==Watcher.Event.EventType.None )
        {
            newIsConnected =checkState(event.getState(), wasConnected);
        }
 
        //若当前连接状态不为false,则真正设置isConnected = true
        if ( newIsConnected != wasConnected )
        {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
        }
   }

 

这个watch事件响应函数主要做两件事:

(1)将parentWatchers容器中的所有Watcher都调用一次;

(2)检查并更新ConnectionState类中维护的ZooKeeper的连接状态isConnected

 

那么parentWatchers容器中有哪些Watcher呢,目前只有CuratorFrameworkImpl构造函数中初始化CuratorZookeeperClient对象时传递进去的Watcher,如下所示:

this.client=newCuratorZookeeperClient(localZookeeperFactory,builder.getEnsembleProvider(), builder.getSessionTimeoutMs(),builder.getConnectionTimeoutMs(),new Watcher()
        {
            @Override
            public void process(WatchedEvent watchedEvent)
            {
                CuratorEvent event = newCuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED,watchedEvent.getState().getIntValue(),unfixForNamespace(watchedEvent.getPath()),null,null,null,null,null, watchedEvent,null);
                processEvent(event);
            }
        }, builder.getRetryPolicy(),builder.canBeReadOnly());

 

其中的processEvent()函数实际上是将CuratorListener列表中的所有事件响应函数全部调用一次,这个和异步执行ZooKeeper操作相关,具体不介绍了,详细可以参考这个例子:

public staticvoid     setDataAsync(CuratorFramework client,String path,byte[]payload)throwsException
   {
        // this is one method of getting event/async notifications
        CuratorListener listener =newCuratorListener()
        {
            @Override
            public void eventReceived(CuratorFramework client,CuratorEvent event) throws Exception
            {
                // examine event for details
            }
        };
       client.getCuratorListenable().addListener(listener);
 
        // set data for the given node asynchronously. The completion notification
        // is done via the CuratorListener.
       client.setData().inBackground().forPath(path, payload);
}

 

(2)CuratorFramework 的start()方法启动:


CuratorFramework的start方法会调用CuratorZookeeperClient对象的start方法,内部又调用ConnectionState的start方法,最后ConnectionState的start方法调用一个reset方法:

private synchronizedvoidreset() throwsException
{
    isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();  // initiateconnection
}

 

主要是最后两句代码,调用HandleHolder类对象zooKeeper的closeAndReset方法是为了实例化获取ZooKeeper对象的Helper对象,调用一次getZooKeeper方法是为了先第一次实例化好ZooKeeper对象,提高之后调用访问接口时的性能。




Curator源码解析(二)初始化和启动分析