Índice

Concurrencia — Threads y channels

¿Por qué concurrencia?

En el Capítulo 17, construiste un servidor TCP que maneja un cliente a la vez. Mientras habla con el cliente A, los clientes B, C y D tienen que esperar. Para una app de chat o un servidor web, eso es inaceptable.

Concurrencia significa hacer múltiples cosas a la vez — o al menos aparentarlo. Con concurrencia, tu servidor puede manejar cientos de clientes simultáneamente.

Nyx proporciona tres herramientas para concurrencia: threads, channels y mutexes.

¿Qué es un thread?

Un thread es como un segundo trabajador dentro de tu programa. Tu programa principal es un thread. Cuando creas un nuevo thread, obtienes un segundo trabajador que se ejecuta al mismo tiempo que el primero.

Piensa en la cocina de un restaurante: un chef solo puede cocinar un plato a la vez. Dos chefs pueden cocinar dos platos simultáneamente. Los threads son tus chefs extra.

Crear un thread

fn computar() -> int {
    var total: int = 0
    var i: int = 0
    while i < 1000000 {
        total += 1
        i += 1
    }
    return total
}

fn main() {
    print("Iniciando thread...")
    let handle: int = thread_spawn(computar)

    print("El thread principal continúa mientras el worker computa...")

    let resultado: int = thread_join(handle)
    print("Thread terminó con: " + int_to_string(resultado))
}

Entre spawn y join, ambos threads se ejecutan simultáneamente.

Múltiples threads

fn trabajo_a() -> int {
    print("Worker A iniciado")
    var i: int = 0
    while i < 500000 { i += 1 }
    print("Worker A terminado")
    return 1
}

fn trabajo_b() -> int {
    print("Worker B iniciado")
    var i: int = 0
    while i < 500000 { i += 1 }
    print("Worker B terminado")
    return 2
}

fn trabajo_c() -> int {
    print("Worker C iniciado")
    var i: int = 0
    while i < 500000 { i += 1 }
    print("Worker C terminado")
    return 3
}

fn main() {
    let ha: int = thread_spawn(trabajo_a)
    let hb: int = thread_spawn(trabajo_b)
    let hc: int = thread_spawn(trabajo_c)

    let ra: int = thread_join(ha)
    let rb: int = thread_join(hb)
    let rc: int = thread_join(hc)

    print("Resultados: " + int_to_string(ra) + ", " + int_to_string(rb) + ", " + int_to_string(rc))
}

Los tres workers se ejecutan concurrentemente. El orden de los mensajes "iniciado"/"terminado" puede variar entre ejecuciones — eso es la concurrencia en acción.

El problema de los datos compartidos

¿Qué pasa cuando dos threads intentan modificar la misma variable?

Thread A: lee contador (0)
Thread B: lee contador (0)
Thread A: escribe contador (1)
Thread B: escribe contador (1)   ← debería ser 2, ¡pero es 1!

Esto se llama una condición de carrera — ambos threads "compiten" por usar los datos, y el resultado depende de quién llegue primero. Las condiciones de carrera causan bugs sutiles y difíciles de reproducir.

Mutexes: proteger datos compartidos

Un mutex (exclusión mutua) es un candado. Solo un thread puede tener el candado a la vez. Si el thread B intenta bloquear un mutex que el thread A ya tiene, el thread B espera hasta que el thread A lo desbloquee.

fn main() {
    let m: Map = mutex_new()
    var contador: int = 0

    fn worker_a() -> int {
        var i: int = 0
        while i < 100 {
            mutex_lock(m)
            contador = contador + 1
            mutex_unlock(m)
            i += 1
        }
        return 0
    }

    fn worker_b() -> int {
        var i: int = 0
        while i < 100 {
            mutex_lock(m)
            contador = contador + 1
            mutex_unlock(m)
            i += 1
        }
        return 0
    }

    let ha: int = thread_spawn(worker_a)
    let hb: int = thread_spawn(worker_b)

    thread_join(ha)
    thread_join(hb)

    print("Contador: " + int_to_string(contador))    // 200
}

Sin el mutex, el contador podría terminar en menos de 200 por condiciones de carrera. Con él, exactamente un thread modifica contador a la vez.

Reglas clave:

Channels: comunicación segura entre threads

Un channel es una tubería entre threads. Un thread envía valores, otro thread los recibe. Los channels son la forma más segura de comunicación entre threads.

fn main() {
    let ch: Map = channel_new(8)

    fn productor() -> int {
        var i: int = 0
        while i < 5 {
            channel_send(ch, i * 10)
            i += 1
        }
        return 0
    }

    let handle: int = thread_spawn(productor)

    var i: int = 0
    while i < 5 {
        let valor: int = channel_recv(ch)
        print("Recibido: " + int_to_string(valor))
        i += 1
    }

    thread_join(handle)
}

Salida:

Recibido: 0
Recibido: 10
Recibido: 20
Recibido: 30
Recibido: 40

Patrón: pool de workers

