En esta entrada comprenderás qué son los índices secundarios o los subíndices en Apache Spark y una librería open source llamada Hyperspace que te ayudará a crearlos fácilmente.
Contenidos
¿Qué es Hyperspace y por qué usar índices secundarios en Spark?
Hyperspace es una librería open source para Apache Spark desarrollada por Microsoft e integrada en Synapse.
Trabaja sobre ficheros parquet, CSV y Json sin modificar la tabla, aunque genera información adicional. También soporta el formato Delta Lake.
Comúnmente, los ficheros parquet se particionan por un campo fecha. Sin embargo, luego se opera con otras columnas. Por ejemplo, en procesos de medición de calidad de las tablas, se puede necesitar realizar operaciones por cliente. Al estar particionado por fecha, se necesita recorrer toda la tabla y es ineficiente.
Hyperspace también evita tener que hacer una copia de la tabla con otro índice.
¿Cómo funciona Hyperspace en Spark?
Hyperspace se puede usar en cualquier entorno Spark cargando la librería. Quizá lo más sencillo es usarlo en nuestros notebooks de Databricks. Para ello, deberemos cargar la librería desde la configuración del clúster, como se muestra en la imagen. Una vez cargada, podemos arrancar el clúster y estará lista para usarla.
Después, podemos importar la librería y crear una variable Hyperspace en nuestro programa
val df = spark.read.parquet("FileStore/financial_dataset/")
import com.microsoft.hyperspace._
import com.microsoft.hyperspace.index._
val hs = new Hyperspace(spark)
hs.indexes.show
Para crear un índice sobre un dataset, usaremos la función createIndex. IndexedColumns son los nombres de las columnas usadas para join o filtros. IncludedColumns son los nombres de las columnas usadas para las operaciones:
hs.createIndex(df, IndexConfig("index", indexedColumns = Seq("nameOrig"), includedColumns = Seq("step"))) hs.indexes.show +-----+--------------+---------------+----------+--------------------+--------------------+------+ | name|indexedColumns|includedColumns|numBuckets| schema| indexLocation| state| +-----+--------------+---------------+----------+--------------------+--------------------+------+ |index| [nameOrig]| [step]| 200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE| +-----+--------------+---------------+----------+--------------------+--------------------+------+
Ahora, ya podemos crear una consulta y ejecutarla activando los índices:
val query = df.filter(df("nameOrig") === "C1305486145").select("step") hs.explain(query, verbose = true) +-----+--------------+---------------+----------+--------------------+--------------------+------+ | name|indexedColumns|includedColumns|numBuckets| schema| indexLocation| state| +-----+--------------+---------------+----------+--------------------+--------------------+------+ |index| [nameOrig]| [step]| 200|{"type":"struct",...|dbfs:/user/hive/w...|ACTIVE| +-----+--------------+---------------+----------+--------------------+--------------------+------+ ============================================================= Plan with indexes: ============================================================= Project [step#6517] +- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145)) <----+- FileScan Hyperspace(Type: CI, Name: index, LogVersion: 1) [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/indexes/index/v__=0/part-00000-tid-515507262485414933..., PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string,step:int>----> ============================================================= Plan without indexes: ============================================================= Project [step#6517] +- Filter (isnotnull(nameOrig#6509) && (nameOrig#6509 = C1305486145)) <----+- FileScan parquet [nameOrig#6509,step#6517] Batched: true, DataFilters: [isnotnull(nameOrig#6509), (nameOrig#6509 = C1305486145)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/Filestore/financial_dataset], PartitionCount: 743, PartitionFilters: [], PushedFilters: [IsNotNull(nameOrig), EqualTo(nameOrig,C1305486145)], ReadSchema: struct<nameOrig:string>----> ============================================================= Indexes used: ============================================================= index:dbfs:/user/hive/warehouse/indexes/index/v__=0 ============================================================= Physical operator stats: ============================================================= +------------------------------------------------------+-------------------+------------------+----------+ | Physical Operator|Hyperspace Disabled|Hyperspace Enabled|Difference| +------------------------------------------------------+-------------------+------------------+----------+ |*Scan Hyperspace(Type: CI, Name: index, LogVersion: 1)| 0| 1| 1| | *Scan parquet| 1| 0| -1| | Filter| 1| 1| 0| | Project| 1| 1| 0| | WholeStageCodegen| 1| 1| 0| +------------------------------------------------------+-------------------+------------------+----------+ query2: org.apache.spark.sql.DataFrame = [step: int] spark.enableHyperspace query.show
Por último, si queremos eliminar el índice, deberemos hacer un deleteIndex seguido de vacuumIndex para eliminarlo permanentemente del sistema de ficheros.
hs.refreshIndex("index")
hs.deleteIndex("index")
hs.vacuumIndex("index")
Con esto hemos revisado brevemente cómo usar índices secundarios en nuestro entorno de Databricks sobre ficheros parquet.
Siguientes Pasos, Formación y Cursos de Databricks
Aquí tienes mis dos cursos recomendados para que aprendas de forma eficiente Databricks, de los mejores que encontrarás online para cualquier nivel:
Azure Databricks & Spark For Data Engineers
Este curso te preparará para comprender y sacar todo el partido posible al ecosistema de Databricks. Se centra en Azure y podrás seguir de forma gratuita todas las lecciones prácticas. Podrás aprender todos los fundamentos de Databricks y la API de Apache Spark para desarrollar trabajos de transformación de datos.
Azure Databricks para Ingeniería de Datos
Este curso te preparará para la certificación de Databricks Certified Data Engineer Associate. Incluye prácticas que no pueden faltar antes de que hagas el examen. Recuerda que si tienes algún conocimiento de SQL y Python te acelerará el aprendizaje.
También, tienes este libro disponible en Amazon: Beginning Apache Spark Using Azure Databricks