Streaming & CDC Ingestion with Kafka
Architecture Overview
Ilum provides a natural foundation for real-time streaming workloads. Apache Kafka is already a core dependency of the ilum platform, used internally for service communication. This means every ilum deployment has a Kafka cluster available that can also serve as the streaming backbone for your data pipelines.
The streaming architecture follows a three-layer pattern:
- Kafka acts as the durable, distributed message bus. Data producers (applications, CDC connectors, IoT devices) publish events to Kafka topics.
- Spark Structured Streaming or Apache Flink runs as a long-lived ilum job that continuously reads from Kafka topics, transforms the data, and writes results downstream.
- Iceberg or Delta Lake serves as the sink table format, providing ACID transactions, schema evolution, and time-travel queries on top of object storage (S3, MinIO, HDFS).
This combination delivers low-latency, exactly-once data ingestion pipelines that are fully managed through the ilum platform.
Prerequisites
Before building a streaming pipeline, ensure the following components are in place:
- An ilum cluster with Kafka enabled (this is the default configuration).
- Iceberg or Delta Lake table format configured in your catalog. You can verify table availability through the SQL Viewer.
- A Kafka topic populated with data (or a producer actively writing to one).
- Familiarity with submitting jobs through ilum. See Run a Simple Spark Job for an introduction.
Connecting Spark Jobs to Kafka
Spark Structured Streaming reads from Kafka using the kafka format. You must include the Kafka connector package when submitting your job.
Required Spark Package
Add the following Maven coordinate to your job's Spark Packages configuration in the ilum UI:
org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
When submitting through the ilum UI, add this under the Resources tab in the Spark Packages field. Ilum will resolve and distribute the dependency automatically.
Reading from Kafka
The core pattern for connecting to a Kafka topic uses readStream with the kafka format.
PySpark:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)
# Kafka messages are binary key/value pairs. Cast to string for processing.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS event_key",
    "CAST(value AS STRING) AS event_value",
    "topic",
    "partition",
    "offset",
    "timestamp"
)
Scala:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ilum-kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

val parsed = df.selectExpr(
  "CAST(key AS STRING) AS event_key",
  "CAST(value AS STRING) AS event_value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)
The default Kafka bootstrap server within an ilum cluster is ilum-kafka:9092. If your Kafka deployment uses a different service name or port, adjust accordingly.
Authentication and Security
For Kafka clusters configured with SASL or TLS, add the corresponding properties to your Spark configuration:
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="user" password="password";')
    .option("kafka.ssl.truststore.location", "/opt/spark/truststore.jks")
    .option("kafka.ssl.truststore.password", "changeit")
    .load()
)
Avoid hardcoding credentials in job code. Use Kubernetes secrets mounted as files or environment variables, and reference them in your Spark configuration.
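One way to follow this advice, sketched below in plain Python, is to inject the SASL credentials through environment variables populated from a Kubernetes secret and assemble the JAAS string at runtime. The variable names KAFKA_SASL_USERNAME and KAFKA_SASL_PASSWORD are illustrative, not ilum conventions:

```python
import os

def scram_jaas_config(username: str, password: str) -> str:
    """Build the JAAS config string for SCRAM authentication."""
    return (
        'org.apache.kafka.common.security.scram.ScramLoginModule required '
        f'username="{username}" password="{password}";'
    )

# Credentials injected by Kubernetes, e.g. via secretKeyRef env vars
# (hypothetical variable names; the fallbacks are placeholders).
user = os.environ.get("KAFKA_SASL_USERNAME", "user")
pwd = os.environ.get("KAFKA_SASL_PASSWORD", "password")
jaas = scram_jaas_config(user, pwd)
# Then pass it to the reader instead of a literal:
#   .option("kafka.sasl.jaas.config", jaas)
```

The same pattern works for the truststore password; only the non-secret connection settings stay in the job code.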
Exactly-Once Semantics
Structured Streaming achieves exactly-once processing guarantees through checkpointing and idempotent writes.
Checkpointing Configuration
Checkpoints track the processing state, including Kafka offsets, so that a restarted job resumes from where it left off without data loss or duplication. Store checkpoints on durable object storage.
query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events-pipeline")
    .toTable("spark_catalog.default.events")
)
Each streaming query must have a unique checkpoint location. Reusing a checkpoint path across different queries corrupts offset tracking and leads to missed or duplicated data.
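A lightweight convention helps enforce uniqueness: derive every checkpoint path from the pipeline and query names. A minimal sketch; the checkpoints/ directory layout is just an assumption, not an ilum requirement:

```python
def checkpoint_path(base: str, pipeline: str, query: str) -> str:
    """Derive a deterministic, per-query checkpoint location.

    One distinct path per streaming query; never share paths between queries.
    """
    return f"{base.rstrip('/')}/checkpoints/{pipeline}/{query}"

# Each query in a pipeline gets its own directory:
events_ckpt = checkpoint_path("s3a://my-bucket", "events-pipeline", "raw-ingest")
# -> "s3a://my-bucket/checkpoints/events-pipeline/raw-ingest"
```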
Idempotent Writes to Iceberg
Iceberg tables support atomic commits, which means each micro-batch is written as a single transaction. If a batch fails partway through, the partial write is rolled back. Combined with checkpointing, this provides end-to-end exactly-once delivery.
Failure Recovery
When a streaming job fails and is restarted (manually or via ilum's Max Retries setting):
- Spark reads the latest checkpoint to determine the last successfully committed Kafka offset.
- Processing resumes from the next offset, skipping already-committed data.
- No manual intervention or offset management is required.
Set Max Retries on your ilum job to a reasonable value (e.g., 3-5) so that transient failures are automatically recovered without operator intervention.
CDC Patterns
Change Data Capture (CDC) pipelines replicate changes from operational databases (PostgreSQL, MySQL, MongoDB) into your lakehouse. The typical architecture is:
Source DB --> Debezium --> Kafka --> Structured Streaming --> Iceberg/Delta
Debezium CDC Source
Debezium is an open-source CDC platform that captures row-level changes from databases and publishes them to Kafka topics. A typical Debezium deployment runs as a Kafka Connect connector alongside your ilum Kafka cluster.
Key Debezium configuration properties:
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres-host
database.port=5432
database.user=debezium
database.password=secret
database.dbname=mydb
database.server.name=mydb
table.include.list=public.orders,public.customers
topic.prefix=cdc
This produces Kafka topics like cdc.public.orders and cdc.public.customers, each containing insert, update, and delete events.
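The naming rule can be sketched as a small helper; it assumes Debezium's default topic naming, which joins the prefix, schema, and table with dots:

```python
def debezium_topics(topic_prefix: str, table_include_list: str) -> list[str]:
    """Topics Debezium will produce: one per table, named <prefix>.<schema>.<table>."""
    tables = [t.strip() for t in table_include_list.split(",") if t.strip()]
    return [f"{topic_prefix}.{table}" for table in tables]

print(debezium_topics("cdc", "public.orders,public.customers"))
# -> ['cdc.public.orders', 'cdc.public.customers']
```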
Upsert with MERGE INTO
CDC events include inserts, updates, and deletes. To apply these changes to an Iceberg or Delta table, use the foreachBatch sink with a MERGE INTO statement.
PySpark example:
def upsert_to_iceberg(batch_df, batch_id):
    # Register the micro-batch as a temporary view
    batch_df.createOrReplaceTempView("cdc_batch")
    batch_df.sparkSession.sql("""
        MERGE INTO spark_catalog.default.orders AS target
        USING cdc_batch AS source
        ON target.id = source.id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = (
    cdc_stream.writeStream
    .foreachBatch(upsert_to_iceberg)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders-cdc")
    .start()
)
The op field in Debezium events indicates the operation type: c (create), u (update), d (delete), and r (read/snapshot). Adjust the MERGE logic based on your Debezium envelope format.
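A micro-batch arriving from Debezium is typically flattened before the MERGE so that source exposes plain row columns plus op. The pure-Python sketch below shows the shape of a decoded change event and one flattening rule; it assumes the default JSON envelope without schema metadata:

```python
# A simplified Debezium change event value (JSON-decoded), assuming the
# default envelope without embedded schema metadata:
update_event = {
    "op": "u",                                 # c=create, u=update, d=delete, r=snapshot
    "before": {"id": 1, "status": "pending"},  # row state before the change
    "after": {"id": 1, "status": "shipped"},   # row state after the change
    "ts_ms": 1700000000000,
}

def flatten(event: dict) -> dict:
    """Flatten a Debezium envelope into one row suitable for MERGE INTO.

    Deletes carry the key in 'before' ('after' is null), so fall back to it.
    """
    row = dict(event["after"] if event["op"] != "d" else event["before"])
    row["op"] = event["op"]
    return row

print(flatten(update_event))  # -> {'id': 1, 'status': 'shipped', 'op': 'u'}
```

In the streaming job the same logic would run as DataFrame transformations (from_json plus column selection) before foreachBatch is invoked.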
Example: End-to-End Streaming Pipeline
The following complete example reads JSON events from a Kafka topic, parses them, and writes them to an Iceberg table.
Complete PySpark Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("streaming-ingestion-pipeline") \
    .getOrCreate()

# Define the expected JSON schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("event_time", TimestampType(), True),
])

# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)

# Parse JSON values
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .filter(col("event_id").isNotNull())
)

# Write to Iceberg table
query = (
    parsed_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/user-events")
    .toTable("spark_catalog.default.user_events")
)

query.awaitTermination()
Submitting as an Ilum Job
- Save the script above as streaming_pipeline.py.
- In the ilum UI, create a new job with Job Type set to Spark Job and Language set to Python.
- Upload streaming_pipeline.py under the Resources tab.
- Add the Kafka connector package under Spark Packages: org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
- Under the Configuration tab, add the following Spark properties for Iceberg support:
  spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog
  spark.sql.catalog.spark_catalog.type = hive
- Submit the job. For detailed submission steps, see Run a Simple Spark Job.
Monitoring Streaming Jobs
Streaming jobs run continuously, so monitoring is essential:
- Ilum UI: The job will appear with a RUNNING status. Use the Logs tab to monitor micro-batch progress and throughput.
- Spark UI: Access the Spark UI through ilum to view the Structured Streaming tab, which shows input rate, processing rate, and batch duration metrics.
- Max Retries: Configure automatic restart on failure to maintain pipeline uptime.
For long-running streaming jobs, allocate sufficient driver and executor memory to handle state accumulation. Monitor memory usage through the ilum UI and adjust resources as needed.
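Beyond the UIs, PySpark exposes the latest micro-batch metrics as a dict via query.lastProgress; comparing its input and processing rates is a quick backpressure check. A sketch on a synthetic progress dict, where the 1.1 slack factor is an arbitrary threshold rather than a Spark default:

```python
def is_falling_behind(progress: dict, slack: float = 1.1) -> bool:
    """Heuristic backpressure check on a query.lastProgress dict:
    input arriving notably faster than it is being processed."""
    incoming = progress.get("inputRowsPerSecond", 0.0)
    processed = progress.get("processedRowsPerSecond", 0.0)
    return processed > 0 and incoming > processed * slack

# Synthetic progress payload with the relevant rate fields:
sample = {"batchId": 42, "numInputRows": 50000,
          "inputRowsPerSecond": 2500.0, "processedRowsPerSecond": 1800.0}
print(is_falling_behind(sample))  # -> True: input rate exceeds processing rate
```

Such a check could run periodically in the driver and log a warning, prompting an executor-resource bump through the ilum UI.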
Flink
Apache Flink is supported as a streaming engine in ilum, complementing Spark Structured Streaming. Flink provides true event-at-a-time processing with lower latency and is well suited for complex event processing (CEP), session windowing, and use cases where sub-second response times are critical.
Flink on ilum
Ilum deploys Flink jobs on Kubernetes using the Flink Kubernetes Operator. Flink applications are submitted through the ilum UI or REST API just like Spark jobs. Ilum handles pod lifecycle management, checkpoint storage configuration, and integration with the shared Kafka cluster and catalog layer.
Key architectural points:
- Job Manager and Task Manager pods are orchestrated by Kubernetes, with resource allocation managed through ilum's cluster configuration.
- Flink jobs connect to the same Kafka cluster used by ilum internally (ilum-kafka:9092 by default).
- Flink integrates with the Hive Metastore and Iceberg catalog, allowing Flink and Spark to read and write the same tables.
- Checkpoints and savepoints are stored on the ilum default storage (S3/MinIO/HDFS), enabling job recovery and upgrades without data loss.
Flink SQL: Kafka to Iceberg
Flink SQL provides a declarative way to build streaming pipelines. The following example reads JSON events from a Kafka topic and writes them to an Iceberg table.
-- Register the Kafka source table
CREATE TABLE kafka_events (
  event_id STRING,
  user_id STRING,
  action STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'ilum-kafka:9092',
  'properties.group.id' = 'flink-ingestion',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false'
);
-- Register the Iceberg sink table
CREATE TABLE iceberg_events (
  event_id STRING,
  user_id STRING,
  action STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_catalog',
  'catalog-type' = 'hive',
  'uri' = 'thrift://ilum-hive-metastore:9083',
  'warehouse' = 's3a://ilum-data/warehouse'
);
-- Continuous streaming insert
INSERT INTO iceberg_events
SELECT event_id, user_id, action, amount, event_time
FROM kafka_events
WHERE event_id IS NOT NULL;
Flink SQL jobs can be submitted as SQL scripts through the ilum UI. Set the Job Type to Flink and provide the SQL statements in the code editor.
Flink DataStream API
For more complex processing logic, use the Flink DataStream API. The following Java example reads from Kafka, applies a tumbling window aggregation, and writes the results to an Iceberg table.
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("ilum-kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-datastream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source,
        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "Kafka Source")
    .map(json -> parseEvent(json))                    // custom parsing logic
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .sinkTo(icebergSink);                             // Iceberg sink connector

env.execute("flink-aggregation-pipeline");
Flink CDC Connector
Flink provides a native CDC connector that captures database changes directly without requiring a separate Debezium deployment. This simplifies the CDC pipeline from a three-component architecture to two.
Source DB --> Flink CDC Connector --> Iceberg/Delta
Flink SQL example with PostgreSQL CDC:
-- Register a CDC source directly from PostgreSQL
CREATE TABLE orders_cdc (
  id INT,
  customer_id INT,
  product STRING,
  amount DOUBLE,
  updated_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'postgres-host',
  'port' = '5432',
  'username' = 'cdc_user',
  'password' = 'secret',
  'database-name' = 'mydb',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'slot.name' = 'flink_orders_slot'
);
-- Write changes directly to Iceberg with upsert semantics
INSERT INTO iceberg_orders
SELECT id, customer_id, product, amount, updated_at
FROM orders_cdc;
Flink CDC connectors support PostgreSQL, MySQL, MongoDB, Oracle, and SQL Server. Each connector requires the corresponding Flink CDC library to be included in the job dependencies.
Exactly-Once Guarantees with Flink
Flink achieves exactly-once semantics through its distributed snapshot (Chandy-Lamport) checkpointing algorithm:
- Checkpoint barriers flow through the data stream, ensuring consistent state snapshots across all operators.
- State backends (RocksDB or heap-based) persist operator state to durable storage (S3/MinIO) on each checkpoint.
- Two-phase commit sinks (Kafka, Iceberg) ensure that output records are committed atomically with the checkpoint.
Configure checkpointing in your Flink job:
env.enableCheckpointing(60000); // checkpoint interval (ms)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointStorage("s3://ilum-data/flink-checkpoints");
For Flink SQL jobs, set checkpointing via Flink configuration properties:
execution.checkpointing.interval=60s
execution.checkpointing.mode=EXACTLY_ONCE
state.checkpoints.dir=s3://ilum-data/flink-checkpoints
When to Use Flink vs Spark Structured Streaming
Both engines are production-ready for streaming workloads on ilum. Choose based on your requirements:
| Aspect | Spark Structured Streaming | Apache Flink |
|---|---|---|
| Processing Model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| State Management | Limited by micro-batch boundaries | Fine-grained, key-partitioned state |
| Windowing | Tumbling, sliding, session | Tumbling, sliding, session, custom |
| Complex Event Processing | Basic | Advanced (CEP library) |
| CDC | Via Debezium + Kafka | Native CDC connectors |
| Batch + Stream Unification | Strong (same DataFrame API) | Strong (same DataStream/Table API) |
| Catalog Integration | Hive, Nessie, Unity, Iceberg, Delta, Hudi | Hive, Iceberg |
| Best For | ETL pipelines, unified batch/stream, existing Spark codebases | Low-latency event processing, CEP, native CDC |
General guidance:
- Use Spark Structured Streaming when you have existing Spark batch jobs and want a unified batch/streaming codebase, or when micro-batch latency (seconds) is acceptable.
- Use Flink when you need sub-second latency, complex event processing patterns, native CDC without Debezium, or fine-grained state management with event-time processing.