Nyx by Example

nyx-kv Pub/Sub

PUBLISH broadcasts a message to every client currently subscribed to a channel. Pub/Sub decouples producers from consumers, which makes it a good fit for real-time notifications.

Code

// nyx-kv Pub/Sub -- SUBSCRIBE and PUBLISH for real-time messaging

fn resp_cmd(parts: Array) -> String {
    var sb: StringBuilder = StringBuilder.new()
    sb.append("*")
    sb.append(int_to_string(parts.length()))
    sb.append("\r\n")
    var i: int = 0
    while i < parts.length() {
        let p: String = parts[i]
        sb.append("$")
        sb.append(int_to_string(p.length()))
        sb.append("\r\n")
        sb.append(p)
        sb.append("\r\n")
        i = i + 1
    }
    return sb.to_string()
}

fn main() -> int {
    // Publisher connection
    let pub_fd: int = tcp_connect("127.0.0.1", 6380)
    if pub_fd < 0 {
        print("publisher connection failed")
        return 1
    }

    // Publish a message to a channel
    tcp_write(pub_fd, resp_cmd(["PUBLISH", "notifications", "user signed up"]))
    // The reply is a RESP integer line such as ":0"; it is printed raw below
    let pub_reply: String = tcp_read_line(pub_fd)
    print("PUBLISH -> " + pub_reply.trim() + " subscribers received")

    tcp_close(pub_fd)

    // Subscriber connection (usually runs in a separate thread)
    let sub_fd: int = tcp_connect("127.0.0.1", 6380)
    if sub_fd < 0 {
        print("subscriber connection failed")
        return 1
    }

    // SUBSCRIBE -- the connection becomes a message stream
    tcp_write(sub_fd, resp_cmd(["SUBSCRIBE", "notifications"]))
    // Read only the first line of the confirmation, the array header "*3"
    let sub_hdr: String = tcp_read_line(sub_fd)
    print("SUBSCRIBE header -> " + sub_hdr.trim())

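    // In a real app, loop reading messages. Each one is a RESP array:
    // a "*3" header, then (assuming bulk strings) a "$<len>" line before each value.
    //   let hdr: String = tcp_read_line(sub_fd)       // "*3"
    //   tcp_read_line(sub_fd)                         // "$7" (length line)
    //   let msg_type: String = tcp_read_line(sub_fd)  // "message"
    //   ...repeat the length/value pair for channel and payload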

    // UNSUBSCRIBE when done
    tcp_write(sub_fd, resp_cmd(["UNSUBSCRIBE", "notifications"]))
    tcp_close(sub_fd)
    return 0
}

Output

PUBLISH -> :0 subscribers received
SUBSCRIBE header -> *3

Explanation

Pub/Sub is a fire-and-forget broadcast model. The publisher sends PUBLISH channel payload, and the server replies with the number of currently subscribed clients that received the message. In the run above that count is 0, because the message is published before any subscriber has connected. Messages are not persisted: subscribers that join later never see past messages. If you need replay, use a list or a stream.
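
The delivery rule described here can be modelled in a few lines. The sketch below is written in Python rather than Nyx so it can run without a server, and `ToyBroker` is a hypothetical in-memory stand-in, not nyx-kv code. It only illustrates two points: the publish count reflects whoever is subscribed at that instant, and nothing is kept for later subscribers.

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical in-memory model of fire-and-forget Pub/Sub."""

    def __init__(self):
        # channel name -> list of per-subscriber inboxes
        self.channels = defaultdict(list)

    def subscribe(self, channel):
        """Register a new subscriber and return its (initially empty) inbox."""
        inbox = []
        self.channels[channel].append(inbox)
        return inbox

    def publish(self, channel, payload):
        """Deliver to current subscribers only and return how many there were."""
        inboxes = self.channels[channel]
        for inbox in inboxes:
            inbox.append(payload)
        return len(inboxes)  # the message itself is not stored anywhere


broker = ToyBroker()
early = broker.publish("notifications", "user signed up")  # nobody is listening yet
inbox = broker.subscribe("notifications")                   # joins after the first publish
late = broker.publish("notifications", "user logged in")

print(early, late, inbox)  # 0 1 ['user logged in']
```

The first publish returns 0 and is lost, which matches the `:0` reply in the Output section; only the second message reaches the inbox.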

A subscribing connection enters a special mode: it stops accepting regular commands and instead receives a three-element array ["message", channel, payload] for every matching publish. Because the subscriber blocks on reads, it typically runs in its own thread; the publisher can use any other connection.
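
To make the three-element array concrete, here is a minimal Python sketch of a reader for one pushed message. It assumes nyx-kv frames pushes the same way `resp_cmd` frames commands: a `*3` header followed by three length-prefixed bulk strings. That framing is an assumption based on the example above, and the helper names `read_line`, `read_bulk`, and `read_push` are made up for illustration.

```python
import io


def read_line(stream):
    """Read one CRLF-terminated line and return it without the terminator."""
    line = stream.readline()
    if not line.endswith(b"\r\n"):
        raise EOFError("connection closed mid-frame")
    return line[:-2]


def read_bulk(stream):
    """Read one bulk string: a '$<len>' header, then <len> bytes plus CRLF."""
    header = read_line(stream)
    if not header.startswith(b"$"):
        raise ValueError(f"expected bulk string, got {header!r}")
    length = int(header[1:])
    data = stream.read(length + 2)  # payload bytes plus the trailing CRLF
    if len(data) != length + 2:
        raise EOFError("connection closed mid-frame")
    return data[:-2].decode("utf-8")


def read_push(stream):
    """Read one pushed frame, assumed to be an array of three bulk strings."""
    header = read_line(stream)
    if header != b"*3":
        raise ValueError(f"expected a 3-element array, got {header!r}")
    return [read_bulk(stream) for _ in range(3)]


# Simulated bytes as they might arrive on the subscriber connection.
wire = io.BytesIO(
    b"*3\r\n$7\r\nmessage\r\n$13\r\nnotifications\r\n$14\r\nuser signed up\r\n"
)
kind, channel, payload = read_push(wire)
print(kind, channel, payload)  # message notifications user signed up
```

On a real connection the same functions would take a buffered binary stream such as `sock.makefile("rb")`, and the subscriber loop would call `read_push` repeatedly. Reading by declared length rather than line by line keeps payloads that contain newlines intact. The sketch handles message frames only; a subscribe confirmation whose last element is not a bulk string would need its own case.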

Use Pub/Sub for live dashboards, cache invalidation fanout, chat rooms, and cross-service event buses. For guaranteed delivery, combine with nyx-queue.


Source: examples/by-example/75-kv-pubsub.nx