Un patrón común es tener múltiples threads worker leyendo del mismo channel:

fn main() {
    let trabajos: Map = channel_new(16)
    let resultados: Map = channel_new(16)

    fn worker() -> int {
        while 1 > 0 {
            let trabajo: int = channel_recv(trabajos)
            if trabajo < 0 { return 0 }
            // "Procesar" el trabajo: elevar al cuadrado
            channel_send(resultados, trabajo * trabajo)
        }
        return 0
    }

    // Iniciar 4 workers
    let w1: int = thread_spawn(worker)
    let w2: int = thread_spawn(worker)
    let w3: int = thread_spawn(worker)
    let w4: int = thread_spawn(worker)

    // Enviar 8 trabajos
    var i: int = 1
    while i <= 8 {
        channel_send(trabajos, i)
        i += 1
    }

    // Recolectar 8 resultados
    var total: int = 0
    i = 0
    while i < 8 {
        let r: int = channel_recv(resultados)
        print("Resultado: " + int_to_string(r))
        total += r
        i += 1
    }
    print("Total: " + int_to_string(total))

    // Señalar a los workers que paren
    channel_send(trabajos, -1)
    channel_send(trabajos, -1)
    channel_send(trabajos, -1)
    channel_send(trabajos, -1)

    thread_join(w1)
    thread_join(w2)
    thread_join(w3)
    thread_join(w4)
}

Los workers se ejecutan en paralelo, cada uno tomando el siguiente trabajo disponible del channel. Así es como los servidores del mundo real distribuyen trabajo entre los núcleos de CPU.

Servidor HTTP multi-threaded

La biblioteca estándar de Nyx incluye http_serve_mt — un servidor HTTP multi-threaded que usa un pool de threads internamente:

import { http_serve_mt, http_response } from "std/http"

fn al_recibir_solicitud(request: Array) -> String {
    let ruta: String = request[2]

    if ruta == "/" {
        return http_response(200, "¡Hola desde un servidor multi-threaded!")
    }
    return http_response(404, "No Encontrado")
}

fn main() {
    print("Servidor multi-threaded en http://localhost:8080")
    http_serve_mt(8080, 4, al_recibir_solicitud)
}

El segundo parámetro (4) es el número de threads worker. Cada thread maneja solicitudes independientemente, así que múltiples clientes son atendidos simultáneamente. Así es como Nyx logra más de 73,000 solicitudes por segundo.

Ejemplo práctico: computación paralela

Divide una tarea grande entre múltiples threads:

fn main() {
    let ch: Map = channel_new(4)

    fn sumar_rango_1() -> int {
        var total: int = 0
        var i: int = 1
        while i <= 250000 {
            total += i
            i += 1
        }
        channel_send(ch, total)
        return 0
    }

    fn sumar_rango_2() -> int {
        var total: int = 0
        var i: int = 250001
        while i <= 500000 {
            total += i
            i += 1
        }
        channel_send(ch, total)
        return 0
    }

    fn sumar_rango_3() -> int {
        var total: int = 0
        var i: int = 500001
        while i <= 750000 {
            total += i
            i += 1
        }
        channel_send(ch, total)
        return 0
    }

    fn sumar_rango_4() -> int {
        var total: int = 0
        var i: int = 750001
        while i <= 1000000 {
            total += i
            i += 1
        }
        channel_send(ch, total)
        return 0
    }

    let h1: int = thread_spawn(sumar_rango_1)
    let h2: int = thread_spawn(sumar_rango_2)
    let h3: int = thread_spawn(sumar_rango_3)
    let h4: int = thread_spawn(sumar_rango_4)

    var gran_total: int = 0
    var i: int = 0
    while i < 4 {
        gran_total += channel_recv(ch)
        i += 1
    }

    thread_join(h1)
    thread_join(h2)
    thread_join(h3)
    thread_join(h4)

    print("Suma de 1 a 1,000,000 = " + int_to_string(gran_total))
    // 500000500000
}

Cuatro threads computan cada uno un cuarto de la suma, luego el thread principal suma los resultados parciales.

Ejercicios

  1. Escribe un programa que cree 3 threads, cada uno imprimiendo su propio mensaje, y espera a que todos terminen.
  1. Usa un mutex para incrementar de forma segura un contador compartido desde 5 threads, cada uno sumando 1000. El resultado final debe ser 5000.
  1. Construye un sistema productor-consumidor: un thread genera números del 1 al 20 y los envía por un channel. Otro thread los recibe e imprime.
  1. Escribe un map paralelo: divide un array en partes, procesa cada parte en un thread separado, y recolecta resultados via un channel.
  1. Construye un servidor echo multi-threaded usando tcp_listen, tcp_accept, y thread_spawn — crea un nuevo thread para cada conexión de cliente.

Resumen

Siguiente capítulo: Tu segundo proyecto — Un servidor web →

← Anterior: Networking — Servidores TCP Siguiente: Tu segundo proyecto — Un servidor web →