
Scheduler

Overview

The Scheduler lets users create schedules for individual jobs. It provides a user-friendly interface for configuring when each job should run.

It operates by creating a Kubernetes CronJob, which uses a cron expression to determine when to execute the job. At the scheduled time, the CronJob makes a request to ilum-core, which then creates a Kubernetes pod. Within this pod, a spark-submit command is executed, applying all user-defined configurations alongside the essential configurations provided by the Ilum framework.

Use cases:

1. Data Ingestion

Periodically fetch data from external APIs or other sources and load it into a database or file system for further processing or analysis.

2. Data Pipeline Orchestration (ETL)

Automate Spark-based ETL (Extract, Transform, Load) jobs that extract raw data from multiple sources, apply complex transformations, and load the results into a data warehouse.

3. Report Preparation and Data Aggregation for Analytics

Schedule a Spark job to aggregate large datasets from various sources (e.g., logs, sales data, user interactions) into summary tables that are used in dashboards and reports.

4. Data Clean-up

Schedule a Spark job that performs data cleanup, such as removing duplicates, correcting invalid entries, or filtering data, in a large dataset stored in a distributed system.

Get started

1. Write a single job

For example, in Python.

This job's task is to generate sample data and write it to an S3 bucket on each scheduled run:

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    import random
    from datetime import datetime

    def generate_sample_data(n):
        # Build n rows of synthetic data with an id, a name, and a random value.
        return [Row(id=i, name=f"Name_{i}", value=random.randrange(70, 100)) for i in range(n)]

    if __name__ == "__main__":
        spark = SparkSession.builder \
            .appName("My Spark Job") \
            .getOrCreate()

        data = generate_sample_data(100)
        df = spark.createDataFrame(data)

        # Use the current timestamp so each run writes into its own S3 prefix.
        current_datetime = datetime.now()
        current_timestamp = current_datetime.strftime("%Y-%m-%d %H:%M:%S")

        output_path = f"s3a://ilum-files/generated_data/{current_timestamp}"
        df.write.mode("overwrite").parquet(output_path)

        spark.stop()


The Spark session is created internally, so you don't need to configure it manually. You can see the applied configurations in the job logs once the job has finished.


If you require further customization, you can still pass your own options when building the session.
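
As a minimal sketch (the option keys and values below are only illustrative; any valid Spark option can be set this way), extra settings can be passed through the session builder, while options that must be known before the driver starts belong in the scheduler's Configuration section described below:

    from pyspark.sql import SparkSession

    # Illustrative options only -- replace them with whatever your job needs.
    spark = SparkSession.builder \
        .appName("My Spark Job") \
        .config("spark.sql.shuffle.partitions", "64") \
        .config("spark.sql.session.timeZone", "UTC") \
        .getOrCreate()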

2. Create a scheduler

  • Upload your Python script in Resources -> PyFiles


  • Configure the scheduler timeline in the Timing section: specify the start time, end time, and cron expression


If you need custom timing that cannot be set in the user interface, you can write your own cron expression.

Quick guide:

    A cron expression consists of 5 fields:
    <minute> <hour> <day-of-month> <month> <day-of-week>

    * - every time unit
    ? - any time unit (? in the day-of-week field means the day of week is ignored)
    - - range (1-5)
    , - list of values (1,5)
    / - increments (5/15 in the minute field = 5, 20, 35, 50)

Examples:

    Each hour:
    0 * * * *

    Each Sunday at 3 AM:
    0 3 * * 0

    Every 15 minutes, every day:
    0/15 * * * ?

    Every five minutes starting at 1 p.m. and ending at 1:55 p.m., and then starting at 6 p.m. and ending at 6:55 p.m., every day:
    0/5 13,18 * * ?

See the CronJob documentation for more details on cron expressions.

  • Add custom configurations, if needed, in the Configuration section


Here you can add Spark configurations under Parameters.

For example, you can add parameters to enable the Delta Lake data format and catalog.

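As a sketch (the package coordinates and version are illustrative and must match your Spark and Scala versions), the usual Delta Lake parameters look like this:

    spark.jars.packages=io.delta:delta-spark_2.12:3.1.0
    spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

With these in place, the job can read and write tables using the "delta" format.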

If you designed your application to take command-line arguments, you can specify them in Args (see the sketch below). You can also use tags to filter and organize jobs.
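
A minimal sketch of consuming such arguments in the job (the argument names and order here are hypothetical; they simply mirror whatever you enter in Args):

    import sys

    if __name__ == "__main__":
        # sys.argv[0] is the script path; the values from Args follow in order.
        # Hypothetical layout: <output_path> <row_count>
        output_path = sys.argv[1]
        row_count = int(sys.argv[2])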

  • Resource Allocation

You don't need to specify resource settings inside the Spark configuration. Instead, you can use the UI.


There you can set the following Spark configs:

    spark.executor.instances
    spark.executor.memory
    spark.executor.cores
    spark.driver.memory
    spark.driver.cores
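
For illustration only (the values are arbitrary and should be sized to your workload and cluster capacity), the UI fields map to settings such as:

    spark.executor.instances=2
    spark.executor.memory=2g
    spark.executor.cores=2
    spark.driver.memory=1g
    spark.driver.cores=1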

You can also enable Dynamic Allocation and configure it. With dynamic allocation, the number of executors varies with the current workload of the application: if all executors are busy and there are still tasks to run, Spark requests more executors, and it releases executors that sit idle.


Through the UI you can set these Spark parameters:

    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.initialExecutors
    spark.dynamicAllocation.minExecutors
    spark.dynamicAllocation.maxExecutors
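
As an illustration (the numbers are arbitrary), a small but elastic setup could look like the following; note that on Kubernetes, dynamic allocation typically also needs shuffle tracking enabled unless an external shuffle service is available, which can be added in Parameters if the UI does not expose it:

    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.initialExecutors=1
    spark.dynamicAllocation.minExecutors=1
    spark.dynamicAllocation.maxExecutors=10
    spark.dynamicAllocation.shuffleTracking.enabled=true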

3. Launch the scheduler and observe the jobs it creates

  • Observe the launched jobs


  • Observe job logs


  • Observe lineage. Lineage tracks the sequence of transformations applied to data (see the sketch after this list)


  • Observe the used Spark configurations, command-line arguments, and job tags

  • Observe executor metrics and resource allocation
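
For instance (a sketch only; the input path and column names are hypothetical), a job like the following produces a read -> filter -> aggregate -> write lineage that can be inspected in the UI:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("Lineage Example").getOrCreate()

    # Each step below becomes a stage in the lineage graph.
    events = spark.read.parquet("s3a://ilum-files/events/")                    # read
    valid = events.filter(F.col("value") >= 80)                                # filter
    summary = valid.groupBy("name").agg(F.avg("value").alias("avg_value"))     # aggregate
    summary.write.mode("overwrite").parquet("s3a://ilum-files/summaries/")     # write

    spark.stop()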

Tips

  • Before creating a scheduler, test your job by launching it as a regular Single Job
  • Remember to turn off schedulers you no longer need

If you no longer need a job to run periodically, turn its scheduler off to avoid wasting resources.

  • Edit your schedulers instead of creating new ones

If you want to change the configuration of an existing schedule, simply click Edit on it.