Events
In-process pub/sub bus for plugin-to-plugin lifecycle signals.
Kernia ships a tiny in-process event bus that lets plugins react to lifecycle
signals from other plugins without taking hard imports on each other. The bus
is per-AuthContext and lives at auth.plugin_state["events"]. Use the
get_bus(auth) helper to access it.
Events are best-effort. Handlers run sequentially in the caller's task, and an exception in one handler is logged without stopping the handlers that follow. There is no event queue and there are no background workers, because auth plugins want to react within the same request lifecycle as the change that triggered them.
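The dispatch semantics described above can be illustrated with a small stand-in bus. This is a minimal sketch under stated assumptions, not Kernia's implementation: `SketchBus`, `SketchAuth`, and `get_bus_sketch` are hypothetical names, and lazy creation of the bus is a guess. Only the `on`/`emit` method names and the `plugin_state["events"]` location come from this page.

```python
import asyncio
import logging
from collections import defaultdict
from typing import Any, Awaitable, Callable

logger = logging.getLogger("sketch.events")

Handler = Callable[[Any], Awaitable[None]]


class SketchBus:
    """Hypothetical stand-in that mimics the behaviour described above."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, name: str, handler: Handler) -> None:
        self._handlers[name].append(handler)

    async def emit(self, name: str, payload: Any) -> None:
        # Handlers run one after another in the caller's task: no queue, no workers.
        for handler in list(self._handlers.get(name, [])):
            try:
                await handler(payload)
            except Exception:
                # Best-effort: log the failure and continue with the next handler.
                logger.exception("handler for %r failed", name)


class SketchAuth:
    """Hypothetical stand-in for AuthContext; only plugin_state is modelled."""

    def __init__(self) -> None:
        self.plugin_state: dict[str, Any] = {}


def get_bus_sketch(auth: SketchAuth) -> SketchBus:
    # Assumption: the bus is created lazily; the real helper may differ.
    return auth.plugin_state.setdefault("events", SketchBus())


async def demo() -> list[str]:
    calls: list[str] = []
    bus = get_bus_sketch(SketchAuth())

    async def failing(event: Any) -> None:
        calls.append("failing")
        raise RuntimeError("boom")

    async def healthy(event: Any) -> None:
        calls.append("healthy")

    bus.on("user.deleted", failing)
    bus.on("user.deleted", healthy)
    await bus.emit("user.deleted", {"user_id": "u1"})
    return calls
```

Running `asyncio.run(demo())` records both handlers even though the first one raises, which is the property a subscriber can rely on: a broken neighbour does not prevent its own handler from running.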
Standard events
| Event name | Payload | Emitted by |
|---|---|---|
| organization.member.added | MemberEvent | organization plugin (/accept-invitation succeeds) |
| organization.member.removed | MemberEvent | organization plugin (/remove-member, /leave-organization) |
| organization.member.updated | MemberEvent | organization plugin (/update-member-role) |
| user.deleted | UserEvent | admin / update-user /delete-user |
Payloads
```python
from dataclasses import dataclass


@dataclass(frozen=True, slots=True)
class MemberEvent:
    organization_id: str
    user_id: str
    role: str
    action: str  # "added" | "removed" | "updated"


@dataclass(frozen=True, slots=True)
class UserEvent:
    user_id: str
    action: str  # "deleted"
```
Subscribing from a plugin
The canonical pattern: subscribe in the plugin's init hook so the bus exists
before any request comes through.
```python
from kernia.events import MemberEvent, get_bus
from kernia.types.context import AuthContext


async def init(ctx: AuthContext) -> None:
    bus = get_bus(ctx)

    async def on_member_change(event: MemberEvent) -> None:
        # Do something: push to a queue, write an audit row, sync Stripe, etc.
        # update_external_system is a placeholder for your own coroutine.
        await update_external_system(event.organization_id, event.user_id)

    bus.on("organization.member.added", on_member_change)
    bus.on("organization.member.removed", on_member_change)
```
Real-world example: Stripe seat-sync
The Stripe plugin uses this bus to keep Stripe subscription quantity in
lockstep with organization membership. When configured for
subscription_for="organization" and any plan declares seats=True, the
plugin registers a listener that, on every member add/remove, computes the
new member count and calls subscriptions.update(quantity=N). The local
subscription.seats field is updated to match.
See the Stripe plugin → Organization seat-sync section for the full configuration.
Emitting your own events
Plugins can emit their own events for other plugins (or downstream code) to react to. Pick a stable namespaced name and document the payload shape.
```python
from kernia.events import get_bus


async def handler(ctx):
    # ...do the thing...
    # invoice_payload is a placeholder for whatever payload you document.
    await get_bus(ctx.auth).emit("billing.invoice.paid", invoice_payload)
```
There is no schema enforcement on payloads: pass plain dicts or your own dataclasses. Keep them small and JSON-serializable so they survive a future move to an out-of-process bus.