Índices Secundarios en Spark con Hyperspace

Última actualización: 27/06/2022

En esta entrada comprenderás qué son los índices secundarios o los subíndices en Apache Spark y una librería open source llamada Hyperspace que te ayudará a crearlos fácilmente.

Indices en Spark con Hyperspace

¿Qué es Hyperspace y por qué usar índices secundarios en Spark?

Hyperspace es una librería open source para Apache Spark desarrollada por Microsoft e integrada en Synapse.

Trabaja sobre ficheros parquet, CSV y Json sin modificar la tabla, aunque genera información adicional. También soporta el formato Delta Lake.

Comúnmente, los ficheros parquet se particionan por un campo fecha. Sin embargo, luego se opera con otras columnas. Por ejemplo, en procesos de medición de calidad de las tablas, se puede necesitar realizar operaciones por cliente. Al estar particionado por fecha, se necesita recorrer toda la tabla y es ineficiente.

Hyperspace también evita tener que hacer una copia de la tabla con otro índice.

¿Cómo funciona Hyperspace en Spark?

Hyperspace se puede usar en cualquier entorno Spark cargando la librería. Quizá lo más sencillo es usarlo en nuestros notebooks de Databricks. Para ello, deberemos cargar la librería desde la configuración del clúster, como se muestra en la imagen. Una vez cargada, podemos arrancar el clúster y estará lista para usarla.

Librería Hyperspace en clúster de Databricks
Librería Hyperspace en clúster de Databricks

Después, podemos importar la librería y crear una variable Hyperspace en nuestro programa

val df = spark.read.parquet("FileStore/financial_dataset/")

import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._

val hs = new Hyperspace(spark)
hs.indexes.show

Para crear un índice sobre un dataset, usaremos la función createIndex. IndexedColumns son los nombres de las columnas usadas para join o filtros. IncludedColumns son los nombres de las columnas usadas para las operaciones:

hs.createIndex(df, IndexConfig("index", indexedColumns = Seq("nameOrig"), includedColumns = Seq("step")))

hs.indexes.show

+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets|              schema|       indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index|    [nameOrig]|         [step]|       200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+

Ahora, ya podemos crear una consulta y ejecutarla activando los índices:

val query = df.filter(df("nameOrig") === "C1305486145").select("step")
hs.explain(query, verbose = true)

+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets|              schema|       indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index|    [nameOrig]|         [step]|       200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+




=============================================================
Plan with indexes:
=============================================================
Project [step#6517]
+- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145))
   <----+- FileScan Hyperspace(Type: CI, Name: index, LogVersion: 1) [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/indexes/index/v__=0/part-00000-tid-515507262485414933..., PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string,step:int>---->

=============================================================
Plan without indexes:
=============================================================
Project [step#6517]
+- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145))
   <----+- FileScan parquet [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/Filestore/financial_dataset], PartitionCount: 743, PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string>---->

=============================================================
Indexes used:
=============================================================
index:dbfs:/user/hive/warehouse/indexes/index/v__=0

=============================================================
Physical operator stats:
=============================================================
+------------------------------------------------------+-------------------+------------------+----------+
|                                     Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference|
+------------------------------------------------------+-------------------+------------------+----------+
|*Scan Hyperspace(Type: CI, Name: index, LogVersion: 1)|                  0|                 1|         1|
|                                         *Scan parquet|                  1|                 0|        -1|
|                                                Filter|                  1|                 1|         0|
|                                               Project|                  1|                 1|         0|
|                                     WholeStageCodegen|                  1|                 1|         0|
+------------------------------------------------------+-------------------+------------------+----------+

query2: org.apache.spark.sql.DataFrame = [step: int]

spark.enableHyperspace
query.show

Por último, si queremos eliminar el índice, deberemos hacer un deleteIndex seguido de vacuumIndex para eliminarlo permanentemente del sistema de ficheros.

hs.refreshIndex("index")
hs.deleteIndex("index")
hs.vacuumIndex("index")

Con esto hemos revisado brevemente cómo usar índices secundarios en nuestro entorno de Databricks sobre ficheros parquet.

Aprende sobre Databricks con estos Cursos y Recursos

Para aprender más sobre Databricks, te recomiendo uno de estos dos cursos. En mi opinión de los mejores que encontrarás online para cualquier nivel.

curso databricks coursera

Especialización en Ciencia de Datos con Databricks

Este curso ofrecido por Databricks en la plataforma de formación online Coursera es de los más populares. Podrás aprender todos los fundamentos de Databricks y la API de Apache Spark para desarrollar trabajos de transformación de datos. Si tienes algún conocimiento de SQL y Python te acelerará el aprendizaje.

curso azure databricks coursera

Azure Databricks para Ingeniería de Datos

Este curso ofrecido por Microsoft en Coursera te enseñará todo lo que necesitarás de Databricks en Azure. Desde cómo configurar los clústeres de Spark, ejecutar trabajos y usar los notebooks con Scala o Python.

Deja una respuesta