Concurrencia — Threads y channels
¿Por qué concurrencia?
En el Capítulo 17, construiste un servidor TCP que maneja un cliente a la vez. Mientras habla con el cliente A, los clientes B, C y D tienen que esperar. Para una app de chat o un servidor web, eso es inaceptable.
Concurrencia significa hacer múltiples cosas a la vez — o al menos aparentarlo. Con concurrencia, tu servidor puede manejar cientos de clientes simultáneamente.
Nyx proporciona tres herramientas para concurrencia: threads, channels y mutexes.
¿Qué es un thread?
Un thread es como un segundo trabajador dentro de tu programa. Tu programa principal es un thread. Cuando creas un nuevo thread, obtienes un segundo trabajador que se ejecuta al mismo tiempo que el primero.
Piensa en la cocina de un restaurante: un chef solo puede cocinar un plato a la vez. Dos chefs pueden cocinar dos platos simultáneamente. Los threads son tus chefs extra.
Crear un thread
fn computar() -> int { var total: int = 0 var i: int = 0 while i < 1000000 { total += 1 i += 1 } return total } fn main() { print("Iniciando thread...") let handle: int = thread_spawn(computar) print("El thread principal continúa mientras el worker computa...") let resultado: int = thread_join(handle) print("Thread terminó con: " + int_to_string(resultado)) }
thread_spawn(funcion)inicia un nuevo thread ejecutando la función dada. Devuelve un handle.thread_join(handle)espera a que el thread termine y devuelve su resultado.
Entre spawn y join, ambos threads se ejecutan simultáneamente.
Múltiples threads
fn trabajo_a() -> int { print("Worker A iniciado") var i: int = 0 while i < 500000 { i += 1 } print("Worker A terminado") return 1 } fn trabajo_b() -> int { print("Worker B iniciado") var i: int = 0 while i < 500000 { i += 1 } print("Worker B terminado") return 2 } fn trabajo_c() -> int { print("Worker C iniciado") var i: int = 0 while i < 500000 { i += 1 } print("Worker C terminado") return 3 } fn main() { let ha: int = thread_spawn(trabajo_a) let hb: int = thread_spawn(trabajo_b) let hc: int = thread_spawn(trabajo_c) let ra: int = thread_join(ha) let rb: int = thread_join(hb) let rc: int = thread_join(hc) print("Resultados: " + int_to_string(ra) + ", " + int_to_string(rb) + ", " + int_to_string(rc)) }
Los tres workers se ejecutan concurrentemente. El orden de los mensajes "iniciado"/"terminado" puede variar entre ejecuciones — eso es la concurrencia en acción.
El problema de los datos compartidos
¿Qué pasa cuando dos threads intentan modificar la misma variable?
Thread A: lee contador (0) Thread B: lee contador (0) Thread A: escribe contador (1) Thread B: escribe contador (1) ← debería ser 2, ¡pero es 1!
Esto se llama una condición de carrera — ambos threads "compiten" por usar los datos, y el resultado depende de quién llegue primero. Las condiciones de carrera causan bugs sutiles y difíciles de reproducir.
Mutexes: proteger datos compartidos
Un mutex (exclusión mutua) es un candado. Solo un thread puede tener el candado a la vez. Si el thread B intenta bloquear un mutex que el thread A ya tiene, el thread B espera hasta que el thread A lo desbloquee.
fn main() { let m: Map = mutex_new() var contador: int = 0 fn worker_a() -> int { var i: int = 0 while i < 100 { mutex_lock(m) contador = contador + 1 mutex_unlock(m) i += 1 } return 0 } fn worker_b() -> int { var i: int = 0 while i < 100 { mutex_lock(m) contador = contador + 1 mutex_unlock(m) i += 1 } return 0 } let ha: int = thread_spawn(worker_a) let hb: int = thread_spawn(worker_b) thread_join(ha) thread_join(hb) print("Contador: " + int_to_string(contador)) // 200 }
Sin el mutex, el contador podría terminar en menos de 200 por condiciones de carrera. Con él, exactamente un thread modifica contador a la vez.
Reglas clave:
mutex_new()crea un nuevo mutex.mutex_lock(m)adquiere el candado (espera si otro thread lo tiene).mutex_unlock(m)libera el candado.- Siempre desbloquea lo que bloqueas. Olvidar desbloquear causa deadlocks — threads esperando para siempre.
Channels: comunicación segura entre threads
Un channel es una tubería entre threads. Un thread envía valores, otro thread los recibe. Los channels son la forma más segura de comunicación entre threads.
fn main() { let ch: Map = channel_new(8) fn productor() -> int { var i: int = 0 while i < 5 { channel_send(ch, i * 10) i += 1 } return 0 } let handle: int = thread_spawn(productor) var i: int = 0 while i < 5 { let valor: int = channel_recv(ch) print("Recibido: " + int_to_string(valor)) i += 1 } thread_join(handle) }
Salida:
Recibido: 0 Recibido: 10 Recibido: 20 Recibido: 30 Recibido: 40
channel_new(capacidad)crea un channel que puede almacenar hastacapacidaditems.channel_send(ch, valor)envía un valor. Bloquea si el channel está lleno.channel_recv(ch)recibe un valor. Bloquea si el channel está vacío.channel_try_recv(ch)intenta recibir sin bloquear. Devuelve -1 si está vacío.
Patrón: pool de workers
Un patrón común es tener múltiples threads worker leyendo del mismo channel:
fn main() { let trabajos: Map = channel_new(16) let resultados: Map = channel_new(16) fn worker() -> int { while 1 > 0 { let trabajo: int = channel_recv(trabajos) if trabajo < 0 { return 0 } // "Procesar" el trabajo: elevar al cuadrado channel_send(resultados, trabajo * trabajo) } return 0 } // Iniciar 4 workers let w1: int = thread_spawn(worker) let w2: int = thread_spawn(worker) let w3: int = thread_spawn(worker) let w4: int = thread_spawn(worker) // Enviar 8 trabajos var i: int = 1 while i <= 8 { channel_send(trabajos, i) i += 1 } // Recolectar 8 resultados var total: int = 0 i = 0 while i < 8 { let r: int = channel_recv(resultados) print("Resultado: " + int_to_string(r)) total += r i += 1 } print("Total: " + int_to_string(total)) // Señalar a los workers que paren channel_send(trabajos, -1) channel_send(trabajos, -1) channel_send(trabajos, -1) channel_send(trabajos, -1) thread_join(w1) thread_join(w2) thread_join(w3) thread_join(w4) }
Los workers se ejecutan en paralelo, cada uno tomando el siguiente trabajo disponible del channel. Así es como los servidores del mundo real distribuyen trabajo entre los núcleos de CPU.
Servidor HTTP multi-threaded
La biblioteca estándar de Nyx incluye http_serve_mt — un servidor HTTP multi-threaded que usa un pool de threads internamente:
import { http_serve_mt, http_response } from "std/http" fn al_recibir_solicitud(request: Array) -> String { let ruta: String = request[2] if ruta == "/" { return http_response(200, "¡Hola desde un servidor multi-threaded!") } return http_response(404, "No Encontrado") } fn main() { print("Servidor multi-threaded en http://localhost:8080") http_serve_mt(8080, 4, al_recibir_solicitud) }
El segundo parámetro (4) es el número de threads worker. Cada thread maneja solicitudes independientemente, así que múltiples clientes son atendidos simultáneamente. Así es como Nyx logra más de 73,000 solicitudes por segundo.
Ejemplo práctico: computación paralela
Divide una tarea grande entre múltiples threads:
fn main() { let ch: Map = channel_new(4) fn sumar_rango_1() -> int { var total: int = 0 var i: int = 1 while i <= 250000 { total += i i += 1 } channel_send(ch, total) return 0 } fn sumar_rango_2() -> int { var total: int = 0 var i: int = 250001 while i <= 500000 { total += i i += 1 } channel_send(ch, total) return 0 } fn sumar_rango_3() -> int { var total: int = 0 var i: int = 500001 while i <= 750000 { total += i i += 1 } channel_send(ch, total) return 0 } fn sumar_rango_4() -> int { var total: int = 0 var i: int = 750001 while i <= 1000000 { total += i i += 1 } channel_send(ch, total) return 0 } let h1: int = thread_spawn(sumar_rango_1) let h2: int = thread_spawn(sumar_rango_2) let h3: int = thread_spawn(sumar_rango_3) let h4: int = thread_spawn(sumar_rango_4) var gran_total: int = 0 var i: int = 0 while i < 4 { gran_total += channel_recv(ch) i += 1 } thread_join(h1) thread_join(h2) thread_join(h3) thread_join(h4) print("Suma de 1 a 1,000,000 = " + int_to_string(gran_total)) // 500000500000 }
Cuatro threads computan cada uno un cuarto de la suma, luego el thread principal suma los resultados parciales.
Ejercicios
- Escribe un programa que cree 3 threads, cada uno imprimiendo su propio mensaje, y espera a que todos terminen.
- Usa un mutex para incrementar de forma segura un contador compartido desde 5 threads, cada uno sumando 1000. El resultado final debe ser 5000.
- Construye un sistema productor-consumidor: un thread genera números del 1 al 20 y los envía por un channel. Otro thread los recibe e imprime.
- Escribe un
mapparalelo: divide un array en partes, procesa cada parte en un thread separado, y recolecta resultados via un channel.
- Construye un servidor echo multi-threaded usando
tcp_listen,tcp_accept, ythread_spawn— crea un nuevo thread para cada conexión de cliente.
Resumen
thread_spawn(fn)inicia un nuevo thread.thread_join(handle)espera a que termine.- Las condiciones de carrera ocurren cuando los threads acceden a datos compartidos sin coordinación.
mutex_new(),mutex_lock(m),mutex_unlock(m)protegen datos compartidos.channel_new(cap),channel_send(ch, val),channel_recv(ch)permiten a los threads comunicarse de forma segura.- Los pools de workers usan channels para distribuir trabajo entre threads.
http_serve_mt(puerto, workers, handler)ejecuta un servidor HTTP multi-threaded.- Regla general: prefiere channels sobre mutexes. "No comuniques compartiendo memoria; comparte memoria comunicando."
Siguiente capítulo: Tu segundo proyecto — Un servidor web →