Writing to AWS S3 from Spark
.write(s3a://……...)
Background
AWS is the de facto cloud standard for most enterprises, and its storage service S3, being very low cost, elastic, and backed by a strong SLA for availability and durability, is one of the motivations to move away from relatively expensive storage solutions like HDFS, which involve maintaining a cluster along with the related admin and support costs. I am mostly talking about Data Engineering applications, specifically ETL and Data Integration applications.
As an ETL professional who worked for many years with on-premises infrastructure and a traditional ETL tool kit, it has always been standard practice to persist intermediate data between processing phases, which enables data sharing with different downstream consumers and lets you build chains of ETL pipelines on top of those data sets. The same mindset carries over when writing ETL applications in Spark on a cloud environment, i.e. writing data to S3 for persistence. There is nothing wrong with this approach, and it works absolutely fine for most use cases, but there can be challenges due to the way files are written to S3. Transient S3 failures occurring more frequently in a production Spark process made me learn a few internals and some best practices while investigating the root cause, which I am sharing as part of this post. With technology evolving, this might change soon as well.
Two Important Concepts to Understand
S3 (Object Store) != POSIX File System: Rename Operation
A file rename in a POSIX-based file system is a metadata-only operation: only the pointer changes and the file stays as-is on disk. For example, if I have a file abc.txt and rename it to xyz.txt, the operation is instantaneous and atomic, and xyz.txt's last-modified timestamp remains the same as abc.txt's last-modified timestamp.
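To make this concrete, here is a minimal Scala sketch (the file names are just illustrative) showing that a POSIX rename through java.nio is atomic and preserves the last-modified timestamp:

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object PosixRenameDemo extends App {
  val src = Paths.get("abc.txt")
  Files.write(src, "hello".getBytes)          // create a sample file
  val before = Files.getLastModifiedTime(src) // timestamp before rename

  // ATOMIC_MOVE: a metadata-only pointer change on POSIX file systems
  val dst = Files.move(src, Paths.get("xyz.txt"), StandardCopyOption.ATOMIC_MOVE)

  val after = Files.getLastModifiedTime(dst)
  println(s"timestamp preserved: ${before == after}") // expected: true
}
```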
Whereas in AWS S3 (an object store), a file rename under the hood is a copy followed by a delete: the source file is first copied to the destination and then the source file is deleted. So "aws s3 mv" changes the last-modified timestamp of the destination file, unlike a POSIX file system. The metadata here is a key-value store where the key is the file path and the value is the content of the file, and there is no operation that changes the key in place and returns immediately. The rename time therefore depends on the size of the file. For a directory rename (there is no such thing as a directory in S3; for simplicity we can treat a recursive set of files under a common prefix as a directory), it depends on the number of files inside the directory along with the size of each file. So in a nutshell, rename is a very expensive operation in S3 compared to a normal file system.
S3 Consistency Model
S3 comes with two kinds of consistency: (a) read-after-write and (b) eventual consistency, which in some cases results in file-not-found exceptions: files being added but not yet listed, or files being deleted but not yet removed from the listing.
Deep Dive
Wait!! Weren't we supposed to discuss Spark writing files to S3? The consistency model makes some sense and looks related, but why the rename operation? They are both related, I would say, and in the next paragraph I will explain how.
Spark leverages Hadoop's "FileOutputCommitter" implementations to write data. Writing data involves multiple steps: at a high level, staging the output files and then committing them, i.e. writing the final files. This is where the rename step I mentioned earlier comes in, moving files from the staging location to the final one. As you know, a Spark job is divided into multiple stages and sets of tasks, and due to the nature of distributed computing tasks are prone to failure, so there is a provision to re-launch the same task after a system failure or as speculative execution of slow-running tasks. That leads to the concepts of task commit and job commit functions. Here we have two readily available algorithms that define how job and task commits are done; having said that, neither algorithm is better than the other in absolute terms, it rather depends on where we are committing the data.
mapreduce.fileoutputcommitter.algorithm.version=1
- commitTask renames the data generated by the task from the task temporary directory to the job temporary directory.
- When all tasks are complete, commitJob renames all the data from the job temporary directory to the final destination and at the end creates the _SUCCESS file.
Here the driver does the work of commitJob at the end, so for object stores like S3 it may take a long time because a large number of task temporary files are queued up for the rename operation (it is not serial, though), and write performance is not optimized. It works pretty well for HDFS, where a rename is not expensive and is just a metadata change. For AWS S3, each file rename during commitJob opens up a huge number of API calls to S3, which might cause unexpected API-call failures if the number of files is high. Or it might not; I have seen both outcomes for the same job running at two different times.
mapreduce.fileoutputcommitter.algorithm.version=2
- commitTask moves the data generated by the task from the task temporary directory directly to the final destination as soon as the task is complete.
- commitJob basically writes the _SUCCESS file and doesn't do much else.
From a high level this looks optimized, but it comes with limitations: speculative task execution cannot be used safely, and if any task fails due to corrupt data we may end up with residual data in the final destination that needs a clean-up. So this algorithm doesn't give 100% data correctness, and it doesn't work for use cases where we need to append data to existing files. Even though it gives optimized results, it comes with a risk. The reason for the good performance is basically the smaller number of rename operations compared to algorithm 1 (there are still renames). Here we might encounter file-not-found exceptions, because commitTask writes files to a temporary path and immediately renames them, so there is a slight chance of eventual-consistency issues.
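For reference, here is a small sketch of how the algorithm version can be selected in a Spark application; the app name is illustrative, and the value (1 or 2) should follow the trade-offs discussed above:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("committer-demo").getOrCreate()

// Select the committer algorithm (1 or 2) on the session's Hadoop configuration;
// equivalently pass it at spark-submit time:
//   --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")
```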
Best Practices
Well, everyone in the industry working on Spark applications integrated with the AWS cloud and S3 storage is aware of these issues, and there are certain best practices we follow when writing files to S3. Here are a few I think we can use while writing Spark data-processing applications:
- If you have an HDFS cluster available, write data from Spark to HDFS and then copy it to S3 to persist it. s3-dist-cp can be used to copy the data from HDFS to S3 optimally, and this way we avoid all the rename operations. With AWS EMR clusters running only for the duration of the compute and terminated afterwards, this approach to persisting results looks preferable (see the first sketch after this list).
- Try to avoid writing files and reading them back again and again unless there are consumers for the files. Spark is well known for in-memory processing, and careful persistence/caching of data in memory will help optimize the application's run time.
- Write custom file output committers that are optimized and error-free with S3. Newbies like me in Spark application development of course can't write such an algorithm; it needs a lot of programming and computer-science knowledge.
- Use the open-source committers provided by Netflix, Databricks' DBIO transactional commit protocol available with the Databricks Spark distribution, or S3Guard from the Hadoop S3A project to address S3 consistency issues. I have not evaluated or used any of these, but per the documentation and Spark issue discussions these are some of the best practices to address the problem.
- Job failures might leave residual data in the destination directory when algorithm 2 is chosen, which can cause problems for downstream data consumption. For example, if a Lambda function is invoked to process data based on data becoming available in S3 (the trigger), and the upstream Spark job that writes the data fails or takes a long time, the Lambda might get triggered as soon as some files are available and we might end up with data issues. So in such cases I can think of triggering the Lambda based only on the _SUCCESS file in the directory and then processing the files in that directory (see the second sketch after this list).
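The first sketch shows the HDFS-first pattern from the list above; the paths and bucket are hypothetical, and the s3-dist-cp step runs outside Spark (for example, as an EMR step):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hdfs-first-write").getOrCreate()
val df = spark.range(1000).toDF("id") // stand-in for the real ETL result

// Renames on HDFS are cheap metadata operations, so the commit is fast.
df.write.mode("overwrite").parquet("hdfs:///tmp/etl/output") // hypothetical path

// Then copy to S3 afterwards, outside Spark, e.g. as an EMR step:
//   s3-dist-cp --src hdfs:///tmp/etl/output --dest s3://my-bucket/etl/output
```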
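And the second sketch: a guard that a consumer (a Lambda handler or any downstream job) could use so it only processes a directory once Spark has written the _SUCCESS marker. The bucket and prefix names are hypothetical:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder

object SuccessGuard extends App {
  val s3 = AmazonS3ClientBuilder.defaultClient()
  val bucket = "my-bucket"  // hypothetical
  val prefix = "etl/output" // hypothetical output "directory"

  // _SUCCESS is written by commitJob only after all tasks have committed,
  // so its presence means the dataset is complete.
  if (s3.doesObjectExist(bucket, s"$prefix/_SUCCESS")) {
    println(s"Job finished cleanly, safe to consume the files under $prefix")
  } else {
    println("No _SUCCESS marker yet, possible partial/residual data; skipping.")
  }
}
```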
Finally, a few configurations like the ones below are worth taking into consideration for Spark applications writing to S3. If you are using a licensed or open-source product that runs on Spark, we can apply these settings, if not already done, for better performance; and of course testing is the way to understand the problem and benchmark the optimization.
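As an illustrative starting point only: the keys below are real Spark/Hadoop S3A settings, but the values are assumptions that should be benchmarked for your own workload.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative values only; tune per workload after testing.
val spark = SparkSession.builder()
  .appName("s3-tuned-writer")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .config("spark.speculation", "false")                      // unsafe with algorithm 2
  .config("spark.hadoop.fs.s3a.connection.maximum", "100")   // parallel S3 connections
  .config("spark.hadoop.fs.s3a.fast.upload", "true")         // incremental multipart upload
  .config("spark.hadoop.fs.s3a.multipart.size", "104857600") // 100 MB upload parts
  .getOrCreate()
```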
References
- Apache Hadoop Documentation
- Databricks Documentation
- AWS Documentation
If you liked this article, please Clap 👏 and share it with your network and if you wish you can buy me a coffee.