Saltar a contenido

UD 8 - Monitor Apache Spark

1. Integración Nativa con Prometheus

En el apartado anterior, aprendimos a monitorizar Kafka utilizando un agente externo (JMX Exporter) porque su arquitectura original no integra la compatibilidad con Prometheus.

Sin embargo, Spark es diferente. Desde la versión 3.0, Spark tiene una integración nativa con Prometheus, lo que significa que no necesitamos ningún agente externo para exponer sus métricas a Prometheus. Esto simplifica enormemente el proceso de monitorización y nos permite obtener métricas detalladas sobre el rendimiento de nuestras aplicaciones Spark de manera más eficiente.

Métricas de Clúster vs. Métricas de Aplicación

Es importante entender qué vamos a monitorizar:

  1. Métricas del Clúster (Lo que haremos aquí): Monitorizamos la salud del Master y los Workers (Memoria usada, Cores libres, Workers vivos). Siempre están disponibles mientras los contenedores estén encendidos.
  2. Métricas de Aplicación: Monitorizan un Job específico (Stages, Tasks, Shuffle Bytes). Estas métricas son efímeras: solo existen mientras el script de Python o Scala se está ejecutando.

2. Habilitar el Endpoint de Prometheus en Spark

Spark utiliza un archivo llamado metrics.properties para definir a dónde envía su telemetría.

Si nos vamos al repositorio oficial de Spark, podemos ver el template que tiene spark para las métricas en el archivo metrics.properties.template. Busca la configuración de Prometheus buscando PrometheusServlet (probablemente al final del archivo). Verás que Spark ya tiene una configuración predefinida comentada para exponer sus métricas a través de un endpoint HTTP compatible con Prometheus. Solo necesitamos habilitar esta configuración y asegurarnos de que el puerto que utiliza esté abierto para que Prometheus pueda acceder a él.

2.1 Habilitar el Endpoint de Prometheus en Spark

Vamos a activar esta configuración para que Spark exponga sus métricas a Prometheus en nuestro cluster de contenedores. Para ello, sigue estos pasos:

  1. En la raíz de nuestro proyecto de docker compose, vamos a crear una nueva carpeta para la configuración de Spark en nuestra monitorización:
mkdir -p monitoring/spark_conf
  1. Crea el archivo metrics.properties dentro de esa carpeta:
nano monitoring/spark_conf/metrics.properties
  1. Añade la siguiente línea (que debería ser la misma que la que aparece en el template). El asterisco (*) indica que esta regla aplica a todos los componentes (Master, Worker y Driver):
monitoring/spark_conf/metrics.properties
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.sink.prometheusServlet.path=/metrics/master/prometheus
applications.sink.prometheusServlet.path=/metrics/applications/prometheus

2.2 Actualizar el docker-compose.yml para Spark

Editamos las secciones volumes de spark-master y de nuestros tres spark-worker:

    # Ejemplo para el Master (Aplica lo mismo a los Workers)
    spark-master:
      # ... resto de configuración ...
      volumes:
        - spark_master_logs:/opt/spark/logs
        # --- NUEVO: Inyectamos la configuración de métricas ---
        - ./monitoring/spark_conf/metrics.properties:/opt/spark/conf/metrics.properties:ro
(Asegúrate de copiar esa última línea también en spark-worker-1, 2 y 3).

2.3 Actualizar la configuración de Prometheus para Spark

Ahora tenemos que decirle a nuestra base de datos de métricas (Prometheus) el endpoint.

Rutas diferentes para Master y Worker

A diferencia de Kafka donde todo colgaba de /metrics, Spark organiza sus endpoints web de forma diferente. El Master expone sus métricas en /metrics/master/prometheus (puerto 8080), mientras que los Workers lo hacen en /metrics/prometheus (puerto 8081). Puedes consultarlo en la documentación oficial de Spark (busca la tabla de componentes y métricas).

Abrimos nuestro archivo monitoring/prometheus.yml y añadimos estos dos nuevos jobs al final de la sección scrape_configs:

monitoring/prometheus.yml
  # ... (Jobs de prometheus y kafka-cluster anteriores) ...

  # 1. Scrape del Spark Master
  - job_name: "spark-master"
    metrics_path: '/metrics/master/prometheus'
    static_configs:
      - targets:
        - "spark-master:8080"

  # 2. Scrape de los Spark Workers
  - job_name: "spark-workers"
    metrics_path: '/metrics/prometheus'
    static_configs:
      - targets: 
        - "spark-worker-1:8081"
        - "spark-worker-2:8081"
        - "spark-worker-3:8081"

2.4 Docker compose completo

Para más comodidad, aquí tienes el docker-compose.yml completo con las modificaciones para la monitorización:

docker-compose.yml
networks:
    bda-network:
        driver: bridge
        name: bda-network

volumes:
    namenode_data:
    secondary_namenode_data:
    datanode1_data:
    datanode2_data:
    datanode3_data:
    portainer_data:
    #postgres_data: # Persistencia del catálogo
    spark_master_logs:
    spark_worker1_logs:
    spark_worker2_logs:
    spark_worker3_logs:
    kafka_controller_1_data:
    kafka_controller_2_data:
    kafka_controller_3_data:
    kafka_broker_1_data:
    kafka_broker_2_data:
    kafka_broker_3_data:
    prometheus_data:
    grafana_data: {}
