Streams API
Append-only logs with entry IDs for events, feeds, and audit trails
Streams are append-only logs with entry IDs and field-value payloads. They work well for events, feeds, message processing, and audit trails.
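The command names used throughout this page (XADD, XRANGE, and so on) suggest that entry IDs follow the Redis stream convention of `<milliseconds>-<sequence>`. That format is an assumption here, not something this page states. Under that assumption, the hypothetical helpers below show one way to parse and order entry IDs on the client:

```javascript
// Assumption: entry IDs look like "<ms>-<seq>", as in Redis streams.
// parseEntryId and compareEntryIds are illustrative names, not part of the API.
function parseEntryId(id) {
  const match = /^(\d+)-(\d+)$/.exec(id);
  if (!match) throw new Error(`Not a stream entry ID: ${id}`);
  // BigInt avoids precision loss for large 64-bit components.
  return { ms: BigInt(match[1]), seq: BigInt(match[2]) };
}

// Returns -1, 0, or 1, ordering first by timestamp and then by sequence number.
function compareEntryIds(a, b) {
  const x = parseEntryId(a);
  const y = parseEntryId(b);
  if (x.ms !== y.ms) return x.ms < y.ms ? -1 : 1;
  if (x.seq !== y.seq) return x.seq < y.seq ? -1 : 1;
  return 0;
}
```

Comparing IDs numerically rather than as strings matters because `"10-0"` sorts before `"9-0"` lexicographically.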
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| POST | /[id]/streams/add | Add an entry (XADD) |
| GET | /[id]/streams/length/[key] | Get stream length (XLEN) |
| GET | /[id]/streams/range/[key] | Read a forward range (XRANGE) |
| GET | /[id]/streams/reverse/[key] | Read a reverse range (XREVRANGE) |
| POST | /[id]/streams/read | Read from one or more streams (XREAD) |
| POST | /[id]/streams/trim | Trim a stream (XTRIM) |
| POST | /[id]/streams/delete | Delete entries (XDEL) |
| GET | /[id]/streams/info/[key] | Get stream info (XINFO) |
| POST | /[id]/streams/groups/create | Create a consumer group (XGROUP CREATE) |
| POST | /[id]/streams/groups/read | Read as a consumer group (XREADGROUP) |
| POST | /[id]/streams/groups/ack | Acknowledge entries (XACK) |
| GET | /[id]/streams/groups/pending/[key]/[group] | List pending entries (XPENDING) |
| POST | /[id]/streams/groups/claim | Claim pending entries (XCLAIM) |
| POST | /[id]/streams/groups/autoclaim | Automatically claim pending entries (XAUTOCLAIM) |
| POST | /[id]/streams/command | Execute allowed stream commands |
| POST | /[id]/streams/[key]/command | Execute key-specific stream commands |
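Every endpoint in the table shares the same base URL, plate ID, and `Authorization` header, so the repeated setup can be factored out. The sketch below is one possible wrapper, not an official client: `buildStreamsRequest`, `streamsRequest`, and the `config` object are hypothetical names, and treating any non-2xx status as an error is an assumption about how the API reports failures.

```javascript
// Hypothetical helper: builds the URL and fetch options for a Streams endpoint.
// GET payloads become query parameters; other methods send a JSON body.
function buildStreamsRequest({ baseUrl, plateId, apiKey }, method, path, payload) {
  const url = new URL(`${baseUrl}/${plateId}/streams/${path}`);
  const options = { method, headers: { Authorization: apiKey } };
  if (method === "GET") {
    for (const [name, value] of Object.entries(payload ?? {})) {
      url.searchParams.set(name, String(value));
    }
  } else {
    options.headers["Content-Type"] = "application/json";
    options.body = JSON.stringify(payload ?? {});
  }
  return { url: url.toString(), options };
}

// Hypothetical helper: sends the request and parses the JSON response.
// Assumption: a non-2xx status signals failure.
async function streamsRequest(config, method, path, payload) {
  const { url, options } = buildStreamsRequest(config, method, path, payload);
  const response = await fetch(url, options);
  if (!response.ok) {
    throw new Error(`Streams request failed with status ${response.status}`);
  }
  return response.json();
}
```

With such a wrapper, adding an entry would reduce to `await streamsRequest(config, "POST", "add", { key: "events", values: { type: "signup" } })`.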
Add an Entry
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// Append one entry with two fields to the "events" stream.
const response = await fetch(`${baseUrl}/${plateId}/streams/add`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    values: {
      type: "signup",
      user_id: 123
    }
  })
});
const data = await response.json();
console.log(data);
Get Stream Length
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// Fetch the number of entries in the "events" stream.
const response = await fetch(`${baseUrl}/${plateId}/streams/length/events`, {
  method: "GET",
  headers: {
    "Authorization": apiKey
  }
});
const data = await response.json();
console.log(data);
Read a Range
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// "-" and "+" select from the first to the last entry; count caps the result at 10.
const params = new URLSearchParams({
  start: "-",
  end: "+",
  count: "10"
});
const response = await fetch(`${baseUrl}/${plateId}/streams/range/events?${params}`, {
  method: "GET",
  headers: {
    "Authorization": apiKey
  }
});
const data = await response.json();
console.log(data);
Read from Multiple Streams
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// An id of "$" requests only entries added after this call; block_ms waits up to 5 seconds for them.
const response = await fetch(`${baseUrl}/${plateId}/streams/read`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    streams: [
      { key: "events", id: "$" },
      { key: "audit", id: "$" }
    ],
    block_ms: 5000,
    count: 25
  })
});
const data = await response.json();
console.log(data);
Trim by Max Length
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// Trim "events" to about 1000 entries; approximate lets the server trim slightly less for efficiency.
const response = await fetch(`${baseUrl}/${plateId}/streams/trim`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    strategy: "maxlen",
    threshold: 1000,
    approximate: true
  })
});
const data = await response.json();
console.log(data);
Consumer Groups
Consumer groups allow multiple consumers to process a stream as a team, with built-in tracking of unacknowledged messages.
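To make the tracking of unacknowledged messages concrete, here is a toy in-memory model of the bookkeeping a consumer group performs. It is only an illustration of the idea (each entry goes to one consumer and stays pending until acknowledged) and makes no claim about how the service implements it; `ToyConsumerGroup` and its methods are hypothetical names.

```javascript
// Toy model only: illustrates delivery and acknowledgement bookkeeping.
class ToyConsumerGroup {
  constructor(entries) {
    this.entries = entries;     // [{ id, values }] in stream order
    this.nextIndex = 0;         // position of the next never-delivered entry
    this.pending = new Map();   // entry id -> name of the consumer holding it
  }

  // Hands out up to `count` never-delivered entries and marks them pending.
  read(consumer, count) {
    const batch = this.entries.slice(this.nextIndex, this.nextIndex + count);
    this.nextIndex += batch.length;
    for (const entry of batch) this.pending.set(entry.id, consumer);
    return batch;
  }

  // Removes acknowledged ids from the pending set; returns how many were removed.
  ack(ids) {
    let acknowledged = 0;
    for (const id of ids) {
      if (this.pending.delete(id)) acknowledged += 1;
    }
    return acknowledged;
  }
}
```

In this model, two workers calling `read` never receive the same entry, and anything a worker fails to `ack` remains visible in `pending`, which is the information the pending, claim, and autoclaim endpoints operate on.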
Create a Group
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// Create the "workers" group starting at new entries ("$"); mkstream creates the stream if it is missing.
const response = await fetch(`${baseUrl}/${plateId}/streams/groups/create`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    group: "workers",
    id: "$",
    mkstream: true
  })
});
const data = await response.json();
console.log(data);
Read as a Consumer Group
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// An id of ">" requests entries not yet delivered to any consumer in the group.
const response = await fetch(`${baseUrl}/${plateId}/streams/groups/read`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    group: "workers",
    consumer: "worker-1",
    streams: [
      { key: "events", id: ">" }
    ],
    count: 10
  })
});
const data = await response.json();
console.log(data);
Command Endpoints
POST /[id]/streams/command
Execute allowed stream commands across the plate.
Allowed Commands
| Command | Description |
|---|---|
| XADD | Add entry |
| XLEN | Get length |
| XRANGE | Forward range |
| XREVRANGE | Reverse range |
| XREAD | Read from streams |
| XTRIM | Trim stream |
| XDEL | Delete entries |
| XINFO | Get stream info |
| XGROUP | Manage groups |
| XREADGROUP | Group read |
| XACK | Acknowledge |
| XPENDING | Pending entries |
| XCLAIM | Claim messages |
| XAUTOCLAIM | Auto claim |
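Because only the commands in the table are accepted, a client can reject anything else before making a request. The sketch below assumes the `{ command, args }` body shape used by the command endpoint example on this page; `ALLOWED_STREAM_COMMANDS` and `buildCommandBody` are hypothetical names.

```javascript
// Mirrors the Allowed Commands table; keep the two in sync.
const ALLOWED_STREAM_COMMANDS = new Set([
  "XADD", "XLEN", "XRANGE", "XREVRANGE", "XREAD", "XTRIM", "XDEL",
  "XINFO", "XGROUP", "XREADGROUP", "XACK", "XPENDING", "XCLAIM", "XAUTOCLAIM"
]);

// Hypothetical helper: validates the command name and returns the JSON request body.
function buildCommandBody(command, args = []) {
  const name = String(command).toUpperCase();
  if (!ALLOWED_STREAM_COMMANDS.has(name)) {
    throw new Error(`Command not allowed on the streams endpoint: ${name}`);
  }
  return JSON.stringify({ command: name, args });
}
```

A client-side check like this only saves a round trip; the server remains the authority on which commands are allowed.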
Execute Command
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// Run XADD on "events"; "*" asks the server to generate the entry ID.
const response = await fetch(`${baseUrl}/${plateId}/streams/command`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    command: "XADD",
    args: ["events", "*", "type", "signup"]
  })
});
const data = await response.json();
console.log(data);
POST /[id]/streams/[key]/command
Execute allowed commands on a specific stream key.
Allowed Commands
The same commands as listed for POST /[id]/streams/command.
Execute Key-Specific Command
const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";
// The stream key ("events") comes from the URL path, so only the command is sent.
const response = await fetch(`${baseUrl}/${plateId}/streams/events/command`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    command: "XLEN"
  })
});
const data = await response.json();
console.log(data);