首页 > 代码库 > Riak Core Guide

Riak Core Guide

Learn Riak Core Step By Step

riak core 是 riak的主要组成部分,主要负责分布式的部分,虽然官方有自己的存储后端,但是我们也可以使用其他的后端存储。


Partitioning & Distributing Work

riak core 在每个节点上都是使用master/worker配置,这样作为一个工作单元来执行,riak core的worker为vnode worker, 在每个节点上由riak_core_sup生成,vnode worker对应的模块为riak_core_vnode

如:

节点1:

Node: mfmn2@127.0.0.1, Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
 {current_function,{gen_server,loop,6}},
 {initial_call,{proc_lib,init_p,5}},
 {status,waiting},
 {message_queue_len,0},
 {messages,[]},
 {links,[<0.148.0>,<0.152.0>,<0.154.0>,<0.155.0>,<0.153.0>,<0.150.0>,
         <0.151.0>,<0.149.0>,<0.140.0>,<0.144.0>,<0.146.0>,<0.147.0>,
         <0.145.0>,<0.142.0>,<0.143.0>,<0.141.0>,<0.136.0>,<0.138.0>,
         <0.139.0>,<0.137.0>,<0.77.0>,<0.135.0>]},
 {dictionary,[{‘$ancestors,[riak_core_sup,<0.76.0>]},
              {‘$initial_call,{supervisor_pre_r14b04,init,1}}]},
 {trap_exit,true},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.75.0>},
 {total_heap_size,3571},
 {heap_size,2584},
 {stack_size,9},
 {reductions,4359},
 {garbage_collection,[{min_bin_vheap_size,46368},
                      {min_heap_size,233},
                      {fullsweep_after,10},
                      {minor_gcs,6}]},
 {suspending,[]}]

节点2:

Node: mfmn1@127.0.0.1, Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
 {current_function,{gen_server,loop,6}},
 {initial_call,{proc_lib,init_p,5}},
 {status,waiting},
 {message_queue_len,0},
 {messages,[]},
 {links,[<0.183.0>,<0.274.0>,<0.337.0>,<0.375.0>,<0.387.0>,<0.371.0>,
         <0.310.0>,<0.326.0>,<0.226.0>,<0.262.0>,<0.210.0>,<0.218.0>,
         <0.153.0>,<0.165.0>,<0.177.0>,<0.171.0>,<0.159.0>,<0.135.0>,
         <0.147.0>,<0.141.0>,<0.123.0>,<0.129.0>,<0.77.0>]},
 {dictionary,[{‘$ancestors,[riak_core_sup,<0.76.0>]},
              {‘$initial_call,{supervisor_pre_r14b04,init,1}}]},
 {trap_exit,true},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.75.0>},
 {total_heap_size,1974},
 {heap_size,987},
 {stack_size,9},
 {reductions,8777},
 {garbage_collection,[{min_bin_vheap_size,46368},
                      {min_heap_size,233},
                      {fullsweep_after,10},
                      {minor_gcs,2}]},
 {suspending,[]}]

节点3:

Node: mfmn3@127.0.0.1, Process: <0.80.0>
[{registered_name,riak_core_vnode_sup},
 {current_function,{gen_server,loop,6}},
 {initial_call,{proc_lib,init_p,5}},
 {status,waiting},
 {message_queue_len,0},
 {messages,[]},
 {links,[<0.152.0>,<0.167.0>,<0.179.0>,<0.185.0>,<0.182.0>,<0.173.0>,
         <0.176.0>,<0.170.0>,<0.161.0>,<0.164.0>,<0.158.0>,<0.155.0>,
         <0.137.0>,<0.143.0>,<0.149.0>,<0.146.0>,<0.140.0>,<0.128.0>,
         <0.134.0>,<0.131.0>,<0.125.0>,<0.77.0>]},
 {dictionary,[{‘$ancestors,[riak_core_sup,<0.76.0>]},
              {‘$initial_call,{supervisor_pre_r14b04,init,1}}]},
 {trap_exit,true},
 {error_handler,error_handler},
 {priority,normal},
 {group_leader,<0.75.0>},
 {total_heap_size,3194},
 {heap_size,2584},
 {stack_size,9},
 {reductions,4507},
 {garbage_collection,[{min_bin_vheap_size,46368},
                      {min_heap_size,233},
                      {fullsweep_after,10},
                      {minor_gcs,10}]},
 {suspending,[]}]

3个节点的vnode worker 加起来刚好22 + 23 + 22 - 3 = 64

(mfmn3@127.0.0.1)3> supervisor:count_children(riak_core_vnode_sup).
[{specs,1},{active,21},{supervisors,0},{workers,21}]
(mfmn3@127.0.0.1)4>

减去3的原因。

从上面那张图可以看出, riak_core_vnode_master负责与vnode的通信,这些vnode都是fsm,如:

获取当前节点的所有vnode:

