UD 7 - Apache Kafka¶
En los últimos años el número de aplicaciones a desarrollar por las empresas ha aumentado considerablemente con la llegada de las arquitecturas basadas en microservicios y sistemas de Big Data.
Uno de los aspectos más relevantes es la comunicación entre ellos, o la necesidad de tener que integrarse con otros sistemas enviando o recibiendo información. En estos casos, estas comunicaciones deberán ser rápidas, seguras y fiables con una alta disponibilidad.
Una de las soluciones para solventar este tipo de casos han supuesto el uso de tecnologías basadas en colas de mensajes, las cuales permiten la comunicación asíncrona, lo que significa que los puntos de conexión que producen y consumen los mensajes interactúan con la cola, no entre sí. Además, ayudan a simplificar de forma significativa la escritura de código para aplicaciones desacopladas, mejorando el rendimiento, la fiabilidad y la escalabilidad.
A la hora de utilizar un sistema de colas de mensajes que requieren la transmisión de datos a tiempo real encontramos Apache Kafka como una de nuestras mejores soluciones.
Según Apache Kafka, más del 80% de todas las empresas del 100 Fortune confían y usan Kafka. Puedes ver la lista en su página oficial
1. ¿Qué es Apache Kafka?¶
1.1 Definición¶
Apache Kafka es un sistema de transmisión de datos distribuido con capacidad de escalado horizontal y tolerante a fallos. Gracias a su alto rendimiento nos permite transmitir datos en tiempo real utilizando el patrón de mensajería publish/subscribe.

Kafka fue creado por LinkedIn y actualmente es un proyecto open source mantenido por Confluent, empresa que está bajo la administración de Apache. Apache Kafka está escrito en Java y Scala y se distribuye con licencia Apache 2.0
Proporciona la plataforma para publicar y suscribirse a flujos de eventos y permite almacenar estos eventos de una forma tolerante a fallos, escalable, persistente y con capacidad de replicación.
Además de almacenar estos eventos, la funcionalidad se extiende con la capacidad de procesarlos en tiempo real, a medida que se reciben de múltiples fuentes de datos. Kafka se integra con numerosas tecnologías, de esta forma, permite construir flujos de datos en tiempo real entre distintos sistemas y aplicaciones de manera desacoplada.
Por estas razones, Apache Kafka es muy popular en la arquitectura Big Data de muchas empresas en la actualidad. Debido a su arquitectura, consigue obtener una baja latencia, escalabilidad horizontal y absorber los picos de carga que pueden ocurrir en el sistema.
Hoy día, existe una gran variedad de fuentes generadoras de datos, tales como microservicios, bases de datos, todo tipo de IoTs, servidores web, registros de logs, análisis de rendimiento y monitoreo, etc. Kafka interviene en este punto para simplificar la arquitectura: vincula a todos los productores de datos con Kafka y, a su vez, Kafka se encarga de distribuir estos datos a los consumidores pertinentes, facilitando así un ecosistema de datos más ágil y desacoplado. Todo ello de una forma tolerante a fallos, escalable, persistente, con capacidad de replicación, baja latencia, escalabilidad horizontal y con capacidad de absorción de los picos de carga.

1.2 Plataforma de transmisión de eventos¶
Apache Kafka® es una plataforma de transmisión de eventos. ¿Qué significa eso?
Kafka combina tres capacidades clave para que pueda implementar sus casos de uso para la transmisión de eventos de un extremo a otro con una única solución:
- Para publicar (escribir) y suscribirse (leer) a transmisiones de eventos, incluida la importación/exportación continua de sus datos desde otros sistemas.
- Para almacenar transmisiones de eventos de forma duradera y confiable durante el tiempo que desee.
- Para procesar flujos de eventos a medida que ocurren o de forma retrospectiva.
Y toda esta funcionalidad se proporciona de forma distribuida, altamente escalable, elástica, tolerante a fallos y segura. Kafka se puede implementar en hardware básico, máquinas virtuales y contenedores, tanto en las instalaciones como en la nube. Puede elegir entre autoadministrar sus entornos Kafka y utilizar servicios totalmente administrados ofrecidos por una variedad de proveedores.
2. Elementos Kafka¶
Apache Kafka esta formado por una serie de conceptos y elementos que describiremos a continuación
2.1 Eventos¶
Un evento (También llamados registros o mensajes) registra el hecho de que "algo sucedió" en el mundo o en su negocio. Cuando lees o escribes datos en Kafka, lo haces en forma de eventos. Conceptualmente, un evento tiene una clave, un valor, una marca de tiempo(timestamp) y encabezados de metadatos opcionales. Aquí hay un evento de ejemplo:
- Clave del evento: "Alicia"
- Valor del evento: "Se realizó un pago de $200 a Bob"
- Marca de tiempo del evento: "25 de marzo de 2024 a las 14:06"
Eventos de ejemplo son transacciones de pago, actualizaciones de geolocalización desde teléfonos móviles, pedidos de envío, mediciones de sensores desde dispositivos IoT o equipos médicos, y mucho más.
2.2 Producers/Consumers¶
Los producers(productores) son aquellas aplicaciones cliente que publican (escriben) eventos en Kafka, y los consumers(consumidores) son aquellos que se suscriben (leen y procesan) estos eventos. En Kafka, los productores y los consumidores están completamente desacoplados y son agnósticos entre sí, lo cual es un elemento de diseño clave para lograr la alta escalabilidad por la que Kafka es conocido. Por ejemplo, los productores nunca necesitan esperar a los consumidores. Kafka ofrece varias garantías, como la capacidad de procesar eventos exactamente una vez (events exactly-once).

2.3 Topics¶
Los eventos se organizan y almacenan de forma duradera en topics (temas). De manera muy simplificada, un topic es similar a una carpeta en un sistema de archivos y los eventos son los archivos en esa carpeta. Un nombre de topic de ejemplo podría ser "pagos/payments". Los topics en Kafka son siempre multiproductor y multisuscriptor: un topic puede tener cero, uno o muchos productores que le escriban eventos, así como cero, uno o muchos consumidores que se suscriban a estos eventos.
Los eventos de un topic se pueden leer tantas veces como sea necesario; a diferencia de los sistemas de mensajería tradicionales, los eventos no se eliminan después de su consumo. En su lugar, se define durante cuánto tiempo Kafka debe retener sus eventos a través de una configuración por topic, después del cual se descartarán los eventos antiguos. El rendimiento de Kafka es efectivamente constante con respecto al tamaño de los datos, por lo que almacenar datos durante un período prolongado también podría ser correcto.
2.4 Partitions¶
Los topics están divididos, lo que significa que un topic se distribuye en varios "bucket(depósitos)" ubicados en diferentes brokers (veremos más adelante) de Kafka.

