Índice

Colas de mensajes con nyx-queue

Que es una cola de mensajes?

Una cola de mensajes es un buffer que retiene mensajes hasta que un consumidor esta listo para procesarlos. A diferencia de Pub/Sub (donde los mensajes se pierden si nadie escucha), una cola persiste los mensajes y garantiza la entrega.

Producer ──► ENQUEUE "emails" "welcome:alice" ──► nyx-queue
                                                       │
                                                    ┌──┴──┐
                                                    │Queue│
                                                    │     │
                                                    │msg1 │
                                                    │msg2 │
                                                    │msg3 │
                                                    └──┬──┘
                                                       │ DEQUEUE
                                            ┌──────────┼──────────┐
                                            ▼          ▼          ▼
                                       Consumer 1  Consumer 2  Consumer 3

Diferencia clave con Pub/Sub: cada mensaje va a exactamente un consumidor (round-robin), no a todos.

Primeros pasos

nyx-queue habla el protocolo RESP, asi que usas redis-cli para interactuar con el:

$ redis-cli -p 6381
127.0.0.1:6381> PING
PONG

Producir mensajes

Envia un mensaje a una cola con ENQUEUE:

127.0.0.1:6381> ENQUEUE emails "welcome:alice@example.com"
"msg_1712001234_1"

El comando devuelve un ID de mensaje unico. La cola se crea automaticamente si no existe.

Consumir mensajes

DEQUEUE recupera el siguiente mensaje. Se bloquea hasta que un mensaje este disponible:

127.0.0.1:6381> DEQUEUE emails
1) "msg_1712001234_1"
2) "welcome:alice@example.com"

La respuesta es un array de dos elementos: el ID del mensaje y el contenido. Para modo no bloqueante:

127.0.0.1:6381> DEQUEUE emails NOWAIT
(nil)

Confirmar mensajes

Despues de procesar un mensaje, confirmalo con ACK:

127.0.0.1:6381> ACK emails msg_1712001234_1
OK

Si el procesamiento falla, envia NACK para devolver el mensaje a la cola para reenvio:

127.0.0.1:6381> NACK emails msg_1712001234_1
OK

Reenvio automatico

Si un consumidor se cae sin enviar ACK o NACK, el mensaje se devuelve automaticamente a la cola despues de 30 segundos (configurable con --ack-timeout). Esto proporciona entrega al-menos-una-vez — cada mensaje sera procesado al menos una vez.

Gestion de colas

ComandoDescripcion
QLEN queueNumero de mensajes pendientes
QINFO queueDevuelve [pendientes, entregados, total_encolados, total_confirmados]
QUEUESLista todos los nombres de colas
QDEL queueElimina una cola y todos sus mensajes
INFOEstadisticas del servidor

Persistencia

Los mensajes sobreviven a los reinicios. nyx-queue guarda su estado en un archivo binario queue.ndb usando el mismo patron de guardado en segundo plano que nyx-kv — cada 60 segundos o despues de 100 cambios. Al reiniciar, los mensajes que fueron entregados pero no confirmados se restablecen a pendientes.

Ejemplo: procesador de emails en segundo plano

Una aplicacion web encola emails. Tres procesos worker los desencolan y envian:

# Productor (aplicacion web)
import redis
q = redis.Redis(port=6381)
q.execute_command("ENQUEUE", "emails", "welcome:alice@example.com")
q.execute_command("ENQUEUE", "emails", "receipt:order_123")
q.execute_command("ENQUEUE", "emails", "reset:bob@example.com")
# Consumidor (worker de email)
import redis
q = redis.Redis(port=6381)

while True:
    msg = q.execute_command("DEQUEUE", "emails")
    msg_id, payload = msg[0], msg[1]
    try:
        send_email(payload)
        q.execute_command("ACK", "emails", msg_id)
    except Exception:
        q.execute_command("NACK", "emails", msg_id)

Ejecuta tres workers en paralelo. nyx-queue distribuye mensajes en round-robin — ningun mensaje se procesa dos veces (a menos que se haga NACK).

Resumen

← Anterior: Tiempo real con Pub/Sub Siguiente: HTTP/2 — Protocolos binarios →