组和池
一个组是一组正在运行的 greenlet 的集合,它们作为组一起管理和调度。它还兼作反映 Pythonmultiprocessing库的并行调度程序。
import gevent
from gevent.pool import Group
def talk(msg):
for i in range(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
group = Group()
group.add(g1)
group.add(g2)
group.join()
group.add(g3)
group.join()
'''
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz
Process finished with exit code 0
'''
这对于管理异步任务组非常有用。
如上所述,Group还提供了一个 API,用于将作业分派到分组的 greenlet 并以各种方式收集其结果。
import gevent
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, range(3))
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
print('Ordered')
ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
print(i)
print('Unordered')
igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
print(i)
'''
Size of group 3
Hello from Greenlet 40757720
Size of group 3
Hello from Greenlet 40757992
Size of group 3
Hello from Greenlet 40758264
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
Process finished with exit code 0
'''
通常在构建 gevent 驱动的服务时,
人们会将整个服务集中在一个池结构周围。一个例子可能是一个轮询各种套接字的类。
from gevent.pool import Pool
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
self.pool.start()
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
def shutdown(self):
self.pool.kill()