(mfmn3@127.0.0.1)8> riak_core_vnode_master:all_nodes(mfmn_vnode).
[<0.173.0>,<0.179.0>,<0.185.0>,<0.143.0>,<0.155.0>,
 <0.164.0>,<0.182.0>,<0.170.0>,<0.161.0>,<0.140.0>,<0.146.0>,
 <0.152.0>,<0.158.0>,<0.176.0>,<0.167.0>,<0.149.0>,<0.137.0>,
 <0.125.0>,<0.128.0>,<0.131.0>,<0.134.0>]
(mfmn3@127.0.0.1)9>

这也再次证明了该节点的vnode个数为21.

向某个vnode发出Ping请求

这个例子是try-try-try的例子:


前面会携带一个Pid,而这个Pid是根据hash在ets表中索引出来的,这个Pid是vnode 的Pid,根据这个Pid可以索引到具体的vnode,最后Mod:handle_command是用户的回调函数。

如果master没有找到对应的vnode,那么他会新建一个vnode:

get_vnode(Idx, State=#state{vnode_mod=Mod}) ->
    case idx2vnode(Idx, State) of
        no_match ->
            {ok, Pid} = riak_core_vnode_sup:start_vnode(Mod, Idx),
            MonRef = erlang:monitor(process, Pid),
            add_vnode_rec(#idxrec{idx=Idx,pid=Pid,monref=MonRef}, State),
            Pid;
        X -> X
    end.

因为vnode下面是存储后端,所以只要定位到vnode就可以访问后端存储。


下面是try try try对应的代码:

start(_StartType, _StartArgs) ->
    case mfmn_sup:start_link() of
        {ok, Pid} ->
            ok = riak_core:register([{vnode_module, mfmn_vnode}]),
            ok = riak_core_ring_events:add_guarded_handler(mfmn_ring_event_handler, []),
            ok = riak_core_node_watcher_events:add_guarded_handler(mfmn_node_event_handler, []),
            ok = riak_core_node_watcher:service_up(mfmn, self()),
            {ok, Pid};
        {error, Reason} ->
            {error, Reason}
    end.

stop(_State) ->
    ok.

启动master, 注册vnode, 铁添加mfmn_ring_event_handler,mfmn_node_event_handler...

下面就讲解一下try try try 的例子,高手勿喷!!!



第二个例子 Riak Core, The vnode

这是一个Real Time Stastics,简称RTS--实时统计应用。

这个系统要解决的两个问题是解析记录和分发记录,这交给entry vnode处理;第二个时接收实时统计,交给stat vnode处理。

  • ###什么是vnode

    • vnode是一个虚拟节点,和物理节点不一样
    • 一个虚拟节点对应一个erlang process
    • 一个虚拟节点是一个behaviour - gen_fsm behaviour
    • 一个虚拟节点处理进来的请求
    • 一个虚拟节点可能会存储数据,这些数据可以被以后检索到
    • 很多虚拟节点会运行在同一个物理节点上
    • 每个虚拟机都有一个主虚拟节点,他主要用于和它的所有存活的节点保持联系

    如你所见,虚拟节点要处理很多东西,不过Basho已经帮我们处理掉,我们只要实现所提供的vnode behaviour即可。用户只要理解输入和输出,然后定义所需的回调函数即可。

  • 生命周期

    init/1和termiante/2回调函数是虚拟节点的生命边缘,既是起止和终止位置,当一个连接到vnode的进程崩溃,那么handle_exit/3将会被调用。

  • ###init([Index]) -> Result

?Index :: int() >= 0 
Result :: {ok, State} 
State :: term()

rts注册了3种vnode -- rts_vnoderts_entry_vnode、 rts_stat_vnode、 对应的master vnode 分别为:rts_vnode_masterrts_entry_vnode_masterrts_stat_vnode_master

每种vnode负责不同服务.

rts提供了http的接口方便用户录入数据,其中rts_wm_entry:process_post的数据结构如下

Client: "progski"
Entry: "0.0.0.0 - - [21/Mar/2011:18:18:19 +0000] \"GET /blog/2011/aol_meet_riak.html HTTP/1.1\" 200 5865 \"http://www.google.com/\" \"Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/534.16 (KHTML, like Gecko) Chrome/10.0.648.151 Safari/534.16\""

Client是?gunzip -c progski.access.log.gz | ./replay progski传递过来的.
然后,根据rts_entry来索引可用vnode,

PrefList = riak_core_apl:get_apl(DocIdx, 1, rts_entry),
    [IdxNode] = PrefList,
    rts_entry_vnode:entry(IdxNode, Client, Entry).

发送命令:

riak_core_vnode_master:command(IdxNode,
                                   {entry, Client, Entry},
                                   ?MASTER).

其中?MASTER为rts_entry_vnode_master.

如图:


最后交给stat vnode 进行统计。

Riak Core Guide