首页 > 代码库 > Erlang OTP编程初体验——gen_server和行为模式

Erlang OTP编程初体验——gen_server和行为模式

http://blog.sina.com.cn/s/blog_3fe961ae0101k4p6.html

行为模式其实非常类似于面向对象语言中的接口,至少笔者是这么理解的。OTP行为模式将一些反复出现的模式分成了两个部分,通用部分和具体应用相关的实现部分,这一过程其实就类似于面向对象编程中的抽象出接口的过程。本文给出一个OTP中最常见的行为模式的示例:通用服务器,即gen_server。

编写gen_server回调模块大致包括3相步骤:

(1) 确定回调模块的名称;

(2) 写接口函数(由客户端调用的);

(3) 在回调模块中实现gen_server的6个回调函数(由gen_server容器来调用的)。

 

下面给出一个《Erlang OTP并发编程实战》中的示例代码,代码通过实现gen_server的接口,进而实现一个简单的RPC服务,这个RPC服务可以允许客户端调用服务器端任意模块中导出的任意函数。并且提供了一个get_count的查询接口,用于查询当前服务器已经处理过的请求数量。另外还有start_link()和stop()用于停止服务器进程。

 

tr_server.erl

%%%------------------------------------------------------------------- 

%%% @author Martin & Eric  

