Saltar a contenido

UD 6 - Apache Spark - Get Started

Vamos a realizar algunos ejemplos de prueba de uso de Spark con PySpark en nuestro cluster ya configurado

Para asegurar una comprensión profunda de la arquitectura, realizaremos los ejercicios siguiendo dos metodologías distintas. Elige la que corresponda a tu entorno de despliegue o prueba ambas para contrastar:

  • 🅰️ Enfoque Clásico (Máquinas Virtuales): Trabajaremos vía SSH dentro del nodo Master, moviendo ficheros en el sistema de archivos local de Linux y usando HDFS.
  • 🅱️ Enfoque Moderno (Docker + Spark Connect): Trabajaremos desde nuestro IDE (VS Code) en el Host, conectándonos remotamente al clúster y usando HDFS exclusivamente.

1. Ejemplo 1. Eventos tienda virtual

Info

Puedes consultar todos los métodos usados en la documentación oficial

  1. Obtenemos un dataset. Para este ejemplo, vamos a usar el siguiente dataset de kaggle. Esta dataset contiene datos de comportamiento durante 5 meses (octubre de 2019 – febrero de 2020) de una tienda online de cosmética mediana.

Cada fila del archivo representa un evento. Todos los eventos están relacionados con productos y usuarios. Cada evento es como una relación de muchos a muchos entre productos y usuarios. Obsérvalo en la descripción del propio dataset en kaggle.

  1. Descargamos el dataset. Esta compuesto por 5 archivos csv, cada uno correspondiente a un mes.
curl -L -o ecommerce-events-history-in-cosmetics-shop.zip  https://www.kaggle.com/api/v1/datasets/download/mkechinov/ecommerce-events-history-in-cosmetics-shop
  1. Lo copiamos en nuestro cluster
scp ecommerce-events-history-in-cosmetics-shop.zip hadoop@192.168.56.10:/home/hadoop
  1. Lo movemos al directorio que queramos. En mi caso /opt/spark/ejemplos/ejemplo1
mkdir -p /opt/spark/ejemplos/ejemplo1
mv ecommerce-events-history-in-cosmetics-shop.zip /opt/spark/ejemplos/ejemplo1/
cd /opt/spark/ejemplos/ejemplo1/
  1. Descomprimimos el archivo
unzip ecommerce-events-history-in-cosmetics-shop.zip
  1. Podéis observar como son más de 2 GB de datos
ls -lh
  1. Lógica a aplicar. En este caso vamos a usar la shell y explicar paso a paso los comandos a realizar.

  2. Ejecutamos pyspark

  3. Creamos un DataFrame donde leemos los ficheros fuentes a través de RDDs

  • header='True' => Los ficheros que vamos a cagar tienen cabecera. La primera linea no es información
  • inferSchema='True' => Inserta el esquema, la información de la estructura de datos(nombres de columna y tipos de datos de columnas). Automáticamente nos lo carga y no tenemos que ponerle nombres a las columnas
  • csv => Tipo de dato a cargar y la ruta
df = spark.read.options(header='True', inferSchema='True').csv("file:///opt/spark/ejemplos/ejemplo1/*.csv")
  1. Obtenemos el número de registros
df.count()
  1. Imprimimos el esquema (ya recogido de inferSchema='True')
df.printSchema()
  1. Ya podemos empezar a hacer Big Data y Business Intelligence. Por ejemplo, podemos obtener información para ayudar a mejorar nuestras ventas. Como cuáles son nuestros productos estrella, cuáles son los artículos que se vendan más y cruzarlos con la fecha o estación del tiempo de venta para establecer estrategias nuevas,...

En este caso, vamos a sacar todos los artículos que se han vendido alguna vez conjuntamente con otro artículo. Si un usuario compra un artículo ponerle al lado algunos otros artículos que sabemos que otros usuarios han comprado con este. Así intentamos subir las compras.

  1. Investigamos los datos que tenemos en nuestro DataFrame

  2. Vemos los tipos de eventos que tenemos en cada compra

df.select("event_type").distinct().show()
  1. También las marcas, por ejemplo
df.select("brand").distinct().show()
  1. Sólo me interesan los eventos de los productos que se han añadido al carrito
df.select("product_id").filter("event_type='cart'").show()
  1. Sólo el primero
df.select("product_id").filter("event_type='cart'").first()
  1. Como ejemplo de concepto, vamos a coger este primer producto y vamos a ver los productos que se añadieron al carrito junto a ese primer producto

  2. De ese primer producto obtenido, vamos a guardar las sesiones de usuario que añadieron ese producto

sessions = df.select("user_session").filter("event_type='cart' AND product_id=4958")
  1. Obtengo ahora los productos que hubo en esas sesiones de usuario que no sea el producto original
products=df.select("product_id").filter("event_type='cart' AND product_id<>4958").filter(df["user_session"].isin(sessions["user_session"]))
  1. Muestro y cuento los productos obtenidos