services:
    # --- INFRAESTRUCTURA DEVOPS ---

    # --- TRAEFIK: Reverse Proxy y Gestor de Rutas ---
    # Traefik actúa como el único punto de entrada (puerta de enlace) para nuestro cluster.
    # Su función es interceptar todas las peticiones HTTP (en el puerto 80) y, basándose
    # en el dominio solicitado (ej. 'portainer.localhost'), redirigir el tráfico al 
    # contenedor correcto de forma automática. Esto nos evita tener que gestionar y recordar
    # un puerto diferente para cada servicio. El dashboard en el puerto 8089 nos permite
    # ver las rutas que ha descubierto y si están activas.
    #Si quiere añadirlo a más servicios, usa la etiqueta 'labels' para definir las reglas de Traefik, como está en portainer y namenode
    traefik:
        image: traefik:v3.6.2
        container_name: traefik
        command:
        - "--api.insecure=true" # Habilita el dashboard inseguro para desarrollo
        - "--providers.docker=true" # Escucha eventos de Docker
        - "--providers.docker.exposedbydefault=false" # Seguridad: No exponer nada automáticamente
        - "--entrypoints.web.address=:80" # Punto de entrada HTTP estándar
        ports:
        - "80:80"       # Peticiones HTTP del host
        - "8089:8080"   # Dashboard de administración de Traefik
        volumes:
        - "/var/run/docker.sock:/var/run/docker.sock:ro" # Traefik necesita acceso al socket de Docker
        networks:
        - bda-network

    # --- PORTAINER: Interfaz Gráfica para Docker ---
    # Portainer nos da una UI web para gestionar de forma visual nuestros contenedores,
    # imágenes, redes y volúmenes, facilitando la administración del entorno Docker.
    #
    # Este servicio está configurado para ser accesible de dos maneras:
    # 1. Acceso Directo: A través del puerto 9000 (http://localhost:9000). Esto es gracias
    #    a la sección 'ports'. Es un acceso fiable y directo.
    # 2. Acceso vía Traefik: Las 'labels' definen la regla para acceder por el dominio
    #    http://portainer.localhost. Esto permite unificar el acceso a todos los servicios
    #    bajo el mismo proxy inverso, usando nombres amigables en lugar de puertos
    portainer:
        image: portainer/portainer-ce:latest
        container_name: portainer
        networks:
        - bda-network
        ports:
            # Mapeo de puertos directo: <PUERTO_EN_EL_HOST>:<PUERTO_EN_EL_CONTENEDOR>
            # Exponemos la UI de Portainer (puerto 9000) en el puerto 9010 de nuestra máquina para evitar conflictos con el 9000 del namenode
            - "9010:9000"
        volumes:
        - "/var/run/docker.sock:/var/run/docker.sock:ro" # Acceso al socket de Docker
        - portainer_data:/data # Volumen para persistir la data de Portainer
        command: -H unix:///var/run/docker.sock
        labels:
            # Activamos Traefik para este contenedor
            - "traefik.enable=true"
            # Regla: si el host es 'portainer.localhost', reenvía a este servicio
            - "traefik.http.routers.portainer.rule=Host(`portainer.localhost`)"
            # Definimos el punto de entrada (entrypoint) como 'web' (puerto 80)
            - "traefik.http.routers.portainer.entrypoints=web"
            # IMPORTANTE: Decirle a Traefik cuál es el puerto interno del servicio web (9000)
            - "traefik.http.services.portainer.loadbalancer.server.port=9000"

    # --- SERVICIOS BIG DATA (Con etiquetas Traefik) ---
    # --- CAPA DE ALMACENAMIENTO (HDFS)  ---
    namenode:
        image: apache/hadoop:3.4.1
        container_name: namenode
        hostname: namenode
        user: root
        networks:
            bda-network:
                aliases:
                    - cluster-bda # Alias for the namenode service. Para que así pueda resolver docker. Docker registra ese nombre de host en la red.
        ports:
            - "9870:9870" # UI Web
        env_file:
            - ./hadoop.env
        #environment:
            # Esta variable formatea el NameNode si la carpeta está vacía
            #- ENSURE_NAMENODE_DIR="/opt/hadoop/hadoop_data/hdfs/namenode"
        volumes:
            - namenode_data:/opt/hadoop/hadoop_data/hdfs/namenode
        #command: ["hdfs", "namenode"]
        command: 
        - "/bin/bash"
        - "-c"
        - "if [ ! -d /opt/hadoop/hadoop_data/hdfs/namenode/current ]; then echo '--- FORMATTING NAMENODE (FRESH START) ---'; hdfs namenode -format -nonInteractive; else echo '--- NAMENODE DATA FOUND (NO FORMAT) ---'; fi; hdfs namenode"
        labels:
            # Activamos Traefik para este contenedor
            - "traefik.enable=true"
            # Regla de enrutamiento: responder a namenode.localhost
            - "traefik.http.routers.namenode.rule=Host(`namenode.localhost`)"
            # IMPORTANTE: Decirle a Traefik cuál es el puerto interno del servicio web (9870)
            - "traefik.http.services.namenode.loadbalancer.server.port=9870"


    secondary_namenode:
        image: apache/hadoop:3.4.1
        container_name: secondary_namenode
        hostname: secondary_namenode
        user: root
        networks:
            - bda-network
        ports:
            - "9868:9868"
        env_file:
            - ./hadoop.env
        depends_on:
            - namenode
        volumes:
            - secondary_namenode_data:/opt/hadoop/hadoop_data/hdfs/secondary_namenode
        command: ["hdfs", "secondarynamenode"]

    datanode1:
        image: apache/hadoop:3.4.1
        container_name: datanode1
        hostname: datanode1
        user: root
        networks:
            - bda-network
        env_file:
            - ./hadoop.env
        depends_on:
            - namenode
        volumes:
            - datanode1_data:/opt/hadoop/hadoop_data/hdfs/datanode
        command: ["hdfs", "datanode"]

    datanode2:
        image: apache/hadoop:3.4.1
        container_name: datanode2
        hostname: datanode2
        user: root
        networks:
            - bda-network
        env_file:
            - ./hadoop.env
        depends_on:
            - namenode
        volumes:
            - datanode2_data:/opt/hadoop/hadoop_data/hdfs/datanode
        command: ["hdfs", "datanode"]

    datanode3:
        image: apache/hadoop:3.4.1
        container_name: datanode3
        hostname: datanode3
        user: root
        networks:
            - bda-network
        env_file:
            - ./hadoop.env
        depends_on:
            - namenode
        volumes:
            - datanode3_data:/opt/hadoop/hadoop_data/hdfs/datanode
        command: ["hdfs", "datanode"]


    # --- CAPA DE PROCESAMIENTO (YARN) ---

    resourcemanager:
        image: apache/hadoop:3.4.1
        container_name: resourcemanager
        hostname: resourcemanager
        user: root
        networks:
            - bda-network
        ports:
            - "8088:8088" # YARN ResourceManager Web UI
            - "8032:8032" # YARN ResourceManager RPC
        env_file:
            - ./hadoop.env
        depends_on:
            - namenode
        command: ["yarn", "resourcemanager"]
        labels:
            - "traefik.enable=true"
            - "traefik.http.routers.resourcemanager.rule=Host(`yarn.localhost`)"
            - "traefik.http.services.resourcemanager.loadbalancer.server.port=8088"

    # NodeManager asociado a DataNode1
    nodemanager1:
        image: apache/hadoop:3.4.1
        container_name: nodemanager1
        hostname: nodemanager1
        user: root
        networks:
            - bda-network
        ports:
            - "8042:8042" # Puerto único para la Web UI del NM1
        env_file:
            - ./hadoop.env
        depends_on:
            - resourcemanager
            - namenode
        command: ["yarn", "nodemanager"]

    # NodeManager asociado a DataNode2
    nodemanager2:
        image: apache/hadoop:3.4.1
        container_name: nodemanager2
        hostname: nodemanager2
        user: root
        networks:
            - bda-network
        ports:
            - "8043:8042" # Puerto único para la Web UI del NM2
        env_file:
            - ./hadoop.env
        depends_on:
            - resourcemanager
            - namenode
        command: ["yarn", "nodemanager"]

    # NodeManager asociado a DataNode3
    nodemanager3:
        image: apache/hadoop:3.4.1
        container_name: nodemanager3
        hostname: nodemanager3
        user: root
        networks:
            - bda-network
        ports:
            - "8044:8042" # Puerto único para la Web UI del NM3
        env_file:
            - ./hadoop.env
        depends_on:
            - resourcemanager
            - namenode
        command: ["yarn", "nodemanager"]

    # --- CAPA DE PERSISTENCIA RELACIONAL (Catalog Database) ---
    #postgres:
    #  image: postgres:18
    #  container_name: postgres
    #  networks:
    #    - bda-network
    #  environment:
    #    POSTGRES_DB: metastore_db
    #    POSTGRES_USER: hive
    #    POSTGRES_PASSWORD: hive_password
    #    # Le decimos a Postgres dónde poner los datos exactamente (Necesario para Postgres 18+)
    #    PGDATA: /var/lib/postgresql/data/pgdata
    #  volumes:
    #    - postgres_data:/var/lib/postgresql/data
    #  # healthcheck declara una verificación que se ejecuta para determinar si los contenedores de servicio están "healthy" o no.
    #  healthcheck:
    #    test: ["CMD-SHELL", "pg_isready -U hive -d metastore_db"]
    #    interval: 10s
    #    timeout: 5s
    #    retries: 5
    # --- SERVICIO DE METADATOS (Hive Metastore) ---
    #metastore:
    #  image: apache/hive:standalone-metastore-4.2.0
    #  container_name: metastore
    #  networks:
    #      - bda-network
    #  environment:
    #    SERVICE_NAME: metastore
    #    # Credenciales explícitas para el script de inicio de la imagen
    #    DB_DRIVER: postgres
    #    # Configuración JAVA directa (Sin XMLs intermedios para la DB)
    #    SERVICE_OPTS: >-
    #        -Xmx1G 
    #        -Djavax.jdo.option.ConnectionDriverName=org.postgresql.Driver
    #        -Djavax.jdo.option.ConnectionURL=jdbc:postgresql://postgres:5432/metastore_db
    #        -Djavax.jdo.option.ConnectionUserName=hive
    #        -Djavax.jdo.option.ConnectionPassword=hive_password
    #        -Dhive.metastore.warehouse.dir=hdfs://namenode:9000/user/hive/warehouse
    #        -Dfs.defaultFS=hdfs://namenode:9000
    #  depends_on:
    #    postgres:
    #      # Establece la condición bajo la cual se considera satisfecha la dependencia (https://docs.docker.com/reference/compose-file/services/#depends_on)
    #      condition: service_healthy
    #    namenode:
    #      condition: service_started
    #  ports:
    #    - "9083:9083" # Puerto Thrift expuesto para clientes externos
    #  volumes:
    #    # Inyección del Driver postgres y librerias YARN (Fix SystemClock Error)
    #    - ./drivers/postgresql-42.7.8.jar:/opt/hive/lib/postgresql-jdbc.jar
    #    # --- NUEVOS PARCHES PARA YARN (Fix SystemClock Error) ---
    #    - ./drivers/hadoop-yarn-common-3.4.1.jar:/opt/hive/lib/hadoop-yarn-common-3.4.1.jar
    #    - ./drivers/hadoop-yarn-api-3.4.1.jar:/opt/hive/lib/hadoop-yarn-api-3.4.1.jar

    # --- MOTOR DE EJECUCIÓN SQL (HiveServer2) ---
    #hiveserver2:
    #  image: apache/hive:4.2.0
    #  container_name: hiveserver2
    #  networks:
    #    - bda-network
    #  environment:
    #    SERVICE_NAME: hiveserver2
    #    TEZ_CONTAINER_SIZE: 512 # Ajuste de memoria para contenedores Tez
    #    # Evita re-inicializar el esquema (ya lo hace el metastore)
    #    IS_RESUME: "true"
    #    # Configuración: Conéctate al metastore remoto y usa HDFS
    #    SERVICE_OPTS: >-
    #        -Dhive.metastore.uris=thrift://metastore:9083
    #        -Dhive.metastore.warehouse.dir=hdfs://namenode:9000/user/hive/warehouse
    #        -Dfs.defaultFS=hdfs://namenode:9000
    #  depends_on:
    #    metastore:
    #      condition: service_started
    #    resourcemanager:
    #      condition: service_started
    #  ports:
    #    - "10000:10000" # Puerto JDBC (Beeline/DBeaver)
    #    - "10002:10002" # Web UI de HiveServer2
    #  labels:
    #    - "traefik.enable=true"
    #    # Acceso HTTP a la UI de Hive
    #    - "traefik.http.routers.hive.rule=Host(`hive.localhost`)"
    #    - "traefik.http.services.hive.loadbalancer.server.port=10002"
    #    - "traefik.http.routers.hive.entrypoints=web"
    #    # Desactivando el paso de la cabecera Host (passHostHeader = false).
    #    # Le decimos a Traefik que NO pase el nombre de dominio original al contenedor.
    #    # Así, Traefik reescribirá la cabecera Host para que coincida con la IP interna del contenedor y el puerto 10002.
    #    # Esto evita que Jetty de error en hiveserver2 por recibir tráfico del puerto 80.
    #    - "traefik.http.services.hive.loadbalancer.passhostheader=false"

    # -------------------------------------------
    # --- MOTOR DE PROCESAMIENTO (Spark) ---
    # APACHE SPARK (COMPUTE LAYER)
    # Versión: 4.0.1 (Official Docker Image)
    # Arquitectura: Standalone Mode (1 Master + 3 Workers)
    # -------------------------------------------

    spark-master:
        image: spark:4.0.1
        container_name: spark-master
        hostname: spark-master
        user: root # Necesario para escribir logs en volúmenes
        networks:
            - bda-network
        ports:
            - "7077:7077" # Puerto RPC (Necesario para enviar trabajos desde fuera)
            - 8080:8080 # Puerto Web UI
        volumes:
            - spark_master_logs:/opt/spark/logs
            # --- Inyectamos la configuración de métricas ---
            - ./monitoring/spark_conf/metrics.properties:/opt/spark/conf/metrics.properties:ro
        labels:
            - "traefik.enable=true"
            - "traefik.http.routers.spark.rule=Host(`spark.localhost`)"
            - "traefik.http.services.spark.loadbalancer.server.port=8080"
            - "traefik.http.routers.spark.entrypoints=web"
        # Arrancamos la clase Master directamente
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.master.Master", "--host", "spark-master", "--port", "7077", "--webui-port", "8080"]

    spark-worker-1:
        image: spark:4.0.1
        container_name: spark-worker-1
        hostname: spark-worker-1
        user: root
        networks:
            - bda-network
        #environment:
            # --- GESTIÓN DE RECURSOS ---
            # Spark detecta nativamente estas variables al arrancar la JVM.
            #- SPARK_WORKER_MEMORY=1G  # Límite de RAM por Worker
            # - SPARK_WORKER_CORES=1  # Descomentar para limitar CPU
        volumes:
            - spark_worker1_logs:/opt/spark/logs
            # --- Inyectamos la configuración de métricas ---
            - ./monitoring/spark_conf/metrics.properties:/opt/spark/conf/metrics.properties:ro
        depends_on:
            - spark-master
            - namenode
        # Arrancamos la clase Worker apuntando al Master
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]

    spark-worker-2:
        image: spark:4.0.1
        container_name: spark-worker-2
        hostname: spark-worker-2
        user: root
        networks:
            - bda-network
        #environment:
            #- SPARK_WORKER_MEMORY=1G
            # - SPARK_WORKER_CORES=1  # Descomentar para limitar CPU
        volumes:
            - spark_worker2_logs:/opt/spark/logs
            # --- Inyectamos la configuración de métricas ---
            - ./monitoring/spark_conf/metrics.properties:/opt/spark/conf/metrics.properties:ro
        depends_on:
            - spark-master
            - namenode
        # Arrancamos la clase Worker apuntando al Master
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]

    spark-worker-3:
        image: spark:4.0.1
        container_name: spark-worker-3
        hostname: spark-worker-3
        user: root
        networks:
            - bda-network
        #environment:
            #- SPARK_WORKER_MEMORY=1G
            # - SPARK_WORKER_CORES=1  # Descomentar para limitar CPU
        volumes:
            - spark_worker3_logs:/opt/spark/logs
            # --- Inyectamos la configuración de métricas ---
            - ./monitoring/spark_conf/metrics.properties:/opt/spark/conf/metrics.properties:ro
        depends_on:
            - spark-master
            - namenode
        # Arrancamos la clase Worker apuntando al Master
        command: ["/opt/spark/bin/spark-class", "org.apache.spark.deploy.worker.Worker", "--webui-port", "8081", "spark://spark-master:7077"]

    # -------------------------------------------
    # SPARK CONNECT SERVER (Gateway para Clientes Remotos)
    # Este servicio permite conectar IDEs (VSCode/PyCharm) desde el Host
    # -------------------------------------------
    spark-connect:
        image: spark:4.0.1
        container_name: spark-connect
        hostname: spark-connect
        user: root
        networks:
            - bda-network
        ports:
            - "15002:15002" # Puerto gRPC expuesto al Host
            - "4040:4040"   # WebUI de este Driver específico
        depends_on:
            - spark-master
            - namenode
        labels:
            # Exponemos la UI del Driver de Spark Connect
            - "traefik.enable=true"
            - "traefik.http.routers.spark-connect-ui.rule=Host(`spark-connect.localhost`)"
            - "traefik.http.services.spark-connect-ui.loadbalancer.server.port=4040"
            - "traefik.http.routers.spark-connect-ui.entrypoints=web"
        # Arrancamos el servidor de Spark Connect conectándose al Master
        command: 
            - "/opt/spark/bin/spark-submit"
            - "--master"
            - "spark://spark-master:7077"
            - "--class"
            - "org.apache.spark.sql.connect.service.SparkConnectServer"
            - "--name"
            - "SparkConnectServer"
            - "--conf"
            - "spark.driver.bindAddress=0.0.0.0"
            # --- Límites para evitar Starvation si no tienes recursos suficientes ---
            # Limitamos a 1 núcleo en total (deja libres los otros para tus jobs)
            - "--conf"
            - "spark.cores.max=1"
            # Limitamos la memoria por ejecutor (deja RAM libre en los workers)
            - "--conf"
            - "spark.executor.memory=512m"
            # ---------------------------------------------
            # Importante: Las librerías de Connect suelen venir en la imagen,
            # pero apuntamos al paquete local por seguridad en la carga de clases.
            # Añadimos también la librería de Kafka separada por coma del paquete de Connect para Spark Streaming con Kafka
            - "--packages"
            - "org.apache.spark:spark-connect_2.13:4.0.1,org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1"

    # -------------------------------------------
    # APACHE KAFKA (STREAMING LAYER)
    # Arquitectura: 3 Controllers + 3 Brokers (KRaft Isolated)
    # -------------------------------------------

    kafka-controller-1:
      image: apache/kafka:4.2.0
      container_name: kafka-controller-1
      user: root
      networks:
        - bda-network
      environment:
        KAFKA_NODE_ID: 1
        KAFKA_PROCESS_ROLES: controller
        KAFKA_LISTENERS: CONTROLLER://:9093
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        # Limitamos RAM para entornos de laboratorio
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
        # --- INYECCIÓN DEL AGENTE JMX ---
        # Le decimos a Java que cargue el agente en el puerto 9091 usando el archivo de reglas
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      volumes:
        - kafka_controller_1_data:/tmp/kafka-logs
        # Montamos la carpeta local en el contenedor en modo lectura
        - ./monitoring/jmx:/opt/kafka/jmx:ro

    kafka-controller-2:
      image: apache/kafka:4.2.0
      container_name: kafka-controller-2
      user: root
      networks:
        - bda-network
      environment:
        KAFKA_NODE_ID: 2
        KAFKA_PROCESS_ROLES: controller
        KAFKA_LISTENERS: CONTROLLER://:9093
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      volumes:
        - kafka_controller_2_data:/tmp/kafka-logs
        - ./monitoring/jmx:/opt/kafka/jmx:ro

    kafka-controller-3:
      image: apache/kafka:4.2.0
      container_name: kafka-controller-3
      user: root
      networks:
        - bda-network
      environment:
        KAFKA_NODE_ID: 3
        KAFKA_PROCESS_ROLES: controller
        KAFKA_LISTENERS: CONTROLLER://:9093
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx256M -Xms256M"
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      volumes:
        - kafka_controller_3_data:/tmp/kafka-logs
        - ./monitoring/jmx:/opt/kafka/jmx:ro

    kafka-broker-1:
      image: apache/kafka:4.2.0
      container_name: kafka-broker-1
      user: root
      networks:
        - bda-network
      ports:
        - "9094:9094" # Puerto mapeado al Host
      environment:
        KAFKA_NODE_ID: 4
        KAFKA_PROCESS_ROLES: broker
        KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9094
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:9092,EXTERNAL://localhost:9094
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
        # --- INYECCIÓN DEL AGENTE JMX ---
        # Le decimos a Java que cargue el agente en el puerto 9091 usando el archivo de reglas
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      depends_on:
        - kafka-controller-1
        - kafka-controller-2
        - kafka-controller-3
      volumes:
        - kafka_broker_1_data:/tmp/kafka-logs
        # Montamos la carpeta local en el contenedor en modo lectura
        - ./monitoring/jmx:/opt/kafka/jmx:ro

    kafka-broker-2:
      image: apache/kafka:4.2.0
      container_name: kafka-broker-2
      user: root
      networks:
        - bda-network
      ports:
        - "9095:9095"
      environment:
        KAFKA_NODE_ID: 5
        KAFKA_PROCESS_ROLES: broker
        KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9095
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-2:9092,EXTERNAL://localhost:9095
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      depends_on:
        - kafka-controller-1
        - kafka-controller-2
        - kafka-controller-3
      volumes:
        - kafka_broker_2_data:/tmp/kafka-logs
        - ./monitoring/jmx:/opt/kafka/jmx:ro

    kafka-broker-3:
      image: apache/kafka:4.2.0
      container_name: kafka-broker-3
      user: root
      networks:
        - bda-network
      ports:
        - "9096:9096"
      environment:
        KAFKA_NODE_ID: 6
        KAFKA_PROCESS_ROLES: broker
        KAFKA_LISTENERS: PLAINTEXT://:9092,EXTERNAL://:9096
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-3:9092,EXTERNAL://localhost:9096
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
        KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-controller-1:9093,2@kafka-controller-2:9093,3@kafka-controller-3:9093
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
        CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
        KAFKA_JVM_PERFORMANCE_OPTS: "-Xmx512M -Xms512M"
        KAFKA_OPTS: "-javaagent:/opt/kafka/jmx/jmx_prometheus_javaagent-1.5.0.jar=9091:/opt/kafka/jmx/kafka-jmx.yml"
      depends_on:
        - kafka-controller-1
        - kafka-controller-2
        - kafka-controller-3
      volumes:
        - kafka_broker_3_data:/tmp/kafka-logs
        - ./monitoring/jmx:/opt/kafka/jmx:ro