Esta ubicación distribuida de sus datos es muy importante para la escalabilidad porque permite que las aplicaciones cliente lean y escriban datos desde/hacia muchos brokers al mismo tiempo. Cuando se publica un nuevo evento en un topic, en realidad se agrega a una de las particiones del topic. Los eventos con la misma clave de evento (por ejemplo, un ID de cliente o de vehículo) se escriben en la misma partición, y Kafka garantiza que cualquier consumidor de una partición de topic determinada siempre leerá los eventos de esa partición exactamente en el mismo orden en que fueron escritos.

Este topic de ejemplo tiene cuatro particiones P1–P4. Dos clientes productores diferentes publican, independientemente uno del otro, nuevos eventos para el topic escribiendo eventos a través de la red en las particiones del topic. Los eventos con la misma clave (indicados por su color en la figura) se escriben en la misma partición. Tengamos en cuenta que ambos productores pueden escribir en la misma partición si corresponde.
2.5 Replication¶
Para que los datos sean tolerantes a fallos y tengan alta disponibilidad, cada topic se puede replicar, incluso entre regiones geográficas o centros de datos, de modo que siempre haya varios intermediarios que tengan una copia de los datos en caso de que algo salga mal, hacer mantenimiento a los brokers, etc. Una configuración de producción común es un factor de replica 3, es decir, siempre habrá tres copias de sus datos. Esta replica se realiza a nivel de particiones de topics.
3. Estructura¶
3.1 Topics, mensajes y particiones¶
Podemos crear tantos topics como queramos y estos serán identificados por su nombre. Los topics pueden dividirse en particiones. Cada elemento que se almacena en un topic se denomina evento. Éstos son inmutables y son añadidos a una partición determinada (específica definida por la clave del mensaje o mediante round-robin en el caso de ser nula) en el orden el que fueron enviados, es decir, se garantiza el orden dentro de una partición pero no entre ellas.
Cada evento dentro de una partición tiene un identificador numérico incremental llamado offset. Con este mecanismo, se puede identificar un evento con el nombre del topic que lo contiene, la partición y el offset.

3.2 Brokers y Topics¶
Un clúster de Kafka consiste en uno o más servidores denominados Kafka brokers. Cada broker es identificado por un ID (integer) y contiene ciertas particiones de un topic, no necesariamente todas.
Además, permite replicar y particionar dichos topics balanceando la carga de almacenamiento entre los brokers. Esta característica permite que Kafka sea tolerante a fallos y escalable.

El número de particiones debería ser siempre igual o mayor que el número de brokers. Si no, habrá brokers que no estén consumiendo mensajes.
3.3 Topic replication¶
Los topics deberán tener un factor de replica mayor a 1 (normalmente 2 y 3), de esta forma si un broker se cae, otro broker puede servir los datos.
En cada momento sólo puede haber un broker líder para cada partición de un topic. Sólo el líder puede recibir y servir datos de una partición, mientras tanto los otros brokers sincronizarán sus datos como seguidores. Si este se cae, se cambia el líder.

3.4 Grupos de consumidores¶
Los consumidores de Kafka son los clientes conectados suscritos a los topics que consumen los mensajes. Cada consumidor tiene asociado un grupo de consumidores. Kafka garantiza que cada mensaje sólo es leído por un consumidor de cada grupo.

En la imagen, un cluster de kafka está formado por dos brokers con 4 particiones con dos grupos de consumidores. En el grupo de consumidores A, Kafka asigna dos particiones a cada consumidor, mientras que en el grupo B, al haber 4 consumidores para 4 particiones, es posible asignar una partición a cada consumidor.
Video resumen
Si no te ha quedado claro, puedes ver el siguiente video que explica brevemente estos conceptos
3.5 Gestión de Sistemas distribuidos¶
Para ayudar en la gestión del cluster de Kafka en este sistema distribuido, existen 2 posibles soluciones:
Zookeeper¶
Entre los nodos de Zookeeper, se elige uno como líder. El resto de nodos del clúster se denominan seguidores, y uno de ellos es elegido como nuevo líder en el caso de que el líder actual tenga un fallo. Generalmente, los clusters de Zookeeper se despliegan con 3 ó 5 nodos.
Para gestionar un clúster de Kafka, Zookeeper almacena información del estado del clúster: detalles de los topics como el nombre, las particiones, las réplicas y los grupos de consumidores.

En el momento en el que Zookeeper detecta que uno de los brokers de Kafka está caído, realiza las siguientes acciones:
- Elige un nuevo broker para tomar el lugar del broker caído.
- Actualiza los metadatos para la distribución de carga de los productores y los consumidores para que no exista pérdida de servicio.
Tras estas acciones, se pueden volver a escribir y leer mensajes con normalidad.
KRaft¶
Se propone eliminar la dependencia de Kafka con Zookeeper reemplazándola con un mecanismo interno. KRaft funciona como un mecanismo de consenso para el protocolo de quorum. Kafka 3.3 marca este mecanismo como listo para producción. A partir de la versión 3.5 se proporciona un mecanismo de migración y se marca el soporte a Zookeeper como deprecated.

KRaft Mode desde 4.0.0
Desde la versión 4.0.0 de Kafka, KRaft mode es el modo por defecto. No existe la versión de Zookeeper
4. Kafka APIs¶
Una vez comentada la estructura de Apache Kafka, vamos a ver cómo se interactúa con él mediante APIs

4.1 Producer API¶
Producer API permite a las aplicaciones que pueda publicar/enviar mensajes a un topic de Kafka de forma asíncrona. Los productores automáticamente saben a qué broker y a qué partición deben escribir.
En el caso de que un broker se caiga, el productor sabe cómo recuperarse y seguirá escribiendo en el resto. Los productores envían los mensajes con clave (string, número, etc) o sin ella.
Si la clave es nula se enviarán en round robin entre los brokers. Si no es nula, todos los mensajes con esa clave se enviarán siempre a la misma partición.
Además, para confirmar que los mensajes han sido correctamente escritos en Kafka se podrá configurar la recepción de un ACK, ya sea por la recepción del mensaje por parte broker líder o por todos los brokers réplica:
- ACKs=0 (El productor no espera el acuse de recibo; posible pérdida de datos)
- ACKs=1 (El productor espera el reconocimiento del líder: pérdida de datos limitada)
- ACKs=all (El productor espera al líder + reconocimiento de réplicas; sin pérdida de datos)
La carga se balancea entre los brokers

