Índices Secundarios en Spark con Hyperspace

  • Tiempo de lectura:9 minutos de lectura
  • Autor de la entrada:
  • Última modificación de la entrada:28/12/2024

En esta entrada comprenderás qué son los índices secundarios o los subíndices en Apache Spark y una librería open source llamada Hyperspace que te ayudará a crearlos fácilmente.

Indices en Spark con Hyperspace

¿Qué es Hyperspace y por qué usar índices secundarios en Spark?

Hyperspace es una librería open source para Apache Spark desarrollada por Microsoft e integrada en Synapse.

Trabaja sobre ficheros parquet, CSV y Json sin modificar la tabla, aunque genera información adicional. También soporta el formato Delta Lake.

Comúnmente, los ficheros parquet se particionan por un campo fecha. Sin embargo, luego se opera con otras columnas. Por ejemplo, en procesos de medición de calidad de las tablas, se puede necesitar realizar operaciones por cliente. Al estar particionado por fecha, se necesita recorrer toda la tabla y es ineficiente.

Hyperspace también evita tener que hacer una copia de la tabla con otro índice.

¿Cómo funciona Hyperspace en Spark?

Hyperspace se puede usar en cualquier entorno Spark cargando la librería. Quizá lo más sencillo es usarlo en nuestros notebooks de Databricks. Para ello, deberemos cargar la librería desde la configuración del clúster, como se muestra en la imagen. Una vez cargada, podemos arrancar el clúster y estará lista para usarla.

Librería Hyperspace en clúster de Databricks
Librería Hyperspace en clúster de Databricks

Después, podemos importar la librería y crear una variable Hyperspace en nuestro programa

val df = spark.read.parquet("FileStore/financial_dataset/")
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._
val hs = new Hyperspace(spark)
hs.indexes.show

Para crear un índice sobre un dataset, usaremos la función createIndex. IndexedColumns son los nombres de las columnas usadas para join o filtros. IncludedColumns son los nombres de las columnas usadas para las operaciones:

hs.createIndex(df, IndexConfig("index", indexedColumns = Seq("nameOrig"), includedColumns = Seq("step")))
hs.indexes.show
+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets|              schema|       indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index|    [nameOrig]|         [step]|       200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+

Ahora, ya podemos crear una consulta y ejecutarla activando los índices:

val query = df.filter(df("nameOrig") === "C1305486145").select("step")
hs.explain(query, verbose = true)
+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets|              schema|       indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index|    [nameOrig]|         [step]|       200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
=============================================================
Plan with indexes:
=============================================================
Project [step#6517]
+- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145))
   <----+- FileScan Hyperspace(Type: CI, Name: index, LogVersion: 1) [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/indexes/index/v__=0/part-00000-tid-515507262485414933..., PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string,step:int>---->
=============================================================
Plan without indexes:
=============================================================
Project [step#6517]
+- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145))
   <----+- FileScan parquet [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/Filestore/financial_dataset], PartitionCount: 743, PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string>---->
=============================================================
Indexes used:
=============================================================
index:dbfs:/user/hive/warehouse/indexes/index/v__=0
=============================================================
Physical operator stats:
=============================================================
+------------------------------------------------------+-------------------+------------------+----------+
|                                     Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+------------------------------------------------------+-------------------+------------------+----------+
|*Scan Hyperspace(Type: CI, Name: index, LogVersion: 1)|                  0|                 1|         1|
|                                         *Scan parquet|                  1|                 0|        -1|
|                                                Filter|                  1|                 1|         0|
|                                               Project|                  1|                 1|         0|
|                                     WholeStageCodegen|                  1|                 1|         0|
+------------------------------------------------------+-------------------+------------------+----------+
query2: org.apache.spark.sql.DataFrame = [step: int]
spark.enableHyperspace
query.show

Por último, si queremos eliminar el índice, deberemos hacer un deleteIndex seguido de vacuumIndex para eliminarlo permanentemente del sistema de ficheros.

hs.refreshIndex("index")
hs.deleteIndex("index")
hs.vacuumIndex("index")

Con esto hemos revisado brevemente cómo usar índices secundarios en nuestro entorno de Databricks sobre ficheros parquet.

Siguientes Pasos, Formación y Cursos de Databricks

Aquí tienes mis dos cursos recomendados para que aprendas de forma eficiente Databricks, de los mejores que encontrarás online para cualquier nivel:

curso databricks coursera

Azure Databricks & Spark For Data Engineers

Este curso te preparará para comprender y sacar todo el partido posible al ecosistema de Databricks. Se centra en Azure y podrás seguir de forma gratuita todas las lecciones prácticas. Podrás aprender todos los fundamentos de Databricks y la API de Apache Spark para desarrollar trabajos de transformación de datos.

Curso Databricks Certificacion

Azure Databricks para Ingeniería de Datos

Este curso te preparará para la certificación de Databricks Certified Data Engineer Associate. Incluye prácticas que no pueden faltar antes de que hagas el examen. Recuerda que si tienes algún conocimiento de SQL y Python te acelerará el aprendizaje.

También, tienes este libro disponible en Amazon: Beginning Apache Spark Using Azure Databricks

Deja una respuesta