Messaging¶
bocpy includes an Erlang-style message-passing subsystem built on top of lock-free multi-producer single-consumer (MPSC) ring buffers implemented in C. Messages can be sent from any thread or sub-interpreter and received by any other — they are the primary mechanism for communication that does not require shared ownership of a cown.
Note
Messaging is a lower-level facility than @when / Cown.
Most programs should model coordination through cowns and behaviors; reach
for send / receive when you need a channel-like pattern (producer–
consumer queues, heartbeat loops, event buses) or need to communicate with
code running outside the behavior runtime (plain threads, the main thread
before wait()).
Concepts¶
Selective Receive¶
receive() blocks the calling thread until a message with a matching tag
arrives. You can pass a single tag or a sequence of tags to listen on multiple
channels simultaneously:
# Wait for whichever arrives first
msg = receive(["order-ready", "order-cancelled"])
The return value is a two-element list [tag, contents].
Timeouts and the after Callback¶
receive() accepts an optional timeout (in seconds). When the timeout
fires:
If an
aftercallback is provided, the runtime callsafter()and returns its result directly as the value ofreceive(...). By convention the callback returns a(tag, contents)tuple so the caller can pattern-match it the same way as a normal message, but the runtime itself does not interpret the value — nothing is enqueued and no other receiver sees it.If no
afteris provided,receivereturns(TIMEOUT, None).TIMEOUTis the tag slot of the synthetic two-element result; comparemsg[0]against it.
from bocpy import receive, TIMEOUT
msg = receive("heartbeat", timeout=2.0)
if msg[0] == TIMEOUT:
print("No heartbeat in 2 seconds")
With an after callback:
def after():
return "heartbeat", "self-tick"
msg = receive("heartbeat", timeout=1.0, after=after)
# msg == ("heartbeat", "self-tick") if nothing arrived in 1 s
The after function may return any value; the (tag, contents)
shape is purely a convention so the caller can pattern-match it
uniformly with normal receive results.
Worked Example: Calculator Service¶
The following example (adapted from
examples/calculator.py)
demonstrates a concurrent calculator service. Multiple client threads send
arithmetic operations to a server thread via the "calculator" tag. The
server uses selective receive with a timeout to detect when clients have gone
silent.
"""Concurrent calculator using message-passing channels."""
import random
from threading import Thread
import time
from bocpy import receive, send
def client(num_operations: int):
"""Send random arithmetic operations to the calculator channel."""
actions = ["+", "-", "/", "*"]
for _ in range(num_operations):
time.sleep(random.random() * 0.1)
action = random.choice(actions)
value = random.random() * 10 - 5
send("calculator", (action, value))
def server(timeout):
"""Receive and process arithmetic operations until stopped."""
value = 0
num_operations = 0
running = True
def after():
return "calculator", ("print", True)
while running:
match receive("calculator", timeout, after):
case [_, ("+", x)]:
num_operations += 1
value += x
case [_, ("-", x)]:
num_operations += 1
value -= x
case [_, ("*", x)]:
num_operations += 1
value *= x
case [_, ("/", x)]:
num_operations += 1
value /= x
case [_, ("print", _)]:
print("Total operations:", num_operations)
print("Final value:", value)
running = False
# Start the server with a 2-second idle timeout
server_thread = Thread(target=server, args=(2.0,))
server_thread.start()
# Spawn 4 clients, each sending 5 operations
clients = [Thread(target=client, args=(5,)) for _ in range(4)]
for c in clients:
c.start()
for c in clients:
c.join()
# Once clients finish, send a shutdown signal (or let the timeout fire)
send("calculator", ("print", False))
server_thread.join()
Key observations:
send("calculator", ...)is non-blocking and thread-safe — all four clients fire concurrently.The server’s
matchonreceive(...)is a selective receive: it pattern-matches on the message contents, not just the tag.The
aftercallback fires when no message arrives withintimeoutseconds, causing the server to print results and exit gracefully.No locks, no shared mutable state — the only coordination is the message queue.
Draining Queues¶
drain() discards all pending messages for one or more tags:
from bocpy import drain
drain("calculator") # clear one tag
drain(["orders", "events"]) # clear multiple tags
This is useful for cleanup between test runs or when resetting a subsystem.
Warning
If new messages are arriving faster than they can be drained, drain
may not return promptly.
Sending Custom Types Across Sub-interpreters¶
Messages cross sub-interpreter boundaries through CPython’s cross-interpreter data (XIData) machinery, with a pickle fallback when no XIData handler is registered for the payload’s type. The runtime makes no attempt to ship class definitions along with the message — the receiver must already be able to resolve the type by its fully qualified name.
In practice this means:
Builtins and stdlib containers just work. Numbers, strings, bytes,
tuple,list,dict,set,frozensetand similar types either have a native XIData handler or pickle cleanly to types every interpreter already knows about.C extension types can register a custom XIData handler to transfer ownership directly without going through pickle.
CownandMatrixuse this path; see C ABI for how to expose your own type through the same mechanism.Pure-Python custom classes fall back to pickle. Unpickling only succeeds if the receiving interpreter can already import the class by its fully qualified name. If a worker has never executed
import my_pkg.my_module, then receiving an instance ofmy_pkg.my_module.MyClasswill fail with aModuleNotFoundErrororAttributeErrorraised from insidereceive.Closures, lambdas, and locally-defined classes cannot be sent at all — the pickle fallback cannot resolve them by qualified name from any interpreter, and they have no XIData handler.
Inside @when behaviors the transpiler handles the
import-side of this automatically: it rewrites the decorated module so
each worker imports the same set of names the caller had in scope, and
any class referenced by a behavior is therefore resolvable on the worker
side. When you use send / receive from a plain thread, a
sub-interpreter spawned outside the behavior runtime, or from inside a
behavior body but with a type that was not part of the captured
environment, you are responsible for ensuring the class is importable
on the receiver (or for registering an XIData handler that bypasses
pickle entirely).
The simplest way to satisfy the pickle path is to define message payload
types at module scope in a module that every participating interpreter
imports at startup — for example, a shared messages.py that the main
program, the worker bootstrap, and any auxiliary threads all import
before the first send.
API Reference¶
- bocpy.send(tag, contents, /)
Sends a message.
- bocpy.receive(tags, /, timeout=-1, after=None)
Receives a message.
- bocpy.set_tags(tags, /)
Assigns tags to message queues.
- bocpy.drain(tags, /)
Drains all messages for the given tags.
- bocpy.TIMEOUT = '__timeout__'
str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.