4.2 Consumer API¶
Consumer API permite a las aplicaciones que puedan leer los mensajes desde un topic de Kafka y tratarlos desde nuestra aplicación. Podemos crear un consumidor o un grupo de consumidores.
La diferencia entre ellos es que el grupo de consumidores permite el consumo de mensaje de forma paralela, es decir, si un nodo de ese grupo consume un mensaje el resto no lo hará.
Esto es útil a la hora de tener más de una instancia de un microservicio corriendo en nuestro sistema. Cada consumidor del grupo de consumidores leerá de una partición exclusiva.
Si hay más consumidores que particiones, algunos de los consumidores estarán inactivos, para solucionar esto es recomendable tener el mismo número de particiones que de consumidores dentro de un grupo.
En el caso de que un broker de los que está leyendo se caiga, los consumidores saben cómo recuperarse. Los datos son leídos en orden dentro de cada partición pero no entre ellas. Kafka almacena los offsets de los grupos de consumidores cuando estos leen los datos.
Consumer offset¶
Los offsets son almacenados en un topic de Kafka denominado “consumer_offsets”. Cuando un consumidor de un grupo lee datos de Kafka, se actualiza el offset. Kafka almacena los offsets por el que va leyendo un grupo de consumidores, a modo de checkpoint. Si un consumidor se cae, cuando vuelva a ser levantado seguirá leyendo datos desde donde se quedó anteriormente.
Cuando un consumidor de un grupo ha procesado los datos que ha leído de Kafka, realizará un commit de sus offsets. Si el consumidor se cae, podrá volver a leer los mensajes desde el último offset sobre el que se realizó commit.
Garantías de Entrega¶
Como en otros sistemas distribuidos, Kafka tiene tres maneras de gestionar las garantías de entrega de los mensajes en sus protocolos:
- At-least-once: Garantiza que el mensaje siempre se entregará. Es posible que en caso de fallo se entregue varias veces, pero no se perderá ningún mensaje en el sistema.
- At-most-once: Garantiza que el mensaje se entregará una vez o no se entregará. Un mensaje nunca se entregará más de una vez.
- Exactly-once: Garantiza que todos los mensajes se van a entregar exactamente una vez, realizando el sistema las comprobaciones necesarias para que esto suceda.
4.3 Streams API¶
Streams API permiten transformar los streams de datos desde input topics hacia output topics. Podemos realizar modificaciones (ETL) sobre los mensajes (input topics) y escribir en otro topic (output topics) actuando como productor.
Combina la "simplicidad" del desarrollo de aplicaciones en lenguaje Java o Scala con los beneficios de la integración con el cluster de Kafka.
Los veremos con má profundidad más adelante.
4.4 Connect API¶
Se tratan de componentes listos para usar que nos permiten simplificar la integración entre sistemas externos y el cluster de Kafka. Podemos crear y ejecutar productores o consumidores reutilizables que conectan los topics de Kafka a las aplicaciones o sistemas externos, como por ejemplo una base de datos.
Además, algunos permiten realizar modificaciones simples sobre los mensajes que irán a los topics de Kafka. Se configuran mediante ficheros properties o a través de su API REST y entre sus características destacan ser distribuidos y escalables.
Los veremos con más profundidad más adelante.
4.5 Admin API¶
Admin API admite la gestión e inspección de topics, brokers, ACL y otros objetos de Kafka.
5. Quick Start¶
Usando algunas de las APIs
5.1 Obtener Kafka¶
Descarga la última versión de Kafka
5.2 Configura y arranca el Entorno de Kafka¶
- Debemos tener correctamente instalado y funcionando Java 17+
Versiones de Java
Debemos tener cuidado con las versiones de Java que usa cada aplicación, ya que Apache Hadoop, de momento, tiene soporte completo sólo para Java 8. Pero como añadimos la variable de entorno de JAVA_HOME
en etc/hadoop/hadoop-env.sh
, no debería entrar en conflicto si instalamos una nueva versión de Java para que Kafka funcione de manera correcta.
Recuerda que puedes usar el comando sudo update-alternatives -c java
para gestionar las diferentes versiones de Java del sistema.
- Movemos el directorio a nuestro directorio
/opt
para una correcta organización
- Accedemos al directorio de Kafka
- Apache Kafka puede ser iniciado usando Zookeeper o KRaft
- Arranca los servicios Zookeeper
- Abre otra terminal y ejecuta
-
Kafka se puede ejecutar usando el modo KRaft mediante scripts locales y archivos descargados o mediante la imagen de docker
-
Usando los archivos descargados
- Genera un cluster UUID
- Formatea el directorio de log
- Ejecuta el servidor Kafka
-
Usando la imagen de docker
- Obtenemos la imagen de docker
- Iniciamos el contenedor docker de kafka
Una vez que el servidor Kafka se haya iniciado exitosamente, tendremos un entorno Kafka básico ejecutándose y listo para usar.
5.3 Crea un Topic para Almacenar tus eventos¶
Antes de que podamos escribir nuestros primeros eventos, debemos crear un topic. Abre otra sesión de terminal y ejecute:
Argumentos en comandos de Kafka
Todos los scripts de línea de comandos de Kafka tienen opciones adicionales: para saber cuales ejecútalos sin ningún argumento para mostrar información de uso, por ejemplo: kafka-topics.sh
.
También podemos mostrar los detalles y partición de un topic:
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: b6ZzHS3aRT-GIkDz869d4g PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1Elr: LastKnownElr:
5.4 Escribe algún evento en el topic¶
Un cliente de Kafka se comunica con los brokers de Kafka a través de la red para escribir (o leer) eventos. Una vez recibidos, los brokers almacenarán los eventos de forma duradera y tolerante a fallos durante el tiempo que sea necesario, incluso para siempre.
Ejecutamos el cliente productor (Producer API) para escribir algunos eventos en el topic. De forma predeterminada, cada línea que ingresemos dará como resultado que se escriba un evento separado en el topic.
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>Este es mi primer evento
>Este es mi segundo evento
>Módulo Big Data Aplicado
>IES Gran Capitán
Podemos parar el cliente en cualquier momento con Ctrl-C
5.5 Lee los eventos¶
Abre otra sesión de terminal y ejecute el cliente consumidor (Consumer API) de consola para leer los eventos que acabamos de crear:
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
Este es mi primer evento
Este es mi segundo evento
Módulo Big Data Aplicado
IES Gran Capitán

