首页 > 代码库 > Riak VClock
Riak VClock
Riak VClock
关于向量时钟的概念,在这里就不多讲了,大家可以参照 Dynamo 的论文了解一下。向量时钟在分布式系统中主要用于解决一致性问题,可以和 CRDTs 一起看。
下面的源代码参照的是 Riak 中的实现,只是把它翻译成了 Elixir 的写法,逻辑基本不变。
向量时钟需要处理的情况主要有两种:网络分区和并发更新。
对向量时钟做剪枝(见下文 prune)只会丢掉一些向量时钟的信息,即数据更新过程的信息,而不会丢掉实实在在的数据。只有一种情况会有问题:某个客户端持有一个很久之前的向量时钟,然后基于这个向量时钟提交了数据。此时服务端可能已经把这段很久之前的向量时钟信息剪枝掉了,因此无法为客户端这次提交的数据找到祖先,于是会把它视为冲突并创建一个 sibling。所以剪枝策略是一种权衡(tradeoff):一方面是向量时钟空间的无限增长,另一方面是偶尔会出现的 "false merge";但可以肯定的是,它不会悄无声息地丢数据。综上,为了防止向量时钟空间的无限增长,剪枝还是比用 server 标识向量时钟的效果更好。
- 结构:
每个条目是一个包含 3 个元素的元组 {node, {opCount, TS}},分别为节点(协调器)、操作计数和操作时间戳。
- 主要的方法:
merge(合并):
合并的规则是 opCount 优先于 TS:当节点相同时,谁的 opCount 大谁赢;如果 opCount 相同,谁的时间戳大谁赢。
@doc """
Merges every vclock in the given list into their least possible common descendant.

An empty list yields `[]`; a single clock is handed back untouched; otherwise
the clocks are folded pairwise and the result is sorted by node.
"""
def merge([]), do: []
def merge([singevclock]), do: singevclock

# Seed the fold with the node-sorted first clock, e.g. [{:a, {1, 1234}}];
# `rest` is a list of clocks, e.g. [[{:a, {1, 233}}, {:b, {3, 124}}]].
def merge([first | rest]) do
  merge(rest, :lists.keysort(1, first))
end

@spec merge(list, list) :: list
# Fold step: merge each remaining clock (after sorting it by node) into the
# accumulated clock.
def merge([], nclock), do: nclock

def merge([aclock | vclocks], nclock) do
  sorted = :lists.keysort(1, aclock)
  merge(vclocks, merge(sorted, nclock, []))
end

# Two-way merge of node-sorted clocks; `acc` holds the result in reverse.
def merge([], [], acc), do: :lists.reverse(acc)
def merge([], left, acc), do: :lists.reverse(acc, left)
def merge(left, [], acc), do: :lists.reverse(acc, left)

# Left node sorts first, so it has no counterpart on the right: keep it.
def merge([{node1, {_, _}} = nct1 | vclock], [{node2, {_, _}} | _] = n, acc)
    when node1 < node2 do
  merge(vclock, n, [nct1 | acc])
end

# Right node sorts first, so it has no counterpart on the left: keep it.
def merge([{node1, {_, _}} | _] = v, [{node2, {_, _}} = nct2 | nclock], acc)
    when node1 > node2 do
  merge(v, nclock, [nct2 | acc])
end

# Same node on both sides: the larger counter wins; on a tie the counter is
# kept and the later timestamp is chosen.
def merge([{node1, {ctr1, ts1} = ct1} | vclock], [{_node2, {ctr2, ts2} = ct2} | nclock], acc) do
  winner =
    cond do
      ctr1 > ctr2 -> ct1
      ctr1 < ctr2 -> ct2
      true -> {ctr1, max(ts1, ts2)}
    end

  merge(vclock, nclock, [{node1, winner} | acc])
end
prune(裁剪):
裁剪的规则主要考虑空间和时间两方面:条目数量(small_vclock / big_vclock)和条目的年龄(young_vclock / old_vclock)。
![](../pic/riak_4.png)
最终执行裁剪的函数是 prune_vclock1(v, now, bprops, headtime)。
@doc """
Shrinks a vclock if it is too large or its entries are too old.

Entries are ordered oldest-first and trimmed from the front while the clock
exceeds the `big_vclock`/`old_vclock` limits, never dropping below
`small_vclock` entries or pruning while the oldest entry is younger than
`young_vclock`.
"""
@spec prune(v :: list, now :: integer, bucketprops :: any) :: list
def prune(v, now, bucketprops) do
  # The ordering must be deterministic to avoid spurious merge conflicts later,
  # so timestamp ties are broken by node id.
  oldest_first = fn {n1, {_, t1}}, {n2, {_, t2}} -> {t1, n1} < {t2, n2} end
  sorted = :lists.sort(oldest_first, v)
  prune_vclock1(sorted, now, bucketprops)
