Queue Consumer
Consumers DEQUEUE a message, process it, then ACK on success or NACK on failure. NACK triggers redelivery (with optional backoff). Multiple consumers on the same queue get round-robin distribution.
Code
// nyx-queue consumer — DEQUEUE + ACK loop with redelivery
// Encodes `parts` as one RESP-style command string: an array header line
// ("*<count>") followed, for each element, by a "$<length>" line and the
// element itself. Every line is terminated with CRLF.
// Returns the fully encoded command, ready to be written to the socket.
fn resp_cmd(parts: Array) -> String {
    let count: int = parts.length()
    var buf: StringBuilder = StringBuilder.new()
    // Array header: number of arguments that follow.
    buf.append("*")
    buf.append(int_to_string(count))
    buf.append("\r\n")
    var idx: int = 0
    while idx < count {
        let arg: String = parts[idx]
        // Length-prefixed argument: "$<length>" line, then the text.
        buf.append("$")
        buf.append(int_to_string(arg.length()))
        buf.append("\r\n")
        buf.append(arg)
        buf.append("\r\n")
        idx = idx + 1
    }
    return buf.to_string()
}
// Handles a single dequeued payload and reports whether it succeeded.
// This stub only logs the payload and always returns true; put the real
// business logic here and return false when the message could not be handled.
fn process_message(payload: String) -> bool {
    let line: String = " processing: " + payload
    print(line)
    return true
}
// Entry point: connects to nyx-queue on 127.0.0.1:6381 and consumes up to
// three messages from the "tasks" queue. Each message is passed to
// process_message and then ACKed on success or NACKed on failure.
// Returns 1 if the connection cannot be opened, 0 otherwise.
fn main() -> int {
    let fd: int = tcp_connect("127.0.0.1", 6381)
    if fd < 0 {
        print("connection failed")
        return 1
    }
    // `attempts` bounds the loop, while `processed` counts only messages that
    // were actually dequeued and handled. Keeping them separate means an
    // empty queue ends the loop without inflating the reported total.
    var attempts: int = 0
    var processed: int = 0
    while attempts < 3 {
        // DEQUEUE returns an array: [msg_id, payload] or nil if empty
        tcp_write(fd, resp_cmd(["DEQUEUE", "tasks"]))
        let hdr: String = tcp_read_line(fd)
        if hdr.trim() == "$-1" {
            print("queue empty")
            // Nothing left to consume: force the loop condition to fail.
            attempts = 3
        } else {
            // The reply carries a length line before each value; the length
            // lines are consumed and discarded, only the values are kept.
            tcp_read_line(fd)
            let msg_id: String = tcp_read_line(fd)
            tcp_read_line(fd)
            let payload: String = tcp_read_line(fd)
            let success: bool = process_message(payload)
            if success {
                // ACK removes the message from in-flight
                tcp_write(fd, resp_cmd(["ACK", "tasks", msg_id.trim()]))
                // Consume the server's reply so it is not mistaken for the
                // next DEQUEUE response.
                tcp_read_line(fd)
            } else {
                // NACK triggers redelivery
                tcp_write(fd, resp_cmd(["NACK", "tasks", msg_id.trim()]))
                tcp_read_line(fd)
            }
            processed = processed + 1
            attempts = attempts + 1
        }
    }
    print("consumer processed " + int_to_string(processed) + " messages")
    tcp_close(fd)
    return 0
}
Output
processing: {"task":"send_email","to":"a@b.com"}
processing: {"task":"send_email","to":"c@d.com"}
queue empty
consumer processed 3 messages
Explanation
The consumer loop is the core of any queue worker. DEQUEUE atomically reserves a message (moving it to in-flight state), your handler processes it, then ACK confirms completion or NACK returns it for redelivery. If the consumer crashes before ACK, nyx-queue automatically re-delivers after a visibility timeout. With multiple consumers connected, nyx-queue distributes messages round-robin — horizontal scaling comes for free.