mirror of
https://github.com/Sneed-Group/Poodletooth-iLand
synced 2025-01-03 17:11:22 -06:00
408 lines
12 KiB
Python
408 lines
12 KiB
Python
# Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
|
|
"""Managing greenlets in a group.
|
|
|
|
The :class:`Group` class in this module abstracts a group of running greenlets.
|
|
When a greenlet dies, it's automatically removed from the group.
|
|
|
|
The :class:`Pool` which a subclass of :class:`Group` provides a way to limit
|
|
concurrency: its :meth:`spawn <Pool.spawn>` method blocks if the number of
|
|
greenlets in the pool has already reached the limit, until there is a free slot.
|
|
"""
|
|
|
|
import sys
|
|
from bisect import insort_right
|
|
|
|
from gevent.hub import GreenletExit, getcurrent, kill as _kill, PY3
|
|
from gevent.greenlet import joinall, Greenlet
|
|
from gevent.timeout import Timeout
|
|
from gevent.event import Event
|
|
from gevent.lock import Semaphore, DummySemaphore
|
|
|
|
__all__ = ['Group', 'Pool']
|
|
|
|
|
|
class Group(object):
|
|
"""Maintain a group of greenlets that are still running.
|
|
|
|
Links to each item and removes it upon notification.
|
|
"""
|
|
greenlet_class = Greenlet
|
|
|
|
def __init__(self, *args):
|
|
assert len(args) <= 1, args
|
|
self.greenlets = set(*args)
|
|
if args:
|
|
for greenlet in args[0]:
|
|
greenlet.rawlink(self._discard)
|
|
# each item we kill we place in dying, to avoid killing the same greenlet twice
|
|
self.dying = set()
|
|
self._empty_event = Event()
|
|
self._empty_event.set()
|
|
|
|
def __repr__(self):
|
|
return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self.greenlets)
|
|
|
|
def __len__(self):
|
|
return len(self.greenlets)
|
|
|
|
def __contains__(self, item):
|
|
return item in self.greenlets
|
|
|
|
def __iter__(self):
|
|
return iter(self.greenlets)
|
|
|
|
def add(self, greenlet):
|
|
try:
|
|
rawlink = greenlet.rawlink
|
|
except AttributeError:
|
|
pass # non-Greenlet greenlet, like MAIN
|
|
else:
|
|
rawlink(self._discard)
|
|
self.greenlets.add(greenlet)
|
|
self._empty_event.clear()
|
|
|
|
def _discard(self, greenlet):
|
|
self.greenlets.discard(greenlet)
|
|
self.dying.discard(greenlet)
|
|
if not self.greenlets:
|
|
self._empty_event.set()
|
|
|
|
def discard(self, greenlet):
|
|
self._discard(greenlet)
|
|
try:
|
|
unlink = greenlet.unlink
|
|
except AttributeError:
|
|
pass # non-Greenlet greenlet, like MAIN
|
|
else:
|
|
unlink(self._discard)
|
|
|
|
def start(self, greenlet):
|
|
self.add(greenlet)
|
|
greenlet.start()
|
|
|
|
def spawn(self, *args, **kwargs):
|
|
greenlet = self.greenlet_class(*args, **kwargs)
|
|
self.start(greenlet)
|
|
return greenlet
|
|
|
|
# def close(self):
|
|
# """Prevents any more tasks from being submitted to the pool"""
|
|
# self.add = RaiseException("This %s has been closed" % self.__class__.__name__)
|
|
|
|
def join(self, timeout=None, raise_error=False):
|
|
if raise_error:
|
|
greenlets = self.greenlets.copy()
|
|
self._empty_event.wait(timeout=timeout)
|
|
for greenlet in greenlets:
|
|
if greenlet.exception is not None:
|
|
raise greenlet.exception
|
|
else:
|
|
self._empty_event.wait(timeout=timeout)
|
|
|
|
def kill(self, exception=GreenletExit, block=True, timeout=None):
|
|
timer = Timeout.start_new(timeout)
|
|
try:
|
|
try:
|
|
while self.greenlets:
|
|
for greenlet in list(self.greenlets):
|
|
if greenlet not in self.dying:
|
|
try:
|
|
kill = greenlet.kill
|
|
except AttributeError:
|
|
_kill(greenlet, exception)
|
|
else:
|
|
kill(exception, block=False)
|
|
self.dying.add(greenlet)
|
|
if not block:
|
|
break
|
|
joinall(self.greenlets)
|
|
except Timeout:
|
|
ex = sys.exc_info()[1]
|
|
if ex is not timer:
|
|
raise
|
|
finally:
|
|
timer.cancel()
|
|
|
|
def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
|
|
if greenlet not in self.dying and greenlet in self.greenlets:
|
|
greenlet.kill(exception, block=False)
|
|
self.dying.add(greenlet)
|
|
if block:
|
|
greenlet.join(timeout)
|
|
|
|
def apply(self, func, args=None, kwds=None):
|
|
"""Equivalent of the apply() builtin function. It blocks till the result is ready."""
|
|
if args is None:
|
|
args = ()
|
|
if kwds is None:
|
|
kwds = {}
|
|
if getcurrent() in self:
|
|
return func(*args, **kwds)
|
|
else:
|
|
return self.spawn(func, *args, **kwds).get()
|
|
|
|
def apply_cb(self, func, args=None, kwds=None, callback=None):
|
|
result = self.apply(func, args, kwds)
|
|
if callback is not None:
|
|
Greenlet.spawn(callback, result)
|
|
return result
|
|
|
|
def apply_async(self, func, args=None, kwds=None, callback=None):
|
|
"""A variant of the apply() method which returns a Greenlet object.
|
|
|
|
If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready
|
|
callback is applied to it (unless the call failed)."""
|
|
if args is None:
|
|
args = ()
|
|
if kwds is None:
|
|
kwds = {}
|
|
if self.full():
|
|
# cannot call spawn() directly because it will block
|
|
return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
|
|
else:
|
|
greenlet = self.spawn(func, *args, **kwds)
|
|
if callback is not None:
|
|
greenlet.link(pass_value(callback))
|
|
return greenlet
|
|
|
|
def map(self, func, iterable):
|
|
return list(self.imap(func, iterable))
|
|
|
|
def map_cb(self, func, iterable, callback=None):
|
|
result = self.map(func, iterable)
|
|
if callback is not None:
|
|
callback(result)
|
|
return result
|
|
|
|
def map_async(self, func, iterable, callback=None):
|
|
"""
|
|
A variant of the map() method which returns a Greenlet object.
|
|
|
|
If callback is specified then it should be a callable which accepts a
|
|
single argument.
|
|
"""
|
|
return Greenlet.spawn(self.map_cb, func, iterable, callback)
|
|
|
|
def imap(self, func, iterable):
|
|
"""An equivalent of itertools.imap()"""
|
|
return IMap.spawn(func, iterable, spawn=self.spawn)
|
|
|
|
def imap_unordered(self, func, iterable):
|
|
"""The same as imap() except that the ordering of the results from the
|
|
returned iterator should be considered in arbitrary order."""
|
|
return IMapUnordered.spawn(func, iterable, spawn=self.spawn)
|
|
|
|
def full(self):
|
|
return False
|
|
|
|
def wait_available(self):
|
|
pass
|
|
|
|
|
|
class IMapUnordered(Greenlet):
|
|
|
|
def __init__(self, func, iterable, spawn=None):
|
|
from gevent.queue import Queue
|
|
Greenlet.__init__(self)
|
|
if spawn is not None:
|
|
self.spawn = spawn
|
|
self.func = func
|
|
self.iterable = iterable
|
|
self.queue = Queue()
|
|
self.count = 0
|
|
self.finished = False
|
|
self.rawlink(self._on_finish)
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
value = self.queue.get()
|
|
if isinstance(value, Failure):
|
|
raise value.exc
|
|
return value
|
|
|
|
if PY3:
|
|
__next__ = next
|
|
del next
|
|
|
|
def _run(self):
|
|
try:
|
|
func = self.func
|
|
for item in self.iterable:
|
|
self.count += 1
|
|
self.spawn(func, item).rawlink(self._on_result)
|
|
finally:
|
|
self.__dict__.pop('spawn', None)
|
|
self.__dict__.pop('func', None)
|
|
self.__dict__.pop('iterable', None)
|
|
|
|
def _on_result(self, greenlet):
|
|
self.count -= 1
|
|
if greenlet.successful():
|
|
self.queue.put(greenlet.value)
|
|
else:
|
|
self.queue.put(Failure(greenlet.exception))
|
|
if self.ready() and self.count <= 0 and not self.finished:
|
|
self.queue.put(Failure(StopIteration))
|
|
self.finished = True
|
|
|
|
def _on_finish(self, _self):
|
|
if self.finished:
|
|
return
|
|
if not self.successful():
|
|
self.queue.put(Failure(self.exception))
|
|
self.finished = True
|
|
return
|
|
if self.count <= 0:
|
|
self.queue.put(Failure(StopIteration))
|
|
self.finished = True
|
|
|
|
|
|
class IMap(Greenlet):
|
|
|
|
def __init__(self, func, iterable, spawn=None):
|
|
from gevent.queue import Queue
|
|
Greenlet.__init__(self)
|
|
if spawn is not None:
|
|
self.spawn = spawn
|
|
self.func = func
|
|
self.iterable = iterable
|
|
self.queue = Queue()
|
|
self.count = 0
|
|
self.waiting = [] # QQQ maybe deque will work faster there?
|
|
self.index = 0
|
|
self.maxindex = -1
|
|
self.finished = False
|
|
self.rawlink(self._on_finish)
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
while True:
|
|
if self.waiting and self.waiting[0][0] <= self.index:
|
|
index, value = self.waiting.pop(0)
|
|
else:
|
|
index, value = self.queue.get()
|
|
if index > self.index:
|
|
insort_right(self.waiting, (index, value))
|
|
continue
|
|
self.index += 1
|
|
if isinstance(value, Failure):
|
|
raise value.exc
|
|
return value
|
|
|
|
if PY3:
|
|
__next__ = next
|
|
del next
|
|
|
|
def _run(self):
|
|
try:
|
|
func = self.func
|
|
for item in self.iterable:
|
|
self.count += 1
|
|
g = self.spawn(func, item)
|
|
g.rawlink(self._on_result)
|
|
self.maxindex += 1
|
|
g.index = self.maxindex
|
|
finally:
|
|
self.__dict__.pop('spawn', None)
|
|
self.__dict__.pop('func', None)
|
|
self.__dict__.pop('iterable', None)
|
|
|
|
def _on_result(self, greenlet):
|
|
self.count -= 1
|
|
if greenlet.successful():
|
|
self.queue.put((greenlet.index, greenlet.value))
|
|
else:
|
|
self.queue.put((greenlet.index, Failure(greenlet.exception)))
|
|
if self.ready() and self.count <= 0 and not self.finished:
|
|
self.maxindex += 1
|
|
self.queue.put((self.maxindex, Failure(StopIteration)))
|
|
self.finished = True
|
|
|
|
def _on_finish(self, _self):
|
|
if self.finished:
|
|
return
|
|
if not self.successful():
|
|
self.maxindex += 1
|
|
self.queue.put((self.maxindex, Failure(self.exception)))
|
|
self.finished = True
|
|
return
|
|
if self.count <= 0:
|
|
self.maxindex += 1
|
|
self.queue.put((self.maxindex, Failure(StopIteration)))
|
|
self.finished = True
|
|
|
|
|
|
class Failure(object):
|
|
__slots__ = ['exc']
|
|
|
|
def __init__(self, exc):
|
|
self.exc = exc
|
|
|
|
|
|
class Pool(Group):
|
|
|
|
def __init__(self, size=None, greenlet_class=None):
|
|
if size is not None and size < 0:
|
|
raise ValueError('size must not be negative: %r' % (size, ))
|
|
Group.__init__(self)
|
|
self.size = size
|
|
if greenlet_class is not None:
|
|
self.greenlet_class = greenlet_class
|
|
if size is None:
|
|
self._semaphore = DummySemaphore()
|
|
else:
|
|
self._semaphore = Semaphore(size)
|
|
|
|
def wait_available(self):
|
|
self._semaphore.wait()
|
|
|
|
def full(self):
|
|
return self.free_count() <= 0
|
|
|
|
def free_count(self):
|
|
if self.size is None:
|
|
return 1
|
|
return max(0, self.size - len(self))
|
|
|
|
def add(self, greenlet):
|
|
self._semaphore.acquire()
|
|
try:
|
|
Group.add(self, greenlet)
|
|
except:
|
|
self._semaphore.release()
|
|
raise
|
|
|
|
def _discard(self, greenlet):
|
|
Group._discard(self, greenlet)
|
|
self._semaphore.release()
|
|
|
|
|
|
class pass_value(object):
|
|
__slots__ = ['callback']
|
|
|
|
def __init__(self, callback):
|
|
self.callback = callback
|
|
|
|
def __call__(self, source):
|
|
if source.successful():
|
|
self.callback(source.value)
|
|
|
|
def __hash__(self):
|
|
return hash(self.callback)
|
|
|
|
def __eq__(self, other):
|
|
return self.callback == getattr(other, 'callback', other)
|
|
|
|
def __str__(self):
|
|
return str(self.callback)
|
|
|
|
def __repr__(self):
|
|
return repr(self.callback)
|
|
|
|
def __getattr__(self, item):
|
|
assert item != 'callback'
|
|
return getattr(self.callback, item)
|