SmallPlate

Streams API

Append-only logs with entry IDs for events, feeds, and audit trails

Streams are append-only logs with entry IDs and field-value payloads. They work well for events, feeds, message processing, and audit trails.

Endpoints

MethodEndpointDescription
POST/[id]/streams/addAdd an entry
GET/[id]/streams/length/[key]Get stream length
GET/[id]/streams/range/[key]Forward range
GET/[id]/streams/reverse/[key]Reverse range
POST/[id]/streams/readXREAD
POST/[id]/streams/trimXTRIM
POST/[id]/streams/deleteXDEL
GET/[id]/streams/info/[key]XINFO
POST/[id]/streams/groups/createCreate a consumer group
POST/[id]/streams/groups/readXREADGROUP
POST/[id]/streams/groups/ackXACK
GET/[id]/streams/groups/pending/[key]/{group}XPENDING
POST/[id]/streams/groups/claimXCLAIM
POST/[id]/streams/groups/autoclaimXAUTOCLAIM
POST/[id]/streams/commandExecute allowed stream commands
POST/[id]/streams/[key]/commandExecute key-specific stream commands

Add an Entry

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/add`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    values: {
      type: "signup",
      user_id: 123
    }
  })
});

const data = await response.json();
console.log(data);

Get Stream Length

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/length/events`, {
  method: "GET",
  headers: {
    "Authorization": apiKey
  }
});

const data = await response.json();
console.log(data);

Read a Range

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const params = new URLSearchParams({
  start: "-",
  end: "+",
  count: "10"
});

const response = await fetch(`${baseUrl}/$[id]/streams/range/events?${params}`, {
  method: "GET",
  headers: {
    "Authorization": apiKey
  }
});

const data = await response.json();
console.log(data);

Read from Multiple Streams

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/read`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    streams: [
      { key: "events", id: "$" },
      { key: "audit", id: "$" }
    ],
    block_ms: 5000,
    count: 25
  })
});

const data = await response.json();
console.log(data);

Trim by Max Length

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/trim`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    strategy: "maxlen",
    threshold: 1000,
    approximate: true
  })
});

const data = await response.json();
console.log(data);

Consumer Groups

Consumer groups allow multiple consumers to process a stream as a team, with built-in tracking of unacknowledged messages.

Create a Group

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/groups/create`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    key: "events",
    group: "workers",
    id: "$",
    mkstream: true
  })
});

const data = await response.json();
console.log(data);

Read as a Consumer Group

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/groups/read`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    group: "workers",
    consumer: "worker-1",
    streams: [
      { key: "events", id: ">" }
    ],
    count: 10
  })
});

const data = await response.json();
console.log(data);

Command Endpoints

POST /[id]/streams/command

Execute allowed stream commands across the plate.

Allowed Commands

CommandDescription
XADDAdd entry
XLENGet length
XRANGEForward range
XREVRANGEReverse range
XREADRead from streams
XTRIMTrim stream
XDELDelete entries
XINFOGet stream info
XGROUPManage groups
XREADGROUPGroup read
XACKAcknowledge
XPENDINGPending entries
XCLAIMClaim messages
XAUTOCLAIMAuto claim

Execute Command

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/command`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    command: "XADD",
    args: ["events", "*", "type", "signup"]
  })
});

const data = await response.json();
console.log(data);

POST /[id]/streams/[key]/command

Execute allowed commands on a specific stream key.

Allowed Commands

Same as above.

Execute Key-Specific Command

const plateId = "[id]";
const apiKey = "your-api-key";
const baseUrl = "[base-url]";

const response = await fetch(`${baseUrl}/$[id]/streams/events/command`, {
  method: "POST",
  headers: {
    "Authorization": apiKey,
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    command: "XLEN"
  })
});

const data = await response.json();
console.log(data);

On this page