首页 > 代码库 > python coroutine测试
python coroutine测试
目的:实现一个类似于asyn await的用法,来方便的编写callback相关函数
from __future__ import print_function
import time
import threading
def asyn_task(callback):
def SleepTask():
time.sleep(1)
callback("world")
# t = threading.Timer(1.0, lambda :callback("world"))
t = threading.Thread(target=SleepTask)
t.start() # after 30 seconds, "hello, world" will be printed
def main():
asyn_task(lambda s:print(s))
print ("hello")
def main2():
class myCoroutine:
def __init__(self):
self.evt = threading.Event()
self.co=self.myCOroutine()
self.co.next()
def myCOroutine(self):
while 1:
self.evt.clear()
line = (yield)
# print (‘yield:‘ + line)
self.line = line
self.evt.set()
def await(self):
self.evt.wait()
return self.line
def send(self, *args):
self.co.send(*args)
g = myCoroutine()
asyn_task(lambda s:g.send(s))
print ("hello")
s = g.await()
print (s)
main2()
python coroutine测试
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。