Saltar a contenido

UD 6 - Apache Spark - Get Started

Vamos a realizar algunos ejemplos de prueba de uso de Spark con PySpark en nuestro cluster ya configurado

Para probar los ejemplos, vamos a combinar las opciones de ejecución de Spark:

  • pyspark (usando Jupyter en lugar de la shell si lo crees necesario)-
  • crear un fichero .py y lanzarlo directamente con spark-submit

1. Ejemplo 1. Eventos tienda virtual

Info

Puedes consultar todos los métodos usados en la documentación oficial

  1. Obtenemos un dataset. Para este ejemplo, vamos a usar el siguiente dataset de kaggle. Esta dataset contiene datos de comportamiento durante 5 meses (octubre de 2019 – febrero de 2020) de una tienda online de cosmética mediana.

Cada fila del archivo representa un evento. Todos los eventos están relacionados con productos y usuarios. Cada evento es como una relación de muchos a muchos entre productos y usuarios. Obsérvalo en la descripción del propio dataset en kaggle.

  1. Descargamos el dataset. Esta compuesto por 5 archivos csv, cada uno correspondiente a un mes.

  2. Lo copiamos en nuestro cluster

scp archive.zip hadoop@192.168.56.10:/home/hadoop
  1. Lo movemos al directorio que queramos. En mi caso /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1
mkdir -p /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1
mv archive.zip /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/
cd /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/
  1. Descomprimimos el archivo
unzip archive.zip
  1. Podéis observar como son más de 2 GB de datos
ls -lh
  1. Lógica a aplicar. En este caso vamos a usar la shell y explicar paso a paso los comandos a realizar.

  2. Ejecutamos pyspark

  3. Creamos un DataFrame donde leemos los ficheros fuentes a través de RDDs

  • header='True' => Los ficheros que vamos a cagar tienen cabecera. La primera linea no es información
  • inferSchema='True' => Inserta el esquema, la información de la estructura de datos(nombres de columna y tipos de datos de columnas). Automáticamente nos lo carga y no tenemos que ponerle nombres a las columnas
  • csv => Tipo de dato a cargar y la ruta
