首页 > 代码库 > 多线程安全的滑动窗口设计实现

多线程安全的滑动窗口设计实现

滑动窗口是日志模块重要的数据结构,用于日志发送接收以及日志索引查询,和组内同学讨论了的多线程安全的滑动窗口设计,有三种实现方案,写此文档记录下。

1.接口描述

    滑动窗口内部使用数组,每个数组项的是一个结构体:

    Structentry

    {

        Struct ValueNode *head;

    Struct ValueNode *tail;

        Int64_t cnt;

       Int64_tstat;

    }

    由于在对同一项多次写入不同值的情况下,写入的多个值会以链表组织,head指向链表头,tail指向链表尾,cnt表明读取当前entry的引用计数,包括链表中所有节点。

    StructValueNode的定义如下:

    StructValueNode

    {

        Void *value

        Struct ValueNode *next;

    }

 

    滑动窗口需要提供以下接口:

1.  Init(int64-_t size)

    初始化滑动窗口,size 用于指明滑动窗口的大小。

 

2.  set(int64-_t id, const void*val)

    set接口用于向滑动窗口中写入数据,id用于指明所写入数据的序号,val指向写入    的数据指针。对同一个id插入不同的值,会发生覆盖。

 

3.  get(int64-_t id,  void* &val)

    get接口用于从滑动窗口中读出数据,id用于指明所读数据的序号,val指向所读到    的数据的指针。

 

4.  revert (int64-_t id)

    读取某一项结束时候,需要调用revert接口。

 

5.  move_foward ()

    move_foward用于将滑动窗口向前移动,对于移除滑动窗口的项,需要调用其revert   接口,将entry重置,方便后续复用此接口。

2方案一:读写锁保护start_id

    方案一是并发度较低但思路比较简单的实现方案,此方案中,滑动窗口需要维护的成员变量:

1.  size:此变量用于指明滑动窗口的大小;

2.  start_id:此变量用于指明滑动窗口中最小的id;

3.  end_id:此变量用于指明滑动窗口中最大的id;

4.  rw_lock:用于保护start_id;

 

    接口实现描述:

1.  Init(int64_t size):

将size记录到成员变量中,并申请数组内存(大小为size),将start_id end_id设置为0;

 

2.  set(int64_t id, const void*val)

1)  对rw_lock 加读锁;

2)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

3)  使用id对size取模,找到对应的entry,读取此entry的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);

4)  对rw_lock解锁。

 

3.  get(int64_t id,  void* &val)

1)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

2)  将当前start_id记录到临时变量tmp_start_id中;

3)  根据id取模,找到对应的entry,如果cnt == 0,跳转步骤7;

4)  将cnt引用计数递增,然后将tail指针所值的vallue 赋值给val;

5)  读取当前start_id和tmp_start_id比较,如果不相等,则跳转步骤1;

6)  返回;

 

4.  revert (int64-_t id)

1)  根据id找到都应的entry,如果cnt ==0,报错;

2)  递减cnt,如果递减后大于0,则退出;

3)  如果递减后的cnt ==0,则遍历head指向的链表,调用每个value的revert函数,并将每个ValueNode内存释放;

 

5.  Move_forwad()

1)  调用get接口,读取start_id位置的状态,根据注册的函数,判断是否可以将其移动出滑动窗口,如果不可以则返回;

2)  将rw_lock加写锁,从start_id开始,扫描滑动窗口,对于每个entry采用以下操作:

a)  根据注册的函数判断是否可以移出滑动窗口,如果不可以,跳转到步骤3;

b)  如果可以,则递减其引用计数,如果递减后不为0,则需要阻塞等待;

c)  将start_id递增1;

3)  对rw_lock解锁,函数返回。

 

3.方案二:无锁(一)

    在方案一中,读写锁在一定程度上影响了并发度,方案二将介绍一种可以不用读写锁的线程安全实现。需要补充的是,方案二需要在Entry中增加stat字段,stat有三个可选值:

1.  NULL

表明此entry无人使用,可以写入数据;

2.  USE

表明此entry正在被使用,可以读;

3.  LOCKED

表明此entry正在处于NULL和USE中间状态,不可读也不可写入;

 

    此外,滑动窗口还需要维护成员变量Last_start_id,其代表在一次move_forward()过程中,上一次的start_id,last_start_id 和start之间的entry,是需要调用reset()清除掉的。

    在这些基础上,接口描述如下:

1.  Get():

与方案一一致

2.  Revert():

与方案一一致。

3.  set():

1)  判断是否满足start_id <= id < last_start_id+ size,如果不满足则跳转到步骤5;

2)  使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3)  将entry状态修改为LOCKED,再次判断是否满足start_id <= id <last_start_id+ size;如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4)  读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5)  返回

 

4.  move_foward()

1)  先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2)  将start_id记录到临时变量tmp_start,然后将start修改为target_start,这需要在一个原子操作中完成

3)  对于tmp_start到target_start中间的每一个entry,执行以下操作:

a)  判断其stat是否为LOCKED,如果是则阻塞等待;

b)  将stat修改为LOCKED,;

c)  递减其引用计数,如果递减后不为0,则需要阻塞等待,如果为0,则释放其内存,并设置其状态为NULL;

4)  比较last_start_id 和tmp_start的大小关系,如果相等,则将last_start 修改为target_start ,否则阻塞等待。

    在这个方案中,其实是通过给每个entry添加状态值,对每个entry的修改做并发控制,相对于方案一,减小了锁粒度。

    在move_foward()接口实现的第四步中,比较last_start和tmp_start的大小关系,事实上是为了保证,当多个线程同时调用move_foward()接口,同时修改last_start_id时,能够做到串行化,即保证last_start_id顺序递增修改。

4.方案二:无锁(二)

    下面介绍第二种无锁实现,在此方案中,无需维护last_log_id,但还需要维护entry状态。

    在这些基础上,接口描述如下:

1.  Get():

与方案一基本一致,但需要判断所读的ENTRY状态,如果是LOCKED,则需要返回步骤一重新判断。

2.  Revert():

与方案一一致。

3.  set():

1)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤5;

2)  使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3)  将entry状态修改为LOCKED,再次判断是否满足start_id <= id<start_id + size:如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4)  读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤5;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5)  返回。

 

4.  move_foward()

1)  先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2)  将start_id记录到临时变量,tmp_start中,对于start到target_start中间的每一个entry,执行以下操作:

a)  判断其stat是否为LOCKED,如果是则阻塞等待;

b)  将stat修改为LOCKED,重新读取start,判断start_id是否等于tmp_start,如果不等,则跳转到步骤2开始;

c)  递减其引用计数,如果递减后不为0,则需要阻塞等待;如果为0,则释放其内存,并设置其状态为NULL;

d)  将start_id递增1;

 

4.特殊需求:

    在我们的设计中,新当选的leader需要写一条sync barrier日志,之后才能处理滑动窗口中的未决日志。如果此时滑动窗口中普通的未决日志已经写满,则无法再写入sync barrier日志,导致恢复流程失败。

    因此,滑动窗口需要提供一种特殊接口:set_common_entry()和set_special_entry(),同时初始化时候需要传入common_size和special_size,通常,special_size>common_size。两个不同的接口使用不同的size,保证sync_barrier日志可以写入滑动窗口。

 

多线程安全的滑动窗口设计实现