# --- OBSERVABILIDAD (Kafbat UI) ---
    kafka-ui:
      image: ghcr.io/kafbat/kafka-ui:latest
      container_name: kafka-ui
      networks:
        - bda-network
      environment:
        # Nombre que aparecerá en la interfaz
        KAFKA_CLUSTERS_0_NAME: bda-cluster
        # Apuntamos a los puertos INTERNOS de los brokers en Docker
        KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
        # Habilitamos la configuración dinámica desde la propia UI (muy útil en Labs)
        DYNAMIC_CONFIG_ENABLED: 'true'
        # Le dice a la aplicación Spring Boot que respete las cabeceras que le envía Traefik
        SERVER_FORWARD_HEADERS_STRATEGY: FRAMEWORK
        # Le inyectamos el worker directamente. La UI lo auto-descubrirá.
        KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: cluster-conectores
        KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-worker-1:8083
      depends_on:
        - kafka-broker-1
        - kafka-broker-2
        - kafka-broker-3
      labels:
        # Integración con nuestro Traefik
        - "traefik.enable=true"
        - "traefik.http.routers.kafkaui.rule=Host(`kafka.localhost`)"
        - "traefik.http.services.kafkaui.loadbalancer.server.port=8080"
        - "traefik.http.routers.kafkaui.entrypoints=web"
