Large-scale file processing using Snowpark multi-threading and Async Jobs capability

Dated: Feb-2024

Key Takeaways

  • Walk through a solution pattern for processing a large number of files (850,000).
  • Highlight new capabilities in Snowpark.
  • Demonstrate a POC implementation for indexing LiDAR files.

Snowflake, as the Data Cloud platform, offers the ability to process both structured and unstructured data as a core data-lake feature. Many of our customers have implemented numerous scenarios to process files. Here are three such scenarios that I have implemented in the past:

And there are many more such use cases implemented by our customers. A common ask has been how to process large quantities of files natively in Snowflake.

In this article, I am demonstrating one possible solution to this scenario. To demonstrate the functionality, I am showcasing how to index 850,000 LiDAR files. Indexing is the process of parsing a given LiDAR file, extracting its core metadata, and storing that information so it can later be used for filtering. The LiDAR files are sourced from the USGS LiDAR dataset hosted in AWS Open Data, specifically the ones present in the folder: USGS_LPC_TX_Panhandle_B10_2017_LAS_2019/ept-data

Solution Overview

I will explain the steps/processes using a bottom-up approach. The key features of Snowflake that make this possible are:

Partitioning / Bucketing files

We want to parallelize the parsing functionality; to achieve this, we form groups/partitions of files. This does not mean we are moving or copying files around; rather, we define a group ID and assign files to it. Since there are no relationships between these files, they can be grouped in any order and processed in any order. Instead of doing this operation in memory with pandas or NumPy, I am using Snowflake. This allows for easier interrogation and also lets stored procedures query the table and retrieve the list of files easily.

- First, we do a LIST stage operation and store the results in a table ‘usgs_data_files’. A Snowflake directory table is often the go-to mechanism to make this process simpler. However, there is currently a limit on the number of files in a stage that a directory table can keep track of. The AWS Open Data bucket has far more than a couple of million files; hence, a directory table would not work.

- I use the NTILE function to split the list of files into equal-sized groups and store the group ID/partition in the column ‘processing_bucket’.

The number of groups depends on your SLA and how much parallelism you want to control. In my demo, I chose an arbitrary number of 100.
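A minimal sketch of these two steps in Snowpark Python is shown below. The stage name ‘usgs_lidar_stage’ and the Snowpark session variable are assumptions for illustration; the actual notebook may differ.

```python
from snowflake.snowpark import Session

def build_file_buckets(session: Session, num_buckets: int = 100) -> None:
    # List the files under the EPT data folder of the (assumed) external stage.
    session.sql("LIST @usgs_lidar_stage/ept-data/").collect()

    # The LIST output is available via RESULT_SCAN(LAST_QUERY_ID());
    # NTILE assigns each file to one of `num_buckets` equal-sized groups.
    session.sql(f"""
        CREATE OR REPLACE TABLE usgs_data_files AS
        SELECT
            "name" AS file_url,
            "size" AS file_size,
            NTILE({num_buckets}) OVER (ORDER BY "name") AS processing_bucket
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
    """).collect()
```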

Parsing function

We define a function ‘_get_metadata_of_file’, which handles reading the file and extracting the LiDAR file’s metadata. The function is implemented as stateless: it reads the file and returns the response. It does not retain any state or interact with other modules.
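A minimal sketch of such a stateless function is shown below. It assumes the laspy package (with a LAZ backend such as lazrs) and requests are available in the environment; the exact metadata fields captured in the actual implementation may differ.

```python
import io

import laspy
import requests

def _get_metadata_of_file(file_url: str) -> dict:
    # Download the LAZ file into memory; nothing is cached between calls.
    raw = requests.get(file_url, timeout=60).content

    # Read only the header, which carries the core metadata we want to index.
    with laspy.open(io.BytesIO(raw)) as reader:
        header = reader.header
        return {
            "file_url": file_url,
            "point_count": header.point_count,
            "mins": list(header.mins),
            "maxs": list(header.maxs),
            "version": str(header.version),
        }
```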

Stored procedure for parsing

We define a Snowpark stored procedure ‘batch_parse_and_extract_lazmetadata_sproc’, which takes the bucket/group ID as a parameter and performs the following steps:

- splits the file list into mini-batches of 10 records, each record representing the location URL of a file.

- using joblib.Parallel, concurrently parses the files in each batch.

- captures the results and stores them in a Parquet file

- uploads the Parquet file to a stage

You might be wondering why we do not write or merge the records into a table instead of storing them as Parquet files. That is possible; I took this approach because each write_pandas-like operation creates a temporary stage and table behind the scenes. Staging the Parquet files and doing a single COPY operation later in the pipeline saves some time.
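Putting the steps above together, a minimal sketch of the stored procedure body could look like the following. It assumes the ‘usgs_data_files’ table from earlier, the ‘_get_metadata_of_file’ function sketched above, and a hypothetical stage ‘parsed_metadata_stage’ for the Parquet output; joblib would need to be declared as a package dependency of the procedure.

```python
import os
import tempfile

import pandas as pd
from joblib import Parallel, delayed
from snowflake.snowpark import Session

# _get_metadata_of_file: the stateless parsing function sketched earlier.

def batch_parse_and_extract_lazmetadata_sproc(session: Session, bucket_id: int) -> str:
    # Retrieve the file URLs assigned to this bucket.
    rows = session.sql(
        f"SELECT file_url FROM usgs_data_files WHERE processing_bucket = {bucket_id}"
    ).collect()
    urls = [row["FILE_URL"] for row in rows]

    results = []
    batch_size = 10
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        # Parse the files of this mini-batch concurrently with joblib.Parallel.
        results.extend(
            Parallel(n_jobs=len(batch), backend="threading")(
                delayed(_get_metadata_of_file)(url) for url in batch
            )
        )

    # Capture the results in a Parquet file and upload it to the stage.
    local_path = os.path.join(tempfile.gettempdir(), f"bucket_{bucket_id}.parquet")
    pd.DataFrame(results).to_parquet(local_path)
    session.file.put(local_path, "@parsed_metadata_stage", auto_compress=False)
    return f"Processed {len(urls)} files for bucket {bucket_id}"
```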

Orchestrating

The above stored procedure has been developed to process only one group at a time. It turns out you can parallelize this operation even further with the newly available Async Job functionality.

As you see below, we can now call multiple instances of the stored procedure, with each instance processing a different group. During my initial experiments, I was able to run 40 instances on a single XS warehouse. Ooof!! I didn't realize the XS warehouse had this level of horsepower.

You could enhance this further by running the stored procedure across multiple warehouses too. In my demo, I spread this across 6 warehouses, with each running 17 instances of the stored procedure.
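A minimal sketch of this orchestration from the client side is shown below. The warehouse names (LIDAR_WH_1 .. LIDAR_WH_6) are hypothetical; collect_nowait() submits each CALL as an async job and returns immediately.

```python
from snowflake.snowpark import Session

def run_all_buckets(session: Session, num_buckets: int = 100, num_warehouses: int = 6):
    jobs = []
    for bucket_id in range(1, num_buckets + 1):
        # Round-robin the buckets across the (hypothetical) warehouses.
        session.use_warehouse(f"LIDAR_WH_{(bucket_id % num_warehouses) + 1}")

        # Submit the stored procedure call as an async job; it runs on the
        # warehouse that was current at submission time.
        jobs.append(
            session.sql(
                f"CALL batch_parse_and_extract_lazmetadata_sproc({bucket_id})"
            ).collect_nowait()
        )

    # Wait for every async job to finish and gather the return messages.
    return [job.result() for job in jobs]
```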

Why different warehouses? What happened to multi-clustering?

A Snowpark stored procedure is currently limited to running on a single node of a given cluster. Adding more and more instances of the stored procedure to a warehouse would not result in horizontal scaling. This is evident from the screenshot below:

So, to work around Snowflake's current limitation on horizontal scaling for stored procedures, I created 6 warehouses and distributed the workload accordingly. I chose 6, but I could easily have chosen 4 or even 10.

In the future, you could also look into defining the above orchestration using Tasks and DAGs.

Loading the Parquet files

Once the processes have finished, you can copy the files into a pre-defined table and start working on the data.
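As a rough sketch, assuming a target table ‘lidar_file_metadata’ whose columns match the Parquet schema and the ‘parsed_metadata_stage’ stage from earlier:

```python
from snowflake.snowpark import Session

def load_parsed_metadata(session: Session) -> None:
    # Copy all staged Parquet files into the pre-defined metadata table.
    session.sql("""
        COPY INTO lidar_file_metadata
        FROM @parsed_metadata_stage
        FILE_FORMAT = (TYPE = PARQUET)
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    """).collect()
```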

Gist

Below I am sharing the notebook that demonstrates the solution.

Link: https://gist.github.com/sfc-gh-vsekar/4d4677f6f3db5a07793b4136e433fce2

Learnings and Observations

  • As of today, Snowflake does not have the ability to define a stored procedure that can itself create asynchronous jobs. This might be possible in the future; for details on demand and the roadmap, reach out to your Snowflake contact.

Observations

Based on the above solution, here are the results:

  • I needed to index approximately 850,000 files.
  • It took, on average, 30 minutes to process one group/bucket containing approximately 9,000 files.
  • Distributing the workload, I had 6 X-Small warehouses, each running 17 jobs simultaneously.
  • The whole end-to-end run completed in approximately 35 minutes.
  • At a warehouse cost of roughly $3/credit, the whole process would cost less than $20. No additional infrastructure components needed :-)

Other scenarios and use-cases

I can see this level of distributed processing being applied to many scenarios. To name a few:

  • Processing HL7/DICOM/FHIR record files in the healthcare sector
  • Processing various EDI files used in the manufacturing sector
  • Processing raster images and other file formats in geospatial-related use cases

For now: Get Inspired -> Learn -> Develop -> Share -> ☺️