df = spark.read.options(header='True', inferSchema='True').csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/*.csv")
  1. Obtenemos el número de registros
df.count()
  1. Imprimimos el esquema (ya recogido de inferSchema='True')

df.printSchema()
12. Ya podemos empezar a hacer Big Data y Business Intelligence. Por ejemplo, podemos obtener información para ayudar a mejorar nuestras ventas. Como cuáles son nuestros productos estrella, cuáles son los artículos que se vendan más y cruzarlos con la fecha o estación del tiempo de venta para establecer estrategias nuevas,...

En este caso, vamos a sacar todos los artículos que se han vendido alguna vez conjuntamente con otro artículo. Si un usuario compra un artículo ponerle al lado algunos otros artículos que sabemos que otros usuarios han comprado con este. Así intentamos subir las compras.

  1. Investigamos los datos que tenemos en nuestro DataFrame

  2. Vemos los tipos de eventos que tenemos en cada compra

df.select("event_type").distinct().show()
  1. También las marcas, por ejemplo
df.select("brand").distinct().show()
  1. Sólo me interesan los eventos de los productos que se han añadido al carrito
df.select("product_id").filter("event_type='cart'").show()
  1. Sólo el primero
df.select("product_id").filter("event_type='cart'").first()
  1. Como ejemplo de concepto, vamos a coger este primer producto y vamos a ver los productos que se añadieron al carrito junto a ese primer producto

  2. De ese primer producto obtenido, vamos a guardar las sesiones de usuario que añadieron ese producto

sessions = df.select("user_session").filter("event_type='cart' AND product_id=4958")
  1. Obtengo ahora los productos que hubo en esas sesiones de usuario que no sea el producto original
products=df.select("product_id").filter("event_type='cart' AND product_id<>4958").filter(df["user_session"].isin(sessions["user_session"]))
  1. Muestro y cuento los productos obtenidos
products.select("product_id").show()
products.select("product_id").count()
  1. Evidentemente están todos los productos repetidos. Vamos a indicar que se muestren sólo una vez evitando duplicados
products = products.select("product_id").distinct()
products.select("product_id").show()
  1. Para poder hacer uso de estos datos, probablemente tendríamos que exportarlos. En este caso en el mismo formato csv(podríamos hacerlo en otro formato:json, parquet,...)
products.write.mode("overwrite").csv('file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/salida')
  1. Si observas el resultado, es probable ver archivos tipo HDFS. Esto es porque los procesa en paralelo para mejorar la velocidad y eficiencia. También el correspondiente _SUCCESS con la misma funcionalidad que en HDFS

  2. Observa el resultado

ls /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/salida/
cat /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/salida/*.csv
  1. Esta es una prueba de concepto. Aquí podríamos haber añadido el número de veces que se ha repetido el producto para dar más prioridad mostrándolos en la tienda virtual. Así podemos mejorar el marketing y estamos haciendo mejor nuestro BI. Esta sería una buena práctica.

  2. Como bien sabemos, también podemos hacer uso de sql dentro de nuestro DataFrame

df.createOrReplaceTempView("datos")
spark.sql("SELECT * FROM datos LIMIT 5").show()
  1. Y hacer las consultas que queramos
spark.sql("SELECT * FROM datos WHERE event_type='cart' LIMIT 5").show()

2. Ejemplo 2. Eventos tienda virtual 2

Info

Puedes consultar todos los métodos usados en la documentación oficial

  1. Usando el mismo ejercicio con el mismo dataset, vamos a realizar otro ejercicio. En este caso, vamos a ver cuantos productos se han comprado de una marca determinada.

  2. Mostramos la lista de marcas

df.select("brand").distinct().show()
  1. Vamos a hacer la búsqueda. Elegimos una marca, por ejemplo jaguar

  2. Usamos el objeto rdd donde le podemos pasar varias acciones.

  3. Dentro de las acciones de rdd usaremos flatMap. A flatMap, le podemos pasar una lambda o una función. En Java y Scala, le podemos añadir un conjunto de instrucciones. Sin embargo en python, solo le podemos pasar una instrucción (un lambda o una función). Así usaremos una función para poder realizar nuestro objetivo (que se aplicará a cada uno de los registros)
  4. Esta sería la función
def suma_prod_por_marcas(s):
    if s["brand"]=="jaguar" and s["event_type"]=="cart":
        return [ ( s["product_id"], 1) ]
    return []
  1. Esta función se la añadimos a la acción flatMap. Al método flatMap le aplicas la función al registro y te devuelve de 0 a n registros. Esta función nos devuelve otro RDD con los registros filtrados en la función

  2. Y ahora . Recoge todos los registros con la key que le hemos pasado y le pasaremos una instrucción. En este caso una lambda. Recoge 2 registros y los suma ( recuerda s["product_id"], 1). Por tanto suma cuando encuentra el producto de la marca determinada (en este caso 1, porque sumamos. Aquí podríamos hacer medias, medianas,....) y se la sumamos en el lambda. La instrucción final sería

lines = df.rdd.flatMap(suma_prod_por_marcas).reduceByKey(lambda a, b: a + b)
  1. Mostramos los datos. En este caso vamos a optar por devolver collect(). Cuidado si tenemos mucha cantidad de datos
for e in lines.collect():
    print(e)
  1. Con take(). Mismo caso que el anterior en cuanto a gran numero de datos
print(lines.take(20))
  1. Ahora vamos convertir el RDD en dataframe
lines.toDF().show()
  1. Guardamos
lines.saveAsTextFile('file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo2/salida_shell')
  1. Ahora vamos a convertirlo en un programa que vamos a lanzar con spark-submit.

  2. Creamos el fichero ejemplo2.py. En mi caso en el directorio /opt/hadoop-3.4.1/spark/ejemplos/ejemplo2/

ejemplo2.py
import sys
from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName("Ejemplo2").getOrCreate() 

df = spark.read.options(header='True', inferSchema='True').csv(sys.argv[1])


def suma_prod_por_marcas(s):
    if s["brand"]=="jaguar" and s["event_type"]=="cart":
        return [ ( s["product_id"], 1) ]
    return []

lines = df.rdd.flatMap(suma_prod_por_marcas).reduceByKey(lambda a, b: a + b)

lines.saveAsTextFile(sys.argv[2])
  1. Hemos cambiado para indicar por parámetro los dataset de entrada y donde guardaremos por los datos con sys.argv[]

  2. Importante. Al enviarlo al cluster, para que todos los nodos tengan acceso al fichero, no pueden cargarse los ficheros fuente desde el sistema local donde se lanza spark-submit. Debemos subirlo a HDFS para que todo el cluster tenga acceso.

hdfs dfs -copyFromLocal /opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/*.csv /bda/spark/ejemplos
  1. Ejecutamos la app para guardarlo en HDFS (No podemos obtener ni guardar los datos en el sistema de ficheros, ya que los workers (nodo1, nodo2, nodo3) no pueden acceder al sistema de ficheros local)
spark-submit --master spark://192.168.11.10:7077 /opt/hadoop-3.4.1/spark/ejemplos/ejemplo2/ejemplo2.py "/bda/spark/ejemplos/*.csv" "/bda/spark/ejemplos/ejemplo2/salida_submit"
hdfs dfs -ls /bda/spark/ejemplos/ejemplo2/salida_submit

3. Ejemplo 3. Dataframes y esquemas

Info

Puedes consultar todos los métodos usados en la documentación oficial

En este ejemplo vamos a ver diferentes ejemplos para la conversión y creación de DataFrame

3.1 Desde un RDD

  1. Podemos crear un DataFrame desde un RDD mediante toDF
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
rdd = spark.sparkContext.parallelize(subjects)
# Creamos un DataFrame y mostramos su esquema
df = rdd.toDF()
df.printSchema()
  1. El DataFrame resultante no tiene configurado el esquema. Lo añadimos
schema = ["nombre","Horas","Abreviatura"]
df = rdd.toDF(schema)
df.printSchema()

3.2 Usando createDataFrame

  1. Podemos crear un DataFrame con createDataFrame
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
schema = ["Nombre","Horas","Abreviatura"]
df = spark.createDataFrame(subjects, schema)
df.show()
  1. Podemos recoger información de nuestro DataFrame con describe
df.describe()
df.describe().show()

3.3 Dando esquemas más completos

Se puede describir un esquema más completo gracias a Structype. Este me permite dar tipos de datos más concretos a cada columna mediante el modelo StructType. Podemos asignar una gran variedad de tipos de datos: StringType, IntegerType, CharType, ArrayType,.... Aquí tienes la lista completa

from pyspark.sql.types import *
data = [("Alice", ["Java", "Scala"]), ("Bob", ["Python", "Scala"])]
schema = StructType([
    StructField("name", StringType()),
    StructField("languagesSkills", ArrayType(StringType())),
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()

df.show()
  1. Otro ejemplo
from pyspark.sql.types import *
subjects = [("Big Data Aplicado", 4, "BDA"), ("Sistemas de Big Data", 3, "SBD"), ("Programación de Inteligencia Artificial", 7, "PIA"),("Sistemas de Aprendizaje Automático", 3, "SAA"),("Modelos de Inteligencia Artificial", 3, "MIA")]
schema = ["nombre","Horas","Abreviatura"]
schema= StructType([
    StructField("Nombre", StringType(), True),
    StructField("Horas", IntegerType(), True),
    StructField("Abreviatura", StringType(), True)
])
df = spark.createDataFrame(subjects, schema)
df.printSchema()
df.show()
df.describe()
df.describe().show()

3.4 Persistencia de DataFrame

  1. Podemos guardar los DataFrame con el método DataFrameWriter. En este recursos puedes observar los diferentes métodos que tiene(mode, format, options,...) y las diferentes configuraciones de cada uno
#Prueba alguno de ellos (Algunos esquemas no soportan la persistencia en ciertos formatos)
# CSV
df.write.csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos1.csv")
df.write.format("csv").save("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos2.csv")
df.write.format("csv").mode("overwrite").save("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos3.csv")
# JSON
df.write.json("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos1.json")
df.write.format("json").save("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos2.json")
# TXT
df.write.text("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos1.txt")
df.write.option("lineSep",";").text("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos2.txt")
df.write.format("txt").save("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo3/datos3.txt")

3.5 Esquema en Tienda virtual

#creamos el esquema
schema= StructType([
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("category_id", LongType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("user_session", StringType(), True)
])

# Aplicando el esquema personalizado al DataFrame 

df_with_schema = spark.read.options(header='True').schema(schema).csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo1/*.csv")

df_with_schema.printSchema()
df_with_schema.show()
df_with_schema.describe()
df_with_schema.describe().show()

Ejemplo 4. Uso de Pandas

Imagina que eres un analista de datos en una empresa de comercio electrónico. Se te ha proporcionado dos conjuntos de datos:

  • Ventas: Contiene información sobre las ventas diarias de productos (ventas.csv). El conjunto de datos de ventas incluye id_producto, fecha, y cantidad_vendida
  • Productos: Contiene detalles de los productos (productos.csv). El conjunto de datos de productos incluye id_producto, nombre_producto, y categoría.

Objetivo: Tu tarea es generar un informe que contenga el total de ventas por categoría de producto para el último año disponible en el conjunto de datos. Además, se requiere identificar el producto más vendido por categoría.

  1. Descargamos los ficheros fuente.
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_productos.csv
wget https://gist.githubusercontent.com/jaimerabasco/3f5475b8dcbc5f1b18f7aa5f8b267cfa/raw/c69bc8880a4fab2762ceaeb0047c5b790d6abe01/Ejemplo4_ventas.csv
  1. Renombramos y movemos
mv Ejemplo4_productos.csv /opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/productos.csv
mv Ejemplo4_ventas.csv /opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/ventas.csv
  1. Inicializamos el entorno
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Iniciar sesión de Spark (Si vamos a crear una app. Si es en Jupyter o Databricks no es necesario)
spark = SparkSession.builder.appName("PandasOnSpark").getOrCreate()

# Habilitamos Apache Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  1. Cargamos los datos
# Cargamos los datos
ventas_df = pd.read_csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/ventas.csv")
productos_df = pd.read_csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/productos.csv")
  1. Exploramos los datos
ventas_df
productos_df
  1. Transformación de los datos. Vamos a convertir la columna fecha en el formato correcto. Filtramos también las ventas del último año
# Transformación de datos
# Asumiendo que `fecha` es una cadena de texto en el formato 'yyyy-MM-dd'
ventas_df['fecha'] = ventas_df['fecha'].astype('datetime64[ns]')
ventas_ultimo_año_df = ventas_df[ventas_df['fecha'].dt.year == ventas_df['fecha'].dt.year.max()]
  1. Combinamos los productos con las ventas del último año usando id_producto como clave para enriquecer el conjunto de datos de ventas con nombre_producto y categoria.
# Combinación de los conjuntos de datos
ventas_productos_df = ventas_ultimo_año_df.merge(productos_df, on='id_producto', how='inner')

# Visualización preliminar de los datos combinados
print(ventas_productos_df.head())

# DataFrame completo
ventas_productos_df
  1. Total de ventas por categorías
# Agrupar por categoría y sumar las cantidades vendidas
ventas_por_categoria = ventas_productos_df.groupby('categoria').agg({'cantidad_vendida':'sum'}).reset_index()

# Renombrar las columnas para claridad
ventas_por_categoria.columns = ['Categoria', 'Total_Ventas']

# Mostrar el total de ventas por categoría
ventas_por_categoria
  1. Producto Más Vendido por Categoría
# Agregamos la cantidad vendida por producto y categoría
ventas_producto_categoria = ventas_productos_df.groupby(['categoria', 'nombre_producto']).agg({'cantidad_vendida':'sum'}).reset_index()

# Ordenamos por categoría y cantidad vendida para encontrar el más vendido por categoría
producto_mas_vendido_por_categoria = ventas_producto_categoria.sort_values(['categoria', 'cantidad_vendida'], ascending=[True, False])

# Eliminamos duplicados, manteniendo el primer producto (el más vendido) de cada categoría
producto_mas_vendido_por_categoria = producto_mas_vendido_por_categoria.drop_duplicates(subset=['categoria'], keep='first').reset_index(drop=True)

# Renombramos columnas para claridad
producto_mas_vendido_por_categoria.columns = ['Categoria', 'Producto_Mas_Vendido', 'Total_Ventas']

# Mostramos el producto más vendido por categoría
producto_mas_vendido_por_categoria
  1. Generación de Informes (Persistencia de los datos). Finalmente, para generar el informe final, mostramos los DataFrame generados o exportarlos a archivos CSV para una distribución más sencilla.
# Exportamos el total de ventas por categoría a CSV
ventas_por_categoria.to_csv('ventas_por_categoria.csv', index=False)

# Exportamos el producto más vendido por categoría a CSV
producto_mas_vendido_por_categoria.to_csv('producto_mas_vendido_por_categoria.csv', index=False)
  1. Código completo
ejemplo4_pandas.py
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Iniciar sesión de Spark (Si vamos a crear una app. Si es en Jupyter o Databricks no es necesario)
spark = SparkSession.builder.appName("PandasOnSpark").getOrCreate()

# Habilitamos Apache Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Cargamos los datos
ventas_df = pd.read_csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/ventas.csv")
productos_df = pd.read_csv("file:///opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/productos.csv")

ventas_df

productos_df

# Transformación de datos
# Asumiendo que `fecha` es una cadena de texto en el formato 'yyyy-MM-dd'
ventas_df['fecha'] = ventas_df['fecha'].astype('datetime64[ns]')
ventas_ultimo_año_df = ventas_df[ventas_df['fecha'].dt.year == ventas_df['fecha'].dt.year.max()]

# Combinación de los conjuntos de datos
ventas_productos_df = ventas_ultimo_año_df.merge(productos_df, on='id_producto', how='inner')

# Visualización preliminar de los datos combinados
print(ventas_productos_df.head())

# DataFrame completo
ventas_productos_df

# Agrupar por categoría y sumar las cantidades vendidas
ventas_por_categoria = ventas_productos_df.groupby('categoria').agg({'cantidad_vendida':'sum'}).reset_index()

# Renombrar las columnas para claridad
ventas_por_categoria.columns = ['Categoria', 'Total_Ventas']

# Mostrar el total de ventas por categoría
ventas_por_categoria

# Agregamos la cantidad vendida por producto y categoría
ventas_producto_categoria = ventas_productos_df.groupby(['categoria', 'nombre_producto']).agg({'cantidad_vendida':'sum'}).reset_index()

# Ordenamos por categoría y cantidad vendida para encontrar el más vendido por categoría
producto_mas_vendido_por_categoria = ventas_producto_categoria.sort_values(['categoria', 'cantidad_vendida'], ascending=[True, False])

# Eliminamos duplicados, manteniendo el primer producto (el más vendido) de cada categoría
producto_mas_vendido_por_categoria = producto_mas_vendido_por_categoria.drop_duplicates(subset=['categoria'], keep='first').reset_index(drop=True)

# Renombramos columnas para claridad
producto_mas_vendido_por_categoria.columns = ['Categoria', 'Producto_Mas_Vendido', 'Total_Ventas']

# Mostramos el producto más vendido por categoría
producto_mas_vendido_por_categoria

# Exportamos el total de ventas por categoría a CSV
ventas_por_categoria.to_csv('ventas_por_categoria.csv', index=False)

# Exportamos el producto más vendido por categoría a CSV
producto_mas_vendido_por_categoria.to_csv('producto_mas_vendido_por_categoria.csv', index=False)
  1. Lo ejecutamos en spark.

Warning

Recuerda que si te da error AttributeError:np.NaNwas removed in the NumPy 2.0 release. Usenp.naninstead., debes hacer un downgradear a una versión de numpy compatible. Revisa la documentación que vimos en el punto anterior

spark-submit --master spark://192.168.11.10:7077 /opt/hadoop-3.4.1/spark/ejemplos/ejemplo4/ejemplo4_pandas.py