首页 > 代码库 > skynet源码阅读<3>--网关分析
skynet源码阅读<3>--网关分析
继上一篇介绍了skynet的网络部分之后,这一篇以网关gate.lua为例,简单分析下其串接和处理流程。
在官方给出的范例中,是以examples/main.lua作为启动脚本的,在此过程中会创建watchdog服务:
1 local watchdog = skynet.newservice("watchdog") 2 skynet.call(watchdog, "lua", "start", { 3 port = 8888, 4 maxclient = max_client, 5 nodelay = true, 6 })
首先加载watchdog.lua脚本。而在watchdog.lua的加载过程中,创建了gate服务。加载gate.lua过程中,调用gateserver.start(gate),gateserver会向skynet注册socket协议的处理:
1 skynet.register_protocol { 2 name = "socket", 3 id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6 4 unpack = function ( msg, sz ) 5 return netpack.filter( queue, msg, sz) 6 end, 7 dispatch = function (_, _, q, type, ...) 8 queue = q 9 if type then 10 MSG[type](...) 11 end 12 end 13 }
另外gateserver也会注册lua协议的处理,这里就不展开了。gateserver中会拦截skynet_socket_message;也会拦截部分lua消息(一般是由watchdog转发而来),并调用gate注册进来的回调。注意gateserver才是skynet消息的入口,gate只不过是个回调而已。至此,gate服务加载完毕。
watchdog服务加载完毕后,main.lua中接着调用watchdog的start方法,其参数分别指定了侦听的端口、最大客户端连接数、是否延迟等。看下watchdog的start方法:
1 function CMD.start(conf) 2 skynet.call(gate, "lua", "open" , conf) 3 end
其紧接着调用gate的open方法,而这个方法在gateserver中被拦截了:
1 function CMD.open( source, conf ) 2 assert(not socket) 3 local address = conf.address or "0.0.0.0" 4 local port = assert(conf.port) 5 maxclient = conf.maxclient or 1024 6 nodelay = conf.nodelay 7 skynet.error(string.format("Listen on %s:%d", address, port)) 8 socket = socketdriver.listen(address, port) 9 socketdriver.start(socket) 10 if handler.open then 11 return handler.open(source, conf) 12 end 13 end
可以看到,在open方法中创建了socket并开始了侦听过程。回忆上篇,socket操作的LuaAPI作为socketdriver被实现在lua-socket.c文件中,看一眼这里的listen是如何交互的:
1 static int 2 llisten(lua_State *L) { 3 const char * host = luaL_checkstring(L,1); 4 int port = luaL_checkinteger(L,2); 5 int backlog = luaL_optinteger(L,3,BACKLOG); 6 struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1)); 7 int id = skynet_socket_listen(ctx, host,port,backlog); 8 if (id < 0) { 9 return luaL_error(L, "Listen error"); 10 } 11 12 lua_pushinteger(L,id); 13 return 1; 14 }
析取参数,获取关联的skynet-context之后,调用skynet_socket.c的skynet_socket_listen:
1 int 2 skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) { 3 uint32_t source = skynet_context_handle(ctx); 4 return socket_server_listen(SOCKET_SERVER, source, host, port, backlog); 5 }
拿到context-handle,这个handle在后续创建socket时会被关联起来。handle作为参数opaque传递入socket_server.c中的socket_server_listen方法中:
1 int 2 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) { 3 int fd = do_listen(addr, port, backlog); 4 if (fd < 0) { 5 return -1; 6 } 7 struct request_package request; 8 int id = reserve_id(ss); 9 if (id < 0) { 10 close(fd); 11 return id; 12 } 13 request.u.listen.opaque = opaque; 14 request.u.listen.id = id; 15 request.u.listen.fd = fd; 16 send_request(ss, &request, ‘L‘, sizeof(request.u.listen)); 17 return id; 18 }
在作了bind和listen之后,将此socket描述符打包为request写入socket_server的读管道,由socket_server_poll轮循处理。至此,gate服务中listen的流程已经非常清晰了。
再回到gateserver.CMD.open方法中,在socketdriver.listen之后,紧接着调用socketdriver.start开始网络事件的处理(具体细节请按上述流程参照源码),最后调用gate.lua的回调handler.open。在这个过程中,gate服务的skynet-context-handle已经与对应的socket绑定了,后续关于此socket的SOCKET消息都会转发到gate服务中来,具体到代码则是由gateserver接收过滤后,再做进一步的分发处理。
那么一个新的客户端连接是如何被接收创建的呢?
在socket_server_poll过程中,会检查是否有网络事件产生(以下是简化的代码):
1 int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { 2 for (;;) { 3 // 管道select 4 ...... 5 6 // socket event check 7 ...... 8 9 // s: socket 10 switch (s->type) { 11 case SOCKET_TYPE_CONNECTING: 12 return report_connect(ss, s, result); 13 case SOCKET_TYPE_LISTEN: { 14 int ok = report_accept(ss, s, result); 15 if (ok > 0) { return SOCKET_ACCEPT; }
17 if (ok < 0) { return SOCKET_ERROR; } 20 // when ok == 0, retry 21 break; 22 }
23 // other
24 ......25 } 26 }
对于正在listen的socket(SOCKET_TYPE_LISTEN),当发生事件(即侦测到有新的连接)时,会在report_accept中accept得连接描述符fd并创建socket结构,然后返回SOCKET_ACCEPT交由上层的skynet_socket.c:skynet_socket_poll处理,后者会封装类型为SKYNET_SOCKET_TYPE_ACCEPT的skynet-message并推入到gate服务的队列中去,最终转发到gateserver中所注册的SOCKET协议入口。
回到gateserver中来(见上述gateserver摘录的代码),在接收到网络消息时,先是在unpack中通过netpack(见lua-netpack.c)合并过滤消息,比如TCP消息粘包等(注意skynet_socket_message如果其padding为true,则表示非数据的命令,比如SKYNET_SOCKET_TYPE_ACCEPT)。对于SKYNET_SOCKET_TYPE_ACCEPT命令,netpack会解析并转换为open命令,最后gateserver会调用到MSG.open:
1 function MSG.open(fd, msg) 2 if client_number >= maxclient then 3 socketdriver.close(fd) 4 return 5 end 6 if nodelay then 7 socketdriver.nodelay(fd) 8 end 9 connection[fd] = true 10 client_number = client_number + 1 11 handler.connect(fd, msg) 12 end
回调到gate.lua中的connect:
1 function handler.connect(fd, addr) 2 local c = { 3 fd = fd, 4 ip = addr, 5 } 6 connection[fd] = c 7 skynet.send(watchdog, "lua", "socket", "open", fd, addr) 8 end
watchdog.lua中的SOCKET.open:
1 function SOCKET.open(fd, addr) 2 skynet.error("New client from : " .. addr) 3 agent[fd] = skynet.newservice("agent") 4 skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() }) 5 end
此时,会创建玩家agent服务,并调用start方法:
1 function CMD.start(conf) 2 local fd = conf.client 3 local gate = conf.gate 4 WATCHDOG = conf.watchdog 5 -- slot 1,2 set at main.lua 6 host = sprotoloader.load(1):host "package" 7 send_request = host:attach(sprotoloader.load(2)) 8 skynet.fork(function() 9 while true do 10 send_package(send_request "heartbeat") 11 skynet.sleep(500) 12 end 13 end) 14 15 client_fd = fd 16 skynet.call(gate, "lua", "forward", fd) 17 end
agent拿到gate服务的标识后,调用forward将自己的标识注册到gate服务中来:
1 function CMD.forward(source, fd, client, address) 2 local c = assert(connection[fd]) 3 unforward(c) 4 c.client = client or 0 5 c.agent = address or source 6 forwarding[c.agent] = c 7 gateserver.openclient(fd) 8 end
gateserver.openclient开始侦听此socket的事件:
function gateserver.openclient(fd) if connection[fd] then socketdriver.start(fd) end end
至此,一个新连接的建立流程就结束了。那么连接建立后网络数据又是如何转发进来的呢?流程依然是一致的,socket_server_poll侦测到READ后读取数据并转发到gateserver中来,后者调用netpack对数据粘包,此时会调用MSG.more或MSG.data将数据转交给gate.message:
1 function handler.message(fd, msg, sz) 2 -- recv a package, forward it 3 local c = connection[fd] 4 local agent = c.agent 5 if agent then 6 skynet.redirect(agent, c.client, "client", 0, msg, sz) 7 else 8 skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz)) 9 end 10 end
直接将数据转发给之前通过forward注册进来的agent,采用的协议是"client"。agent中需注册此协议的处理,拿到字节流并根据上层的业务协议对数据转码,做进一步的处理。这样,数据接收和转发的流程就结束了。其它方面,比如数据发送,关闭socket等等,流程上都是一致的,具体细节不再详述。
skynet源码阅读<3>--网关分析