zmq¶
Central command pub/sub.
Each sub-runner has its own set of sockets for publishing and consuming events.
The node_id.signal pair is used as a feed address for subscriptions.
Todo
Currently only IPC is supported, so the zmq runner can't run across machines. Supporting TCP is WIP; it will require some degree of authentication among nodes to prevent arbitrary code execution, since we shouldn't count on users to properly firewall their runners.
Todo
The socket spawning and event handling are awfully manual here. Leaving it as-is because it's somewhat unlikely we'll need to generalize it, but otherwise it would be great to standardize socket names and have event handler decorators like:
@on_router(MessageType.sometype)
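A minimal sketch of what such a decorator registry could look like (the registry, dispatch_router, and the message typing are illustrative, not existing API):

from collections import defaultdict
from typing import Any, Callable

# registry of handlers keyed by message type (e.g. a MessageType enum member)
_ROUTER_HANDLERS: dict[Any, list[Callable[[Any], Any]]] = defaultdict(list)

def on_router(msg_type: Any) -> Callable[[Callable[[Any], Any]], Callable[[Any], Any]]:
    """Register a function as a handler for ROUTER messages of ``msg_type``."""
    def decorator(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        _ROUTER_HANDLERS[msg_type].append(fn)
        return fn
    return decorator

def dispatch_router(msg_type: Any, msg: Any) -> None:
    """Call every handler registered for ``msg_type``."""
    for handler in _ROUTER_HANDLERS.get(msg_type, []):
        handler(msg)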
- class CommandNode(runner_id: str, protocol: str = 'ipc', port: int | None = None)[source]¶
Pub node that controls the state of the other nodes/announces addresses
one PUB socket to distribute commands
one ROUTER socket to receive return messages from runner nodes
one SUB socket to subscribe to all events
The wrapping runner should register callbacks with add_callback to handle incoming messages.
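A rough pyzmq-level sketch of the socket layout described above (addresses, bind/connect directions, and the runner_id naming scheme are assumptions, not the exact CommandNode internals):

import zmq

ctx = zmq.Context.instance()
runner_id = "runner-0"

pub = ctx.socket(zmq.PUB)        # distribute commands to node runners
pub.bind(f"ipc://{runner_id}.command")

router = ctx.socket(zmq.ROUTER)  # receive direct replies from node runners
router.bind(f"ipc://{runner_id}.router")

sub = ctx.socket(zmq.SUB)        # subscribe to every event nodes publish
sub.setsockopt_string(zmq.SUBSCRIBE, "")  # no topic filter
# sub.connect(...) is called for each node outbox address as nodes announce themselves

poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
poller.register(sub, zmq.POLLIN)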
- process(epoch: int, input: dict | None = None) None[source]¶
Emit a ProcessMsg to process a single round through the graph
- add_callback(type_: Literal['inbox', 'router'], cb: Callable[[Message], Any]) None[source]¶
Add a callback to be called for each message received:
- by the inbox: the subscriber that receives all events from node runners
- by the router: direct messages sent by node runners to the command node
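For example, assuming command is a constructed CommandNode (the callbacks shown are illustrative):

def log_event(msg):   # every event received on the SUB ("inbox") socket
    print("event:", msg)

def on_reply(msg):    # direct messages received on the ROUTER socket
    print("reply:", msg)

command.add_callback("inbox", log_event)
command.add_callback("router", on_reply)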
- await_ready(node_ids: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]], timeout: float = 10) None[source]¶
Wait until all the node_ids have announced themselves
- on_identify(msg: IdentifyMsg) None[source]¶
- class NodeRunner(spec: NodeSpecification, runner_id: str, command_outbox: str, command_router: str, input_collection: InputCollection, protocol: str = 'ipc')[source]¶
Runner for a single node
DEALER to communicate with command inbox
PUB (outbox) to publish events
SUB (inbox) to subscribe to events from other nodes.
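A rough pyzmq-level sketch of this per-node topology (addresses, socket identities, and subscription topics are assumptions; the real NodeRunner wires these up from its constructor arguments):

import zmq

ctx = zmq.Context.instance()
node_id = "my_node"

dealer = ctx.socket(zmq.DEALER)                   # direct line to the command node
dealer.setsockopt_string(zmq.IDENTITY, node_id)   # lets the ROUTER address replies
dealer.connect("ipc://runner-0.router")

outbox = ctx.socket(zmq.PUB)                      # publish this node's events
outbox.bind(f"ipc://{node_id}.outbox")

inbox = ctx.socket(zmq.SUB)                       # consume events from upstream nodes
inbox.connect("ipc://upstream_node.outbox")
inbox.setsockopt_string(zmq.SUBSCRIBE, "upstream_node.signal")  # node_id.signal as feed address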
- property depends: tuple[tuple[str, str], ...] | None¶
(node, signal) tuples of the wrapped node’s dependencies
- property status: NodeStatus¶
- classmethod run(spec: NodeSpecification, **kwargs: Any) None[source]¶
Target for multiprocessing.run, init the class and start it!
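For example (spec and inputs are assumed to be an existing NodeSpecification and InputCollection; the addresses are illustrative):

from multiprocessing import Process

proc = Process(
    target=NodeRunner.run,
    kwargs={
        "spec": spec,                              # a NodeSpecification
        "runner_id": "runner-0",
        "command_outbox": "ipc://runner-0.command",
        "command_router": "ipc://runner-0.router",
        "input_collection": inputs,                # an InputCollection
    },
)
proc.start()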
- update_status(status: NodeStatus) None[source]¶
Update our internal status and announce it to the command node
- on_announce(msg: AnnounceMsg) None[source]¶
Store the announced address map and connect to the nodes we depend on
- on_process(msg: ProcessMsg) None[source]¶
Process a single graph iteration
- on_deinit(msg: DeinitMsg) None[source]¶
Deinitialize the node, close networking thread.
Cause the main loop to end, which calls deinit
- error(err: Exception) None[source]¶
Capture the error and traceback context from an exception using traceback.TracebackException and send to the command node to re-raise
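A minimal sketch of this capture step, assuming the traceback is shipped as formatted text (the actual message framing via ErrorValue is omitted):

import traceback

def capture(err: Exception) -> str:
    tbe = traceback.TracebackException.from_exception(err)
    return "".join(tbe.format())   # serializable text of the full traceback

try:
    1 / 0
except Exception as e:
    payload = capture(e)           # shipped to the command node, re-raised there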
- class ZMQRunner(tube: Tube, store: EventStore = <factory>, max_iter_loops: int = 100, _callbacks: list[Callable[[Event | MetaEvent], None]] = <factory>, _logger: Logger = None, _runner_id: str | None = None, node_procs: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Process] = <factory>, command: CommandNode | None = None, quit_timeout: float = 10, autoclear_store: bool = True, _initialized: Event = <factory>, _running: Event = <factory>, _init_lock: RLock = <factory>, _running_lock: lock = <factory>, _ignore_events: bool = False, _return_node: Return | None = None, _to_throw: ErrorValue | None = None, _current_epoch: int = 0)[source]¶
A concurrent runner that uses zmq to broker events between nodes running in separate processes
- node_procs: dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Process]¶
- command: CommandNode | None = None¶
- quit_timeout: float = 10¶
Time in seconds to wait after calling deinit before killing runner processes
- store: EventStore¶
- autoclear_store: bool = True¶
If True (default), clear the event store after events are processed and returned. If False, don't clear events from the event store.
- iter(n: int | None = None) Generator[None | dict[str, Any] | Any, None, None][source]¶
Iterate over results as they are available.
The tube runs in free-run mode for n iterations. This method is usually only useful for tubes with Return nodes: it yields only when a return value is available, so the tube may make more than n process calls if, e.g., gather nodes cause the return value to be empty.
To call the tube a specific number of times and do something with the events other than returning a value, use callbacks and run() instead!
Note that backpressure control is not yet implemented! If the outer iter loop is slow, or there is a bottleneck in your tube, you might incur serious memory usage. Backpressure and observability are a WIP.
If you need a version of this method that always makes a fixed number of process calls, raise an issue!
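For example, assuming tube is a constructed Tube and the tube contains a Return node:

with ZMQRunner(tube=tube) as runner:
    for result in runner.iter(10):
        print(result)   # yielded only when a return value is available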
- run(n: int) list[None | dict[str, Any] | Any][source]¶
- run(n: None = None) None
Run the tube in freerun mode - every node runs as soon as its dependencies are satisfied, not waiting for epochs to complete before starting the next epoch.
Blocks when n is not None; this is for consistency with the synchronous/asyncio runners, but may change in the future.
If n is None, does not block. Stop processing by calling stop() or by deinitializing (exiting the context manager, or calling deinit()).
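For example, assuming tube is a constructed Tube (the sleep is only to let the freerun loop do some work before stopping):

import time

with ZMQRunner(tube=tube) as runner:
    results = runner.run(5)   # blocks: five epochs, returns the collected results

    runner.run()              # n=None: freerun without blocking
    time.sleep(1.0)           # let the tube process for a while
    runner.stop()             # stop processing; deinit happens on context exit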