NiFi

Apache NiFi is an open-source platform for automating and managing the flow of data between systems. It provides a web-based, drag-and-drop interface for designing, controlling, and monitoring data pipelines, which simplifies building complex data flows with minimal code.

NiFi supports a wide variety of data sources and destinations, including databases, cloud services, and APIs. Its visual interface allows users to easily create, configure, and connect processors for data ingestion, transformation, and routing.


Integrating NiFi with Ilum

info

To learn how to enable the NiFi deployment, refer to the Production page.

You can integrate NiFi with Ilum to:

  • Connect to external data sources.
  • Orchestrate data movement between Ilum and other systems.
  • Trigger Ilum workflows based on data events.
  • Transform and route data for downstream processing.

This guide demonstrates how to create a simple data pipeline that lists files from a MinIO bucket and uses them to trigger an interactive Spark job in Ilum.

Get Started: NiFi & MinIO

First, let's configure NiFi to access Ilum's default MinIO instance.

  1. Create a Controller Service for AWS Credentials. Right-click on the NiFi canvas and select Controller Services. In the new window, click the + icon and add an AWSCredentialsProviderControllerService.

  2. Configure the credentials. Click the Edit icon (pencil) for the new service. In the Properties tab, set the Access Key ID and Secret Access Key. The default MinIO instance uses minioadmin for both. Apply the changes.

  3. Enable the service. Click the Enable icon (lightning bolt) and confirm the action.

  4. Add a ListS3 processor. Drag the processor icon from the top bar onto the canvas and select the ListS3 processor.

  5. Configure the ListS3 processor. Right-click the processor and select Configure. In the Properties tab, set the following values:

    • Bucket: ilum-files (make sure the bucket contains at least one file so the listing is not empty).
    • AWS Credentials Provider Service: Select the controller service created earlier.
    • Endpoint Override URL: http://ilum-minio:9000 (Required since we are using a local S3-compatible service).

    Apply the changes. You can use the Verify button in the processor’s configuration to confirm that the connection is successful; a standalone connectivity check is also sketched below.
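
If you want to double-check the connection outside NiFi, the same settings can be exercised with a short Python script. This is a minimal sketch using boto3; it assumes the script runs somewhere that can reach http://ilum-minio:9000 (for example, inside the cluster) and that the default minioadmin credentials are unchanged:

    # Standalone sanity check mirroring the ListS3 configuration above
    import boto3

    s3 = boto3.client(
        "s3",
        endpoint_url="http://ilum-minio:9000",  # same as Endpoint Override URL
        aws_access_key_id="minioadmin",         # default MinIO credentials
        aws_secret_access_key="minioadmin",
    )

    # List the objects that the ListS3 processor is expected to pick up
    for obj in s3.list_objects_v2(Bucket="ilum-files").get("Contents", []):
        print(obj["Key"])

If the script prints object keys, NiFi should be able to list the same files with the configuration above.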

Get Started, Part 2: NiFi & Ilum

Now that NiFi can list files from MinIO, let’s process these files using an interactive Spark job in Ilum.

  1. Create an Ilum Spark Service. Create a Python file with the following IlumJob:

    from ilum.api import IlumJob

    class PathPrinter(IlumJob):
        def run(self, spark, config):
            path = config["path"]
            print(f"Got a new path: {path}")

    This job simply prints the file path it receives. In a real-world scenario, you would perform more complex data processing.

    Next, create a new Ilum service, making sure to:

    • Upload the Python file in the pyFiles field.
    • Set the language to Python.
  2. Get the Job Execution URL. After creating the service, navigate to the Execute Job tab and copy the job execution URL. The URL will look similar to http://ilum-core:9888/api/v1/group/20251015-1117-tq8pr/job/execute

  3. Add a ReplaceText processor in NiFi. This processor transforms the output of ListS3 into a JSON payload for Ilum. Connect it to the success relationship of the ListS3 processor and configure it with these properties:

    • Replacement Strategy: Always Replace
    • Replacement Value:
      {
        "type": "interactive_job_execute",
        "jobClass": "ilumJob.PathPrinter",
        "jobConfig": {
          "path": "${filename}"
        }
      }
    • Evaluation Mode: Entire text

    In the Relationships tab, auto-terminate the failure relationship, as it will not be used.

  4. Add an InvokeHTTP processor. This processor sends the JSON payload to Ilum. Connect it to the success relationship of the ReplaceText processor and configure it with:

    • HTTP Method: POST
    • Remote URL: The Ilum job execution URL you copied earlier.
    • Content-Type: application/json
  5. Finalize and run the flow. Auto-terminate all unused relationships for the InvokeHTTP processor and start all processors. A standalone sketch of the request that NiFi sends to Ilum is shown after this list.
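
To understand (or debug) what the ReplaceText and InvokeHTTP processors produce, the same request can be reproduced with a short Python script. This is a minimal sketch using the requests library; the URL is the example job execution URL from this guide, so substitute the one copied from your own service, and note that the ilum-core hostname is typically only resolvable from inside the cluster:

    # Rough equivalent of the payload that NiFi sends for each listed file
    import requests

    url = "http://ilum-core:9888/api/v1/group/20251015-1117-tq8pr/job/execute"
    payload = {
        "type": "interactive_job_execute",
        "jobClass": "ilumJob.PathPrinter",
        "jobConfig": {"path": "example.csv"},  # NiFi fills this in with ${filename}
    }

    response = requests.post(url, json=payload)
    print(response.status_code, response.text)

This lets you inspect Ilum's response directly, independently of the NiFi flow.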

If the flow is configured correctly, you will see requests being sent to the Ilum service in its list of job requests.

Additionally, the service’s job log in Ilum will show the printed file paths.

This example demonstrates how to build a simple yet powerful data pipeline connecting NiFi and Ilum. By leveraging NiFi’s data orchestration capabilities and Ilum’s distributed processing power with Spark, you can create robust, scalable, and event-driven workflows. This integration allows you to automate data ingestion, transformation, and processing, unlocking new possibilities for your data-intensive applications.
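
For example, the PathPrinter job used in this guide could be extended to actually process each file that NiFi announces. The sketch below is hypothetical: it assumes the path sent by NiFi is an object key inside the ilum-files bucket, that the files are CSV, and that the Spark session is already configured with S3A access to MinIO:

    from ilum.api import IlumJob

    class CsvRowCounter(IlumJob):
        def run(self, spark, config):
            path = config["path"]
            # Read the listed object directly from MinIO via the S3A connector
            df = spark.read.csv(f"s3a://ilum-files/{path}", header=True)
            print(f"{path} contains {df.count()} rows")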