首页 > 代码库 > 使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)

使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)

参考资料:http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principles

服务器设计
tcp_server_app下的根监控树使用one_for_one重启策略。两个子树应用,第一个是一个tcp套接字监听服务器,使用gen_server模式来实现,采用异步监听的客户端连接的模式。第二个是一个客户端应用,使用gen_fsm模式实现,使用标准SASL错误报告接口,记录客户端消息处理的日志以及非正常与服务器断开连接日志。

整体应用架构:
                 +----------------+
                 | tcp_server_app |
                 +--------+-------+
                          | (one_for_one)
         +----------------+---------+
         |                                       |
 +-------+------+           +-------+--------+
 | tcp_listener |                + tcp_client_sup |
 +--------------+            +-------+--------+
                                                  | (simple_one_for_one)
                                         +-----|---------+
                                      +-------|--------+|
                                     +--------+-------+|+
                                      |  tcp_echo_fsm  |+
                                     +----------------+

tcp_server代码如下:

  1 %% TCP Server Application (tcp_server_app.erl)  2 -module(tcp_server_app).  3 -author(‘saleyn@gmail.com‘).  4    5 %% 实现application模式  6 -behaviour(application).  7    8 -export([start_client/0]).  9   10 %% 应用程序启动以及监控树回调函数 11 -export([start/2, stop/1, init/1]). 12   13 %% 宏变量定义 14 -define(MAX_RESTART,    5). 15 -define(MAX_TIME,      60). 16 -define(DEF_PORT,    2222). 17   18 %% 启动客户端进程的接口 19 %% 在监听程序建立连接时调用 20 start_client() -> 21     %% 回调第二个init函数,因为第二个是动态添加监控树子节点 22     %% 也就是说这里是两颗不同的监控树,使用了一个模块两个 init 函数来实现 23     supervisor:start_child(tcp_client_sup, []).  24   25 %%---------------------------------------------------------------------- 26 %% Application behaviour callbacks 27 %%---------------------------------------------------------------------- 28 start(_Type, _Args) -> 29     %% 获取端口配置参数,找不到时返回默认端口 ?DEF_PORT 30     ListenPort = get_app_env(listen_port, ?DEF_PORT),  31  32     %% 启动应用程序,回调函数为 第一个 init 函数,根据参数匹配,参数为 [端口,客户端回调模块] 33     %% 第一个 init 函数仅仅是启动了两个监控树 34     supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]). 35   36 stop(_S) -> 37     ok. 38   39 %%---------------------------------------------------------------------- 40 %% Supervisor behaviour callbacks 41 %%---------------------------------------------------------------------- 42 init([Port, Module]) -> 43     {ok, 44         %% 监控树策略参数,ono_for_one策略,设置MAX_TIME最多重启的MAX_RESTART次 45         {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME}, 46             [ 47               % TCP Listener 48               {   tcp_server_sup,                          % Id       = internal id 49                   {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} 50                   permanent,                               % Restart  = permanent | transient | temporary 51                   2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity 52                   worker,                                  % Type     = worker | supervisor 53                   [tcp_listener]                           % Modules  = [Module] | dynamic 54               }, 55               % Client instance supervisor 56               {    57                      %% Module参数初始化了tcp_client_sup监控树的 init 函数, init 函数在下面 58                      tcp_client_sup,  59                      %% 子节点启动策略 60                   {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]},  61                   permanent,                               % Restart  = permanent | transient | temporary 62                   infinity,                                % Shutdown = brutal_kill | int() >= 0 | infinity 63                   supervisor,                              % Type     = worker | supervisor 64                   []                                       % Modules  = [Module] | dynamic 65               } 66             ] 67         } 68     }; 69   70 %% 在服务器接收连接时,创建客户端进程时会回调到这个函数,使用simple_one_for_one启动策略  71 %% 参数 Module 在第一个 72 init([Module]) -> 73     {ok,     74         %% 另外一种根监督树模式,simple_one_for_one策略子节点只能动态添加 75         {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME}, 76             [ 77               % TCP Client 78               {   undefined,                               % Id       = internal id 79                   {Module,start_link,[]},                  % StartFun = {M, F, A} 80                   temporary,                               % Restart  = permanent | transient | temporary 81                   2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity 82                   worker,                                  % Type     = worker | supervisor 83                   []                                       % Modules  = [Module] | dynamic 84               } 85             ] 86         } 87     }. 88   89 %%---------------------------------------------------------------------- 90 %% Internal functions 91 %%---------------------------------------------------------------------- 92 %% 获取配置文件xxx.app文件中的配置变量  93 get_app_env(Opt, Default) -> 94     case application:get_env(application:get_application(), Opt) of 95     {ok, Val} -> Val; 96     _ -> 97         case init:get_argument(Opt) of 98         [[Val | _]] -> Val; 99         error       -> Default100         end101     end.

 

