Plugin patterns

Common shapes for nexo subprocess plugins. Each pattern is a template you adapt — pick the closest match to what you're building, copy the skeleton, modify.

All patterns work in any of the 4 SDK languages (Rust / Python / TypeScript / PHP). Examples below use the language that's clearest for the pattern.


Pattern 1 · Echo channel

When to use · You're learning the SDK or wiring a brand-new channel and want a smoke-test before adding logic.

The plugin echoes every inbound broker.event back as broker.publish on a mirrored topic. Useful for verifying the wire format end-to-end before you write business logic.

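import asyncio

from nexo_plugin_sdk import PluginAdapter, Event

async def on_event(topic, event, broker):
    # Mirror plugin.outbound.<kind> onto plugin.inbound.<kind>.
    out_topic = topic.replace("plugin.outbound.", "plugin.inbound.")
    await broker.publish(out_topic, Event.new(out_topic, "my_plugin", event.payload))

async def main():
    await PluginAdapter(manifest_toml=MANIFEST, on_event=on_event).run()  # MANIFEST: manifest TOML string

asyncio.run(main())  # top-level await is not valid in a plain script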

→ Used in every template (extensions/template-plugin-{rust,python,typescript,php}/)
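
The topic mirroring in the echo handler can be isolated into a small helper. The sketch below is one possible shape, not part of the SDK: `mirror_topic` is a hypothetical name, and it matches the prefix only at the start of the topic. It returns `None` for topics that are not outbound, so the caller can skip publishing rather than echo onto the same topic.

```python
from typing import Optional

OUTBOUND_PREFIX = "plugin.outbound."
INBOUND_PREFIX = "plugin.inbound."


def mirror_topic(topic: str) -> Optional[str]:
    """Map plugin.outbound.<kind> to plugin.inbound.<kind>.

    Returns None when the topic does not start with the outbound prefix.
    """
    if not topic.startswith(OUTBOUND_PREFIX):
        return None
    return INBOUND_PREFIX + topic[len(OUTBOUND_PREFIX):]


print(mirror_topic("plugin.outbound.echo"))  # plugin.inbound.echo
print(mirror_topic("plugin.tool.echo"))      # None
```

Inside `on_event`, a `None` result would simply mean "return without publishing".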


Pattern 2 · Webhook receiver

When to use · An external service POSTs JSON; you want the daemon to see it as a plugin.inbound.<kind> event.

The plugin runs an HTTP server (or listens on a Unix socket) for inbound POST requests and publishes each request to the broker. Its manifest declares an http_server capability so that the daemon's reverse proxy and port allocator can wire the route.

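use std::sync::Arc;

use anyhow::Result; // assumed error type; the original leaves the Result alias unspecified
use axum::{extract::State, routing::post, Json, Router};
use nexo_microapp_sdk::plugin::{BrokerSender, Event, PluginAdapter};
use serde_json::Value;

async fn webhook(State(broker): State<Arc<BrokerSender>>, Json(body): Json<Value>) {
    let event = Event::new("plugin.inbound.webhook", "my_plugin", body);
    // Publish errors are dropped here; log them in a real plugin.
    let _ = broker.publish("plugin.inbound.webhook", event).await;
}

#[tokio::main]
async fn main() -> Result<()> {
    let adapter = PluginAdapter::new(MANIFEST);
    let broker = adapter.broker();
    // Placeholder bind address; use whatever address the daemon assigns to the plugin.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    tokio::spawn(async move {
        let app = Router::new().route("/webhook", post(webhook)).with_state(broker);
        axum::serve(listener, app).await
    });
    adapter.run().await
}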

The manifest registers the channel kind, which determines the inbound topic the plugin publishes to (plugin.inbound.webhook here):

[[plugin.channels.register]]
kind = "webhook"
adapter = "WebhookAdapter"
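
Before a request body is published, it usually helps to check that it really is a JSON object. The following is a minimal sketch of that step using only the standard library. `webhook_to_event` and the returned dict shape are hypothetical stand-ins for building an SDK `Event`; only the topic name is taken from the Rust example above.

```python
import json
from typing import Any, Optional

TOPIC = "plugin.inbound.webhook"  # topic used in the Rust example above


def webhook_to_event(raw_body: bytes, source: str = "my_plugin") -> Optional[dict[str, Any]]:
    """Parse a POST body into an event-shaped dict.

    Returns None when the body is not valid JSON or is not a JSON object,
    so the HTTP handler can answer 400 instead of publishing garbage.
    """
    try:
        payload = json.loads(raw_body)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(payload, dict):
        return None
    # Hypothetical stand-in for Event.new(topic, source, payload).
    return {"topic": TOPIC, "source": source, "payload": payload}


print(webhook_to_event(b'{"order": 42}'))
print(webhook_to_event(b"not json"))
```

Whether to reject non-object bodies is a design choice; some services legitimately POST JSON arrays.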

Pattern 3 · RPC bridge to an external API

When to use · You're exposing a third-party service (Stripe, Twilio, internal CRM) as a tool the agent can call.

Plugin doesn't deal with channels — it registers as a tool provider. The agent sends a tool.call request; the plugin forwards to the external API and replies.

import { PluginAdapter, Event } from "nexo-plugin-sdk";

// stripeClient is an initialized Stripe SDK client, created elsewhere.
const adapter = new PluginAdapter({
  manifestToml: MANIFEST,
  onEvent: async (topic, event, broker) => {
    if (topic === "plugin.tool.stripe.create_invoice") {
      // Forward the tool arguments to Stripe, then reply on the matching .reply topic.
      const inv = await stripeClient.invoices.create(event.payload);
      await broker.publish("plugin.tool.stripe.create_invoice.reply",
        Event.new("plugin.tool.reply", "stripe-bridge", { result: inv }));
    }
  },
});

Manifest contributes the tool:

[[plugin.tools.expose]]
name = "stripe.create_invoice"
schema_path = "./tools/create_invoice.json"
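
When a bridge exposes more than one tool, a dispatch table keeps the handler flat. The sketch below illustrates the idea with a fake handler in place of the real Stripe call. The `{"error": ...}` reply shape, `HANDLERS`, and `dispatch` are assumptions for illustration; only the `<topic>.reply` convention comes from the example above.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[dict[str, Any]], Awaitable[Any]]


async def create_invoice(payload: dict[str, Any]) -> dict[str, Any]:
    # Hypothetical stand-in for the external API call.
    return {"id": "inv_demo", "amount": payload["amount"]}


# Tool topic -> handler. Add one entry per exposed tool.
HANDLERS: dict[str, Handler] = {
    "plugin.tool.stripe.create_invoice": create_invoice,
}


async def dispatch(topic: str, payload: dict[str, Any]) -> Optional[tuple[str, dict[str, Any]]]:
    """Return (reply_topic, reply_body), or None if the topic is not one of our tools."""
    handler = HANDLERS.get(topic)
    if handler is None:
        return None
    try:
        body = {"result": await handler(payload)}
    except Exception as exc:
        # Assumed error shape: reply anyway so the caller is not left waiting.
        body = {"error": str(exc)}
    return f"{topic}.reply", body


print(asyncio.run(dispatch("plugin.tool.stripe.create_invoice", {"amount": 500})))
```

In a real plugin the returned pair would be passed to `broker.publish`.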

Pattern 4 · Scheduled poller

When to use · You need to poll an external feed every N minutes and publish only changes (deltas) to the broker.

Plugin holds local state (last-seen IDs / etag / cursor), re-polls on a timer, dedupes against state, publishes new items. Persist state to <state_dir>/<plugin_id>/state.json so restarts don't re-emit historical items.

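import asyncio, json, aiohttp
from pathlib import Path
from nexo_plugin_sdk import PluginAdapter, Event

# Example path; in a deployed plugin use <state_dir>/<plugin_id>/state.json.
STATE = Path(".nexo-state/poller.json")
seen_ids: set[str] = set(json.loads(STATE.read_text())) if STATE.exists() else set()

async def poll_loop(broker):
    async with aiohttp.ClientSession() as s:  # one session reused across polls
        while True:
            try:
                async with s.get("https://example.com/feed.json") as resp:
                    resp.raise_for_status()
                    items = await resp.json()
            except aiohttp.ClientError:
                items = []  # transient failure: retry on the next tick
            for item in items:
                if item["id"] in seen_ids:
                    continue
                seen_ids.add(item["id"])
                await broker.publish("plugin.inbound.feed",
                    Event.new("plugin.inbound.feed", "feed_poller", item))
            STATE.parent.mkdir(parents=True, exist_ok=True)
            STATE.write_text(json.dumps(list(seen_ids)))
            await asyncio.sleep(300)  # 5 min

async def main():
    adapter = PluginAdapter(manifest_toml=MANIFEST)
    # create_task needs a running event loop, so start the poller inside main().
    poller = asyncio.create_task(poll_loop(adapter.broker))
    await adapter.run()
    poller.cancel()

asyncio.run(main())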

→ See Build a poller module for the YAML-only path that doesn't need a plugin at all.
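
Because the persisted state is what prevents re-emitting old items, it is worth writing it in a way that survives a crash mid-write. The sketch below shows one common approach: write to a temporary file, then rename it over the target. `load_seen`, `save_seen`, and `new_items` are hypothetical helper names, not SDK functions.

```python
import json
import os
import tempfile
from pathlib import Path


def load_seen(path: Path) -> set[str]:
    """Return the persisted ID set, or an empty set if the file is missing or unreadable."""
    try:
        return set(json.loads(path.read_text()))
    except (FileNotFoundError, json.JSONDecodeError, TypeError):
        return set()


def save_seen(path: Path, seen: set[str]) -> None:
    """Write to a temp file in the same directory, then rename it over the target."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(sorted(seen), f)
    os.replace(tmp, path)  # readers see either the old file or the new one


def new_items(items: list[dict], seen: set[str]) -> list[dict]:
    """Return items whose id has not been seen yet, recording their ids in `seen`."""
    fresh = []
    for item in items:
        if item["id"] not in seen:
            seen.add(item["id"])
            fresh.append(item)
    return fresh
```

A poll tick would then be: fetch, `new_items(...)`, publish each fresh item, `save_seen(...)`. For long-lived feeds the ID set grows without bound, so a cursor or etag may be a better fit than a full ID set.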


Pattern 5 · Long-running connection (websocket / SSE)

When to use · The external service is push-based (Slack RTM, Discord gateway, MQTT broker, custom WebSocket).

Plugin opens the persistent connection at startup. Inbound messages from the external side become broker.publish events. On disconnect, the plugin reconnects with exponential backoff.

use futures_util::StreamExt; // brings split() and next() into scope
use tokio_tungstenite::connect_async;

// Runs inside an async fn returning a Result; `broker` and `parse_discord` are defined elsewhere.
let (ws, _) = connect_async("wss://gateway.discord.gg/").await?;
let (write, mut read) = ws.split();
// Auth handshake omitted...

while let Some(msg) = read.next().await {
    let evt = parse_discord(msg?)?;
    broker.publish("plugin.inbound.discord", evt).await?;
}
// On disconnect: reconnect with backoff.

The SDK's signal handling (default-on) lets the daemon shut the plugin down cleanly even mid-connection.
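
The reconnect schedule mentioned above can be expressed as a small generator. This is a sketch of capped exponential backoff with optional jitter; the function name and the default values (1 s base, 60 s cap) are assumptions, not SDK settings.

```python
import itertools
import random
from typing import Iterator


def backoff_delays(base: float = 1.0, cap: float = 60.0, jitter: float = 0.0) -> Iterator[float]:
    """Yield base, 2*base, 4*base, ... capped at `cap`.

    `jitter` is a fraction of the current delay added as random extra wait,
    which helps avoid many plugins reconnecting at the same instant.
    """
    delay = base
    while True:
        yield delay + random.uniform(0, delay * jitter)
        delay = min(cap, delay * 2)


# With jitter disabled the schedule is deterministic.
print(list(itertools.islice(backoff_delays(), 8)))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

A reconnect loop would sleep for the next delay after each failed attempt and create a fresh generator once a connection succeeds, so the schedule starts over.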


Pattern 6 · Stateful conversation glue

When to use · The external channel sends fragments (audio chunks, typing indicators, partial messages) and you want to assemble them before the agent sees a complete event.

Plugin maintains a per-conversation buffer; only emits a broker.publish when the message is "complete" (final chunk, silence timeout, or explicit done marker).

import asyncio
from nexo_plugin_sdk import Event

buffer: dict[str, list[str]] = {}
timers: dict[str, asyncio.Task] = {}

async def on_chunk(conv_id, fragment, broker):
    buffer.setdefault(conv_id, []).append(fragment)
    # Each new fragment restarts the silence timer for its conversation.
    if conv_id in timers:
        timers[conv_id].cancel()
    timers[conv_id] = asyncio.create_task(flush_after(conv_id, broker, delay=2.0))

async def flush_after(conv_id, broker, delay):
    await asyncio.sleep(delay)
    timers.pop(conv_id, None)  # drop the finished timer so the dict doesn't grow
    text = "".join(buffer.pop(conv_id, []))
    await broker.publish("plugin.inbound.assembled",
        Event.new("plugin.inbound.assembled", "voice_glue", {"text": text}))
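
The description above lists an explicit done marker as one way a message becomes complete. The sketch below combines that with the silence timeout in one small class and runs without the SDK. `Assembler` and its `emit` callback are hypothetical; `emit` stands in for the `broker.publish` call.

```python
import asyncio


class Assembler:
    """Per-conversation buffer that flushes on a done marker or after `delay` seconds of silence."""

    def __init__(self, emit, delay: float = 2.0):
        self._emit = emit  # async callable(conv_id, text); stand-in for broker.publish
        self._delay = delay
        self._buffer: dict[str, list[str]] = {}
        self._timers: dict[str, asyncio.Task] = {}

    async def on_chunk(self, conv_id: str, fragment: str, done: bool = False) -> None:
        self._buffer.setdefault(conv_id, []).append(fragment)
        timer = self._timers.pop(conv_id, None)
        if timer is not None:
            timer.cancel()  # a new fragment restarts the silence window
        if done:
            await self._flush(conv_id)
        else:
            self._timers[conv_id] = asyncio.create_task(self._flush_later(conv_id))

    async def _flush_later(self, conv_id: str) -> None:
        await asyncio.sleep(self._delay)
        self._timers.pop(conv_id, None)
        await self._flush(conv_id)

    async def _flush(self, conv_id: str) -> None:
        parts = self._buffer.pop(conv_id, [])
        if parts:  # nothing to emit if the buffer was already flushed
            await self._emit(conv_id, "".join(parts))


async def demo() -> list[tuple[str, str]]:
    out: list[tuple[str, str]] = []

    async def emit(conv_id: str, text: str) -> None:
        out.append((conv_id, text))

    asm = Assembler(emit, delay=0.05)
    await asm.on_chunk("c1", "hel")
    await asm.on_chunk("c1", "lo", done=True)  # explicit marker: flush immediately
    await asm.on_chunk("c2", "bye")            # no marker: flushed by the timeout
    await asyncio.sleep(0.2)
    return out


print(asyncio.run(demo()))
```

Keeping the buffers on an instance rather than in module globals also makes the logic easier to test in isolation.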

Pattern 7 · Outbound-only adapter

When to use · The plugin only sends (Twilio SMS sender, push notification dispatcher, Slack outbound webhook).

Plugin subscribes to plugin.outbound.<kind> events from the daemon, calls the external API, and publishes a delivery_status event back so the agent knows whether it landed.

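import { PluginAdapter, Event } from "nexo-plugin-sdk";

// twilio is an initialized Twilio client and TWILIO_FROM the sender number, both set up elsewhere.
const adapter = new PluginAdapter({
  manifestToml: MANIFEST,
  onEvent: async (topic, event, broker) => {
    if (topic.startsWith("plugin.outbound.sms")) {
      const result = await twilio.messages.create({
        to: event.payload.to,
        body: event.payload.body,
        from: TWILIO_FROM,
      });
      // Report the provider's message ID and status back to the daemon.
      await broker.publish("plugin.delivery.sms",
        Event.new("plugin.delivery.sms", "twilio-out",
                  { sid: result.sid, status: result.status }));
    }
  },
});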
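
For the agent to know whether a message landed, a failed send should also produce a delivery event. A minimal sketch of that wrapper follows. The `"failed"` status value and the `error` field are assumptions about the payload shape, and `send_with_status` is a hypothetical helper name.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def send_with_status(send: Callable[[], Awaitable[dict[str, Any]]]) -> dict[str, Any]:
    """Run `send` and return a delivery payload whether it succeeds or raises."""
    try:
        result = await send()
    except Exception as exc:
        # Assumed failure shape; adjust to whatever the agent expects.
        return {"status": "failed", "error": str(exc)}
    return {"sid": result["sid"], "status": result["status"]}


async def fake_send_ok() -> dict[str, Any]:
    # Stand-in for the provider call.
    return {"sid": "SM1", "status": "queued"}


async def fake_send_error() -> dict[str, Any]:
    raise RuntimeError("boom")


print(asyncio.run(send_with_status(fake_send_ok)))
print(asyncio.run(send_with_status(fake_send_error)))
```

The returned dict would become the payload of the event published on the delivery topic.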

Pattern 8 · Provider abstraction (multi-instance)

When to use · An operator wants several isolated copies of the same plugin, for example three Telegram bots or five WhatsApp accounts.

Plugin manifest declares instance support. Operator's config spawns N copies, each with a distinct instance label. Topics become plugin.inbound.<kind>.<instance> so agent bindings can target a specific one.

# operator's pollers.yaml
plugins:
  telegram:
    - instance: support-bot
      bot_token_env: TG_SUPPORT_TOKEN
    - instance: sales-bot
      bot_token_env: TG_SALES_TOKEN

The plugin reads instance from args or env at startup and publishes to plugin.inbound.telegram.<instance>.
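
Reading the instance label and deriving the topic could look like the sketch below. The `--instance` flag and the `NEXO_PLUGIN_INSTANCE` variable are hypothetical names chosen for illustration; check the SDK for the names the daemon actually passes.

```python
import argparse
from typing import Mapping, Optional, Sequence

ENV_VAR = "NEXO_PLUGIN_INSTANCE"  # hypothetical variable name


def resolve_instance(argv: Sequence[str], env: Mapping[str, str]) -> Optional[str]:
    """Prefer --instance from the command line, then fall back to the environment."""
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--instance")  # hypothetical flag name
    args, _unknown = parser.parse_known_args(list(argv))
    return args.instance or env.get(ENV_VAR)


def inbound_topic(kind: str, instance: Optional[str]) -> str:
    """Build plugin.inbound.<kind>.<instance>, or plugin.inbound.<kind> when there is no instance."""
    base = f"plugin.inbound.{kind}"
    return f"{base}.{instance}" if instance else base


instance = resolve_instance(["--instance", "support-bot"], {})
print(inbound_topic("telegram", instance))  # plugin.inbound.telegram.support-bot
```

In a real plugin the arguments would be `sys.argv[1:]` and `os.environ`; passing them in explicitly keeps the helper easy to test.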


Choosing between patterns

| If you... | Use |
| --- | --- |
| Are wiring a brand-new channel for the first time | Echo (pattern 1) |
| Need to receive HTTP from an external service | Webhook receiver (2) |
| Are exposing an external API as a tool | RPC bridge (3) |
| Need to poll something on a timer | Scheduled poller (4) |
| Have a push-based external service | Long-running connection (5) |
| Receive fragmented inputs (chunks, partials) | Stateful glue (6) |
| Only need to send (no receive) | Outbound-only (7) |
| Want N copies of the same plugin | Provider abstraction (8) |

See also