Performance & Query Optimization
Ilum combines three complementary execution engines -- Spark SQL, Trino, and DuckDB -- to address the full spectrum of data workloads, from large-scale ETL to sub-second ad-hoc analytics. This page covers the performance architecture behind each engine and the strategies available for optimizing query execution, caching, and workload isolation in Kubernetes environments.
Multi-Engine Architecture
Ilum's SQL Viewer provides a unified interface to three execution engines, each optimized for different workload profiles.
| Characteristic | Spark SQL | Trino | DuckDB |
|---|---|---|---|
| Architecture | Distributed (driver + executors) | Distributed (coordinator + workers) | Embedded (in-process) |
| Best For | Batch ETL, ML pipelines, streaming | Sub-second ad-hoc analytics | Local analytical queries, prototyping |
| Execution Model | Stage-based DAG with shuffle | Pipelined MPP | Vectorized push-based |
| Overhead | High (pod scheduling per job) | Medium (always-on coordinator) | Very low (no additional pods) |
| Scalability | Thousands of executors | Hundreds of workers | Single process |
When to Use Which Engine
- Spark SQL -- Use for batch ETL jobs processing terabytes of data, machine learning pipelines with MLlib, structured streaming workloads, and jobs requiring broad connector/format support.
- Trino -- Use for interactive analytics requiring sub-second response times, federated queries across multiple data sources, and concurrent ad-hoc queries from multiple analysts.
- DuckDB -- Use for quick data exploration on moderate datasets, rapid prototyping of SQL logic, and scenarios where spinning up a distributed cluster is unnecessary overhead.
Start with DuckDB for interactive exploration and prototyping. Move to Trino when you need multi-user concurrency or federated access. Use Spark SQL for production ETL, large-scale processing, and workloads requiring the full Spark ecosystem.
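The rules of thumb above can be encoded as a simple routing heuristic. This is purely illustrative -- Ilum's SQL Viewer lets users pick the engine manually, and the thresholds below are hypothetical:

```python
# Illustrative heuristic only; the size threshold and flags are assumptions,
# not Ilum behavior.

def route_query(data_size_gb: float, needs_low_latency: bool,
                multi_user: bool) -> str:
    """Suggest an engine based on the rules of thumb in this section."""
    if data_size_gb < 10 and not multi_user:
        return "duckdb"      # local exploration and prototyping
    if needs_low_latency or multi_user:
        return "trino"       # interactive, concurrent analytics
    return "spark"           # large batch ETL and ML pipelines

print(route_query(2, False, False))     # small local exploration
print(route_query(500, True, True))     # concurrent dashboards
print(route_query(5000, False, False))  # terabyte-scale batch ETL
```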
Massively Parallel Processing (MPP)
Trino: Pipelined MPP Execution
Trino is a native MPP engine designed for low-latency analytics. Its architecture distributes query processing across a cluster of workers coordinated by a single coordinator node.
How Trino executes a query:
- Query Planning -- The coordinator parses SQL, generates a logical plan, and produces a distributed execution plan split into fragments.
- Fragment Distribution -- Fragments are assigned to workers based on data locality and available capacity.
- Pipelined Execution -- Unlike Spark's stage-based model, Trino pipelines data between operators without writing intermediate results to disk. Data flows continuously from one stage to the next as it becomes available.
- Result Assembly -- The coordinator collects partial results from workers and streams the final result set to the client.
This pipelined approach eliminates the shuffle-to-disk overhead that Spark incurs at stage boundaries, making Trino significantly faster for short-running analytical queries.
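The difference between the two execution models can be sketched with Python generators. In the pipelined version each row flows through all operators as it is produced; in the staged version every stage materializes its full output (as a shuffle would) before the next stage starts. Neither engine is actually implemented this way -- this is a toy contrast:

```python
# Toy contrast between pipelined (Trino-style) and staged (Spark-style)
# execution over a tiny row set.

rows = range(1, 11)

def scan(source):
    for r in source:
        yield r

def filter_even(stream):
    for r in stream:
        if r % 2 == 0:
            yield r

def double(stream):
    for r in stream:
        yield r * 2

# Pipelined: rows stream through the whole operator chain; nothing is
# materialized until the client consumes the result.
pipelined = list(double(filter_even(scan(rows))))

# Staged: each "stage" fully materializes its output before the next begins.
stage1 = list(scan(rows))
stage2 = list(filter_even(stage1))
staged = [r * 2 for r in stage2]

print(pipelined)  # [4, 8, 12, 16, 20]
assert pipelined == staged
```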
Spark: Stage-Based Parallel Execution
Spark uses a DAG (Directed Acyclic Graph) scheduler to decompose jobs into stages separated by shuffle boundaries.
Spark's execution flow:
- DAG Construction -- Spark builds a DAG of transformations from your query or DataFrame operations.
- Stage Decomposition -- The DAG is split into stages at shuffle boundaries (wide dependencies such as `join`, `groupBy`, `repartition`).
- Task Scheduling -- Each stage is divided into tasks, one per data partition, and distributed across executor pods.
- Shuffle Exchange -- Between stages, data is redistributed across the network. This is written to disk before the next stage begins.
Spark's model trades latency for fault tolerance and scalability. If an executor fails, only the affected stage needs to be recomputed rather than the entire query.
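Stage decomposition can be sketched as cutting a linear sequence of operators wherever a wide dependency forces a shuffle. The operator list below is hypothetical and this is not Spark's actual planner:

```python
# Sketch of splitting an operator sequence into stages at shuffle
# boundaries (wide dependencies). Illustrative only.

WIDE_OPS = {"join", "groupBy", "repartition"}

def split_into_stages(ops):
    """Cut the op sequence wherever a wide dependency forces a shuffle."""
    stages, current = [], []
    for op in ops:
        if op in WIDE_OPS and current:
            stages.append(current)   # shuffle boundary: close the stage
            current = []
        current.append(op)
    if current:
        stages.append(current)
    return stages

plan = ["read", "filter", "groupBy", "agg", "join", "select"]
print(split_into_stages(plan))
# [['read', 'filter'], ['groupBy', 'agg'], ['join', 'select']]
```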
Query Routing in Ilum
Ilum's SQL Viewer allows users to select the execution engine for each query. Engine selection is available directly in the query editor, enabling teams to route workloads to the engine best suited for the task without managing separate infrastructure endpoints.
Vectorized Query Execution
Spark Tungsten Engine
Spark's Tungsten execution engine optimizes CPU and memory efficiency through several techniques:
- Whole-Stage Code Generation -- Instead of interpreting a query plan operator by operator, Tungsten fuses multiple operators into a single optimized function. This eliminates virtual function dispatch overhead and enables the JVM to apply loop optimizations.
- Columnar In-Memory Format -- Tungsten stores data in a compact binary format rather than as Java objects, avoiding garbage collection overhead and reducing memory footprint.
- Off-Heap Memory Management -- Critical data structures are allocated outside the JVM heap, giving Spark direct control over memory lifecycle and preventing GC pauses during large shuffles.
Trino Vectorized Pipeline
Trino processes data in a vectorized, pipelined manner:
- Page-Based Processing -- Data is organized into pages (columnar batches) that flow through operators. Processing entire pages at a time improves CPU cache utilization.
- Operator Fusion -- Adjacent operators in the execution plan are fused where possible to reduce materialization overhead.
- Memory Tracking -- Each query has a memory budget enforced by the coordinator. Queries that exceed their allocation are killed gracefully, protecting cluster stability.
DuckDB Vectorized Execution
DuckDB implements a push-based, morsel-driven execution model optimized for analytical workloads:
- Push-Based Pipeline -- Instead of pulling data through operators (Volcano model), DuckDB pushes batches (morsels) through a pipeline of operators. This reduces function call overhead and improves branch prediction.
- Morsel-Driven Parallelism -- Work is divided into morsels (small batches of rows) that are dynamically assigned to threads. This achieves load balancing without the overhead of full partitioning.
- Columnar Vectors -- All operators work on columnar vectors, enabling SIMD (Single Instruction, Multiple Data) optimizations and efficient cache usage.
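Morsel-driven parallelism can be illustrated with a thread pool claiming small batches dynamically. DuckDB's scheduler is C++ and far more sophisticated; this stdlib-only sketch just shows the load-balancing idea:

```python
# Toy morsel-driven aggregation: rows are cut into small batches (morsels)
# that worker threads claim dynamically, balancing load without a full
# upfront partitioning of the data.
from concurrent.futures import ThreadPoolExecutor

MORSEL_SIZE = 1024
values = list(range(100_000))

def morsels(data, size=MORSEL_SIZE):
    """Yield small contiguous batches of rows."""
    for i in range(0, len(data), size):
        yield data[i:i + size]

def process(morsel):
    # Each worker computes a partial aggregate over its morsel.
    return sum(morsel)

with ThreadPoolExecutor(max_workers=4) as pool:
    partials = pool.map(process, morsels(values))
total = sum(partials)

assert total == sum(values)
```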
Adaptive Query Execution (AQE)
Spark's Adaptive Query Execution re-optimizes query plans at runtime based on actual data statistics observed during execution. AQE addresses three common performance problems:
- Skewed Join Handling -- Detects when one side of a join has partitions significantly larger than others and automatically splits those partitions for more balanced processing.
- Partition Coalescing -- Merges small post-shuffle partitions into larger ones, reducing task scheduling overhead when the initial partition count was too high.
- Join Strategy Switching -- Converts sort-merge joins to broadcast hash joins at runtime if one side turns out to be small enough after filtering.
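Partition coalescing can be sketched as merging adjacent small post-shuffle partitions until each group reaches a target size. Real AQE works from runtime shuffle statistics; the sizes and target below are made up:

```python
# Sketch of AQE-style partition coalescing. Illustrative only.

def coalesce(partition_sizes_mb, target_mb=64):
    """Group adjacent partitions until each group reaches target_mb."""
    groups, current, acc = [], [], 0
    for size in partition_sizes_mb:
        current.append(size)
        acc += size
        if acc >= target_mb:
            groups.append(current)
            current, acc = [], 0
    if current:
        groups.append(current)   # leftover small tail
    return groups

sizes = [5, 3, 70, 8, 2, 60, 4]
print(coalesce(sizes))
# [[5, 3, 70], [8, 2, 60], [4]]
```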
Enable AQE in your Spark job configuration:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
AQE is available for Spark SQL queries run through the SQL Viewer and for programmatic Spark jobs submitted via Ilum. It is enabled by default in Spark 3.2+.
Caching Strategies
Spark In-Memory Caching
Spark can persist intermediate DataFrames in memory to avoid recomputation. This is particularly useful when the same dataset is accessed multiple times within a pipeline.
Storage levels:
| Storage Level | Description |
|---|---|
| `MEMORY_ONLY` | Store as deserialized Java objects in JVM heap. Fastest access. |
| `MEMORY_AND_DISK` | Spill to disk when heap is full. Recommended for large datasets. |
| `MEMORY_ONLY_SER` | Store as serialized bytes. More memory-efficient, slower to access. |
| `MEMORY_AND_DISK_SER` | Serialized with disk spillover. Best balance for constrained clusters. |
| `DISK_ONLY` | Store only on disk. Use when memory is scarce. |
Example in PySpark:
df = spark.read.parquet("s3a://bucket/large-dataset/")
df.cache() # Equivalent to persist(StorageLevel.MEMORY_AND_DISK)
df.count() # Triggers caching
# Subsequent actions reuse cached data
df.groupBy("category").agg({"amount": "sum"}).show()
df.filter("amount > 1000").count()
# Release when no longer needed
df.unpersist()
In Kubernetes environments, executor pod memory is finite and governed by resource limits. Over-caching can lead to OOM kills. Use MEMORY_AND_DISK_SER in resource-constrained namespaces and monitor memory usage via the Monitoring dashboard.
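A quick back-of-the-envelope check helps verify that an executor fits its pod limit. The calculation below uses Spark's default overhead rule (the larger of 384 MiB or 10% of executor memory); the pod limit and executor size are example values:

```python
# Sanity-check that spark.executor.memory plus its overhead stays within
# the pod memory limit. Default overhead rule: max(384 MiB, 10%).

def executor_pod_memory_mib(executor_memory_mib, overhead_factor=0.10):
    overhead = max(384, int(executor_memory_mib * overhead_factor))
    return executor_memory_mib + overhead

pod_limit_mib = 8192                       # e.g. an 8 GiB pod limit
needed = executor_pod_memory_mib(6144)     # spark.executor.memory=6g
print(needed, needed <= pod_limit_mib)
# 6758 True
```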
Columnar Storage Optimization
Using columnar formats like Parquet and ORC provides implicit caching benefits at the storage layer:
- Predicate Pushdown -- Only reads the row groups that match filter conditions, dramatically reducing I/O.
- Column Pruning -- Reads only the columns referenced in the query, skipping irrelevant data.
- Compression -- Columnar data compresses efficiently (Snappy, ZSTD), reducing storage costs and network transfer during reads.
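The row-group skipping behind predicate pushdown can be sketched with a zone map: keep a min/max per row group and skip any group that cannot satisfy the filter. The data below is invented and this is not a Parquet reader:

```python
# Toy zone map: min/max statistics per row group let the scan skip groups
# that cannot match the predicate -- the idea behind predicate pushdown.

row_groups = [
    [3, 7, 9],        # min 3,   max 9
    [120, 150, 180],  # min 120, max 180
    [40, 55, 61],     # min 40,  max 61
]
zone_map = [(min(g), max(g)) for g in row_groups]

def scan_where_gt(threshold):
    """Return values > threshold, skipping row groups via the zone map."""
    hits = []
    for group, (lo, hi) in zip(row_groups, zone_map):
        if hi <= threshold:
            continue                      # skipped: no row can match
        hits.extend(v for v in group if v > threshold)
    return hits

print(scan_where_gt(100))   # only the second row group is actually read
# [120, 150, 180]
```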
-- Creating an optimized Parquet table with partitioning
CREATE TABLE analytics.events
USING PARQUET
PARTITIONED BY (event_date)
AS SELECT * FROM raw_events;
Trino Result Caching
Trino supports caching query results to accelerate repeated queries:
# Trino catalog configuration
query.cache.enabled=true
query.cache.ttl=10m
For Trino deployments in Ilum, configure caching through the Helm values under the Trino catalog section. See the SQL Viewer documentation for Trino configuration details.
Cache Sizing in Kubernetes
When running in resource-constrained Kubernetes namespaces, follow these guidelines:
- Set explicit memory limits -- Ensure `spark.executor.memory` + `spark.executor.memoryOverhead` stays within the pod memory limit defined by your Resource Control settings.
- Prefer serialized storage -- `MEMORY_AND_DISK_SER` reduces per-record memory overhead by 2-5x compared to deserialized storage.
- Monitor cache hit rates -- Use the Spark UI Storage tab to verify that cached data is actually being reused. Unnecessary caching wastes memory.
- Scope caching to the job -- Cached DataFrames are tied to the SparkSession. They do not persist across job restarts.
Workload Management & Isolation
Separating ETL from Analytics
A common pattern is to dedicate Spark for batch ETL workloads and Trino for interactive analytics. This separation ensures that long-running ETL jobs do not compete with latency-sensitive queries for cluster resources.
| Workload Type | Engine | Resource Profile |
|---|---|---|
| Batch ETL | Spark SQL | Large executors, high memory, scheduled |
| Interactive queries | Trino | Always-on, moderate memory, low latency |
| Data exploration | DuckDB | Embedded, minimal resources |
| ML training | Spark SQL | GPU-enabled executors, dynamic scaling |
Namespace-Level Resource Isolation
Kubernetes namespaces provide the primary isolation boundary in Ilum. Each team or project can operate in its own namespace with dedicated resource quotas.
# Example namespace quota for an analytics team
apiVersion: v1
kind: ResourceQuota
metadata:
name: analytics-team-quota
namespace: analytics
spec:
hard:
requests.cpu: "32"
requests.memory: 64Gi
limits.cpu: "64"
limits.memory: 128Gi
pods: "50"
Ilum manages these quotas through its cluster settings interface. For detailed configuration, see Resource Control & Governance.
Spark Dynamic Allocation
For bursty workloads where resource demand varies significantly, Spark's dynamic allocation automatically scales the number of executors based on workload pressure.
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=20
spark.dynamicAllocation.executorIdleTimeout=60s
spark.dynamicAllocation.schedulerBacklogTimeout=5s
How it works:
- When tasks are queued and no executors are idle, Spark requests additional executors from Kubernetes.
- When executors are idle beyond `executorIdleTimeout`, they are released back to the cluster.
- `maxExecutors` acts as a safety cap, working in conjunction with namespace-level quotas to prevent runaway scaling.
On Kubernetes, where no external shuffle service is available, dynamic allocation relies on shuffle tracking to preserve shuffle data from decommissioned executors. Set `spark.dynamicAllocation.shuffleTracking.enabled=true` (newer Spark versions enable it by default).
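The scale-up/scale-down behavior described above can be modeled as a simple decision function. This is a toy model, not Spark's actual allocation algorithm:

```python
# Toy model of dynamic-allocation decisions: scale up when tasks back up,
# release executors idle past the timeout, never exceed maxExecutors.

def next_executor_count(current, backlog, idle, min_ex=1, max_ex=20):
    """Return the target executor count for one scheduling round."""
    if backlog > 0:
        return min(current + backlog, max_ex)   # request more, capped
    if idle > 0:
        return max(current - idle, min_ex)      # release idle executors
    return current

print(next_executor_count(4, backlog=30, idle=0))   # 20 (capped at max)
print(next_executor_count(10, backlog=0, idle=6))   # 4
print(next_executor_count(2, backlog=0, idle=5))    # 1 (floor at min)
```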
Priority and Preemption
Kubernetes PriorityClasses can be used to ensure that critical workloads are scheduled before lower-priority ones. When the cluster is at capacity, lower-priority pods may be preempted to make room.
# High priority for production ETL
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: production-etl
value: 1000
globalDefault: false
description: "Priority for production ETL Spark jobs"
Assign the priority class through pod templates referenced in your Spark job configuration:
spark.kubernetes.driver.podTemplateFile=/path/to/driver-template.yaml
spark.kubernetes.executor.podTemplateFile=/path/to/executor-template.yaml
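The priority class itself is set inside the pod template rather than through a Spark conf key. A minimal driver template might look like the following sketch (field names follow the standard Kubernetes Pod spec; the container name is illustrative):

```yaml
# driver-template.yaml -- minimal sketch. priorityClassName must match
# the PriorityClass created above.
apiVersion: v1
kind: Pod
spec:
  priorityClassName: production-etl
  containers:
    - name: spark-kubernetes-driver
```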
Apache Arrow & Columnar Standards
Arrow as the Common Columnar Format
Apache Arrow defines a language-independent columnar memory format that enables zero-copy data sharing between systems. In Ilum's architecture, Arrow serves as the common data representation across engines:
- Spark (Tungsten) -- Spark's internal Tungsten format is conceptually similar to Arrow. Spark uses Arrow for efficient data exchange between JVM and Python processes (PySpark), eliminating serialization overhead in UDF execution.
- Trino -- Trino's page-based columnar format aligns with Arrow's design principles, enabling efficient data transfer between operators.
- DuckDB -- DuckDB natively uses a columnar vector format compatible with Arrow, enabling zero-copy reads from Arrow-backed data sources.
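The core Arrow idea -- contiguous columnar buffers shared between consumers without copying -- can be shown in miniature with the Python stdlib. Arrow generalizes this across processes and languages; `memoryview` here is only an analogy:

```python
# Zero-copy sharing in miniature: one columnar buffer, two views, no bytes
# copied. Stdlib-only analogy for Arrow's shared columnar memory.
from array import array

column = array("d", [1.5, 2.5, 3.5, 4.5])   # one columnar buffer of float64
view_a = memoryview(column)                  # consumer A: no copy
view_b = view_a[1:3]                         # consumer B: zero-copy slice

column[1] = 99.0                             # mutate the underlying buffer
print(view_a[1], view_b[0])                  # both views see the change
# 99.0 99.0
```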
Spark Connect and Arrow Serialization
Spark Connect uses Arrow as the serialization format for data exchange between the client and the Spark cluster. This provides significant performance benefits:
- Efficient serialization -- Arrow's columnar format serializes and deserializes faster than row-based formats like Java serialization.
- Cross-language support -- Arrow enables native-speed data access from Python, R, and other languages without JVM overhead.
- Reduced network transfer -- Arrow's compressed columnar format minimizes the bytes transferred between client and cluster.
Enable Arrow-based optimization for PySpark UDFs:
spark.sql.execution.arrow.pyspark.enabled=true
spark.sql.execution.arrow.pyspark.fallback.enabled=true
Arrow Flight SQL is a protocol for high-performance SQL query execution over Arrow-based data streams. While not yet integrated into Ilum's production stack, it represents the direction for future query engine interoperability. The protocol enables clients to execute SQL and retrieve results as Arrow record batches over gRPC, offering significant performance improvements over JDBC/ODBC for analytical workloads.