首页 > 代码库 > 多线程安全的滑动窗口设计实现
多线程安全的滑动窗口设计实现
滑动窗口是日志模块重要的数据结构,用于日志发送接收以及日志索引查询,和组内同学讨论了的多线程安全的滑动窗口设计,有三种实现方案,写此文档记录下。
1.接口描述
滑动窗口内部使用数组,每个数组项的是一个结构体:
Structentry
{
Struct ValueNode *head;
Struct ValueNode *tail;
Int64_t cnt;
Int64_tstat;
}
由于在对同一项多次写入不同值的情况下,写入的多个值会以链表组织,head指向链表头,tail指向链表尾,cnt表明读取当前entry的引用计数,包括链表中所有节点。
StructValueNode的定义如下:
StructValueNode
{
Void *value
Struct ValueNode *next;
}
滑动窗口需要提供以下接口:
1. Init(int64-_t size)
初始化滑动窗口,size 用于指明滑动窗口的大小。
2. set(int64-_t id, const void*val)
set接口用于向滑动窗口中写入数据,id用于指明所写入数据的序号,val指向写入 的数据指针。对同一个id插入不同的值,会发生覆盖。
3. get(int64-_t id, void* &val)
get接口用于从滑动窗口中读出数据,id用于指明所读数据的序号,val指向所读到 的数据的指针。
4. revert (int64-_t id)
读取某一项结束时候,需要调用revert接口。
5. move_foward ()
move_foward用于将滑动窗口向前移动,对于移除滑动窗口的项,需要调用其revert 接口,将entry重置,方便后续复用此接口。
2方案一:读写锁保护start_id
方案一是并发度较低但思路比较简单的实现方案,此方案中,滑动窗口需要维护的成员变量:
1. size:此变量用于指明滑动窗口的大小;
2. start_id:此变量用于指明滑动窗口中最小的id;
3. end_id:此变量用于指明滑动窗口中最大的id;
4. rw_lock:用于保护start_id;
接口实现描述:
1. Init(int64_t size):
将size记录到成员变量中,并申请数组内存(大小为size),将start_id end_id设置为0;
2. set(int64_t id, const void*val)
1) 对rw_lock 加读锁;
2) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;
3) 使用id对size取模,找到对应的entry,读取此entry的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);
4) 对rw_lock解锁。
3. get(int64_t id, void* &val)
1) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;
2) 将当前start_id记录到临时变量tmp_start_id中;
3) 根据id取模,找到对应的entry,如果cnt == 0,跳转步骤7;
4) 将cnt引用计数递增,然后将tail指针所值的vallue 赋值给val;
5) 读取当前start_id和tmp_start_id比较,如果不相等,则跳转步骤1;
6) 返回;
4. revert (int64-_t id)
1) 根据id找到都应的entry,如果cnt ==0,报错;
2) 递减cnt,如果递减后大于0,则退出;
3) 如果递减后的cnt ==0,则遍历head指向的链表,调用每个value的revert函数,并将每个ValueNode内存释放;
5. Move_forwad()
1) 调用get接口,读取start_id位置的状态,根据注册的函数,判断是否可以将其移动出滑动窗口,如果不可以则返回;
2) 将rw_lock加写锁,从start_id开始,扫描滑动窗口,对于每个entry采用以下操作:
a) 根据注册的函数判断是否可以移出滑动窗口,如果不可以,跳转到步骤3;
b) 如果可以,则递减其引用计数,如果递减后不为0,则需要阻塞等待;
c) 将start_id递增1;
3) 对rw_lock解锁,函数返回。
3.方案二:无锁(一)
在方案一中,读写锁在一定程度上影响了并发度,方案二将介绍一种可以不用读写锁的线程安全实现。需要补充的是,方案二需要在Entry中增加stat字段,stat有三个可选值:
1. NULL
表明此entry无人使用,可以写入数据;
2. USE
表明此entry正在被使用,可以读;
3. LOCKED
表明此entry正在处于NULL和USE中间状态,不可读也不可写入;
此外,滑动窗口还需要维护成员变量Last_start_id,其代表在一次move_forward()过程中,上一次的start_id,last_start_id 和start之间的entry,是需要调用reset()清除掉的。
在这些基础上,接口描述如下:
1. Get():
与方案一一致
2. Revert():
与方案一一致。
3. set():
1) 判断是否满足start_id <= id < last_start_id+ size,如果不满足则跳转到步骤5;
2) 使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;
3) 将entry状态修改为LOCKED,再次判断是否满足start_id <= id <last_start_id+ size;如果不满足则将entry状态修改回原来状态,并跳转步骤5;
4) 读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;
5) 返回
4. move_foward()
1) 先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;
2) 将start_id记录到临时变量tmp_start,然后将start修改为target_start,这需要在一个原子操作中完成
3) 对于tmp_start到target_start中间的每一个entry,执行以下操作:
a) 判断其stat是否为LOCKED,如果是则阻塞等待;
b) 将stat修改为LOCKED,;
c) 递减其引用计数,如果递减后不为0,则需要阻塞等待,如果为0,则释放其内存,并设置其状态为NULL;
4) 比较last_start_id 和tmp_start的大小关系,如果相等,则将last_start 修改为target_start ,否则阻塞等待。
在这个方案中,其实是通过给每个entry添加状态值,对每个entry的修改做并发控制,相对于方案一,减小了锁粒度。
在move_foward()接口实现的第四步中,比较last_start和tmp_start的大小关系,事实上是为了保证,当多个线程同时调用move_foward()接口,同时修改last_start_id时,能够做到串行化,即保证last_start_id顺序递增修改。
4.方案二:无锁(二)
下面介绍第二种无锁实现,在此方案中,无需维护last_log_id,但还需要维护entry状态。
在这些基础上,接口描述如下:
1. Get():
与方案一基本一致,但需要判断所读的ENTRY状态,如果是LOCKED,则需要返回步骤一重新判断。
2. Revert():
与方案一一致。
3. set():
1) 判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤5;
2) 使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;
3) 将entry状态修改为LOCKED,再次判断是否满足start_id <= id<start_id + size:如果不满足则将entry状态修改回原来状态,并跳转步骤5;
4) 读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤5;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;
5) 返回。
4. move_foward()
1) 先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;
2) 将start_id记录到临时变量,tmp_start中,对于start到target_start中间的每一个entry,执行以下操作:
a) 判断其stat是否为LOCKED,如果是则阻塞等待;
b) 将stat修改为LOCKED,重新读取start,判断start_id是否等于tmp_start,如果不等,则跳转到步骤2开始;
c) 递减其引用计数,如果递减后不为0,则需要阻塞等待;如果为0,则释放其内存,并设置其状态为NULL;
d) 将start_id递增1;
4.特殊需求:
在我们的设计中,新当选的leader需要写一条sync barrier日志,之后才能处理滑动窗口中的未决日志。如果此时滑动窗口中普通的未决日志已经写满,则无法再写入sync barrier日志,导致恢复流程失败。
因此,滑动窗口需要提供一种特殊接口:set_common_entry()和set_special_entry(),同时初始化时候需要传入common_size和special_size,通常,special_size>common_size。两个不同的接口使用不同的size,保证sync_barrier日志可以写入滑动窗口。
多线程安全的滑动窗口设计实现