首页 > 代码库 > Erlang OTP编程初体验——gen_server和行为模式
Erlang OTP编程初体验——gen_server和行为模式
http://blog.sina.com.cn/s/blog_3fe961ae0101k4p6.html
行为模式其实非常类似于面向对象语言中的接口,至少笔者是这么理解的。OTP行为模式将一些反复出现的模式分成了两个部分,通用部分和具体应用相关的实现部分,这一过程其实就类似于面向对象编程中的抽象出接口的过程。本文给出一个OTP中最常见的行为模式的示例:通用服务器,即gen_server。
编写gen_server回调模块大致包括3相步骤:
(1) 确定回调模块的名称;
(2) 写接口函数(由客户端调用的);
(3) 在回调模块中实现gen_server的6个回调函数(由gen_server容器来调用的)。
下面给出一个《Erlang OTP并发编程实战》中的示例代码,代码通过实现gen_server的接口,进而实现一个简单的RPC服务,这个RPC服务可以允许客户端调用服务器端任意模块中导出的任意函数。并且提供了一个get_count的查询接口,用于查询当前服务器已经处理过的请求数量。另外还有start_link()和stop()用于停止服务器进程。
tr_server.erl |
%%%-------------------------------------------------------------------
%%% @author Martin & Eric
%%% [http://www.erlware.org]
%%% @copyright 2008-2010 Erlware
%%% @doc RPC over TCP server. This module defines a server process that
%%% listens for incoming TCP connections and allows the user to
%%% execute RPC commands via that TCP stream.
%%% @end
%%%-------------------------------------------------------------------
-module(tr_server).
-behaviour(gen_server).
%% API
-export([
start_link/1,
start_link/0,
get_count/0,
stop/0
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).
-record(state, {port, lsock, request_count = 0}).
%%%===================================================================
%%% API
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc Starts the server.
%%
%% @spec start_link(Port::integer()) -> {ok, Pid}
%% where
%% Pid = pid()
%% @end
%%--------------------------------------------------------------------
start_link(Port) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Port], []).
%% @spec start_link() -> {ok, Pid}
%% @doc Calls `start_link(Port)‘ using the default port.
start_link() ->
start_link(?DEFAULT_PORT).
%%--------------------------------------------------------------------
%% @doc Fetches the number of requests made to this server.
%% @spec get_count() -> {ok, Count}
%% where
%% Count = integer()
%% @end
%%--------------------------------------------------------------------
get_count() ->
gen_server:call(?SERVER, get_count).
%%--------------------------------------------------------------------
%% @doc Stops the server.
%% @spec stop() -> ok
%% @end
%%--------------------------------------------------------------------
stop() ->
gen_server:cast(?SERVER, stop).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Port]) ->
{ok, LSock} = gen_tcp:listen(Port, [{active, true}]),
{ok, #state{port = Port, lsock = LSock}, 0}.
handle_call(get_count, _From, State) ->
{reply, {ok, State#state.request_count}, State}.
handle_cast(stop, State) ->
{stop, normal, State}.
handle_info({tcp, Socket, RawData}, State) ->
do_rpc(Socket, RawData),
RequestCount = State#state.request_count,
{noreply, State#state{request_count = RequestCount 1}};
handle_info(timeout, #state{lsock = LSock} = State) ->
{ok, _Sock} = gen_tcp:accept(LSock),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_rpc(Socket, RawData) ->
try
{M, F, A} = split_out_mfa(RawData),
Result = apply(M, F, A),
gen_tcp:send(Socket, io_lib:fwrite("~p~n", [Result]))
catch
_Class:Err ->
gen_tcp:send(Socket, io_lib:fwrite("~p~n", [Err]))
end.
split_out_mfa(RawData) ->
MFA = re:replace(RawData, "\r\n$", "", [{return, list}]),
{match, [M, F, A]} =
re:run(MFA,
"(.*):(.*)\s*\\((.*)\s*\\)\s*.\s*$",
[{capture, [1,2,3], list}, ungreedy]),
{list_to_atom(M), list_to_atom(F), args_to_terms(A)}.
args_to_terms(RawArgs) ->
{ok, Toks, _Line} = erl_scan:string("[" RawArgs "]. ", 1),
{ok, Args} = erl_parse:parse_term(Toks),
Args.
笔者在Linux环境下运行这个程序:
1> c(tr_server).
{ok,tr_server}
2> tr_server:start_link().
{ok,<0.39.0>}
3>
这里需要再启动一个Shell,输入:
root@controller:~# telnet 127.0.0.1 1055
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is ‘^]‘.
然后再回到Erlang控制台,输入:
3> tr_server:get_count().
{ok,0}
4> tr_server:stop().
ok
为什么要先用telnet连接一下1055端口呢?分析一下init([Port])这个函数的行为。init([Port])函数首先用标准库中的gen_tcp模块在指定的端口上建立一个TCP监听套接字:
{ok, LSock} = gen_tcp:listen(Port, [{active, true}]),
然后init([Port])函数返回一个三元组,包含原子ok,初始进程状态,以及数字0:
{ok, #state{port = Port, lsock = LSock}, 0}.
这个0表示超时值。将超时设置为零就是让gen_server容器在init/1结束后,立即触发一个超时,从而迫使进程在完成初始化之后第一时间处理超时消息(由handle_info/2完成)。在这里使用0的用意是唤醒服务器并执行一些指定的操作:等待创建的监听套接字上的连接。在没有接收到连接时,gen_server会一直阻塞在这里,因此如果此时发送tr_server:get_count()请求,将会得到一个timeout反馈:
** exception exit: {timeout,{gen_server,call,[tr_server,get_count]}}
tr_server所实现的RPC服务,理论上可以调用服务器端任意模块中导出的任意函数。比如,可以在telnet中输入:
init:stop().
返回:
ok
Connection closed by foreign host.
这是因为init:stop()关闭了运行着RPC服务器的整个Erlang节点。
最后,我们看看gen_server中的几个常用的回调函数是怎么定义的。打开gen_server源码(笔者的Windows系统上,这个文件位于C:\Program Files (x86)\erl5.8.5\lib\stdlib-1.17.5\src),文件的头部注释中,详细地介绍了各个接口所需要返回的参数格式,以及gen_server的执行流程。
%%% ---------------------------------------------------
%%%
%%% The idea behind THIS server is that the user module
%%% provides (different) functions to handle different
%%% kind of inputs.
%%% If the Parent process terminates the Module:terminate/2
%%% function is called.
%%%
%%% The user module should export:
%%%
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
%%% ignore
%%% {stop, Reason}
%%%
%%% handle_call(Msg, {From, Tag}, State)
%%%
%%% ==> {reply, Reply, State}
%%% {reply, Reply, State, Timeout}
%%% {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, Reply, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_cast(Msg, State)
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term terminate(State) is called
%%%
%%% handle_info(Info, State) Info is e.g. {‘EXIT‘, P, R}, {nodedown, N}, ...
%%%
%%% ==> {noreply, State}
%%% {noreply, State, Timeout}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
%%% always called when server terminates
%%%
%%% ==> ok
%%%
%%%
%%% The work flow (of the server) can be described as follows:
%%%
%%% User module Generic
%%% ----------- -------
%%% start -----> start
%%% init <----- .
%%%
%%% loop
%%% handle_call <----- .
%%% -----> reply
%%%
%%% handle_cast <----- .
%%%
%%% handle_info <----- .
%%%
%%% terminate <----- .
%%%
%%% -----> reply
%%%
%%%
%%% ---------------------------------------------------
参考资料:《Erlang OTP并发编程实战》
Erlang OTP编程初体验——gen_server和行为模式