nyx-kv Pub/Sub
PUBLISH broadcasts a message to every client currently subscribed to a channel with SUBSCRIBE. Pub/Sub decouples producers from consumers, which makes it a good fit for real-time notifications.
Code
// nyx-kv Pub/Sub -- SUBSCRIBE and PUBLISH for real-time messaging
// Encode a command as a RESP array of bulk strings:
// "*<count>", then "$<len>" and the value for each part, all CRLF-terminated.
fn resp_cmd(parts: Array) -> String {
    var sb: StringBuilder = StringBuilder.new()
    sb.append("*")
    sb.append(int_to_string(parts.length()))
    sb.append("\r\n")
    var i: int = 0
    while i < parts.length() {
        let p: String = parts[i]
        sb.append("$")
        sb.append(int_to_string(p.length()))
        sb.append("\r\n")
        sb.append(p)
        sb.append("\r\n")
        i = i + 1
    }
    return sb.to_string()
}
fn main() -> int {
    // Publisher connection
    let pub_fd: int = tcp_connect("127.0.0.1", 6380)
    if pub_fd < 0 {
        print("publisher connection failed")
        return 1
    }
    // Publish a message to a channel. The reply is an integer such as ":0";
    // no subscriber is connected yet in this example, so the count is 0.
    tcp_write(pub_fd, resp_cmd(["PUBLISH", "notifications", "user signed up"]))
    let pub_reply: String = tcp_read_line(pub_fd)
    print("PUBLISH -> " + pub_reply.trim() + " subscribers received")
    tcp_close(pub_fd)
    // Subscriber connection (usually runs in a separate thread)
    let sub_fd: int = tcp_connect("127.0.0.1", 6380)
    if sub_fd < 0 {
        print("subscriber connection failed")
        return 1
    }
    // SUBSCRIBE -- the connection becomes a message stream
    tcp_write(sub_fd, resp_cmd(["SUBSCRIBE", "notifications"]))
    // Only the first line of the confirmation (the array header) is read here
    let sub_hdr: String = tcp_read_line(sub_fd)
    print("SUBSCRIBE header -> " + sub_hdr.trim())
    // In a real app, loop reading messages. Assuming replies use the same
    // framing as resp_cmd, each message is a "*3" header line followed by a
    // "$<len>" line and a value line for each of the three elements:
    // let hdr: String = tcp_read_line(sub_fd)       // "*3"
    // let len1: String = tcp_read_line(sub_fd)      // "$7"
    // let msg_type: String = tcp_read_line(sub_fd)  // "message"
    // ...then the same length/value pair for channel and payload
    // UNSUBSCRIBE when done
    tcp_write(sub_fd, resp_cmd(["UNSUBSCRIBE", "notifications"]))
    tcp_close(sub_fd)
    return 0
}
Output
PUBLISH -> :0 subscribers received
SUBSCRIBE header -> *3
Explanation
Pub/Sub is a fire-and-forget broadcast model. The publisher sends PUBLISH channel payload, and the server replies with the number of currently subscribed clients that received the message. In the example the reply is :0 because the message is published before the subscriber connects. Messages are not persisted, so subscribers that join later never see past messages. If you need replay, use a list or a stream.
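The delivery semantics described above can be modeled with a small in-memory sketch. It is written in Python rather than Nyx, and the Broker class and its methods are hypothetical names used only for illustration; it does not reflect how the nyx-kv server is implemented. It shows that publish returns the number of subscribers attached at that moment, and that a subscriber who joins later never sees earlier messages.

```python
from collections import defaultdict


class Broker:
    """Toy in-memory model of fire-and-forget Pub/Sub (hypothetical, not nyx-kv code)."""

    def __init__(self):
        # channel name -> list of per-subscriber inboxes
        self.channels = defaultdict(list)

    def subscribe(self, channel):
        """Attach a new subscriber and return its (initially empty) inbox."""
        inbox = []
        self.channels[channel].append(inbox)
        return inbox

    def publish(self, channel, payload):
        """Deliver to current subscribers only; nothing is stored for later."""
        inboxes = self.channels[channel]
        for inbox in inboxes:
            inbox.append(("message", channel, payload))
        return len(inboxes)


broker = Broker()
early = broker.publish("notifications", "user signed up")  # nobody listening yet
inbox = broker.subscribe("notifications")
late = broker.publish("notifications", "user upgraded")    # one subscriber now
```

Here early is 0 and late is 1, and inbox contains only the second message.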
A subscribing connection enters a special mode: it no longer accepts regular commands (only subscription commands such as UNSUBSCRIBE, as in the example) and instead receives a three-element array ["message", channel, payload] for every publish on a channel it is subscribed to. Because the subscriber blocks on reads, it typically runs in its own thread or task; the publisher can use any other connection.
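A subscriber's read loop has to decode those arrays from the byte stream. The following is a minimal sketch, assuming the server frames pushed messages as RESP arrays of bulk strings, the same encoding resp_cmd produces for requests. It is Python rather than Nyx; read_line and read_message are hypothetical helpers, and io.BytesIO stands in for the socket. It handles bulk-string elements only, so a reply containing another element type, such as an integer, would need an extra branch.

```python
import io


def read_line(stream):
    """Read one CRLF-terminated line and return it without the terminator."""
    line = stream.readline()
    if not line.endswith(b"\r\n"):
        raise EOFError("connection closed mid-frame")
    return line[:-2]


def read_message(stream):
    """Parse one RESP array of bulk strings, e.g. ["message", channel, payload]."""
    header = read_line(stream)
    if not header.startswith(b"*"):
        raise ValueError(f"expected array header, got {header!r}")
    parts = []
    for _ in range(int(header[1:])):
        size_line = read_line(stream)
        if not size_line.startswith(b"$"):
            raise ValueError(f"expected bulk string, got {size_line!r}")
        size = int(size_line[1:])
        data = stream.read(size + 2)  # value bytes plus trailing CRLF
        parts.append(data[:size].decode("utf-8"))
    return parts


# Simulated bytes for one pushed message (assumed wire format).
wire = io.BytesIO(
    b"*3\r\n$7\r\nmessage\r\n$13\r\nnotifications\r\n$14\r\nuser signed up\r\n"
)
msg_type, channel, payload = read_message(wire)
```

Reading each value by its declared length, instead of line by line, keeps the parser correct even when a payload itself contains a line break.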
Use Pub/Sub for live dashboards, cache-invalidation fan-out, chat rooms, and cross-service event buses. For guaranteed delivery, combine it with nyx-queue.