Adding cloud functions before your workflow for a cost-effective solution
Databricks is a Platform-as-a-Service offering that lets Data Engineers perform ingestion, exploration, and transformation of data. Recently, they included the ability to create data pipelines into their platform with multi-task workflows. In addition, they provide a configurable and scalable data ingestion feature called the Autoloader. With it, you can ingest data both on-demand and in a scheduled fashion. However, if you want to ingest your data as soon as it comes, you must have your job active while it waits for the files to arrive. This is incompatible with the workflow feature and costs you money for having a compute cluster indefinitely turned on. Such is the case that a former company where I worked spent thousands of dollars on a single project having idle clusters listening for data.
But do you have to do it that way though? Using cloud functions as your data sensor, you can cost-effectively trigger your Databricks pipelines. You will only use your cluster for actual processing. Here’s how!
You can find my companion repository here. Microsoft Azure will be the Cloud provider for this particular exercise, but the concepts can be translated to other providers. The dataset will be the “New York Yellow Taxi Trip Data” and can be found here. However, there are many partitions of the dataset. Download one CSV file and create a new file with a subset of that data to minimize waiting when uploading it to the Azure Container in the testing step.
You will need a storage service where your data is going to land and will be later picked by Databricks for consumption. Such service could be an Amazon S3, Azure Blob Storage Container, etc. This entry will use Azure services and VS Code.
In the Azure plugin, there is a tab called “Resources”. Click on the “+” button to the right and select “Create Storage Account”:
Give it a unique name, and an Azure region. Then, check the Azure Activity log in the terminal area until the resource is created. Look for the resource in the left pane, right-click and create a new container:
After the container is created, you will need to mount it on the Databricks filesystem. To do that, run this code cell inside your Databricks workspace:
Now, create the appropriate tables based on your data model. In this simple example, a single table is used:
This table will receive taxi trip data coming from the files in the staging area.
This is the core of the workload! But first, let us define the required parameters in Databricks using
The Autoloader query will define the ETL from the staging area to the table. Let us review the code:
cloudFiles: It indicates to the structured streaming query, that you are going to use the Autoloader.
cloudFiles.format: The format of the data coming in. In this case, CSV.
cloudFiles.schemaHints: By default, all the discovered fields are going to be treated as strings. Here you can specify the datatypes of specific fields.
load(): To specify the location of the data that is going to be loaded. In this case, the location of the mounted container.
select(): To pick a subset of all the columns.
trigger(availableNow=True): Very important. It specifies to the query that you want to process all the data that has not been consumed yet and is available right now. Otherwise, this query will run every half a second by default.
table(): To store the data in a specified table.
Now that the query is defined, the actual data pipeline is going to be created. The beauty of the redefined Databricks workflows is that you can define a set of tasks and dependencies. Something that could not be done on this platform before. For this easy example, a single task is going to be defined. You can read about how to create more complex workflows here. Also, check out my article about pipeline orchestration in Databricks.
Inside “Path”, look for the location of the notebook where the Autoloader script is. Furthermore, issue the required arguments as depicted in the screenshot:
a couple of things to note:
dbfs:/mnt/testis the directory of the mounted Azure storage container, as defined earlier in the mounting snippet.
defaultis the name of the database that gets created by default.
- checkpoint_directory is set to
/tmp/which is a directory that gets wiped out every time the cluster restarts. Never set it as a checkpoint directory on a real scenario, this is just for practice.
This is the key component to enabling the intended on-demand behavior. The function will be triggered as soon as the data arrives at the staging area and, in turn, it will run the Databricks Workflow using the Databricks Jobs API.
To create a function in VS Code, look for the Azure plugin. On the workspace tab select “+” and click “Create Function”:
Now, one great thing is that we get a variety of options for the function trigger. This means that our Databricks workflow can be activated by any of these methods. For this exercise, follow these steps:
- Click on the “Azure Blob Storage” trigger.
- Give the function a unique name and hit enter.
- Click on “Create new local app setting”.
Your VS Code folder should now be populated with files from the created function. When the project is available in the editor, define the required environment variables inside
: Will hold the connection string from your Azure storage account. To get the value, go to the Azure portal and select your storage account. Click on“Access Keys” in the left pane and copy the connection string.
DB_JOB_ID: Is the id of your Databricks job. After your job is created copy the id associated with it and paste it here.
DATABRICKS_HOST: Is the URL of your Databricks workspace. It should be similar to “https://adb-xxxxxxxxx.xx.azuredatabricks.net/”
DATABRICKS_TOKEN: You need to create a Databricks Personal Access Token and insert the value here. This will allow the function to interact with your Databricks workspace.
- Ignore the other parameters.
After the parameters are set, we are going to implement the behavior of the function inside the
Things to note from this script:
- Databricks CLI is a tool made for the shell. However, this script is leveraging the fact that it is written in python and we can use those objects for the logic of this function. Include
EnvironmentVariableConfigProvideris a class from the Databricks CLI and uses the
DATABRICKS_TOKENenvironment variables to initialize the environment.
JobsServiceis also a class from the Datbaricks CLI, and allows you to create, run, list, and delete jobs. It is a very powerful tool.
- The control flow of the code is designed to avoid multiple function executions to trigger many concurrent runs of the same job. There should be only one run at a time.
After the code is complete we have to create the actual cloud resource and upload the configuration we just implemented. For that, under the Azure plugin, in the “Resources” pane click on “Create a Function App in Azure”:
- Give the function a unique name and hit enter.
- Click on the newest python runtime stack.
- For this example, select East US as the region. In more realistic scenarios you should pick the region that best fits your business case.
Upon the creation of the function. Right-click on the latter and select “Deploy to Function App”:
This will upload the code to the cloud resource. To finalize, right-click under the “Application Settings” sub-menu inside your function and select “Upload Local Settings”:
It will set the
local.settings.json contents as the environment variables of our function app. That is it! The Databricks workspace should be able to react to file uploads in your storage container and run the appropriate workflow.
Now that the infrastructure is ready, it is time to drop the sample dataset. The initial yellow trip data is too heavy to upload quickly. Furthermore, pick a small sample of records and upload it to the storage container:
As soon as the file is loaded, the function will be triggered, and in turn, it will signal the Databricks job to run. This is a sample of the logs produced by the Azure function:
In addition, if you go to Databricks after a couple of minutes, the job will be finished:
And the table should be populated with the yellow trip data records:
It works like magic!
The implementation of this project was done in Visual Studio Code and the Databricks workspace. However, if you need your infrastructure to be CI-CD or IaC friendly, you can always use the Azure resource manager templates and the Azure CLI to create the environment. Just extract the core concept of this article, and adapt it to your use case.
Additionally, test and tweak the behavior of the function and the Databricks job accordingly. This logic is not battle-tested for loading a large number of files at very small intervals. However, it should provide a starting point so that you can ingest data on-demand without the need for an indefinitely active cluster consuming resources and money while just listening for data.
Finally, the job is set to create a cluster in a cold start setting. This will add some minutes to the execution time. Consider creating compute pools so that clusters can use pre-warmed instances if you need your jobs to run as fast as possible.