首页 > 代码库 > [Python] Spark平台下实现分布式AC自动机(一)

[Python] Spark平台下实现分布式AC自动机(一)

转载请注明出处:http://www.cnblogs.com/kirai/ 作者:Kirai

零.问题的提出

  最近希望在分布式平台上实现一个AC自动机,但是如何在这样的分布式平台上表示这样的非线性数据结构就难住我了。因为一直在使用RDD提供的一些基本的操作,没有需要什么复杂的操作。所以突然想到了在分布式的平台上实现一个AC自动机一定很有趣。网上搜了下,发现没有人实现,因此决定尝试实现。或许就是一个玩具,不过也是能帮助自己更深理解分布式平台上进行编程和普通编程的区别吧。

  这个问题对我来讲还是有一定的难度的,加上课业、复习考研、竞赛三重压力,对于这个问题的研究时间可能会比较长,不过一定会抽出时间来考虑的。

  文章仅仅是在记录自己对问题的思考过程和代码,并不能保证每一步都是合理、有用、正确的。

一.实现可持久化字典树

  AC自动机是个什么东西就不赘述了,我首先用python实现了一个比较朴素的版本,这里查询是返回所有字典中出现的单词的起始位置,每一个单词一个list,最终组成一个dict。哈哈,也许有人看出来了这是去年的一道ICPC网赛题,没错。但是我忘记是哪一道了,只记得当时没做出来,所以用python写一个就当是对自己的一个补偿吧:)

  1 # -*- coding: utf-8 -*-
  2 class Node:
  3   """
  4   A Trie‘s basic data structure.
  5   """
  6   def __init__(self):
  7     self.next_letter = {}
  8     self.is_word = False
  9     self.letter = ‘‘
 10     self.depth = -1
 11     self.pre = None
 12     self.fail = None
 13 
 14 
 15 class Trie:
 16   def __init__(self):
 17     self.root = Node()
 18 
 19 
 20   def insert(self, word):
 21     """
 22     insert a word into the trie.
 23     :param word: the word
 24     :type word: str
 25     :return: None
 26     """
 27     cur_node = self.root
 28     pre = self.root
 29     depth = 1
 30     for letter in word:
 31       if not cur_node.next_letter.has_key(letter):
 32         cur_node.next_letter[letter] = Node()
 33       cur_node = cur_node.next_letter[letter]
 34 
 35       cur_node.pre = pre
 36       cur_node.letter = letter
 37       cur_node.depth = depth
 38 
 39       depth += 1
 40       pre = cur_node
 41 
 42     cur_node.is_word = True
 43 
 44 
 45   def in_trie(self, word):
 46     """
 47     judge if the word is in the trie or not.
 48     :param word: the word
 49     :type word: str
 50     :return: if the word is in the trie or not.
 51     :rtype: bool
 52     """
 53     cur_node = self.root
 54     for letter in word:
 55       if not cur_node.next_letter.has_key(letter):
 56         return False
 57       cur_node = cur_node.next_letter[letter]
 58     return cur_node.is_word
 59 
 60 
 61 class AcAutomation:
 62   def __init__(self):
 63     self._trie = Trie()
 64     self._dict = set([])
 65 
 66 
 67   def in_trie(self, word):
 68     return self._trie.in_trie(word)
 69 
 70 
 71   def insert(self, word):
 72     map(self._dict.add, word)
 73     self._trie.insert(word)
 74 
 75 
 76   def build_ac_automation(self):
 77     """
 78     build the fail pointers to make the ac automation work.
 79     :return:
 80     """
 81     queue = []
 82     queue.append(self._trie.root)
 83     cur_node, tmp_node = None, None
 84 
 85     while len(queue) != 0:
 86       cur_node = queue.pop(0)
 87       for letter in cur_node.next_letter:
 88         if cur_node == self._trie.root:
 89           cur_node.next_letter[letter].fail = self._trie.root
 90         else:
 91           tmp_node = cur_node.fail
 92           while tmp_node != None:
 93             if tmp_node.next_letter.has_key(letter):
 94               cur_node.next_letter[letter].fail =  95                 tmp_node.next_letter[letter]
 96               break
 97             tmp_node = tmp_node.fail
 98           if tmp_node == None:
 99             cur_node.next_letter[letter].fail = self._trie.root
100 
101         queue.append(cur_node.next_letter[letter])
102 
103 
104   def get_word_position(self, sentence):
105     """
106     find the word‘s positions in the sentence according to
107     the dictionary.
108     :param sentence:
109     :rtype: Dict[List[]]
110     :return: an dictionary include the word that appears in the sentence
111              and the start and end positions.
112     """
113     cur_node = self._trie.root
114     tmp_node = None
115     result = {}
116     length = len(sentence)
117     for idx in range(0, length):
118       letter = sentence[idx]
119       if letter not in self._dict:
120         cur_node = self._trie.root
121         continue
122 
123       if cur_node.next_letter.has_key(letter):
124         cur_node = cur_node.next_letter[letter]
125       else:
126         while cur_node != None and cur_node.next_letter.has_key(letter) == False:
127           cur_node = cur_node.fail
128         if cur_node == None:
129           cur_node = self._trie.root
130 
131         if cur_node.next_letter.has_key(letter):
132           cur_node = cur_node.next_letter[letter]
133 
134       tmp_node = cur_node
135       while tmp_node != self._trie.root:
136         if tmp_node.is_word == True:
137           word = ‘‘
138           word_node = tmp_node
139           while word_node != self._trie.root:
140             word += word_node.letter
141             word_node = word_node.pre
142           word = word[::-1]
143           if not result.has_key(word):
144             result[word] = []
145           result[word].append((idx - tmp_node.depth + 1, idx))
146         tmp_node = tmp_node.fail
147 
148     return result
149 
150 
151 s = AcAutomation()
152 s.insert(AA)
153 s.insert(BB)
154 s.insert(CC)
155 s.insert(ACM)
156 s.build_ac_automation()
157 q = s.get_word_position(ooxxCC%dAAAAAAAAaaAAaoenGGACM)
158 
159 print q

