Scheduler
Overview
The Scheduler is a feature that lets users create schedules for individual jobs. It provides a user-friendly interface for configuring the timeline on which each job runs.
It operates by creating a Kubernetes CronJob, which uses a cron expression to determine when to execute the job. When the scheduled time arrives, the CronJob sends a request to ilum-core, which then creates a Kubernetes pod. Inside this pod, a spark-submit command is executed, applying all user-defined configurations alongside the essential configurations provided by the ilum framework.
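Conceptually, the CronJob behind a schedule looks roughly like the sketch below. This is an illustration only: the names, image, and endpoint are hypothetical placeholders, and the real manifest is generated by Ilum, not written by hand.

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: my-spark-job-scheduler      # hypothetical name
spec:
  schedule: "0 * * * *"             # cron expression from the Scheduler UI
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: trigger
              # Hypothetical trigger container; in practice the CronJob
              # calls ilum-core, which creates the pod running spark-submit.
              image: curlimages/curl
              args: ["-X", "POST", "http://ilum-core/..."]
```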
Use cases:
1. Data Ingestion
Periodically fetch data from external APIs or other sources and load it into a database or file system for further processing or analysis.
2. Data Pipeline Orchestration (ETL)
Automate Spark-based ETL (Extract, Transform, Load) jobs that extract raw data from multiple sources, apply complex transformations, and load the results into a data warehouse.
3. Reports preparation and Data Aggregation for analytics
Schedule a Spark job to aggregate large datasets from various sources (e.g., logs, sales data, user interactions) into summary tables that are used in dashboards and reports.
4. Data Clean-up
Schedule a Spark job that performs data cleanup, such as removing duplicates, correcting invalid entries, or filtering data, in a large dataset stored in a distributed system.
Get started
1. Write a single job
For example in Python.
This job generates sample data and writes it to an S3 bucket on each scheduled run.
```python
from pyspark.sql import SparkSession, Row
import random
from datetime import datetime


def generate_sample_data(n):
    return [Row(id=i, name=f"Name_{i}", value=random.randrange(70, 100)) for i in range(n)]


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("My Spark Job") \
        .getOrCreate()

    data = generate_sample_data(100)
    df = spark.createDataFrame(data)

    # Colon-free timestamp: ':' is not a valid character in Hadoop file paths.
    current_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_path = f"s3a://ilum-files/generated_data/{current_timestamp}"
    df.write.mode("overwrite").parquet(output_path)

    spark.stop()
```
The Spark session is created internally; you don't need to configure it manually. You can see the applied configurations in the job logs once the job has finished:
In case you require some customization, you may apply your own options on top of the provided session.
2. Create a scheduler
- Upload your python script in Resources -> PyFiles
- Configure the scheduler timeline in the Timing section: specify the start time, end time and cron expression
In case you need a custom timing configuration that cannot be set in the user interface, you may write your own cron expression.
Quick guide:
Cron expression consists of 5 fields:
<minute> <hour> <day-of-month> <month> <day-of-week>
* - every time unit
? - any time unit (? in the day-of-week field means the day of week is ignored)
- - range (1-5)
, - list of values (1,5)
/ - increments (5/15 in the minute field = 5, 20, 35, 50)
Examples:
Each hour:
0 * * * *
Each Sunday at 3 AM:
0 3 * * 0
Every 15 minutes, every day:
0/15 * * * ?
Every five minutes starting at 1 p.m. and ending at 1:55 p.m. and then starting at 6 p.m. and ending at 6:55 p.m., every day:
0/5 13,18 * * ?
See the CronJob documentation for more details.
- Add custom configurations if needed in Configuration folder
You can add Spark configurations here in Parameters.
For example, you can add parameters to make use of the Delta Lake data format and catalog:
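The standard Delta Lake settings look like this (assuming the Delta Lake package is available on the job's classpath):

```properties
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
```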
In case you designed your application to take command-line arguments, you can specify them in Args. You can also use tags for filtering and organizing jobs.
- Resource Allocation
You don't need to set resources inside the Spark configurations. Instead, you can use the UI, where you can specify the following Spark configs:
spark.executor.instances
spark.executor.memory
spark.executor.cores
spark.driver.memory
spark.driver.cores
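As an illustration, a small static allocation might translate to the following Spark settings (the values below are examples only, not recommendations):

```properties
spark.executor.instances=2
spark.executor.memory=2g
spark.executor.cores=2
spark.driver.memory=1g
spark.driver.cores=1
```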
You can also enable Dynamic Allocation and configure it. With dynamic allocation, the number of executors varies depending on the application's current workload. If all executors are busy and there are still tasks to run, Spark will request more executors.
In the UI you will be able to specify these Spark parameters:
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.minExecutors
spark.dynamicAllocation.maxExecutors
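A possible dynamic allocation setup is sketched below with example values. Note that on Kubernetes, shuffle tracking is typically also required, because there is no external shuffle service:

```properties
spark.dynamicAllocation.enabled=true
# Needed on Kubernetes, where no external shuffle service is available.
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=5
```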
3. Launch the scheduler and observe logs of appearing jobs
- Observe launched jobs
- Observe job logs
- Observe lineage: lineage tracks the sequence of transformations applied to the data
- Observe the applied Spark configurations, command-line arguments and job tags
- Observe executor metrics and resource allocation
Tips
- Before creating a scheduler, test your job by launching it as a regular Single Job
- Remember to turn off the schedulers
If you don't need a job to be launched periodically, turn its scheduler off to avoid wasting resources
- Edit your schedulers instead of creating new ones
If you want to change the configuration of an existing schedule, simply click Edit on it