X-Git-Url: http://dolda2000.com/gitweb/?a=blobdiff_plain;f=python3%2Fashd%2Fasync.py;h=99da89af0d7bdc6e656ec968412b7b8b5dcfcf0f;hb=407963f25c664cd1450ec5f6eeb80c449ff57e74;hp=02c75a92a9175a61e3f257e05c866be93e305631;hpb=940f9c467d2f2c923468109c529c4902d95f5b9c;p=ashd.git diff --git a/python3/ashd/async.py b/python3/ashd/async.py index 02c75a9..99da89a 100644 --- a/python3/ashd/async.py +++ b/python3/ashd/async.py @@ -3,11 +3,15 @@ import sys, os, errno, threading, select, traceback class epoller(object): exc_handler = None - def __init__(self): + def __init__(self, check=None): self.registered = {} self.lock = threading.RLock() self.ep = None self.th = None + self.stopped = False + self.loopcheck = set() + if check is not None: + self.loopcheck.add(check) self._daemon = True @staticmethod @@ -25,7 +29,7 @@ class epoller(object): def exception(self, ch, *exc): self.remove(ch) if self.exc_handler is None: - traceback.print_exception(exc) + traceback.print_exception(*exc) else: self.exc_handler(ch, *exc) @@ -38,6 +42,14 @@ class epoller(object): except Exception as exc: self.exception(ch, *sys.exc_info()) + def _closeall(self): + with self.lock: + while self.registered: + fd, (ch, evs) = next(iter(self.registered.items())) + del self.registered[fd] + self.ep.unregister(fd) + self._cb(ch, "close") + def _run(self): ep = select.epoll() try: @@ -47,6 +59,11 @@ class epoller(object): self.ep = ep while self.registered: + for ck in self.loopcheck: + ck(self) + if self.stopped: + self._closeall() + break try: evlist = ep.poll(10) except IOError as exc: @@ -139,15 +156,39 @@ class epoller(object): if self.ep: self.ep.modify(fd, evs) + def stop(self): + if threading.current_thread() == self.th: + self.stopped = True + else: + def tgt(): + self.stopped = True + cb = callbuffer() + cb.call(tgt) + cb.stop() + self.add(cb) + def watcher(): return epoller() -class sockbuffer(object): - def __init__(self, sk): - self.sk = sk +class channel(object): + readable = False + writable = False + + def __init__(self): + self.watcher = None + + def fileno(self): + raise NotImplementedError("fileno()") + + def close(self): + pass + +class sockbuffer(channel): + def __init__(self, socket, **kwargs): + super().__init__(**kwargs) + self.sk = socket self.eof = False self.obuf = bytearray() - self.watcher = None def fileno(self): return self.sk.fileno() @@ -188,8 +229,9 @@ class sockbuffer(object): self.obuf[:] = b"" self.eof = True -class callbuffer(object): - def __init__(self): +class callbuffer(channel): + def __init__(self, **kwargs): + super().__init__(**kwargs) self.queue = [] self.rp, self.wp = os.pipe() self.lock = threading.Lock() @@ -239,3 +281,9 @@ class callbuffer(object): if self.wp >= 0: os.close(self.wp) self.wp = -1 + +def currentwatcher(io, current): + def check(io): + if not current: + io.stop() + io.loopcheck.add(check)