首页 > 代码库 > gremlin driver/server 基于netty的 session实现
gremlin driver/server 基于netty的 session实现
gremlin-server中 实现session需要两点保证:
- session 绑定了 变量列表;
- 每一个session必须 在同一台 server进程的同一个 线程中运行。 这是又tinkpop graph transaction的threadlocal 机制要求的。
1. SessionOpProcessor.java 中维护了 id -》 session的列表, 每个session 维护 Binding变量, 这即是 java ScriptEngine 的binding。
protected static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>(); http://docs.oracle.com/javase/7/docs/api/javax/script/ScriptEngine.html#eval(java.lang.String,%20javax.script.Bindings)
2. gremlin driver 端每一个session 固定在一台机器:Client.java
/** * Randomly choose an available {@link Host} to bind the session too and initialize the {@link ConnectionPool}. */ @Override protected void initializeImplementation() { // chooses an available host at random final List<Host> hosts = cluster.allHosts() .stream().filter(Host::isAvailable).collect(Collectors.toList()); if (hosts.isEmpty()) throw new IllegalStateException("No available host in the cluster"); Collections.shuffle(hosts); final Host host = hosts.get(0); connectionPool = new ConnectionPool(host, this, Optional.of(1), Optional.of(1)); }
3. 每个session 有自己的 SingleThreadExecutor,保证单线程执行. Session.java
/** * By binding the session to run ScriptEngine evaluations in a specific thread, each request will respect * the ThreadLocal nature of Graph implementations. */ private final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactoryWorker);
4. 保证 每个Encoder 都在session对应的thread执行。 这样保证了 netty的pipeline 在同一个thread执行:
GremlinResponseFrameEncoder.java
// if the request came in on a session then the serialization must occur in that same thread, except // in the case of an error where we can free the session executor from having to do that job. the // problem here is that if the session executor is used in the case of an error and the executor is // blocked by parallel requests then there is no thread available to serialize the result and send // back the response as the workers get all tied up behind the session executor. if (null == session || !o.getStatus().getCode().isSuccess()) serialized = new Frame(serializer.serializeResponseAsBinary(o, ctx.alloc())); else serialized = new Frame(session.getExecutor().submit(() -> serializer.serializeResponseAsBinary(o, ctx.alloc())).get());
http://netty.io/4.0/api/io/netty/channel/ChannelPipeline.html
// Tell the pipeline to run MyBusinessLogicHandler‘s event handler methods // in a different thread than an I/O thread so that the I/O thread is not blocked by // a time-consuming task. // If your business logic is fully asynchronous or finished very quickly, you don‘t // need to specify a group. pipeline.addLast(group, "handler", new MyBusinessLogicHandler());
gremlin driver/server 基于netty的 session实现
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。