Source code for noob.network.loop
import asyncio
import threading
try:
import zmq
from tornado.ioloop import IOLoop
except ImportError as e:
raise ImportError(
"Attempted to import zmq runner, but zmq deps are not installed. install with `noob[zmq]`",
) from e
[docs]
class EventloopMixin:
"""
Provide an eventloop in a separate thread to an inheriting class.
Any eventloop that is running in the current context is not used
because the inheriting classes are presumed to operate mostly synchronously for now,
pending a refactor to all async networking classes.
"""
def __init__(self):
self._context = None
self._loop = None
self._quitting = asyncio.Event()
self._thread: threading.Thread | None = None
@property
def context(self) -> zmq.Context:
if self._context is None:
self._context = zmq.Context.instance()
return self._context
@property
def loop(self) -> IOLoop:
# To ensure that the loop is always created in the spawned thread,
# we don't create it here (since this property could be accessed elsewhere)
# and throw to protect that.
if self._loop is None:
raise RuntimeError("Loop is not running")
return self._loop
[docs]
def start_loop(self) -> None:
if self._thread is not None:
raise RuntimeWarning("Node already started")
self._quitting.clear()
_ready = threading.Event()
def _signal_ready() -> None:
_ready.set()
def _run() -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loop = IOLoop.current()
if hasattr(self, "logger"):
self.logger.debug("Starting eventloop")
while not self._quitting.is_set():
try:
self.loop.add_callback(_signal_ready)
self.loop.start()
except RuntimeError:
# loop already started
if hasattr(self, "logger"):
self.logger.debug("Eventloop already started, quitting")
break
if hasattr(self, "logger"):
self.logger.debug("Eventloop stopped")
self._thread = None
self._thread = threading.Thread(target=_run)
self._thread.start()
# wait until the loop has started
_ready.wait(5)
if hasattr(self, "logger"):
self.logger.debug("Event loop started")
[docs]
def stop_loop(self) -> None:
if self._thread is None:
return
self._quitting.set()
self.loop.add_callback(self.loop.stop)