UD 7 - Apache Kafka y Spark¶
1. Introducción¶
Para entender y aprender como podemos usar Kafka y Spark en un sistema de Big Data, lo vamos a realizar mediante un ejemplo en el punto 3.
2. Uso de Spark con Kafka en Docker¶
Para que Spark sepa cómo leer de Kafka, necesita el conector spark-sql-kafka. Debemos decirle al Servidor de Spark Connect que descargue esta librería al arrancar.
Editamos nuestro docker-compose.yml en el servicio spark-connect. En la sección command, busca --packages y modifícalo para incluir el paquete de Kafka correspondiente a tu versión de Spark (4.0.1 con Scala 2.13):
spark-connect:
# ...
command:
- "/opt/spark/bin/spark-submit"
# ... resto de parámetros ...
- "--packages"
# Añadimos la librería de Kafka separada por coma del paquete de Connect
- "org.apache.spark:spark-connect_2.13:4.0.1,org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1"
(Si ya tuvieras levantado tu docker-compose.yml, tras guardar, reinicia el servicio: docker compose up -d --force-recreate spark-connect).
Por tanto, cuando lancemos nuestro programa de Spark, no hará falta que le indiquemos el paquete de Kafka, ya que el servidor de Spark Connect ya lo tiene disponible.
2.1 Docker compose completo¶
networks:
bda-network:
driver: bridge
name: bda-network
volumes:
namenode_data:
secondary_namenode_data:
datanode1_data:
datanode2_data:
datanode3_data:
portainer_data:
#postgres_data: # Persistencia del catálogo
spark_master_logs:
spark_worker1_logs:
spark_worker2_logs:
spark_worker3_logs:
kafka_controller_1_data:
kafka_controller_2_data:
kafka_controller_3_data:
kafka_broker_1_data:
kafka_broker_2_data:
kafka_broker_3_data:
services:
# --- INFRAESTRUCTURA DEVOPS ---
# --- TRAEFIK: Reverse Proxy y Gestor de Rutas ---
# Traefik actúa como el único punto de entrada (puerta de enlace) para nuestro cluster.
# Su función es interceptar todas las peticiones HTTP (en el puerto 80) y, basándose
# en el dominio solicitado (ej. 'portainer.localhost'), redirigir el tráfico al
# contenedor correcto de forma automática. Esto nos evita tener que gestionar y recordar
# un puerto diferente para cada servicio. El dashboard en el puerto 8089 nos permite
# ver las rutas que ha descubierto y si están activas.
#Si quiere añadirlo a más servicios, usa la etiqueta 'labels' para definir las reglas de Traefik, como está en portainer y namenode
traefik:
image: traefik:v3.6.2
container_name: traefik
command:
- "--api.insecure=true" # Habilita el dashboard inseguro para desarrollo
- "--providers.docker=true" # Escucha eventos de Docker
- "--providers.docker.exposedbydefault=false" # Seguridad: No exponer nada automáticamente
- "--entrypoints.web.address=:80" # Punto de entrada HTTP estándar
ports:
- "80:80" # Peticiones HTTP del host
- "8089:8080" # Dashboard de administración de Traefik
volumes:
- "/var/run/docker.sock:/var/run/docker.sock:ro" # Traefik necesita acceso al socket de Docker
networks:
- bda-network
# --- PORTAINER: Interfaz Gráfica para Docker ---
# Portainer nos da una UI web para gestionar de forma visual nuestros contenedores,
# imágenes, redes y volúmenes, facilitando la administración del entorno Docker.
#
# Este servicio está configurado para ser accesible de dos maneras:
# 1. Acceso Directo: A través del puerto 9000 (http://localhost:9000). Esto es gracias
# a la sección 'ports'. Es un acceso fiable y directo.
# 2. Acceso vía Traefik: Las 'labels' definen la regla para acceder por el dominio
# http://portainer.localhost. Esto permite unificar el acceso a todos los servicios
# bajo el mismo proxy inverso, usando nombres amigables en lugar de puertos
portainer:
image: portainer/portainer-ce:latest
container_name: portainer
networks:
- bda-network
ports:
# Mapeo de puertos directo: <PUERTO_EN_EL_HOST>:<PUERTO_EN_EL_CONTENEDOR>
# Exponemos la UI de Portainer (puerto 9000) en el puerto 9010 de nuestra máquina para evitar conflictos con el 9000 del namenode
- "9010:9000"
volumes:
- "/var/run/docker.sock:/var/run/docker.sock:ro" # Acceso al socket de Docker
- portainer_data:/data # Volumen para persistir la data de Portainer
command: -H unix:///var/run/docker.sock
labels:
# Activamos Traefik para este contenedor
- "traefik.enable=true"
# Regla: si el host es 'portainer.localhost', reenvía a este servicio
- "traefik.http.routers.portainer.rule=Host(`portainer.localhost`)"
# Definimos el punto de entrada (entrypoint) como 'web' (puerto 80)
- "traefik.http.routers.portainer.entrypoints=web"
# IMPORTANTE: Decirle a Traefik cuál es el puerto interno del servicio web (9000)
- "traefik.http.services.portainer.loadbalancer.server.port=9000"
# --- SERVICIOS BIG DATA (Con etiquetas Traefik) ---
# --- CAPA DE ALMACENAMIENTO (HDFS) ---
namenode:
image: apache/hadoop:3.4.1
container_name: namenode
hostname: namenode
user: root
networks:
bda-network:
aliases:
- cluster-bda # Alias for the namenode service. Para que así pueda resolver docker. Docker registra ese nombre de host en la red.
ports:
- "9870:9870" # UI Web
env_file:
- ./hadoop.env
#environment:
# Esta variable formatea el NameNode si la carpeta está vacía
#- ENSURE_NAMENODE_DIR="/opt/hadoop/hadoop_data/hdfs/namenode"
volumes:
- namenode_data:/opt/hadoop/hadoop_data/hdfs/namenode
#command: ["hdfs", "namenode"]
command:
- "/bin/bash"
- "-c"
- "if [ ! -d /opt/hadoop/hadoop_data/hdfs/namenode/current ]; then echo '--- FORMATTING NAMENODE (FRESH START) ---'; hdfs namenode -format -nonInteractive; else echo '--- NAMENODE DATA FOUND (NO FORMAT) ---'; fi; hdfs namenode"
labels:
# Activamos Traefik para este contenedor
- "traefik.enable=true"
# Regla de enrutamiento: responder a namenode.localhost
- "traefik.http.routers.namenode.rule=Host(`namenode.localhost`)"
# IMPORTANTE: Decirle a Traefik cuál es el puerto interno del servicio web (9870)
- "traefik.http.services.namenode.loadbalancer.server.port=9870"
secondary_namenode:
image: apache/hadoop:3.4.1
container_name: secondary_namenode
hostname: secondary_namenode
user: root
networks:
- bda-network
ports:
- "9868:9868"
env_file:
- ./hadoop.env
depends_on:
- namenode
volumes:
- secondary_namenode_data:/opt/hadoop/hadoop_data/hdfs/secondary_namenode
command: ["hdfs", "secondarynamenode"]
datanode1:
image: apache/hadoop:3.4.1
container_name: datanode1
hostname: datanode1
user: root
networks:
- bda-network
env_file:
- ./hadoop.env
depends_on:
- namenode
volumes:
- datanode1_data:/opt/hadoop/hadoop_data/hdfs/datanode
command: ["hdfs", "datanode"]
datanode2:
image: apache/hadoop:3.4.1
container_name: datanode2
hostname: datanode2
user: root
networks:
- bda-network
env_file:
- ./hadoop.env
depends_on:
- namenode
volumes:
- datanode2_data:/opt/hadoop/hadoop_data/hdfs/datanode
command: ["hdfs", "datanode"]
datanode3:
image: apache/hadoop:3.4.1
container_name: datanode3
hostname: datanode3
user: root
networks:
- bda-network
env_file:
- ./hadoop.env
depends_on:
- namenode
volumes:
- datanode3_data:/opt/hadoop/hadoop_data/hdfs/datanode
command: ["hdfs", "datanode"]
# --- CAPA DE PROCESAMIENTO (YARN) ---
resourcemanager:
image: apache/hadoop:3.4.1
container_name: resourcemanager
hostname: resourcemanager
user: root
networks:
- bda-network
ports:
- "8088:8088" # YARN ResourceManager Web UI
- "8032:8032" # YARN ResourceManager RPC
env_file:
- ./hadoop.env
depends_on:
- namenode
command: ["yarn", "resourcemanager"]
labels:
- "traefik.enable=true"
- "traefik.http.routers.resourcemanager.rule=Host(`yarn.localhost`)"
- "traefik.http.services.resourcemanager.loadbalancer.server.port=8088"
# NodeManager asociado a DataNode1
nodemanager1:
image: apache/hadoop:3.4.1
container_name: nodemanager1
hostname: nodemanager1
user: root
networks:
- bda-network
ports:
- "8042:8042" # Puerto único para la Web UI del NM1
env_file:
- ./hadoop.env
depends_on:
- resourcemanager
- namenode
command: ["yarn", "nodemanager"]
# NodeManager asociado a DataNode2
nodemanager2:
image: apache/hadoop:3.4.1
container_name: nodemanager2
hostname: nodemanager2
user: root
networks:
- bda-network
ports:
- "8043:8042" # Puerto único para la Web UI del NM2
env_file:
- ./hadoop.env
depends_on:
- resourcemanager
- namenode
command: ["yarn", "nodemanager"]
# NodeManager asociado a DataNode3
nodemanager3:
image: apache/hadoop:3.4.1
container_name: nodemanager3
hostname: nodemanager3
user: root
networks:
- bda-network
ports:
- "8044:8042" # Puerto único para la Web UI del NM3
env_file:
- ./hadoop.env
depends_on:
- resourcemanager
- namenode
command: ["yarn", "nodemanager"]
# --- CAPA DE PERSISTENCIA RELACIONAL (Catalog Database) ---
#postgres:
# image: postgres:18
# container_name: postgres
# networks:
# - bda-network
# environment:
# POSTGRES_DB: metastore_db
# POSTGRES_USER: hive
# POSTGRES_PASSWORD: hive_password
# # Le decimos a Postgres dónde poner los datos exactamente (Necesario para Postgres 18+)
# PGDATA: /var/lib/postgresql/data/pgdata
# volumes:
# - postgres_data:/var/lib/postgresql/data
# # healthcheck declara una verificación que se ejecuta para determinar si los contenedores de servicio están "healthy" o no.
# healthcheck:
# test: ["CMD-SHELL", "pg_isready -U hive -d metastore_db"]
# interval: 10s
# timeout: 5s
# retries: 5
# --- SERVICIO DE METADATOS (Hive Metastore) ---
#metastore:
# image: apache/hive:standalone-metastore-4.2.0
# container_name: metastore
# networks:
# - bda-network
# environment:
# SERVICE_NAME: metastore
# # Credenciales explícitas para el script de inicio de la imagen
# DB_DRIVER: postgres
# # Configuración JAVA directa (Sin XMLs intermedios para la DB)
# SERVICE_OPTS: >-
# -Xmx1G
# -Djavax.jdo.option.ConnectionDriverName=org.postgresql.Driver
# -Djavax.jdo.option.ConnectionURL=jdbc:postgresql://postgres:5432/metastore_db
# -Djavax.jdo.option.ConnectionUserName=hive
# -Djavax.jdo.option.ConnectionPassword=hive_password
# -Dhive.metastore.warehouse.dir=hdfs://namenode:9000/user/hive/warehouse
# -Dfs.defaultFS=hdfs://namenode:9000
# depends_on:
# postgres:
# # Establece la condición bajo la cual se considera satisfecha la dependencia (https://docs.docker.com/reference/compose-file/services/#depends_on)
# condition: service_healthy
# namenode:
# condition: service_started
# ports:
# - "9083:9083" # Puerto Thrift expuesto para clientes externos
# volumes:
# # Inyección del Driver postgres y librerias YARN (Fix SystemClock Error)
# - ./drivers/postgresql-42.7.8.jar:/opt/hive/lib/postgresql-jdbc.jar
# # --- NUEVOS PARCHES PARA YARN (Fix SystemClock Error) ---
# - ./drivers/hadoop-yarn-common-3.4.1.jar:/opt/hive/lib/hadoop-yarn-common-3.4.1.jar
# - ./drivers/hadoop-yarn-api-3.4.1.jar:/opt/hive/lib/hadoop-yarn-api-3.4.1.jar
# --- MOTOR DE EJECUCIÓN SQL (HiveServer2) ---
#hiveserver2:
# image: apache/hive:4.2.0
# container_name: hiveserver2
# networks:
# - bda-network
# environment:
# SERVICE_NAME: hiveserver2
# TEZ_CONTAINER_SIZE: 512 # Ajuste de memoria para contenedores Tez
# # Evita re-inicializar el esquema (ya lo hace el metastore)
# IS_RESUME: "true"
# # Configuración: Conéctate al metastore remoto y usa HDFS
# SERVICE_OPTS: >-
# -Dhive.metastore.uris=thrift://metastore:9083
# -Dhive.metastore.warehouse.dir=hdfs://namenode:9000/user/hive/warehouse
# -Dfs.defaultFS=hdfs://namenode:9000
# depends_on:
# metastore:
# condition: service_started
# resourcemanager:
# condition: service_started
# ports:
# - "10000:10000" # Puerto JDBC (Beeline/DBeaver)
# - "10002:10002" # Web UI de HiveServer2
# labels:
# - "traefik.enable=true"
# # Acceso HTTP a la UI de Hive
# - "traefik.http.routers.hive.rule=Host(`hive.localhost`)"
# - "traefik.http.services.hive.loadbalancer.server.port=10002"
# - "traefik.http.routers.hive.entrypoints=web"
# # Desactivando el paso de la cabecera Host (passHostHeader = false).
# # Le decimos a Traefik que NO pase el nombre de dominio original al contenedor.
# # Así, Traefik reescribirá la cabecera Host para que coincida con la IP interna del contenedor y el puerto 10002.
# # Esto evita que Jetty de error en hiveserver2 por recibir tráfico del puerto 80.
# - "traefik.http.services.hive.loadbalancer.passhostheader=false"
# -------------------------------------------
# --- MOTOR DE PROCESAMIENTO (Spark) ---
# APACHE SPARK (COMPUTE LAYER)
# Versión: 4.0.1 (Official Docker Image)
# Arquitectura: Standalone Mode (1 Master + 3 Workers)
# -------------------------------------------
spark-master:
image: spark:4.0.1
container_name: spark-master
hostname: spark-master
user: root # Necesario para escribir logs en volúmenes
networks:
- bda-network
ports:
- "7077:7077" # Puerto RPC (Necesario para enviar trabajos desde fuera)
- 8080:8080 # Puerto Web UI
volumes:
- spark_master_logs:/opt/spark/logs
labels:
- "traefik.enable=true"
- "traefik.http.routers.spark.rule=Host(`spark.localhost`)"
- "traefik.http.services.spark.loadbalancer.server.port=8080"
- "traefik.http.routers.spark.entrypoints=web"
# Arrancamos la clase Master directamente
command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.master.Master", "--host", "spark-master", "--port", "7077", "--webui-port", "8080"]
spark-worker-1:
image: spark:4.0.1
container_name: spark-worker-1
hostname: spark-worker-1
user: root
networks:
- bda-network
#environment:
# --- GESTIÓN DE RECURSOS ---
# Spark detecta nativamente estas variables al arrancar la JVM.
#- SPARK_WORKER_MEMORY=1G # Límite de RAM por Worker
# - SPARK_WORKER_CORES=1 # Descomentar para limitar CPU
volumes:
- spark_worker1_logs:/opt/spark/logs
depends_on:
- spark-master
- namenode
# Arrancamos la clase Worker apuntando al Master
command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]
spark-worker-2:
image: spark:4.0.1
container_name: spark-worker-2
hostname: spark-worker-2
user: root
networks:
- bda-network
#environment:
#- SPARK_WORKER_MEMORY=1G
# - SPARK_WORKER_CORES=1 # Descomentar para limitar CPU
volumes:
- spark_worker2_logs:/opt/spark/logs
depends_on:
- spark-master
- namenode
# Arrancamos la clase Worker apuntando al Master
command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]
spark-worker-3:
image: spark:4.0.1
container_name: spark-worker-3
hostname: spark-worker-3
user: root
networks:
- bda-network
#environment:
#- SPARK_WORKER_MEMORY=1G
# - SPARK_WORKER_CORES=1 # Descomentar para limitar CPU
volumes:
- spark_worker3_logs:/opt/spark/logs
depends_on:
- spark-master
- namenode
# Arrancamos la clase Worker apuntando al Master
command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]
# -------------------------------------------
# SPARK CONNECT SERVER (Gateway para Clientes Remotos)
# Este servicio permite conectar IDEs (VSCode/PyCharm) desde el Host
# -------------------------------------------
spark-connect:
image: spark:4.0.1
container_name: spark-connect
hostname: spark-connect
user: root
networks:
- bda-network
ports:
- "15002:15002" # Puerto gRPC expuesto al Host
- "4040:4040" # WebUI de este Driver específico
depends_on:
- spark-master
- namenode
labels:
# Exponemos la UI del Driver de Spark Connect
- "traefik.enable=true"
- "traefik.http.routers.spark-connect-ui.rule=Host(`spark-connect.localhost`)"
- "traefik.http.services.spark-connect-ui.loadbalancer.server.port=4040"
- "traefik.http.routers.spark-connect-ui.entrypoints=web"
# Arrancamos el servidor de Spark Connect conectándose al Master
command:
- "/opt/spark/bin/spark-submit"
- "--master"
- "spark://spark-master:7077"
- "--class"
- "org.apache.spark.sql.connect.service.SparkConnectServer"
- "--name"
- "SparkConnectServer"
- "--conf"
- "spark.driver.bindAddress=0.0.0.0"
# --- Límites para evitar Starvation si no tienes recursos suficientes ---
# Limitamos a 1 núcleo en total (deja libres los otros para tus jobs)
- "--conf"
- "spark.cores.max=1"
# Limitamos la memoria por ejecutor (deja RAM libre en los workers)
- "--conf"
- "spark.executor.memory=512m"
# ---------------------------------------------
# Importante: Las librerías de Connect suelen venir en la imagen,
# pero apuntamos al paquete local por seguridad en la carga de clases.
# Añadimos también la librería de Kafka separada por coma del paquete de Connect para Spark Streaming con Kafka
- "--packages"
- "org.apache.spark:spark-connect_2.13:4.0.1,org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1"
# -------------------------------------------
# APACHE KAFKA (STREAMING LAYER)
# Arquitectura: 3 Controllers + 3 Brokers (KRaft Isolated)
# -------------------------------------------
kafka-controller-1:
image: apache/kafka:4.2.0
container_name: kafka-controller-1
user: root
networks:
- bda-network
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
# Limitamos RAM para entornos de laboratorio
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
volumes:
- kafka_controller_1_data:/tmp/kafka-logs
kafka-controller-2:
image: apache/kafka:4.2.0
container_name: kafka-controller-2
user: root
networks:
- bda-network
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
volumes:
- kafka_controller_2_data:/tmp/kafka-logs
kafka-controller-3:
image: apache/kafka:4.2.0
container_name: kafka-controller-3
user: root
networks:
- bda-network
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
volumes:
- kafka_controller_3_data:/tmp/kafka-logs
kafka-broker-1:
image: apache/kafka:4.2.0
container_name: kafka-broker-1
user: root
networks:
- bda-network
ports:
- "9094:9094" # Puerto mapeado al Host
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:9092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
volumes:
- kafka_broker_1_data:/tmp/kafka-logs
kafka-broker-2:
image: apache/kafka:4.2.0
container_name: kafka-broker-2
user: root
networks:
- bda-network
ports:
- "9095:9095"
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9095
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:9092,EXTERNAL://localhost:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
volumes:
- kafka_broker_2_data:/tmp/kafka-logs
kafka-broker-3:
image: apache/kafka:4.2.0
container_name: kafka-broker-3
user: root
networks:
- bda-network
ports:
- "9096:9096"
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9096
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:9092,EXTERNAL://localhost:9096
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
depends_on:
- kafka-controller-1
- kafka-controller-2
- kafka-controller-3
volumes:
- kafka_broker_3_data:/tmp/kafka-logs
# --- OBSERVABILIDAD (Kafbat UI) ---
kafka-ui:
image: ghcr.io/kafbat/kafka-ui:latest
container_name: kafka-ui
networks:
- bda-network
environment:
# Nombre que aparecerá en la interfaz
KAFKA_CLUSTERS_0_NAME: bda-cluster
# Apuntamos a los puertos INTERNOS de los brokers en Docker
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
# Habilitamos la configuración dinámica desde la propia UI (muy útil en Labs)
DYNAMIC_CONFIG_ENABLED: 'true'
# Le dice a la aplicación Spring Boot que respete las cabeceras que le envía Traefik
SERVER_FORWARD_HEADERS_STRATEGY: FRAMEWORK
depends_on:
- kafka-broker-1
- kafka-broker-2
- kafka-broker-3
labels:
# Integración con nuestro Traefik
- "traefik.enable=true"
- "traefik.http.routers.kafkaui.rule=Host(`kafka.localhost`)"
- "traefik.http.services.kafkaui.loadbalancer.server.port=8080"
- "traefik.http.routers.kafkaui.entrypoints=web"
# --- CLIENTE KAFKA (Edge Node) ---
# Usado exclusivamente para administrar el clúster sin afectar a los brokers
kafka-client:
image: apache/kafka:4.2.0
container_name: kafka-client
networks:
- bda-network
depends_on:
- kafka-broker-1
- kafka-broker-2
- kafka-broker-3
# Comando para mantener el contenedor vivo
command: tail -f /dev/null
# --- Kafka Connect ---
# =======================================================
# PLANTILLA BASE PARA KAFKA CONNECT (YAML Anchor)
# Definimos toda la configuración común para no repetirla
# =======================================================
x-connect-defaults: &connect-defaults
image: apache/kafka:4.2.0
networks:
- bda-network
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
# Mismo Group ID une los workers en un solo clúster
CONNECT_GROUP_ID: bda-connect-cluster
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
CONNECT_PLUGIN_PATH: /opt/kafka/plugins
depends_on:
- kafka-broker-1
- kafka-broker-2
- kafka-broker-3
volumes:
- ./kafka_connect_plugins:/opt/kafka/plugins:ro
# Inyección directa del driver MySQL (Asegúrate de tenerlo en esa ruta)
- ./kafka_connect_plugins/drivers/mysql-connector-j-9.6.0.jar:/opt/kafka/libs/mysql-connector-j.jar:ro
command:
- bash
- -c
- |
echo "bootstrap.servers=$$CONNECT_BOOTSTRAP_SERVERS" > /tmp/connect-distributed.properties
echo "group.id=$$CONNECT_GROUP_ID" >> /tmp/connect-distributed.properties
echo "key.converter=$$CONNECT_KEY_CONVERTER" >> /tmp/connect-distributed.properties
echo "value.converter=$$CONNECT_VALUE_CONVERTER" >> /tmp/connect-distributed.properties
echo "key.converter.schemas.enable=$$CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE" >> /tmp/connect-distributed.properties
echo "value.converter.schemas.enable=$$CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE" >> /tmp/connect-distributed.properties
echo "offset.storage.topic=connect-offsets" >> /tmp/connect-distributed.properties
echo "offset.storage.replication.factor=3" >> /tmp/connect-distributed.properties
echo "config.storage.topic=connect-configs" >> /tmp/connect-distributed.properties
echo "config.storage.replication.factor=3" >> /tmp/connect-distributed.properties
echo "status.storage.topic=connect-status" >> /tmp/connect-distributed.properties
echo "status.storage.replication.factor=3" >> /tmp/connect-distributed.properties
echo "plugin.path=$$CONNECT_PLUGIN_PATH" >> /tmp/connect-distributed.properties
echo "Iniciando Kafka Connect en modo distribuido..."
/opt/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties
# -------------------------------------------
# KAFKA CONNECT CLUSTER (3 Workers)
# Heredan de la plantilla base (<<: *connect-defaults)
# -------------------------------------------
kafka-worker-1:
<<: *connect-defaults
container_name: kafka-worker-1
ports:
- "8083:8083" # API REST Worker 1
kafka-worker-2:
<<: *connect-defaults
container_name: kafka-worker-2
ports:
- "8084:8083" # API REST Worker 2 (Mapeado al 8084 del Host)
kafka-worker-3:
<<: *connect-defaults
container_name: kafka-worker-3
ports:
- "8085:8083" # API REST Worker 3 (Mapeado al 8085 del Host)
3. Ejemplo 7. Streaming en Tiempo Real: Kafka y Spark¶
Imagina que vas a trabajar para un gran equipo de futbol en el análisis de datos para obtener business intelligence para que durante los partidos, podamos servir y dar soporte al stack técnico.
Vamos a realizar un pequeño ejemplo de concepto, donde vamos a generar datos sintéticos de las posiciones de los jugadores durante el partido, para seguidamente obtener información como: distancia recorrida, velocidades, sprints, etc; en tiempo real, para que ayude al stack técnico en su toma de decisiones.
Vamos a establecer un cluster de Kafka para recoger los eventos en tiempo real, que dispondrá los datos para que Spark los recoja en tiempo real en un dataset y pueda aplicarle Pandas para obtener estos datos.
Configuración cluster Kafka
Vamos a configurar el cluster de Kafka usando mode KRaft teniendo en cuenta que estamos en un entorno de pruebas y no de producción, para este ejemplo de concepto, vamos a configurar un cluster con 3 servidores (en un sólo nodo o máquina) con 1 controller y 2 brokers
3.1 Requisitos¶
Vamos a configurar un cluster las siguientes características:
- 1 topic jugadores con 2 particiones
- 1 factor de replica de 2
- 1 nodo con 2 brokers y 1 controller
3.2 Configuración del Clúster de Kafka¶
- Consideraciones previas:
- Vamos a establecer todos los archivos de configuración en una carpeta de ejemplo llamada ejemplo7, que en mi caso estará alojada en
/opt/kafka/ejemplo7 - Dado que todas las instancias se ejecutarán en el mismo nodo, es crucial asignar puertos únicos y directorios de log para cada broker y el controller.
- Configuración:
- Para el controller, debemos usar como base la configuración de propiedades de controller de kafka (KRaft mode) que se encuentran
config/kraft/controller.properties -
Para cada broker, necesitaremos crear un archivo de configuración por separado. Para ello debemos usar como base la configuración de propiedades de brokers de kafka que se encuentran
config/kraft/broker.properties -
Creamos los directorios necesarios para nuestro
ejemplo7
- Hacemos 2 y 1 copia de los ficheros correspondientes de configuración para cada uno
cp config/kraft/controller.properties /opt/kafka/ejemplo7/config/controller1.properties
cp config/kraft/broker.properties /opt/kafka/ejemplo7/config/broker1.properties
cp config/kraft/broker.properties /opt/kafka/ejemplo7/config/broker2.properties
- Asignamos la configuración al controller
# Server Basics
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=192.168.11.10:9093
# Socket Server Settings
listeners=CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
# Log Basics
log.dirs=/opt/kafka/ejemplo7/logs/controller1
- Asignamos la siguiente configuración para cada broker
Spark como consumer (nodo master de Spark)
Hay que tener en cuenta:
- Vamos a conectarnos con un cliente(consumer) que va a consumir datos en streaming con SPARK
- Por seguridad, kafka no permite esa conexión del cliente(consumer) desde localhost
- Teniendo en cuenta nuestra configuración del cluster (ip del master de Spark es 192.168.11.10), debemos configurarla correctamente en los brokers para que tengan acceso.
- La conexión se hace a través del master de spark, y la ejecución de la aplicación en Spark se reparte entre los workers del cluster, aunque estén en diferentes ips
# Server Basics
process.roles=broker
node.id=2
controller.quorum.bootstrap.servers=192.168.11.10:9093
# Socket Server Settings
listeners=PLAINTEXT://192.168.11.10:9094
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.11.10:9094
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Log Basics
log.dirs=/opt/kafka/ejemplo7/logs/broker1
# Server Basics
process.roles=broker
node.id=3
controller.quorum.bootstrap.servers=192.168.11.10:9093
# Socket Server Settings
listeners=PLAINTEXT://192.168.11.10:9095
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.11.10:9095
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# Log Basics
log.dirs=/opt/kafka/ejemplo7/logs/broker2
Configuración con varios consumers
En el caso de que tuviéramos que configurar más de un cliente en diferentes máquinas, tendríamos que cambiar la configuración de nuestros brokers. Recordando para que sirven las siguiente propiedades:
listeners: son las interfaces a las que se conecta Kafka.advertised.listeners: es la forma en que los clientes pueden conectarse.listener.security.protocol.map: define pares clave/valor para el protocolo de seguridad a utilizar por nombre de listener.
1) Opción 1
Abrir el acceso en listeners configurando la ip como 0.0.0.0:puerto. Esto no sería la mejor solución, ya que debemos filtrar el acceso desde advertised.listeners y este valor, por configuración propia de Kafka, debe ser un subconjunto de listeners
2) Opción 2
Indicamos por ip cada uno de los interfaces que damos acceso (solo un interface por puerto) a los brokers de Kafka (listeners), y filtramos (advertised.listeners). Pero no hay que olvidar indicar el protocolo por el cual te conectas, que hay que indicarlo en listener.security.protocol.map, que también debe ser único. Imaginando que tenemos varios clientes de acceso, la configuración debería ser en ese caso
# Server Basics
process.roles=broker
node.id=2
controller.quorum.bootstrap.servers=localhost:9093
# Socket Server Settings
listeners=BROKER://localhost:9094,LISTENER_CLIENTE1://nodo1:11094,LISTENER_CLIENTE2://nodo2:12094,LISTENER_CLIENTE3://nodo3:13094
inter.broker.listener.name=BROKER
advertised.listeners=BROKER://localhost:9094,LISTENER_CLIENTE1://nodo1:11094,LISTENER_CLIENTE2://nodo2:12094,LISTENER_CLIENTE3://nodo3:13094
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,LISTENER_CLIENTE1:PLAINTEXT,LISTENER_CLIENTE2:PLAINTEXT,LISTENER_CLIENTE3:PLAINTEXT
# Log Basics
log.dirs=/opt/kafka/ejemplo7/logs/broker1
# Server Basics
process.roles=broker
node.id=3
controller.quorum.bootstrap.servers=localhost:9093
# Socket Server Settings
listeners=BROKER://localhost:9095,LISTENER_CLIENTE1://nodo1:11095,LISTENER_CLIENTE2://nodo2:12095,LISTENER_CLIENTE3://nodo3:13095
inter.broker.listener.name=BROKER
advertised.listeners=BROKER://localhost:9095,LISTENER_CLIENTE1://nodo1:11095,LISTENER_CLIENTE2://nodo2:12095,LISTENER_CLIENTE3://nodo3:13095
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,BROKER:PLAINTEXT,LISTENER_CLIENTE1:PLAINTEXT,LISTENER_CLIENTE2:PLAINTEXT,LISTENER_CLIENTE3:PLAINTEXT
# Log Basics
log.dirs=/opt/kafka/ejemplo7/logs/broker2
- Iniciamos Kafka
#Genera un cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID
#Formateamos los directorios de log
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID --standalone -c /opt/kafka/ejemplo7/config/controller1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo7/config/broker1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo7/config/broker2.properties
- Iniciamos los server(1 controller y 2 brokers) cada uno en una terminal
#Ejecuta el servidor Kafka
bin/kafka-server-start.sh /opt/kafka/ejemplo7/config/controller1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo7/config/broker1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo7/config/broker2.properties
3.3 Creación del Topic¶
- Creamos el topic con factor de replica 2 y 2 particiones. El topic debe conectarse a un broker.
bin/kafka-topics.sh --create --topic player-position --bootstrap-server 192.168.11.10:9094 --replication-factor 2 --partitions 2
- Podemos ver la descripción del topic creado
Topic: player-position TopicId: AVypABAdSeq9--Pxs4Qtew PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: player-position Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: LastKnownElr:
Topic: player-position Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
3.4 Productor¶
- Creamos el programa que va a generar los datos sintéticos.
- Vamos a intentar simular datos lo más reales posibles a los movimientos de un jugador de futbol.
- Suponemos que el campo es de 100x65 metros
- El punto (0,0) será la esquina inferior izquierda y el punto (100,65) será la esquina superior derecha.
- Generaremos 10 eventos por segundo
import random
import time
from datetime import datetime, timedelta
import math
# Parámetros del campo y jugadores
FIELD_LENGTH = 100
FIELD_WIDTH = 65
NUM_PLAYERS = 11
TIMESTAMPS_PER_SECOND = 10
DURATION_MINUTES = 45
# Velocidades en metros por segundo
WALK_SPEED = 1.4
RUN_SPEED = 5.0
SPRINT_SPEED = 8.0
def generate_initial_positions():
"""Genera posiciones iniciales razonables para los jugadores."""
positions = {}
for player_id in range(1, NUM_PLAYERS + 1):
if player_id == 1:
# Portero
positions[player_id] = (random.uniform(0, 5), random.uniform(FIELD_WIDTH / 2 - 5, FIELD_WIDTH / 2 + 5))
else:
# Jugadores de campo
positions[player_id] = (random.uniform(0, FIELD_LENGTH), random.uniform(0, FIELD_WIDTH))
return positions
def move_player(position, speed):
"""Mueve al jugador a una nueva posición basada en la velocidad y dirección."""
angle = random.uniform(0, 360) # Dirección aleatoria
distance = speed / TIMESTAMPS_PER_SECOND
delta_x = distance * math.cos(math.radians(angle))
delta_y = distance * math.sin(math.radians(angle))
new_x = max(0, min(FIELD_LENGTH, position[0] + delta_x))
new_y = max(0, min(FIELD_WIDTH, position[1] + delta_y))
return (new_x, new_y)
def generate_player_position_data(match_id, player_id, position):
timestamp = datetime.utcnow().isoformat()
return {
"match_id": match_id,
"player_id": player_id,
"timestamp": timestamp,
"position_x": position[0],
"position_y": position[1]
}
def simulate_player_movement(positions):
"""Simula el movimiento de los jugadores en el campo."""
new_positions = {}
for player_id, position in positions.items():
if player_id == 1:
# Portero: se mueve dentro del área de gol
new_positions[player_id] = move_player(position, random.uniform(0, WALK_SPEED))
else:
# Jugadores de campo: pueden caminar, correr o sprintar
speed = random.choice([WALK_SPEED, RUN_SPEED, SPRINT_SPEED])
new_positions[player_id] = move_player(position, speed)
return new_positions
def main():
positions = generate_initial_positions()
end_time = datetime.utcnow() + timedelta(minutes=DURATION_MINUTES)
while datetime.utcnow() < end_time:
for _ in range(TIMESTAMPS_PER_SECOND):
for player_id in range(1, NUM_PLAYERS + 1):
data = generate_player_position_data("match_1", player_id, positions[player_id])
print(data) # Aquí enviaríamos los datos a Kafka en lugar de imprimirlos
positions = simulate_player_movement(positions)
time.sleep(1 / TIMESTAMPS_PER_SECOND)
if __name__ == "__main__":
main()
- Creamos el productor con KafkaProducer
from kafka import KafkaProducer
import json
from generate_synthetic_data import generate_player_position_data, generate_initial_positions, simulate_player_movement
import time
from datetime import datetime, timedelta
# Parámetros del campo y jugadores
NUM_PLAYERS = 11
TIMESTAMPS_PER_SECOND = 10
DURATION_MINUTES = 45
def main():
match_id = "match_" + datetime.utcnow().strftime("%Y%m%d%H%M%S")
producer = KafkaProducer(bootstrap_servers='192.168.11.10:9094',
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
positions = generate_initial_positions()
end_time = datetime.utcnow() + timedelta(minutes=DURATION_MINUTES)
while datetime.utcnow() < end_time:
for _ in range(TIMESTAMPS_PER_SECOND):
for player_id in range(1, NUM_PLAYERS + 1):
data = generate_player_position_data(match_id, player_id, positions[player_id])
producer.send('player-position', value=data)
print(f"Sent: {data}")
positions = simulate_player_movement(positions)
time.sleep(1 / TIMESTAMPS_PER_SECOND)
if __name__ == "__main__":
main()
- Aprovecharemos la Consumer API de Kafka para ver está consumiendo los datos correctamente una vez lanzada la aplicación
bin/kafka-console-consumer.sh --topic player-position --from-beginning --bootstrap-server 192.168.11.10:9094
3.5 Configuración del Clúster de Spark¶
Aprovecharemos las máquinas master y los 3 nodos que ya tenemos preparadas para lanzar el cluster de Spark. Consulta la documentación de Spark para cualquier duda
-
Código del programa
- Consumirá los eventos en tiempo real de kafka
- Lo almacena en un dataset
- Obtenemos ETL a través de Pandas
- Mostramos el resultado por consola:
- El calculo de cada microBatch
- El acumulado del (último) partido.
-
Para una correcta configuración, nos basaremos en la documentación oficial de Apache Spark y su integración con Kafka: Structured Streaming + Kafka Integration Guide
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import pandas as pd
import numpy as np
# Definir el esquema de los datos
schema = StructType([
StructField("match_id", StringType(), True),
StructField("player_id", IntegerType(), True),
StructField("timestamp", StringType(), True),
StructField("position_x", FloatType(), True),
StructField("position_y", FloatType(), True)
])
# Crear la sesión de Spark
spark = SparkSession.builder \
.appName("KafkaDataConsumer") \
.getOrCreate()
# Set Spark logging level to ERROR to avoid varios otros logs en consola.
spark.sparkContext.setLogLevel("ERROR")
# Leer los datos de Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.11.10:9094") \
.option("subscribe", "player-position") \
.option("startingOffsets", "earliest") \
.load()
player_position_df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")
# Crear un DataFrame acumulativo vacío
accumulated_schema = StructType([
StructField("match_id", StringType(), True),
StructField("player_id", IntegerType(), True),
StructField("total_distance", FloatType(), True)
])
accumulated_df = spark.createDataFrame([], accumulated_schema)
# Variable global para el `match_id` actual
current_match_id = None
# Definir una función para procesar cada microbatch
def process_batch(df, epoch_id):
global accumulated_df, current_match_id
# Convertir a Pandas DataFrame
pd_df = df.toPandas()
# Obtener el `match_id` del batch actual
if not pd_df.empty:
match_id = pd_df['match_id'].iloc[0]
current_match_id = match_id
# Calcular la distancia recorrida por cada jugador
pd_df['prev_position_x'] = pd_df.groupby('player_id')['position_x'].shift(1)
pd_df['prev_position_y'] = pd_df.groupby('player_id')['position_y'].shift(1)
pd_df['distance'] = np.sqrt((pd_df['position_x'] - pd_df['prev_position_x']) ** 2 +
(pd_df['position_y'] - pd_df['prev_position_y']) ** 2)
pd_df['distance'] = pd_df['distance'].fillna(0)
# Sumar la distancia total por jugador
total_distance_df = pd_df.groupby(['match_id', 'player_id'])['distance'].sum().reset_index()
# Convertir de nuevo a DataFrame de Spark
spark_df = spark.createDataFrame(total_distance_df)
# Unir con el DataFrame acumulativo
accumulated_df = accumulated_df.union(spark_df).groupBy("match_id", "player_id").agg({"total_distance": "sum"}).withColumnRenamed("sum(total_distance)", "total_distance")
# Filtrar el acumulado para el `match_id` actual
current_match_df = accumulated_df.filter(col("match_id") == current_match_id)
# Mostrar el resultado del batch actual y el acumulado del último partido
print("Batch Result:")
spark_df.show()
print(f"Resultado acumulado para match_id = {current_match_id}:")
current_match_df.show()
# Escribir los datos procesados usando el método foreachBatch
query = player_position_df \
.writeStream \
.foreachBatch(process_batch) \
.outputMode("append") \
.start()
query.awaitTermination()
- Lanzamos spark master y los workers del cluster.
hadoop@master:/opt/hadoop-3.4.1/spark-3.5.4$ ./sbin/start-master.sh
hadoop@master:/opt/hadoop-3.4.1/spark-3.5.4$ ./sbin/start-workers.sh
- Lanzamos el programa. Siguiendo la documentación oficial, añadimos el paquete del conector de Kafka
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4 --master spark://192.168.11.10:7077 /opt/kafka/ejemplo7/Spark_PlayerPositionProcessing.py
3.6 Ejecución de la aplicación¶
- Lanzamos el productor
3.7 Persistencia de datos¶
Si queremos realizar una persistencia de estos datos, sólo tendríamos que almacenarlos usando algunas de las ya explicadas a lo largo del módulo, o cualquier otra que sea conveniente para el escenario en concreto (HDFS, Sistema de ficheros, S3, MongoDB,...).
3.8 Conclusiones¶
A partir de esta configuración, podemos añadir ETL para obtener más conocimiento sobre los jugadores, como velocidades, sprints, mapas de calor, etc, que podría ayudar al stack técnico en diferentes tomas de decisiones (sistema de juego, rendimiento de jugadores, cansancio acumulado,...).
Podríamos añadir información adicional si pudiéramos capturar los datos del balón, que combinados con la de los jugadores, podríamos obtener otros datos valiosos de aplicación de business intelligence para ofrecérselos al stack técnico, tales como: estadísticas de pases, disparos, zonas de mayor juego,...
Imagina que trabajas para un equipo de fútbol analizando la telemetría de los jugadores para dar soporte al cuerpo técnico durante un partido.
Vamos a realizar un pequeño ejemplo de concepto, donde generaremos datos sintéticos de las posiciones de los jugadores durante el partido, para seguidamente obtener información como: distancia recorrida, velocidades, sprints, etc; en tiempo real, para que ayude al stack técnico en su toma de decisiones.
Vamos a establecer un cluster de Kafka para recoger los eventos en tiempo real, que dispondrá los datos para que Spark los recoja en tiempo real en un dataset y pueda aplicarle Pandas para obtener estos datos.
Configuraremos un flujo completo:
- Un script en Python enviará posiciones X/Y de los jugadores a un clúster de Kafka.
- Un script de Spark Structured Streaming leerá los datos en tiempo real.
- Spark calculará la distancia acumulada de cada jugador y mostrará los resultados.
3.1 Requisitos Previos y Preparación (Docker)¶
- Arrancamos nuestro cluster
- Creamos el Topic
Usamos nuestro nodo cliente para crear el topic donde se recogeran las coordenadas:
docker exec -it kafka-client /opt/kafka/bin/kafka-topics.sh \
--create --topic player-position \
--bootstrap-server kafka-broker-1:9092 \
--replication-factor 3 --partitions 3
- Preparamos el entorno para el productor y el consumidor.
Para ejecutar el script de Spark en Spark Connect y poder lanzar la aplicación en kafka con Spark Connect, necesitamos las librerías adecuadas:
- Spark connect:
pyspark-connectygrpcio-status. - Kafka:
kafka-python.
Como ya comentamos en la Unidad Didáctica de Spark, debemos asegurarnos de que pandas y pyarrow están instalados en todos los contenedores del clúster.
Si no lo has hecho aún, ejecuta el siguiente comando en tu terminal Host:
for container in spark-master spark-worker-1 spark-worker-2 spark-worker-3 spark-connect; do
docker exec -u 0 $container pip install pandas pyarrow numpy grpcio grpcio-status
done
Versiones de Python
La imagen de Docker usa Python 3.10. Tu entorno local debe usar Python 3.10 para evitar errores de serialización (PYTHON_VERSION_MISMATCH).
Instala Python 3.10 en tu sistema (vía apt con PPA deadsnakes en Ubuntu).
- Creamos un entorno virtual y activa las librerías de cliente en el host (con la versión de python3.10, que es la que tiene el clúster con las imágenes Docker spark 4.0.1)
# Creamos un nuevo directorio para el ejemplo 7
mkdir -p kafka-spark/ejemplo7
cd kafka-spark/ejemplo7
# Creamos/Activamos un entorno virtual para aislar las dependencias
python3.10 -m venv venv
source venv/bin/activate
# Instalamos las librerías necesarias para Spark Connect
pip install pyspark-connect grpcio-status
# Instalamos las librerías necesarias para Kafka
pip install kafka-python
- Configuramos Spark Connect con la librería de Kafka en nuestro docker-compose
Recuerda tener correctamente configurado el entorno para que Spark sepa cómo leer de Kafka. Para ello, como hemos visto en el punto anterior, necesitamos el conector spark-sql-kafka. Debemos decirle al Servidor de Spark Connect que descargue esta librería al arrancar.
Si no lo has hecho, vuelve al punto anterior y sigue las instrucciones para añadir el conector de Kafka a la configuración del servidor de Spark Connect.
3.2 Productor: Simulador de Posiciones de Jugadores (Docker)¶
Crearemos en nuestro host local los siguientes archivo y los guardaremos en kafka-spark/ejemplo7.
A. Lógica de Movimiento (generate_synthetic_data.py)
Programa que simula el movimiento de los jugadores en el campo, generando posiciones X/Y cada 100ms, con velocidades y patrones de movimiento lo más realista posible.
import random
import time
from datetime import datetime, timedelta, timezone
import math
# Parámetros del campo y jugadores
FIELD_LENGTH = 100
FIELD_WIDTH = 65
NUM_PLAYERS = 11
TIMESTAMPS_PER_SECOND = 10
DURATION_MINUTES = 45
# Velocidades en metros por segundo
WALK_SPEED = 1.4
RUN_SPEED = 5.0
SPRINT_SPEED = 8.0
def generate_initial_positions():
"""Genera posiciones iniciales razonables para los jugadores."""
positions = {}
for player_id in range(1, NUM_PLAYERS + 1):
if player_id == 1:
# Portero
positions[player_id] = (random.uniform(0, 5), random.uniform(FIELD_WIDTH / 2 - 5, FIELD_WIDTH / 2 + 5))
else:
# Jugadores de campo
positions[player_id] = (random.uniform(0, FIELD_LENGTH), random.uniform(0, FIELD_WIDTH))
return positions
def move_player(position, speed):
"""Mueve al jugador a una nueva posición basada en la velocidad y dirección."""
angle = random.uniform(0, 360) # Dirección aleatoria
distance = speed / TIMESTAMPS_PER_SECOND
delta_x = distance * math.cos(math.radians(angle))
delta_y = distance * math.sin(math.radians(angle))
new_x = max(0, min(FIELD_LENGTH, position[0] + delta_x))
new_y = max(0, min(FIELD_WIDTH, position[1] + delta_y))
return (new_x, new_y)
def generate_player_position_data(match_id, player_id, position):
timestamp = datetime.now(timezone.utc).isoformat()
return {
"match_id": match_id,
"player_id": player_id,
"timestamp": timestamp,
"position_x": position[0],
"position_y": position[1]
}
def simulate_player_movement(positions):
"""Simula el movimiento de los jugadores en el campo."""
new_positions = {}
for player_id, position in positions.items():
if player_id == 1:
# Portero: se mueve dentro del área de gol
new_positions[player_id] = move_player(position, random.uniform(0, WALK_SPEED))
else:
# Jugadores de campo: pueden caminar, correr o sprintar
speed = random.choice([WALK_SPEED, RUN_SPEED, SPRINT_SPEED])
new_positions[player_id] = move_player(position, speed)
return new_positions
def main():
positions = generate_initial_positions()
end_time = datetime.now(timezone.utc) + timedelta(minutes=DURATION_MINUTES)
while datetime.now(timezone.utc) < end_time:
for _ in range(TIMESTAMPS_PER_SECOND):
for player_id in range(1, NUM_PLAYERS + 1):
data = generate_player_position_data("match_1", player_id, positions[player_id])
print(data) # Aquí enviaríamos los datos a Kafka en lugar de imprimirlos
positions = simulate_player_movement(positions)
time.sleep(1 / TIMESTAMPS_PER_SECOND)
if __name__ == "__main__":
main()
B. Script Productor (producer_positions.py)
Conexión Externa vs Interna
Fíjate que el Productor (Host) se conecta a los puertos mapeados (localhost:9094), mientras que luego verás que Spark (Contenedor) se conectará a las redes internas de Docker (kafka-broker-1:9092).
from kafka import KafkaProducer
import json
from generate_synthetic_data import generate_player_position_data, generate_initial_positions, simulate_player_movement
import time
from datetime import datetime, timedelta, timezone
# Parámetros del campo y jugadores
NUM_PLAYERS = 11
TIMESTAMPS_PER_SECOND = 10
DURATION_MINUTES = 45
def main():
match_id = "match_" + datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
producer = KafkaProducer(
bootstrap_servers=['localhost:9094', 'localhost:9095', 'localhost:9096'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
positions = generate_initial_positions()
end_time = datetime.now(timezone.utc) + timedelta(minutes=DURATION_MINUTES)
print("--- Iniciando Partido (Enviando Telemetría) ---")
while datetime.now(timezone.utc) < end_time:
for _ in range(TIMESTAMPS_PER_SECOND):
for player_id in range(1, NUM_PLAYERS + 1):
data = generate_player_position_data(match_id, player_id, positions[player_id])
producer.send('player-position', value=data)
positions = simulate_player_movement(positions)
time.sleep(1 / TIMESTAMPS_PER_SECOND)
if __name__ == "__main__":
main()
3.3 Procesamiento en Spark Streaming - El Consumidor (Docker)¶
Query Detached (Consulta Desprendida) o Orphaned Query
En la arquitectura tradicional de Spark (VMs), si ejecutamos el spark-submit en tu terminal y hacíamos Ctrl+C, estabamos matando al Driver. Como el Driver moría, el clúster (Workers) dejaba de recibir instrucciones y el trabajo se detenía inmediatamente.
Con Spark Connect, las reglas cambian: En nuestro Host, al hacer Ctrl+C matamos a tu script local de Python. Pero en el clúster, el Servidor de Spark Connect sigue vivo, y el trabajo de streaming que le ordenaste seguir procesando, sigue activo en segundo plano, aunque tu script local ya no esté conectado.
Para solucionar este problema, necesitamos implementar un mecanismo en nuestro script local, para que al recibir la señal de interrupción (Ctrl+C), le enviemos una orden explícita al Servidor de Spark Connect para que detenga el stream correctamente. Este código lo añadimos en el script de Spark que hemos creado debajo para procesar los datos.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, lag, coalesce, sqrt, pow, sum as _sum
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.window import Window
# 1. Definir el esquema de los datos
schema = StructType([
StructField("match_id", StringType(), True),
StructField("player_id", IntegerType(), True),
StructField("timestamp", StringType(), True),
StructField("position_x", FloatType(), True),
StructField("position_y", FloatType(), True)
])
# 2. Crear la sesión de Spark Connect
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print("--- Iniciando Recepción de Datos de Telemetría ---")
# 3. Leer los datos de Kafka
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092") \
.option("subscribe", "player-position") \
.option("startingOffsets", "latest") \
.load()
# Parsear el JSON
player_position_df = df.selectExpr("CAST(value AS STRING)") \
.select(from_json(col("value"), schema).alias("data")) \
.select("data.*")
# --- VARIABLE GLOBAL SEGURA (En la RAM del Spark Connect Server) ---
# Usamos un diccionario de Python estándar en lugar de un DataFrame de Spark
# para evitar que el historial de operaciones (DAG) crezca hasta el infinito.
global_accumulated_state = {}
# 4. Definir el procesamiento DISTRIBUIDO del Micro-Batch
def process_batch(batch_df, epoch_id):
global global_accumulated_state
if batch_df.isEmpty():
return
# 1. CÁLCULO DISTRIBUIDO (En los Workers)
w = Window.partitionBy("match_id", "player_id").orderBy("timestamp")
dist_df = batch_df \
.withColumn("prev_x_raw", lag("position_x").over(w)) \
.withColumn("prev_y_raw", lag("position_y").over(w))
dist_df = dist_df \
.withColumn("prev_x", coalesce(col("prev_x_raw"), col("position_x"))) \
.withColumn("prev_y", coalesce(col("prev_y_raw"), col("position_y")))
dist_df = dist_df.withColumn("step_distance",
sqrt(pow(col("position_x") - col("prev_x"), 2) +
pow(col("position_y") - col("prev_y"), 2)))
summary_df = dist_df.groupBy("match_id", "player_id") \
.agg(_sum("step_distance").alias("distance_meters"))
# Imprimimos la palabra "Batch:" para que tu grep la detecte
print(f"\nBatch: {epoch_id} (Distancia de este Micro-Batch)")
summary_df.orderBy(col("distance_meters").desc()).show()
# 2. CÁLCULO DEL ACUMULADO (En el Driver)
local_results = summary_df.collect()
current_match = None
for row in local_results:
m_id = row["match_id"]
p_id = row["player_id"]
dist = row["distance_meters"]
current_match = m_id
#Usamos una tupla (m_id, p_id) como clave del diccionario.
# Las tuplas son inmutables en Python, así que son claves perfectas.
key = (m_id, p_id)
if key in global_accumulated_state:
global_accumulated_state[key] += dist
else:
global_accumulated_state[key] = dist
# Convertimos el diccionario de vuelta a una lista de tuplas para Spark
# k[0] es match_id, k[1] es player_id
acc_data = [(k[0], k[1], v) for k, v in global_accumulated_state.items()]
# Si hay datos, creamos el DataFrame y lo mostramos
if acc_data:
acc_df = spark.createDataFrame(acc_data,["match_id", "player_id", "total_distance_meters"])
print(f"Batch: {epoch_id} (Acumulado Total Partido)")
acc_df.orderBy(col("total_distance_meters").desc()).show()
# NOTA: En este ejemplo, el acumulado se mantiene en la RAM del servidor de Spark Connect usando un diccionario de Python. Esto es solo para fines educativos y no es una práctica recomendada para producción, ya que no es escalable ni tolerante a fallos. En un entorno real, deberíamos usar un sistema de almacenamiento externo (como Redis, Cassandra, HDFS, etc.) para mantener el estado acumulado de manera persistente y escalable.
# 5. Ejecutar el Stream
query = player_position_df \
.writeStream \
.foreachBatch(process_batch) \
.outputMode("append") \
.start()
print("Streaming en ejecución. Pulsa Ctrl+C para detener...")
try:
# Await termination mantiene el proceso principal bloqueado esperando
query.awaitTermination()
except KeyboardInterrupt:
# Capturamos el Ctrl+C del usuario
print("\nSeñal de parada recibida. Deteniendo el stream en el servidor...")
# Enviamos la orden de parada a Spark Connect
query.stop()
print("Stream detenido correctamente.")
except Exception as e:
print(f"Ocurrió un error inesperado: {e}")
finally:
# Asegurarnos de que cerramos la sesión limpia
spark.stop()
3.4 Ejecución de la Arquitectura (Docker)¶
- Lanzamos el Simulador de movimientos de jugadores (Productor): En tu terminal local o VS Code, ejecuta el script del productor para empezar a enviar datos a Kafka:
- Lanzamos Spark Streaming: En otra terminal local o VS Code, ejecuta el script de Spark para empezar a consumir los datos en tiempo real:
- Monitorización:
- Kafka: Puedes usar el consumidor de consola para ver los mensajes crudos que llegan a Kafka:
docker exec -it kafka-client /opt/kafka/bin/kafka-console-consumer.sh \
--topic player-position \
--bootstrap-server kafka-broker-1:9092 \
--from-beginning
- Spark Connect:
Dado que usamos foreachBatch(process_batch) con prints, los resultados tabulados se imprimirán en los logs del servidor Spark Connect, no en la terminal de tu VS Code.
Visualización de Resultados: Filtrando el Ruido
En la arquitectura **Spark Connect**, la instrucción `.format("console")` imprime los resultados en la salida estándar del **Driver**, que se está ejecutando dentro del contenedor `spark-connect`, no en tu terminal local.
Por defecto, Spark genera una gran cantidad de logs operativos (nivel INFO) que "entierran" las tablas de resultados. Para visualizar limpiamente los micro-batches en tiempo real, utilizaremos un comando de filtrado "positivo":
```bash
docker logs -f spark-connect 2>&1 | grep --line-buffered -E "Batch:|\||\+---"
```
**Desglose técnico del comando:**
* **`2>&1`**: Redirige el canal de error (stderr) al canal estándar para que `grep` pueda procesar todo el flujo de logs.
* **`--line-buffered`**: Obligatorio en streaming. Fuerza a `grep` a imprimir cada línea inmediatamente en lugar de esperar a llenar un buffer de memoria, lo que causaría retrasos en la visualización.
* **`-E "Batch:|\||\+---"`**: Utilizamos una Expresión Regular para filtrar y mostrar **solo** las líneas que nos interesan: las cabeceras de los lotes ("Batch:"), las barras verticales de las columnas ("|") y los bordes de la tabla ("+---").
Abre una terminal para ver las estadísticas del partido en tiempo real:
# Para ver el log completo sin filtro
docker logs -f spark-connect
# Para ver solo los resultados tabulados (filtrando el ruido)
docker logs -f spark-connect 2>&1 | grep --line-buffered -E "Batch:|\||\+---"
3.5 Conclusiones (Docker)¶
Este Pipeline demuestra cómo interactúan múltiples sistemas. A partir de aquí, el foreachBatch podría modificarse fácilmente para escribir el DataFrame final en PostgreSQL, Elasticsearch o HDFS para alimentar un Dashboard de Business Intelligence (como Grafana) para el entrenador del equipo.
Además, a partir de esta configuración, podemos añadir ETL para obtener más conocimiento sobre los jugadores, como velocidades, sprints, mapas de calor, etc, que podría ayudar al stack técnico en diferentes tomas de decisiones (sistema de juego, rendimiento de jugadores, cansancio acumulado,...).
También podríamos añadir información adicional si pudiéramos capturar los datos del balón, que combinados con la de los jugadores, podríamos obtener otros datos valiosos de aplicación de business intelligence para ofrecérselos al stack técnico, tales como: estadísticas de pases, disparos, zonas de mayor juego,...
4. Alternativa a la instalación de librerías pip en cada nodo de Spark.¶
En producción, no es recomendable instalar las librerías de Python manualmente en cada nodo del cluster de Spark, ya que esto puede ser propenso a errores y difícil de mantener. En su lugar, existen varias alternativas para gestionar las dependencias de Python de manera más eficiente:
4.1 Inyectar las dependencias al arrancar el contenedor¶
Forzamos a que el contenedor de Spark Connect ejecute un comando al arrancar que instale las dependencias necesarias antes de iniciar el servidor de Spark Connect. Esto se puede hacer modificando el comando de inicio en el docker-compose.yml:
spark-connect:
image: spark:4.0.1
container_name: spark-connect
hostname: spark-connect
user: root
networks:
- bda-network
ports:
- "15002:15002" # Puerto gRPC expuesto al Host
- "4040:4040" # WebUI de este Driver específico
depends_on:
- spark-master
- namenode
labels:
# Exponemos la UI del Driver de Spark Connect
- "traefik.enable=true"
- "traefik.http.routers.spark-connect-ui.rule=Host(`spark-connect.localhost`)"
- "traefik.http.services.spark-connect-ui.loadbalancer.server.port=4040"
- "traefik.http.routers.spark-connect-ui.entrypoints=web"
# Instalamos librerías y luego lanzamos el servidor
command: >
bash -c "
pip install --no-cache-dir pandas pyarrow grpcio grpcio-status &&
/opt/spark/bin/spark-submit
--master spark://spark-master:7077
--class org.apache.spark.sql.connect.service.SparkConnectServer
--name SparkConnectServer
--conf spark.driver.bindAddress=0.0.0.0
--conf spark.cores.max=1
--conf spark.executor.memory=512m
--packages org.apache.spark:spark-connect_2.13:4.0.1,org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1
"
Habría que hacerlo también en los spark-workers y spark-master si queremos que tengan las mismas librerías, o al menos en los nodos donde vayamos a ejecutar código Python que necesite estas librerías.
4.2 Construir una Imagen Personalizada¶
En producción, no se suele hacer pip install al arrancar un contenedor. Lo que se hace es construir una Imagen Personalizada una sola vez, y usar esa imagen para el Master, los Workers y el Connect.
- Creación de un archivo Dockerfile junto a tu docker-compose.yml:
FROM spark:4.0.1
USER root
# Instalamos todas las dependencias necesarias para cualquier nodo
RUN pip install --no-cache-dir pandas pyarrow grpcio grpcio-status kafka-python-ng
USER 185
- Modificar el docker-compose para construir la imagen personalizada:
spark-connect:
build: . # Construye la imagen desde el Dockerfile
# image: spark:4.0.1... (Lo comentamos o borramos)
# Ya NO necesitas el bash -c "pip install..."
command:["/opt/spark/bin/spark-submit", "--master", ...]
Al hacer docker-compose build, Docker crea una imagen con Spark y Pandas integrados. Luego levanta los 5 contenedores usando esa misma imagen. Todos los nodos son idénticos y tienen las mismas capacidades. Fin de los problemas de dependencias.
¿Cuál opción elegir?
Dejo a vuestra elección la opción que prefiráis, aunque en producción es mucho más común y recomendable la segunda opción (Imagen Personalizada) para garantizar consistencia y tiempos de arranque rápidos.
Pero también depende de las necesidades de tu proyecto, tu entorno de desarrollo y los recursos de tu máquina. Para un entorno de pruebas o desarrollo rápido, la primera opción puede ser más conveniente por su simplicidad. Para producción, la segunda opción es la más profesional y escalable.