
Ilum Tables

Ilum Tables is a Spark data source that wraps the Delta, Iceberg, and Hudi table formats. It lets you read and create datasets in any of these formats through a single, unified interface, so the choice of underlying format does not leak into your code.

Import Ilum Tables

To make use of Ilum Tables, include the ilum-spark-format package in your Ilum Jobs. You can do this by adding this configuration:

spark.jars.packages=cloud.ilum:ilum-spark-format:6.1.0

or by adding the package as a separate jar to your Ilum Job resources.

How to include the package in your Scala application

  • Using sbt:
libraryDependencies += "cloud.ilum" % "ilum-spark-format" % "6.1.0"
  • Using maven:
<dependency>
  <groupId>cloud.ilum</groupId>
  <artifactId>ilum-spark-format</artifactId>
  <version>6.1.0</version>
</dependency>
  • Using gradle:
implementation group: 'cloud.ilum', name: 'ilum-spark-format', version: '6.1.0'

How to use it?

There are three ways to work with Ilum Tables:

  • Read and write data by specifying the "ilum" format
  • Read and write data using the ilum methods
  • Read and write data through a preconfigured catalog (writeTo, read.table)

To use the ilum methods, import the implicits like this:

import cloud.ilum.implicits.{
  IlumDataFrameReader,
  IlumDataFrameWriter,
  IlumDataFrameWriterV2,
  IlumDataStreamWriter,
  IlumDataStreamReader
}

Reading

val filePath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"

// without the ilum method
val mydf = spark.read.format("ilum").option("tableFormat", tableFormat).load(filePath)

// with the ilum method (the format is passed as an Option)
val mydf2 = spark.read.ilum(filePath, Some(tableFormat))
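
Both calls return an ordinary DataFrame, so everything downstream of the read stays format-agnostic.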


Writing

val filePath = "s3a://ilum-files/ilum-tables/table"
val tableFormat = "delta"

val data = Seq(
  (1, "Alice"),
  (2, "Bob"),
  (3, "Cathy")
)

val df = spark.createDataFrame(data).toDF("id", "name")

// using DataFrameWriterV1

// you can use syntax like this
df.write.format("ilum").option("tableFormat", tableFormat).save(filePath)

// or you can use the ilum function
df.write.ilum(filePath + "/1", tableFormat)

// using DataFrameWriterV2 with a preconfigured Delta catalog
val catalog = "catalog"
val table = "tablename"
df.writeTo(s"${catalog}.${table}").ilum(tableFormat, None).createOrReplace()
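
The catalog route also works for reads: once the table has been created with writeTo, it can be read back through the standard Spark table API, with no Ilum-specific options needed at that point. A minimal sketch, reusing the catalog and table values from above:

// read the table back through the preconfigured catalog
val catalogDf = spark.read.table(s"${catalog}.${table}")
catalogDf.show()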

Streaming

Without Ilum Methods

val filepath = "s3a://ilum-files/ilum-tables/streaming"
val tableFormat = "delta"

val input = spark.readStream
.format("ilum")
.option("tableFormat", tableFormat)
.load(filepath)

val query = input.writeStream
.outputMode("append")
.format("ilum")
.option("tableFormat", tableFormat)
.option("path", filepath + "_copy")
.option("checkpointLocation", filepath + "_checkpoint")
.start()

query.awaitTermination()

With Ilum Methods

val filePath = "s3a://ilum-files/ilum-tables/smth"
val tableFormat = Some("delta")

val df = spark.readStream.ilum(filePath, tableFormat)

val query = df.writeStream
.option("checkpointLocation", filePath + "_checkpoint")
.ilum(filePath+"_copy", tableFormat)

query.awaitTermination()
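
In a separate job, or after the streaming query has been stopped, the copied data can be read back in batch mode with the reader shown earlier; a minimal sketch:

// read the streamed copy back in batch mode to inspect it
val copied = spark.read.ilum(filePath + "_copy", tableFormat)
copied.show()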

Configuring Data Formats

Delta

In order to make use of Delta you should use these Spark configurations:

spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.sql.warehouse.dir=s3a://ilum-files/ilum-warehouse

and you must include the Delta package in your environment. To do this you can use the Kubernetes Spark image with the Delta extension preinstalled:

spark.kubernetes.container.image=ilum/spark:3.5.2-delta

or install the required extension package yourself.
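
With spark_catalog pointing at the DeltaCatalog, the DataFrameWriterV2 example from the Writing section can target it directly. A minimal sketch, where the table name is illustrative and df is the DataFrame from the Writing section:

// write df into the Delta-backed spark_catalog configured above
val deltaCatalog = "spark_catalog"
val deltaTable = "people" // illustrative table name
df.writeTo(s"${deltaCatalog}.${deltaTable}").ilum("delta", None).createOrReplace()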

Iceberg

In order to make use of Iceberg you should add these configurations:

spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.iceberg_catalog.type=hive

or, for a Hadoop-backed catalog with a warehouse path:

spark.sql.catalog.iceberg_catalog=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg_catalog.type=hadoop
spark.sql.catalog.iceberg_catalog.warehouse=s3a://ilum-files/ilum-tables/iceberg/warehouse

and you must include the org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1 package in your environment by adding its jar to the resources or by adding it to the Spark configuration like this:

spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.6.1
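
With the catalog and runtime package in place, the generic read and write examples from earlier target Iceberg simply by changing the tableFormat option; a minimal sketch, where the path is illustrative and df is the DataFrame from the Writing section:

// write and read an Iceberg table through the Ilum format
// (assumes the Iceberg configuration and runtime package above)
val icebergPath = "s3a://ilum-files/ilum-tables/iceberg/table"
df.write.format("ilum").option("tableFormat", "iceberg").save(icebergPath)
val icebergDf = spark.read.format("ilum").option("tableFormat", "iceberg").load(icebergPath)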

Hudi

In order to make use of Hudi you should add these configurations:

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog

and include the org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0 package in your environment by adding its jar to the resources or by adding it to the Spark configuration like this:

spark.jars.packages=org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0
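
As with the other formats, once the Hudi bundle and configuration are in place the same Ilum reader and writer work unchanged; a minimal sketch, where the path is illustrative and df is the DataFrame from the Writing section:

// write and read a Hudi table through the Ilum format
// (assumes the Hudi configuration and bundle above)
val hudiPath = "s3a://ilum-files/ilum-tables/hudi/table"
df.write.format("ilum").option("tableFormat", "hudi").save(hudiPath)
val hudiDf = spark.read.format("ilum").option("tableFormat", "hudi").load(hudiPath)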