Streams
A stream is a typed event channel. The declaration looks like a struct,
and from then on the program can emit events of that type. Anything with
a matching on handler runs in response, typically an
agent but also a top-level handler.
Streams give events a name, a shape, and a place in the type system. They are the bus that connects agents.
Declaring a stream
stream Sensor { id: string, temperature: float }
The fields look like a struct's. The difference is in usage. Streams are constructed and emitted, not stored.
A program may declare any number of streams:
stream Click { user_id: int, target: string }
stream Purchase { user_id: int, sku: string, total: float }
stream Error { code: int, message: string }
Emitting events
emit <Stream> { <fields> } produces an event. Every matching handler
runs.
emit Sensor { id: "s-01", temperature: 22.5 }
Field assignments use the same syntax as struct literals. Order does not matter; every required field must be present.
emit is a statement. It does not return a value, and it cannot appear
inside a value expression. To fan out to many listeners, emit repeatedly:
for s in pending_sensors {
emit Sensor { id: s.id, temperature: s.temp }
}
Top-level handlers
Outside an agent, a free-standing on handler runs every time the named
stream emits.
stream Greeting { name: string }
on Greeting as g {
print("hello, " + g.name)
}
emit Greeting { name: "ada" }
emit Greeting { name: "lin" }
hello, ada
hello, lin
Top-level handlers suit small scripts. For state, prefer an agent.
Handlers inside agents
Most production code uses agents as the host for on handlers, because the
agent provides somewhere to keep state.
stream Sensor { id: string, temp: float }
agent monitor {
var max: float = 0.0
on Sensor as s {
if s.temp > max { max = s.temp }
}
}
See agents for the full handler surface.
Filtering with where
Handlers accept a guard with where <expression>:
on Sensor as s where s.temp > 30.0 {
print("hot!", s.id, s.temp)
}
The body runs only when the guard is true. The compiled dispatch matches
what an explicit if would produce, with the predicate kept next to the
handler signature.
Stream pipelines
A program often reads input from one stream and emits to another after
transforming. Mochi has no special pipeline operator for streams. Write an
on handler that emits.
stream Raw { body: string }
stream Parsed { record: User }
agent ingest {
on Raw as r {
let user = parse_user(r.body)
if user != nil {
emit Parsed { record: user }
}
}
}
Pipelines fan out (one handler emits multiple downstream events) or fan in (multiple upstream emits trigger one downstream).
Backpressure and ordering
Within a single agent, handlers run serially in emit order. There is no implicit concurrency. A handler that does slow work blocks subsequent handlers on the same instance.
Across agents, handlers may run concurrently. Mochi does not currently expose a backpressure protocol. A producer faster than its consumer queues events.
Lifecycle of a stream
A stream has no lifecycle of its own; it is a type. emit is synchronous.
By the time emit returns, every handler has been scheduled. The handlers
themselves run on Mochi's runtime scheduler.
Stream types as values
Streams are types. They appear in function signatures, lists, and other positions where a type is expected:
fun describe(s: Sensor): string {
return s.id + ": " + str(s.temperature)
}
let buffered: list<Sensor> = []
A list<Sensor> is a list of values that share a stream declaration.
Storing them in a list does not emit anything; only emit does.
External producers
A real program often pulls events from outside Mochi: an HTTP endpoint, a
file watcher, a Kafka topic. Bridge them into a stream by calling emit
from a normal function or agent.
fun bridge_kafka(topic: string) {
for record in kafka.read(topic) {
emit Raw { body: record.value }
}
}
There is no special API; any code path that calls emit participates in
dispatch.
Common patterns
Logging tap
on Sensor as s {
log_line(s.id + " " + str(s.temperature))
}
A free-standing on handler is a small tap that runs alongside other
handlers. Useful for logging, tracing, and metrics.
Replay
fun replay(events: list<Sensor>) {
for e in events {
emit Sensor { id: e.id, temperature: e.temperature }
}
}
Replaying a saved list of events is a for loop with emit. No special
replay machinery is needed.
Multiplexing
stream Click { user: string, target: string }
agent metrics {
var by_target: map<string, int> = {}
on Click as c {
let count = by_target[c.target] ?? 0
by_target[c.target] = count + 1
}
}
Testing streams
The simplest test setup instantiates the agent under test, emits a sequence of events, and asserts via the agent's intents.
test "monitor tracks max temperature" {
let m = monitor {}
emit Sensor { id: "a", temp: 22.5 }
emit Sensor { id: "b", temp: 31.0 }
expect m.peak() == 31.0
}
Common errors
| Message | Cause | Fix |
|---|---|---|
cannot emit non-stream type | emit on a struct that lacks stream | Replace type with stream, or wrap the value in a stream type. |
unknown stream <Name> | Typo in on handler | Check the stream declaration. |
handler must specify a binding | on Sensor { ... } without as <name> | Add as <name> to bind the event. |
See also
- Agents, stateful blocks that own handlers.
- Tutorial for a worked example.
- Generative AI for letting language models call intents.