返回的结果是这样的:

1 {CC: [(4, 5)], AA: [(8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (18, 19)], ACM: [(26, 28)]}

 

  如何在分布式的平台上实现这样的运算?显然这种实现是有内存共享的,如何做到消除这些共享?

  确实想不到,看来还要多看资料学习。但是可以肯定的一点是,AC自动机的结构一定不允许是这样的。

  由于Spark使用RDD对分布式内存抽象,支持的操作均是熟悉的函数式操作。所以考虑是不是可以把Trie“扁平化”成这样的结构?

  于是定义:

{ letter: [{next level..}, is_word depth] }

  代表一个节点,letter代表当前节点的字符,后接一个list,list第一个元素表示此节点的后继,后面跟着的是一些属性。这些属性是可以扩充的。稍微转换一下思路,就可以把这个字典树写出来:

 1 def build_trie():
 2   """
 3   build a empty trie
 4   :return:
 5   """
 6   return {}
 7 
 8 
 9 def insert(root, word, depth=1):
10   """
11   insert a word into the trie recursively.
12   :param root:
13   :param word:
14   :param depth:
15   :return:
16   """
17   letter = word[0]
18   if len(word) == 1:
19     if(root.has_key(letter)):
20       root[letter][1] = True
21       return root
22     else:
23       return {letter: [{}, True, depth]}
24   if root.has_key(letter) == False:
25     root[letter] = [{}, False, depth]
26     root[letter][0] = insert(root[letter][0], word[1:], depth+1)
27   else:
28     root[letter][0] = dict(root[letter][0], **insert(root[letter][0], word[1:], depth+1))
29   return root
30 
31 
32 def find(root, word):
33   """
34   find a word if it‘s in the trie or not.
35   :param root:
36   :param word:
37   :return:
38   """
39   letter = word[0]
40   if root.has_key(letter) == False:
41     return False
42   if len(word) == 1:
43     if root[letter][1] == True:
44       return True
45     else:
46       return False
47   return find(root[letter][0], word[1:])

  我考虑过字典树如何从一个分布式平台上构建,可以这样:

  通过平时对单词的搜集,可以在单机上生成一个又一个小的字典树,保存在分布式文件系统上,在使用的时候只需要把这一个个小字典树合并成一个就行了。

  所以这棵字典树仅仅这样是不够的,因为我希望这字典树支持这个合并的函数式操作,于是加上了归并的操作,这样这棵字典树变成了一棵可持久字典树:

 1 def merge(a_root, b_root):
 2   """
 3   merge two tries.
 4   :param a_root:
 5   :param b_root:
 6   :return:
 7   """
 8   def _merge(a_root, b_root):
 9     for letter in b_root.keys():
10       if a_root.has_key(letter):
11         if b_root[letter][1] == True:
12           a_root[letter][1] = True
13         a_root[letter][0] = dict(a_root[letter][0], **_merge(a_root[letter][0], b_root[letter][0]))
14       else:
15         a_root[letter] = copy.deepcopy(b_root[letter])
16     return a_root
17   a_root = _merge(a_root, b_root)
18   return a_root

  合并的操作思路很简单,就是同时深入,假如有一棵树上没有另一棵树上的儿子,整棵子树都拷贝过来即可(这也是用python的dict方便的地方)。

 

  测试了一下:

 1 if __name__ == __main__:
 2   trie_a = build_trie()
 3   trie_b = build_trie()
 4 
 5   trie_a = insert(trie_a, acm)
 6   trie_a = insert(trie_a, acr)
 7   trie_a = insert(trie_a, ak)
 8   trie_a = insert(trie_a, bc)
 9 
10   trie_b = insert(trie_b, ac)
11   trie_b = insert(trie_b, cm)
12   trie_b = insert(trie_b, br)
13 
14   s = [trie_a, trie_b]
15   s = reduce(merge, s)
16   print json.dumps(s)

  输出的结构dumps成json后格式化一下是这样的:

{
    "a": [
        {
            "c": [
                {
                    "r": [
                        {},
                        true,
                        3
                    ],
                    "m": [
                        {},
                        true,
                        3
                    ]
                },
                true,
                2
            ],
            "k": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ],
    "c": [
        {
            "m": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ],
    "b": [
        {
            "c": [
                {},
                true,
                2
            ],
            "r": [
                {},
                true,
                2
            ]
        },
        false,
        1
    ]
}

  这样一棵可持久化的字典树就实现了,接下来考虑将这段程序翻译成Spark的程序,并且生成一些字典数据,看看是否符合自己的预期。

[Python] Spark平台下实现分布式AC自动机(一)