The original creators of Apache Spark established Databricks to solve the world's toughest data problems. Databricks was launched as a Spark-based unified data analytics platform in the cloud.
In this chapter, we will begin by understanding the internal architecture of Apache Spark™. This will be followed by an introduction to the basic components of Databricks. The following topics will be covered in this chapter:
- Introducing Spark fundamentals
- Introducing Databricks
- Learning about Delta Lake
For this chapter, you will need the following:
- An Azure subscription
- Azure Databricks
Please refer to the code sample from: Code samples from https://github.com/PacktPublishing/Optimizing-Databricks-Workload/tree/main/Chapter01
Introducing Spark fundamentals
- DataFrames: Fundamental data structures consisting of rows and columns.
- Machine Learning (ML): Spark ML provides ML algorithms for processing big data.
- Graph processing: GraphX helps to analyze relationships between objects.
- Streaming: Spark's Structured Streaming helps to process real-time data.
- Spark SQL: A SQL to Spark engine with query plans and a cost-based optimizer.
DataFrames in Spark are built on top of Resilient Distributed Datasets (RDDs), which are now treated as the assembly language of the Spark ecosystem. Spark is compatible with various programming languages – Scala, Python, R, Java, and SQL.
Spark encompasses an architecture with one driver node and multiple worker nodes. The driver and worker nodes together constitute a Spark cluster. Under the hood, these nodes are based in Java Virtual Machines (JVMs). The driver is responsible for assigning and coordinating work between the workers.
The worker nodes have executors running inside each of them, which host the Spark program. Each executor consists of one or more slots that act as the compute resource. Each slot can process a single unit of work at a time.
Every executor reserves memory for two purposes:
The cache section of the memory is used to store the DataFrames in a compressed format (called caching), while the compute section is utilized for data processing (aggregations, joins, and so on). For resource allocation, Spark can be used with a cluster manager that is responsible for provisioning the nodes of the cluster. Databricks has an in-built cluster manager as part of its overall offering.
- Vertical parallelism: Scaling the number of slots in the executors
- Horizontal parallelism: Scaling the number of executors in a Spark cluster
Spark processes the data by breaking it down into chunks called partitions. These partitions are usually 128 MB blocks that are read by the executors and assigned to them by the driver. The size and the number of partitions are decided by the driver node. While writing Spark code, we come across two functionalities, transformations and actions. Transformations instruct the Spark cluster to perform changes to the DataFrame. These are further categorized into narrow transformations and wide transformations. Wide transformations lead to the shuffling of data as data requires movement across executors, whereas narrow transformations do not lead to re-partitioning across executors.
Running these transformations does not make the Spark cluster do anything. It is only when an action is called that the Spark cluster begins execution, hence the saying Spark is lazy. Before executing an action, all that Spark does is make a data processing plan. We call this plan the Directed Acyclic Graph (DAG). The DAG consists of various transformations such as read, filter, and join and is triggered by an action.
Every time a DAG is triggered by an action, a Spark job gets created. A Spark job is further broken down into stages. The number of stages depends on the number of times a shuffle occurs. All narrow transformations occur in one stage while wide transformations lead to the formation of new stages. Each stage comprises of one or more tasks and each task processes one partition of data in the slots. For wide transformations, the stage execution time is determined by its slowest running task. This is not the case with narrow transformations.
At any moment, one or more tasks run parallelly across the cluster. Every time a Spark cluster is set up, it leads to the creation of a Spark session. This Spark session provides entry into the Spark program and is accessible with the
Sometimes, a few tasks process small partitions while others process larger chunks, we call this data skewing. This skewing of data should always be avoided if you hope to run efficient Spark jobs. In a broad execution, the stage is determined by its slowest task, so if a task is slow, the overall stage is slow and everything waits for that to finish. Also, whenever a wide transformation is run, the number of partitions of the data in the cluster changes to 200. This is a default setting, but can be modified using Spark configuration.
As a rule of thumb, the total number of partitions should always be in the multiples of the total slots in the cluster. For instance, if a cluster has 16 slots and the data has 16 partitions, then each slot receives 1 task that processes 1 partition. But instead, if there are 15 partitions, then 1 slot will remain empty. This leads to the state of cluster underutilization. In the case of 17 partitions, a job will take twice the time to complete as it will wait for that 1 extra task to finish processing.
Let's move on from Spark for now and get acquainted with Databricks.
Databricks provides a collaborative platform for data engineering and data science. Powered by the potential of Apache Spark™, Databricks helps enable ML at scale. It has also revolutionized the existing data lakes by introducing the Lakehouse architecture. You can refer to the following published whitepaper to learn about the Lakehouse architecture in detail: http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf.
- Data engineer: Create ETL and ELT pipelines to run big data workloads.
- Data scientist: Perform exploratory data analysis and train ML models at scale.
- Data analyst: Perform big data analytics harnessing the power of Apache Spark.
- Business intelligence analyst: Build powerful dashboards using Databricks SQL Analytics.
Databricks and Spark together provide a unified platform for big data processing in the cloud. This is possible because Spark is a compute engine that remains decoupled from storage. Spark in Databricks combines ETL, ML, and real-time streaming with collaborative notebooks. Processing in Databricks can scale to petabytes of data and thousands of nodes in no time!
Spark can connect to any number of data sources, including Amazon S3, Azure Data Lake, HDFS, Kafka, and many more. As Databricks lives in the cloud, spinning up a Spark cluster is possible with the click of a button. We do not need to worry about setting up infrastructure to use Databricks. This enables us to focus on the data at hand and continue solving problems.
Currently, Databricks is available on all four major cloud platforms, Amazon Web Services (AWS), Microsoft Azure, Google Cloud Platform, and Alibaba Cloud. In this book, we will be working on Azure Databricks with the standard pricing tier. Databricks is a first-party service in Azure and is deeply integrated with the complete Azure ecosystem.
Since Azure Databricks is a cloud-native managed service, there is a cost associated with its usage. To view the Databricks pricing, check out https://azure.microsoft.com/en-in/pricing/details/databricks/.
Creating an Azure Databricks workspace
To create a Databricks instance in Azure, we will need an Azure subscription and a resource group. An Azure subscription is a gateway to Microsoft's cloud services. It entitles us to create and use Azure's services. A resource group in Azure is equivalent to a logical container that hosts the different services. To create an Azure Databricks instance, we need to complete the following steps:
- Go to Azure's website, portal.azure.com, and log in with your credentials.
- In the Navigate section, click on Resource groups and then Create.
- In Subscription, set the name of the Azure subscription, set a suitable name in Resource group, set Region, and click on Review + create.
- A Validation passed message will flash at the top; then, click on Create. Once the resource group is created, a notification will pop up saying Resource group created. Following the message, click on Go to resource group.
- This opens an empty resource group. Now it is time to create a Databricks instance. Click on Create and select Marketplace. In the search bar, type
Azure Databricksand select it from the drop-down menu. Click on Create.
- Set Subscription, Resource group, a name, Region, and Pricing Tier (Standard, Premium, or Trial). In this book, we will be working with the standard pricing tier of Azure Databricks.
- Click on Next : Networking. We will not be creating Databricks in a VNet. So, keep both the options toggled to No. Finally, click on Review + create. Once the validation is successful, click on Create. It takes a few minutes for the Databricks instance to get created. Once the deployment is complete, click on Go to Resource. To launch the Databricks workspace, click on Launch Workspace.
Now that we have a workspace up and running, let's explore how we can apply it to different concepts.
Core Databricks concepts
The Databricks workspace menu is displayed on the left pane. We can configure the menu based on our workloads, Data Science and Engineering or Machine Learning. Let's start with the former. We will learn more about the ML functionalities in Chapter 3, Learning about Machine Learning and Graph Processing with Databricks. The menu consists of the following:
- Workspace: A repository with a folder-like structure that contains all the Azure Databricks assets. These assets include the following:
- Notebooks: An interface that holds code, visualizations, and markdown text. Notebooks can also be imported and exported from the Databricks workspace.
- Library: A package of commands made available to a notebook or a Databricks job.
- Dashboard: A structured representation of the selective visualizations used in a notebook.
- Folder: A logical grouping of related assets in the workspace.
- Repos: This provides integration with Git providers such as GitHub, Bitbucket, GitLab, and Azure DevOps.
- Recents: Displays the most recently used notebooks in the Databricks workspace.
- Search: The search bar helps us to find assets in the workspace.
- Data: This is the data management tab that is built on top of a Hive metastore. Here, we can find all the Hive tables registered in the workspace. The Hive metastore stores all the metadata information, such as column details, storage path, and partitions, but not the actual data that resides in a cloud storage location. The tables are queried using Apache Spark APIs including Python, Scala, R, and SQL. Like any data warehouse, we need to create a database and then register the Hive tables.
- Compute: This is where we interact with the Spark cluster. It is further divided into three categories:
- All-Purpose Clusters: These are used to run Databricks notebooks and jobs. An all-purpose cluster can also be shared among users in the workspace. You can manually terminate or restart an all-purpose cluster.
- Job Clusters: These are created by the Azure Databricks job scheduler when a new job is created. Once the job is complete, it is automatically terminated. It is not possible to restart a job cluster.
- Pools: A pool is a set of idle node instances that help reduce cluster start up and autoscaling times. When a cluster is attached to a pool, it acquires the driver and worker instances from within the pool.
Now that we have an understanding of the core concepts of Databricks, let's create our first Spark cluster!
Creating a Spark cluster
It is time to create our first cluster! In the following steps, we will create an all-purpose cluster and later attach it to a notebook. We will be discussing cluster configurations in detail in Chapter 4, Managing Spark Clusters:
- Select the Clusters tab and click on Create Cluster. Give an appropriate name to the cluster.
- Set Cluster Mode to Standard. Standard cluster mode is ideal when there is a single user for the cluster. High Concurrency mode is recommended for concurrent users, but it does not support Scala. A Single Node cluster has no worker nodes and is only suitable for small data volumes.
- Leave the Pool option as None and Databricks Runtime Version as the default value. The Databricks runtime version decides the Spark version and configurations for the cluster.
- For Autopilot Options, disable the Enable autoscaling checkbox. Autoscaling helps the cluster to automatically scale between the maximum and minimum number of worker nodes. In the second autopilot option, replace 120 with 030 to terminate the cluster after 30 minutes of inactivity.
- We can leave the Worker Type and Driver Type options as their default values. Set the number of workers to
01. Keep the Spot instances checkbox disabled. When enabled, the cluster uses Azure Spot VMs to save costs.
- Click on Create Cluster.
With the Spark cluster initialized, let's create our first Databricks notebook!
Now we'll create our first Databricks notebook. On the left pane menu, click on Create and select Notebook. Give a suitable name to the notebook, keep the Default Language option as Python, set Cluster, and click on Create.
We can create documentation cells to independently run blocks of code. A new cell can be created with the click of a button. For people who have worked with Jupyter Notebooks, this interface might look familiar.
We can also execute code in different languages right inside one notebook. For example, the first notebook that we've created has a default language of Python, but we can also run code in Scala, SQL, and R in the same notebook! This is made possible with the help of magic commands. We need to specify the magic command at the beginning of a new cell:
Let us look at executing code in multiple languages in the following image:
Databricks notebooks also support rendering HTML graphics using the
We can use the
%sh magic command to run shell commands on the driver node.
Databricks provides a Databricks Utilities (dbutils) module to perform tasks collectively. With
dbutils, we can work with external storage, parametrize notebooks, and handle secrets. To list the available functionalities of
dbutils, we can run
dbutils.help() in a Python or Scala notebook.
The notebooks consist of another feature called widgets. These widgets help to add parameters to a notebook and are made available with the
dbutils module. By default, widgets are visible at the top of a notebook and are categorized as follows:
- Text: Input the string value in a textbox.
- Dropdown: Select a value from a list.
- Combobox: Combination of text and drop-down widgets.
- Multiselect: Select one or more values from a list:
Databricks File System (DBFS)
DBFS is a filesystem mounted in every Databricks workspace for temporary storage. It is an abstraction on top of a scalable object store in the cloud. For instance, in the case of Azure Databricks, the DBFS is built on top of Azure Blob Storage. But this is managed for us, so we needn't worry too much about how and where the DBFS is actually located. All we need to understand is how can we use DBFS inside a Databricks workspace.
- We can persist data to DBFS so that it is not lost after the termination of the cluster.
- DBFS allows us to mount object storage such as Azure Data Lake or Azure Blob Storage in the workspace. This makes it easy to access data without requiring credentials every time.
- We can access and interact with the data using the directory semantics instead of using URLs for object storage.
- With the
%fsmagic command: We can use the
%fscommand in a notebook cell.
dbutils: We can call the
dbutilsmodule to access the DBFS. Using
dbutils.fs.ls("<path>")is equivalent to running
%fs ls <path>. Here,
<path>is a DBFS path. Both these commands list the directories in a specific DBFS "path."
We need to enclose
dbutils.fs.ls("path") in Databricks'
display() function to obtain a rendered output.
A Databricks job helps to run and automate activities such as an ETL job or a data analytics task. A job can be executed either immediately or on a scheduled basis. A job can be created by using the UI or CLI or invoking the Jobs UI. We will now create a job using the Databricks UI:
- Create a new Databricks Python notebook with the name
jobs-notebookand paste the following code in a new cell. This code creates a new delta table and inserts records into the table. We'll learn about Delta Lake in more detail later in this chapter. Note that the following two code blocks must be run in the same cell.
The following code block creates a delta table in Databricks with the name of
insurance_claims. The table has four columns,
%sql -- Creating a delta table and storing data in DBFS -- Our table's name is 'insurance_claims' and has four columns CREATE OR REPLACE TABLE insurance_claims ( user_id INT NOT NULL, city STRING NOT NULL, country STRING NOT NULL, amount INT NOT NULL ) USING DELTA LOCATION 'dbfs:/tmp/insurance_claims';
INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (100, 'Mumbai', 'India', 200000); INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (101, 'Delhi', 'India', 400000); INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (102, 'Chennai', 'India', 100000); INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (103, 'Bengaluru', 'India', 700000);
- On the workspace menu, click on Jobs and then Create Job. Give a name to the job and keep Schedule Type as Manual (Paused). Under the Task heading, set Type to Notebook, select the
jobs-notebooknotebook that we created, and in Cluster, select an existing all-purpose cluster.
- Keep Maximum Concurrent Runs at the default value of 1. Under Alerts, click on Add. Add the email address to which alerts must be sent and select Success and Failure. This will ensure that the designated email address will be notified upon a job success or failure.
- Click on Create. Once the job is created, click on Runs and select Run Now.
- As soon as the job completes, we will receive an email informing us whether the job succeeded or failed. If the job is in progress, we can find more information about the current run under Active Runs.
- When the job finishes, a new record will be added under Completed Runs (past 60 days) giving the start time, mode of launch, duration of run, and status of run.
Databricks Community is a platform that provides a free-of-cost Databricks workspace. It supports a single node cluster wherein we have one driver and no workers. The community platform is great for beginners to get started with Databricks. But several features of Azure Databricks are not supported in the Community edition. For example, we cannot create jobs or change cluster configuration settings. To sign up for Databricks Community, visit https://community.cloud.databricks.com/login.html.
Learning about Delta Lake
Delta Lake was launched by Databricks as an open source project owned by the Linux Foundation that converts a traditional data lake into a lakehouse. The term lakehouse refers to a platform that brings in the best of both data lakes and warehouses. Delta Lake offers the following features:
- ACID transactions: Data readers never see inconsistent data, also called dirty reads.
- Handling metadata: Spark's distributed processing power makes it easy to handle the metadata of tables with terabytes and petabytes of data in a scalable fashion.
- Streaming and batch workloads: Delta Lake helps unify stream and batch processing on data.
- Schema enforcement: The schema is always checked before data gets appended. This helps to prevent the corruption of data.
- Time travel: View and roll back to previous versions of data enabling audit logs.
- Upserts and deletes: Perform transactions such as updates, inserts, and deletes on data lying in the lake.
Next, we'll look at big data file formats.
Big data file formats
Before we dive deeper into Delta Lake, let's first try to understand the file formats used to store big data. Traditional file formats such as CSV and TSV store data in a row-wise format and are not partitioned. CSVs are basically strings without any data types so they always need to be scanned entirely without any scope for filtering. This makes it difficult for processing and querying larger datasets. Instead, file formats such as Parquet, ORC, and Avro help us overcome many such challenges as they can be stored in a distributed fashion.
Row-based file formats store data by row, whereas columnar file formats store data by column. Row-based file formats work best for transactional writes of data, whereas columnar file formats are ideal for data querying and analytical workloads.
Let us look at row-based file formats versus columnar file formats in the following image:
- All three file formats are machine-readable formats and not human-readable.
- They can be partitioned across a cluster for parallel processing and distribution.
- The formats carry the data schema as well. This helps newer machines or clusters to process the data independently.
Coming back to Delta Lake, it can simply be treated as a file format. Tables that are created on top of this delta file format are simply called delta tables. The delta file format is mainly composed of two components:
- Transactional logs: A
_delta_logfolder is created when data is written in the delta file format. This folder stores files that record all the transactions to data.
- Versioned Parquet files: This is the actual data that is written out as partitions. These Parquet partition files (
.parquet) can also be compacted later using different functions. For efficient querying purposes, these Parquet partition files can also be distributed based on partition folders.
It's also important for us to understand the use and value of the transactional log.
Understanding the transactional log
Whenever a transaction is carried out on a delta table, the changes are recorded in the
_delta_log folder in the form of JSON files. The naming conventions of these JSON files begin sequentially, starting with
000000.json. Subsequent JSON files are created as changes get committed (
000002.json, and so on). Also, with each fresh transaction, a new set of Parquet files may be written. In this process, the new JSON file created in the
_delta_log folder keeps a record of which Parquet files to reference and which to omit. This happens because every transaction to a delta table results in a new version of the table.
Let's see how this works with an example. Suppose we have a delta table with a
_delta_log folder containing
00000.json. Suppose this JSON file references two Parquet files,
Now we have an
UPDATE transaction carried out on the delta table. This creates a new JSON file in the
_delta_log folder by the name of
00001.json. Also, a new Parquet file is added in the delta table's directory,
part-0002.parquet. Upon checking the new JSON file, we find that it references
part-0002.parquet but omits
Delta Lake in action
Let's start by creating a Spark DataFrame by reading a CSV file. Create a new Databricks Python notebook and spin up a Spark cluster with one driver, one worker, the standard type, and autoscaling disabled. Every code block in the following section must be executed in a new notebook cell:
- We will be using the
airlinesdataset from the
databricks-datasetsrepository. Databricks provides many sample datasets in every workspace. These are part of the
databricks-datasetsdirectory of the DBFS. The following code block creates a new Spark DataFrame by specifying the first row as the header, automatically inferring the schema, and reading from a CSV file. Once the DataFrame is created, we will display the first five rows:
airlines = (spark.read .option("header",True) .option("inferSchema",True) .option("delimiter",",") .csv("dbfs:/databricks-datasets/airlines/part-00000") # View the dataframe display(airlines.limit(5))
- Next, we will write the DataFrame as a delta file in the DBFS. Once the writing process is complete, we can look at the contents of the delta file. It contains four Parquet files and a
We can view the location where the data is written in delta format:
- Inside the
_delta_logfolder, we can find one JSON file:
- Now, we will create a delta table using the delta file that is written to Azure Blob Storage. Here, we will switch from PySpark to Spark SQL syntax using the
%sqlmagic command. The name of the delta table created is
airlines_delta_table. A count operation on the newly created delta table returns the number of records in the table:
%sql DROP TABLE IF EXISTS airlines_delta_table; CREATE TABLE airlines_delta_table USING DELTA LOCATION "dbfs:/airlines/"; %sql SELECT COUNT(*) as count FROM airlines_delta_table
- Let's perform a
DELETEoperation on the delta table. This will delete all the rows where the
10. This deletes 448,620 rows from the delta table:
%sql DELETE FROM airlines_delta_table WHERE Month = '10'
- Next, we will perform an
UPDATEoperation on the delta table. This transaction will update the
Destcolumn and replace all the
San Francisco. We can also see that 7,575 rows received updates in the table:
%sql UPDATE airlines_delta_table SET Dest = 'San Francisco' WHERE Dest = 'SFO'
- Before we move forward, let's look at the Parquet files and transactional logs folder once again. Inside the delta file, we can see that more Parquet files have been added after two transactions (
UPDATE) were carried out:
- Also, the
_delta_logfolder now contains two more JSON files, one for each transaction:
- Finally, it is time for time travel! Running the
DESCRIBE HISTORYcommand on the delta table returns a list of all the versions of the table:
%sql -- Time travel DESCRIBE HISTORY airlines_delta_table
- Switching to a previous version is as easy as adding
VERSION AS OFto the delta table. First, we'll try to query the data based on the condition that got updated. For instance, after the update operation, no record should have the
SFOvalue. Hence, we get a count of 0:
%sql -- Return count of rows where Dest = 'SFO' in current version that is version 2 SELECT COUNT(*) FROM airlines_delta_table WHERE Dest = 'SFO'
- But when the same query is run on the previous version of the delta table (version 1), we get a count of 7,575. This is because this SQL query is querying on the data that existed before the update operation:
%sql -- Return count of rows where Dest = 'SFO' in version 1 SELECT COUNT(*) FROM airlines_delta_table VERSION AS OF 1 WHERE Dest = 'SFO'
Let's recap what we've covered in this first chapter.
In this chapter, we learned about the fundamentals of Spark, got an introduction to Databricks, and explored Delta Lake. We were introduced to Azure Databricks and the important workspace components. We learned how to create an Azure Databricks instance, a notebook in the Databricks workspace, Spark clusters, and Databricks jobs. We also learned about the important big data file formats such as Parquet, Avro, and ORC. We also learned about the fundamentals of Delta Lake and went through a worked-out example.
In the next chapter, we will dive deeper into the concepts of batch and stream processing in Azure Databricks. We'll also see more examples in the chapter to practice working hands-on in a Databricks environment.