Apache Iceberg with Apache Spark
Recently I was assigned a task to write Iceberg utilities using Apache Spark with Scala. The code should automatically write data to the data lake in Iceberg format and be able to perform maintenance on that data. I was able to achieve this using Spark SQL, and below I will go step by step through the solution.
Setting up Spark and importing packages
I am using Spark 3.3.0 and Scala 2.12.12.
In build.sbt, we need to add the dependencies below for Iceberg to work:
"org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.3.0",
"software.amazon.awssdk" % "bundle" % "2.20.26",
"software.amazon.awssdk" % "url-connection-client" % "2.20.68",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.10.0",
A Spark session variable can be created using the code below:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
.config("spark.sql.defaultCatalog", "<iceberg_catalog_name>")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.<iceberg_catalog_name>", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.<iceberg_catalog_name>.warehouse", "<warehouse_s3_path>")
.config("spark.sql.catalog.<iceberg_catalog_name>.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
.config("spark.sql.catalog.<iceberg_catalog_name>.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.source.partitionOverviewMode", "dynamic")
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
.enableHiveSupport()
.getOrCreate()
Note: The above configs are required in order for Spark to work with Iceberg.
Replace <iceberg_catalog_name> with the name of the catalog for the Iceberg table. You can specify any name (lowercase with underscores is preferred). Replace <warehouse_s3_path> with the S3 path where Iceberg metadata files will be stored.
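For reference, a filled-in session might look like the sketch below. The catalog name glue_catalog and the warehouse bucket are hypothetical values; substitute your own.
// "glue_catalog" and the bucket name are hypothetical; substitute your own values.
val spark = SparkSession.builder()
  .config("spark.sql.defaultCatalog", "glue_catalog")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-datalake-bucket/iceberg/warehouse")
  .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .enableHiveSupport()
  .getOrCreate()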
Create Table Query (Example):
spark.sql("""CREATE TABLE `<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>` (
| id BIGINT,
| name STRING,
| ts TIMESTAMP
|)
|USING iceberg
|PARTITIONED BY (
| days(ts),
| name
|)
|LOCATION '<Table_location_in_datalake>'
|
|TBLPROPERTIES (
|'format'='PARQUET',
|'write.merge.mode'='merge-on-read',
|'max-concurrent-file-group-rewrites'='20',
|'write.update.mode'='merge-on-read',
|'write.target-file-size-bytes'='134217728',
|'write.delete.mode'='merge-on-read',
|'table_type'='ICEBERG',
|'partial-progress.enabled'='true'
|)""".stripMargin)
Above is an example of how we can create a table in Iceberg format using Spark with Scala. Add the catalog name, database name, and table name as required.
Table properties are completely optional and can be added as required. More table properties can be found here.
Write DataFrame to Iceberg table:
df.write.mode("append")
.insertInto(f"`<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`")
We can write a DataFrame to an Iceberg table using the code above and change the mode as required (append / overwrite). Note that insertInto resolves columns by position, so the DataFrame's column order must match the table's.
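Alternatively, the DataFrameWriterV2 API (df.writeTo) can be used, which resolves columns by name. A minimal sketch, assuming the (id, name, ts) schema from the create statement above; the sample rows are illustrative:
// Build a small DataFrame matching the table schema and append it with the V2 API.
import spark.implicits._
import org.apache.spark.sql.functions.current_timestamp

val df = Seq((1L, "alice"), (2L, "bob"))
  .toDF("id", "name")
  .withColumn("ts", current_timestamp())

// append() adds the rows; overwritePartitions() would dynamically replace only the touched partitions.
df.writeTo("`<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`").append()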
Add a partition field to an existing Iceberg table:
spark.sql("""ALTER TABLE `<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`
|ADD PARTITION FIELD <column_name>""".stripMargin)
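Transforms can also be used as partition fields when altering a table. A small sketch, assuming the sample schema above; the bucket count is illustrative:
// Add a bucket partition on id; only data written after this change uses the new partition field.
spark.sql("""ALTER TABLE `<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`
            |ADD PARTITION FIELD bucket(16, id)""".stripMargin)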
Drop a partition field from an existing Iceberg table:
spark.sql("""ALTER TABLE `<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`
|DROP PARTITION FIELD <column_name>""".stripMargin)
Generate table DDL:
spark.sql(s"SHOW CREATE TABLE `<iceberg_catalog_name>`.`<iceberg_database>`.`<iceberg_table>`").show(false)
Set maximum number of metadata files:
spark.sql(
s"""ALTER TABLE <iceberg_catalog_name>.<iceberg_database>.<iceberg_table>` SET TBLPROPERTIES (
| 'write.metadata.delete-after-commit.enabled'='true',
| 'write.metadata.previous-versions-max'='<numOfMetadataFiles>'
|)""".stripMargin).show(false)
This sets the maximum number of metadata files kept in the metadata folder of the Iceberg table.
It is recommended to set this property when the table is created so that the number of metadata files is kept in check from the beginning. If it is not set from the start, a huge number of metadata files may accumulate that are not tracked by the metadata log; those files may not be deleted even after the property is set and will keep taking up space in the data lake. In that case, we have to identify the required metadata files and manually delete the ones that are not needed. The tracked files can be listed as shown below.
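To see which metadata files are currently tracked by the metadata log, the metadata_log_entries metadata table can be queried. A small sketch; the selected columns are illustrative:
// Lists each tracked metadata.json file with the commit timestamp of its entry.
spark.sql("SELECT file, timestamp FROM <iceberg_catalog_name>.<iceberg_database>.<iceberg_table>.metadata_log_entries").show(false)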
Expire Snapshots:
Expire snapshots by number of days:
val expireDate = java.time.LocalDateTime.now.minusDays(<numOfDays>)
val minSnapshotsRetain = 1
spark.sql(s"CALL <iceberg_catalog_name>.system.expire_snapshots(table => '<iceberg_database>.<iceberg_table>', older_than => TIMESTAMP '$expireDate', retain_last => $minSnapshotsRetain)").show(false)
Expire snapshots by a specific date:
val expireDate: java.time.LocalDateTime = <any_date>
val minSnapshotsRetain = 1
spark.sql(s"CALL <iceberg_catalog_name>.system.expire_snapshots(table => '<iceberg_database>.<iceberg_table>', older_than => TIMESTAMP '$expireDate', retain_last => $minSnapshotsRetain)").show(false)
We can expire snapshots by date or by number of days using the code above. We need to provide a minimum number of snapshots to retain so that some snapshots are kept even if all of them fall before the expiry date. The default value is 1, as Iceberg does not allow deleting all snapshots.
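To verify what remains after expiry, the snapshots metadata table can be queried. A quick sketch:
// Shows the remaining snapshots with their commit time and operation (append, overwrite, delete, etc.).
spark.sql("SELECT snapshot_id, committed_at, operation FROM <iceberg_catalog_name>.<iceberg_database>.<iceberg_table>.snapshots").show(false)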
Compaction (rewrite-data-files):
// Function to rewrite data files, i.e. compaction with the default strategy
def defaultRewriteDataFiles(): Unit = {
  spark.sql("""CALL <iceberg_catalog_name>.system.rewrite_data_files(table => '<iceberg_database>.<iceberg_table>')""").show(false)
}

// Function to rewrite data files using the sort strategy
def sortRewriteDataFiles(): Unit = {
  val sortOrder: String = "<column_name> <ASC / DESC>"
  spark.sql(s"""CALL <iceberg_catalog_name>.system.rewrite_data_files(table => '<iceberg_database>.<iceberg_table>', strategy => 'sort', sort_order => '$sortOrder')""").show(false)
}

// Function to rewrite data files using the bin-pack strategy (the default), with a minimum number of input files
def binpackRewriteDataFiles(): Unit = {
  val numOfFiles: Int = <any_number>
  spark.sql(s"""CALL <iceberg_catalog_name>.system.rewrite_data_files(table => '<iceberg_database>.<iceberg_table>', options => map('min-input-files', '$numOfFiles'))""").show(false)
}
Compaction, or rewrite-data-files, is the process of combining small data files into fewer large files (or splitting large files into smaller ones) to approach the target file size. This behavior can be adjusted through the table property 'write.target-file-size-bytes'. There are three ways to run data compaction, as shown in the code above. For more information, visit here.
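Rewrite options can also be passed per run instead of relying only on the table property. The sketch below compacts a single partition and overrides the target file size for that run; the filter value and file size are illustrative:
// Compact only the rows matching the filter, targeting roughly 512 MB files for this run.
spark.sql(
  """CALL <iceberg_catalog_name>.system.rewrite_data_files(
    |  table => '<iceberg_database>.<iceberg_table>',
    |  where => 'name = "alice"',
    |  options => map('target-file-size-bytes', '536870912')
    |)""".stripMargin).show(false)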
Remove orphan files:
spark.sql(s"CALL <iceberg_catalog_name>.system.remove_orphan_files(table => '<iceberg_database>.<iceberg_table>')").show(false)
This will only remove files that are no longer associated with any snapshot of the table.
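By default, only files older than three days are considered; an older_than timestamp can be passed to change that cutoff. A sketch with an illustrative seven-day cutoff:
// Treat files as orphans only if they are older than seven days.
val orphanCutoff = java.time.LocalDateTime.now.minusDays(7)
spark.sql(s"CALL <iceberg_catalog_name>.system.remove_orphan_files(table => '<iceberg_database>.<iceberg_table>', older_than => TIMESTAMP '$orphanCutoff')").show(false)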
Rewrite Manifest Files:
spark.sql(s"""CALL <iceberg_catalog_name>.system.rewrite_manifests('<iceberg_database>.<iceberg_table>', false)""").show(false)
The rewrite_manifests procedure rewrites a table's manifests to optimize scan planning. Data files in the manifests are sorted by fields in the partition spec. The second argument (false) disables Spark caching during the operation; it defaults to true.
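To inspect manifests before or after the rewrite, the manifests metadata table can be queried. A short sketch:
// Shows each manifest file, its size, and the number of data files it references.
spark.sql("SELECT path, length, added_data_files_count, existing_data_files_count FROM <iceberg_catalog_name>.<iceberg_database>.<iceberg_table>.manifests").show(false)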
The important Iceberg utilities are covered above. For more functionality, please refer to the Iceberg documentation.