首页 > 代码库 > 使用ranch tcp开发服务端

使用ranch tcp开发服务端

Ranch:

简单来说,Ranch就是一个tcp acceptor pool,用于高并发下的tcp连接建立与管理。可以设置并发链接最大数量,在不关闭socket连接的情况下可以动态升级连接池。Cowboy就是使用的ranch。

https://github.com/ninenines/ranch


下面通过改造ranch自带的reverse example实现简易的服务端。


game_server.app.src

{application, game_server, [
	{description, "Ranch TCP reverse example."},
	{vsn, "1"},
	{modules, []},
	{registered, []}, 
	{applications, [
		kernel,
		stdlib,
		ranch
	]},
	{mod, {game_server_app, []}},
	{env, []}
]}.

game_server_app.erl

-module(game_server_app).
-behaviour(application).
-export([start/2, stop/1]).

%% start/2
start(_Type, _StartArgs) ->
    {ok, _Pid} = ranch:start_listener(tcp_reverse, 1,
        ranch_tcp, [{port, 5555},{max_connections, 10240}], game_protocol, []),
    game_server_sup:start_link().


%% stop/1
stop(State) ->
    ok.


这里注意ranch:start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> {ok, pid()} | {error, badarg}.

最大连接数max_connections就是在这里进行设定, 默认值1024. NbAcceptors, Acceptor的数量,具体数值要根据实际并发设置。

Ranch接受请求并建立连接,然后就会将具体的处理交给实现了ranch_protocol行为的game_protocol,erlang中的behaviour跟java中的接口差不多。


game_server_sup.erl

-module(game_server_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

-spec start_link() -> {ok, pid()}.
start_link() ->
	supervisor:start_link({local, ?MODULE}, ?MODULE, []).


%% init/1
init([]) ->
	{ok, {{one_for_one, 10, 10}, []}}.

game_protocol.erl

-module(game_protocol).
-behaviour(gen_server).
-behaviour(ranch_protocol).

%% API.
-export([start_link/4]).

%% gen_server.
-export([init/4]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(TIMEOUT, 50000).

-record(state, {socket, transport}).

%% API.

start_link(Ref, Socket, Transport, Opts) ->
    proc_lib:start_link(?MODULE, init, [Ref, Socket, Transport, Opts]).

%% gen_server.

%% This function is never called. We only define it so that
%% we can use the -behaviour(gen_server) attribute.
init([]) -> {ok, undefined}.

init(Ref, Socket, Transport, _Opts = []) ->
    ok = proc_lib:init_ack({ok, self()}),
    ok = ranch:accept_ack(Ref),
    ok = Transport:setopts(Socket, [{active, once}, {packet, 2}]),
    gen_server:enter_loop(?MODULE, [],
        #state{socket=Socket, transport=Transport},
        ?TIMEOUT).

handle_info({tcp, Socket, Data}, State=#state{
        socket=Socket, transport=Transport}) ->
    io:format("Data:~p~n", [Data]),
    Transport:setopts(Socket, [{active, once}]),
    Transport:send(Socket, reverse_binary(Data)),
    {noreply, State, ?TIMEOUT};
handle_info({tcp_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info({tcp_error, _, Reason}, State) ->
    {stop, Reason, State};
handle_info(timeout, State) ->
    {stop, normal, State};
handle_info(_Info, State) ->
    {stop, normal, State}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Internal.

reverse_binary(B) when is_binary(B) ->
    list_to_binary(lists:reverse(binary_to_list(B))).

这里init的实现与常规的gen_server不一样。首先来说为什么不能用常规的gen_server写法。常规写法如下:

init([Ref, Socket, Transport, Opts]) ->
    ok = ranch:accept_ack(Ref),
    ok = Transport:setopts(Socket, [{active, once}, {packet, 2}]),
    {ok, #state{socket=Socket, transport=Transport}}.

gen_server的start_link只有在init/1执行完毕后才会返回,但我们来看ranch:accept_ack(Ref):

-spec accept_ack(ref()) -> ok.
accept_ack(Ref) ->
    receive {shoot, Ref, Transport, Socket, AckTimeout} ->
        Transport:accept_ack(Socket, AckTimeout)
    end.
运行ranch:accept_ack/1时,进程会阻塞,等待{shoot, ...}这条消息,直到接收到此消息才会继续执行,接着才会完成init。但是{shoot, ...}这条消息从哪里来?查下ranch源码不难发现,ranch在建立了与新的gen_server进程的连接后,会向gen_server进程发送该消息(参考ranch_conns_sup:loop/4). 显然,gen_server进程在等待ranch:accept_ack接收到{shoot,...}消息迟迟不能返回,而ranch又无法与gen_server进程连接发送不了{shoot, ...}消息,造成死锁。故使用proc_lib:start_link/3优雅地解决了此问题。


下面copy一下文档的一个说明:

By default the socket will be set to return `binary` data, with the
options `{active, false}`, `{packet, raw}`, `{reuseaddr, true}` set.
These values can‘t be overriden when starting the listener, but
they can be overriden using `Transport:setopts/2` in the protocol.

It will also set `{backlog, 1024}` and `{nodelay, true}`, which
can be overriden at listener startup.

这也就是为什么{active, once}, {packet, 2}只能在procotol里重写


这样就实现了一个基本的服务端,make后编写脚本启动:

start.sh

erl -pa ebin deps/*/ebin +K true +P 199999     -sname game_server     -s game 

-s game表示启动时默认调用game:start/0方法。

game.erl

-module(game).

%% ====================================================================
%% API functions
%% ====================================================================
-export([start/0, stop/0]).

start() ->
    ok = application:start(ranch),
    ok = application:start(game_server).

stop() ->
    application:stop(ranch),
    application:stop(game_server).


如果设置{packet, raw}的话,直接打开一个Terminal $ telnet localhost 5555 就可以进行测试了。

不过这里设置的{packet,2}, 所以写了个测试client发送消息,建立连接->发送消息->接收返回消息->关闭连接:

-module(client).

-export([send/1]).

send(BinMsg) ->
    SomeHostInNet = "localhost", 
    {ok, Sock} = gen_tcp:connect(SomeHostInNet, 5555,
                                 [binary, {packet, 2}]),
    ok = gen_tcp:send(Sock, BinMsg),
    receive
        {tcp,Socket,String} ->
            io:format("Client received = ~p~n",[String]),       
            gen_tcp:close(Socket)
        after 60000 ->
            exit        
    end,
    ok = gen_tcp:close(Sock).

handler_info中加入不同消息的处理,就可以时间一个简单的游戏服务器了。R17后可以使用{active, N}, 程序效率应该会更高。