Real-time & Events

Zatabase provides a full real-time event system built on an internal pub/sub bus with topic-based routing, persistent message queues, webhook delivery, triggers, consumer groups, and both WebSocket and WebRTC transport layers.

Connect to the core WebSocket endpoint for real-time event streaming:

wss://your-project.zatabase.io/ws

Authentication is required. Pass your JWT bearer token as a query parameter or in the initial handshake headers.

For applications requiring exactly-once delivery semantics (no duplicate or lost events), use the enhanced endpoint:

wss://your-project.zatabase.io/ws/exactly-once

This endpoint uses a persistent message queue with ACK/NACK semantics to guarantee each event is delivered and processed exactly once, even through reconnections.

All WebSocket messages use JSON. Client-to-server messages follow this structure:

{
"type": "subscribe",
"topic": "jobs/**",
"data": {},
"id": "optional-request-id"
}

Server-to-client event messages:

{
"topic": "jobs/3f2a-...",
"seq": 42,
"ts": "2026-03-04T12:00:00Z",
"data": {
"job_id": "3f2a...",
"org_id": 1,
"status": "completed"
}
}

Server responses to subscribe/unsubscribe/ping:

{
"id": "your-request-id",
"status": "success",
"data": { "action": "subscribed" },
"error": null
}
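
A client receives both event messages and request responses on the same socket, so it needs some way to tell them apart. The sketch below distinguishes them by the keys shown in the examples above; the function name `classify_message` is hypothetical, and the key-based rule is an assumption rather than documented behavior.

```python
import json


def classify_message(raw: str) -> str:
    """Return 'event', 'response', or 'unknown' for a raw server frame."""
    msg = json.loads(raw)
    # Assumption: event messages always carry both "topic" and "seq".
    if "topic" in msg and "seq" in msg:
        return "event"
    # Assumption: responses to subscribe/unsubscribe/ping carry "status".
    if "status" in msg:
        return "response"
    return "unknown"
```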

After connecting, send subscription messages to start receiving events on specific topics.

Subscribe:

{ "action": "subscribe", "topic": "jobs/my-job-id" }

Unsubscribe:

{ "action": "unsubscribe", "topic": "jobs/my-job-id" }

Topics follow a hierarchical slash-separated format. Wildcards are supported:

| Pattern | Description |
| --- | --- |
| jobs/abc-123 | Exact match for a specific job |
| jobs/* | Match any single segment under jobs/ |
| jobs/** | Match any depth under jobs/ |
| tables/users/** | All events on the users table |
| ** | All events (use sparingly) |
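
The wildcard rules in the table can be illustrated with a small matcher. This is a minimal sketch, not the server's implementation: `topic_matches` is a hypothetical name, and it assumes that `**` appears only as the last segment and matches one or more remaining segments.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Check a slash-separated topic against a pattern with * and ** wildcards."""
    p_segs = pattern.split("/")
    t_segs = topic.split("/")
    for i, p in enumerate(p_segs):
        if p == "**":
            # Assumption: ** must be the final pattern segment and
            # matches one or more remaining topic segments.
            return i == len(p_segs) - 1 and len(t_segs) > i
        if i >= len(t_segs):
            return False
        # * matches exactly one segment; anything else must match literally.
        if p != "*" and p != t_segs[i]:
            return False
    return len(p_segs) == len(t_segs)
```

Under these assumptions, `jobs/*` matches `jobs/abc-123` but not `jobs/abc-123/logs`, while `jobs/**` matches both.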

Events are auto-routed to topics based on their data:

| Event Source | Topic Format |
| --- | --- |
| Job events | jobs/{job_id} |
| Table events | tables/{table_name}/{event_type} |
| User events | users/{user_id}/{event_type} |
| System events | system/{component}/{event_type} |
| Connection events | connections/{connection_id}/{event_type} |
| Collection events | collections/{collection_name}/{event_type} |
| Transaction events | transactions/{transaction_id}/{event_type} |
| Custom events | custom/{event_type} |
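
To predict which topic an event will land on (for example, when choosing what to subscribe to), the formats in the table can be filled in client-side. The following sketch is illustrative only; `TOPIC_FORMATS` and `topic_for` are hypothetical names, and the short source keys are an assumption.

```python
# Topic formats copied from the routing table; the dictionary keys are hypothetical.
TOPIC_FORMATS = {
    "job": "jobs/{job_id}",
    "table": "tables/{table_name}/{event_type}",
    "user": "users/{user_id}/{event_type}",
    "system": "system/{component}/{event_type}",
    "connection": "connections/{connection_id}/{event_type}",
    "collection": "collections/{collection_name}/{event_type}",
    "transaction": "transactions/{transaction_id}/{event_type}",
    "custom": "custom/{event_type}",
}


def topic_for(source: str, **fields: str) -> str:
    """Fill in the topic format for the given event source."""
    return TOPIC_FORMATS[source].format(**fields)
```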

You can execute SQL transactions over the WebSocket connection:

{
"type": "BeginTransaction",
"data": { "transaction_id": "tx-001" }
}
{
"type": "ExecuteInTransaction",
"data": {
"transaction_id": "tx-001",
"sql": "INSERT INTO orders (product, qty) VALUES ('widget', 5)",
"params": null
}
}
{
"type": "CommitTransaction",
"data": { "transaction_id": "tx-001" }
}

Transaction responses include execution timing:

{
"type": "TransactionResult",
"data": {
"transaction_id": "tx-001",
"operation": "execute",
"success": true,
"result": [{ "product": "widget", "qty": 5 }],
"duration_ms": 3
}
}
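
The begin/execute/commit sequence above can be generated programmatically. The sketch below only builds the JSON frames in the order shown; `transaction_messages` is a hypothetical helper, and sending the frames and waiting for each `TransactionResult` is left to the caller.

```python
import json


def transaction_messages(transaction_id: str, statements: list[str]) -> list[str]:
    """Build the JSON frames for one transaction: begin, each statement, commit."""
    tx = {"transaction_id": transaction_id}
    messages = [{"type": "BeginTransaction", "data": dict(tx)}]
    for sql in statements:
        messages.append({
            "type": "ExecuteInTransaction",
            # params is left as null, as in the example above.
            "data": {**tx, "sql": sql, "params": None},
        })
    messages.append({"type": "CommitTransaction", "data": dict(tx)})
    return [json.dumps(m) for m in messages]
```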

Zatabase uses a bounded per-connection queue (256 messages). When the queue fills up:

  1. The oldest message is dropped to make room for the new one
  2. A metadata notification is injected so the client knows events were dropped:
{
"topic": "meta/ws",
"seq": 0,
"ts": "2026-03-04T12:00:00Z",
"data": { "event": "dropped", "count": 5 }
}
  3. Dropped message counts are tracked in server metrics

To avoid drops, ensure your client processes messages quickly and subscribes only to topics it needs.
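
The drop-oldest behavior can be modeled with a small queue. This is a simplified sketch for reasoning about client behavior, not the server's code: `BoundedEventQueue` is a hypothetical class, it reports the drop notice at the front of the next drain (the actual position of the notice is not specified above), and it omits the `ts` field.

```python
from collections import deque


class BoundedEventQueue:
    """Fixed-capacity queue that drops the oldest message when full."""

    def __init__(self, capacity: int = 256):
        self.capacity = capacity
        self.items = deque()
        self.dropped = 0  # drops not yet reported to the client

    def push(self, message: dict) -> None:
        if len(self.items) >= self.capacity:
            self.items.popleft()  # drop the oldest message
            self.dropped += 1
        self.items.append(message)

    def drain(self) -> list[dict]:
        """Return queued messages, preceded by a drop notice if any were lost."""
        out = []
        if self.dropped:
            out.append({
                "topic": "meta/ws",
                "seq": 0,
                "data": {"event": "dropped", "count": self.dropped},
            })
            self.dropped = 0
        out.extend(self.items)
        self.items.clear()
        return out
```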

Send periodic ping messages to keep the connection alive:

{ "type": "ping", "data": {} }

Response:

{ "status": "success", "data": { "pong": true } }

The event system (zevents) provides a high-performance pub/sub bus with persistent storage, exactly-once delivery, filtering, batching, and webhook delivery.

Zatabase emits events across all subsystems. The event_type field uses dot-notation:

| Category | Event Types |
| --- | --- |
| Jobs | job.created, job.started, job.completed, job.failed, job.canceled, job.retrying |
| Database | table.created, table.dropped, index.built, index.dropped, data.ingested, query.executed |
| Users | user.created, user.updated, user.deleted, user.logged_in, user.logged_out |
| System | system.started, system.stopped, system.config_changed, system.health_check_failed, system.memory_pressure, system.disk_space_warning |
| Real-time | websocket.connected, websocket.disconnected, webrtc.connected, webrtc.disconnected |
| Rows | row.inserted, row.updated, row.deleted, schema.changed |
| Transactions | transaction.begun, transaction.committed, transaction.rolled_back, transaction.aborted |
| Collections | collection.created, collection.updated, collection.deleted, projection.updated |
| Functions | function.call |
| Custom | custom.{your_name} |

Publish custom events via the HTTP API:

curl -s -X POST https://your-project.zatabase.io/v1/events \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"event_type": "custom.order_placed",
"data": {
"order_id": "ord-123",
"total": 49.99
},
"correlation_id": "req-abc",
"source": "checkout-service",
"attributes": {
"region": "us-east"
}
}'

Response:

{
"event_id": "01HXYZ...",
"event_type": "custom.order_placed",
"timestamp": "2026-03-04T12:00:00Z",
"topic": "custom/custom.order_placed"
}

Retrieve stored events with filtering:

curl -s "https://your-project.zatabase.io/v1/events?event_types=job.created,job.completed&limit=50" \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Query parameters:

| Parameter | Description |
| --- | --- |
| event_types | Comma-separated event types to filter |
| topics | Comma-separated topic patterns (supports wildcards) |
| start_time | ISO 8601 start time for range filter |
| end_time | ISO 8601 end time for range filter |
| limit | Maximum events to return (default: 100) |
| stream_id | Optional stream ID |
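
When building these queries from code, the list-valued parameters need to be joined with commas and URL-encoded. The sketch below shows one way to do that with the standard library; `events_query_url` is a hypothetical helper, and it only constructs the URL without sending a request.

```python
from urllib.parse import urlencode


def events_query_url(base_url, event_types=None, topics=None,
                     start_time=None, end_time=None, limit=None, stream_id=None):
    """Build a GET /v1/events URL from the documented query parameters."""
    params = {}
    if event_types:
        params["event_types"] = ",".join(event_types)
    if topics:
        params["topics"] = ",".join(topics)
    if start_time:
        params["start_time"] = start_time
    if end_time:
        params["end_time"] = end_time
    if limit is not None:
        params["limit"] = limit
    if stream_id:
        params["stream_id"] = stream_id
    # Keep commas literal so the list separators stay readable.
    query = urlencode(params, safe=",")
    return f"{base_url}/v1/events" + (f"?{query}" if query else "")
```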

Replay historical events from the event store:

curl -s -X POST https://your-project.zatabase.io/v1/events/replay \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"from_event_id": "01HXYZ...",
"event_types": ["job.created"],
"limit": 100
}'

Get event bus and delivery statistics:

curl -s https://your-project.zatabase.io/v1/events/stats \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Filters allow fine-grained control over which events a subscriber receives. Filters can combine multiple criteria:

  • Tables: Only events for specific tables (e.g., ["users", "orders"])
  • Operations: Only specific operations (e.g., ["INSERT", "UPDATE"])
  • Users: Events by specific user IDs
  • Organizations: Events for specific org IDs
  • Event types: Specific event type categories
  • Connections: Events from specific connection IDs
  • Transactions: Events within specific transaction IDs
  • Attributes: Custom key-value metadata matching
  • Permissions: Permission-based filtering (admin, operator, read)

Predefined filter shortcuts:

| Filter | Matches |
| --- | --- |
| Database changes | row.inserted, row.updated, row.deleted, schema.changed |
| User activity | user.created, user.updated, user.deleted, user.logged_in, user.logged_out |
| System events | system.started, system.stopped, system.config_changed, system.health_check_failed, system.memory_pressure, system.disk_space_warning |

For high-throughput scenarios, events can be batched to reduce overhead. The batching system supports:

  • Size-based batching: Flush when a batch reaches a configured number of events (default: 100)
  • Time-based batching: Flush after a maximum age (default: 5 seconds)
  • Transaction-scoped batching: Group events by transaction ID
  • Organization-scoped batching: Group events by organization

Batching configuration is set server-side. Batches are flushed automatically when either the size or age threshold is reached, whichever comes first.
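
The "whichever comes first" rule can be sketched as follows. This is an illustration of the flush logic only, using the default thresholds quoted above; `EventBatcher` is a hypothetical class, and for simplicity it checks the age threshold only when an event is added, whereas a real implementation would presumably also flush on a timer.

```python
import time


class EventBatcher:
    """Collect events and flush on size or age, whichever comes first."""

    def __init__(self, max_size=100, max_age_seconds=5.0, clock=time.monotonic):
        self.max_size = max_size
        self.max_age = max_age_seconds
        self.clock = clock  # injectable for testing
        self._batch = []
        self._started = None

    def add(self, event):
        """Add an event; return the flushed batch if a threshold was hit, else None."""
        now = self.clock()
        if not self._batch:
            self._started = now  # age is measured from the first event in the batch
        self._batch.append(event)
        if len(self._batch) >= self.max_size or now - self._started >= self.max_age:
            return self.flush()
        return None

    def flush(self):
        batch, self._batch = self._batch, []
        self._started = None
        return batch
```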

Consumer groups allow multiple consumers to process events in parallel with load balancing. Each message is delivered to exactly one consumer in the group.

Key features:

  • Concurrency control: Configurable max concurrent messages per consumer (default: 10)
  • Processing timeout: Automatic NACK on timeout (default: 30 seconds)
  • ACK/NACK semantics: Explicit acknowledgment with retry on failure
  • Dead letter queue: Failed messages after max retries go to the dead letter queue
  • Statistics: Per-consumer stats including processed count, error rate, and average processing time

Consumer groups are managed server-side. Clients interact through WebSocket subscriptions or webhook endpoints.

Webhooks deliver events to external HTTP endpoints with HMAC signature verification, retry logic, and rate limiting.

Create a webhook:

curl -s -X POST https://your-project.zatabase.io/v1/webhooks \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/webhook",
"secret": "whsec_your_signing_secret",
"event_types": ["job.created", "job.completed", "row.inserted"],
"topics": ["jobs/**"],
"headers": {
"X-Custom-Header": "my-value"
},
"max_retries": 5,
"timeout_seconds": 15
}'

List webhooks:

curl -s https://your-project.zatabase.io/v1/webhooks \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Get a webhook by ID:

curl -s https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Delete a webhook:

curl -s -X DELETE https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
-H "Authorization: Bearer $ZATABASE_TOKEN"

When an event matches a webhook’s event types and topic patterns, Zatabase sends an HTTP POST to the registered URL with:

Headers:

| Header | Description |
| --- | --- |
| Content-Type | application/json |
| X-Zatabase-Signature | HMAC-SHA256 signature: t=TIMESTAMP,v1=SIGNATURE |
| X-Zatabase-Event-Type | The event type (e.g., job.created) |
| X-Zatabase-Event-ID | Unique event ID (ULID) |
| X-Zatabase-Delivery-ID | Unique delivery attempt ID |

Body: The full event JSON payload.

Verify webhook signatures to ensure authenticity. The signature is computed as:

HMAC-SHA256(secret, "{timestamp}.{payload}")

Parse the X-Zatabase-Signature header to extract the timestamp (t=) and signature (v1=). Recompute the HMAC using your secret and compare. Reject if the timestamp is older than 5 minutes (300 seconds).

Node.js example:

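const crypto = require('crypto');

// Verify an X-Zatabase-Signature header against the raw request body.
function verifyWebhook(payload, header, secret) {
  // Parse "t=TIMESTAMP,v1=SIGNATURE" into { t, v1 }
  const parts = Object.fromEntries(
    header.split(',').map(p => p.trim().split('='))
  );
  const timestamp = parseInt(parts.t, 10);
  const signature = parts.v1;
  if (Number.isNaN(timestamp) || !signature) return false;

  // Reject timestamps older than 5 minutes
  const age = Math.floor(Date.now() / 1000) - timestamp;
  if (age > 300) return false;

  const expected = crypto
    .createHmac('sha256', secret)
    .update(`${parts.t}.${payload}`)
    .digest('hex');

  // timingSafeEqual throws if the lengths differ, so compare lengths first
  const a = Buffer.from(signature);
  const b = Buffer.from(expected);
  return a.length === b.length && crypto.timingSafeEqual(a, b);
}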

Failed deliveries (non-2xx status codes or network errors) are retried with exponential backoff:

| Attempt | Delay |
| --- | --- |
| 1st retry | 2 seconds |
| 2nd retry | 4 seconds |
| 3rd retry | 8 seconds |
| 4th retry | 16 seconds |
| 5th retry | 32 seconds |

Maximum retry delay is capped at 300 seconds (5 minutes). After exhausting all retries, the delivery is marked as dropped.
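
The schedule in the table is consistent with a delay of 2^n seconds for the n-th retry, capped at 300 seconds. The sketch below expresses that formula; `retry_delay` is a hypothetical name, and the formula is inferred from the table rather than taken from the server's source.

```python
def retry_delay(retry_number: int, cap_seconds: int = 300) -> int:
    """Delay in seconds before the given retry (1-based), with a 300-second cap."""
    return min(2 ** retry_number, cap_seconds)
```

With `max_retries` above 8, the cap takes effect: the 9th retry would otherwise wait 512 seconds.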

Webhooks support per-endpoint rate limiting:

{
"rate_limit": {
"requests_per_period": 100,
"period_seconds": 60,
"burst": 10
}
}

When rate-limited, deliveries return a 429 status and are not retried.

Triggers execute actions automatically when event conditions are met. They support complex conditions, multiple action types, rate limiting, and execution tracking.

Create a trigger:

curl -s -X POST https://your-project.zatabase.io/v1/triggers \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "notify-on-failure",
"condition": {
"type": "Compound",
"operator": "And",
"conditions": [
{ "type": "EventType", "event_type": "job.failed" },
{ "type": "OrganizationId", "org_id": 1 }
]
},
"actions": [
{
"type": "CallWebhook",
"url": "https://hooks.slack.com/services/...",
"method": "POST",
"headers": { "Content-Type": "application/json" },
"body_template": "{\"text\": \"Job {{event_id}} failed at {{timestamp}}\"}"
},
{
"type": "Log",
"level": "Error",
"message_template": "Job failure detected: {{event_data}}"
}
],
"rate_limit": {
"max_triggers": 10,
"period_seconds": 60
},
"tags": ["alerting", "jobs"]
}'

List triggers:

curl -s https://your-project.zatabase.io/v1/triggers \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Conditions determine when a trigger fires:

| Condition Type | Description |
| --- | --- |
| EventType | Match a specific event type |
| TopicPattern | Match a topic pattern (supports wildcards) |
| DataContains | Match a field in the event data with comparison operators (Equal, GreaterThan, Contains, etc.) |
| MetadataContains | Match event metadata attributes |
| OrganizationId | Match a specific organization |
| TimeWindow | Only fire during specific hours (with timezone support) |
| Frequency | Fire when an event type occurs N times within a time window |
| Compound | Combine conditions with And, Or, or Not |
| Regex | Match event fields with regular expressions |
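
To make the Compound semantics concrete, here is a minimal evaluator covering three of the condition types. It is a sketch under assumptions: the function name `condition_matches` is hypothetical, the event is assumed to expose `event_type` and `org_id` as top-level fields, and `Not` is assumed to mean that none of the nested conditions match.

```python
def condition_matches(condition: dict, event: dict) -> bool:
    """Evaluate an EventType, OrganizationId, or Compound condition against an event."""
    kind = condition["type"]
    if kind == "EventType":
        return event.get("event_type") == condition["event_type"]
    if kind == "OrganizationId":
        return event.get("org_id") == condition["org_id"]
    if kind == "Compound":
        results = [condition_matches(c, event) for c in condition["conditions"]]
        operator = condition["operator"]
        if operator == "And":
            return all(results)
        if operator == "Or":
            return any(results)
        if operator == "Not":
            # Assumption: Not negates the disjunction of its children.
            return not any(results)
    raise ValueError(f"unsupported condition: {condition!r}")
```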

Actions executed when the condition is met:

| Action Type | Description |
| --- | --- |
| EmitEvent | Publish a new event to the bus |
| CallWebhook | Send an HTTP request (GET, POST, PUT, DELETE) |
| ExecuteFunction | Run a serverless function |
| Log | Write a log message (Debug, Info, Warn, Error) |
| StoreData | Store a key-value pair with optional TTL |
| SendNotification | Send a notification on a channel with priority |

Action templates support {{event_id}}, {{event_type}}, {{org_id}}, {{timestamp}}, and {{event_data}} placeholders.
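
To preview what a template will expand to before registering a trigger, the substitution can be approximated locally. This sketch assumes that `{{event_data}}` is rendered as JSON and that unknown placeholders are left untouched; neither behavior is confirmed above, and `render_template` is a hypothetical helper.

```python
import json
import re

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def render_template(template: str, event: dict) -> str:
    """Substitute the documented placeholders using fields from an event dict."""
    values = {
        "event_id": event.get("event_id", ""),
        "event_type": event.get("event_type", ""),
        "org_id": event.get("org_id", ""),
        "timestamp": event.get("timestamp", ""),
        # Assumption: event data is rendered as a JSON string.
        "event_data": json.dumps(event.get("data", {})),
    }
    # Unknown placeholders are left as-is (assumption).
    return PLACEHOLDER.sub(lambda m: str(values.get(m.group(1), m.group(0))), template)
```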

Create scheduled tasks that run on a cron schedule:

curl -s -X POST https://your-project.zatabase.io/v1/tasks \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "daily-cleanup",
"schedule": "0 0 * * *",
"actions": [
{
"type": "EmitEvent",
"event_type": "custom.scheduled_cleanup",
"event_data": { "scope": "all" }
}
]
}'

List scheduled tasks:

curl -s https://your-project.zatabase.io/v1/tasks \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Monitor active WebSocket connections and backpressure:

curl -s https://your-project.zatabase.io/v1/metrics/ws \
-H "Authorization: Bearer $ZATABASE_TOKEN"

Returns:

{
"active_connections": 12,
"total_dropped_messages": 0
}

JavaScript client example (Node.js, using the ws package):

// Custom handshake headers require the ws package;
// the browser WebSocket API does not accept a headers option.
const WebSocket = require('ws');

const ws = new WebSocket('wss://your-project.zatabase.io/ws', {
  headers: { 'Authorization': 'Bearer YOUR_TOKEN' }
});

ws.onopen = () => {
  // Subscribe to job events
  ws.send(JSON.stringify({
    action: 'subscribe',
    topic: 'jobs/**'
  }));
  // Subscribe to row changes on a specific table
  ws.send(JSON.stringify({
    action: 'subscribe',
    topic: 'tables/orders/**'
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  // Check for backpressure warnings
  if (msg.topic === 'meta/ws') {
    console.warn('Dropped events:', msg.data.count);
    return;
  }
  console.log(`[${msg.topic}]`, msg.data);
};

ws.onclose = (event) => {
  console.log('Disconnected:', event.code, event.reason);
  // Implement reconnection with backoff
};

// Keepalive ping every 30 seconds
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping', data: {} }));
  }
}, 30000);

Python client example:

import asyncio
import json

import websockets


async def connect():
    uri = "wss://your-project.zatabase.io/ws"
    headers = {"Authorization": "Bearer YOUR_TOKEN"}
    # websockets 14+ uses additional_headers; releases before 14 use extra_headers
    async with websockets.connect(uri, additional_headers=headers) as ws:
        # Subscribe to events
        await ws.send(json.dumps({
            "action": "subscribe",
            "topic": "jobs/**"
        }))
        await ws.send(json.dumps({
            "action": "subscribe",
            "topic": "tables/orders/**"
        }))
        # Listen for events
        async for message in ws:
            event = json.loads(message)
            # Backpressure notice: the server dropped events for this connection
            if event.get("topic") == "meta/ws":
                print(f"Warning: {event['data']['count']} events dropped")
                continue
            print(f"[{event['topic']}] {event['data']}")


asyncio.run(connect())

cURL examples:

# Create a webhook for job events
curl -s -X POST https://your-project.zatabase.io/v1/webhooks \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"url": "https://example.com/zatabase-webhook",
"secret": "whsec_mysecretkey123",
"event_types": ["job.created", "job.completed", "job.failed"],
"max_retries": 3
}'
# List all webhooks
curl -s https://your-project.zatabase.io/v1/webhooks \
-H "Authorization: Bearer $ZATABASE_TOKEN"
# Delete a webhook
curl -s -X DELETE https://your-project.zatabase.io/v1/webhooks/WEBHOOK_ID \
-H "Authorization: Bearer $ZATABASE_TOKEN"
# Publish a custom event
curl -s -X POST https://your-project.zatabase.io/v1/events \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"event_type": "custom.deployment_started",
"data": { "service": "api", "version": "2.1.0" }
}'
# Query recent events
curl -s "https://your-project.zatabase.io/v1/events?event_types=job.created&limit=10" \
-H "Authorization: Bearer $ZATABASE_TOKEN"

For ultra-low-latency scenarios, Zatabase supports WebRTC data channels as an alternative transport. WebRTC bypasses HTTP overhead and provides direct peer-to-peer-style communication.

WebRTC connections provide five named data channels:

| Channel | Purpose |
| --- | --- |
| queries | Execute SQL and QueryBuilder queries with sub-millisecond overhead |
| events | Receive critical event notifications with priority levels and correlation tracking |
| logs | Stream structured log entries at configurable severity levels |
| metrics | Non-blocking telemetry streaming with lossy semantics under load |
| transactions | Transaction coordination (2PC prepare/commit/abort, saga steps) |

WebRTC connections are established via SDP signaling over HTTP:

# Send SDP offer
curl -s -X POST https://your-project.zatabase.io/v1/webrtc/offer \
-H "Authorization: Bearer $ZATABASE_TOKEN" \
-H "Content-Type: application/json" \
-d '{"sdp": "v=0\r\n...", "type": "offer"}'

The server responds with an SDP answer. After ICE negotiation completes, data channels become available for bidirectional communication.

Query messages sent over the queries data channel follow this format:

{
"id": "req-001",
"query_type": "Sql",
"payload": "SELECT * FROM users LIMIT 10"
}

Response:

{
"id": "req-001",
"status": "Success",
"data": [{ "name": "alice", "email": "alice@example.com" }],
"metadata": {
"rows_returned": 1,
"execution_time_ms": 2
}
}

WebRTC is recommended for dashboard applications, live data visualizations, and any use case where sub-10ms event delivery matters. See the WebRTC signaling endpoints in the API Reference for the full handshake flow.

The event system coordinates several components:

  • Event Bus: High-performance broadcast channel with configurable buffer (default: 10,000 events)
  • Event Store: Persistent event log for replay and audit
  • Webhook Manager: HTTP delivery with HMAC signing, retry, and rate limiting
  • Trigger Manager: Condition-action engine with rate limiting and execution tracking
  • Cron Scheduler: Time-based task scheduling
  • Transaction Manager: Transactional event publishing with batch support
  • Metrics Collector: Event throughput, latency, and alert monitoring
  • Unified Streamer: Cross-system event multiplexing with deduplication and health monitoring

The dead letter queue captures events that fail delivery after all retries, ensuring no events are silently lost.

  • WebSocket connections require JWT authentication
  • All events are scoped to the authenticated organization (multi-tenant isolation)
  • The event store persists all events for replay and audit
  • Topic patterns follow MQTT-style glob matching (* = one segment, ** = any depth)
  • Webhook signatures use HMAC-SHA256 with a 5-minute timestamp tolerance
  • The events feature flag must be enabled for webhook, trigger, event publish/query, and scheduled task endpoints
  • The websocket feature flag enables the exactly-once WebSocket endpoint
  • The core /ws endpoint is always available