# --- CLIENTE KAFKA (Edge Node) ---
    # Usado exclusivamente para administrar el clúster sin afectar a los brokers
    kafka-client:
      image: apache/kafka:4.2.0
      container_name: kafka-client
      networks:
        - bda-network
      depends_on:
        - kafka-broker-1
        - kafka-broker-2
        - kafka-broker-3
      # Comando para mantener el contenedor vivo
      command: tail -f /dev/null

# --- Kafka Connect ---    
    # =======================================================
    # PLANTILLA BASE PARA KAFKA CONNECT (YAML Anchor)
    # Definimos toda la configuración común para no repetirla
    # =======================================================
    x-connect-defaults: &connect-defaults
      image: apache/kafka:4.2.0
      networks:
        - bda-network
      environment:
        CONNECT_BOOTSTRAP_SERVERS: kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092
        # Mismo Group ID une los workers en un solo clúster
        CONNECT_GROUP_ID: bda-connect-cluster
        CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
        CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
        CONNECT_PLUGIN_PATH: /opt/kafka/plugins
      depends_on:
        - kafka-broker-1
        - kafka-broker-2
        - kafka-broker-3
      volumes:
        - ./kafka_connect_plugins:/opt/kafka/plugins:ro
        # Inyección directa del driver MySQL (Asegúrate de tenerlo en esa ruta)
        - ./kafka_connect_plugins/drivers/mysql-connector-j-9.6.0.jar:/opt/kafka/libs/mysql-connector-j.jar:ro
      command: 
        - bash
        - -c
        - |
          echo "bootstrap.servers=$$CONNECT_BOOTSTRAP_SERVERS" > /tmp/connect-distributed.properties
          echo "group.id=$$CONNECT_GROUP_ID" >> /tmp/connect-distributed.properties
          echo "key.converter=$$CONNECT_KEY_CONVERTER" >> /tmp/connect-distributed.properties
          echo "value.converter=$$CONNECT_VALUE_CONVERTER" >> /tmp/connect-distributed.properties
          echo "key.converter.schemas.enable=$$CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE" >> /tmp/connect-distributed.properties
          echo "value.converter.schemas.enable=$$CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE" >> /tmp/connect-distributed.properties
          echo "offset.storage.topic=connect-offsets" >> /tmp/connect-distributed.properties
          echo "offset.storage.replication.factor=3" >> /tmp/connect-distributed.properties
          echo "config.storage.topic=connect-configs" >> /tmp/connect-distributed.properties
          echo "config.storage.replication.factor=3" >> /tmp/connect-distributed.properties
          echo "status.storage.topic=connect-status" >> /tmp/connect-distributed.properties
          echo "status.storage.replication.factor=3" >> /tmp/connect-distributed.properties
          echo "plugin.path=$$CONNECT_PLUGIN_PATH" >> /tmp/connect-distributed.properties

          echo "Iniciando Kafka Connect en modo distribuido..."
          /opt/kafka/bin/connect-distributed.sh /tmp/connect-distributed.properties

    # -------------------------------------------
    # KAFKA CONNECT CLUSTER (3 Workers)
    # Heredan de la plantilla base (<<: *connect-defaults)
    # -------------------------------------------
    kafka-worker-1:
      <<: *connect-defaults
      container_name: kafka-worker-1
      ports:
        - "8083:8083" # API REST Worker 1

    kafka-worker-2:
      <<: *connect-defaults
      container_name: kafka-worker-2
      ports:
        - "8084:8083" # API REST Worker 2 (Mapeado al 8084 del Host)

    kafka-worker-3:
      <<: *connect-defaults
      container_name: kafka-worker-3
      ports:
        - "8085:8083" # API REST Worker 3 (Mapeado al 8085 del Host)

    # -------------------------------------------
    # Monitoring con Prometheus
    # -------------------------------------------
    prometheus:
      image: prom/prometheus:v3.5.1
      container_name: prometheus
      networks:
        - bda-network
      ports:
        - "9090:9090" # Acceso directo a la UI de Prometheus
      volumes:
        # Inyectamos nuestra configuración
        - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
        # Persistencia de métricas
        - prometheus_data:/prometheus
      command:
        - '--config.file=/etc/prometheus/prometheus.yml'
        - '--storage.tsdb.path=/prometheus'
      labels:
        # --- ENRUTAMIENTO TRAEFIK ---
        - "traefik.enable=true"
        - "traefik.http.routers.prometheus.rule=Host(`prometheus.localhost`)"
        - "traefik.http.services.prometheus.loadbalancer.server.port=9090"
        - "traefik.http.routers.prometheus.entrypoints=web"

    # --- DASHBOARDS VISUALES ---
    grafana:
      image: grafana/grafana
      container_name: grafana
      restart: unless-stopped
      networks:
        - bda-network
      ports:
        - "3000:3000" # Acceso a la UI de Grafana
      environment:
        - GF_SECURITY_ADMIN_PASSWORD=admin # Contraseña por defecto
      volumes:
        - grafana_data:/var/lib/grafana
      depends_on:
        - prometheus
      labels:
        # Opcional: Integración con Traefik
        - "traefik.enable=true"
        - "traefik.http.routers.grafana.rule=Host(`grafana.localhost`)"
        - "traefik.http.services.grafana.loadbalancer.server.port=3000"
        - "traefik.http.routers.grafana.entrypoints=web"

    # --- KAFKA EXPORTER (Calculador de Lag de Consumidores) ---
    kafka-exporter:
      image: danielqsj/kafka-exporter:latest
      container_name: kafka-exporter
      networks:
        - bda-network
      command: 
        - "--kafka.server=kafka-broker-1:9092"
        - "--kafka.server=kafka-broker-2:9092"
        - "--kafka.server=kafka-broker-3:9092"
      ports:
        - "9308:9308" # Puerto donde expone las métricas
      depends_on:
        - kafka-broker-1
        - kafka-broker-2
        - kafka-broker-3

