Nyx by Example

Queue Enqueue

nyx-queue is a persistent message queue on port 6381. ENQUEUE appends a message and returns a unique ID. QLEN returns queue depth. Messages survive restarts via the .ndb persistence format.

Code

// nyx-queue: ENQUEUE messages to a persistent queue

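// Encode a command as a RESP array of bulk strings:
// "*<count>\r\n", then "$<length>\r\n<part>\r\n" for each part.
fn resp_cmd(parts: Array) -> String {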
    var sb: StringBuilder = StringBuilder.new()
    sb.append("*" + int_to_string(parts.length()) + "\r\n")
    var i: int = 0
    while i < parts.length() {
        let p: String = parts[i]
        sb.append("$" + int_to_string(p.length()) + "\r\n" + p + "\r\n")
        i = i + 1
    }
    return sb.to_string()
}

fn main() -> int {
    // nyx-queue listens on port 6381 (RESP protocol like nyx-kv)
    let fd: int = tcp_connect("127.0.0.1", 6381)
    if fd < 0 {
        print("connection failed (is nyx-queue running?)")
        return 1
    }

    // Enqueue a message — returns a unique message ID
    tcp_write(fd, resp_cmd(["ENQUEUE", "emails", "{\"to\":\"user@example.com\",\"subject\":\"Hi\"}"]))
    let hdr: String = tcp_read_line(fd)     // reply header line (not needed here)
    let msg_id: String = tcp_read_line(fd)  // the message ID itself
    print("ENQUEUE -> msg_id: " + msg_id.trim())

    // Get queue length; the reply is one line and is printed with its RESP ':' integer marker
    tcp_write(fd, resp_cmd(["QLEN", "emails"]))
    let qlen: String = tcp_read_line(fd)
    print("QLEN emails -> " + qlen.trim())

    // Enqueue multiple messages for batch processing
    var i: int = 0
    while i < 3 {
        let payload: String = "task_" + int_to_string(i)
        tcp_write(fd, resp_cmd(["ENQUEUE", "tasks", payload]))
        tcp_read_line(fd)  // header
        tcp_read_line(fd)  // msg_id
        i = i + 1
    }
    print("enqueued 3 tasks")

    tcp_close(fd)
    return 0
}
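
To make the wire format concrete, here is a minimal sketch of the same encoding in Python (chosen only so it can be run without a Nyx toolchain). It mirrors resp_cmd above but is not part of the example. One assumption worth flagging: RESP bulk-string lengths count bytes, so the sketch encodes to UTF-8 before measuring. Whether Nyx's String.length() counts bytes or characters is not stated on this page.

```python
def resp_cmd(parts):
    """Encode a command as a RESP array of bulk strings (illustrative port)."""
    out = [b"*%d\r\n" % len(parts)]
    for p in parts:
        data = p.encode("utf-8")
        # RESP bulk-string lengths count bytes, not characters.
        out.append(b"$%d\r\n" % len(data) + data + b"\r\n")
    return b"".join(out)


if __name__ == "__main__":
    # The QLEN command from the example, as it would appear on the wire.
    print(resp_cmd(["QLEN", "emails"]))
    # prints b'*2\r\n$4\r\nQLEN\r\n$6\r\nemails\r\n'
```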

Output

ENQUEUE -> msg_id: 1
QLEN emails -> :1
enqueued 3 tasks
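
The ":1" in the QLEN line is the raw RESP integer reply, type marker included, while the message ID took two reads because it arrived as a header line followed by the value. The sketch below, in Python and not part of the example, shows one way a client might decode both shapes into plain values. The canned reply bytes are an assumption inferred from the output above, not captured from a real nyx-queue server, and read_reply is a hypothetical helper name.

```python
import io


def read_reply(stream):
    """Read one RESP reply from a binary file-like object (hypothetical helper)."""
    line = stream.readline().rstrip(b"\r\n")
    kind, rest = line[:1], line[1:]
    if kind == b":":                 # integer reply, e.g. ":1"
        return int(rest)
    if kind == b"+":                 # simple string reply
        return rest.decode()
    if kind == b"-":                 # error reply
        raise RuntimeError(rest.decode())
    if kind == b"$":                 # bulk string: length header, then payload
        n = int(rest)
        if n < 0:
            return None              # null bulk string
        data = stream.read(n + 2)    # payload plus trailing CRLF
        return data[:-2].decode()
    raise ValueError("unexpected reply type: %r" % kind)


if __name__ == "__main__":
    # Assumed replies: a bulk-string message ID, then an integer queue depth.
    canned = io.BytesIO(b"$1\r\n1\r\n:1\r\n")
    print("msg_id:", read_reply(canned))
    print("depth:", read_reply(canned))
```

With a real connection, the same function could be fed the object returned by Python's socket.makefile("rb").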

Explanation

Queues separate accepting work from doing it: the web handler only has to enqueue a message and return, while a worker pool drains the backlog at its own pace. nyx-queue speaks RESP on port 6381, the same wire protocol as nyx-kv, so a nyx-kv client library should be able to talk to it; only the commands differ. ENQUEUE appends to a named queue and returns a monotonic message ID, which is useful for idempotency checks and tracing. QLEN is O(1), which makes it cheap to poll as an autoscaling signal ("spin up a worker when depth > 1000"). Persistence uses the same .ndb binary format as nyx-kv, so messages survive a crash or restart. Redis lists, by comparison, live in memory and survive a restart only when Redis persistence (RDB snapshots or AOF) is enabled. One server can hold several named queues, as the example does with emails and tasks, so email, push-notification, and billing workloads can each get their own queue and be processed in parallel.
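
As a rough illustration of the autoscaling signal, the sketch below (Python, not part of the example) turns a QLEN depth into a worker count using the "depth > 1000" rule of thumb. The function name, the baseline of one worker, and the cap of ten are all assumptions made for illustration; nyx-queue does not prescribe a policy.

```python
def desired_workers(depth, threshold=1000, base=1, max_workers=10):
    """Hypothetical policy: one extra worker each time depth exceeds a multiple of threshold."""
    extra = max(0, depth - 1) // threshold   # 1000 -> 0, 1001 -> 1, 2001 -> 2
    return min(max_workers, base + extra)


if __name__ == "__main__":
    for depth in (0, 1000, 1001, 2001, 50000):
        print(depth, "->", desired_workers(depth))
```

A real scaler would also want some damping, such as requiring the depth to stay above the threshold for several polls before adding workers, so that a brief burst does not cause flapping.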

Source: examples/by-example/90-queue-enqueue.nx