首页 > 代码库 > libgsc的java实现
libgsc的java实现
花了约7天的时间用java又实现了一遍. 编程方式相比c++的版本有很大不同, 基本上是一种面向future的编程风格. 主要是期望在业务不复杂的
情况下, 可以在一个屏幕做完所有的事, 避免消息指令定义, 减少寻找回调的麻烦. 下面是一个demo.
/** 提交一个ITC事务, 由Gas-Actor向Db-Actor发送消息, 并期待响应, 请求的参数是一个Boolean, 响应的也是一个Boolean. */ Gsc.future(new GitcTrans<Gas, Db, Boolean, Boolean>(Gas.instance() /* from */, Db.instance() /* to */, null /* 消息. */) { /** 请求到来, 由Db-Actor所在的线程调用request函数, 因此这里Db-Actor可以安全做它自己的事. */ public Boolean req(Db db, Boolean req) { return db.loadDb(); /* 加载数据库, 并返回结果. */ } /** 响应到来, 由Gas-Actor所在的线程调用response函数, 并携带之前的request, 和上面的返回值. */ public void rsp(Gas gas, Boolean req, Boolean rsp) { if (!rsp) /* 加载数据库失败, 退出. */ System.exit(1); Gcb.init(); if (!Gsc.publish()) System.exit(1); } /** 超时, libgsc有一个默认值, 一样是由Gas-actor所在的线程调用, 并携带从事务提交到当前逝去的毫秒数. */ public void timeout(Gas gas, Boolean req, long elap) { } });
与java/scala的future类似, Gsc.future(...)总是立即返回, 内部GitcTrans对象的req/rsp/timeout函数都通过libgsc内部消息总线进行路由调用.
libgsc保证req函数总是由目标Actor所在的线程调用, rsp/timeout总是由消息发起方Actor所在的线程调用.
libgsc提供三个主要的future函数, 适用于不同的场景
1. 参数为GitcTrans, 就是上面demo中的场景, 一个actor向另一个actor发送请求, 并期待响应, 等不到响应的时候, 就一定会等到超时.
这种场景设计被用于数据库操作, 也就是允许适当的阻塞. 因此才定义了一个超时函数. 实际上上面的Db-Actor拥有自己独立的线
程或线程池, 以避免IO中断时, 阻塞libgsc的消息总线.
2. 参数为GitcNot, 适用于一个actor向另一个actor发送通知, 不需要响应. demo如下:
Gsc.future(new GitcNot<N2H, Db, GcAuthWithGasReq>(n2h, Db.instance(), req) { public void not(Db db, GcAuthWithGasReq req) { GcAuthWithGasRsp rsp = db.authGc(req); if (rsp == null) gt.endRet(Gsc.RET_INVALID); else gt.endSuccess(rsp); } });
3. 参数为Gh2nTrans, H2N即host到network方向, 也就是libgsc向外的网络连接Actor对象. 适用于libgsc(作client)向其它网元(作server)
发送消息, 并期待响应, demo如下:
Gas gas = new Gas(Net.getAddr("192.168.8.129", 1225)); /* 创建一个H2N-Actor. */ Misc.sleep(1000); /* 等待连接建立. */ GsAuthWithGasReq auth = GsAuthWithGasReq.newBuilder().setKey(ByteString.copyFromUtf8("auth-info")).build(); Gsc.future(new Gh2nTrans<Gsc, Gas, GsAuthWithGasReq, GsAuthWithGasRsp>(CmdGas.GAS_REQ_GS_AUTH.ordinal(), Gsc.instance(), gas, auth) { public void rsp(Gas gas, GsAuthWithGasReq req, GsAuthWithGasRsp rsp) { Log.info("req: %s, rsp: %s", Misc.pb2str(req), Misc.pb2str(rsp)); } public void timeout(Gsc gsc, GsAuthWithGasReq req, long elap) { Log.info("req: %s", Misc.pb2str(req)); } });与其它的future调用不同的时, Gh2nTrans的由libgsc自动处理发送(到网络上)请求(上面的auth对象), 响应消息则是由H2N-Actor所在的线程调用,
在上面的例子中, 是Gas. 简单来说, Gh2nTrans就是发送一个请求消息到其它网元, libgsc在收到响应后, 调用rsp函数, 如果等不到rsp, 就一定会
等到timeout.
libgsc的消息总线设计沿用了c++版本的方式, 通过pipe进行线程间通信. 当然, java中无法前转指针, 所以使用了一个ConcurrentLinkedQueue, 先将消
息入队, 如果发现目标线程空闲, 就发送一个字节的消息到selector, 唤醒目标线程. 目标线程醒来后, 一次会将队列中的消息全部取完. 这种方式要比直
接在管道中传递指针更高效, 当然, 队列换成ringbuf, 或者disruptor可能还会更好.
基于上面的设计, 在centos6.6 x86_64bit, 四核i5cpu的机器上测试了一下. 一个TCP连接, 利用127.0.0.1 whie(true)发送约200个字节的pb消息到 libgsc,
libgsc再经过两不同线程的actor交互处理结果. 吞吐量约为15万个事务/s, 极端情况下为30万/s.
----------------------------------------------------------------------------
c++11中, 利用lamda, 也可以实现这种面向future编程方式. 并且看起来比java的实现更舒服, 如果还能做到像libcaf那样省掉模板参数就更完美了.
水平有限, 还需要努力.
string req = "foo"; NetActor* from = NULL; NetActor* to = NULL; Gsc.future(new Future<NetActor, NetActor, string, string>(from, to, &req, // [](NetActor* t, string* req)-> string* { return new string("bar"); }, // [](NetActor* f, string* rsp) { }, // [](NetActor* f, string* req, long elap) { }));
libgsc的java实现