end

# Stops when the clock is already small enough or its oldest (head) entry is
# still young; otherwise hands off to the big/old check.
def prune_vclock1(v, now, bprops) do
  if get_property(:small_vclock, bprops) >= :erlang.length(v) do
    v
  else
    {_, {_, headtime}} = hd(v)

    if now - headtime < get_property(:young_vclock, bprops) do
      v
    else
      prune_vclock1(v, now, bprops, headtime)
    end
  end
end

# Precondition: v is longer than small_vclock and its head is not young.
# Drops the head entry while the clock is too long or the head is too old.
def prune_vclock1(v, now, bprops, headtime) do
  too_long? = :erlang.length(v) > get_property(:big_vclock, bprops)

  if too_long? or now - headtime > get_property(:old_vclock, bprops) do
    prune_vclock1(tl(v), now, bprops)
  else
    v
  end
end

# Looks up `key` in a list of 2-tuples; a missing key yields :undefined.
def get_property(key, pairlist) do
  case :lists.keyfind(key, 1, pairlist) do
    false -> :undefined
    {_key, value} -> value
  end
end
- source
defmodule VClock do
  @moduledoc """
  Vector clocks, translated to Elixir from Riak's `vclock` module.

  A vclock is a list of `{node, {counter, timestamp}}` entries: one entry per
  node that has ever incremented the clock, holding that node's counter and
  the timestamp of its latest increment.
  """
  @vsn 0.1

  @type va :: list()

  # Gregorian day number of 1970-01-01: 1970 * 365 days plus 478 leap days
  # (719_528), i.e. the same base as :calendar.datetime_to_gregorian_seconds/1.
  @days_from_gregorian_base_to_epoch 1970 * 365 + 478
  @seconds_from_gregorian_base_to_epoch @days_from_gregorian_base_to_epoch * 24 * 60 * 60

  @doc "Returns a new, empty vclock."
  @spec fresh() :: []
  def fresh do
    []
  end

  @doc """
  Returns true if `va` is a direct descendant of `vb`, else false.

  Every node present in `vb` must also be present in `va` with a counter at
  least as large. Remember, a vclock is its own descendant!
  """
  @spec descends(list, list) :: boolean
  def descends(_, []) do
    true
  end

  def descends(va, vb) do
    [{nodeb, {ctrb, _}} | restb] = vb

    case :lists.keyfind(nodeb, 1, va) do
      false -> false
      {_, {ctra, _tsa}} -> ctra >= ctrb && descends(va, restb)
    end
  end

  @doc """
  Combine all VClocks in the input list into their least possible common descendant.

  A single clock is returned unchanged; otherwise the result is sorted by node.
  For a node present in several clocks the larger counter wins; on equal
  counters the later timestamp is kept.
  """
  @spec merge([list]) :: list
  def merge([]), do: []
  def merge([singevclock]), do: singevclock

  # `first` is a clock, e.g. [{:a, {1, 1234}}];
  # `rest` is a list of clocks, e.g. [[{:a, {1, 233}}, {:b, {3, 124}}]].
  def merge([first | rest]) do
    merge(rest, :lists.keysort(1, first))
  end

  @spec merge(list, list) :: list
  def merge([], nclock), do: nclock

  def merge([aclock | vclocks], nclock) do
    merge(vclocks, merge(:lists.keysort(1, aclock), nclock, []))
  end

  # Two-way merge of two node-sorted clocks; `accclock` holds the result reversed.
  @spec merge(list, list, list) :: list
  def merge([], [], accclock), do: :lists.reverse(accclock)
  def merge([], left, accclock), do: :lists.reverse(accclock, left)
  def merge(left, [], accclock), do: :lists.reverse(accclock, left)

  def merge(
        v = [{node1, {ctr1, ts1} = ct1} = nct1 | vclock],
        n = [{node2, {ctr2, ts2} = ct2} = nct2 | nclock],
        accclock
      ) do
    cond do
      node1 < node2 ->
        merge(vclock, n, [nct1 | accclock])

      node1 > node2 ->
        merge(v, nclock, [nct2 | accclock])

      true ->
        ct =
          cond do
            ctr1 > ctr2 -> ct1
            ctr1 < ctr2 -> ct2
            true -> {ctr1, :erlang.max(ts1, ts2)}
          end

        merge(vclock, nclock, [{node1, ct} | accclock])
    end
  end

  @doc """
  Get the counter value in the vclock for `node`, or `:undefined` if absent.
  """
  @spec get_counter(node :: atom, vclock :: list) :: integer | :undefined
  def get_counter(node, vclock) do
    # keyfind returns the entry itself (or false); keytake would return a
    # 3-tuple {:value, entry, rest} that never matches this pattern.
    case :lists.keyfind(node, 1, vclock) do
      {_, {c, _}} -> c
      false -> :undefined
    end
  end

  @doc """
  Get the timestamp value in the vclock for `node`, or `:undefined` if absent.
  """
  @spec get_timestamp(node :: atom, vclock :: list) :: integer | :undefined
  def get_timestamp(node, vclock) do
    case :lists.keyfind(node, 1, vclock) do
      {_, {_, ts}} -> ts
      false -> :undefined
    end
  end

  @doc "Increment the vclock at `node`, stamping the entry with `timestamp/0`."
  @spec increment(atom, list) :: list
  def increment(node, vclock) do
    increment(node, timestamp(), vclock)
  end

  @doc """
  Increment the vclock at `node` using timestamp `incts`.

  The updated (or new, with counter 1) entry is placed at the head of the list.
  """
  @spec increment(atom, integer, list) :: list
  def increment(node, incts, vclock) do
    {counter, rest} =
      case :lists.keytake(node, 1, vclock) do
        false -> {1, vclock}
        {:value, {_n, {c, _t}}, modv} -> {c + 1, modv}
      end

    [{node, {counter, incts}} | rest]
  end

  @doc "Return the list of all nodes that have ever incremented the vclock."
  @spec all_nodes(vclock :: list) :: list
  def all_nodes(vclock) do
    Enum.map(vclock, fn {x, {_, _}} -> x end)
  end

  @doc """
  Current UTC time in Gregorian seconds (seconds since 0000-01-01), the scale
  used by `:calendar.datetime_to_gregorian_seconds/1`.
  """
  @spec timestamp() :: integer
  def timestamp do
    {megaseconds, seconds, _} = :os.timestamp()
    @seconds_from_gregorian_base_to_epoch + megaseconds * 1_000_000 + seconds
  end

  @doc "Compares two vclocks for equality, ignoring entry order."
  @spec equal(va :: list, vb :: list) :: boolean
  def equal(va, vb) do
    Enum.sort(va) === Enum.sort(vb)
  end

  @doc """
  Possibly shrink the size of a vclock, depending on current age and size.

  `bucketprops` is a list of `{key, value}` pairs read for `:small_vclock`,
  `:young_vclock`, `:big_vclock` and `:old_vclock`.
  """
  @spec prune(v :: list, now :: integer, bucketprops :: list) :: list
  def prune(v, now, bucketprops) do
    # This sort needs to be deterministic to avoid spurious merge conflicts
    # later; we achieve this by using the node id as the secondary key.
    sortv = :lists.sort(fn {n1, {_, t1}}, {n2, {_, t2}} -> {t1, n1} < {t2, n2} end, v)
    prune_vclock1(sortv, now, bucketprops)
  end

  # Stop if the clock is already small enough or its oldest entry is young.
  def prune_vclock1(v, now, bprops) do
    case get_property(:small_vclock, bprops) >= :erlang.length(v) do
      true ->
        v

      false ->
        {_, {_, headtime}} = hd(v)

        case now - headtime < get_property(:young_vclock, bprops) do
          true -> v
          false -> prune_vclock1(v, now, bprops, headtime)
        end
    end
  end

  # Precondition: v is longer than small_vclock and older than young_vclock.
  # Drop the oldest entry while the clock is too big or that entry is too old.
  def prune_vclock1(v, now, bprops, headtime) do
    case :erlang.length(v) > get_property(:big_vclock, bprops) or
           now - headtime > get_property(:old_vclock, bprops) do
      true -> prune_vclock1(tl(v), now, bprops)
      false -> v
    end
  end

  # Look up `key` in a list of pairs; :undefined when the key is missing.
  def get_property(key, pairlist) do
    case :lists.keyfind(key, 1, pairlist) do
      {_key, value} -> value
      false -> :undefined
    end
  end
end
Riak VClock
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。