首页 > 代码库 > Erlang点滴--当广播卡住时

Erlang点滴--当广播卡住时

很多时候游戏服务器总避免不了要向玩家广播一些消息,我所遇到的问题是假如这个广播操作由一个进程来对待广播玩家逐个进行的话,很容易让该进程卡住(尤其是在网络状况不好或者玩家数量太多的情况下)。

查了查大部分情况是卡在了fun prim_inet:send/3上,这其实是由于之前的广播直接调用了fun gen_tcp:send/2,而这其实是一个同步调用,一层一层剥代码就可以知道是怎么回事儿了。

首先这是fun gen_tcp:send/2的源码:

1 send(S, Packet) when is_port(S) ->2     case inet_db:lookup_socket(S) of3     {ok, Mod} ->4         Mod:send(S, Packet);5     Error ->6         Error7     end.

这里面会调用fun inet_tcp:send/2,所以再看看它的源码:

1 send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).2 send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

最终事实上调用了fun prim_inet:send/3,这也就是最终卡住的那个函数,看看它的源码:

 1 send(S, Data, OptList) when is_port(S), is_list(OptList) -> 2     ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]), 3     try erlang:port_command(S, Data, OptList) of 4     false -> % Port busy and nosuspend option passed 5         ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []), 6         {error,busy}; 7     true -> 8         receive 9         {inet_reply,S,Status} ->10             ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),11             Status12         end13     catch14     error:_Error ->15         ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),16          {error,einval}17     end.

这里可以看出它在调用fun erlang:port_command/3之后会调用一个receive来接收inet_reply消息。当然这里可以看出,假如这里调用fun erlang:port_command/3时传入了nosuspend参数,当端口忙时就不会再调用receive而卡住了。似乎这样子就可以解决问题了,但最后发现还是有可能卡住,尽管几率小了许多。再卡住那只可能是fun erlang:port_command/3了,这是它的源码:

1 port_command(Port, Data, Flags) ->2     case case erts_internal:port_command(Port, Data, Flags) of3          Ref when erlang:is_reference(Ref) -> receive {Ref, Res} -> Res end;4          Res -> Res5      end of6     Bool when Bool == true; Bool == false -> Bool;7     Error -> erlang:error(Error, [Port, Data, Flags])8     end.

不难看出原来这里也有一个同步调用,亦即当调用了fun erts_internal:port_command/3之后返回的结果是一个ref时就是调用receive。那好吧,那么fun erts_internal:port_command/3还会不会卡住呢?这玩意儿就只有C语言实现了:

 

 1 BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3) 2 { 3     BIF_RETTYPE res; 4     Port *prt; 5     int flags = 0; 6     Eterm ref; 7  8     if (is_not_nil(BIF_ARG_3)) { 9     Eterm l = BIF_ARG_3;10     while (is_list(l)) {11         Eterm* cons = list_val(l);12         Eterm car = CAR(cons);13         if (car == am_force)14         flags |= ERTS_PORT_SIG_FLG_FORCE;15         else if (car == am_nosuspend)16         flags |= ERTS_PORT_SIG_FLG_NOSUSPEND;17         else18         BIF_RET(am_badarg);19         l = CDR(cons);20     }21     if (!is_nil(l))22         BIF_RET(am_badarg);23     }24 25     prt = sig_lookup_port(BIF_P, BIF_ARG_1);26     if (!prt)27     BIF_RET(am_badarg);28 29     if (flags & ERTS_PORT_SIG_FLG_FORCE) {30     if (!(prt->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY))31         BIF_RET(am_notsup);32     }33 34 #ifdef DEBUG35     ref = NIL;36 #endif37 38     switch (erts_port_output(BIF_P, flags, prt, prt->common.id, BIF_ARG_2, &ref)) {39     case ERTS_PORT_OP_CALLER_EXIT:40     case ERTS_PORT_OP_BADARG:41     case ERTS_PORT_OP_DROPPED:42      ERTS_BIF_PREP_RET(res, am_badarg);43     break;44     case ERTS_PORT_OP_BUSY:45     ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));46     if (flags & ERTS_PORT_SIG_FLG_NOSUSPEND)47         ERTS_BIF_PREP_RET(res, am_false);48     else {49         erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, prt);50         ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_erts_internal_port_command_3],51                  BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);52     }53     break;54     case ERTS_PORT_OP_BUSY_SCHEDULED:55     ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));56     /* Fall through... */57     case ERTS_PORT_OP_SCHEDULED:58     ASSERT(is_internal_ref(ref));59     ERTS_BIF_PREP_RET(res, ref);60     break;61     case ERTS_PORT_OP_DONE:62     ERTS_BIF_PREP_RET(res, am_true);63     break;64     default:65     ERTS_INTERNAL_ERROR("Unexpected erts_port_output() result");66     break;67     }68 69     if (ERTS_PROC_IS_EXITING(BIF_P)) {70     KILL_CATCHES(BIF_P);    /* Must exit */71     ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);72     }73 74     return res;75 }

从这段代码可以看出,如果调用fun erts_internal:port_command/3时传入了nosuspend参数的话,这个函数是不会再被卡住的了。

回过头来再说游戏里的广播消息,这东西很多时候并不需要太多的可靠性保证,万一没发给玩家那就没发嘛,这总比把我的进程卡死要好啊。因此在广播消息的时候可以考虑将fun gen_tcp:send/2改成最底层的fun erts_internal:port_command/3并且传入nosuspend参数,这样就可以保证广播的时候畅通无阻了。不过值得注意的是fun erts_internal:port_command/3的调用会造成进程可能会收到{inet_reply,S,Status}{Ref, Res}这两类消息,需要对其作适当的处理。

Erlang点滴--当广播卡住时