Messaging

bocpy includes an Erlang-style message-passing subsystem built on top of lock-free multi-producer single-consumer (MPSC) ring buffers implemented in C. Messages can be sent from any thread or sub-interpreter and received by any other — they are the primary mechanism for communication that does not require shared ownership of a cown.

Note

Messaging is a lower-level facility than @when / Cown. Most programs should model coordination through cowns and behaviors; reach for send / receive when you need a channel-like pattern (producer– consumer queues, heartbeat loops, event buses) or need to communicate with code running outside the behavior runtime (plain threads, the main thread before wait()).

Concepts

Tags

Every message carries a tag — a short string label (max 63 UTF-8 bytes) that acts as a routing key. The runtime maintains 16 internal queues; each tag is assigned to the first free slot the first time it is used. Receivers specify one or more tags and only dequeue messages whose tag matches.

There is no declaration step: the first send("my-tag", ...) auto-assigns the tag to a queue. If you want deterministic queue assignment (useful for benchmarks or when you need to isolate traffic), call set_tags() before any sends.

Selective Receive

receive() blocks the calling thread until a message with a matching tag arrives. You can pass a single tag or a sequence of tags to listen on multiple channels simultaneously:

# Wait for whichever arrives first
msg = receive(["order-ready", "order-cancelled"])

The return value is a two-element list [tag, contents].

Timeouts and the after Callback

receive() accepts an optional timeout (in seconds). When the timeout fires:

  • If an after callback is provided, the runtime calls after() and returns its result directly as the value of receive(...). By convention the callback returns a (tag, contents) tuple so the caller can pattern-match it the same way as a normal message, but the runtime itself does not interpret the value — nothing is enqueued and no other receiver sees it.

  • If no after is provided, receive returns (TIMEOUT, None). TIMEOUT is the tag slot of the synthetic two-element result; compare msg[0] against it.

from bocpy import receive, TIMEOUT

msg = receive("heartbeat", timeout=2.0)
if msg[0] == TIMEOUT:
    print("No heartbeat in 2 seconds")

With an after callback:

def after():
    return "heartbeat", "self-tick"

msg = receive("heartbeat", timeout=1.0, after=after)
# msg == ("heartbeat", "self-tick") if nothing arrived in 1 s

The after function may return any value; the (tag, contents) shape is purely a convention so the caller can pattern-match it uniformly with normal receive results.

Worked Example: Calculator Service

The following example (adapted from examples/calculator.py) demonstrates a concurrent calculator service. Multiple client threads send arithmetic operations to a server thread via the "calculator" tag. The server uses selective receive with a timeout to detect when clients have gone silent.

"""Concurrent calculator using message-passing channels."""

import random
from threading import Thread
import time

from bocpy import receive, send


def client(num_operations: int):
    """Send random arithmetic operations to the calculator channel."""
    actions = ["+", "-", "/", "*"]
    for _ in range(num_operations):
        time.sleep(random.random() * 0.1)
        action = random.choice(actions)
        value = random.random() * 10 - 5
        send("calculator", (action, value))


def server(timeout):
    """Receive and process arithmetic operations until stopped."""
    value = 0
    num_operations = 0
    running = True

    def after():
        return "calculator", ("print", True)

    while running:
        match receive("calculator", timeout, after):
            case [_, ("+", x)]:
                num_operations += 1
                value += x

            case [_, ("-", x)]:
                num_operations += 1
                value -= x

            case [_, ("*", x)]:
                num_operations += 1
                value *= x

            case [_, ("/", x)]:
                num_operations += 1
                value /= x

            case [_, ("print", _)]:
                print("Total operations:", num_operations)
                print("Final value:", value)
                running = False


# Start the server with a 2-second idle timeout
server_thread = Thread(target=server, args=(2.0,))
server_thread.start()

# Spawn 4 clients, each sending 5 operations
clients = [Thread(target=client, args=(5,)) for _ in range(4)]
for c in clients:
    c.start()
for c in clients:
    c.join()

# Once clients finish, send a shutdown signal (or let the timeout fire)
send("calculator", ("print", False))
server_thread.join()

Key observations:

  • send("calculator", ...) is non-blocking and thread-safe — all four clients fire concurrently.

  • The server’s match on receive(...) is a selective receive: it pattern-matches on the message contents, not just the tag.

  • The after callback fires when no message arrives within timeout seconds, causing the server to print results and exit gracefully.

  • No locks, no shared mutable state — the only coordination is the message queue.

Pre-assigning Tags

For performance-sensitive applications where you know your tag set ahead of time, set_tags() pins tags to specific internal queues, avoiding hash collisions:

from bocpy import set_tags

set_tags(["orders", "heartbeat", "shutdown"])

Calling set_tags clears all queued messages and reassigns the queue layout. Call it once at startup, before any sends.

Draining Queues

drain() discards all pending messages for one or more tags:

from bocpy import drain

drain("calculator")           # clear one tag
drain(["orders", "events"])   # clear multiple tags

This is useful for cleanup between test runs or when resetting a subsystem.

Warning

If new messages are arriving faster than they can be drained, drain may not return promptly.

Sending Custom Types Across Sub-interpreters

Messages cross sub-interpreter boundaries through CPython’s cross-interpreter data (XIData) machinery, with a pickle fallback when no XIData handler is registered for the payload’s type. The runtime makes no attempt to ship class definitions along with the message — the receiver must already be able to resolve the type by its fully qualified name.

In practice this means:

  • Builtins and stdlib containers just work. Numbers, strings, bytes, tuple, list, dict, set, frozenset and similar types either have a native XIData handler or pickle cleanly to types every interpreter already knows about.

  • C extension types can register a custom XIData handler to transfer ownership directly without going through pickle. Cown and Matrix use this path; see C ABI for how to expose your own type through the same mechanism.

  • Pure-Python custom classes fall back to pickle. Unpickling only succeeds if the receiving interpreter can already import the class by its fully qualified name. If a worker has never executed import my_pkg.my_module, then receiving an instance of my_pkg.my_module.MyClass will fail with a ModuleNotFoundError or AttributeError raised from inside receive.

  • Closures, lambdas, and locally-defined classes cannot be sent at all — the pickle fallback cannot resolve them by qualified name from any interpreter, and they have no XIData handler.

Inside @when behaviors the transpiler handles the import-side of this automatically: it rewrites the decorated module so each worker imports the same set of names the caller had in scope, and any class referenced by a behavior is therefore resolvable on the worker side. When you use send / receive from a plain thread, a sub-interpreter spawned outside the behavior runtime, or from inside a behavior body but with a type that was not part of the captured environment, you are responsible for ensuring the class is importable on the receiver (or for registering an XIData handler that bypasses pickle entirely).

The simplest way to satisfy the pickle path is to define message payload types at module scope in a module that every participating interpreter imports at startup — for example, a shared messages.py that the main program, the worker bootstrap, and any auxiliary threads all import before the first send.

API Reference

bocpy.send(tag, contents, /)

Sends a message.

bocpy.receive(tags, /, timeout=-1, after=None)

Receives a message.

bocpy.set_tags(tags, /)

Assigns tags to message queues.

bocpy.drain(tags, /)

Drains all messages for the given tags.

bocpy.TIMEOUT = '__timeout__'

str(object=’’) -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to ‘strict’.