Streams
Streams are an append-only log data structure with consumer groups for reliable message processing. They combine the best of pub/sub (fan-out) with durable, replayable message delivery and acknowledgment.
XADD / XLEN
Append entries to a stream. Each entry is a set of field-value pairs with an auto-generated or specified ID. Use * to let Lux generate the ID. XLEN returns the number of entries.
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold] *|id field value [field value ...]
XLEN key> XADD events * action "click" page "/home"
"1679012345678-0"
> XADD events * action "purchase" item "widget" amount "29.99"
"1679012345679-0"
> XADD events MAXLEN ~ 1000 * action "view" page "/pricing"
"1679012345680-0"
> XLEN events
(integer) 3XRANGE / XREVRANGE
Read entries in a range by ID. Use - and + for the minimum and maximum IDs. XREVRANGE returns entries in reverse order. Both support COUNT to limit results.
> XRANGE events - + COUNT 2
1) 1) "1679012345678-0"
2) 1) "action"
2) "click"
3) "page"
4) "/home"
2) 1) "1679012345679-0"
2) 1) "action"
2) "purchase"
3) "item"
4) "widget"
5) "amount"
6) "29.99"
> XREVRANGE events + - COUNT 1
1) 1) "1679012345680-0"
2) 1) "action"
2) "view"
3) "page"
4) "/pricing"XREAD
Read new entries from one or more streams. Use BLOCK to wait for new data. The $ ID means "only new entries from now on." Provide a specific ID to read entries after that point.
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]> XREAD COUNT 10 STREAMS events 0
1) 1) "events"
2) 1) 1) "1679012345678-0"
2) 1) "action"
2) "click"
3) "page"
4) "/home"
> XREAD BLOCK 5000 STREAMS events $Consumer Groups
Consumer groups allow multiple consumers to cooperatively process a stream. Each message is delivered to exactly one consumer in the group. Consumers must acknowledge processed messages.
XGROUP CREATE key group id|$ [MKSTREAM]
XGROUP DESTROY key group
XREADGROUP GROUP group consumer [COUNT count] [BLOCK ms] STREAMS key [key ...] id [id ...]
XACK key group id [id ...]> XGROUP CREATE events workers $ MKSTREAM
OK
> XADD events * action "signup" user "alice"
"1679012345681-0"
> XREADGROUP GROUP workers consumer1 COUNT 1 STREAMS events >
1) 1) "events"
2) 1) 1) "1679012345681-0"
2) 1) "action"
2) "signup"
3) "user"
4) "alice"
> XACK events workers "1679012345681-0"
(integer) 1XPENDING / XCLAIM / XAUTOCLAIM
Manage unacknowledged messages. XPENDING shows pending entries. XCLAIM transfers ownership of pending entries to another consumer. XAUTOCLAIM claims entries that have been idle for a minimum time.
> XPENDING events workers - + 10
1) 1) "1679012345681-0"
2) "consumer1"
3) (integer) 30000
4) (integer) 1
> XCLAIM events workers consumer2 30000 "1679012345681-0"
1) 1) "1679012345681-0"
2) 1) "action"
2) "signup"
3) "user"
4) "alice"
> XAUTOCLAIM events workers consumer2 60000 0
1) "0-0"
2) (empty array)XDEL / XTRIM / XINFO
XDEL removes specific entries by ID. XTRIM caps the stream length using MAXLEN or MINID. XINFO provides details about the stream, its groups, and consumers.
> XDEL events "1679012345678-0"
(integer) 1
> XTRIM events MAXLEN ~ 500
(integer) 0
> XINFO STREAM events
1) "length"
2) (integer) 2
3) "first-entry"
4) 1) "1679012345679-0"
2) 1) "action"
2) "purchase"
5) "last-entry"
6) 1) "1679012345681-0"
2) 1) "action"
2) "signup"Command Reference
| Command | Description |
|---|---|
| XADD | Append an entry to a stream |
| XLEN | Get the number of entries |
| XRANGE | Read entries in a range |
| XREVRANGE | Read entries in reverse range |
| XREAD | Read from one or more streams |
| XREADGROUP | Read as a consumer in a group |
| XGROUP CREATE | Create a consumer group |
| XGROUP DESTROY | Delete a consumer group |
| XACK | Acknowledge processed messages |
| XPENDING | View pending (unacknowledged) entries |
| XCLAIM | Transfer ownership of pending entries |
| XAUTOCLAIM | Claim idle pending entries automatically |
| XDEL | Delete specific entries by ID |
| XTRIM | Cap stream length with MAXLEN or MINID |
| XINFO | Get info about stream, groups, or consumers |