2.5 Prometheus completo

Para más comodidad, aquí tienes el bloque completo de prometheus.yml con la configuración que vimos antes.:

monitoring/prometheus.yml
# my global config
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ["localhost:9090"]
       # The label name is added as a label `label_name=<label_value>` to any timeseries scraped from this config.
        labels:
          app: "prometheus"
  # Monitorización de nuestros 3 Brokers de Kafka
  # Prometheus usará el DNS interno de Docker para encontrar estos contenedores
  - job_name: "kafka-brokers"
    static_configs:
      - targets: 
        - "kafka-broker-1:9091"
        - "kafka-broker-2:9091"
        - "kafka-broker-3:9091"
  # Monitorización de nuestros 3 Controllers de Kafka
  # Prometheus usará el DNS interno de Docker para encontrar estos contenedores
  - job_name: "kafka-controllers"
    static_configs:
      - targets: 
        - "kafka-controller-1:9091"
        - "kafka-controller-2:9091"
        - "kafka-controller-3:9091"

  - job_name: "kafka"
    static_configs:
      - targets: 
        - "kafka-controller-1:9091"
        - "kafka-controller-2:9091"
        - "kafka-controller-3:9091"
        - "kafka-broker-1:9091"
        - "kafka-broker-2:9091"
        - "kafka-broker-3:9091"

  # Scrape del calculador de Lag
  - job_name: "kafka-exporter"
    static_configs:
      - targets: ["kafka-exporter:9308"]

  # Monitorización de nuestro Clúster de Spark
  # Scrape del Spark Master
  - job_name: "spark-master"
    metrics_path: '/metrics/master/prometheus'
    static_configs:
      - targets:
        - "spark-master:8080"

  # Scrape de los Spark Workers
  - job_name: "spark-workers"
    metrics_path: '/metrics/prometheus'
    static_configs:
      - targets: 
        - "spark-worker-1:8081"
        - "spark-worker-2:8081"
        - "spark-worker-3:8081"

