Desarrollo de Procesadores Personalizados en Apache NiFi

  • Tiempo de lectura:11 minutos de lectura
  • Autor de la entrada:
  • Última modificación de la entrada:14/07/2024

Apache NiFi es una herramienta de procesamiento de datos de código abierto que permite el flujo de datos entre diferentes sistemas y aplicaciones de manera automatizada. Con su interfaz visual y su capacidad de integración, Apache NiFi se ha convertido en una herramienta popular en el mundo del big data. Los procesadores personalizados en Apache NiFi nos permitirán extender fácilmente su funcionalidad.

Procesadores personalizados apache nifi

Una de las características clave de NiFi es su capacidad de personalización y extensibilidad. En este artículo, exploraremos el desarrollo de procesadores personalizados en NiFi y cómo esta funcionalidad puede ayudar a adaptar la herramienta a las necesidades específicas de cada proyecto de procesamiento de datos.

¿Qué son los procesadores personalizados y por qué son útiles en Apache NiFi?

Los procesadores personalizados son componentes que permiten realizar tareas específicas de procesamiento de datos que no están incluidas en los procesadores predeterminados. Estos procesadores personalizados se pueden desarrollar utilizando el lenguaje de programación Java.

Los procesadores personalizados también pueden ayudar a mejorar la eficiencia y la velocidad del procesamiento de datos al eliminar la necesidad de transferir los datos a un sistema externo para realizar una tarea específica. Así podemos personalizar el flujo de trabajo de datos y mejorar el rendimiento.

Requisitos para el desarrollo de procesadores personalizados en Apache NiFi

Para desarrollar procesadores personalizados en Apache NiFi, es necesario cumplir con ciertos requisitos técnicos. En primer lugar, es necesario contar con conocimientos sólidos en programación Java, ya que los procesadores personalizados se escriben en este lenguaje.

Además, se requiere tener una comprensión completa de la arquitectura de Apache NiFi, incluyendo su modelo de programación basado en flujos y su API de desarrollo de procesadores. También es importante tener experiencia en el uso de herramientas como Apache Maven y Git para la gestión de dependencias y control de versiones del proyecto.

Creación de un procesador personalizado desde cero en Apache NiFi

Clonar un procesador existente y personalizarlo según las necesidades es una manera fácil de crear un procesador personalizado en Apache NiFi.

Lo primero que debemos hacer es definir la funcionalidad que queremos que tenga. Para ello, debemos tener en cuenta el tipo de datos que queremos procesar y qué acciones queremos llevar a cabo con ellos.

Una vez definida la funcionalidad, podemos proceder a escribir el código del procesador personalizado. Este proceso implica crear un nuevo proyecto Maven, definir los detalles del procesador, agregar propiedades y crear la lógica de procesamiento en el código. Luego, deberemos compilar el proyecto y crear un paquete JAR para el procesador personalizado.

Finalmente, podemos subir el archivo JAR al repositorio de procesadores personalizados en Apache NiFi y utilizar nuestro nuevo procesador en nuestros flujos de datos.

Ejemplo de Procesador Personalizado

Este procesador personalizado de ejemplo obtiene un flujo de archivo y realiza alguna operación con una propiedad proporcionada por el usuario. Después, transfiere el flujo a la relación «success» o «failure» dependiendo del resultado de la operación. Este es solo un ejemplo básico, pero puedes personalizarlo para adaptarlo a tus necesidades específicas.

@Tags({"example"})
@CapabilityDescription("Procesador personalizado")
public class ExampleProcessor extends AbstractProcessor {

    public static final PropertyDescriptor EXAMPLE_PROPERTY = new PropertyDescriptor
            .Builder().name("Example Property")
            .description("An example property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Success relationship")
            .build();

    public static final Relationship FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Failure relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(EXAMPLE_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS);
        relationships.add(FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile flowfile = session.get();
        if (flowfile == null) {
            return;
        }

        final String exampleProperty = context.getProperty(EXAMPLE_PROPERTY).getValue();

        try {
            // Aquí podríamos implementar la lógica de nuestro procesador
            // como manipular el flowfile y usar la variable exampleProperty

            session.transfer(flowfile, SUCCESS);
        } catch (Exception e) {
            getLogger().error("Failed to process ExampleProcessor: " + e.getMessage(), e);
            session.transfer(flowfile, FAILURE);
        }
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }
}