6. Ejemplo1. Python en Kafka¶
En principio, Kafka sólo tiene soporte para crear aplicaciones en lenguaje Java y Scala. Pero gracias a la librería Kafka-python, podemos crear aplicaciones en Python para ser usadas con Kafka. Para ello vamos a crear el productor a través de KafkaProducer y el consumidor a través de KafkaConsumer
Elaboraremos una pequeña aplicación para implementar un sistema de registro de eventos de sensores. En este escenario, el productor enviará eventos (simulados) que contienen lecturas de sensores en formato JSON, y el consumidor procesará estos eventos para imprimir la información.
- Instalamos la librería
- Suponemos que tendremos la siguiente configuración kafka, que crearemos después
- Servidor en
localhost:9092
- Topic llamado
topic-python-sensor
-
Grupo de consumidores
group_consumer_sensor
-
Creamos el productor con KafkaProducer
from kafka import KafkaProducer
import json
from datetime import datetime
import random
import time
# Configuración del productor con serialización JSON
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
# Simulación de envío de eventos de sensores
while True:
sensor_event = {
'sensor_id': random.randint(1, 100),
'sensor_type': random.choice(['temperature', 'humidity', 'pressure']),
'value': round(random.uniform(20.0, 30.0), 2),
'timestamp': datetime.now().isoformat()
}
# Enviamos un mensaje
producer.send('topic-python-sensor', value=sensor_event)
print(f"Sent: {sensor_event}")
time.sleep(1)
- Creamos el productor con KafkaConsumer
from kafka import KafkaConsumer
import json
# Configuración del consumidor con deserialización JSON
consumer = KafkaConsumer(
'topic-python-sensor',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', # Para leer mensajes desde el inicio del topic
enable_auto_commit=True, # Para confirmaciones automáticas
group_id='group_consumer_sensor', # Identificador del grupo de consumidores
value_deserializer=lambda x: json.loads(x.decode('utf-8')) # Para deserializar los mensajes desde JSON
)
# Procesamiento de eventos de sensores
for message in consumer:
event = message.value
print(f"Received event from sensor {event['sensor_id']}: {event['sensor_type']} = {event['value']} at {event['timestamp']}")
- Iniciamos Kafka
#Genera un cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
#Si no te permite formatear (con el comando siguiente) es debido a que se mantienen los logs del ejemplo anterior. debes borrarlos
#sudo rm -r /tmp/kraft-combined-logs/
#Formatea el directorio de log
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
#Ejecuta el servidor Kafka
bin/kafka-server-start.sh config/server.properties
- Creamos el topic sensores
bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 --topic topic-python-sensor --bootstrap-server localhost:9092
- Lanzamos el productor
- Lanzamos el consumidor

7. Kafka on KRaft(Kafka Raft Metadata mode)¶
Como hemos visto anteriormente, recordemos que desde la versión 3.5 se elimina la dependencia de Kafka con Zookeeper y se marca el soporte de Zookeeper como deprecated, pasándose a usar de manera estable el sistema interno KRaft para la gestión del cluster de Kafka en este sistema distribuido.