2.6 Despliegue y verificación

Reiniciamos la infraestructura para aplicar los cambios de volúmenes y la nueva configuración de Prometheus.

docker-compose down
docker-compose up -d

Antes de ir a Grafana, verificamos que el endpoint HTTP está funcionando.

  1. Nos aseguramos que se ha habilitado correctamente las métricas en Spark. Lo podemos observar entrando en el apartado Spark Properties de Environments en el Spark UI http://spark.localhost/environment/ usando Traefik o http://localhost:8080/environment/. Deberíamos ver la propiedad spark.ui.prometheus.enabled con el valor true.

  2. También podemos verificar que el endpoint HTTP de métricas está funcionando correctamente. Para ello, abrimos el navegador y entramos en el endpoint del Master http://spark.localhost:8080/metrics/master/prometheus. Deberíamos ver una página con un formato similar a este:

metrics_master_aliveWorkers_Number{type="gauges"} 3
metrics_master_aliveWorkers_Value{type="gauges"} 3
metrics_master_apps_Number{type="gauges"} 0
metrics_master_apps_Value{type="gauges"} 0
metrics_master_waitingApps_Number{type="gauges"} 0
metrics_master_waitingApps_Value{type="gauges"} 0
metrics_master_workers_Number{type="gauges"} 3
metrics_master_workers_Value{type="gauges"} 3
.....
  1. Por último, verificamos que Prometheus está recogiendo correctamente las métricas. Para ello, entramos en el panel de Prometheus (http://prometheus.localhost/targets) usando Traefik o (http://localhost:9090/targets). Deberíamos ver los jobs spark-master y spark-workers con todos los endpoints en estado UP (Verde).
Figura 8.1_Monitor_Spark: Prometheus Docker

Abrimos el navegador y entramos en el panel de Prometheus: http://prometheus.localhost/targets

Deberíamos ver los jobs spark-master y spark-workers con todos los endpoints en estado UP (Verde).

3. Visualización con Grafana

Ya tenemos los datos fluyendo hacia Prometheus. Solo falta añadir el dashboard. Haremos lo mismo que hicimos para el clúster de Kafka, pero esta vez con un dashboard específico para Spark. Por tanto:

  1. Abrimos Grafana en tu navegador: http://grafana.localhost
  2. Vamos al menú lateral: Dashboards -> New -> Import.
  3. En este caso, usaremos de nuevo un dashboard personalizado que puedes descargar en este enlace.
  4. Haz clic en Upload JSON file y selecciona el archivo que acabas de descargar.
  5. En el último paso, en la opción DS_PROMETHEUS, selecciona tu conexión de Prometheus en el desplegable y haz clic en Import.
Figura 8.1_Monitor_Spark: Grafana y Prometheus 1 Docker

Generar Tráfico

Si el dashboard aparece muy "plano", es porque el clúster está inactivo. Lanza el script del "Cálculo de Pi" o el ejemplo de "Pandas on Spark" que hicimos en los temas anteriores. Inmediatamente verás cómo las gráficas de Cores Used, Active Workers y Memory pegan un salto en tiempo real. ¡Has logrado visibilidad total sobre tu motor de cómputo!

4. Ejemplo 1: Monitorización del cálculo de Pi con Spark

Vamos a lanzar el clásico ejemplo de cálculo de Pi con Spark para generar algo de tráfico en el clúster y ver cómo se refleja en nuestro dashboard de Grafana.

  1. Abrimos nuestro Dashboard de Grafana para Spark.
  2. Lanzamos el script de cálculo de Pi en nuestro cluster, tal y como hicimos en el ejemplo del cálculo de Pi con Spark en la UD6.
  3. Ejecutamos el script:

docker exec -it spark-master /opt/spark/bin/spark-submit \
  --master spark://spark-master:7077 \
  --class org.apache.spark.examples.SparkPi \
  /opt/spark/examples/jars/spark-examples_2.13-4.0.1.jar 10
4. Cambia el parámetro de entrada (10) por un número mayor para generar más carga y observar mejor las métricas. Por ejemplo, puedes usar 100 o 1000.

  1. Observa cómo las métricas de tu dashboard de Grafana para Spark reflejan el aumento de actividad en el clúster: el número de cores usados, la memoria consumida, el número de tareas activas, etc.
Animación 8.1_Monitor_Spark: Ejemplo 1 Docker