%%%  [http://www.erlware.org] 

%%% @copyright 2008-2010 Erlware 

%%% @doc RPC over TCP server. This module defines a server process that 

%%%      listens for incoming TCP connections and allows the user to 

%%%      execute RPC commands via that TCP stream. 

%%% @end 

%%%------------------------------------------------------------------- 

 

-module(tr_server). 

 

-behaviour(gen_server). 

 

 

%% API 

-export([ 

         start_link/1, 

         start_link/0, 

         get_count/0, 

         stop/0 

         ]). 

 

%% gen_server callbacks 

-export([init/1, handle_call/3, handle_cast/2, handle_info/2, 

         terminate/2, code_change/3]). 

 

-define(SERVER, ?MODULE). 

-define(DEFAULT_PORT, 1055). 

 

-record(state, {port, lsock, request_count = 0}). 

 

 

%%%=================================================================== 

%%% API 

%%%=================================================================== 

 

 

%%-------------------------------------------------------------------- 

%% @doc Starts the server. 

%% 

%% @spec start_link(Port::integer()) -> {ok, Pid} 

%% where 

%%  Pid = pid() 

%% @end 

%%-------------------------------------------------------------------- 

start_link(Port) -> 

    gen_server:start_link({local, ?SERVER}, ?MODULE, [Port], []). 

 

%% @spec start_link() -> {ok, Pid} 

%% @doc Calls `start_link(Port)‘ using the default port. 

start_link() -> 

    start_link(?DEFAULT_PORT). 

 

%%-------------------------------------------------------------------- 

%% @doc Fetches the number of requests made to this server. 

%% @spec get_count() -> {ok, Count} 

%% where 

%%  Count = integer() 

%% @end 

%%-------------------------------------------------------------------- 

get_count() -> 

    gen_server:call(?SERVER, get_count). 

 

%%-------------------------------------------------------------------- 

%% @doc Stops the server. 

%% @spec stop() -> ok 

%% @end 

%%-------------------------------------------------------------------- 

stop() -> 

    gen_server:cast(?SERVER, stop). 

 

 

%%%=================================================================== 

%%% gen_server callbacks 

%%%=================================================================== 

 

init([Port]) -> 

    {ok, LSock} = gen_tcp:listen(Port, [{active, true}]), 

    {ok, #state{port = Port, lsock = LSock}, 0}. 

 

handle_call(get_count, _From, State) -> 

    {reply, {ok, State#state.request_count}, State}. 

 

handle_cast(stop, State) -> 

    {stop, normal, State}. 

 

handle_info({tcp, Socket, RawData}, State) -> 

    do_rpc(Socket, RawData), 

    RequestCount = State#state.request_count, 

    {noreply, State#state{request_count = RequestCount   1}}; 

 

handle_info(timeout, #state{lsock = LSock} = State) -> 

    {ok, _Sock} = gen_tcp:accept(LSock), 

    {noreply, State}. 

 

terminate(_Reason, _State) -> 

    ok. 

 

code_change(_OldVsn, State, _Extra) -> 

    {ok, State}. 

 

%%%=================================================================== 

%%% Internal functions 

%%%=================================================================== 

 

do_rpc(Socket, RawData) -> 

    try 

        {M, F, A} = split_out_mfa(RawData), 

        Result = apply(M, F, A), 

        gen_tcp:send(Socket, io_lib:fwrite("~p~n", [Result])) 

    catch 

        _Class:Err -> 

            gen_tcp:send(Socket, io_lib:fwrite("~p~n", [Err])) 

    end. 

 

split_out_mfa(RawData) -> 

    MFA = re:replace(RawData, "\r\n$", "", [{return, list}]), 

    {match, [M, F, A]} = 

        re:run(MFA, 

               "(.*):(.*)\s*\\((.*)\s*\\)\s*.\s*$", 

                   [{capture, [1,2,3], list}, ungreedy]), 

    {list_to_atom(M), list_to_atom(F), args_to_terms(A)}. 

 

args_to_terms(RawArgs) -> 

    {ok, Toks, _Line} = erl_scan:string("["   RawArgs   "]. ", 1), 

    {ok, Args} = erl_parse:parse_term(Toks), 

    Args. 

笔者在Linux环境下运行这个程序:

1> c(tr_server).

{ok,tr_server}

2> tr_server:start_link().

{ok,<0.39.0>}

3>

这里需要再启动一个Shell,输入:

root@controller:~# telnet 127.0.0.1 1055

Trying 127.0.0.1...

Connected to 127.0.0.1.

Escape character is ‘^]‘.

然后再回到Erlang控制台,输入:

3> tr_server:get_count().

{ok,0}

4> tr_server:stop().

ok

 

为什么要先用telnet连接一下1055端口呢?分析一下init([Port])这个函数的行为。init([Port])函数首先用标准库中的gen_tcp模块在指定的端口上建立一个TCP监听套接字:

{ok, LSock} = gen_tcp:listen(Port, [{active, true}]), 

然后init([Port])函数返回一个三元组,包含原子ok,初始进程状态,以及数字0:

    {ok, #state{port = Port, lsock = LSock}, 0}. 

这个0表示超时值。将超时设置为零就是让gen_server容器在init/1结束后,立即触发一个超时,从而迫使进程在完成初始化之后第一时间处理超时消息(由handle_info/2完成)。在这里使用0的用意是唤醒服务器并执行一些指定的操作:等待创建的监听套接字上的连接。在没有接收到连接时,gen_server会一直阻塞在这里,因此如果此时发送tr_server:get_count()请求,将会得到一个timeout反馈:

** exception exit: {timeout,{gen_server,call,[tr_server,get_count]}}

tr_server所实现的RPC服务,理论上可以调用服务器端任意模块中导出的任意函数。比如,可以在telnet中输入:

init:stop().

返回:

ok

Connection closed by foreign host.

这是因为init:stop()关闭了运行着RPC服务器的整个Erlang节点。

 

最后,我们看看gen_server中的几个常用的回调函数是怎么定义的。打开gen_server源码(笔者的Windows系统上,这个文件位于C:\Program Files (x86)\erl5.8.5\lib\stdlib-1.17.5\src),文件的头部注释中,详细地介绍了各个接口所需要返回的参数格式,以及gen_server的执行流程。

%%% --------------------------------------------------- 

%%% 

%%% The idea behind THIS server is that the user module 

%%% provides (different) functions to handle different 

%%% kind of inputs.  

%%% If the Parent process terminates the Module:terminate/2 

%%% function is called. 

%%% 

%%% The user module should export: 

%%% 

%%%   init(Args)   

%%%     ==> {ok, State} 

%%%         {ok, State, Timeout} 

%%%         ignore 

%%%         {stop, Reason} 

%%% 

%%%   handle_call(Msg, {From, Tag}, State) 

%%% 

%%%    ==> {reply, Reply, State} 

%%%        {reply, Reply, State, Timeout} 

%%%        {noreply, State} 

%%%        {noreply, State, Timeout} 

%%%        {stop, Reason, Reply, State}   

%%%              Reason = normal | shutdown | Term terminate(State) is called 

%%% 

%%%   handle_cast(Msg, State) 

%%% 

%%%    ==> {noreply, State} 

%%%        {noreply, State, Timeout} 

%%%        {stop, Reason, State}  

%%%              Reason = normal | shutdown | Term terminate(State) is called 

%%% 

%%%   handle_info(Info, State) Info is e.g. {‘EXIT‘, P, R}, {nodedown, N}, ... 

%%% 

%%%    ==> {noreply, State} 

%%%        {noreply, State, Timeout} 

%%%        {stop, Reason, State}  

%%%              Reason = normal | shutdown | Term, terminate(State) is called 

%%% 

%%%   terminate(Reason, State) Let the user module clean up 

%%%        always called when server terminates 

%%% 

%%%    ==> ok 

%%% 

%%% 

%%% The work flow (of the server) can be described as follows: 

%%% 

%%%   User module                          Generic 

%%%   -----------                          ------- 

%%%     start            ----->             start 

%%%     init             <-----              . 

%%% 

%%%                                         loop 

%%%     handle_call      <-----              . 

%%%                      ----->             reply 

%%% 

%%%     handle_cast      <-----              . 

%%% 

%%%     handle_info      <-----              . 

%%% 

%%%     terminate        <-----              . 

%%% 

%%%                      ----->             reply 

%%% 

%%% 

%%% --------------------------------------------------- 

参考资料:《Erlang OTP并发编程实战》

Erlang OTP编程初体验——gen_server和行为模式