在Vert.x中,Vertx
接口是最为重要的一个接口,vertx-core的基础功能都在此接口中提供。这篇文章中我们就来分析一下Vertx
接口体系的内部实现以及创建流程。本文对应Vert.x的版本为 3.2.1。
Vertx接口体系
我们来看一下Vertx
接口的UML关系:
可以看到有VertxImpl <:< VertxInternal <:< Vertx
这个继承链。这里我们先不研究Measured
和MetricsProvider
这两个接口。我们先来看一下VertxInternal
的结构:
可以看到里边包含了各种操作多线程、执行回调等等的方法。VertxInternal
接口仅供vertx-core内部调用。
VertxImpl
类是对VertxInternal
和Vertx
接口的实现。我们创建的Vertx实例都是VertxImpl
。
Vertx创建流程以及内部实现
通常,我们都会通过Vertx
接口中的静态方法vertx
创建Vertx实例:
1
|
Vertx vertx = Vertx.vertx();
|
vertx
方法底层通过工厂模式创建VertxImpl实例:
1 2 3
|
static Vertx vertx() { return factory.vertx(); }
|
1 2 3 4 5 6 7 8
|
public class VertxFactoryImpl implements VertxFactory {
@Override public Vertx vertx() { return new VertxImpl(); } // ... }
|
下面我们来探究一下VertxImpl
类的创建流程和内部实现。我们首先来看一下VertxImpl
类的实例成员:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
private final FileSystem fileSystem = getFileSystem(); private final SharedData sharedData; private final VertxMetrics metrics; private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>(); private final AtomicLong timeoutCounter = new AtomicLong(0); private final ClusterManager clusterManager; private final DeploymentManager deploymentManager; private final FileResolver fileResolver; private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>(); private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>(); private final ExecutorService workerPool; private final ExecutorService internalBlockingPool; private final OrderedExecutorFactory workerOrderedFact; private final OrderedExecutorFactory internalOrderedFact; private final ThreadFactory eventLoopThreadFactory; private final NioEventLoopGroup eventLoopGroup; private final NioEventLoopGroup acceptorEventLoopGroup; private final BlockedThreadChecker checker; private final boolean haEnabled; private EventBus eventBus; private HAManager haManager; private boolean closed;
|
这里面包含了一系列重要的类。我们将在初始化部分来分析这些成员的作用。下面我们来看一下构造函数:
1 2 3 4 5 6 7 8 9 10 11
|
VertxImpl() { this(new VertxOptions()); }
VertxImpl(VertxOptions options) { this(options, null); }
VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) { // ... }
|
可以看到最终都会调用到VertxImpl(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler)
这个构造函数,下面我们就来分析一下。
首先,Vertx会检查当前是否有Vertx实例运行(通过factory.context()
方法)。如果有实例运行的话就会给出警告。
1 2 3 4
|
// Sanity check if (Vertx.currentContext() != null) { log.warn("You‘re already on a Vert.x context, are you sure you want to create a new Vertx instance?"); }
|
接着Vertx会初始化checker
成员,它是一个BlockedThreadChecker
,作用是检查vertx context中是否有阻塞的线程,如果有线程阻塞则给出警告。
1 2
|
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getMaxEventLoopExecuteTime(), options.getMaxWorkerExecuteTime(), options.getWarningExceptionTime());
|
接下来,Vertx会初始化EventLoop线程工厂eventLoopThreadFactory
,它用于产生EventLoop线程。然后初始化eventLoopGroup
并进行配置。NioEventLoopGroup
是Netty里的概念,将在稍后进行介绍。
1 2 3
|
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false); eventLoopGroup = new NioEventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory); eventLoopGroup.setIoRatio(NETTY_IO_RATIO);
|
接下来,Vertx会初始化Acceptor EventLoop线程工厂,并对其进行配置。然后对workerPool
和internalBlockingPool
这两个线程池进行初始化。其中workerPool
用于执行worker线程,internalBlockingPool
用于执行阻塞操作线程。
1 2 3 4 5 6 7 8 9
|
ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false); // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections // under a lot of load acceptorEventLoopGroup = new NioEventLoopGroup(1, acceptorEventLoopThreadFactory); acceptorEventLoopGroup.setIoRatio(100); workerPool = Executors.newFixedThreadPool(options.getWorkerPoolSize(), new VertxThreadFactory("vert.x-worker-thread-", checker, true)); internalBlockingPool = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), new VertxThreadFactory("vert.x-internal-blocking-", checker, true));
|
然后,Vertx会初始化两个线程池工厂workerOrderedFact
和internalOrderedFact
,它们的类型是OrderedExecutorFactory
,里边包含一种能够按照次序执行线程的线程池。
1 2
|
workerOrderedFact = new OrderedExecutorFactory(workerPool); internalOrderedFact = new OrderedExecutorFactory(internalBlockingPool);
|
接下来,Vertx会依次对文件解析器fileResolver
、部署管理器deploymentManager
、SPI管理器metrics
进行初始化,并且根据配置来决定是否初始化集群管理器clusterManager
和高可用管理器haManager
。然后Vertx会调用createAndStartEventBus(options, resultHandler)
方法,创建并启动EventBus。最后对共享数据成员sharedData
进行初始化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
this.fileResolver = new FileResolver(this); this.deploymentManager = new DeploymentManager(this); this.metrics = initialiseMetrics(options); this.haEnabled = options.isClustered() && options.isHAEnabled(); if (options.isClustered()) { this.clusterManager = getClusterManager(options); this.clusterManager.setVertx(this); this.clusterManager.join(ar -> { if (ar.failed()) { log.error("Failed to join cluster", ar.cause()); } else { // Provide a memory barrier as we are setting from a different thread synchronized (VertxImpl.this) { haManager = new HAManager(this, deploymentManager, clusterManager, options.getQuorumSize(), options.getHAGroup(), haEnabled); createAndStartEventBus(options, resultHandler); } } }); } else { this.clusterManager = null; createAndStartEventBus(options, resultHandler); } this.sharedData = http://www.mamicode.com/new SharedDataImpl(this, clusterManager); }
|
经过一系列的构造后,VertxImpl
创建完成。
以上。
以下为链接:
来自vertx中国用户组,
(不同版本,代码可能有少许不同之处(我目前使用的是3.3.2),但不妨碍阅读和理解)
http://mp.weixin.qq.com/s?__biz=MzA4MjUwNzQ0NQ==&mid=2650547613&idx=1&sn=5435d20496e4c9f57d958a45855f9f59&chksm=878c2647b0fbaf51bb8a937c2468117a159e2e0acc8a5bf4ba6404b6282d1f6fd1241f01de25&mpshare=1&scene=23&srcid=1206gxIsVTqNU9B885NQYJzL#rd
vertx核心类之VertxImpl