Real-time & Events
Zatabase provides a full real-time event system built on an internal pub/sub bus with topic-based routing, persistent message queues, webhook delivery, triggers, consumer groups, and both WebSocket and WebRTC transport layers.
WebSocket Connections
Basic WebSocket
Connect to the core WebSocket endpoint for real-time event streaming:

    wss://your-project.zatabase.io/ws

Authentication is required. Pass your JWT bearer token as a query parameter or in the initial handshake headers.
Exactly-Once WebSocket
For applications requiring exactly-once delivery semantics (no duplicate or lost events), use the enhanced endpoint:

    wss://your-project.zatabase.io/ws/exactly-once

This endpoint uses a persistent message queue with ACK/NACK semantics to guarantee each event is delivered and processed exactly once, even through reconnections.
Message Format
All WebSocket messages use JSON. Client-to-server messages follow this structure:

    {
      "type": "subscribe",
      "topic": "jobs/**",
      "data": {},
      "id": "optional-request-id"
    }

Server-to-client event messages:

    {
      "topic": "jobs/3f2a-...",
      "seq": 42,
      "ts": "2026-03-04T12:00:00Z",
      "data": { "job_id": "3f2a...", "org_id": 1, "status": "completed" }
    }

Server responses to subscribe/unsubscribe/ping:

    {
      "id": "your-request-id",
      "status": "success",
      "data": { "action": "subscribed" },
      "error": null
    }

Subscribing to Topics
After connecting, send subscription messages to start receiving events on specific topics.
Subscribe:

    { "action": "subscribe", "topic": "jobs/my-job-id" }

Unsubscribe:

    { "action": "unsubscribe", "topic": "jobs/my-job-id" }

Topic Patterns
Topics follow a hierarchical slash-separated format. Wildcards are supported:
| Pattern | Description |
|---|---|
| jobs/abc-123 | Exact match for a specific job |
| jobs/* | Match any single segment under jobs/ |
| jobs/** | Match any depth under jobs/ |
| tables/users/** | All events on the users table |
| ** | All events (use sparingly) |
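To make the wildcard rules in the table concrete, here is a minimal Python sketch of a pattern matcher. It is not Zatabase's implementation; the function name `topic_matches` is hypothetical, and it assumes that `*` matches exactly one segment and `**` matches one or more segments (the documentation does not say whether `**` can match zero segments).

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if a slash-separated topic matches a wildcard pattern.

    Assumptions (not confirmed by the docs): '*' matches exactly one
    segment and '**' matches one or more segments.
    """
    def match(p: list[str], t: list[str]) -> bool:
        if not p:
            # Pattern exhausted: match only if the topic is exhausted too.
            return not t
        head, rest = p[0], p[1:]
        if head == "**":
            # Let '**' consume 1..len(t) segments and try the remainder.
            return any(match(rest, t[i:]) for i in range(1, len(t) + 1))
        if not t:
            return False
        if head == "*" or head == t[0]:
            return match(rest, t[1:])
        return False

    return match(pattern.split("/"), topic.split("/"))
```

A client could use a helper like this to route incoming messages to local handlers when it holds several overlapping subscriptions.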
Events are auto-routed to topics based on their data:
| Event Source | Topic Format |
|---|---|
| Job events | jobs/{job_id} |
| Table events | tables/{table_name}/{event_type} |
| User events | users/{user_id}/{event_type} |
| System events | system/{component}/{event_type} |
| Connection events | connections/{connection_id}/{event_type} |
| Collection events | collections/{collection_name}/{event_type} |
| Transaction events | transactions/{transaction_id}/{event_type} |
| Custom events | custom/{event_type} |
WebSocket Transactions
You can execute SQL transactions over the WebSocket connection:

    {
      "type": "BeginTransaction",
      "data": { "transaction_id": "tx-001" }
    }
    {
      "type": "ExecuteInTransaction",
      "data": {
        "transaction_id": "tx-001",
        "sql": "INSERT INTO orders (product, qty) VALUES ('widget', 5)",
        "params": null
      }
    }
    {
      "type": "CommitTransaction",
      "data": { "transaction_id": "tx-001" }
    }

Transaction responses include execution timing:

    {
      "type": "TransactionResult",
      "data": {
        "transaction_id": "tx-001",
        "operation": "execute",
        "success": true,
        "result": [{ "product": "widget", "qty": 5 }],
        "duration_ms": 3
      }
    }

Backpressure Handling
Zatabase uses a bounded per-connection queue (256 messages). When the queue fills up:
- The oldest message is dropped to make room for the new one
- A metadata notification is injected so the client knows events were dropped:

      {
        "topic": "meta/ws",
        "seq": 0,
        "ts": "2026-03-04T12:00:00Z",
        "data": { "event": "dropped", "count": 5 }
      }

- Dropped message counts are tracked in server metrics
To avoid drops, ensure your client processes messages quickly and subscribes only to topics it needs.
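The drop-oldest behavior described above can be illustrated with a small Python sketch. This is a model for reasoning about the policy, not the server's code: the class name `DropOldestQueue` is hypothetical, and the point at which the server actually injects the `meta/ws` notification is an assumption.

```python
from collections import deque


class DropOldestQueue:
    """Bounded queue that evicts the oldest message when full (illustrative)."""

    def __init__(self, capacity: int = 256):
        self._items = deque()
        self._capacity = capacity
        self.pending_dropped = 0   # drops not yet reported to the client
        self.total_dropped = 0     # running total, as a metrics counter would keep

    def push(self, message: dict) -> None:
        if len(self._items) >= self._capacity:
            self._items.popleft()          # evict the oldest message
            self.pending_dropped += 1
            self.total_dropped += 1
        self._items.append(message)

    def pop(self):
        """Return the next message, reporting any pending drops first."""
        if self.pending_dropped:
            # Assumed shape, modeled on the meta/ws notification shown above.
            notice = {
                "topic": "meta/ws",
                "seq": 0,
                "data": {"event": "dropped", "count": self.pending_dropped},
            }
            self.pending_dropped = 0
            return notice
        return self._items.popleft() if self._items else None
```

With a capacity of 3 and five pushes, the first `pop()` would return a notice with `count` 2, followed by the three newest messages.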
Keepalive
Send periodic ping messages to keep the connection alive:

    { "type": "ping", "data": {} }

Response:

    { "status": "success", "data": { "pong": true } }

Event System
The event system (zevents) provides a high-performance pub/sub bus with persistent storage, exactly-once delivery, filtering, batching, and webhook delivery.
Event Types
Zatabase emits events across all subsystems. The event_type field uses dot-notation:
| Category | Event Types |
|---|---|
| Jobs | job.created, job.started, job.completed, job.failed, job.canceled, job.retrying |
| Database | table.created, table.dropped, index.built, index.dropped, data.ingested, query.executed |
| Users | user.created, user.updated, user.deleted, user.logged_in, user.logged_out |
| System | system.started, system.stopped, system.config_changed, system.health_check_failed, system.memory_pressure, system.disk_space_warning |
| Real-time | websocket.connected, websocket.disconnected, webrtc.connected, webrtc.disconnected |
| Rows | row.inserted, row.updated, row.deleted, schema.changed |
| Transactions | transaction.begun, transaction.committed, transaction.rolled_back, transaction.aborted |
| Collections | collection.created, collection.updated, collection.deleted, projection.updated |
| Functions | function.call |
| Custom | custom.{your_name} |
Publishing Events
Publish custom events via the HTTP API:

    curl -s -X POST https://your-project.zatabase.io/v1/events \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "event_type": "custom.order_placed",
        "data": { "order_id": "ord-123", "total": 49.99 },
        "correlation_id": "req-abc",
        "source": "checkout-service",
        "attributes": { "region": "us-east" }
      }'

Response:

    {
      "event_id": "01HXYZ...",
      "event_type": "custom.order_placed",
      "timestamp": "2026-03-04T12:00:00Z",
      "topic": "custom/custom.order_placed"
    }

Querying Events
Retrieve stored events with filtering:

    curl -s "https://your-project.zatabase.io/v1/events?event_types=job.created,job.completed&limit=50" \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Query parameters:
| Parameter | Description |
|---|---|
| event_types | Comma-separated event types to filter |
| topics | Comma-separated topic patterns (supports wildcards) |
| start_time | ISO 8601 start time for range filter |
| end_time | ISO 8601 end time for range filter |
| limit | Maximum events to return (default: 100) |
| stream_id | Optional stream ID |
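As a rough sketch of how a client might assemble these parameters, the helper below builds the query URL with Python's standard library. The function name `build_events_query` and its keyword arguments are hypothetical; only the parameter names come from the table above.

```python
from urllib.parse import urlencode


def build_events_query(base_url, event_types=None, topics=None,
                       start_time=None, end_time=None,
                       limit=None, stream_id=None):
    """Build a /v1/events query URL from optional filters (illustrative)."""
    params = {}
    if event_types:
        params["event_types"] = ",".join(event_types)   # comma-separated list
    if topics:
        params["topics"] = ",".join(topics)             # comma-separated patterns
    if start_time:
        params["start_time"] = start_time               # ISO 8601 string
    if end_time:
        params["end_time"] = end_time                   # ISO 8601 string
    if limit is not None:
        params["limit"] = limit
    if stream_id:
        params["stream_id"] = stream_id
    # Keep commas literal so the URL resembles the curl example above;
    # other reserved characters are percent-encoded.
    query = urlencode(params, safe=",")
    return f"{base_url}/v1/events?{query}" if query else f"{base_url}/v1/events"
```

Calling it with `event_types=["job.created", "job.completed"]` and `limit=50` reproduces the URL used in the curl example.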
Replaying Events
Replay historical events from the event store:

    curl -s -X POST https://your-project.zatabase.io/v1/events/replay \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "from_event_id": "01HXYZ...",
        "event_types": ["job.created"],
        "limit": 100
      }'

Event Statistics
Get event bus and delivery statistics:

    curl -s https://your-project.zatabase.io/v1/events/stats \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Event Filtering
Filters allow fine-grained control over which events a subscriber receives. Filters can combine multiple criteria:
- Tables: Only events for specific tables (e.g., ["users", "orders"])
- Operations: Only specific operations (e.g., ["INSERT", "UPDATE"])
- Users: Events by specific user IDs
- Organizations: Events for specific org IDs
- Event types: Specific event type categories
- Connections: Events from specific connection IDs
- Transactions: Events within specific transaction IDs
- Attributes: Custom key-value metadata matching
- Permissions: Permission-based filtering (admin, operator, read)
Predefined filter shortcuts:
| Filter | Matches |
|---|---|
| Database changes | row.inserted, row.updated, row.deleted, schema.changed |
| User activity | user.created, user.updated, user.deleted, user.logged_in, user.logged_out |
| System events | system.started, system.stopped, system.config_changed, system.health_check_failed, system.memory_pressure, system.disk_space_warning |
Event Batching
For high-throughput scenarios, events can be batched to reduce overhead. The batching system supports:
- Size-based batching: Flush when a batch reaches a configured number of events (default: 100)
- Time-based batching: Flush after a maximum age (default: 5 seconds)
- Transaction-scoped batching: Group events by transaction ID
- Organization-scoped batching: Group events by organization
Batching configuration is set server-side. Batches are flushed automatically when either the size or age threshold is reached, whichever comes first.
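The "whichever comes first" flush rule can be sketched in Python as follows. This only illustrates size-based and time-based flushing with the defaults quoted above; the class `EventBatcher` and its methods are hypothetical, and the server-side implementation may differ.

```python
import time


class EventBatcher:
    """Flush a batch when it reaches max_size events or max_age seconds."""

    def __init__(self, max_size=100, max_age_seconds=5.0, clock=time.monotonic):
        self.max_size = max_size
        self.max_age = max_age_seconds
        self._clock = clock          # injectable clock, useful for testing
        self._batch = []
        self._started = None         # time the first event of the batch arrived

    def add(self, event):
        """Add an event; return the flushed batch if a threshold is hit, else None."""
        if not self._batch:
            self._started = self._clock()
        self._batch.append(event)
        return self._flush_if_due()

    def tick(self):
        """Call periodically so an idle, partially filled batch still flushes by age."""
        return self._flush_if_due()

    def _flush_if_due(self):
        if not self._batch:
            return None
        too_big = len(self._batch) >= self.max_size
        too_old = self._clock() - self._started >= self.max_age
        if too_big or too_old:
            batch, self._batch = self._batch, []
            return batch
        return None
```

The periodic `tick()` matters because a batch that never reaches the size threshold would otherwise wait for the next event before its age is checked.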
Consumer Groups
Consumer groups allow multiple consumers to process events in parallel with load balancing. Each message is delivered to exactly one consumer in the group.
Key features:
- Concurrency control: Configurable max concurrent messages per consumer (default: 10)
- Processing timeout: Automatic NACK on timeout (default: 30 seconds)
- ACK/NACK semantics: Explicit acknowledgment with retry on failure
- Dead letter queue: Failed messages after max retries go to the dead letter queue
- Statistics: Per-consumer stats including processed count, error rate, and average processing time
Consumer groups are managed server-side. Clients interact through WebSocket subscriptions or webhook endpoints.
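The ACK/NACK and dead-letter flow above can be modeled with a short, single-threaded Python sketch. It is a simplification under stated assumptions: the function `run_consumer_group` is hypothetical, load balancing is plain round-robin, the `max_retries` default is invented for illustration, and concurrency limits and processing timeouts are left out.

```python
from collections import deque
from itertools import cycle


def run_consumer_group(messages, consumers, max_retries=3):
    """Deliver each message to one consumer; retry on NACK, then dead-letter.

    `consumers` is a list of callables returning True (ACK) or False (NACK).
    Returns (processed, dead_letters).
    """
    queue = deque((msg, 0) for msg in messages)   # (message, retries so far)
    next_consumer = cycle(consumers)              # round-robin (assumed policy)
    processed, dead_letters = [], []
    while queue:
        msg, retries = queue.popleft()
        handler = next(next_consumer)
        try:
            acked = handler(msg)
        except Exception:
            acked = False                         # treat handler errors as NACK
        if acked:
            processed.append(msg)
        elif retries < max_retries:
            queue.append((msg, retries + 1))      # requeue for another attempt
        else:
            dead_letters.append(msg)              # retries exhausted
    return processed, dead_letters
```

A message that is always rejected is attempted `max_retries + 1` times in total before landing in the dead letter list.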
Webhooks
Webhooks deliver events to external HTTP endpoints with HMAC signature verification, retry logic, and rate limiting.
Create a Webhook

    curl -s -X POST https://your-project.zatabase.io/v1/webhooks \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "url": "https://example.com/webhook",
        "secret": "whsec_your_signing_secret",
        "event_types": ["job.created", "job.completed", "row.inserted"],
        "topics": ["jobs/**"],
        "headers": { "X-Custom-Header": "my-value" },
        "max_retries": 5,
        "timeout_seconds": 15
      }'

List Webhooks

    curl -s https://your-project.zatabase.io/v1/webhooks \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Get a Webhook

    curl -s https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Delete a Webhook

    curl -s -X DELETE https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Webhook Delivery
When an event matches a webhook’s event types and topic patterns, Zatabase sends an HTTP POST to the registered URL with:
Headers:
| Header | Description |
|---|---|
| Content-Type | application/json |
| X-Zatabase-Signature | HMAC-SHA256 signature: t=TIMESTAMP,v1=SIGNATURE |
| X-Zatabase-Event-Type | The event type (e.g., job.created) |
| X-Zatabase-Event-ID | Unique event ID (ULID) |
| X-Zatabase-Delivery-ID | Unique delivery attempt ID |
Body: The full event JSON payload.
Signature Verification
Verify webhook signatures to ensure authenticity. The signature is computed as:

    HMAC-SHA256(secret, "{timestamp}.{payload}")

Parse the X-Zatabase-Signature header to extract the timestamp (t=) and signature (v1=). Recompute the HMAC using your secret and compare. Reject if the timestamp is older than 5 minutes (300 seconds).
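For Python receivers, the same steps can be sketched with the standard library. The function name `verify_webhook` and the `now` parameter are hypothetical; the sketch assumes the timestamp is in Unix seconds and the signature is hex-encoded, which the description above does not state explicitly.

```python
import hashlib
import hmac
import time


def verify_webhook(payload: bytes, header: str, secret: str,
                   tolerance: int = 300, now=None) -> bool:
    """Check an X-Zatabase-Signature header of the form t=TIMESTAMP,v1=SIGNATURE."""
    # Split "t=...,v1=..." into a dict, ignoring malformed parts.
    parts = dict(p.strip().split("=", 1) for p in header.split(",") if "=" in p)
    timestamp, signature = parts.get("t"), parts.get("v1")
    if not timestamp or not signature or not timestamp.isdigit():
        return False

    # Reject stale signatures (assumes Unix-second timestamps).
    current = time.time() if now is None else now
    if current - int(timestamp) > tolerance:
        return False

    # Recompute HMAC-SHA256(secret, "{timestamp}.{payload}") as hex (assumed encoding).
    signed = timestamp.encode() + b"." + payload
    expected = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()

    # Constant-time comparison to avoid leaking information through timing.
    return hmac.compare_digest(expected.encode(), signature.encode())
```

Pass the raw request body as `payload`; re-serializing parsed JSON can change the byte sequence and make the comparison fail.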
Node.js example:
    const crypto = require('crypto');

    function verifyWebhook(payload, header, secret) {
      // Header format: t=TIMESTAMP,v1=SIGNATURE
      const parts = Object.fromEntries(
        header.split(',').map(p => p.split('='))
      );
      const timestamp = parts.t;
      const signature = parts.v1;
      if (!timestamp || !signature) return false;

      const expected = crypto
        .createHmac('sha256', secret)
        .update(`${timestamp}.${payload}`)
        .digest('hex');

      // Reject signatures older than 5 minutes
      const age = Math.floor(Date.now() / 1000) - parseInt(timestamp, 10);
      if (Number.isNaN(age) || age > 300) return false;

      // timingSafeEqual throws if the buffers differ in length, so check first
      const received = Buffer.from(signature);
      const computed = Buffer.from(expected);
      if (received.length !== computed.length) return false;
      return crypto.timingSafeEqual(received, computed);
    }

Retry Policy
Failed deliveries (non-2xx status codes or network errors) are retried with exponential backoff:
| Attempt | Delay |
|---|---|
| 1st retry | 2 seconds |
| 2nd retry | 4 seconds |
| 3rd retry | 8 seconds |
| 4th retry | 16 seconds |
| 5th retry | 32 seconds |
Maximum retry delay is capped at 300 seconds (5 minutes). After exhausting all retries, the delivery is marked as dropped.
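The delays in the table follow a doubling schedule with a cap. A minimal Python sketch of that arithmetic, using a hypothetical helper `retry_delay` (whether the server adds jitter is not stated, so none is modeled):

```python
def retry_delay(retry_number: int, base: float = 2.0, cap: float = 300.0) -> float:
    """Delay in seconds before the given retry (1-based), capped at `cap`."""
    if retry_number < 1:
        raise ValueError("retry_number starts at 1")
    # 2, 4, 8, 16, 32, ... seconds, never more than the cap.
    return min(base ** retry_number, cap)
```

With these defaults the cap first applies at the ninth retry, since 2^9 = 512 exceeds 300.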
Rate Limiting
Webhooks support per-endpoint rate limiting:

    {
      "rate_limit": {
        "requests_per_period": 100,
        "period_seconds": 60,
        "burst": 10
      }
    }

When rate-limited, deliveries return a 429 status and are not retried.
Triggers
Triggers execute actions automatically when event conditions are met. They support complex conditions, multiple action types, rate limiting, and execution tracking.
Create a Trigger

    curl -s -X POST https://your-project.zatabase.io/v1/triggers \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "notify-on-failure",
        "condition": {
          "type": "Compound",
          "operator": "And",
          "conditions": [
            { "type": "EventType", "event_type": "job.failed" },
            { "type": "OrganizationId", "org_id": 1 }
          ]
        },
        "actions": [
          {
            "type": "CallWebhook",
            "url": "https://hooks.slack.com/services/...",
            "method": "POST",
            "headers": { "Content-Type": "application/json" },
            "body_template": "{\"text\": \"Job {{event_id}} failed at {{timestamp}}\"}"
          },
          {
            "type": "Log",
            "level": "Error",
            "message_template": "Job failure detected: {{event_data}}"
          }
        ],
        "rate_limit": { "max_triggers": 10, "period_seconds": 60 },
        "tags": ["alerting", "jobs"]
      }'

List Triggers

    curl -s https://your-project.zatabase.io/v1/triggers \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Trigger Conditions
Conditions determine when a trigger fires:
| Condition Type | Description |
|---|---|
| EventType | Match a specific event type |
| TopicPattern | Match a topic pattern (supports wildcards) |
| DataContains | Match a field in the event data with comparison operators (Equal, GreaterThan, Contains, etc.) |
| MetadataContains | Match event metadata attributes |
| OrganizationId | Match a specific organization |
| TimeWindow | Only fire during specific hours (with timezone support) |
| Frequency | Fire when an event type occurs N times within a time window |
| Compound | Combine conditions with And, Or, or Not |
| Regex | Match event fields with regular expressions |
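To show how a Compound condition composes simpler ones, here is a small Python sketch that evaluates a condition tree shaped like the one in the Create a Trigger request. It covers only EventType, OrganizationId, and Compound. The function `evaluate`, the flat event dictionary with `event_type` and `org_id` keys, and the reading of Not as "none of the nested conditions match" are all assumptions made for illustration.

```python
def evaluate(condition: dict, event: dict) -> bool:
    """Evaluate a trigger condition tree against an event (illustrative subset)."""
    kind = condition["type"]
    if kind == "EventType":
        return event.get("event_type") == condition["event_type"]
    if kind == "OrganizationId":
        return event.get("org_id") == condition["org_id"]
    if kind == "Compound":
        # Evaluate nested conditions recursively, then combine them.
        results = [evaluate(c, event) for c in condition["conditions"]]
        operator = condition["operator"]
        if operator == "And":
            return all(results)
        if operator == "Or":
            return any(results)
        if operator == "Not":
            return not any(results)   # assumed semantics for Not
        raise ValueError(f"unknown operator: {operator}")
    raise NotImplementedError(f"condition type not covered by this sketch: {kind}")
```

Evaluating the notify-on-failure condition against an event with `event_type` "job.failed" and `org_id` 1 returns True; changing either field makes it return False.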
Trigger Actions
Actions executed when the condition is met:
| Action Type | Description |
|---|---|
| EmitEvent | Publish a new event to the bus |
| CallWebhook | Send an HTTP request (GET, POST, PUT, DELETE) |
| ExecuteFunction | Run a serverless function |
| Log | Write a log message (Debug, Info, Warn, Error) |
| StoreData | Store a key-value pair with optional TTL |
| SendNotification | Send a notification on a channel with priority |
Action templates support {{event_id}}, {{event_type}}, {{org_id}}, {{timestamp}}, and {{event_data}} placeholders.
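Placeholder substitution of this kind can be previewed locally with a short Python sketch. The function `render_template` is hypothetical, and the way `{{event_data}}` is serialized (compact JSON with sorted keys here) is an assumption; the server's exact output format is not documented above.

```python
import json
import re

# Matches {{name}} with optional whitespace inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_template(template: str, event: dict) -> str:
    """Replace the documented placeholders with values from an event dict."""
    values = {
        "event_id": event.get("event_id", ""),
        "event_type": event.get("event_type", ""),
        "org_id": event.get("org_id", ""),
        "timestamp": event.get("timestamp", ""),
        # Assumed serialization of the event payload.
        "event_data": json.dumps(event.get("data", {}), sort_keys=True),
    }

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        # Leave unknown placeholders untouched rather than guessing.
        return str(values[name]) if name in values else match.group(0)

    return PLACEHOLDER.sub(substitute, template)
```

This is handy for checking a `body_template` or `message_template` before registering a trigger.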
Scheduled Tasks (Cron)
Create scheduled tasks that run on a cron schedule:

    curl -s -X POST https://your-project.zatabase.io/v1/tasks \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "name": "daily-cleanup",
        "schedule": "0 0 * * *",
        "actions": [
          {
            "type": "EmitEvent",
            "event_type": "custom.scheduled_cleanup",
            "event_data": { "scope": "all" }
          }
        ]
      }'

    curl -s https://your-project.zatabase.io/v1/tasks \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

WebSocket Metrics
Monitor active WebSocket connections and backpressure:

    curl -s https://your-project.zatabase.io/v1/metrics/ws \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

Returns:

    {
      "active_connections": 12,
      "total_dropped_messages": 0
    }

Client Examples
JavaScript
    // Custom handshake headers require the Node.js "ws" package.
    // Browsers cannot set headers on a WebSocket, so pass the token
    // as a query parameter there instead.
    const WebSocket = require('ws');

    const ws = new WebSocket('wss://your-project.zatabase.io/ws', {
      headers: { 'Authorization': 'Bearer YOUR_TOKEN' }
    });

    ws.onopen = () => {
      // Subscribe to job events
      ws.send(JSON.stringify({ action: 'subscribe', topic: 'jobs/**' }));

      // Subscribe to row changes on a specific table
      ws.send(JSON.stringify({ action: 'subscribe', topic: 'tables/orders/**' }));
    };

    ws.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      // Check for backpressure warnings
      if (msg.topic === 'meta/ws') {
        console.warn('Dropped events:', msg.data.count);
        return;
      }

      console.log(`[${msg.topic}]`, msg.data);
    };

    ws.onclose = (event) => {
      console.log('Disconnected:', event.code, event.reason);
      // Implement reconnection with backoff
    };

    // Keepalive ping every 30 seconds
    setInterval(() => {
      if (ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({ type: 'ping', data: {} }));
      }
    }, 30000);

Python
    import asyncio
    import json
    import websockets

    async def connect():
        uri = "wss://your-project.zatabase.io/ws"
        headers = {"Authorization": "Bearer YOUR_TOKEN"}

        # Note: newer releases of the websockets library (14+) name this
        # argument additional_headers instead of extra_headers.
        async with websockets.connect(uri, extra_headers=headers) as ws:
            # Subscribe to events
            await ws.send(json.dumps({
                "action": "subscribe",
                "topic": "jobs/**"
            }))

            await ws.send(json.dumps({
                "action": "subscribe",
                "topic": "tables/orders/**"
            }))

            # Listen for events
            async for message in ws:
                event = json.loads(message)

                # Backpressure notification: some events were dropped
                if event.get("topic") == "meta/ws":
                    print(f"Warning: {event['data']['count']} events dropped")
                    continue

                print(f"[{event['topic']}] {event['data']}")

    asyncio.run(connect())

curl (Webhook Management)
    # Create a webhook for job events
    curl -s -X POST https://your-project.zatabase.io/v1/webhooks \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "url": "https://example.com/zatabase-webhook",
        "secret": "whsec_mysecretkey123",
        "event_types": ["job.created", "job.completed", "job.failed"],
        "max_retries": 3
      }'

    # List all webhooks
    curl -s https://your-project.zatabase.io/v1/webhooks \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

    # Delete a webhook
    curl -s -X DELETE https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

    # Publish a custom event
    curl -s -X POST https://your-project.zatabase.io/v1/events \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{
        "event_type": "custom.deployment_started",
        "data": { "service": "api", "version": "2.1.0" }
      }'

    # Query recent events
    curl -s "https://your-project.zatabase.io/v1/events?event_types=job.created&limit=10" \
      -H "Authorization: Bearer $ZATABASE_TOKEN"

WebRTC Data Channels (Advanced)
For ultra-low-latency scenarios, Zatabase supports WebRTC data channels as an alternative transport. WebRTC bypasses HTTP overhead and provides direct peer-to-peer-style communication.
WebRTC connections provide five named data channels:
| Channel | Purpose |
|---|---|
| queries | Execute SQL and QueryBuilder queries with sub-millisecond overhead |
| events | Receive critical event notifications with priority levels and correlation tracking |
| logs | Stream structured log entries at configurable severity levels |
| metrics | Non-blocking telemetry streaming with lossy semantics under load |
| transactions | Transaction coordination (2PC prepare/commit/abort, saga steps) |
WebRTC connections are established via SDP signaling over HTTP:
    # Send SDP offer
    curl -s -X POST https://your-project.zatabase.io/v1/webrtc/offer \
      -H "Authorization: Bearer $ZATABASE_TOKEN" \
      -H "Content-Type: application/json" \
      -d '{"sdp": "v=0\r\n...", "type": "offer"}'

The server responds with an SDP answer. After ICE negotiation completes, data channels become available for bidirectional communication.
Query messages sent over the queries data channel follow this format:

    {
      "id": "req-001",
      "query_type": "Sql",
      "payload": "SELECT * FROM users LIMIT 10"
    }

Response:

    {
      "id": "req-001",
      "status": "Success",
      "data": [{ "name": "alice", "email": "alice@example.com" }],
      "metadata": { "rows_returned": 1, "execution_time_ms": 2 }
    }

WebRTC is recommended for dashboard applications, live data visualizations, and any use case where sub-10ms event delivery matters. See the WebRTC signaling endpoints in the API Reference for the full handshake flow.
Event System Architecture
The event system coordinates several components:
- Event Bus: High-performance broadcast channel with configurable buffer (default: 10,000 events)
- Event Store: Persistent event log for replay and audit
- Webhook Manager: HTTP delivery with HMAC signing, retry, and rate limiting
- Trigger Manager: Condition-action engine with rate limiting and execution tracking
- Cron Scheduler: Time-based task scheduling
- Transaction Manager: Transactional event publishing with batch support
- Metrics Collector: Event throughput, latency, and alert monitoring
- Unified Streamer: Cross-system event multiplexing with deduplication and health monitoring
The dead letter queue captures events that fail delivery after all retries, ensuring no events are silently lost.
- WebSocket connections require JWT authentication
- All events are scoped to the authenticated organization (multi-tenant isolation)
- The event store persists all events for replay and audit
- Topic patterns follow MQTT-style glob matching (* = one segment, ** = any depth)
- Webhook signatures use HMAC-SHA256 with a 5-minute timestamp tolerance
- The events feature flag must be enabled for webhook, trigger, event publish/query, and scheduled task endpoints
- The websocket feature flag enables the exactly-once WebSocket endpoint
- The core /ws endpoint is always available