首页 > 代码库 > [Python] Spark平台下实现分布式AC自动机(一)
[Python] Spark平台下实现分布式AC自动机(一)
转载请注明出处:http://www.cnblogs.com/kirai/ 作者:Kirai
零.问题的提出
最近希望在分布式平台上实现一个AC自动机,但是如何在这样的分布式平台上表示这样的非线性数据结构就难住我了。因为一直在使用RDD提供的一些基本的操作,没有需要什么复杂的操作。所以突然想到了在分布式的平台上实现一个AC自动机一定很有趣。网上搜了下,发现没有人实现,因此决定尝试实现。或许就是一个玩具,不过也是能帮助自己更深理解分布式平台上进行编程和普通编程的区别吧。
这个问题对我来讲还是有一定的难度的,加上课业、复习考研、竞赛三重压力,对于这个问题的研究时间可能会比较长,不过一定会抽出时间来考虑的。
文章仅仅是在记录自己对问题的思考过程和代码,并不能保证每一步都是合理、有用、正确的。
一.实现可持久化字典树
AC自动机是个什么东西就不赘述了,我首先用python实现了一个比较朴素的版本,这里查询是返回所有字典中出现的单词的起始位置,每一个单词一个list,最终组成一个dict。哈哈,也许有人看出来了这是去年的一道ICPC网赛题,没错。但是我忘记是哪一道了,只记得当时没做出来,所以用python写一个就当是对自己的一个补偿吧:)
1 # -*- coding: utf-8 -*- 2 class Node: 3 """ 4 A Trie‘s basic data structure. 5 """ 6 def __init__(self): 7 self.next_letter = {} 8 self.is_word = False 9 self.letter = ‘‘ 10 self.depth = -1 11 self.pre = None 12 self.fail = None 13 14 15 class Trie: 16 def __init__(self): 17 self.root = Node() 18 19 20 def insert(self, word): 21 """ 22 insert a word into the trie. 23 :param word: the word 24 :type word: str 25 :return: None 26 """ 27 cur_node = self.root 28 pre = self.root 29 depth = 1 30 for letter in word: 31 if not cur_node.next_letter.has_key(letter): 32 cur_node.next_letter[letter] = Node() 33 cur_node = cur_node.next_letter[letter] 34 35 cur_node.pre = pre 36 cur_node.letter = letter 37 cur_node.depth = depth 38 39 depth += 1 40 pre = cur_node 41 42 cur_node.is_word = True 43 44 45 def in_trie(self, word): 46 """ 47 judge if the word is in the trie or not. 48 :param word: the word 49 :type word: str 50 :return: if the word is in the trie or not. 51 :rtype: bool 52 """ 53 cur_node = self.root 54 for letter in word: 55 if not cur_node.next_letter.has_key(letter): 56 return False 57 cur_node = cur_node.next_letter[letter] 58 return cur_node.is_word 59 60 61 class AcAutomation: 62 def __init__(self): 63 self._trie = Trie() 64 self._dict = set([]) 65 66 67 def in_trie(self, word): 68 return self._trie.in_trie(word) 69 70 71 def insert(self, word): 72 map(self._dict.add, word) 73 self._trie.insert(word) 74 75 76 def build_ac_automation(self): 77 """ 78 build the fail pointers to make the ac automation work. 79 :return: 80 """ 81 queue = [] 82 queue.append(self._trie.root) 83 cur_node, tmp_node = None, None 84 85 while len(queue) != 0: 86 cur_node = queue.pop(0) 87 for letter in cur_node.next_letter: 88 if cur_node == self._trie.root: 89 cur_node.next_letter[letter].fail = self._trie.root 90 else: 91 tmp_node = cur_node.fail 92 while tmp_node != None: 93 if tmp_node.next_letter.has_key(letter): 94 cur_node.next_letter[letter].fail = 95 tmp_node.next_letter[letter] 96 break 97 tmp_node = tmp_node.fail 98 if tmp_node == None: 99 cur_node.next_letter[letter].fail = self._trie.root 100 101 queue.append(cur_node.next_letter[letter]) 102 103 104 def get_word_position(self, sentence): 105 """ 106 find the word‘s positions in the sentence according to 107 the dictionary. 108 :param sentence: 109 :rtype: Dict[List[]] 110 :return: an dictionary include the word that appears in the sentence 111 and the start and end positions. 112 """ 113 cur_node = self._trie.root 114 tmp_node = None 115 result = {} 116 length = len(sentence) 117 for idx in range(0, length): 118 letter = sentence[idx] 119 if letter not in self._dict: 120 cur_node = self._trie.root 121 continue 122 123 if cur_node.next_letter.has_key(letter): 124 cur_node = cur_node.next_letter[letter] 125 else: 126 while cur_node != None and cur_node.next_letter.has_key(letter) == False: 127 cur_node = cur_node.fail 128 if cur_node == None: 129 cur_node = self._trie.root 130 131 if cur_node.next_letter.has_key(letter): 132 cur_node = cur_node.next_letter[letter] 133 134 tmp_node = cur_node 135 while tmp_node != self._trie.root: 136 if tmp_node.is_word == True: 137 word = ‘‘ 138 word_node = tmp_node 139 while word_node != self._trie.root: 140 word += word_node.letter 141 word_node = word_node.pre 142 word = word[::-1] 143 if not result.has_key(word): 144 result[word] = [] 145 result[word].append((idx - tmp_node.depth + 1, idx)) 146 tmp_node = tmp_node.fail 147 148 return result 149 150 151 s = AcAutomation() 152 s.insert(‘AA‘) 153 s.insert(‘BB‘) 154 s.insert(‘CC‘) 155 s.insert(‘ACM‘) 156 s.build_ac_automation() 157 q = s.get_word_position(‘ooxxCC%dAAAAAAAAaaAAaoenGGACM‘) 158 159 print q
返回的结果是这样的:
1 {‘CC‘: [(4, 5)], ‘AA‘: [(8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (18, 19)], ‘ACM‘: [(26, 28)]}
如何在分布式的平台上实现这样的运算?显然这种实现是有内存共享的,如何做到消除这些共享?
确实想不到,看来还要多看资料学习。但是可以肯定的一点是,AC自动机的结构一定不允许是这样的。
由于Spark使用RDD对分布式内存抽象,支持的操作均是熟悉的函数式操作。所以考虑是不是可以把Trie“扁平化”成这样的结构?
于是定义:
{ letter: [{next level..}, is_word depth] }
代表一个节点,letter代表当前节点的字符,后接一个list,list第一个元素表示此节点的后继,后面跟着的是一些属性。这些属性是可以扩充的。稍微转换一下思路,就可以把这个字典树写出来:
1 def build_trie(): 2 """ 3 build a empty trie 4 :return: 5 """ 6 return {} 7 8 9 def insert(root, word, depth=1): 10 """ 11 insert a word into the trie recursively. 12 :param root: 13 :param word: 14 :param depth: 15 :return: 16 """ 17 letter = word[0] 18 if len(word) == 1: 19 if(root.has_key(letter)): 20 root[letter][1] = True 21 return root 22 else: 23 return {letter: [{}, True, depth]} 24 if root.has_key(letter) == False: 25 root[letter] = [{}, False, depth] 26 root[letter][0] = insert(root[letter][0], word[1:], depth+1) 27 else: 28 root[letter][0] = dict(root[letter][0], **insert(root[letter][0], word[1:], depth+1)) 29 return root 30 31 32 def find(root, word): 33 """ 34 find a word if it‘s in the trie or not. 35 :param root: 36 :param word: 37 :return: 38 """ 39 letter = word[0] 40 if root.has_key(letter) == False: 41 return False 42 if len(word) == 1: 43 if root[letter][1] == True: 44 return True 45 else: 46 return False 47 return find(root[letter][0], word[1:])
我考虑过字典树如何从一个分布式平台上构建,可以这样:
通过平时对单词的搜集,可以在单机上生成一个又一个小的字典树,保存在分布式文件系统上,在使用的时候只需要把这一个个小字典树合并成一个就行了。
所以这棵字典树仅仅这样是不够的,因为我希望这字典树支持这个合并的函数式操作,于是加上了归并的操作,这样这棵字典树变成了一棵可持久字典树:
1 def merge(a_root, b_root): 2 """ 3 merge two tries. 4 :param a_root: 5 :param b_root: 6 :return: 7 """ 8 def _merge(a_root, b_root): 9 for letter in b_root.keys(): 10 if a_root.has_key(letter): 11 if b_root[letter][1] == True: 12 a_root[letter][1] = True 13 a_root[letter][0] = dict(a_root[letter][0], **_merge(a_root[letter][0], b_root[letter][0])) 14 else: 15 a_root[letter] = copy.deepcopy(b_root[letter]) 16 return a_root 17 a_root = _merge(a_root, b_root) 18 return a_root
合并的操作思路很简单,就是同时深入,假如有一棵树上没有另一棵树上的儿子,整棵子树都拷贝过来即可(这也是用python的dict方便的地方)。
测试了一下:
1 if __name__ == ‘__main__‘: 2 trie_a = build_trie() 3 trie_b = build_trie() 4 5 trie_a = insert(trie_a, ‘acm‘) 6 trie_a = insert(trie_a, ‘acr‘) 7 trie_a = insert(trie_a, ‘ak‘) 8 trie_a = insert(trie_a, ‘bc‘) 9 10 trie_b = insert(trie_b, ‘ac‘) 11 trie_b = insert(trie_b, ‘cm‘) 12 trie_b = insert(trie_b, ‘br‘) 13 14 s = [trie_a, trie_b] 15 s = reduce(merge, s) 16 print json.dumps(s)
输出的结构dumps成json后格式化一下是这样的:
{ "a": [ { "c": [ { "r": [ {}, true, 3 ], "m": [ {}, true, 3 ] }, true, 2 ], "k": [ {}, true, 2 ] }, false, 1 ], "c": [ { "m": [ {}, true, 2 ] }, false, 1 ], "b": [ { "c": [ {}, true, 2 ], "r": [ {}, true, 2 ] }, false, 1 ] }
这样一棵可持久化的字典树就实现了,接下来考虑将这段程序翻译成Spark的程序,并且生成一些字典数据,看看是否符合自己的预期。
[Python] Spark平台下实现分布式AC自动机(一)