El modo KRaft utiliza un nuevo servicio de controlador de quorum en Kafka que reemplaza al controlador anterior y utiliza una variante basada en eventos del protocolo de consenso Raft.
Esto nos trae ciertas consideraciones a tener en cuenta (además de otras ventajas):
- Kafka no necesita un sistema de terceros para la gestión del sistema distribuido.
- Todos los nodos del cluster son nodos Kafka, que tomarán roles diferentes dependiendo de su función en el cluster en Kraft mode:
controller
,broker
(como puedes ver en la figura) yserver
.
Por tanto, a la hora de configurar un cluster Kafka, estos roles de cada nodo hay que configurarlos para un correcto diseño de nuestro sistema.
Dynamic KRaft Quorums
Los controladores en KRaft son nodos Kafka que utilizan el algoritmo de consenso Raft para elegir un líder (controlador activo) y replicar los metadatos del clúster. Antes de Kafka v3.9.0, los clusters basados en KRaft sólo permitían configuraciones de quórum estáticas, en las que el conjunto de nodos controladores (también conocidos como votantes) es fijo y no puede cambiarse sin reiniciar.
Kafka v3.9.0 introduce soporte para quórum dinámico con KIP-853. Esta es una de las características que faltaban para alcanzar la paridad de características con los clústeres basados en ZooKeeper. A partir de ahora, el quórum dinámico debería ser la forma preferida de configurar un clúster basado en KRaft, ya que no requiere tiempo de inactividad del clúster.
El escalado dinámico del clúster es importante para una serie de casos de uso:
- Escalado: Queremos escalar el número de controladores añadiendo o quitando un controlador (esto es bastante raro en la práctica, ya que el quórum del controlador se establece una vez y rara vez se cambia).
- Sustitución: Queremos sustituir un controlador debido a un fallo de disco o hardware.
- Migración: Necesitamos migrar el clúster de máquinas antiguas a nuevas, o cambiar la arquitectura de KRaft (por ejemplo, pasar de controladores dedicados a nodos de modo combinado).
Static versus Dynamic KRaft Quorums¶
Por tanto, ahora hay dos formas de ejecutar KRaft: la antigua, utilizando quórums de controladores estáticos, y la nueva, utilizando quórums de controladores dinámicos KIP-853.
Cuando se utiliza un quórum estático, el archivo de configuración para cada broker y controlador debe especificar los IDs, nombres de host y puertos de todos los controladores en controller.quorum.voters
.
En cambio, cuando se utiliza un quórum dinámico, se debe configurar controller.quorum.bootstrap.servers
. No es necesario que esta clave de configuración contenga todos los controladores, pero debe contener tantos como sea posible para que todos los servidores puedan localizar el quórum. En otras palabras, su función es muy parecida a la configuración bootstrap.servers utilizada por los clientes Kafka.
Desde la versión 4.0.0 de kafka, Dynamic Quorum es la configuración por defecto que debemos usar, y por tanto, es la que usaremos.
7.1 Roles¶
Cuando opera Apache Kafka® en modo KRaft, debemos configurar la propiedad process.roles
. Esta propiedad especifica si el servidor(nodo que forma parte del cluster Kafka) actúa como controller
, broker
(como puedes ver en la figura) o combinado, aunque actualmente el modo combinado no es compatible con cargas de trabajo de producción.
A) Controller¶
- Role: El controller es un componente crítico en Kafka que gestiona el estado del clúster. Esto incluye, pero no se limita a, la asignación de particiones a los brokers y la gestión de los cambios en la membresía del clúster. En el modo KRaft, el controlador también gestiona el quorum y los metadatos del clúster sin depender de Zookeeper.
- Configuración: Cuando levantas un nodo en modo controller en KRaft, específicas
process.roles=controller
en la configuración. Esto indica que el nodo debe actuar exclusivamente como un controlador. - Comunicación: El controlador se comunica con los brokers para coordinar la gestión del clúster. Se configura un conjunto de votantes del quorum dinámico(
controller.quorum.bootstrap.servers
) para participar en el consenso Raft.
En el modo KRaft, se seleccionan servidores Kafka específicos para que sean controladores, almacenando metadatos para el clúster en el registro de metadatos, y se seleccionan otros servidores para que sean intermediarios. Los servidores seleccionados para ser controladores participarán en el quorum de metadatos. Cada controlador es activo o de reserva activa para el controlador activo actual.
En un entorno de producción, el quorum del controlador se implementará en varios nodos. A esto se le llama ensemble. Ensemble es un conjunto de 2n + 1 controladores donde n es cualquier número mayor que 0. El número impar de controladores permite que el quorum de controladores realice elecciones mayoritarias para el liderazgo. En cualquier momento dado, puede haber hasta n servidores fallidos en un conjunto y el clúster mantendrá el quorum. Por ejemplo, con 3 controladores, el clúster puede tolerar un fallo de controlador. Si en algún momento se pierde el quorum, el clúster dejará de funcionar. Actualmente, no se recomiendan más de 3 controladores en entornos críticos.
B) Broker¶
- Role: Los brokers son los nodos que realmente almacenan y sirven datos (es decir, los mensajes de Kafka) a los productores y consumidores. En un clúster de Kafka, puedes tener muchos brokers para escalar horizontalmente el rendimiento y la capacidad de almacenamiento.
- Configuración: Levantar un nodo en modo broker en KRaft implica configurar
process.roles=broker
en las propiedades del nodo. Esto lo designa para servir como un broker, manejando las solicitudes de los clientes y el almacenamiento de los datos. - Comunicación: Los brokers se registran con el controlador y siguen sus instrucciones para la asignación de particiones y otras tareas de gestión del clúster. Los brokers no participan directamente en el consenso Raft; solo el controlador gestiona los metadatos del clúster.
C) Combinado¶
- Role: En algunos contextos, especialmente en entornos de desarrollo o pruebas, puedes querer que un nodo actúe tanto como controlador como broker. Esto simplifica la configuración y reduce la cantidad de recursos necesarios, ya que no es necesario ejecutar nodos separados para los roles de controlador y broker.
- Configuración: Para levantar un nodo en modo combinado (server), configuras
process.roles=broker,controller
en las propiedades del nodo. Esto indica que el nodo debe asumir ambos roles. - Comunicación: Como este nodo combina ambos roles, gestionará tanto los metadatos del clúster como el almacenamiento y la transmisión de los datos de los mensajes.
Resumen¶
- Controller: Gestiona el estado del clúster y los metadatos, no almacena ni sirve datos de los mensajes.
- Broker: Almacena y sirve datos de los mensajes, pero no gestiona los metadatos del clúster directamente.
- Combinado: Realiza ambas funciones, tanto la gestión de metadatos del clúster como el almacenamiento y servicio de los datos de los mensajes, útil para configuraciones simplificadas.
Al configurar un clúster Kafka en modo KRaft, es importante planificar el número y tipo de nodos según los requisitos de escala, rendimiento y alta disponibilidad de tu aplicación o sistema.
A continuación, vamos a ver algunos de las configuraciones más importantes y de uso más frecuente para una correcta configuración de nuestro cluster Kafka en Kraft mode teniendo en cuenta los posibles roles de cada uno de los servers que componen el cluster.
7.2 Server Basic¶
7.2.1 process.roles
¶
Como ya hemos visto, puede tener los siguientes valores: controller
, broker
,controller,broker
.
- Type: string
- Default:
- Importance: required for KRaft mode
7.2.2 node.id
¶
El identificador único de este servidor. Cada ID de nodo debe ser único en todos los servidores de un clúster en particular. No hay dos servidores que puedan tener el mismo ID de nodo independientemente de su valor de process.roles
. Este identificador reemplaza a broker.id
, que se utiliza cuando se opera en modo ZooKeeper.
- Type: int
- Default:
- Importance: required for KRaft mode
7.2.3 controller.quorum.bootstrap.servers
¶
Todos los servidores (controllers y brokers) en un clúster de Kafka deben establecer la propiedad controller.quorum.bootstrap.servers
, debido a que todos deben descubrir los votantes del quórum. Por tanto, debemos identificar a todos los controllers incluyéndolos en la lista que proporciona esta propiedad controller.quorum.bootstrap.servers
, separados por comas.
Además, todos los controladores deben estar enumerados. Cada controlador se identifica con su información de host
y puerto
con el siguiente formato: {host}:{port}
. Varias entradas estarán separadas por comas y pueden tener un aspecto similar al siguiente:
Por ejemplo, si un clúster Kafka tiene 3 controladores denominados controller1, controller2 y controller3, entonces el controller1 tendría la siguiente configuración:
process.roles=controller
node.id=1
listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093
controller.listener.names=CONTROLLER
- Type: string
- Default:
- Importance: required for KRaft mode
Cada broker y controller deben establecer la propiedad controller.quorum.bootstrap.servers
.
7.3 Socket Server Settings¶
7.3.1 listeners
¶
Los servidores Kafka admiten la escucha (listening) de conexiones en varios puertos. Esto se configura a través de la propiedad listeners
en la configuración del servidor, que acepta una lista separada por comas de los listeners habilitados. Debe definirse al menos una escucha en cada servidor. El formato de cada listener definido en listeners es el siguiente: {LISTENER_NAME}://{hostname}:{port}
.
LISTENER_NAME
suele ser un nombre descriptivo que define el propósito de la escucha. Por ejemplo, muchas configuraciones utilizan un listener independiente para el tráfico de clientes, por lo que podrían referirse al listener correspondiente como CLIENT
en la configuración:
Hostname config
Si no se configura el nombre del host, este será igual al valor de java.net.InetAddress.getCanonicalHostName()
con el nombre de listener PLAINTEXT
, y el puerto 9092
. Por ejemplo, ocurre cuando dejamos la configuración listeners=PLAINTEXT://:9092
Para más información sobre la seguridad de la comunicación entre las conexiones de escucha, consulta la documentación oficial
- Type: string with the format
listener_name://host_name:port
- Default: If not configured, the host name will be equal to the value of
java.net.InetAddress.getCanonicalHostName()
, withPLAINTEXT
listener name, and port9092
. Example:listeners=PLAINTEXT://your.host.name:9092
- Importance: high
Por otro lado, el protocolo de seguridad de cada oyente (listener) se define en una configuración independiente: listener.security.protocol.map
.
7.3.2 listener.security.protocol.map
¶
Como hemos dicho, el protocolo de seguridad de cada oyente se define en esta configuración. El valor es una lista separada por comas de cada listener mapeado a su protocolo de seguridad. Por ejemplo, la siguiente configuración de valores especifica que el listener CLIENT utilizará SSL mientras que el listener BROKER utilizará plaintext.
A continuación se indican las opciones posibles (sin distinguir mayúsculas de minúsculas) para el protocolo de seguridad:
- PLAINTEXT
- SSL
- SASL_PLAINTEXT
- SASL_SSL
El protocolo de texto plano (PLAINTEXT) no proporciona seguridad y no requiere ninguna configuración adicional. Puedes mirar la documentación oficial que indica como configurar los demás protocolos.
Si cada oyente (listener) necesario utiliza un protocolo de seguridad distinto, también es posible utilizar el nombre del protocolo de seguridad como nombre del oyente en los oyentes. Usando el ejemplo anterior, podríamos omitir la definición de los escuchadores CLIENT y BROKER usando la siguiente definición:
Sin embargo, recomendamos a los usuarios que proporcionen nombres explícitos para los escuchadores (listeners) ya que lo hace más descriptivo e intuitivo para su configuración.
7.3.3 controller.listener.names
¶
Una lista separada por comas de entradas listener_name
para los listeners
utilizados por el controller. En un nodo con process.roles=broker
, sólo el primer listener de la lista será utilizado por el broker. Para controladores KRaft en modo aislado o combinado, el nodo escuchará como controlador KRaft en todos los listeners que estén listados para esta propiedad, y cada uno debe aparecer en la propiedad listeners
.
- Type: string
- Default: null
- Importance: required for KRaft mode.
7.4 Log Basics¶
7.4.1 log.dir
¶
Directorio en el que se guardan los datos de registro (complementario por la propiedad log.dirs
)
- Type: string
- Default:
/tmp/kafka-logs
- Importance: high
7.4.2 log.dirs
¶
Lista separada por comas de los directorios donde se almacenan los datos de registro. Si no se define, se utiliza el valor de log.dir
- Type: string
- Default: null
- Importance: high
7.4.3 num.partitions
¶
El número por defecto de particiones de registro por topic. Más particiones permiten un mayor paralelismo para el consumo, pero esto también resultará en más archivos a través de los brokers. Establece esta configuración sólo para los brokers. Esto será ignorado por los controladores KRaft.
- Type: int
- Default: 1
- Importance: medium
7.4.4 metadata.log.dir
¶
Se utiliza para especificar dónde se aloja el registro de metadatos de los clusters en modo KRaft. Si no se establece, el registro de metadatos se coloca en el primer directorio de registro especificado en la propiedad log.dirs
.
- Type: string
- Default: null
- Importance: high
7.5 Otras configuraciones¶
Para consultar el resto de propiedades puedes consultar la documentación oficial
7.6 Kraft Storage - Provisioning Nodes¶
El comando bin/kafka-storage.sh random-uuid
se utilizaa para generar un ID de clúster para tu nuevo clúster. Este ID de clúster debe utilizarse al formatear cada servidor del clúster con el comando bin/kafka-storage.sh format
.
Esto es diferente de cómo Kafka ha operado en el pasado. Anteriormente, Kafka formateaba automáticamente los directorios de almacenamiento vacíos y también generaba automáticamente un nuevo ID de clúster. Uno de los motivos de este cambio es que el formateo automático a veces puede ocultar una condición de error. Esto es particularmente importante para el registro de metadatos mantenido por los servidores controllers y brokers. Si la mayoría de los controllers pudieran comenzar con un directorio de registro vacío, podría elegirse un líder con datos comprometidos faltantes.
7.6.1 Bootstrap a Standalone Controller¶
El método recomendado para crear un nuevo cluster de controllers KRaft es arrancarlo con un votante (parámetro --standalone
) y añadir dinámicamente el resto de controllers. El arranque del primer controlador se puede hacer con el siguiente comando CLI:
bin/kafka-storage.sh format --cluster-id <CLUSTER_ID> --standalone --config config/controller.properties
- creará un archivo meta.properties en metadata.log.dir con un directory.id generado aleatoriamente.
- creará una instantánea en 00000000000000000000-0000000000.checkpoint con los registros de control necesarios (KRaftVersionRecord y VotersRecord) para que este nodo Kafka sea el único votante para el quórum.
7.6.2 Bootstrap with Multiple Controllers¶
La partición de metadatos del clúster de controllers KRaft también se puede arrancar con más de un votante. Esto se puede hacer utilizando el indicador --initial-controllers
:
CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_0_UUID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_1_UUID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_2_UUID="$(bin/kafka-storage.sh random-uuid)"
# In each controller execute
bin/kafka-storage.sh format --cluster-id ${CLUSTER_ID} \
--initial-controllers "0@controller-0:1234:${CONTROLLER_0_UUID},1@controller-1:1234:${CONTROLLER_1_UUID},2@controller-2:1234:${CONTROLLER_2_UUID}" \
--config config/controller.properties
Este comando es similar a la versión standalone, pero la instantánea en 00000000000000000000-0000000000.checkpoint contendrá en su lugar un VotersRecord que incluye información para todos los controllers especificados en --initial-controllers
. Es importante que el valor de esta flag sea el mismo en todos los controllers con el mismo id de cluster. En la descripción de la réplica 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 es el id de la réplica, 3Db5QLSqSZieL3rJBUUegA es el id del directorio de la réplica, controller-0 es el host de la réplica y 1234 es el puerto de la réplica.
Consulta la documentación oficial para saber más sobre como añadir o eliminar controllers en un cluster Kafka existente.
7.6.3 Formateando nuevos Brokers y Controllers¶
A la hora de aprovisionar nuevos nodos broker y controller que queramos añadir a un cluster Kafka existente, utilizaremos el comando kafka-storage.sh format
con el parámetro --no-initial-controllers
.
bin/kafka-storage.sh format --cluster-id <CLUSTER_ID> --config config/server.properties --no-initial-controllers
7.7 Tools for debugging KRaft mode¶
Vamos a ver algunos de ellos, los de uso más frecuente.
7.7.1 Describe runtime status¶
Puede describir el estado en tiempo de ejecución de la partición de metadatos del clúster utilizando kafka-metadata-quorum
Output might look like the following:
ClusterId: fMCL8kv1SWm87L_Md-I2hg
LeaderId: 3002
LeaderEpoch: 2
HighWatermark: 10
MaxFollowerLag: 0
MaxFollowerLagTimeMs: -1
CurrentVoters: [{"id": 3000, "directoryId": "ILZ5MPTeRWakmJu99uBJCA", "endpoints": ["CONTROLLER://localhost:9093"]},
{"id": 3001, "directoryId": "b-DwmhtOheTqZzPoh52kfA", "endpoints": ["CONTROLLER://localhost:9094"]},
{"id": 3002, "directoryId": "g42deArWBTRM5A1yuVpMCg", "endpoints": ["CONTROLLER://localhost:9095"]}]
CurrentObservers: [{"id": 0, "directoryId": "3Db5QLSqSZieL3rJBUUegA"},
{"id": 1, "directoryId": "UegA3Db5QLSqSZieL3rJBU"},
{"id": 2, "directoryId": "L3rJBUUegA3Db5QLSqSZie"}]
7.7.2 Dump Log Tool¶
Podemos utilizar kafka-dump-log
para depurar los segmentos de registro y las instantáneas del directorio de metadatos del clúster. La herramienta escaneará los archivos proporcionados y decodificará los registros de metadatos. Por ejemplo, el siguiente comando decodifica e imprime los registros del primer segmento de registro:
bin/kafka-dump-log.sh --cluster-metadata-decoder --files metadata_log_dir/__cluster_metadata-0/00000000000000000000.log
7.7.3 Inspect the metadata partition¶
Podemos utilizar kafka-metadata-shell
para inspeccionar de forma interactiva el clúster de metadatos. El siguiente ejemplo muestra cómo abrir el shell.
bin/kafka-metadata-shell.sh --snapshot metadata_log_dir/__cluster_metadata-0/00000000000000000000.checkpoint
>> ls /
brokers local metadataQuorum topicIds topics
>> ls /topics
foo
>> cat /topics/foo/0/data
{
"partitionId" : 0,
"topicId" : "5zoAlv-xEh9xRANKXt1Lbg",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : null,
"addingReplicas" : null,
"leader" : 1,
"leaderEpoch" : 0,
"partitionEpoch" : 0
}
>> exit
8. Ejemplo 2. Cluster Kafka¶
Imagina que estamos construyendo un sistema para un banco que necesita procesar eventos de transacciones financieras (por ejemplo, transferencias de dinero entre cuentas) en tiempo real. El sistema debe ser capaz de manejar un alto volumen de transacciones de forma eficiente y confiable. Utilizaremos Kafka para construir la infraestructura de mensajería que soportará este procesamiento.
Configuración cluster Kafka
Vamos a configurar el cluster de Kafka usando mode KRaft. Siguiendo las consideraciones del punto anterior y teniendo en cuenta que estamos en un entorno de pruebas y no de producción, para este ejemplo de concepto, vamos a configurar un cluster con 3 servidores (en un sólo nodo o máquina) con 1 controller y 2 brokers
8.1 Requisitos¶
Vamos a configurar un cluster las siguientes características:
- 1 topic con 2 particiones
- 1 factor de replica de 2
- 1 nodo con 2 brokers y 1 controller
8.2 Configuración del Clúster de Kafka¶
- Consideraciones previas:
- Vamos a establecer todos los archivos de configuración en una carpeta de ejemplo llamada ejemplo2, que en mi caso estará alojada en
/opt/kafka/ejemplo2
- Dado que todas las instancias se ejecutarán en el mismo nodo, es crucial asignar puertos únicos y directorios de log para cada broker y el controller.
- Configuración:
- Para el controller, debemos usar como base la configuración de propiedades de controller de kafka (KRaft mode) que se encuentran
config/controller.properties
-
Para cada broker, necesitaremos crear un archivo de configuración por separado. Para ello debemos usar como base la configuración de propiedades de brokers de kafka que se encuentran
config/broker.properties
-
Creamos los directorios necesarios para nuestro
ejemplo2
- Hacemos 2 y 1 copia de los ficheros correspondientes de configuración para cada uno
cp config/controller.properties /opt/kafka/ejemplo2/config/controller1.properties
cp config/broker.properties /opt/kafka/ejemplo2/config/broker1.properties
cp config/broker.properties /opt/kafka/ejemplo2/config/broker2.properties
- Asignamos la configuración al controller
# Server Basics
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9093
# Socket Server Settings
listeners=CONTROLLER://:9093
advertised.listeners=CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
# Log Basics
log.dirs=/opt/kafka/ejemplo2/logs/controller1
- Asignamos la siguiente configuración para cada broker
- Iniciamos Kafka
#Genera un cluster UUID
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
echo $KAFKA_CLUSTER_ID
#Formateamos los directorios de log
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID --standalone -c /opt/kafka/ejemplo2/config/controller1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo2/config/broker1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo2/config/broker2.properties
- Iniciamos los server(1 controller y 2 brokers) cada uno en una terminal
#Ejecuta el servidor Kafka
bin/kafka-server-start.sh /opt/kafka/ejemplo2/config/controller1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo2/config/broker1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo2/config/broker2.properties
8.3 Creación del Topic¶
- Creamos el topic con factor de replica 2 y 2 particiones. El topic debe conectarse a un broker.
bin/kafka-topics.sh --create --topic financial-transactions --bootstrap-server localhost:9094 --replication-factor 2 --partitions 2
- Podemos ver la descripción del topic creado
Topic: financial-transactions TopicId: wQ94xoPJRVCkm9UdaaOKCw PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: financial-transactions Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: financial-transactions Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2 Elr: LastKnownElr:
8.4 Productor y Consumidor¶
- Creamos el productor con KafkaProducer
from kafka import KafkaProducer
import json
import time
import random
producer = KafkaProducer(bootstrap_servers=['localhost:9094', 'localhost:9095'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
while True:
transaction = {
"source_account": random.randint(1000, 1999),
"destination_account": random.randint(2000, 2999),
"amount": round(random.uniform(10.00, 1000.00), 2),
"currency": "EUR",
"timestamp": time.time()
}
producer.send('financial-transactions', value=transaction)
print(f"Sent transaction: {transaction}")
time.sleep(random.randint(1, 5))
- Creamos el consumidor con KafkaConsumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('financial-transactions',
bootstrap_servers=['localhost:9094', 'localhost:9095'],
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
transaction = message.value
print(f"Received transaction: {transaction}")
8.5 Ejecución de la aplicación¶
- Lanzamos el productor
- Lanzamos el consumidor

9. Ejemplo 3. Cluster Kafka (Bootstrap with Multiple Controllers)¶
Para aprender a realizar un cluster con varios controller en el nuevo modo Dynamic Quorum, vamos a realizar el mismo ejemplo anterior, pero en este caso, levantando 3 controller y 3 brokers, para así entender las diferencias con la versión anterior y asegurarnos de haber aprendido correctamente el uso de Kafka.
9.1 Requisitos¶
Vamos a configurar un cluster las siguientes características:
- 1 topic con 3 particiones
- 1 factor de replica de 2
- 1 nodo con 3 brokers y 3 controllers
9.2 Configuración del Clúster de Kafka¶
- Consideraciones previas:
- Vamos a establecer todos los archivos de configuración en una carpeta de ejemplo llamada ejemplo3, que en mi caso estará alojada en
/opt/kafka/ejemplo3
- Dado que todas las instancias se ejecutarán en el mismo nodo, es crucial asignar puertos únicos y directorios de log para cada broker y el controller.
- Configuración:
- Para el controller, debemos usar como base la configuración de propiedades de controller de kafka (KRaft mode) que se encuentran
config/controller.properties
-
Para cada broker, necesitaremos crear un archivo de configuración por separado. Para ello debemos usar como base la configuración de propiedades de brokers de kafka que se encuentran
config/broker.properties
-
Creamos los directorios necesarios para nuestro
ejemplo3
- Hacemos 3 copias de los ficheros correspondientes de configuración para cada uno
cp config/controller.properties /opt/kafka/ejemplo3/config/controller1.properties
cp config/controller.properties /opt/kafka/ejemplo3/config/controller2.properties
cp config/controller.properties /opt/kafka/ejemplo3/config/controller3.properties
cp config/broker.properties /opt/kafka/ejemplo3/config/broker1.properties
cp config/broker.properties /opt/kafka/ejemplo3/config/broker2.properties
cp config/broker.properties /opt/kafka/ejemplo3/config/broker3.properties
- Como estamos en el mismo nodo, le vamos a dar los siguiente puertos y ids a cada server:
- controller1: puerto 9096, nodo 1
- controller2: puerto 9097, nodo 2
- controller3: puerto 9098, nodo 3
- broker1: puerto 9092, nodo 5
- broker2: puerto 9093, nodo 6
- broker3: puerto 9094, nodo 7
- Asignamos la configuración al controller
# Server Basics
process.roles=controller
node.id=1
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=CONTROLLER://:9096
advertised.listeners=CONTROLLER://localhost:9096
controller.listener.names=CONTROLLER
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/controller1
# Server Basics
process.roles=controller
node.id=2
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=CONTROLLER://:9097
advertised.listeners=CONTROLLER://localhost:9097
controller.listener.names=CONTROLLER
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/controller2
# Server Basics
process.roles=controller
node.id=3
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=CONTROLLER://:9098
advertised.listeners=CONTROLLER://localhost:9098
controller.listener.names=CONTROLLER
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/controller3
- Asignamos la siguiente configuración para cada broker
# Server Basics
process.roles=broker
node.id=5
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/broker1
# Server Basics
process.roles=broker
node.id=6
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=PLAINTEXT://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9093
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/broker2
# Server Basics
process.roles=broker
node.id=7
controller.quorum.bootstrap.servers=localhost:9096,localhost:9097,localhost:9098
# Socket Server Settings
listeners=PLAINTEXT://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9094
# Log Basics
log.dirs=/opt/kafka/ejemplo3/logs/broker3
- Generamos los uuid
#Generamos un cluster UUID y los IDs de los controllers
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_1_UUID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_2_UUID="$(bin/kafka-storage.sh random-uuid)"
CONTROLLER_3_UUID="$(bin/kafka-storage.sh random-uuid)"
#Formateamos los directorios de log indicando los controllers iniciales
#controller1
bin/kafka-storage.sh format --cluster-id ${KAFKA_CLUSTER_ID} \
--initial-controllers "1@localhost:9096:${CONTROLLER_1_UUID},2@localhost:9097:${CONTROLLER_2_UUID},3@localhost:9098:${CONTROLLER_3_UUID}" \
--config /opt/kafka/ejemplo3/config/controller1.properties
#controller2
bin/kafka-storage.sh format --cluster-id ${KAFKA_CLUSTER_ID} \
--initial-controllers "1@localhost:9096:${CONTROLLER_1_UUID},2@localhost:9097:${CONTROLLER_2_UUID},3@localhost:9098:${CONTROLLER_3_UUID}" \
--config /opt/kafka/ejemplo3/config/controller2.properties
#controller3
bin/kafka-storage.sh format --cluster-id ${KAFKA_CLUSTER_ID} \
--initial-controllers "1@localhost:9096:${CONTROLLER_1_UUID},2@localhost:9097:${CONTROLLER_2_UUID},3@localhost:9098:${CONTROLLER_3_UUID}" \
--config /opt/kafka/ejemplo3/config/controller3.properties
- Damos formato a los logs de los brokers
#Formateamos los directorios de log de los brokers
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo3/config/broker1.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo3/config/broker2.properties
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/kafka/ejemplo3/config/broker3.properties
- Iniciamos los servers(3 controllers y 3 brokers) cada uno en una terminal
#Ejecutamos los servidores Kafka (uno en cada terminal)
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/controller1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/controller2.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/controller3.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/broker1.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/broker2.properties
bin/kafka-server-start.sh /opt/kafka/ejemplo3/config/broker3.properties
Metadata Quorum Tool
Podemos ver la describir el estado en tiempo de ejecución de la partición de metadatos del clúster con:
Vemos como se encuentra la configuración del Quorum de los controllers además de la lista de brokers
ClusterId: pOScImhcQ2-exV4PwTonuA
LeaderId: 2
LeaderEpoch: 27
HighWatermark: 708
MaxFollowerLag: 2
MaxFollowerLagTimeMs: 966
CurrentVoters: [{"id": 1, "directoryId": "UIzTVxjnTIOEH3WW5VdOAw", "endpoints": ["CONTROLLER://localhost:9096"]}, {"id": 2, "directoryId": "187-ElH6RJqEc4QftkoSqQ", "endpoints": ["CONTROLLER://localhost:9097"]}, {"id": 3, "directoryId": "XGGfTEh6RaaXlpQBK-3dug", "endpoints": ["CONTROLLER://localhost:9098"]}]
CurrentObservers: [{"id": 6, "directoryId": "4D5AbljfLEOno0wnk-N7_w"}, {"id": 5, "directoryId": "e4KmRqCr0wweYkHqWn2qkw"}, {"id": 7, "directoryId": "cxJ2pf7x8mi93Oi0PrKKUA"}]
9.3 Creación del Topic¶
- Creamos el topic con factor de replica 3 y 3 particiones. El topic debe conectarse a un broker.
bin/kafka-topics.sh --create --topic financial-transactions --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3
- Podemos ver la descripción del topic creado
Topic: financial-transactions TopicId: 4WfNcmv3SU-Kh7mMmWRaTw PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: financial-transactions Partition: 0 Leader: 6 Replicas: 6,7,5 Isr: 6,7,5 Elr: LastKnownElr:
Topic: financial-transactions Partition: 1 Leader: 7 Replicas: 7,5,6 Isr: 7,5,6 Elr: LastKnownElr:
Topic: financial-transactions Partition: 2 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7 Elr: LastKnownElr:
9.4 Productor y Consumidor¶
- Creamos el productor con KafkaProducer
from kafka import KafkaProducer
import json
import time
import random
producer = KafkaProducer(bootstrap_servers=['localhost:9094', 'localhost:9095'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
while True:
transaction = {
"source_account": random.randint(1000, 1999),
"destination_account": random.randint(2000, 2999),
"amount": round(random.uniform(10.00, 1000.00), 2),
"currency": "EUR",
"timestamp": time.time()
}
producer.send('financial-transactions', value=transaction)
print(f"Sent transaction: {transaction}")
time.sleep(random.randint(1, 5))
- Creamos el consumidor con KafkaConsumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer('financial-transactions',
bootstrap_servers=['localhost:9094', 'localhost:9095'],
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
transaction = message.value
print(f"Received transaction: {transaction}")
9.5 Ejecución de la aplicación¶
- Lanzamos el productor
- Lanzamos el consumidor
