Automating Spark Jobs with GitLab CI/CD
This guide demonstrates how to build a complete CI/CD pipeline for Apache Spark applications on Kubernetes using GitLab CI/CD and Ilum. By implementing this architecture, you establish a robust DataOps practice that automates the lifecycle of your data engineering code.
Key Benefits
- Automated Deployments: Eliminate manual spark-submit commands.
- Version Control: Ensure every job running in your cluster corresponds to a specific Git commit.
- Consistency: Automatically synchronize environment configurations (Ilum Groups) and job definitions.
- Feedback Loop: Get immediate status reporting on job submission success or failure directly in GitLab.
Prerequisites
- A GitLab project (SaaS or Self-Managed).
- Ilum API endpoint reachable from your GitLab Runner (e.g., via Ingress or internal network).
- curl and jq installed in the runner image (or use an image like alpine and install them).
1. Architecture of a Spark CI/CD Pipeline
The pipeline uses the Ilum REST API to interact with the Kubernetes cluster.
- Check/Manage Groups: Ensures the target Ilum Group exists (and optionally recreates it to update group-level files/settings).
- Submit Job: Posts the Spark job definition and code to Ilum.
- Execute: The job runs on the Spark cluster, independent of the CI runner.
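Step 1 reduces to scanning the group listing for a matching name. As a quick illustration, here is that lookup in plain Python (the paged {"content": [...]} response shape is taken from the jq filter used later in this guide; the sample id is made up):

```python
import json

def find_group_id(groups_response, group_name):
    """Return the id of the named Ilum Group, or None if it does not exist.

    Mirrors the jq filter used in the pipeline below:
    .content[] | select(.name=="$GROUP_NAME") | .id
    """
    for group in groups_response.get("content", []):
        if group.get("name") == group_name:
            return group.get("id")
    return None

# Example: a response shaped like the GET /api/v1/group listing
response = json.loads('{"content": [{"id": "g-123", "name": "ILUM_COURSE"}]}')
print(find_group_id(response, "ILUM_COURSE"))  # g-123
print(find_group_id(response, "MISSING"))      # None
```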
2. Automating Ilum Group Management & Environment Setup
This example pipeline stage demonstrates how to manage Ilum Groups, the logical containers for interactive sessions and shared resources. It checks whether a group exists, deletes it to clean up old state, and recreates it with fresh configuration.
The Group Service (service.py)
This Python script defines an interactive Spark job that the group will execute. It inherits from IlumJob and provides details about a specified table.
```python
from ilum.api import IlumJob
from pyspark.sql.functions import col, sum as spark_sum


class SparkInteractiveExample(IlumJob):
    def run(self, spark, config) -> str:
        table_name = config.get('table')
        database_name = config.get('database')  # optional
        report_lines = []

        if not table_name:
            raise ValueError("Config must provide a 'table' key")

        # Use specified database if provided
        if database_name:
            spark.catalog.setCurrentDatabase(database_name)
            report_lines.append(f"Using database: {database_name}")

        # Check if table exists in catalog
        if table_name not in [t.name for t in spark.catalog.listTables()]:
            raise ValueError(f"Table '{table_name}' not found in catalog")

        df = spark.table(table_name)
        report_lines.append(f"=== Details for table: {table_name} ===")

        # Total rows
        total_rows = df.count()
        report_lines.append(f"Total rows: {total_rows}")

        # Total columns
        total_columns = len(df.columns)
        report_lines.append(f"Total columns: {total_columns}")

        # Distinct counts per column
        report_lines.append("Distinct values per column:")
        for c in df.columns:
            distinct_count = df.select(c).distinct().count()
            report_lines.append(f"  {c}: {distinct_count}")

        # Schema info, one line per field
        report_lines.append("Schema:")
        for f in df.schema.fields:
            report_lines.append(f"  {f.name}: {f.dataType}")

        # Sample data
        report_lines.append("Sample data (first 5 rows):")
        for row in df.take(5):
            report_lines.append(str(row.asDict()))

        # Null counts per column
        report_lines.append("Null counts per column:")
        null_counts_df = df.select(
            [spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
        )
        null_counts = null_counts_df.collect()[0].asDict()
        for c, v in null_counts.items():
            report_lines.append(f"  {c}: {v}")

        return "\n".join(report_lines)
```
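Because run() validates its config before touching Spark, the parameter contract can be exercised without a cluster. A minimal stand-alone mirror of that validation (a hypothetical helper for illustration, not part of the Ilum API):

```python
# Pure-Python mirror of the config checks in SparkInteractiveExample.run,
# useful for unit-testing parameters before they reach the cluster.
def validate_config(config):
    table_name = config.get('table')
    if not table_name:
        raise ValueError("Config must provide a 'table' key")
    return table_name, config.get('database')  # database is optional

print(validate_config({"table": "students", "database": "university"}))
# ('students', 'university')
```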
The CI Pipeline (Group Management)
This pipeline stage manages the lifecycle of the Ilum Group. It checks if the group exists, removes it if necessary to apply code updates, and recreates it with the latest service.py.
```yaml
stages:
  - manage_group

manage_group:
  stage: manage_group
  image: alpine:3.20
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "--- Checking Group Existence ---"
    - |
      GROUP_NAME="ILUM_COURSE"
      API_URL="http://ilum-core.default:9888/api/v1"

      # 1. Check if group exists
      RESPONSE=$(curl -s "$API_URL/group")
      GROUP_ID=$(echo "$RESPONSE" | jq -r ".content[] | select(.name==\"$GROUP_NAME\") | .id")

      # 2. Delete if exists (to update code/config)
      if [ -n "$GROUP_ID" ] && [ "$GROUP_ID" != "null" ]; then
        echo "Deleting existing group $GROUP_ID..."
        curl -s -X POST "$API_URL/group/$GROUP_ID/stop"
        curl -s -X DELETE "$API_URL/group/$GROUP_ID"
      fi

      # 3. Create new group with updated service.py
      # (the multipart field name below is assumed; check your Ilum version's API docs)
      echo "Creating new group..."
      CREATE_RESP=$(curl -s -X POST \
        -F "name=$GROUP_NAME" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "files=@service.py" \
        -F "jobConfig=spark.executor.instances=2;spark.driver.memory=2g" \
        "$API_URL/group")

      NEW_ID=$(echo "$CREATE_RESP" | jq -r '.groupId // empty')
      if [ -n "$NEW_ID" ]; then
        echo "Group created successfully with ID: $NEW_ID"
      else
        echo "Failed to create group. Response: $CREATE_RESP"
        exit 1
      fi
```
3. Continuous Deployment of Spark Jobs
This stage implements the Continuous Deployment (CD) logic by submitting a PySpark job to the Ilum cluster. It is configured to trigger automatically on a git push to the main branch.
The Spark Job (submit.py)
Create a file named submit.py in your repository root. This example job creates a university database with Hive tables and uses Delta Lake format.
```python
import logging

from pyspark.sql import SparkSession

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("""
=== Example Spark Job: Student Enrollments ===
This Spark job demonstrates a simple educational data pipeline using Hive tables.
It performs the following steps:
1. Creates a 'students' table with student information.
2. Creates a 'courses' table with available courses.
3. Creates an 'enrollments' table linking students to courses.
4. Joins the tables to calculate enrollment statistics and saves them into 'course_stats'.
All intermediate results are stored in the 'university' Hive database.
=============================================
""")

spark = SparkSession \
    .builder \
    .appName("Spark") \
    .getOrCreate()
logger.info("SparkSession initialized")

spark.sql("CREATE DATABASE IF NOT EXISTS university")
logger.info("Database 'university' ensured")

# --- Create Students Table ---
students_data = [
    (1, "Alice", "Computer Science"),
    (2, "Bob", "Mathematics"),
    (3, "Charlie", "Physics"),
    (4, "Diana", "Computer Science")
]
df_students = spark.createDataFrame(students_data, ["student_id", "name", "major"])
spark.sql("DROP TABLE IF EXISTS university.students")
df_students.write.format("delta").saveAsTable("university.students")
logger.info("Created table: university.students")

# --- Create Courses Table ---
courses_data = [
    (101, "Big Data"),
    (102, "Linear Algebra"),
    (103, "Quantum Mechanics")
]
df_courses = spark.createDataFrame(courses_data, ["course_id", "course_name"])
spark.sql("DROP TABLE IF EXISTS university.courses")
df_courses.write.format("delta").saveAsTable("university.courses")
logger.info("Created table: university.courses")

# --- Create Enrollments Table ---
enrollments_data = [
    (1, 101),  # Alice -> Big Data
    (2, 102),  # Bob -> Linear Algebra
    (3, 103),  # Charlie -> Quantum Mechanics
    (4, 101),  # Diana -> Big Data
    (2, 101)   # Bob -> Big Data
]
df_enrollments = spark.createDataFrame(enrollments_data, ["student_id", "course_id"])
spark.sql("DROP TABLE IF EXISTS university.enrollments")
df_enrollments.write.format("delta").saveAsTable("university.enrollments")
logger.info("Created table: university.enrollments")

# --- Join to calculate course enrollment counts ---
df_course_stats = spark.sql("""
    SELECT
        c.course_id,
        c.course_name,
        COUNT(e.student_id) AS total_students
    FROM university.courses c
    LEFT JOIN university.enrollments e ON c.course_id = e.course_id
    GROUP BY c.course_id, c.course_name
""")
spark.sql("DROP TABLE IF EXISTS university.course_stats")
df_course_stats.write.format("delta").saveAsTable("university.course_stats")
logger.info("Inserted final data into course_stats table")
```
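The LEFT JOIN aggregation at the end of the job can be sanity-checked in plain Python before it ever reaches the cluster. With the sample data above, Big Data should count 3 students and the other courses 1 each:

```python
# Plain-Python equivalent of the LEFT JOIN + GROUP BY in submit.py:
# count enrolled students per course, keeping courses with no enrollments.
courses = [(101, "Big Data"), (102, "Linear Algebra"), (103, "Quantum Mechanics")]
enrollments = [(1, 101), (2, 102), (3, 103), (4, 101), (2, 101)]

stats = {course_id: 0 for course_id, _ in courses}
for _, course_id in enrollments:
    stats[course_id] += 1

for course_id, course_name in courses:
    print(course_name, stats[course_id])
# Big Data 3
# Linear Algebra 1
# Quantum Mechanics 1
```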
The CI Pipeline (Job Submission)
Add the job submission stage to your .gitlab-ci.yml. This configuration uses alpine:3.20 and includes error handling for the REST API response.
```yaml
stages:
  - submit_job

# Define variables globally or per job
variables:
  # In a real project, use CI/CD Variables for endpoints and tokens
  ILUM_API_URL: "http://ilum-core.default:9888/api/v1"

submit_job:
  stage: submit_job
  image: alpine:3.20
  rules:
    - if: '$CI_COMMIT_BRANCH == "main"'
  before_script:
    - apk add --no-cache curl jq
  script:
    - echo "Submitting gitlab_pipeline_job with submit.py..."
    - |
      # Submit job request and capture HTTP status code + body
      # Note: jobClass is mandatory.
      # - For a script, use the filename without .py (e.g., "submit").
      # - For no specific class (main entry point), use an empty string.
      # - "filename.classname" is only for interactive jobs or packages.
      # (the "files" multipart field name is assumed; check your Ilum version's API docs)
      RESPONSE=$(curl -s -X POST \
        -F "name=gitlab_pipeline_job" \
        -F "files=@submit.py" \
        -F "clusterName=default" \
        -F "language=PYTHON" \
        -F "jobClass=submit" \
        -w "\nHTTP_STATUS:%{http_code}" \
        "$ILUM_API_URL/job/submit")

      # Extract Status and Body
      HTTP_STATUS=$(echo "$RESPONSE" | grep HTTP_STATUS | cut -d':' -f2)
      BODY=$(echo "$RESPONSE" | sed '/HTTP_STATUS/d')

      echo "HTTP Status: $HTTP_STATUS"
      echo "Response Body: $BODY"

      if [ "$HTTP_STATUS" -ne 200 ]; then
        echo "Error: Failed to create job (Status: $HTTP_STATUS)"
        exit 1
      fi

      JOB_ID=$(echo "$BODY" | jq -r '.jobId // empty')
      if [ -n "$JOB_ID" ]; then
        echo "✅ Job created successfully with ID $JOB_ID."
        echo "You can check status in Ilum UI."
      else
        echo "Warning: Job created but ID not returned."
      fi
```
Pipeline Variables
For security, avoid hardcoding URLs. Use GitLab CI/CD Variables (Settings -> CI/CD -> Variables):
- ILUM_API_URL: e.g., https://ilum.example.com/api
- ILUM_AUTH_TOKEN: If authentication is enabled, pass it as a header in curl (-H "Authorization: Bearer $ILUM_AUTH_TOKEN").
When using the multipart/form-data endpoints (like /job/submit or /group), the jobConfig should be a semicolon-separated string (e.g., spark.key=value;spark.key2=value2). However, when submitting an interactive execution via the JSON API (e.g., /group/{groupId}/job/execute), the jobConfig must be a standard JSON object.
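To avoid maintaining both jobConfig formats by hand, you can derive them from a single dict (a small sketch; the config values mirror the group-creation example above):

```python
import json

# One source of truth for the Spark configuration
spark_conf = {
    "spark.executor.instances": "2",
    "spark.driver.memory": "2g",
}

# Multipart endpoints (/job/submit, /group): semicolon-separated string
multipart_config = ";".join(f"{k}={v}" for k, v in spark_conf.items())
print(multipart_config)  # spark.executor.instances=2;spark.driver.memory=2g

# JSON execute endpoint (/group/{groupId}/job/execute): plain JSON object
json_config = json.dumps({"jobConfig": spark_conf})
print(json_config)
```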
Verification
- Push your code to the main branch.
- Go to Build -> Pipelines in GitLab.
- Wait for the submit_job stage to pass.
- Open the Ilum UI -> Jobs tab.
- You should see gitlab_pipeline_job in the running or completed state.
Frequently Asked Questions (FAQ)
Why use the REST API instead of spark-submit?
The Ilum REST API provides a programmatic, language-agnostic way to interact with the cluster. Unlike spark-submit which requires a complex client-side setup (Java, Hadoop configs, K8s credentials) in your CI runner, the REST API only requires curl and network access to the Ilum endpoint. This drastically simplifies your CI runner images.
How do I handle secrets in my pipeline?
Never hardcode secrets like S3 keys or database passwords in your submit.py. Instead:
- Store them as GitLab CI/CD Variables (masked and protected).
- Pass them to the Ilum job as Spark configuration properties or environment variables during the API call (e.g., -F "jobConfig=spark.hadoop.fs.s3a.access.key=$AWS_ACCESS_KEY...").
Can I run this on GitHub Actions?
Yes. The concepts are identical. You would replace the .gitlab-ci.yml syntax with GitHub Actions workflow syntax, using curl steps to hit the same Ilum API endpoints.