
Streaming & CDC Ingestion with Kafka

Architecture Overview

Ilum provides a natural foundation for real-time streaming workloads. Apache Kafka is already a core dependency of the ilum platform, used internally for service communication. This means every ilum deployment has a Kafka cluster available that can also serve as the streaming backbone for your data pipelines.

The streaming architecture follows a three-layer pattern:

  1. Kafka acts as the durable, distributed message bus. Data producers (applications, CDC connectors, IoT devices) publish events to Kafka topics.
  2. Spark Structured Streaming or Apache Flink runs as a long-lived ilum job that continuously reads from Kafka topics, transforms the data, and writes results downstream.
  3. Iceberg or Delta Lake serves as the sink table format, providing ACID transactions, schema evolution, and time-travel queries on top of object storage (S3, MinIO, HDFS).

This combination delivers low-latency, exactly-once data ingestion pipelines that are fully managed through the ilum platform.

Prerequisites

Before building a streaming pipeline, ensure the following components are in place:

  • An ilum cluster with Kafka enabled (this is the default configuration).
  • Iceberg or Delta Lake table format configured in your catalog. You can verify table availability through the SQL Viewer.
  • A Kafka topic populated with data (or a producer actively writing to one).
  • Familiarity with submitting jobs through ilum. See Run a Simple Spark Job for an introduction.

Connecting Spark Jobs to Kafka

Spark Structured Streaming reads from Kafka using the kafka format. You must include the Kafka connector package when submitting your job.

Required Spark Package

Add the following Maven coordinate to your job's Spark Packages configuration in the ilum UI:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
tip

When submitting through the ilum UI, add this under the Resources tab in the Spark Packages field. Ilum will resolve and distribute the dependency automatically.
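
Outside the UI, for example when testing locally, the same coordinate can be passed to spark-submit directly. This is an illustrative invocation, not an ilum-specific command; the script name matches the example later in this guide:

```shell
# Illustrative only: resolve the Kafka connector from Maven Central at submit time
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  streaming_pipeline.py
```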

Reading from Kafka

The core pattern for connecting to a Kafka topic uses readStream with the kafka format.

PySpark:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka messages are binary key/value pairs. Cast to string for processing.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS event_key",
    "CAST(value AS STRING) AS event_value",
    "topic",
    "partition",
    "offset",
    "timestamp"
)

Scala:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ilum-kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

val parsed = df.selectExpr(
  "CAST(key AS STRING) AS event_key",
  "CAST(value AS STRING) AS event_value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)
note

The default Kafka bootstrap server within an ilum cluster is ilum-kafka:9092. If your Kafka deployment uses a different service name or port, adjust accordingly.

Authentication and Security

For Kafka clusters configured with SASL or TLS, add the corresponding properties to your Spark configuration:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="user" password="password";')
    .option("kafka.ssl.truststore.location", "/opt/spark/truststore.jks")
    .option("kafka.ssl.truststore.password", "changeit")
    .load()
)
warning

Avoid hardcoding credentials in job code. Use Kubernetes secrets mounted as files or environment variables, and reference them in your Spark configuration.
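
One way to follow that advice is to read the credentials from a mounted secret at runtime and pass them as reader options. The sketch below is plain Python; the mount path and file names are illustrative, not an ilum convention:

```python
import os


def kafka_auth_options(secret_dir="/etc/kafka-credentials"):
    """Build Kafka SASL options from a mounted Kubernetes secret.

    Assumes the secret is mounted as 'username' and 'password' files;
    both the directory and file names are placeholders.
    """
    with open(os.path.join(secret_dir, "username")) as f:
        username = f.read().strip()
    with open(os.path.join(secret_dir, "password")) as f:
        password = f.read().strip()
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "SCRAM-SHA-512",
        "kafka.sasl.jaas.config": jaas,
    }
```

The resulting dictionary can be applied in one call, e.g. `spark.readStream.format("kafka").options(**kafka_auth_options())`, keeping secrets out of the job source entirely.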

Exactly-Once Semantics

Structured Streaming achieves exactly-once processing guarantees through checkpointing and idempotent writes.

Checkpointing Configuration

Checkpoints track the processing state, including Kafka offsets, so that a restarted job resumes from where it left off without data loss or duplication. Store checkpoints on durable object storage.

query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events-pipeline")
    .toTable("spark_catalog.default.events")
)
warning

Each streaming query must have a unique checkpoint location. Reusing a checkpoint path across different queries corrupts query state and can cause data loss or duplicate records.

Idempotent Writes to Iceberg

Iceberg tables support atomic commits, which means each micro-batch is written as a single transaction. If a batch fails partway through, the partial write is rolled back. Combined with checkpointing, this provides end-to-end exactly-once delivery.

Failure Recovery

When a streaming job fails and is restarted (manually or via ilum's Max Retries setting):

  1. Spark reads the latest checkpoint to determine the last successfully committed Kafka offset.
  2. Processing resumes from the next offset, skipping already-committed data.
  3. No manual intervention or offset management is required.
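
The recovery sequence above can be illustrated with a toy model (plain Python, not Spark internals): the offset is committed only after a batch fully succeeds, so a restart neither loses nor duplicates data.

```python
class CheckpointedConsumer:
    """Toy model of checkpoint-based recovery, not Spark's actual implementation."""

    def __init__(self):
        self.committed_offset = -1   # last offset whose batch fully succeeded
        self.processed = []

    def run_batch(self, log, batch_size, fail_midway=False):
        start = self.committed_offset + 1          # resume from the checkpoint
        batch = log[start:start + batch_size]
        if fail_midway:
            raise RuntimeError("executor lost")    # nothing is committed
        self.processed.extend(batch)
        self.committed_offset = start + len(batch) - 1  # commit only on success


log = ["e0", "e1", "e2", "e3"]
consumer = CheckpointedConsumer()
consumer.run_batch(log, batch_size=2)              # commits e0, e1
try:
    consumer.run_batch(log, batch_size=2, fail_midway=True)
except RuntimeError:
    pass                                           # restart: checkpoint still at e1
consumer.run_batch(log, batch_size=2)              # resumes at e2, no duplicates
```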
tip

Set Max Retries on your ilum job to a reasonable value (e.g., 3-5) so that transient failures are automatically recovered without operator intervention.

CDC Patterns

Change Data Capture (CDC) pipelines replicate changes from operational databases (PostgreSQL, MySQL, MongoDB) into your lakehouse. The typical architecture is:

Source DB --> Debezium --> Kafka --> Structured Streaming --> Iceberg/Delta

Debezium CDC Source

Debezium is an open-source CDC platform that captures row-level changes from databases and publishes them to Kafka topics. A typical Debezium deployment runs as a Kafka Connect connector alongside your ilum Kafka cluster.

Key Debezium configuration properties:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres-host
database.port=5432
database.user=debezium
database.password=secret
database.dbname=mydb
database.server.name=mydb
table.include.list=public.orders,public.customers
topic.prefix=cdc

This produces Kafka topics like cdc.public.orders and cdc.public.customers, each containing insert, update, and delete events.
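
Each message on those topics carries a Debezium change envelope. The snippet below shows a simplified example of such an event and how the row image is selected; real envelopes include additional schema and source metadata:

```python
import json

# Simplified Debezium-style update event from the cdc.public.orders topic
event = json.loads("""
{
  "op": "u",
  "before": {"id": 42, "status": "pending"},
  "after":  {"id": 42, "status": "shipped"},
  "ts_ms": 1700000000000
}
""")

# For deletes ("d") the row image lives in "before"; otherwise use "after"
row = event["before"] if event["op"] == "d" else event["after"]
```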

Upsert with MERGE INTO

CDC events include inserts, updates, and deletes. To apply these changes to an Iceberg or Delta table, use the foreachBatch sink with a MERGE INTO statement.

PySpark example:

def upsert_to_iceberg(batch_df, batch_id):
    # Register the micro-batch as a temporary view
    batch_df.createOrReplaceTempView("cdc_batch")

    batch_df.sparkSession.sql("""
        MERGE INTO spark_catalog.default.orders AS target
        USING cdc_batch AS source
        ON target.id = source.id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = (
    cdc_stream.writeStream
    .foreachBatch(upsert_to_iceberg)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders-cdc")
    .start()
)
note

The op field in Debezium events indicates the operation type: c (create), u (update), d (delete), and r (read/snapshot). Adjust the MERGE logic based on your Debezium envelope format.
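
For reference, the branch logic of that MERGE can be written out as a plain decision function (a sketch mirroring the SQL above, not Spark code):

```python
def merge_action(matched: bool, op: str) -> str:
    """What the MERGE does with one CDC row, given whether its key exists in the target."""
    if matched and op == "d":
        return "delete"          # WHEN MATCHED AND source.op = 'd' THEN DELETE
    if matched:
        return "update"          # WHEN MATCHED THEN UPDATE SET *
    if op != "d":
        return "insert"          # WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    return "ignore"              # delete for a row the target never saw
```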

Example: End-to-End Streaming Pipeline

The following complete example reads JSON events from a Kafka topic, parses them, and writes them to an Iceberg table.

Complete PySpark Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("streaming-ingestion-pipeline") \
    .getOrCreate()

# Define the expected JSON schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("event_time", TimestampType(), True),
])

# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)

# Parse JSON values
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .filter(col("event_id").isNotNull())
)

# Write to Iceberg table
query = (
    parsed_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/user-events")
    .toTable("spark_catalog.default.user_events")
)

query.awaitTermination()

Submitting as an Ilum Job

  1. Save the script above as streaming_pipeline.py.
  2. In the ilum UI, create a new job with Job Type set to Spark Job and Language set to Python.
  3. Upload streaming_pipeline.py under the Resources tab.
  4. Add the Kafka connector package under Spark Packages:
    org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
  5. Under the Configuration tab, add the following Spark properties for Iceberg support:
    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.spark_catalog.type = hive
  6. Submit the job. For detailed submission steps, see Run a Simple Spark Job.

Monitoring Streaming Jobs

Streaming jobs run continuously, so monitoring is essential:

  • Ilum UI: The job will appear with a RUNNING status. Use the Logs tab to monitor micro-batch progress and throughput.
  • Spark UI: Access the Spark UI through ilum to view the Structured Streaming tab, which shows input rate, processing rate, and batch duration metrics.
  • Max Retries: Configure automatic restart on failure to maintain pipeline uptime.
tip

For long-running streaming jobs, allocate sufficient driver and executor memory to handle state accumulation. Monitor memory usage through the ilum UI and adjust resources as needed.

Streaming with Apache Flink

Apache Flink is supported as a streaming engine in ilum, complementing Spark Structured Streaming. Flink provides true event-at-a-time processing with lower latency and is well suited for complex event processing (CEP), session windowing, and use cases where sub-second response times are critical.

Ilum deploys Flink jobs on Kubernetes using the Flink Kubernetes Operator. Flink applications are submitted through the ilum UI or REST API just like Spark jobs. Ilum handles pod lifecycle management, checkpoint storage configuration, and integration with the shared Kafka cluster and catalog layer.

Key architectural points:

  • Job Manager and Task Manager pods are orchestrated by Kubernetes, with resource allocation managed through ilum's cluster configuration.
  • Flink jobs connect to the same Kafka cluster used by ilum internally (ilum-kafka:9092 by default).
  • Flink integrates with the Hive Metastore and Iceberg catalog, allowing Flink and Spark to read and write the same tables.
  • Checkpoints and savepoints are stored on the ilum default storage (S3/MinIO/HDFS), enabling job recovery and upgrades without data loss.

Flink SQL Example

Flink SQL provides a declarative way to build streaming pipelines. The following example reads JSON events from a Kafka topic and writes them to an Iceberg table.

-- Register the Kafka source table
CREATE TABLE kafka_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'ilum-kafka:9092',
    'properties.group.id' = 'flink-ingestion',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
);

-- Register the Iceberg sink table
CREATE TABLE iceberg_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'uri' = 'thrift://ilum-hive-metastore:9083',
    'warehouse' = 's3a://ilum-data/warehouse'
);

-- Continuous streaming insert
INSERT INTO iceberg_events
SELECT event_id, user_id, action, amount, event_time
FROM kafka_events
WHERE event_id IS NOT NULL;
tip

Flink SQL jobs can be submitted as SQL scripts through the ilum UI. Set the Job Type to Flink and provide the SQL statements in the code editor.

Flink DataStream API

For more complex processing logic, use the Flink DataStream API. The following Java example reads from Kafka, applies a tumbling window aggregation, and writes the results to an Iceberg table.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("ilum-kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-datastream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source")
    .map(json -> parseEvent(json)) // custom parsing logic
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .sinkTo(icebergSink); // Iceberg sink connector

env.execute("flink-aggregation-pipeline");

Flink CDC Connectors

Flink provides native CDC connectors that capture database changes directly, without requiring a separate Debezium deployment. This simplifies the CDC pipeline by removing the Debezium and Kafka hops:

Source DB --> Flink CDC Connector --> Iceberg/Delta

Flink SQL example with PostgreSQL CDC:

-- Register a CDC source directly from PostgreSQL
CREATE TABLE orders_cdc (
id INT,
customer_id INT,
product STRING,
amount DOUBLE,
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgres-host',
'port' = '5432',
'username' = 'cdc_user',
'password' = 'secret',
'database-name' = 'mydb',
'schema-name' = 'public',
'table-name' = 'orders',
'slot.name' = 'flink_orders_slot'
);

-- Write changes directly to Iceberg with upsert semantics
INSERT INTO iceberg_orders
SELECT id, customer_id, product, amount, updated_at
FROM orders_cdc;
note

Flink CDC connectors support PostgreSQL, MySQL, MongoDB, Oracle, and SQL Server. Each connector requires the corresponding Flink CDC library to be included in the job dependencies.

Exactly-Once Semantics in Flink

Flink achieves exactly-once semantics through its distributed snapshot (Chandy-Lamport) checkpointing algorithm:

  1. Checkpoint barriers flow through the data stream, ensuring consistent state snapshots across all operators.
  2. State backends (RocksDB or heap-based) persist operator state to durable storage (S3/MinIO) on each checkpoint.
  3. Two-phase commit sinks (Kafka, Iceberg) ensure that output records are committed atomically with the checkpoint.
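
The third point can be sketched as a toy model of a two-phase commit sink (plain Python, not Flink's implementation): records become visible only if the checkpoint that covers them completes.

```python
class TwoPhaseCommitSink:
    """Toy two-phase commit: output is atomic with the checkpoint."""

    def __init__(self):
        self.pending = []    # written but not yet visible (pre-commit stage)
        self.committed = []  # atomically visible output

    def write(self, record):
        self.pending.append(record)

    def pre_commit(self):
        staged, self.pending = self.pending, []
        return staged                    # staged alongside the checkpoint

    def commit(self, staged):
        self.committed.extend(staged)    # runs only after the checkpoint succeeds

    def abort(self, staged):
        pass                             # checkpoint failed: staged records are
                                         # discarded and replayed after recovery


sink = TwoPhaseCommitSink()
sink.write("a")
sink.write("b")
staged = sink.pre_commit()
sink.commit(staged)          # checkpoint succeeded: "a", "b" become visible
sink.write("c")
lost = sink.pre_commit()
sink.abort(lost)             # checkpoint failed: "c" is never exposed downstream
```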

Configure checkpointing in your Flink job:

env.enableCheckpointing(60000);                          // checkpoint interval (ms)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointStorage("s3://ilum-data/flink-checkpoints");
tip

For Flink SQL jobs, set checkpointing via Flink configuration properties:

execution.checkpointing.interval=60s
execution.checkpointing.mode=EXACTLY_ONCE
state.checkpoints.dir=s3://ilum-data/flink-checkpoints

Choosing Between Spark and Flink

Both engines are production-ready for streaming workloads on ilum. Choose based on your requirements:

| Aspect | Spark Structured Streaming | Apache Flink |
| --- | --- | --- |
| Processing Model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| State Management | Limited by micro-batch boundaries | Fine-grained, key-partitioned state |
| Windowing | Tumbling, sliding, session | Tumbling, sliding, session, custom |
| Complex Event Processing | Basic | Advanced (CEP library) |
| CDC | Via Debezium + Kafka | Native CDC connectors |
| Batch + Stream Unification | Strong (same DataFrame API) | Strong (same DataStream/Table API) |
| Catalog Integration | Hive, Nessie, Unity, Iceberg, Delta, Hudi | Hive, Iceberg |
| Best For | ETL pipelines, unified batch/stream, existing Spark codebases | Low-latency event processing, CEP, native CDC |

General guidance:

  • Use Spark Structured Streaming when you have existing Spark batch jobs and want a unified batch/streaming codebase, or when micro-batch latency (seconds) is acceptable.
  • Use Flink when you need sub-second latency, complex event processing patterns, native CDC without Debezium, or fine-grained state management with event-time processing.