首页 > 代码库 > 使用ranch tcp开发服务端
使用ranch tcp开发服务端
Ranch:
简单来说,Ranch就是一个tcp acceptor pool,用于高并发下的tcp连接建立与管理。可以设置并发链接最大数量,在不关闭socket连接的情况下可以动态升级连接池。Cowboy就是使用的ranch。
https://github.com/ninenines/ranch
下面通过改造ranch自带的reverse example实现简易的服务端。
game_server.app.src
{application, game_server, [ {description, "Ranch TCP reverse example."}, {vsn, "1"}, {modules, []}, {registered, []}, {applications, [ kernel, stdlib, ranch ]}, {mod, {game_server_app, []}}, {env, []} ]}.
game_server_app.erl
-module(game_server_app). -behaviour(application). -export([start/2, stop/1]). %% start/2 start(_Type, _StartArgs) -> {ok, _Pid} = ranch:start_listener(tcp_reverse, 1, ranch_tcp, [{port, 5555},{max_connections, 10240}], game_protocol, []), game_server_sup:start_link(). %% stop/1 stop(State) -> ok.
这里注意ranch:start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> {ok, pid()} | {error, badarg}.
最大连接数max_connections就是在这里进行设定, 默认值1024. NbAcceptors, Acceptor的数量,具体数值要根据实际并发设置。
Ranch接受请求并建立连接,然后就会将具体的处理交给实现了ranch_protocol行为的game_protocol,erlang中的behaviour跟java中的接口差不多。game_server_sup.erl
-module(game_server_sup). -behaviour(supervisor). -export([start_link/0, init/1]). -spec start_link() -> {ok, pid()}. start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). %% init/1 init([]) -> {ok, {{one_for_one, 10, 10}, []}}.
game_protocol.erl
-module(game_protocol). -behaviour(gen_server). -behaviour(ranch_protocol). %% API. -export([start_link/4]). %% gen_server. -export([init/4]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(TIMEOUT, 50000). -record(state, {socket, transport}). %% API. start_link(Ref, Socket, Transport, Opts) -> proc_lib:start_link(?MODULE, init, [Ref, Socket, Transport, Opts]). %% gen_server. %% This function is never called. We only define it so that %% we can use the -behaviour(gen_server) attribute. init([]) -> {ok, undefined}. init(Ref, Socket, Transport, _Opts = []) -> ok = proc_lib:init_ack({ok, self()}), ok = ranch:accept_ack(Ref), ok = Transport:setopts(Socket, [{active, once}, {packet, 2}]), gen_server:enter_loop(?MODULE, [], #state{socket=Socket, transport=Transport}, ?TIMEOUT). handle_info({tcp, Socket, Data}, State=#state{ socket=Socket, transport=Transport}) -> io:format("Data:~p~n", [Data]), Transport:setopts(Socket, [{active, once}]), Transport:send(Socket, reverse_binary(Data)), {noreply, State, ?TIMEOUT}; handle_info({tcp_closed, _Socket}, State) -> {stop, normal, State}; handle_info({tcp_error, _, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State) -> {stop, normal, State}; handle_info(_Info, State) -> {stop, normal, State}. handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast(_Msg, State) -> {noreply, State}. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. %% Internal. reverse_binary(B) when is_binary(B) -> list_to_binary(lists:reverse(binary_to_list(B))).
这里init的实现与常规的gen_server不一样。首先来说为什么不能用常规的gen_server写法。常规写法如下:
init([Ref, Socket, Transport, Opts]) -> ok = ranch:accept_ack(Ref), ok = Transport:setopts(Socket, [{active, once}, {packet, 2}]), {ok, #state{socket=Socket, transport=Transport}}.
gen_server的start_link只有在init/1执行完毕后才会返回,但我们来看ranch:accept_ack(Ref):
-spec accept_ack(ref()) -> ok. accept_ack(Ref) -> receive {shoot, Ref, Transport, Socket, AckTimeout} -> Transport:accept_ack(Socket, AckTimeout) end.运行ranch:accept_ack/1时,进程会阻塞,等待{shoot, ...}这条消息,直到接收到此消息才会继续执行,接着才会完成init。但是{shoot, ...}这条消息从哪里来?查下ranch源码不难发现,ranch在建立了与新的gen_server进程的连接后,会向gen_server进程发送该消息(参考ranch_conns_sup:loop/4). 显然,gen_server进程在等待ranch:accept_ack接收到{shoot,...}消息迟迟不能返回,而ranch又无法与gen_server进程连接发送不了{shoot, ...}消息,造成死锁。故使用proc_lib:start_link/3优雅地解决了此问题。
下面copy一下文档的一个说明:
By default the socket will be set to return `binary` data, with the
options `{active, false}`, `{packet, raw}`, `{reuseaddr, true}` set.
These values can‘t be overriden when starting the listener, but
they can be overriden using `Transport:setopts/2` in the protocol.
It will also set `{backlog, 1024}` and `{nodelay, true}`, which
can be overriden at listener startup.
这也就是为什么{active, once}, {packet, 2}只能在procotol里重写
这样就实现了一个基本的服务端,make后编写脚本启动:
start.sh
erl -pa ebin deps/*/ebin +K true +P 199999 -sname game_server -s game
-s game表示启动时默认调用game:start/0方法。
game.erl
-module(game). %% ==================================================================== %% API functions %% ==================================================================== -export([start/0, stop/0]). start() -> ok = application:start(ranch), ok = application:start(game_server). stop() -> application:stop(ranch), application:stop(game_server).
如果设置{packet, raw}的话,直接打开一个Terminal $ telnet localhost 5555 就可以进行测试了。
不过这里设置的{packet,2}, 所以写了个测试client发送消息,建立连接->发送消息->接收返回消息->关闭连接:
-module(client). -export([send/1]). send(BinMsg) -> SomeHostInNet = "localhost", {ok, Sock} = gen_tcp:connect(SomeHostInNet, 5555, [binary, {packet, 2}]), ok = gen_tcp:send(Sock, BinMsg), receive {tcp,Socket,String} -> io:format("Client received = ~p~n",[String]), gen_tcp:close(Socket) after 60000 -> exit end, ok = gen_tcp:close(Sock).
handler_info中加入不同消息的处理,就可以时间一个简单的游戏服务器了。R17后可以使用{active, N}, 程序效率应该会更高。