products.select("product_id").show()
products.select("product_id").count()
  1. Evidentemente están todos los productos repetidos. Vamos a indicar que se muestren sólo una vez evitando duplicados
products = products.select("product_id").distinct()
products.select("product_id").show()
  1. Para poder hacer uso de estos datos, probablemente tendríamos que exportarlos. En este caso en el mismo formato csv(podríamos hacerlo en otro formato:json, parquet,...)
products.write.mode("overwrite").csv('file:///opt/spark/ejemplos/ejemplo1/salida')
  1. Si observas el resultado, es probable ver archivos tipo HDFS. Esto es porque los procesa en paralelo para mejorar la velocidad y eficiencia. También el correspondiente _SUCCESS con la misma funcionalidad que en HDFS

  2. Observa el resultado

ls /opt/spark/ejemplos/ejemplo1/salida/
cat /opt/spark/ejemplos/ejemplo1/salida/*.csv
  1. Esta es una prueba de concepto. Aquí podríamos haber añadido el número de veces que se ha repetido el producto para dar más prioridad mostrándolos en la tienda virtual. Así podemos mejorar el marketing y estamos haciendo mejor nuestro BI. Esta sería una buena práctica.

  2. Como bien sabemos, también podemos hacer uso de sql dentro de nuestro DataFrame

df.createOrReplaceTempView("datos")
spark.sql("SELECT * FROM datos LIMIT 5").show()
  1. Y hacer las consultas que queramos
spark.sql("SELECT * FROM datos WHERE event_type='cart' LIMIT 5").show()

Info

Puedes consultar todos los métodos usados en la documentación oficial

  1. Obtenemos un dataset. Para este ejemplo, vamos a usar el siguiente dataset de kaggle. Esta dataset contiene datos de comportamiento durante 5 meses (octubre de 2019 – febrero de 2020) de una tienda online de cosmética mediana.

Cada fila del archivo representa un evento. Todos los eventos están relacionados con productos y usuarios. Cada evento es como una relación de muchos a muchos entre productos y usuarios. Obsérvalo en la descripción del propio dataset en kaggle.

  1. Descargamos el dataset. Esta compuesto por 5 archivos csv, cada uno correspondiente a un mes.
curl -L -o ecommerce-events-history-in-cosmetics-shop.zip  https://www.kaggle.com/api/v1/datasets/download/mkechinov/ecommerce-events-history-in-cosmetics-shop
  1. Descomprimimos el archivo
unzip ecommerce-events-history-in-cosmetics-shop.zip -d ejemplo1/
  1. Lo copiamos en nuestro cluster
docker cp ejemplo1/2019-Oct.csv spark-master:/tmp/2019-Oct.csv
  1. Podéis observar como son más de 2 GB de datos
docker exec spark-master ls -lh /tmp/2019-Oct.csv
  1. Lógica a aplicar. En este caso vamos a usar la shell y explicar paso a paso los comandos a realizar.

  2. Ejecutamos pyspark

docker exec -it spark-master /opt/spark/bin/pyspark
  1. Creamos un DataFrame donde leemos los ficheros fuentes a través de RDDs
  • header='True' => Los ficheros que vamos a cagar tienen cabecera. La primera linea no es información
  • inferSchema='True' => Inserta el esquema, la información de la estructura de datos(nombres de columna y tipos de datos de columnas). Automáticamente nos lo carga y no tenemos que ponerle nombres a las columnas
  • csv => Tipo de dato a cargar y la ruta
df = spark.read.options(header='True', inferSchema='True').csv("file:///tmp/2019-Oct.csv")
  1. Obtenemos el número de registros
df.count()
  1. Imprimimos el esquema (ya recogido de inferSchema='True')
df.printSchema()
  1. Ya podemos empezar a hacer Big Data y Business Intelligence. Por ejemplo, podemos obtener información para ayudar a mejorar nuestras ventas. Como cuáles son nuestros productos estrella, cuáles son los artículos que se vendan más y cruzarlos con la fecha o estación del tiempo de venta para establecer estrategias nuevas,... En este caso, vamos a sacar todos los artículos que se han vendido alguna vez conjuntamente con otro artículo. Si un usuario compra un artículo ponerle al lado algunos otros artículos que sabemos que otros usuarios han comprado con este. Así intentamos subir las compras.

  2. Investigamos los datos que tenemos en nuestro DataFrame

  3. Vemos los tipos de eventos que tenemos en cada compra

df.select("event_type").distinct().show()
  1. También las marcas, por ejemplo
df.select("brand").distinct().show()
  1. Sólo me interesan los eventos de los productos que se han añadido al carrito
df.select("product_id").filter("event_type='cart'").show()
  1. Sólo el primero
df.select("product_id").filter("event_type='cart'").first()
  1. Como ejemplo de concepto, vamos a coger este primer producto y vamos a ver los productos que se añadieron al carrito junto a ese primer producto

  2. De ese primer producto obtenido, vamos a guardar las sesiones de usuario que añadieron ese producto

sessions = df.select("user_session").filter("event_type='cart' AND product_id=4958")
  1. Obtengo ahora los productos que hubo en esas sesiones de usuario que no sea el producto original
products=df.select("product_id").filter("event_type='cart' AND product_id<>4958").filter(df["user_session"].isin(sessions["user_session"]))
  1. Muestro y cuento los productos obtenidos
products.select("product_id").show()
products.select("product_id").count()
  1. Evidentemente están todos los productos repetidos. Vamos a indicar que se muestren sólo una vez evitando duplicados
products = products.select("product_id").distinct()
products.select("product_id").show()
  1. Para poder hacer uso de estos datos, probablemente tendríamos que exportarlos. En este caso en el mismo formato csv(podríamos hacerlo en otro formato:json, parquet,...)
products.write.mode("overwrite").csv('file:///tmp/salida')
  1. Si observas el resultado, es probable ver archivos tipo HDFS. Esto es porque los procesa en paralelo para mejorar la velocidad y eficiencia. También el correspondiente _SUCCESS con la misma funcionalidad que en HDFS

  2. Observa el resultado

docker exec spark-master ls -lh /tmp/salida/
  1. Esta es una prueba de concepto. Aquí podríamos haber añadido el número de veces que se ha repetido el producto para dar más prioridad mostrándolos en la tienda virtual. Así podemos mejorar el marketing y estamos haciendo mejor nuestro BI. Esta sería una buena práctica.

  2. Como bien sabemos, también podemos hacer uso de sql dentro de nuestro DataFrame

df.createOrReplaceTempView("datos")
spark.sql("SELECT * FROM datos LIMIT 5").show()
  1. Y hacer las consultas que queramos
spark.sql("SELECT * FROM datos WHERE event_type='cart' LIMIT 5").show()

2. Ejemplo 2. Eventos tienda virtual 2

  1. Usando el mismo ejemplo con el mismo dataset, vamos a realizar otro ejercicio. En este caso, vamos a ver cuantos productos se han comprado de una marca determinada.

  2. Mostramos la lista de marcas

df.select("brand").distinct().show()
  1. Vamos a hacer la búsqueda. Elegimos una marca, por ejemplo jaguar

  2. Usamos el objeto rdd donde le podemos pasar varias acciones.

  3. Dentro de las acciones de rdd usaremos flatMap. A flatMap, le podemos pasar una lambda o una función. En Java y Scala, le podemos añadir un conjunto de instrucciones. Sin embargo en python, solo le podemos pasar una instrucción (un lambda o una función). Así usaremos una función para poder realizar nuestro objetivo (que se aplicará a cada uno de los registros)

  4. Esta sería la función

def suma_prod_por_marcas(s):
    if s["brand"]=="jaguar" and s["event_type"]=="cart":
        return [ ( s["product_id"], 1) ]
    return []
  1. Esta función se la añadimos a la acción flatMap. Al método flatMap le aplicas la función al registro y te devuelve de 0 a n registros. Esta función nos devuelve otro RDD con los registros filtrados en la función

  2. Ahora recogemos todos los registros con la key correspondiente y le pasaremos una instrucción. En este caso una lambda. Recoge 2 registros y los suma (recuerda s["product_id"], 1). Por tanto suma cuando encuentra el producto de la marca determinada (en este caso 1, porque sumamos. Aquí podríamos hacer medias, medianas,....) y se la sumamos en el lambda. La instrucción final sería

lines = df.rdd.flatMap(suma_prod_por_marcas).reduceByKey(lambda a, b: a + b)
  1. Mostramos los datos. En este caso vamos a optar por devolver collect(). Cuidado si tenemos mucha cantidad de datos
for e in lines.collect():
    print(e)
  1. Con take(). Mismo caso que el anterior en cuanto a gran numero de datos
print(lines.take(20))
  1. Ahora vamos convertir el RDD en dataframe
lines.toDF().show()
  1. Guardamos
lines.saveAsTextFile('file:///opt/spark/ejemplos/ejemplo2/salida_shell')
  1. Ahora vamos a convertirlo en un programa que vamos a lanzar con spark-submit.

  2. Creamos el fichero ejemplo2.py. En mi caso en el directorio /opt/spark/ejemplos/ejemplo2/

ejemplo2.py
import sys
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Ejemplo2").getOrCreate() 

df = spark.read.options(header='True', inferSchema='True').csv(sys.argv[1])


def suma_prod_por_marcas(s):
    if s["brand"]=="jaguar" and s["event_type"]=="cart":
        return [ ( s["product_id"], 1) ]
    return []

lines = df.rdd.flatMap(suma_prod_por_marcas).reduceByKey(lambda a, b: a + b)

lines.saveAsTextFile(sys.argv[2])
  1. Hemos cambiado para indicar por parámetro los dataset de entrada y donde guardaremos por los datos con sys.argv[]

  2. Importante. Al enviarlo al cluster, para que todos los nodos tengan acceso al fichero, no pueden cargarse los ficheros fuente desde el sistema local donde se lanza spark-submit. Debemos subirlo a HDFS para que todo el cluster tenga acceso.

hdfs dfs -copyFromLocal /opt/spark/ejemplos/ejemplo1/*.csv /bda/spark/ejemplos
  1. Ejecutamos la app para guardarlo en HDFS (No podemos obtener ni guardar los datos en el sistema de ficheros, ya que los workers (nodo1, nodo2, nodo3) no pueden acceder al sistema de ficheros local)
spark-submit --master spark://192.168.11.10:7077 /opt/spark/ejemplos/ejemplo2/ejemplo2.py "/bda/spark/ejemplos/*.csv" "/bda/spark/ejemplos/ejemplo2/salida_submit"
hdfs dfs -ls /bda/spark/ejemplos/ejemplo2/salida_submit

Usaremos los mismos datos en HDFS que en el ejemplo anterior.

  1. Subir a HDFS
# Crear directorio en HDFS
docker exec namenode hdfs dfs -mkdir -p /bda/spark/ejemplo2

# En el caso de que no lo tengas descargado del ejemplo anterior
curl -L -o ecommerce-events-history-in-cosmetics-shop.zip  https://www.kaggle.com/api/v1/datasets/download/mkechinov/ecommerce-events-history-in-cosmetics-shop

# Copiar del Host al contenedor puente (namenode)
unzip ecommerce-events-history-in-cosmetics-shop.zip -d ejemplo2/
docker cp ejemplo2/ namenode:/tmp/

# Mover de /tmp a HDFS
docker exec namenode hdfs dfs -put -f /tmp/ejemplo2 /bda/spark/
  1. Código (VS Code). Crea el archivo ejemplo2.py. Usaremos los mismos datos en HDFS que en el ejemplo anterior.
ejemplo2.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc

# 1. Conexión Spark Connect
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# 2. Lectura HDFS
# Aunque Spark Connect prefiere Parquet/ORC, lee CSV perfectamente
print("--- Leyendo datos ---")
df = spark.read.options(header='True', inferSchema='True') \
        .csv("hdfs://namenode:9000/bda/spark/ejemplo2/*.csv")

# 3. Lógica de Negocio (Equivalente al MapReduce)
# Objetivo: Contar productos de la marca 'jaguar' añadidos al 'carrito'

# A. Filtrado (Equivalente al flatMap/filter del RDD)
df_filtered = df.filter(
    (col("brand") == "jaguar") & 
    (col("event_type") == "cart")
)

# B. Agregación (Equivalente al reduceByKey)
# Agrupamos por producto y contamos
result_df = df_filtered.groupBy("product_id").count()

# 4. Mostrar resultados
print("--- Top 10 productos Jaguar en carrito ---")
result_df.orderBy(desc("count")).show(10)

# 5. Guardar
# Spark Connect maneja la escritura distribuida
output_path = "hdfs://namenode:9000/bda/spark/ejemplo2/salida_connect"
print(f"--- Guardando en {output_path} ---")

# Usamos mode("overwrite") para poder re-ejecutar el script sin error
result_df.write.mode("overwrite").csv(output_path)

spark.stop()
  1. Crea un entorno virtual y activa las librerías de cliente en el host (si no lo has hecho ya en los ejemplos anteriores)
python -m venv venv
source venv/bin/activate  # O venv\Scripts\activate en Windows
pip install pyspark-connect grpcio-status
  1. Observamos los logs del contenedor de Spark Connect para verificar la conexión y comprobar como se crean y ejecutan los jobs. También puedes abrir la webui de Spark Connect en http://spark-connect.localhost
docker logs -f spark-connect
  1. Ejecutamos el script
mkdir -p ejemplo2 # Aunque debería existir ya del paso de subir a HDFS
mv ejemplo2.py ejemplo2/
cd ejemplo2
python ejemplo2.py
  1. Comprobamos el resultado en HDFS
docker exec namenode hdfs dfs -ls /bda/spark/ejemplo2/salida_connect
docker exec namenode hdfs dfs -cat /bda/spark/ejemplo2/salida_connect/*.csv 

3. Ejemplo 3. Dataframes y esquemas

En este ejemplo vamos a ver diferentes ejemplos para la conversión y creación de DataFrame

3.1 Desde un RDD

  1. Podemos crear un DataFrame desde un RDD mediante toDF
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
rdd = spark.sparkContext.parallelize(subjects)
# Creamos un DataFrame y mostramos su esquema
df = rdd.toDF()
df.printSchema()
  1. El DataFrame resultante no tiene configurado el esquema. Lo añadimos
schema = ["nombre","Horas","Abreviatura"]
df = rdd.toDF(schema)
df.printSchema()

3.2 Usando createDataFrame

  1. Podemos crear un DataFrame con createDataFrame
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
schema = ["Nombre","Horas","Abreviatura"]
df = spark.createDataFrame(subjects, schema)
df.show()
  1. Podemos recoger información de nuestro DataFrame con describe
df.describe()
df.describe().show()

3.3 Dando esquemas más completos

Se puede describir un esquema más completo gracias a Structype. Este me permite dar tipos de datos más concretos a cada columna mediante el modelo StructType. Podemos asignar una gran variedad de tipos de datos: StringType, IntegerType, CharType, ArrayType,.... Aquí tienes la lista completa

from pyspark.sql.types import *
data = [("Alice", ["Java", "Scala"]), ("Bob", ["Python", "Scala"])]
schema = StructType([
    StructField("name", StringType()),
    StructField("languagesSkills", ArrayType(StringType())),
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df.show()
  1. Otro ejemplo
from pyspark.sql.types import *
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
schema = ["nombre","Horas","Abreviatura"]
schema= StructType([
    StructField("Nombre", StringType(), True),
    StructField("Horas", IntegerType(), True),
    StructField("Abreviatura", StringType(), True)
])
df = spark.createDataFrame(subjects, schema)
df.printSchema()
df.show()
df.describe()
df.describe().show()

3.4 Persistencia de DataFrame

  1. Podemos guardar los DataFrame con el método DataFrameWriter. En este recursos puedes observar los diferentes métodos que tiene(mode, format, options,...) y las diferentes configuraciones de cada uno
#Prueba alguno de ellos (Algunos esquemas no soportan la persistencia en ciertos formatos)
# CSV
df.write.csv("file:///opt/spark/ejemplos/ejemplo3/datos1.csv")
df.write.format("csv").save("file:///opt/spark/ejemplos/ejemplo3/datos2.csv")
df.write.format("csv").mode("overwrite").save("file:///opt/spark/ejemplos/ejemplo3/datos3.csv")
# JSON
df.write.json("file:///opt/spark/ejemplos/ejemplo3/datos1.json")
df.write.format("json").save("file:///opt/spark/ejemplos/ejemplo3/datos2.json")
# TXT
df.write.text("file:///opt/spark/ejemplos/ejemplo3/datos1.txt")
df.write.option("lineSep",";").text("file:///opt/spark/ejemplos/ejemplo3/datos2.txt")
df.write.format("txt").save("file:///opt/spark/ejemplos/ejemplo3/datos3.txt")

3.5 Esquema en Tienda virtual

#creamos el esquema
schema= StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("user_session", StringType(), True)
])

# Aplicando el esquema personalizado al DataFrame 

df_with_schema = spark.read.options(header='True').schema(schema).csv("file:///opt/spark/ejemplos/ejemplo1/*.csv")

df_with_schema.printSchema()
df_with_schema.show()
df_with_schema.describe()
df_with_schema.describe().show()
  1. Preparar HDFS
# Crear directorio en HDFS
docker exec namenode hdfs dfs -mkdir -p /bda/spark/ejemplo3
  1. Código (VS Code). Crea el archivo ejemplo3_schemas.py. Usaremos los mismos datos en HDFS que en el ejemplo anterior.

Warning

La función spark.createDataFrame(data, schema) cuando se usa con listas locales de Python en Spark Connect, intenta negociar una configuración de transferencia de datos (localRelationChunkSizeRows) que, por algún motivo de versión exacta o configuración de la imagen Docker 4.0.1, falla en la negociación RPC.

Cambiamos la estrategia. La forma más "Cloud Native" y robusta de crear pequeños DataFrames de prueba en Spark Connect (y que recomienda Databricks y la documentación moderna) no es subir listas locales, sino generar los datos directamente en el servidor usando SQL. Esto evita la transferencia de red y el error de configuración.

Comprueba para versiones posteriores si este problema persiste. Lo dejo comentado en el código al principio del mismo.

ejemplo3_schemas.py
#from pyspark.sql import SparkSession
#from pyspark.sql.types import StructType, StructField, StringType, IntegerType

#spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Definir Esquema Manual
#schema = StructType([
#    StructField("Nombre", StringType(), True),
#    StructField("Horas", IntegerType(), True),
#    StructField("Abreviatura", StringType(), True)
#])

#data = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD")]

# Crear DF
#df = spark.createDataFrame(data, schema)
#df.show()

# Persistencia en HDFS (Formatos modernos)
#base_path = "hdfs://namenode:9000/bda/spark/ejemplo3"

#print("Guardando JSON...")
#df.write.mode("overwrite").json(f"{base_path}/salida.json")

#print("Guardando Parquet...")
#df.write.mode("overwrite").parquet(f"{base_path}/salida.parquet")
#spark.stop()

from pyspark.sql import SparkSession
# Importamos los tipos para usarlos en castings explícitos
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import col

# 1. Conexión
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

print("--- Generando DataFrame en el Cluster ---")

# 2. Crear Datos (Estrategia Server-Side)
# Usamos SQL VALUES para generar los datos directamente en el motor de Spark.
# Esto es mucho más robusto en Spark Connect que enviar listas locales.
df = spark.sql("""
    SELECT * FROM VALUES
    ('Big Data Aplicado', 4, 'BDA'),
    ('Sistemas de Big Data', 3, 'SBD')
    AS datos(Nombre, Horas, Abreviatura)
""")

# 3. Aplicar Tipos Estrictos (Schema Enforcement)
# Forzamos los tipos de datos que queremos (equivalente a aplicar un StructType)
df_typed = df.select(
    col("Nombre").cast(StringType()),
    col("Horas").cast(IntegerType()),
    col("Abreviatura").cast(StringType())
)

print("--- Esquema definido: ---")
df_typed.printSchema()
df_typed.show()

# 4. Persistencia en HDFS
base_path = "hdfs://namenode:9000/bda/spark/ejemplo3"

print("Guardando JSON...")
df_typed.write.mode("overwrite").json(f"{base_path}/json")

print("Guardando Parquet (Formato Columnar)...")
df_typed.write.mode("overwrite").parquet(f"{base_path}/parquet")

# 5. Verificación de lectura
print("--- Verificando datos guardados en Parquet ---")
spark.read.parquet(f"{base_path}/parquet").show()

spark.stop()
3. Crea un entorno virtual y activa las librerías de cliente en el host (si no lo has hecho ya en los ejemplos anteriores)

python -m venv venv
source venv/bin/activate  # O venv\Scripts\activate en Windows
pip install pyspark-connect grpcio-status
  1. Ejecutamos el script
mkdir -p ejemplo3
mv ejemplo3_schemas.py ejemplo3/
cd ejemplo3
python ejemplo3_schemas.py
  1. Comprobamos el resultado en HDFS
docker exec namenode hdfs dfs -ls /bda/spark/ejemplo3/json/*.json
docker exec namenode hdfs dfs -cat /bda/spark/ejemplo3/json/*.json
docker exec namenode hdfs dfs -ls /bda/spark/ejemplo3/parquet/*.parquet 
docker exec namenode hdfs dfs -cat /bda/spark/ejemplo3/parquet/*.parquet 

Ejemplo 4. Uso de Pandas

Imagina que eres un analista de datos en una empresa de comercio electrónico. Se te ha proporcionado dos conjuntos de datos:

  • Ventas: Contiene información sobre las ventas diarias de productos (ventas.csv). El conjunto de datos de ventas incluye id_producto, fecha, y cantidad_vendida
  • Productos: Contiene detalles de los productos (productos.csv). El conjunto de datos de productos incluye id_producto, nombre_producto, y categoría.

Objetivo: Tu tarea es generar un informe que contenga el total de ventas por categoría de producto para el último año disponible en el conjunto de datos. Además, se requiere identificar el producto más vendido por categoría.

4.1 Crear y Empaquetar el Entorno Virtual (En el Nodo Master) (VM)

Vamos a crear un entorno aislado, instalar las librerías ahí y empaquetarlo en un archivo .tar.gz que Spark distribuirá automáticamente a todos los nodos.

  1. Instalar herramientas de entorno virtual (si no las tienes):
sudo apt-get update
sudo apt-get install python3-venv

Using Virtualenv

Siguiendo la documentación oficial de Spark para crear un entorno virtual aislado para cumplir con PEP 668 (no se puede usar pip global).

  1. Crear el entorno y activarlo:
cd /opt/spark/ejemplos/ejemplo4
# Creamos un entorno llamado 'pyspark_venv'
python3 -m venv pyspark_venv

# Lo activamos
source pyspark_venv/bin/activate
  1. Instalar las dependencias dentro del entorno:

Ahora sí, pip funcionará porque estamos en un entorno aislado. Instalamos también venv-pack para empaquetarlo.

pip install pandas pyarrow numpy venv-pack
  1. Empaquetar el entorno:

Usamos venv-pack para crear un archivo comprimido portable.

venv-pack -o pyspark_venv.tar.gz

  1. Desactivar el entorno:
deactivate

Ahora tenemos un archivo pyspark_venv.tar.gz en nuestra carpeta del ejemplo /opt/spark/ejemplos/ejemplo4. Este archivo contiene Python, Pandas y PyArrow listos para usar.

4.2 Preparando el entorno del ejemplo4 (VM)

  1. Descargamos los ficheros fuente.
mkdir -p /opt/spark/ejemplos/ejemplo4
cd /opt/spark/ejemplos/ejemplo4
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_productos.csv
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_ventas.csv
  1. Renombramos y movemos
mv Ejemplo4_productos.csv /opt/spark/ejemplos/ejemplo4/productos.csv
mv Ejemplo4_ventas.csv /opt/spark/ejemplos/ejemplo4/ventas.csv
hdfs dfs -mkdir -p /bda/spark/ejemplos/ejemplo4
hdfs dfs -put /opt/spark/ejemplos/ejemplo4/*.csv /bda/spark/ejemplos/ejemplo4/
  1. Nuestro código de ejemplo
ejemplo4_pandas.py
import pyspark.pandas as ps
# No importamos pandas estándar (pd) para evitar confusiones y asegurar ejecución distribuida
from pyspark.sql import SparkSession

# Iniciar sesión
spark = SparkSession.builder.appName("PandasOnSpark").getOrCreate()

# Habilitar Arrow para rendimiento
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

print("--- Leyendo datos desde HDFS ---")
# Usamos ps.read_csv (API de Pandas-on-Spark) en lugar de pd.read_csv
# Esto lee los datos de forma distribuida en los workers
ventas_df = ps.read_csv("hdfs://cluster-bda:9000/bda/spark/ejemplos/ejemplo4/ventas.csv")
productos_df = ps.read_csv("hdfs://cluster-bda:9000/bda/spark/ejemplos/ejemplo4/productos.csv")

# Conversión de tipos (Sintaxis Pandas)
ventas_df['fecha'] = ps.to_datetime(ventas_df['fecha'])

# Filtrado: Ventas del último año
# Nota: .max() aquí lanza un trabajo de Spark distribuido
max_year = ventas_df['fecha'].dt.year.max()
ventas_ultimo_anio = ventas_df[ventas_df['fecha'].dt.year == max_year]

print(f"--- Filtradas ventas del año {max_year} ---")

# Join (Merge)
print("--- Realizando JOIN ---")
ventas_productos = ventas_ultimo_anio.merge(productos_df, on='id_producto', how='inner')

# Agregación: Ventas por categoría
print("--- Agrupando por categoría ---")
resumen = ventas_productos.groupby('categoria').agg({'cantidad_vendida': 'sum'})
resumen.columns = ['Total_Ventas']

print(resumen)

# Persistencia: Guardar en HDFS
output_path = "hdfs://cluster-bda:9000/bda/spark/ejemplos/ejemplo4/resumen_ventas"
print(f"--- Guardando resultado en {output_path} ---")
resumen.to_csv(output_path)

spark.stop()

4.3: Ejecutar con el Entorno Virtual (VM)

Al lanzar spark-submit, debemos decirle a Spark:

  1. --archives: "Toma este archivo comprimido (pyspark_venv.tar.gz), envíalo a todos los workers y descomprímelo en una carpeta llamada entorno".
  2. PYSPARK_PYTHON: "Dentro de esa carpeta entorno, usa el binario de python que está en bin/python".

Ejecutamos esto desde donde tengas el .tar.gz y el script .py:

# Definimos que Spark use el python de nuestro entorno empaquetado
export PYSPARK_PYTHON=./entorno/bin/python

# Lanzamos el trabajo
spark-submit \
--master spark://192.168.11.10:7077 \
--archives pyspark_venv.tar.gz#entorno \
/opt/spark/ejemplos/ejemplo4/ejemplo4_pandas.py

Desglose del comando:

  • --archives pyspark_venv.tar.gz#entorno: El #entorno es un alias. Significa "Descomprime aquí y llama a la carpeta 'entorno'".
  • PYSPARK_PYTHON=./entorno/bin/python: Le dice a los workers que usen ese python descomprimido, que ya tiene pandas y pyarrow instalados.
  1. Observamos la salida de la ejecución y el resultado final guardado en HDFS
hdfs dfs -ls /bda/spark/ejemplos/ejemplo4/resumen_ventas
hdfs dfs -cat /bda/spark/ejemplos/ejemplo4/resumen_ventas/part-00000*.csv

Usaremos la API de Pandas (pyspark.pandas) sobre el clúster Docker.

Gestión de Dependencias en Docker

A diferencia de las VMs donde tuvimos que usar venv-pack, en Docker tenemos una ventaja: Todos los nodos son idénticos.

Sin embargo, la imagen oficial spark:4.0.1 NO trae pandas ni pyarrow. Antes de ejecutar este ejemplo, debemos instalarlos en los contenedores (Master y Workers).

Para instalar las librerías en todo el clúster:

# Ejecuta esto en tu terminal Host una sola vez
for container in spark-master spark-worker-1 spark-worker-2 spark-worker-3 spark-connect; do
    docker exec -u 0 $container pip install pandas pyarrow numpy
done

En un entorno de producción, crearíamos una imagen Docker personalizada (Dockerfile) con estas librerías preinstaladas.

4.1 Preparación de las dependencias (Docker)

Como hemos comentado en el cuadro de advertencia, debemos asegurarnos de que pandas y pyarrow están instalados en todos los contenedores del clúster.

Si no lo has hecho aún, ejecuta el siguiente comando en tu terminal Host:

for container in spark-master spark-worker-1 spark-worker-2 spark-worker-3 spark-connect; do
    docker exec -u 0 $container pip install pandas pyarrow numpy
done

4.2 Preparación de datos y entorno en nuestro host (Docker)

Versiones de Python

La imagen de Docker usa Python 3.10. Tu entorno local debe usar Python 3.10 para evitar errores de serialización (PYTHON_VERSION_MISMATCH).

Instala Python 3.10 en tu sistema (vía apt con PPA deadsnakes en Ubuntu).

sudo apt-get update
sudo apt-get install -y software-properties-common
sudo add-apt-repository ppa:deadsnakes/ppa
sudo apt-get update
sudo apt-get install -y python3.10 python3.10-venv
  1. Crea un entorno virtual y activa las librerías de cliente en el host (con la versión de python3.10, que es la que tiene el clúster con las imágenes Docker spark 4.0.1)
#Si tienes un entorno creado para los ejemplos anteriores, elimínalo o crea uno nuevo en otro directorio
# Si lo quieres eliminar y crear uno nuevo
deactivate  # Si tienes un venv activo
rm -rf venv

# Crear entorno con python3.10
python3.10 -m venv venv
source venv/bin/activate  # O venv\Scripts\activate en Windows
pip install pyspark-connect grpcio-status
  1. Creamos el directorio y descargamos (Host):
mkdir -p ejemplo4
cd ejemplo4
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_productos.csv
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_ventas.csv
  1. Subimos los archivos a HDFS:
docker exec namenode hdfs dfs -mkdir -p /bda/spark/ejemplo4

docker cp Ejemplo4_ventas.csv namenode:/tmp/
docker exec namenode hdfs dfs -put /tmp/Ejemplo4_ventas.csv /bda/spark/ejemplo4/ventas.csv

docker cp Ejemplo4_productos.csv namenode:/tmp/
docker exec namenode hdfs dfs -put /tmp/Ejemplo4_productos.csv /bda/spark/ejemplo4/productos.csv

4.3 Código fuente en VS Code (Docker)

Creamos ejemplo4_pandas.py.

import os
import sys

# --- CONFIGURACIÓN PREVIA (Evita Warnings) ---
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Conectar al clúster Docker
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Optimización Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

print("--- Leyendo datos desde HDFS ---")
# Leemos directamente de HDFS usando la API de Pandas distribuida
# Nota: La ruta es la interna del contenedor
path_ventas = "hdfs://namenode:9000/bda/spark/ejemplo4/ventas.csv"
path_prods = "hdfs://namenode:9000/bda/spark/ejemplo4/productos.csv"

ventas_df = ps.read_csv(path_ventas)
productos_df = ps.read_csv(path_prods)

# Transformación (API Pandas estándar)
ventas_df['fecha'] = ps.to_datetime(ventas_df['fecha'])
max_year = ventas_df['fecha'].dt.year.max()
ventas_ultimo_anio = ventas_df[ventas_df['fecha'].dt.year == max_year]

print(f"--- Filtradas ventas del año {max_year} ---")

# Join
ventas_productos = ventas_ultimo_anio.merge(productos_df, on='id_producto', how='inner')

# Agregación
resumen = ventas_productos.groupby('categoria').agg({'cantidad_vendida': 'sum'})
resumen.columns = ['Total_Ventas']

print(resumen)

# Producto más vendido
ventas_prod = ventas_productos.groupby(['categoria', 'nombre_producto']).agg({'cantidad_vendida':'sum'}).reset_index()
top_prod = ventas_prod.sort_values(['categoria', 'cantidad_vendida'], ascending=[True, False]) \
                    .drop_duplicates(subset=['categoria'], keep='first')

print(top_prod)

# Persistencia en HDFS
output_path = "hdfs://namenode:9000/bda/spark/ejemplo4/resumen_ventas"
print(f"--- Guardando en {output_path} ---")
resumen.to_csv(output_path)

spark.stop()

4.4 Ejecución (Docker)

Lanzamos el script desde nuestra terminal local:

python ejemplo4_pandas.py

4.5 Verificación de Resultados (Docker)

Puedes verificar los resultados en HDFS:

docker exec namenode hdfs dfs -ls /bda/spark/ejemplo4/resumen_ventas
docker exec namenode hdfs dfs -cat /bda/spark/ejemplo4/resumen_ventas/part-00000*.csv

Si ves los resultados, ¡enhorabuena! Hemos ejecutado un pipeline de Pandas distribuido sobre un clúster Docker.