Apache Iceberg Setup with AWS EMR and Basic CRUD Operations in Iceberg Using Spark SQL

Rishav Sarkar
Jan 3, 2023


This post shows how to set up Apache Iceberg on an EMR cluster, use AWS Glue as the Iceberg catalog, run basic CRUD operations on Iceberg tables using Spark SQL, and view the changes in AWS Athena.

EMR Cluster Configuration:

  1. EMR Release: emr-6.9.0
  2. Select Software: Hadoop 3.3.3 and Spark 3.3.0
  3. Under Software settings, enter the following configuration:
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2,software.amazon.awssdk:bundle:2.20.26,software.amazon.awssdk:url-connection-client:2.20.68",
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "spark.sql.defaultCatalog": "<catalog_name>",
      "spark.sql.catalog.<catalog_name>": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.<catalog_name>.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
      "spark.sql.catalog.<catalog_name>.warehouse": "<your_warehouse_s3_path>",
      "spark.sql.catalog.<catalog_name>.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "spark.sql.sources.partitionOverwriteMode": "dynamic"
    }
  },
  {
    "Classification": "hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]

4. Replace <catalog_name> with the name of the catalog for the Iceberg tables. You can choose any name, but lowercase with underscores is preferred. Replace <your_warehouse_s3_path> with the S3 path where Iceberg data and metadata files will be stored.

5. Continue with the other settings as required.
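If you create clusters from the AWS CLI rather than the console, it can help to generate the configuration above with the placeholders already filled in. The sketch below is written in Scala, matching the `L`-suffixed snippets later in this post. The helper names `sparkDefaults`, `jsonString`, `classification`, and `emrConfigurationsJson` are hypothetical. So are the `my_catalog` name and the `s3://my-bucket/...` path. The lowercase and `s3://` checks are assumptions drawn from step 4, not rules that EMR enforces.

```scala
// Package versions copied from the configuration above; not verified for other EMR releases.
val IcebergPackages = Seq(
  "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.2",
  "software.amazon.awssdk:bundle:2.20.26",
  "software.amazon.awssdk:url-connection-client:2.20.68"
).mkString(",")

// Hypothetical helper: the spark-defaults properties with <catalog_name> and
// <your_warehouse_s3_path> substituted.
def sparkDefaults(catalogName: String, warehousePath: String): Seq[(String, String)] = {
  // Assumption: enforce the lowercase-with-underscores naming suggested in step 4.
  require(catalogName.matches("[a-z][a-z0-9_]*"), s"catalog name should be lowercase with underscores: $catalogName")
  // Assumption: this sketch only accepts s3:// URIs for the warehouse.
  require(warehousePath.startsWith("s3://"), s"warehouse path should start with s3://: $warehousePath")
  val prefix = s"spark.sql.catalog.$catalogName"
  Seq(
    "spark.jars.packages" -> IcebergPackages,
    "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog" -> catalogName,
    prefix -> "org.apache.iceberg.spark.SparkCatalog",
    s"$prefix.catalog-impl" -> "org.apache.iceberg.aws.glue.GlueCatalog",
    s"$prefix.warehouse" -> warehousePath,
    s"$prefix.io-impl" -> "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.sources.partitionOverwriteMode" -> "dynamic"
  )
}

// Minimal JSON string escaping (backslashes and double quotes only; enough for these values).
def jsonString(s: String): String =
  "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

// Renders one {"Classification": ..., "Properties": {...}} object.
def classification(name: String, props: Seq[(String, String)]): String = {
  val entries = props.map { case (k, v) => "      " + jsonString(k) + ": " + jsonString(v) }
  Seq(
    "  {",
    "    \"Classification\": " + jsonString(name) + ",",
    "    \"Properties\": {",
    entries.mkString(",\n"),
    "    }",
    "  }"
  ).mkString("\n")
}

// Hypothetical entry point: the full configuration array, including hive-site.
def emrConfigurationsJson(catalogName: String, warehousePath: String): String = {
  val hiveSite = Seq("hive.metastore.client.factory.class" ->
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
  Seq(
    classification("spark-defaults", sparkDefaults(catalogName, warehousePath)),
    classification("hive-site", hiveSite)
  ).mkString("[\n", ",\n", "\n]")
}

println(emrConfigurationsJson("my_catalog", "s3://my-bucket/iceberg-warehouse/"))
```

You could save the printed JSON to a file and pass it with `aws emr create-cluster ... --configurations file://configurations.json`. The JSON is assembled by hand to keep the sketch dependency-free; a JSON library would be more robust.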

CRUD Operations on Iceberg Tables:

All of the CRUD operations below are reflected in AWS Athena, because the Iceberg tables are registered in the AWS Glue Data Catalog.

  1. Create Iceberg Database:
spark.sql(""" CREATE DATABASE IF NOT EXISTS <catalog_name>.<iceberg_db> """)

2. Create Iceberg Table (the PARTITIONED BY column must be one of the table's columns):

spark.sql(""" CREATE TABLE IF NOT EXISTS <catalog_name>.<iceberg_db>.<iceberg_table>
(column_name_1 column_type_1, column_name_2 column_type_2, ...)
USING iceberg
PARTITIONED BY (column_name)""")
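Step 2 is a template. A small helper can render it for a concrete schema and reject a partition column that is not in the schema before the statement is sent to Spark. `createTableSql` and the `sales_db.orders` schema are hypothetical. This sketch only handles identity partitions (plain column names), not Iceberg partition transforms such as `days(ts)`.

```scala
// Hypothetical helper: renders the CREATE TABLE template from step 2.
// Assumption: partitionBy holds plain column names (identity partitions only).
def createTableSql(catalog: String, db: String, table: String,
                   columns: Seq[(String, String)], partitionBy: Seq[String]): String = {
  val names = columns.map(_._1).toSet
  val missing = partitionBy.filterNot(names.contains)
  require(missing.isEmpty, s"partition columns not in schema: ${missing.mkString(", ")}")
  val cols = columns.map { case (name, tpe) => s"$name $tpe" }.mkString(", ")
  val partitionClause =
    if (partitionBy.isEmpty) "" else partitionBy.mkString(" PARTITIONED BY (", ", ", ")")
  s"CREATE TABLE IF NOT EXISTS $catalog.$db.$table ($cols) USING iceberg$partitionClause"
}

val ddl = createTableSql("my_catalog", "sales_db", "orders",
  Seq("order_id" -> "BIGINT", "amount" -> "DOUBLE", "order_date" -> "DATE"),
  Seq("order_date"))
println(ddl)
// In spark-shell you would then run: spark.sql(ddl)
```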

3. Insert values into the Iceberg Table:

spark.sql(""" INSERT INTO <catalog_name>.<iceberg_db>.<iceberg_table>
VALUES (value_1, value_2), (value_1, value_2)""")
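The VALUES list in step 3 needs literals that Spark SQL can parse. Strings must be quoted and escaped, and dates and timestamps must be typed. Here is a minimal sketch, assuming Spark's default backslash escaping for string literals. `sqlLiteral` and `insertSql` are hypothetical helpers that cover only a few common types.

```scala
import java.time.{LocalDate, LocalDateTime}
import java.time.format.DateTimeFormatter

val TsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

// Hypothetical helper: renders a Scala value as a Spark SQL literal.
def sqlLiteral(value: Any): String = value match {
  case null             => "NULL"
  case None             => "NULL"
  case Some(v)          => sqlLiteral(v)
  case b: Boolean       => if (b) "TRUE" else "FALSE"
  case n @ (_: Int | _: Long | _: Double | _: BigDecimal) => n.toString
  case d: LocalDate     => s"DATE '$d'"
  case t: LocalDateTime => "TIMESTAMP '" + t.format(TsFormat) + "'"
  // Escape backslashes first, then single quotes, using backslash escapes.
  case s: String        => "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'"
  case other => throw new IllegalArgumentException(s"unsupported type: ${other.getClass.getName}")
}

// Hypothetical helper: each tuple becomes one parenthesized row of the VALUES list.
def insertSql(catalog: String, db: String, table: String, rows: Seq[Product]): String = {
  val values = rows.map(_.productIterator.map(sqlLiteral).mkString("(", ", ", ")")).mkString(", ")
  s"INSERT INTO $catalog.$db.$table VALUES $values"
}

println(insertSql("my_catalog", "sales_db", "orders", Seq(
  (1L, 19.99, LocalDate.of(2023, 1, 3)),
  (2L, None, LocalDate.of(2023, 1, 4))
)))
// INSERT INTO my_catalog.sales_db.orders VALUES (1, 19.99, DATE '2023-01-03'), (2, NULL, DATE '2023-01-04')
```

For more than a handful of rows, writing a DataFrame with `df.writeTo("<catalog_name>.<iceberg_db>.<iceberg_table>").append()` avoids building SQL strings altogether.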

4. Read data from the Iceberg Table:

spark.sql(""" SELECT * FROM <catalog_name>.<iceberg_db>.<iceberg_table>""").show()

5. Read the snapshot history:

spark.sql(""" SELECT * FROM <catalog_name>.<iceberg_db>.<iceberg_table>.snapshots""").show()
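The snapshots table from step 5 supplies the `<snapshot_id>` values used in steps 6 and 8. The sketch below models three of its columns (`committed_at`, `snapshot_id`, `parent_id`) as a hypothetical `SnapshotRow` case class. It then finds the snapshot committed last at or before a given time, or a snapshot's parent. It assumes a linear history with no earlier rollbacks. Iceberg itself resolves timestamps from the table's snapshot log, so treat this only as an approximation. The snapshot ids and times are illustrative, not real output. In spark-shell you could build the rows from the collected query result, converting `committed_at` with `getTimestamp(0).toInstant`.

```scala
import java.time.Instant

// Hypothetical model of three columns of the .snapshots metadata table.
final case class SnapshotRow(committedAt: Instant, snapshotId: Long, parentId: Option[Long])

// Latest snapshot committed at or before `asOf` (assumes linear history, no rollbacks).
def snapshotAsOf(snapshots: Seq[SnapshotRow], asOf: Instant): Option[Long] =
  snapshots
    .filter(s => !s.committedAt.isAfter(asOf))
    .reduceOption((a, b) => if (b.committedAt.isAfter(a.committedAt)) b else a)
    .map(_.snapshotId)

// Parent of a snapshot: a natural target for rollback_to_snapshot.
def parentOf(snapshots: Seq[SnapshotRow], snapshotId: Long): Option[Long] =
  snapshots.find(_.snapshotId == snapshotId).flatMap(_.parentId)

// Illustrative history, not real output.
val history = Seq(
  SnapshotRow(Instant.parse("2023-01-03T09:00:00Z"), 111L, None),
  SnapshotRow(Instant.parse("2023-01-03T10:00:00Z"), 222L, Some(111L)),
  SnapshotRow(Instant.parse("2023-01-03T11:00:00Z"), 333L, Some(222L))
)
println(snapshotAsOf(history, Instant.parse("2023-01-03T10:30:00Z")))  // Some(222)
println(parentOf(history, 333L))                                        // Some(222)
```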

6. Read data as of <snapshot_id>:

spark.read.option("snapshot-id", <snapshot_id>L)
.table("<catalog_name>.<iceberg_db>.<iceberg_table>")
.show()

7. Read data as of <timestamp> (milliseconds since the Unix epoch):

spark.read.option("as-of-timestamp", <timestamp>L)
.table("<catalog_name>.<iceberg_db>.<iceberg_table>")
.show()
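The `as-of-timestamp` option in step 7 takes milliseconds since the Unix epoch. A wall-clock time therefore has to be converted first, and its time zone matters. Here is a minimal sketch using `java.time`; `epochMillis` is a hypothetical helper name.

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

// Hypothetical helper: converts a wall-clock time in a given zone to epoch milliseconds.
def epochMillis(wallClock: LocalDateTime, zone: ZoneId): Long =
  wallClock.atZone(zone).toInstant.toEpochMilli

val asOf = epochMillis(LocalDateTime.of(2023, 1, 3, 0, 0), ZoneOffset.UTC)
println(asOf)  // 1672704000000
```

The result can be passed as `spark.read.option("as-of-timestamp", asOf)`, since `DataFrameReader.option` has a `Long` overload.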

8. Roll back to a previous snapshot:

spark.sql(""" CALL <catalog_name>.system.rollback_to_snapshot('<iceberg_db>.<iceberg_table>', <snapshot_id>) """)

9. Roll back to a previous timestamp (<timestamp> in the form yyyy-MM-dd HH:mm:ss.SSS):

spark.sql(""" CALL <catalog_name>.system.rollback_to_timestamp('<iceberg_db>.<iceberg_table>', TIMESTAMP '<timestamp>') """)
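Steps 8 and 9 differ mainly in how the target is given: a bare snapshot id versus a quoted `TIMESTAMP` literal. These hypothetical helpers render both statements for concrete values. Spark interprets a `TIMESTAMP` literal without an offset in the session time zone (`spark.sql.session.timeZone`).

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

val RollbackTsFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

// Hypothetical helper for step 8: the snapshot id is a plain numeric argument.
def rollbackToSnapshotSql(catalog: String, db: String, table: String, snapshotId: Long): String =
  s"CALL $catalog.system.rollback_to_snapshot('$db.$table', $snapshotId)"

// Hypothetical helper for step 9: the time must be a quoted TIMESTAMP literal.
def rollbackToTimestampSql(catalog: String, db: String, table: String, ts: LocalDateTime): String =
  s"CALL $catalog.system.rollback_to_timestamp('$db.$table', TIMESTAMP '${ts.format(RollbackTsFormat)}')"

println(rollbackToTimestampSql("my_catalog", "sales_db", "orders", LocalDateTime.of(2023, 1, 3, 10, 30)))
// CALL my_catalog.system.rollback_to_timestamp('sales_db.orders', TIMESTAMP '2023-01-03 10:30:00.000')
```

Either string can then be run with `spark.sql(...)`.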
