- Webcam (outputs image)
- Navigation (inputs a map and a target, outputs a path)
- Detection (takes an image and a vision model like YOLO, outputs a stream of detections)
Prerequisite: Blueprint visualization (both SVG export and the Rerun Graph tab) requires Graphviz:
skip output=assets/go2_nav.svg
Camera Module
Let’s learn how to build stuff like the above, starting with a simple camera module.skip session=camera_module_demo output=assets/camera_module.svg
.io() call. We will do this from now on.
session=camera_module_demo ansi=false
color_imagewith sensor_msgs.Image typecamera_infowith sensor_msgs.CameraInfo type
start() and stop() (lifecycle methods).
It also exposes an agentic skill called take_a_picture (more on skills in the Blueprints guide).
We can start this module and explore the output of its streams in real time (this will use your webcam).
skip session=camera_module_demo ansi=false
Connecting modules
Let’s load a standard 2D detector module and hook it up to a camera.skip ansi=false session=detection_module
skip ansi=false
Distributed Execution
As we build module structures, we’ll quickly want to utilize all cores on the machine (which Python doesn’t allow as a single process) and potentially distribute modules across machines or even the internet. For this, we usedimos.core and DimOS transport protocols.
Defining message exchange protocols and message types also gives us the ability to write models in faster languages.
Dedicated workers
By default the coordinator assigns modules to worker processes by least-load, so multiple modules share a worker. Heavy modules (robot connections, voxel mappers) should run alone so they don’t contend with anything else for CPU or the GIL. Setdedicated_worker = True on the class and the coordinator will give that module a worker process to itself.
Sync input handlers
If you don’t need an asyncio loop, subscribe to yourIn[T] streams from start() and register the unsubscribe with register_disposable so cleanup happens automatically at stop().
In.subscribe(cb) returns an unsubscribe function, not a DisposableBase. Wrap it in Disposable(...) so register_disposable can dispose it on stop(). Without this, your handler keeps running after stop() and tests will fail thread-leak checks.
The callback runs on whatever thread emits the message, so guard mutable state with a lock if multiple inputs share it.
Triggering side effects via Specs
A common pattern is “subscribe to a stream, react by calling another module”. Declare the other module’s protocol as aSpec field (single-underscore, private). The coordinator binds the proxy at deploy time, so handlers can call it directly with no extra wiring:
@rpc signatures (sync/async are interchangeable — see Async modules).
To deploy Watchdog, add Watchdog.blueprint() to an existing blueprint’s autoconnect(...) chain. The coordinator matches Out[T] to In[T] by name across the union of modules, and resolves _notifier: NotifierSpec to whichever module in the blueprint implements notify. No manual wiring required.
Testing modules
Mock spec dependencies (anything typed: SomeSpec) after construction, since the framework normally wires them at deploy time:
skip
m.stop() in teardown matters. The test session-wide thread-leak detector will fail the test otherwise, even if your test body never started any threads.
Restarting a module
While iterating on a module it’s often convenient to edit its source file and pick up the changes without tearing down the whole coordinator. Therestart_module call stops a single deployed module, reloads its source
via importlib.reload, then redeploys it onto a fresh worker process while
keeping its stream transports and reconnecting any other modules that held
a reference to it.
skip
Async modules (lock-free state)
Modules contain a per-instance asyncio loop on a daemon thread (self._loop). It is possible to write modules using only async def methods so that everything runs on the same thread and you don’t need to use locks. The module’s auto-bound input handlers, async @rpc methods, and process_observable callbacks all run on self._loop, and each handler subscription is serialized through a dedicated dispatcher task.
Auto-bound input handlers
For every declaredx: In[T], if the module defines async def handle_x(self, msg: T), the handler is automatically subscribed at start() and dispatched onto self._loop. Subscriptions are cleaned up at stop().
self._loop. Handlers are serialized: only one invocation of handle_x runs at a time. If messages arrive faster than the handler can process them, intermediate messages are dropped — only the most recent unprocessed message is kept (LATEST policy). The handler is guaranteed to eventually run with the most recently published value.
Async @rpc methods
@rpc works on both sync and async def methods. When applied to an async method, the call site dispatches automatically:
- From another thread (the RPC dispatcher, sync test code, a sync
@rpcon the same module), the call blocks until the coroutine completes onself._loop. - From inside the loop (another async
@rpc, ahandle_*, or aprocess_observablecallback), it returns the coroutine so the caller canawaitit.
@rpc methods are interchangeable for cross-module linking. Both are discovered via Module.rpcs and served through the same RPC machinery. A module ref or RPC client doesn’t care whether the underlying method is sync or async.
When the consumer types a module ref using a Spec that declares async def, the proxy automatically exposes those methods as awaitables: await self._name_module.say_hello(name).
NameModule is async. But if you need to call it from a sync module, you just need to create a SyncNameSpec:
NameModule. You can call it synchronously from your module, but it will run in the self._loop async loop in the NameModule module.
The reverse is also true: you can call a sync module from async code.
spawn: schedule a long-running coroutine from sync code
When you need to start a long-running async task from start() (e.g., a timer loop), use self.spawn(coro) instead of asyncio.run_coroutine_threadsafe(coro, self._loop). The helper wires up a done-callback that surfaces unhandled exceptions to the module logger. bare run_coroutine_threadsafe silently stores the exception on the returned Future, where it disappears unless the user remembers to read .result().
process_observable: async subscriptions to arbitrary observables
Sometimes you have rxpy observables which you need to run inside self._loop. You can do this with self.process_observable(observable, async_handler) .
skip
main(): combined setup/teardown
When a module owns a resource that needs construction at startup and explicit cleanup at shutdown, define async def main(self) as an async generator with exactly one yield. Code before yield runs at start(), code after yield runs at stop().
__init__ / start() / stop(), main() keeps the construction-and-destruction of each resource visually adjacent.
Blueprints
A blueprint is a predefined structure of interconnected modules. You can include blueprints or modules in new blueprints. A basic Unitree Go2 blueprint looks like what we saw before.skip session=blueprints output=assets/go2_agentic.svg