Integración de procesadores personalizados en un flujo de trabajo en Apache NiFi

Una vez que se ha desarrollado un procesador personalizado en Apache NiFi, podremos integrarlo en un flujo de trabajo. Para hacer esto, es necesario crear una instancia del procesador personalizado dentro del flujo de trabajo.

  1. Agregar el procesador personalizado al repositorio de NiFi: Antes de poder utilizar el procesador personalizado en un flujo de trabajo, es necesario agregarlo al repositorio de NiFi. Para hacer esto, se debe copiar el archivo .nar del procesador personalizado en la carpeta /lib dentro del directorio de instalación de NiFi.
  2. Agregar el procesador personalizado al panel de herramientas de NiFi: Una vez que se ha agregado el procesador personalizado al repositorio de NiFi, se debe agregar al panel de herramientas para poder utilizarlo en un flujo de trabajo. Para hacer esto, se debe abrir la interfaz de NiFi y seleccionar la pestaña «Procesadores». Luego, hacer clic en el botón «+» y seleccionar el procesador personalizado que se agregó anteriormente al repositorio.
  3. Configurar el procesador personalizado: Después de agregar el procesador personalizado al lienzo de trabajo podemos configurarlo según sea necesario. Esto puede incluir la configuración de propiedades específicas del procesador personalizado, como las entradas y salidas de datos y los parámetros de configuración.

Ejemplos de casos de uso para procesadores personalizados en Apache NiFi

Los procesadores personalizados en Apache NiFi son muy útiles para implementar algunos casos de uso específicos:

  • Enmascaramiento de datos sensibles: mediante la creación de un procesador personalizado, se pueden enmascarar campos sensibles en los datos, como números de tarjetas de crédito o identificadores personales, para proteger la privacidad de los usuarios.
  • Integración con sistemas externos: los procesadores personalizados pueden ser utilizados para integrar flujos de trabajo de Apache NiFi con sistemas externos, como bases de datos o servicios web, para realizar operaciones específicas en los datos que se procesan.
  • Uso de librerías personalizadas: nos permitirán implementar código con dependencias de librerías específicas.

Siguientes Pasos Con Apache NiFi: Curso

El curso de Apache NiFi te enseñará esta tecnología desde cero. De esta forma podrás implementar tus pipelines de movimiento y de ingesta de datos. Al finalizar el curso podrás usar Apache NiFi en un entorno profesional. Se compone de clases teóricas y prácticas para que aprendas a tu ritmo, de forma incremental y paso a paso. ¡Aprovecha el cupón del enlace a continuación!

Apache NiFi desde cero: La guía esencial

Preguntas Frecuentes

¿Qué es un archivo NAR en Apache NiFi?

Un archivo NAR (NiFi Archive) es un paquete que contiene los componentes necesarios para ejecutar un procesador personalizado en NiFi. Incluye las clases compiladas, dependencias y metadatos requeridos para integrar el procesador con el framework de NiFi.

¿Cómo se maneja la configuración en un procesador personalizado?

En un procesador personalizado, se manejan las configuraciones utilizando propiedades de procesador. Estas propiedades pueden ser definidas y configuradas a través de la interfaz gráfica de NiFi, y se accede a ellas mediante la API durante la ejecución del procesador.

¿Qué son los controladores de servicios en NiFi y cómo se relacionan con los procesadores personalizados?

Los controladores de servicios en NiFi son componentes que proporcionan servicios compartidos, como conexiones a bases de datos o acceso a archivos, que pueden ser utilizados por múltiples procesadores. Los procesadores personalizados pueden utilizar estos controladores para acceder a recursos externos de manera centralizada y eficiente.

¿Cómo se implementa la lógica de procesamiento en un procesador personalizado?

La lógica de procesamiento en un procesador personalizado se implementa sobrescribiendo el método onTrigger de la clase base AbstractProcessor. Este método es llamado cada vez que el procesador recibe datos, y aquí se define la lógica para procesar esos datos.

Deja una respuesta