下面是服务端socket监听程序,这里使用了一个不具有官方文档的 api
 prim_inet:async_accept/2 来实现一个异步监听套接字的服务器程序,代码如下:

  1 % TCP Listener Process (tcp_listener.erl)      2 -module(tcp_listener).  3 -author(‘saleyn@gmail.com‘).  4    5 %% 实现 gen_server 模式   6 -behaviour(gen_server).  7    8 %% 内部接口  9 -export([start_link/2]). 10   11 %% gen_server 回调函数 12 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 13          code_change/3]). 14   15 %% 定义了一个 record 记录 gen_server 进程的状态 16 -record(state, { 17                 listener,       % Listening socket 18                 acceptor,       % Asynchronous acceptor‘s internal reference 19                 module          % FSM handling module 20                }). 21   22 %%-------------------------------------------------------------------- 23 %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} 24 %% @doc 监控树调用并开始进行tcp套接字监听 25 %% @end 26 %%---------------------------------------------------------------------- 27 start_link(Port, Module) when is_integer(Port), is_atom(Module) -> 28     gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []). 29   30 %%%------------------------------------------------------------------------ 31 %%% Callback functions from gen_server 32 %%%------------------------------------------------------------------------ 33   34 %%---------------------------------------------------------------------- 35 %% @spec (Port::integer()) -> {ok, State}           | 36 %%                            {ok, State, Timeout}  | 37 %%                            ignore                | 38 %%                            {stop, Reason} 39 %% 40 %% @doc gen_server启动时回调,并创建 tcp 监听 41 %% @end 42 %%---------------------------------------------------------------------- 43 init([Port, Module]) -> 44     process_flag(trap_exit, true), 45     Opts = [binary, {packet, 2}, {reuseaddr, true}, 46             {keepalive, true}, {backlog, 30}, {active, false}], 47     %% 使用 gen_tcp 模块启动套接字监听,这是一个阻塞动作 48     case gen_tcp:listen(Port, Opts) of 49     {ok, Listen_socket} -> %% 创建监听成功返回监听socket 50         %% 创建第一个接受连接的进程 51         %% prim_inet:async_accept/2开启异步监听 52         %% 之后有客户端连接时会向此进程发送一个异步消息inet_async到进程消息队列 53         %% Ref 存储接受进程的引用 54         {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), 55         {ok, #state{listener = Listen_socket, 56                     acceptor = Ref, 57                     module   = Module}}; 58     {error, Reason} -> 59         {stop, Reason} 60     end. 61   62 %%------------------------------------------------------------------------- 63 %% @spec (Request, From, State) -> {reply, Reply, State}          | 64 %%                                 {reply, Reply, State, Timeout} | 65 %%                                 {noreply, State}               | 66 %%                                 {noreply, State, Timeout}      | 67 %%                                 {stop, Reason, Reply, State}   | 68 %%                                 {stop, Reason, State} 69 %% @doc 服务进程被同步调用时的回调函数 70 %% @end 71 %% @private 72 %%------------------------------------------------------------------------- 73 handle_call(Request, _From, State) -> 74     {stop, {unknown_call, Request}, State}. 75   76 %%------------------------------------------------------------------------- 77 %% @spec (Msg, State) ->{noreply, State}          | 78 %%                      {noreply, State, Timeout} | 79 %%                      {stop, Reason, State} 80 %% @doc 服务进程被异步调用时的回调函数 81 %% @end 82 %% @private 83 %%------------------------------------------------------------------------- 84 handle_cast(_Msg, State) -> 85     {noreply, State}. 86   87 %%------------------------------------------------------------------------- 88 %% @spec (Msg, State) ->{noreply, State}          | 89 %%                      {noreply, State, Timeout} | 90 %%                      {stop, Reason, State} 91 %% @doc 回调函数,处理那些直接发消息到进程邮箱的事件 92 %% 这里处理的是 {inet_async, ListSock, Ref, {ok, CliSocket}}事件, 93 %% inet_async 表示是一个异步事件,服务器端接收连接采用异步的方式, 94 %% 客户端连接最终会被转化成一个 inet_async 消息发送到进程邮箱等待处理 95 %% {{ok, CliSocket}} 里的CliSocket表示客户端建立的连接套接口 96 %% @end 97 %% @private 98 %%------------------------------------------------------------------------- 99 100 %% 注意这里 ListSock 以及 Ref 做了匹配,只有匹配了才是该监听口接收的连接101 handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},102             #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->103     try104         case set_sockopt(ListSock, CliSocket) of105         ok              -> ok;106         {error, Reason} -> exit({set_sockopt, Reason})107         end,108  109         %% 接收新的客户端连接,启动一个新的客户端状态机进程,动态添加到 tcp_client_sup 客户端监控树110         {ok, Pid} = tcp_server_app:start_client(),111 112         %% 绑定 CliSocet 到客户端进程 Pid, 这样CliSocket接收数据都会被转化成Pid代表进程的邮箱消息113         gen_tcp:controlling_process(CliSocket, Pid),114         %% Instruct the new FSM that it owns the socket.115 116         Module:set_socket(Pid, CliSocket),117  118         %% Signal the network driver that we are ready to accept another connection119         %% 重新设置异步监听下一个客户端连接的消息,设置新的监听引用120         %% 必须重新设置才能监听到 {inet_async,S,Ref,Status} 消息121         case prim_inet:async_accept(ListSock, -1) of122         {ok,    NewRef} -> ok;123         {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})124         end,125          126          %% 更新新的监听引用127         {noreply, State#state{acceptor=NewRef}}128     catch exit:Why ->129         error_logger:error_msg("Error in async accept: ~p.\n", [Why]),130         {stop, Why, State}131     end;132 133 %%客户端建立连接的容错处理134 handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->135     error_logger:error_msg("Error in socket acceptor: ~p.\n", [Error]),136     {stop, Error, State};137  138 handle_info(_Info, State) ->139     {noreply, State}.140  141 %%-------------------------------------------------------------------------142 %% @spec (Reason, State) -> any143 %% @doc  Callback executed on server shutdown. It is only invoked if144 %%       `process_flag(trap_exit, true)‘ is set by the server process.145 %%       The return value is ignored.146 %% @end147 %% @private148 %%-------------------------------------------------------------------------149 terminate(_Reason, State) ->150     gen_tcp:close(State#state.listener),151     ok.152  153 %%-------------------------------------------------------------------------154 %% @spec (OldVsn, State, Extra) -> {ok, NewState}155 %% @doc  Convert process state when code is changed.156 %% @end157 %% @private158 %%-------------------------------------------------------------------------159 code_change(_OldVsn, State, _Extra) ->160     {ok, State}.161  162 %%%------------------------------------------------------------------------163 %%% Internal functions164 %%%------------------------------------------------------------------------165  166 %% 设置客户端socket的参数选项,只是简单的复制了监听服务器的配置选项167 set_sockopt(ListSock, CliSocket) ->168     true = inet_db:register_socket(CliSocket, inet_tcp),169     case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of170     {ok, Opts} ->171         case prim_inet:setopts(CliSocket, Opts) of172         ok    -> ok;173         Error -> gen_tcp:close(CliSocket), Error174         end;175     Error ->176         gen_tcp:close(CliSocket), Error177     end.

下面是客户端处理输出的状态机:

  1 %% TCP Client Socket Handling FSM (tcp_echo_fsm.erl)  2 %% 客户端输出处理状态机,这里其实就是一个 echo_server 的客户端版本  3       4 -module(tcp_echo_fsm).  5 -author(‘saleyn@gmail.com‘).  6    7 %% 实现 gen_fsm 模式,事实上状态机应用场景没有 gen_server 多  8 %% 不过能用的场景都比较特殊,比如游戏客户端,服务端战斗模块  9 -behaviour(gen_fsm). 10   11 -export([start_link/0, set_socket/2]). 12   13 %% gen_fsm 回调函数 14 -export([init/1, handle_event/3, 15          handle_sync_event/4, handle_info/3, terminate/3, code_change/4]). 16   17 %% FSM States FSM 状态机的状态 18 -export([ 19     ‘WAIT_FOR_SOCKET‘/2, %% 等待socket 20     ‘WAIT_FOR_DATA‘/2    %% 等待socket数据 21 ]). 22   23 -record(state, { 24                 socket,    % client socket 25                 addr       % client address 26                }). 27   28 -define(TIMEOUT, 120000). 29   30 %%%------------------------------------------------------------------------ 31 %%% API 32 %%%------------------------------------------------------------------------ 33   34 %%------------------------------------------------------------------------- 35 %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} 36 %% @doc To be called by the supervisor in order to start the server. 37 %%      If init/1 fails with Reason, the function returns {error,Reason}. 38 %%      If init/1 returns {stop,Reason} or ignore, the process is 39 %%      terminated and the function returns {error,Reason} or ignore, 40 %%      respectively. 41 %% @end 42 %%------------------------------------------------------------------------- 43 start_link() -> 44     gen_fsm:start_link(?MODULE, [], []). 45   46 set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> 47     gen_fsm:send_event(Pid, {socket_ready, Socket}). 48   49 %%%------------------------------------------------------------------------ 50 %%% Callback functions from gen_server 51 %%%------------------------------------------------------------------------ 52   53 %%------------------------------------------------------------------------- 54 %% Func: init/1 55 %% Returns: {ok, StateName, StateData}          | 56 %%          {ok, StateName, StateData, Timeout} | 57 %%          ignore                              | 58 %%          {stop, StopReason} 59 %% @private 60 %%------------------------------------------------------------------------- 61 init([]) -> 62     process_flag(trap_exit, true), 63  64       %% 状态机启动之后的初始化状态 65     {ok, ‘WAIT_FOR_SOCKET‘, #state{}}. 66   67 %%------------------------------------------------------------------------- 68 %% Func: StateName/2 69 %% Returns: {next_state, NextStateName, NextStateData}          | 70 %%          {next_state, NextStateName, NextStateData, Timeout} | 71 %%          {stop, Reason, NewStateData} 72 %% @private 73 %%------------------------------------------------------------------------- 74  75 %% 创建客户端之后 set_socket 函数发送消息之后在这里被处理了 76 %% 大致逻辑是:收到通知,客户端连接socket到手,可以设置套接字选项并开始接收数据 77 ‘WAIT_FOR_SOCKET‘({socket_ready, Socket}, State) when is_port(Socket) -> 78     % Now we own the socket 79     inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), 80     {ok, {IP, _Port}} = inet:peername(Socket), 81  82     %% 确定了socket之后,状态机的下一个状态就是等着接收数据了 83     {next_state, ‘WAIT_FOR_DATA‘, State#state{socket=Socket, addr=IP}, ?TIMEOUT}; 84 ‘WAIT_FOR_SOCKET‘(Other, State) -> 85     error_logger:error_msg("State: ‘WAIT_FOR_SOCKET‘. Unexpected message: ~p\n", [Other]), 86     %% Allow to receive async messages 87     {next_state, ‘WAIT_FOR_SOCKET‘, State}. 88   89 %% 显示来自客户端的事件 90 ‘WAIT_FOR_DATA‘({data, Data}, #state{socket=S} = State) -> 91     ok = gen_tcp:send(S, Data), 92     {next_state, ‘WAIT_FOR_DATA‘, State, ?TIMEOUT}; 93   94 ‘WAIT_FOR_DATA‘(timeout, State) -> 95     error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]), 96     {stop, normal, State}; 97   98 ‘WAIT_FOR_DATA‘(Data, State) -> 99     io:format("~p Ignoring data: ~p\n", [self(), Data]),100     {next_state, ‘WAIT_FOR_DATA‘, State, ?TIMEOUT}.101  102 %%-------------------------------------------------------------------------103 %% Func: handle_event/3104 %% Returns: {next_state, NextStateName, NextStateData}          |105 %%          {next_state, NextStateName, NextStateData, Timeout} |106 %%          {stop, Reason, NewStateData}107 %% @private108 %%-------------------------------------------------------------------------109 handle_event(Event, StateName, StateData) ->110     {stop, {StateName, undefined_event, Event}, StateData}.111  112 %%-------------------------------------------------------------------------113 %% Func: handle_sync_event/4114 %% Returns: {next_state, NextStateName, NextStateData}            |115 %%          {next_state, NextStateName, NextStateData, Timeout}   |116 %%          {reply, Reply, NextStateName, NextStateData}          |117 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |118 %%          {stop, Reason, NewStateData}                          |119 %%          {stop, Reason, Reply, NewStateData}120 %% @private121 %%-------------------------------------------------------------------------122 handle_sync_event(Event, _From, StateName, StateData) ->123     {stop, {StateName, undefined_event, Event}, StateData}.124  125 %%-------------------------------------------------------------------------126 %% Func: handle_info/3127 %% Returns: {next_state, NextStateName, NextStateData}          |128 %%          {next_state, NextStateName, NextStateData, Timeout} |129 %%          {stop, Reason, NewStateData}130 %% @private131 %%-------------------------------------------------------------------------132 handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->133     % Flow control: enable forwarding of next TCP message134     inet:setopts(Socket, [{active, once}]),135     ?MODULE:StateName({data, Bin}, StateData);136  137 handle_info({tcp_closed, Socket}, _StateName,138             #state{socket=Socket, addr=Addr} = StateData) ->139     error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),140     {stop, normal, StateData};141  142 handle_info(_Info, StateName, StateData) ->143     {noreply, StateName, StateData}.144  145 %%-------------------------------------------------------------------------146 %% Func: terminate/3147 %% Purpose: Shutdown the fsm148 %% Returns: any149 %% @private150 %%-------------------------------------------------------------------------151 terminate(_Reason, _StateName, #state{socket=Socket}) ->152     (catch gen_tcp:close(Socket)),153     ok.154  155 %%-------------------------------------------------------------------------156 %% Func: code_change/4157 %% Purpose: Convert process state when code is changed158 %% Returns: {ok, NewState, NewStateData}159 %% @private160 %%-------------------------------------------------------------------------161 code_change(_OldVsn, StateName, StateData, _Extra) ->162     {ok, StateName, StateData}.

最后是app文件了:

 1 %% tcp_server.app 文件 2      3 {application, tcp_server, 4  [ 5   {description, "Demo TCP server"}, 6   {vsn, "1.0"}, 7   {id, "tcp_server"}, 8   {modules,      [tcp_listener, tcp_echo_fsm]}, 9   {registered,   [tcp_server_sup, tcp_listener]},10   {applications, [kernel, stdlib]},11   %%12   %% mod: 指定应用启动初始化的模块13   %%14   {mod, {tcp_server_app, []}},15   {env, []}16  ]17 }.

以上基本上都是个人查找资料过程的笔记,有理解错误的地方请评论指出,谢谢!

使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)