Colas de mensajes con nyx-queue
Que es una cola de mensajes?
Una cola de mensajes es un buffer que retiene mensajes hasta que un consumidor esta listo para procesarlos. A diferencia de Pub/Sub (donde los mensajes se pierden si nadie escucha), una cola persiste los mensajes y garantiza la entrega.
Producer ──► ENQUEUE "emails" "welcome:alice" ──► nyx-queue
│
┌──┴──┐
│Queue│
│ │
│msg1 │
│msg2 │
│msg3 │
└──┬──┘
│ DEQUEUE
┌──────────┼──────────┐
▼ ▼ ▼
Consumer 1 Consumer 2 Consumer 3
Diferencia clave con Pub/Sub: cada mensaje va a exactamente un consumidor (round-robin), no a todos.
Primeros pasos
nyx-queue habla el protocolo RESP, asi que usas redis-cli para interactuar con el:
$ redis-cli -p 6381 127.0.0.1:6381> PING PONG
Producir mensajes
Envia un mensaje a una cola con ENQUEUE:
127.0.0.1:6381> ENQUEUE emails "welcome:alice@example.com" "msg_1712001234_1"
El comando devuelve un ID de mensaje unico. La cola se crea automaticamente si no existe.
Consumir mensajes
DEQUEUE recupera el siguiente mensaje. Se bloquea hasta que un mensaje este disponible:
127.0.0.1:6381> DEQUEUE emails 1) "msg_1712001234_1" 2) "welcome:alice@example.com"
La respuesta es un array de dos elementos: el ID del mensaje y el contenido. Para modo no bloqueante:
127.0.0.1:6381> DEQUEUE emails NOWAIT (nil)
Confirmar mensajes
Despues de procesar un mensaje, confirmalo con ACK:
127.0.0.1:6381> ACK emails msg_1712001234_1 OK
Si el procesamiento falla, envia NACK para devolver el mensaje a la cola para reenvio:
127.0.0.1:6381> NACK emails msg_1712001234_1 OK
Reenvio automatico
Si un consumidor se cae sin enviar ACK o NACK, el mensaje se devuelve automaticamente a la cola despues de 30 segundos (configurable con --ack-timeout). Esto proporciona entrega al-menos-una-vez — cada mensaje sera procesado al menos una vez.
Gestion de colas
| Comando | Descripcion |
|---|---|
QLEN queue | Numero de mensajes pendientes |
QINFO queue | Devuelve [pendientes, entregados, total_encolados, total_confirmados] |
QUEUES | Lista todos los nombres de colas |
QDEL queue | Elimina una cola y todos sus mensajes |
INFO | Estadisticas del servidor |
Persistencia
Los mensajes sobreviven a los reinicios. nyx-queue guarda su estado en un archivo binario queue.ndb usando el mismo patron de guardado en segundo plano que nyx-kv — cada 60 segundos o despues de 100 cambios. Al reiniciar, los mensajes que fueron entregados pero no confirmados se restablecen a pendientes.
Ejemplo: procesador de emails en segundo plano
Una aplicacion web encola emails. Tres procesos worker los desencolan y envian:
# Productor (aplicacion web)
import redis
q = redis.Redis(port=6381)
q.execute_command("ENQUEUE", "emails", "welcome:alice@example.com")
q.execute_command("ENQUEUE", "emails", "receipt:order_123")
q.execute_command("ENQUEUE", "emails", "reset:bob@example.com")
# Consumidor (worker de email)
import redis
q = redis.Redis(port=6381)
while True:
msg = q.execute_command("DEQUEUE", "emails")
msg_id, payload = msg[0], msg[1]
try:
send_email(payload)
q.execute_command("ACK", "emails", msg_id)
except Exception:
q.execute_command("NACK", "emails", msg_id)
Ejecuta tres workers en paralelo. nyx-queue distribuye mensajes en round-robin — ningun mensaje se procesa dos veces (a menos que se haga NACK).
Resumen
ENQUEUE queue payloadagrega un mensaje y devuelve un ID unico.DEQUEUE queuese bloquea hasta que un mensaje este disponible.ACKmarca un mensaje como procesado;NACKlo devuelve para reintento.- El reenvio automatico despues de 30s asegura que ningun mensaje se pierda.
- Los mensajes persisten en disco y sobreviven a los reinicios.
- Entrega round-robin entre multiples consumidores.