Nyx by Example

Queue Consumer

Consumers DEQUEUE a message, process it, then ACK on success or NACK on failure. NACK triggers redelivery (with optional backoff). Multiple consumers on the same queue get round-robin distribution.

Code

// nyx-queue consumer — DEQUEUE + ACK loop with redelivery

fn resp_cmd(parts: Array) -> String {
    var sb: StringBuilder = StringBuilder.new()
    sb.append("*" + int_to_string(parts.length()) + "\r\n")
    var i: int = 0
    while i < parts.length() {
        let p: String = parts[i]
        sb.append("$" + int_to_string(p.length()) + "\r\n" + p + "\r\n")
        i = i + 1
    }
    return sb.to_string()
}

fn process_message(payload: String) -> bool {
    // Your business logic here — return true if successful
    print("  processing: " + payload)
    return true
}

fn main() -> int {
    let fd: int = tcp_connect("127.0.0.1", 6381)
    if fd < 0 {
        print("connection failed")
        return 1
    }

    // Consumer loop: dequeue, process, ack/nack
    var processed: int = 0
    while processed < 3 {
        // DEQUEUE returns an array: [msg_id, payload] or nil if empty
        tcp_write(fd, resp_cmd(["DEQUEUE", "tasks"]))
        let hdr: String = tcp_read_line(fd)
        if hdr.trim() == "$-1" {
            print("queue empty")
            processed = 3
        } else {
            // Read msg_id and payload
            let id_hdr: String = tcp_read_line(fd)
            let msg_id: String = tcp_read_line(fd)
            let payload_hdr: String = tcp_read_line(fd)
            let payload: String = tcp_read_line(fd)

            let success: bool = process_message(payload)
            if success {
                // ACK removes the message from in-flight
                tcp_write(fd, resp_cmd(["ACK", "tasks", msg_id.trim()]))
                tcp_read_line(fd)
            } else {
                // NACK triggers redelivery
                tcp_write(fd, resp_cmd(["NACK", "tasks", msg_id.trim()]))
                tcp_read_line(fd)
            }
            processed = processed + 1
        }
    }
    print("consumer processed " + int_to_string(processed) + " messages")

    tcp_close(fd)
    return 0
}

Output

  processing: {"task":"send_email","to":"a@b.com"}
  processing: {"task":"send_email","to":"c@d.com"}
queue empty
consumer processed 3 messages

Explanation

The consumer loop is the core of any queue worker. DEQUEUE atomically reserves a message (moving it to in-flight state), your handler processes it, then ACK confirms completion or NACK returns it for redelivery. If the consumer crashes before ACK, nyx-queue automatically re-delivers after a visibility timeout. With multiple consumers connected, nyx-queue distributes messages round-robin — horizontal scaling comes for free.

← Previous Next →

Source: examples/by-example/91-queue-consumer.nx