首页 > 代码库 > 使用OTP原理构建一个非阻塞的TCP服务器(转)
使用OTP原理构建一个非阻塞的TCP服务器(转)
经测试可用!
原文地址:http://www.iucai.com/?paged=8
Erlang OTP设计原理已经被shiningray兄翻译透了。请参见。http://erlang.shiningray.cn/otp-design-principles/index.html
这里翻译了一篇余锋老大和lzy.je老大推荐的文章,闲话不说,奉上。
使用OTP原理构建一个非阻塞的TCP服务器
原文网址:(打不开的同学请自觉FQ)
http://www.trapexit.org.nyud.net:8080/Building_a_Non-blocking_TCP_server_using_OTP_principles
Author:
Serge Aleynikov <saleyn at gmail.com>
Tranlator:
David < GodwitNow@Gmail.com >
Overview :
该文的读者需要熟悉gen_server和gen_fsm行为,使用gen_tcp模块进行TCP socket通信,主动和被动的socket模式,以及OTP监视原理。
OTP为构建可靠的应用提供了一个方便的框架。该框架的实现,部分是通过抽象共同的功能到一个可服用的行为集合里面,例如链接到OTP监视层的gen_server和gen_fsm。
有几个比较著名的TCP服务器设计。这里我们要讨论的包括,一个进程监听一个客户端连接并且为每个连接的客户端产生一个FSM进程。如今在OTP里面可以通过gen_tcp模块来支持TCP链接,但是,没有标准的使用OTP准则的行为来构建非阻塞的TCP服务器。关于非阻塞(non-blocking),我们是指监听进程和处理客户端的FSM(有限状态机)不能被阻塞,时刻准备对到来的控制消息(例如更改系统配置,重启请求,等等)做出反应而不会引起超时。注意在Erlang中阻塞(blocking)意思是阻塞一个Erlang进程而不是虚拟机的操作系统进程。
本教程我们会展示如何使用gen_server行为来构建一个非阻塞的TCP服务器,用gen_fsm行为来提供流控制,并且完全符合OTP应用的设计理念。
对于OTP框架不熟悉的读者,请阅读Joe Armstrong的教程:how to build A Fault-tolerant Serverusing blocking gen_tcp:connect/3 and gen_tcp:accept/1 calls without involving OTP。
本教程灵感来自于Erlang问题列表的几个主题,这个几个主题涉及到构建一个非阻塞异步TCP服务器的方法。
Server Design
我们服务器的设计会包含:主程序的监视进程tcp_server_app,该监视进程带有one_for_one重启策略,以及两个子规范。第一个是用gen_server行为实现的监听进程,该进程会等待来在客户端socket连接的异步通知。第二个是另一个监视进程tcp_client_sup,负责启动处理客户端的FSM(有限状态机)以及通过标准的SASL错误报告来记录异常连接。
为了简化该教程,处理客户端的有限状态机(tcp_echo_fsm)会实现一个echo server,该server会响应客户端的请求给客户端。
+----------------+
| tcp_server_app |
+--------+-------+
| (one_for_one)
+----------------+---------+
| |
+-------+------+ +-------+--------+
| tcp_listener | + tcp_client_sup |
+--------------+ +-------+--------+
| (simple_one_for_one)
+-----|---------+
+-------|--------+|
+--------+-------+|+
| tcp_echo_fsm |+
+----------------+
Application and Supervisor behaviours
为了建立一个OTP应用程序,我们需要构建实现应用的模块,以及监视程序的回调函数。传统的这些功能都是在不同的模块里面实现的,考虑到简洁性,我们将它们组合在一个模块里面。
作为一个额外的功能,我们实现一个get_app_env函数,该函数描述了如何处理配置选项以及虚拟机启动时的命令行选项。
两个init/1函数实例用于检视层次的两层。因为针对每个监视树,我们需要两个不同的重启策略,因此,我们在不同的层里实现它们。
在应用程序启动的时候,回调函数tcp_server_app:start/2 调用 supervisor:start_link/2 ,而该函数会通过调用tcp_server_app:init( [Port, Module] )回调函数来创建应用程序的监视进程。这个监视进程创建一个tcp_listener进程以及一个子监视进程tcp_client_sup,该进程用于创建客户端链接进程。Init函数里面的模块参数正是处理客户端连接的有限状态机的名字(本例中为tcp_echo_fsm)。
TCP Server Application (tcp_server_app.erl) |
-module(tcp_server_app). -author(‘saleyn@gmail.com’). -behaviour(application). %% Internal API -export([start_client/0]). %% Application and Supervisor callbacks -export([start/2, stop/1, init/1]). -define(MAX_RESTART, 5). -define(MAX_TIME, 60). -define(DEF_PORT, 2222). %% A startup function for spawning new client connection handling FSM. %% To be called by the TCP listener process. start_client() -> supervisor:start_child(tcp_client_sup, []). %%———————————————————————- %% Application behaviour callbacks %%———————————————————————- start(_Type, _Args) -> ListenPort = get_app_env(listen_port, ?DEF_PORT), supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]). stop(_S) -> ok. %%———————————————————————- %% Supervisor behaviour callbacks %%———————————————————————- init([Port, Module]) -> {ok, {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Listener { tcp_server_sup, % Id = internal id {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A} permanent, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [tcp_listener] % Modules = [Module] | dynamic }, % Client instance supervisor { tcp_client_sup, {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]}, permanent, % Restart = permanent | transient | temporary infinity, % Shutdown = brutal_kill | int() >= 0 | infinity supervisor, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } };
init([Module]) -> {ok, {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME}, [ % TCP Client { undefined, % Id = internal id {Module,start_link,[]}, % StartFun = {M, F, A} temporary, % Restart = permanent | transient | temporary 2000, % Shutdown = brutal_kill | int() >= 0 | infinity worker, % Type = worker | supervisor [] % Modules = [Module] | dynamic } ] } }.
%%———————————————————————- %% Internal functions %%———————————————————————- get_app_env(Opt, Default) -> case application:get_env(application:get_application(), Opt) of {ok, Val} -> Val; _ -> case init:get_argument(Opt) of [[Val | _]] -> Val; error -> Default end end. |
Listener Process
Gen_tcp模块的一个缺点就是它只向一个阻塞的accept调用提供接口。这导致大部分的开发者在实现一个TCP Server的时候,构建一个自定义进程,连接到一个使用proc_lib的监视器或者提供一些专有的设计。
检查prim_inet模块揭露了一个有趣的事实,实际的调用inet驱动来accept一个客户端socket是异步的。然而,这是一个未写入文档的性能,这意味着OTP小组可以自由更改这些实现。我们会在构建我们的服务器的时候来利用这个功能。
监听进程是用gen_server行为来实现的。
TCP Listener Process (tcp_listener.erl) |
-module(tcp_listener). -author(‘saleyn@gmail.com’). -behaviour(gen_server). %% External API -export([start_link/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { listener, % Listening socket acceptor, % Asynchronous acceptor’s internal reference module % FSM handling module }). %%——————————————————————– %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason} % %% @doc Called by a supervisor to start the listening process. %% @end %%———————————————————————- start_link(Port, Module) when is_integer(Port), is_atom(Module) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).
%%%———————————————————————— %%% Callback functions from gen_server %%%————————————————————————
%%———————————————————————- %% @spec (Port::integer()) -> {ok, State} | %% {ok, State, Timeout} | %% ignore | %% {stop, Reason} %% %% @doc Called by gen_server framework at process startup. %% Create listening socket. %% @end %%———————————————————————- init([Port, Module]) -> process_flag(trap_exit, true), Opts = [binary, {packet, 2}, {reuseaddr, true}, {keepalive, true}, {backlog, 30}, {active, false}], case gen_tcp:listen(Port, Opts) of {ok, Listen_socket} -> %%Create first accepting process {ok, Ref} = prim_inet:async_accept(Listen_socket, -1), {ok, #state{listener = Listen_socket, acceptor = Ref, module = Module}}; {error, Reason} -> {stop, Reason} end.
%%————————————————————————- %% @spec (Request, From, State) -> {reply, Reply, State} | %% {reply, Reply, State, Timeout} | %% {noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, Reply, State} | %% {stop, Reason, State} %% @doc Callback for synchronous server calls. If `{stop, …}’ tuple %% is returned, the server is stopped and `terminate/2′ is called. %% @end %% @private %%————————————————————————- handle_call(Request, _From, State) -> {stop, {unknown_call, Request}, State}.
%%————————————————————————- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for asyncrous server calls. If `{stop, …}’ tuple %% is returned, the server is stopped and `terminate/2′ is called. %% @end %% @private %%————————————————————————- handle_cast(_Msg, State) -> {noreply, State}.
%%————————————————————————- %% @spec (Msg, State) ->{noreply, State} | %% {noreply, State, Timeout} | %% {stop, Reason, State} %% @doc Callback for messages sent directly to server’s mailbox. %% If `{stop, …}’ tuple is returned, the server is stopped and %% `terminate/2′ is called. %% @end %% @private %%————————————————————————- handle_info({inet_async, ListSock, Ref, {ok, CliSocket}}, #state{listener=ListSock, acceptor=Ref, module=Module} = State) -> try case set_sockopt(ListSock, CliSocket) of ok -> ok; {error, Reason} -> exit({set_sockopt, Reason}) end,
%% New client connected – spawn a new process using the simple_one_for_one %% supervisor. {ok, Pid} = tcp_server_app:start_client(), gen_tcp:controlling_process(CliSocket, Pid), %% Instruct the new FSM that it owns the socket. Module:set_socket(Pid, CliSocket),
%% Signal the network driver that we are ready to accept another connection case prim_inet:async_accept(ListSock, -1) of {ok, NewRef} -> ok; {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)}) end,
{noreply, State#state{acceptor=NewRef}} catch exit:Why -> error_logger:error_msg(“Error in async accept: ~p.\n”, [Why]), {stop, Why, State} end;
handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) -> error_logger:error_msg(“Error in socket acceptor: ~p.\n”, [Error]), {stop, Error, State};
handle_info(_Info, State) -> {noreply, State}.
%%————————————————————————- %% @spec (Reason, State) -> any %% @doc Callback executed on server shutdown. It is only invoked if %% `process_flag(trap_exit, true)’ is set by the server process. %% The return value is ignored. %% @end %% @private %%————————————————————————- terminate(_Reason, State) -> gen_tcp:close(State#state.listener), ok.
%%————————————————————————- %% @spec (OldVsn, State, Extra) -> {ok, NewState} %% @doc Convert process state when code is changed. %% @end %% @private %%————————————————————————- code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%%———————————————————————— %%% Internal functions %%%————————————————————————
%% Taken from prim_inet. We are merely copying some socket options from the %% listening socket to the new client socket. set_sockopt(ListSock, CliSocket) -> true = inet_db:register_socket(CliSocket, inet_tcp), case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of {ok, Opts} -> case prim_inet:setopts(CliSocket, Opts) of ok -> ok; Error -> gen_tcp:close(CliSocket), Error end; Error -> gen_tcp:close(CliSocket), Error end. |
在该模块里面,init/1函数调用两个参数-TCP Listener启动的时候所需要的端口号和处理客户端连接的协议模块名称。初始化函数打开了使用被动模式({active, false})打开了一个监听socket。这样做,我们就可以对从已连接socket上接收到的数据进行流控制,这个已连接的socket会从监听的socket继承这个模式选项。
这段代码最有趣的部分是prim_inet:async_accept/2调用以及异步消息进行处理的inet_async。为了让其工作我们还需要复制一些OTP内部代码,这些代码封装在set_sockopt/2函数中,用来处理用inet数据库来注册socket,以及复制些设置选项到客户端socket。
一旦一个客户端socket连接上,inet driver会使用{inet_async, ListSock, Ref, {ok, CliSocket}}消息来通知listening进程。此时,我们会实例化一个客户端socket处理进程,并且设置它的CliSocket的所有者。
Client Socket Handling Process
Tcp listener是一个通用的实现,而tcp_echo_fsm则仅仅是一个简单的有限状态机,用来描述如何写TCP 服务器。这个模块需要输出两个函数 – 一个用于tcp_client_sup监视器的start_link/0 以及一个用于监听进程的set_socket/2,该函数用来通知处理客户端连接的有限状态机进程,此刻它是socket的所有者,并且通过设置{active, once} 或 {active, true}选项能够开始接收消息。
我们想强调在listening进程和处理客户端连接的有限状态机之间使用的异步模式,来避免可能的消息丢失,因为将来自socket的消息派发给错误的listening进程。拥有listening socket的进程通过{active, false}来打开socket。在接收完客户端的socket之后,该socket从listener继承它的socket选项(包括{active, false}),通过调用gen_tcp:controlling_process/2将socket的所有权移交给新产生的处理客户端连接的有限状态机,以及通过调用Module:set_socket/2来通知有限状态机可以从socket接收消息了。被TCP发送的数据会一直停留在socket的buffer里,直到有限状态机进程通过调用inet:setopts(Socket, [{active, once}])设置socket为主动模式来启动消息传递。
当socket的所有权被移交给处于‘WAIT_FOR_SOCKET‘状态的FSM后,FSM设置{active, once}选项来让inet driver一次发送给它一个TCP消息。这就是OTP方式来保持流控制,避免进程消息队列被TCP数据淹没,以及防止一个快生产者慢消费者情况引起系统崩溃。
FSM的状态被在tcp_echo_fsm模块里面的特殊函数来实现,该模块对被包在单引号里面大些状态名进行转换。FSM有两个状态组成,‘WAIT_FOR_SOCKET‘是初始状态,此时FSM正在等待被分配socket的所有权;‘WAIT_FOR_DATA‘是代表等待来自客户端TCP消息的状态。在这个状态,FSM还会处理一个特殊的超时消息,用来表示没有来自客户端的活动,引起该进程停止,并且关闭客户端连接。
TCP Client Socket Handling FSM (tcp_echo_fsm.erl) |
-module(tcp_echo_fsm). -author(‘saleyn@gmail.com’).
-behaviour(gen_fsm).
-export([start_link/0, set_socket/2]).
%% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
%% FSM States -export([ ‘WAIT_FOR_SOCKET‘/2, ‘WAIT_FOR_DATA‘/2 ]).
-record(state, { socket, % client socket addr % client address }).
-define(TIMEOUT, 120000).
%%%———————————————————————— %%% API %%%————————————————————————
%%————————————————————————- %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error} %% @doc To be called by the supervisor in order to start the server. %% If init/1 fails with Reason, the function returns {error,Reason}. %% If init/1 returns {stop,Reason} or ignore, the process is %% terminated and the function returns {error,Reason} or ignore, %% respectively. %% @end %%————————————————————————- start_link() -> gen_fsm:start_link(?MODULE, [], []).
set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) -> gen_fsm:send_event(Pid, {socket_ready, Socket}).
%%%———————————————————————— %%% Callback functions from gen_server %%%————————————————————————
%%————————————————————————- %% Func: init/1 %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | %% {stop, StopReason} %% @private %%————————————————————————- init([]) -> process_flag(trap_exit, true), {ok, ‘WAIT_FOR_SOCKET’, #state{}}.
%%————————————————————————- %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%————————————————————————- ‘WAIT_FOR_SOCKET’({socket_ready, Socket}, State) when is_port(Socket) -> % Now we own the socket inet:setopts(Socket, [{active, once}, {packet, 2}, binary]), {ok, {IP, _Port}} = inet:peername(Socket), {next_state, ‘WAIT_FOR_DATA’, State#state{socket=Socket, addr=IP}, ?TIMEOUT}; ‘WAIT_FOR_SOCKET’(Other, State) -> error_logger:error_msg(“State: ‘WAIT_FOR_SOCKET’. Unexpected message: ~p\n”, [Other]), %% Allow to receive async messages {next_state, ‘WAIT_FOR_SOCKET’, State}.
%% Notification event coming from client ‘WAIT_FOR_DATA’({data, Data}, #state{socket=S} = State) -> ok = gen_tcp:send(S, Data), {next_state, ‘WAIT_FOR_DATA’, State, ?TIMEOUT};
‘WAIT_FOR_DATA’(timeout, State) -> error_logger:error_msg(“~p Client connection timeout – closing.\n”, [self()]), {stop, normal, State};
‘WAIT_FOR_DATA’(Data, State) -> io:format(“~p Ignoring data: ~p\n”, [self(), Data]), {next_state, ‘WAIT_FOR_DATA’, State, ?TIMEOUT}.
%%————————————————————————- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%————————————————————————- handle_event(Event, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}.
%%————————————————————————- %% Func: handle_sync_event/4 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %% @private %%————————————————————————- handle_sync_event(Event, _From, StateName, StateData) -> {stop, {StateName, undefined_event, Event}, StateData}.
%%————————————————————————- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %% @private %%————————————————————————- handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) -> % Flow control: enable forwarding of next TCP message inet:setopts(Socket, [{active, once}]), ?MODULE:StateName({data, Bin}, StateData);
handle_info({tcp_closed, Socket}, _StateName, #state{socket=Socket, addr=Addr} = StateData) -> error_logger:info_msg(“~p Client ~p disconnected.\n”, [self(), Addr]), {stop, normal, StateData};
handle_info(_Info, StateName, StateData) -> {noreply, StateName, StateData}.
%%————————————————————————- %% Func: terminate/3 %% Purpose: Shutdown the fsm %% Returns: any %% @private %%————————————————————————- terminate(_Reason, _StateName, #state{socket=Socket}) -> (catch gen_tcp:close(Socket)), ok.
%%————————————————————————- %% Func: code_change/4 %% Purpose: Convert process state when code is changed %% Returns: {ok, NewState, NewStateData} %% @private %%————————————————————————- code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. |
Application File
构建一个OTP应用的另一个必须的部分是创建一个包含应用程序名字,版本号,启动模块和环境的应用程序文件。
Application File (tcp_server.app) |
{application, tcp_server, [ {description, "Demo TCP server"}, {vsn, "1.0"}, {id, "tcp_server"}, {modules, [tcp_listener, tcp_echo_fsm]}, {registered, [tcp_server_sup, tcp_listener]}, {applications, [kernel, stdlib]}, %% %% mod: Specify the module name to start the application, plus args %% {mod, {tcp_server_app, []}}, {env, []} ] }. |
Compiling
为该应用程序创建如下的目录结构
Create the following directory structure for this application:
./tcp_server
./tcp_server/ebin/
./tcp_server/ebin/tcp_server.app
./tcp_server/src/tcp_server_app.erl
./tcp_server/src/tcp_listener.erl
./tcp_server/src/tcp_echo_fsm.erl
$ cd tcp_server/src
$ for f in tcp*.erl ; do erlc +debug_info -o ../ebin $f
Running
我们会带有SASL支持来启动Erlang Shell。这样我们就能查看所有的进程和我们TCP应用的错误报告。同样,我们还会启动appmon应用,来可视化的检查监视层级。
$ cd ../ebin
$ erl -boot start_sasl
...
1> appmon:start().
{ok,<0.44.0>}
2> application:start(tcp_server).
ok
现在点击在appmon窗口里面的tcp_server的按钮,以显示该tcp_server应用程序的监视层级。
3> {ok,S} = gen_tcp:connect({127,0,0,1},2222,[{packet,2}]).
{ok,#Port<0.150>}
上一步初始化一个新的客户端连接到echo server。
4> gen_tcp:send(S,<<"hello">>).
ok
5> f(M), receive M -> M end.
{tcp,#Port<0.150>,"hello"}
我们已证实echo server如我们所期望的工作。现在让我们尝试crash server上的客户端连接,然后发现监视器程序会在屏幕上产生一个错误报告。
6> [{_,Pid,_,_}] = supervisor:which_children(tcp_client_sup).
[{undefined,<0.64.0>,worker,[]}]
7> exit(Pid,kill).
true
=SUPERVISOR REPORT==== 31-Jul-2007::14:33:49 ===
Supervisor: {local,tcp_client_sup}
Context: child_terminated
Reason: killed
Offender: [{pid,<0.77.0>},
{name,undefined},
{mfa,{tcp_echo_fsm,start_link,[]}},
{restart_type,temporary},
{shutdown,2000},
{child_type,worker}]
注意,如果你建立很多连接来对该server进行压力测试,在打开的文件描述符达到系统设置的极限值之后,listener进程可能会在接收新的连接的时候失败。这种情况下你会看到如下错误:
"too many open files"
如果你是运行在Linux/UNIX上,google一个解决方案吧(最终归结为通过设置“ulimit -n …”选项来增加每个进程的限制)。
Conclusion
OTP提供了构建非阻塞的TCP服务器所需要的东西。该教程展示了如何使用标准OTP行为创建一个带有流控制的简单的TCPserver。作为一个练习,读者应该尝试将通用的非阻塞TCP服务器功能抽象为一个独立的行为。
使用OTP原理构建一个非阻塞的TCP服务器(转)