This blog post is a summary of my talk at the 4th Vilnius Microsoft Data Platform Meetup, which took place on November 6th @Cognizant Lithuania. In the talk I presented a gentle introduction to the new Delta Lake solution from Databricks, its features, use cases and our production experience.
Introduction
Let’s start with a fancy quote from Wikipedia:
“Big data” is a field that treats ways to analyze, systematically extract information from, or otherwise deal with data sets that are too large or complex to be dealt with by traditional data-processing application software – Wikipedia, 2019
Data is all around us. We generate it every day. Our smartphones, wearable devices, smart cars and TVs have become enormous data generation sources. Together with the vast expansion of data accumulation, advanced machine learning and data science techniques came into play. There is a fun saying in the machine learning world that "there is no such thing as too much data".
However, behind every fancy-looking AI project there is a ton of additional work that goes into data accumulation and storage. In 2015, Google released a well-known figure showing how small the machine learning code is compared to the additional technical infrastructure around it.
This also correlates with "The Data Science Hierarchy of Needs" pyramid, which I absolutely adore. It shows that only a small portion of the work required for a project consists of the AI/ML/deep learning part. The vast majority of time and resources are spent on data collection, storage, cleaning, transformation, exploration and optimization. As a Data Engineer I can personally support this argument with my experience on our production system.
With all this data required for different types of projects, there is always a need to store it somewhere. In the old days, one simple database or data warehouse was more than enough. Relational databases provided very high quality data by enforcing a schema, and querying was very fast. On the flip side, these databases provided no support for streaming, they were very expensive to scale and there was no support for the growing machine learning demand.
Everything changed after the introduction of Hadoop and the data lake concept. It offered cheap horizontal scaling and open format support, together with streaming and ML. On the flip side, data could easily become very messy, and additional resources were required to clean it and transform it into a specific structure. It was a nice overall improvement, but it didn't solve all the issues around data.
But what if most of the issues could be solved by the data format itself, without relying on the storage technology? And that's how the Delta Lake project was born.
Delta Lake
Main Delta Lake features
So what are the main features of this technology and what benefits does it bring? Let's list the most important facts about Delta Lake:
- Built on top of the well-known Parquet file format.
- ACID Transactions. Data lakes typically have multiple data pipelines reading and writing data concurrently, and data engineers have to go through a tedious process to ensure data integrity, due to the lack of transactions. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest level of isolation.
- Scalable Metadata Handling. In big data, even the metadata itself can be “big data”. Delta Lake treats metadata just like data, leveraging Spark’s distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files with ease.
- Time Travel. Delta Lake provides snapshots of data, enabling developers to access and revert to earlier versions of data for audits, rollbacks or to reproduce experiments.
- Unified Batch and Streaming Source and Sink. A table in Delta Lake is both a batch table and a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.
- Schema Enforcement. Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption.
- Schema Evolution. Big data is continuously changing. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL (see the sketch after this list).
- Fully compatible with Apache Spark. Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark.
- Automatic file management. Data access is sped up by organizing data into large files that can be read efficiently.
- Statistics, data skipping and manual optimization. Reads are 10–100x faster when statistics are tracked about the data in each file, allowing Delta to avoid reading irrelevant information.
- Delete and insert support. Traditionally, a data lake falls under the "save once, never change" way of thinking. Delta Lake changes that, as it supports insertion and deletion of data within a dataset.
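As a small illustration of the schema enforcement and evolution points above, here is a minimal PySpark sketch. The dataframe name and path are placeholders; mergeSchema is the Delta write option that enables automatic evolution:
# new_dataframe carries an extra column compared to the existing table.
# Without mergeSchema, schema enforcement rejects the write;
# with it, Delta adds the new column to the table schema automatically.
(new_dataframe.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save("/path/to/delta/table"))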
How to use it?
In 2019 the Delta Lake project went open source and you can use it on any project you want. You can read more about it in the GitHub repository.
Installation for PySpark and Scala:
Spark package:
pyspark --packages io.delta:delta-core_2.11:0.4.0
bin/spark-shell --packages io.delta:delta-core_2.11:0.4.0
Maven installation:
<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.11</artifactId>
  <version>0.4.0</version>
</dependency>
On Databricks, the Delta format is supported out of the box.
In order to start using all Delta format features, you need to save your first dataset in Delta format. To do that, pass the "delta" keyword to the format() method. Simple as that:
dataframe.write.format("delta").save("/yourlocation")
How does it work?
After saving the dataset you can expect to see the standard structure of a Delta format folder:
The data is saved in Parquet format with an additional _delta_log folder which stores all your transaction history. Each dataset change is represented by a JSON commit file. Be careful when moving or changing these files, as this can ruin your dataset irrecoverably.
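If you are curious, you can peek at this layout from a Databricks notebook (a minimal sketch, assuming the dataset from above and the built-in dbutils helper):
# List the top level: Parquet part files plus the _delta_log folder.
for f in dbutils.fs.ls("/yourlocation"):
    print(f.name)
# List the transaction log: one JSON commit file per change.
for f in dbutils.fs.ls("/yourlocation/_delta_log"):
    print(f.name)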
ACID
One of the biggest features of the new Delta Lake technology is ACID transactions. A short reminder on what ACID means:
- Atomicity - the all-or-nothing rule. Transactions either succeed fully or fail completely. No partial writes are allowed.
- Consistency - schema enforcement. It ensures that the saved data meets a certain level of quality.
- Isolation - transaction separation. No two writes can conflict with one another.
- Durability - if it's saved, it's there. Provides assurance that even in case of a breakdown the data is secure and can be restored.
In the past, our production environment faced an issue which can occur while writing an ORC format dataframe. In case of failure, a __FAILURE__ log file is created and the dataset can be considered unrecoverable. Every time this happened we were forced to reload the dataset from scratch. To solve this issue, we implemented an additional "safe write" procedure in which we first saved the data to a temporary location, confirmed that it was successfully written, and then moved it to the permanent location. This added extra code and resource consumption. With Delta Lake's ACID transaction implementation this problem is solved by default. In case of parallel writes, Delta Lake resolves conflicts optimistically, as shown in the picture below.
Users 1 and 2 read the same data and try to save it at the same time. The conflict is resolved optimistically, by committing the change from User 2 first and then the change from User 1.
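For comparison, with Delta an append like the one below is atomic: the commit either shows up in _delta_log or it doesn't, so there is never a half-written dataset to clean up. A minimal sketch, reusing the /yourlocation path from earlier:
# No "write to temp location and move" workaround needed:
# readers only ever see fully committed versions of the table.
(dataframe.write
    .format("delta")
    .mode("append")
    .save("/yourlocation"))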
Time travel and metadata
Out of the box, Delta Lake supports advanced metadata handling and time travel capabilities. This is particularly useful in the GDPR era, where all changes within data can fall under audit. In the example below, the dataset history table is shown. It is possible to track which user performed what change and, if needed, roll back to a specific version or timestamp.
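In code, the history and time travel features look roughly like this in a Databricks notebook (the table name, path and timestamp below are placeholders; versionAsOf and timestampAsOf are standard Delta read options):
# Audit trail: version, timestamp, user and operation for every commit.
spark.sql("DESCRIBE HISTORY delta_table").show()
# Read an older snapshot by version number...
old_df = spark.read.format("delta").option("versionAsOf", 0).load("/yourlocation")
# ...or by timestamp.
old_df = spark.read.format("delta").option("timestampAsOf", "2019-11-01").load("/yourlocation")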
Optimization
Out of the box Delta Lake gathers information and statistics on the data in order to speed up queries by implementing data skipping techniques. There are additional manual optimization techniques which can be used:
- Compaction (bin-packing). This technique coalesces small files into larger ones to improve the speed of read queries:
spark.sql("OPTIMIZE delta_table")
- Z-Ordering. Co-locates related information in the same set of files. Z-Ordering provides the best results with filter-heavy transformations:
spark.sql("OPTIMIZE delta_table ZORDER BY (column name)")
Delta Lake promises that, using these techniques, the performance of some queries can be increased 10-100x.
Performance test
To test the performance of the new Delta Lake, I created a simple test notebook with different data types. The original dataset was saved in ORC format and consisted of 148,768,480 rows and 57 columns. The total size on disk was 8.8 GB.
Test cases used in the experiment:
- Plain Delta – 7.5 GB
- Optimized manual (partitioned by ID) – 7.5 GB
- Optimized auto (bin-packing) – 22 GB
- Optimized auto (Z-Order) – 22 GB
- Original ORC – 8.8 GB
- CSV – 41 GB
To test the performance, a dataframe transformation with multiple operations was used:
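The exact notebook code is not reproduced here; a hypothetical transformation of a similar shape (made-up column names and test path) could look like this:
# Hypothetical example only: filter, aggregate, then force full computation.
df = spark.read.format("delta").load("/mnt/test/plain_delta")
result = (df
    .filter(df.amount > 100)
    .groupBy("customer_id")
    .agg({"amount": "sum"})
    .orderBy("customer_id"))
result.count()   # trigger the job so the timing covers the whole dataset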
The results on particular transformations were as follows:
- Plain Delta – 14s
- Optimized manual (partitioned by ID) – 4s
- Optimized auto (bin-packing) – 6s
- Optimized auto (Z-Order) – 2.5s
- Original ORC – 17s
- CSV – 12min
Even in a simple experiment like this, we can observe the advantage of Delta Lake.
Our production experience and tips
We moved our production pipeline to the Delta Lake solution in September. Currently, our structure consists of a raw layer (ORC files) and a Delta Lake layer. After the daily load from on-prem, a custom Delta Lake builder (our data consists of full, incremental and upsert loads) converts the files from ORC to Delta format. We also simplified our writing part significantly, as ACID transactions save us the trouble of the safe write logic mentioned above. As a result, we managed to speed up our pipeline by around 25% (from 12 hours to 7.5-9 hours in total). During this change we learned a few tips and tricks that could help in the future:
- Be careful with the _delta_log folder. One time we removed it by accident, thus ruining the dataset completely. That meant there was no way of restoring the dataset and we had to rebuild it fully from the RAW layer.
- Delta cache persists on the cluster level (Databricks). This was a very irritating and hard-to-discover issue. If, after an error in the data, you fix the problem and try to read the dataframe again, you still get the same error unless the cluster is restarted. As the Delta format keeps statistics to improve speed with data skipping techniques, the cache is stored not on the session level, but on the cluster (machine) level.
- Delta format is still highly dependent on the Databricks Runtime version. Don't rush to the newest Databricks Runtime version, as you could face issues after the update. Patience is a virtue!
- Z-Order optimization improves only specific transformations. We haven't noticed any change in performance with join-heavy operations. As mentioned before, Z-Ordering works best with a large number of filtering operations.
The End
A few related articles around Delta Lake: