The original creators of Apache Spark established Databricks to solve the world's toughest data problems. Databricks was launched as a Spark-based unified data analytics platform in the cloud.
In this chapter, we will begin by understanding the internal architecture of Apache Spark™. This will be followed by an introduction to the basic components of Databricks. The following topics will be covered in this chapter:
For this chapter, you will need the following:
The code samples for this chapter can be found at https://github.com/PacktPublishing/Optimizing-Databricks-Workload/tree/main/Chapter01.
Spark is a distributed data processing framework capable of analyzing large datasets. At its very core, it consists of the following:
DataFrames in Spark are built on top of Resilient Distributed Datasets (RDDs), which are now treated as the assembly language of the Spark ecosystem. Spark is compatible with various programming languages – Scala, Python, R, Java, and SQL.
Spark's architecture consists of one driver node and multiple worker nodes; together, the driver and workers constitute a Spark cluster. Under the hood, these nodes run as Java Virtual Machines (JVMs). The driver is responsible for assigning and coordinating work between the workers.
The worker nodes run executors, which host the Spark program. Each executor consists of one or more slots that act as the compute resource, and each slot can process a single unit of work at a time.
Every executor reserves memory for two purposes:
The cache section of the memory is used to store the DataFrames in a compressed format (called caching), while the compute section is utilized for data processing (aggregations, joins, and so on). For resource allocation, Spark can be used with a cluster manager that is responsible for provisioning the nodes of the cluster. Databricks has an in-built cluster manager as part of its overall offering.
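As a minimal sketch of caching (assuming a DataFrame named df already exists in the notebook; it is illustrative, not part of the chapter's worked example):

df.cache()       # marks the DataFrame for storage in the executors' cache memory
df.count()       # the first action materializes and caches the partitions
df.unpersist()   # releases the cached partitions when they are no longer needed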
Note
Executor slots are also called cores or threads.
Spark supports parallelism in two ways:
Spark processes the data by breaking it down into chunks called partitions. These partitions are usually 128 MB blocks that are read by the executors and assigned to them by the driver. The size and the number of partitions are decided by the driver node. While writing Spark code, we come across two functionalities, transformations and actions. Transformations instruct the Spark cluster to perform changes to the DataFrame. These are further categorized into narrow transformations and wide transformations. Wide transformations lead to the shuffling of data as data requires movement across executors, whereas narrow transformations do not lead to re-partitioning across executors.
Running these transformations does not make the Spark cluster do anything. It is only when an action is called that the Spark cluster begins execution, hence the saying Spark is lazy. Before executing an action, all that Spark does is make a data processing plan. We call this plan the Directed Acyclic Graph (DAG). The DAG consists of various transformations such as read, filter, and join and is triggered by an action.
Every time a DAG is triggered by an action, a Spark job gets created. A Spark job is further broken down into stages. The number of stages depends on the number of times a shuffle occurs. All narrow transformations occur in one stage, while wide transformations lead to the formation of new stages. Each stage comprises one or more tasks, and each task processes one partition of data in a slot. For wide transformations, the stage execution time is determined by its slowest running task. This is not the case with narrow transformations.
At any moment, one or more tasks run in parallel across the cluster. Every time a Spark cluster is set up, a Spark session is created. This Spark session provides the entry point into the Spark program and is accessible with the spark keyword.
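For example, in a Databricks notebook the spark object is already available, so a quick sanity check might look like this (the output values will vary by cluster):

print(spark.version)   # the Spark version of the cluster
df = spark.range(10)   # create a small DataFrame through the session
print(df.count())      # prints 10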
Sometimes, a few tasks process small partitions while others process larger chunks; we call this data skewing. This skewing of data should always be avoided if you hope to run efficient Spark jobs. In a stage produced by a wide transformation, execution time is determined by the slowest task, so if one task is slow, the whole stage is slow and everything waits for it to finish. Also, whenever a wide transformation is run, the number of partitions of the data in the cluster changes to 200. This is a default setting, but it can be modified using the Spark configuration.
As a rule of thumb, the total number of partitions should always be a multiple of the total number of slots in the cluster. For instance, if a cluster has 16 slots and the data has 16 partitions, then each slot receives 1 task that processes 1 partition. If instead there are 15 partitions, then 1 slot remains empty, which leads to cluster underutilization. In the case of 17 partitions, the stage takes roughly twice as long, as the cluster waits for the single leftover task to finish processing.
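The following is a minimal sketch of these two knobs; the numbers assume a hypothetical 16-slot cluster and are illustrative rather than a recommendation:

# Assume a cluster with 16 slots: keep partition counts at a multiple of 16
spark.conf.set("spark.sql.shuffle.partitions", 16)   # overrides the default of 200
df = spark.range(1000000).repartition(16)            # match partitions to the slot count
print(df.rdd.getNumPartitions())                     # 16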
Let's move on from Spark for now and get acquainted with Databricks.
Databricks provides a collaborative platform for data engineering and data science. Powered by the potential of Apache Spark™, Databricks helps enable ML at scale. It has also revolutionized the existing data lakes by introducing the Lakehouse architecture. You can refer to the following published whitepaper to learn about the Lakehouse architecture in detail: http://cidrdb.org/cidr2021/papers/cidr2021_paper17.pdf.
Irrespective of the data role in any industry, Databricks has something for everybody:
Databricks and Spark together provide a unified platform for big data processing in the cloud. This is possible because Spark is a compute engine that remains decoupled from storage. Spark in Databricks combines ETL, ML, and real-time streaming with collaborative notebooks. Processing in Databricks can scale to petabytes of data and thousands of nodes in no time!
Spark can connect to any number of data sources, including Amazon S3, Azure Data Lake, HDFS, Kafka, and many more. As Databricks lives in the cloud, spinning up a Spark cluster is possible with the click of a button. We do not need to worry about setting up infrastructure to use Databricks. This enables us to focus on the data at hand and continue solving problems.
Currently, Databricks is available on all four major cloud platforms: Amazon Web Services (AWS), Microsoft Azure, Google Cloud Platform, and Alibaba Cloud. In this book, we will be working on Azure Databricks with the standard pricing tier. Databricks is a first-party service in Azure and is deeply integrated with the complete Azure ecosystem.
Since Azure Databricks is a cloud-native managed service, there is a cost associated with its usage. To view the Databricks pricing, check out https://azure.microsoft.com/en-in/pricing/details/databricks/.
To create a Databricks instance in Azure, we will need an Azure subscription and a resource group. An Azure subscription is a gateway to Microsoft's cloud services. It entitles us to create and use Azure's services. A resource group in Azure is equivalent to a logical container that hosts the different services. To create an Azure Databricks instance, we need to complete the following steps:
Search for Azure Databricks and select it from the drop-down menu. Click on Create.

Now that we have a workspace up and running, let's explore how we can apply it to different concepts.
The Databricks workspace menu is displayed on the left pane. We can configure the menu based on our workloads, Data Science and Engineering or Machine Learning. Let's start with the former. We will learn more about the ML functionalities in Chapter 3, Learning about Machine Learning and Graph Processing with Databricks. The menu consists of the following:
Now that we have an understanding of the core concepts of Databricks, let's create our first Spark cluster!
It is time to create our first cluster! In the following steps, we will create an all-purpose cluster and later attach it to a notebook. We will be discussing cluster configurations in detail in Chapter 4, Managing Spark Clusters:
01. Keep the Spot instances checkbox disabled. When enabled, the cluster uses Azure Spot VMs to save costs.

With the Spark cluster initialized, let's create our first Databricks notebook!
Now we'll create our first Databricks notebook. On the left pane menu, click on Create and select Notebook. Give a suitable name to the notebook, keep the Default Language option as Python, select the cluster we just created in Cluster, and click on Create.
Notebooks are made up of cells that independently run blocks of code. A new cell can be created with the click of a button. For people who have worked with Jupyter notebooks, this interface will look familiar.
We can also execute code in different languages right inside one notebook. For example, the first notebook that we've created has a default language of Python, but we can also run code in Scala, SQL, and R in the same notebook! This is made possible with the help of magic commands. We need to specify the magic command at the beginning of a new cell:
%python or %py
%r
%scala
%sql
Note
The %pip magic command can also be used in Databricks notebooks to manage notebook-scoped libraries.
Let us look at executing code in multiple languages in the following image:
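As a minimal sketch of the same idea, each of the following snippets would go into its own notebook cell (the cell contents are just placeholders):

%py
print("Hello from Python")

%scala
println("Hello from Scala")

%sql
SELECT 'Hello from SQL' AS greeting

%r
print("Hello from R")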
We can also render a cell as Markdown using the %md magic command. This allows us to add rendered text between cells of code.
Databricks notebooks also support rendering HTML graphics using the displayHTML function. Currently, this feature is only supported for Python, R, and Scala notebooks. To use the function, we need to pass in HTML, CSS, or JavaScript code:
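For example, a minimal call might look like this (the HTML snippet is just a placeholder):

displayHTML("""
<h1 style="color:navy;">Rendered with displayHTML</h1>
<p>This cell renders HTML inside the notebook.</p>
""")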
We can use the %sh magic command to run shell commands on the driver node.
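For instance, a cell such as the following (the commands are placeholders) runs against the driver's local filesystem:

%sh
echo "Running on the driver node"
ls /tmp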
Databricks provides a Databricks Utilities (dbutils) module to perform powerful combinations of tasks. With dbutils, we can work with external storage, parametrize notebooks, and handle secrets. To list the available functionalities of dbutils, we can run dbutils.help() in a Python or Scala notebook.
Notebooks include another feature called widgets. Widgets help add parameters to a notebook and are made available through the dbutils module. By default, widgets are visible at the top of a notebook and come in several types.
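As a minimal sketch (the widget names and default values below are illustrative, not from the chapter's example), widgets can be created and read through dbutils as follows:

# Create a free-text widget and a dropdown widget (names/values are illustrative)
dbutils.widgets.text("country", "India", "Country")
dbutils.widgets.dropdown("env", "dev", ["dev", "test", "prod"], "Environment")

# Read a widget's current value inside the notebook
print(dbutils.widgets.get("country"))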
We can also run one notebook inside another using the %run magic command. The magic command must be followed by the notebook path.
DBFS is a filesystem mounted in every Databricks workspace for temporary storage. It is an abstraction on top of a scalable object store in the cloud. For instance, in the case of Azure Databricks, the DBFS is built on top of Azure Blob Storage. But this is managed for us, so we needn't worry too much about how and where the DBFS is actually located. All we need to understand is how we can use DBFS inside a Databricks workspace.
DBFS helps us in the following ways:
DBFS has a default storage location called the DBFS root. We can access DBFS in several ways:
The %fs magic command: We can use the %fs command in a notebook cell.
dbutils: We can call the dbutils module to access the DBFS. Running dbutils.fs.ls("<path>") is equivalent to running %fs ls <path>, where <path> is a DBFS path. Both commands list the directories in a specific DBFS path.

Note

We need to enclose dbutils.fs.ls("path") in Databricks' display() function to obtain a rendered output.
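For example, both of the following cells list the same DBFS directory, using the databricks-datasets folder that ships with every workspace:

%fs ls /databricks-datasets

# The dbutils equivalent, wrapped in display() to obtain a rendered table
display(dbutils.fs.ls("/databricks-datasets"))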
A Databricks job helps to run and automate activities such as an ETL job or a data analytics task. A job can be executed either immediately or on a scheduled basis. A job can be created using the UI, the CLI, or by invoking the Jobs API. We will now create a job using the Databricks UI:
Create a notebook named jobs-notebook and paste the following code in a new cell. This code creates a new delta table and inserts records into the table. We'll learn about Delta Lake in more detail later in this chapter. Note that the following two code blocks must be run in the same cell.

The following code block creates a delta table in Databricks with the name of insurance_claims. The table has four columns: user_id, city, country, and amount:
%sql
-- Creating a delta table and storing data in DBFS
-- Our table's name is 'insurance_claims' and has four columns
CREATE OR REPLACE TABLE insurance_claims (
  user_id INT NOT NULL,
  city STRING NOT NULL,
  country STRING NOT NULL,
  amount INT NOT NULL
)
USING DELTA
LOCATION 'dbfs:/tmp/insurance_claims';
Now, we will insert five records into the table. In the following code block, every INSERT INTO statement inserts one new record into the delta table:
INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (100, 'Mumbai', 'India', 200000);
INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (101, 'Delhi', 'India', 400000);
INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (102, 'Chennai', 'India', 100000);
INSERT INTO insurance_claims (user_id, city, country, amount) VALUES (103, 'Bengaluru', 'India', 700000);
Select the jobs-notebook notebook that we created, and in Cluster, select an existing all-purpose cluster.

Databricks Community is a platform that provides a free-of-cost Databricks workspace. It supports a single node cluster wherein we have one driver and no workers. The community platform is great for beginners to get started with Databricks. But several features of Azure Databricks are not supported in the Community edition. For example, we cannot create jobs or change cluster configuration settings. To sign up for Databricks Community, visit https://community.cloud.databricks.com/login.html.
Delta Lake was launched by Databricks as an open source project owned by the Linux Foundation that converts a traditional data lake into a lakehouse. The term lakehouse refers to a platform that brings in the best of both data lakes and warehouses. Delta Lake offers the following features:
Next, we'll look at big data file formats.
Before we dive deeper into Delta Lake, let's first try to understand the file formats used to store big data. Traditional file formats such as CSV and TSV store data in a row-wise format and are not partitioned. CSVs are basically strings without any data types, so they always need to be scanned entirely, with no scope for filtering. This makes processing and querying larger datasets difficult. Instead, file formats such as Parquet, ORC, and Avro help us overcome many of these challenges, as they can be stored in a distributed fashion.
Note
Row-based file formats store data by row, whereas columnar file formats store data by column. Row-based file formats work best for transactional writes of data, whereas columnar file formats are ideal for data querying and analytical workloads.
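As an illustrative sketch of the difference (assuming a Spark DataFrame named df and using placeholder DBFS paths), the same data can be written in a row-based and a columnar format:

# Row-based, untyped text: the whole file must be parsed even for a single column
df.write.mode("overwrite").csv("dbfs:/tmp/demo_csv")

# Columnar, typed, and splittable: readers can scan only the columns they need
df.write.mode("overwrite").parquet("dbfs:/tmp/demo_parquet")

# Parquet carries its schema, so no inferSchema pass is needed when reading back
spark.read.parquet("dbfs:/tmp/demo_parquet").printSchema()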
Let us look at row-based file formats versus columnar file formats in the following image:
The similarities between Parquet, ORC, and Avro are as follows:
The differences between Parquet, ORC, and Avro are as follows:
Coming back to Delta Lake, it can simply be treated as a file format. Tables that are created on top of this delta file format are simply called delta tables. The delta file format is mainly composed of two components:
A _delta_log folder is created when data is written in the delta file format. This folder stores files that record all the transactions to the data.
The data files (.parquet) can also be compacted later using different functions. For efficient querying purposes, these Parquet partition files can also be distributed based on partition folders.

It's also important for us to understand the use and value of the transactional log.
Having an understanding of the transactional log is imperative when working with Delta Lake. Let's take a peek at the contents of the _delta_log folder.
Whenever a transaction is carried out on a delta table, the changes are recorded in the _delta_log folder in the form of JSON files. These JSON files are named sequentially, starting with 000000.json; subsequent JSON files are created as changes get committed (000001.json, 000002.json, and so on). Also, with each fresh transaction, a new set of Parquet files may be written. The new JSON file created in the _delta_log folder keeps a record of which Parquet files to reference and which to omit. This happens because every transaction on a delta table results in a new version of the table.
Let's see how this works with an example. Suppose we have a delta table with a _delta_log folder containing 00000.json, and suppose this JSON file references two Parquet files, part-0000.parquet and part-0001.parquet.
Now suppose an UPDATE transaction is carried out on the delta table. This creates a new JSON file in the _delta_log folder by the name of 00001.json, and a new Parquet file, part-0002.parquet, is added to the delta table's directory. Upon checking the new JSON file, we find that it references part-0001.parquet and part-0002.parquet but omits part-0000.parquet.
Let's start by creating a Spark DataFrame by reading a CSV file. Create a new Databricks Python notebook and spin up a Spark cluster with one driver, one worker, the standard type, and autoscaling disabled. Every code block in the following section must be executed in a new notebook cell:
We will use the airlines dataset from the databricks-datasets repository. Databricks provides many sample datasets in every workspace, and these are part of the databricks-datasets directory of the DBFS. The following code block creates a new Spark DataFrame by specifying the first row as the header, automatically inferring the schema, and reading from a CSV file. Once the DataFrame is created, we will display the first five rows:

airlines = (spark.read
            .option("header", True)
            .option("inferSchema", True)
            .option("delimiter", ",")
            .csv("dbfs:/databricks-datasets/airlines/part-00000"))

# View the DataFrame
display(airlines.limit(5))
Next, we will write the DataFrame to DBFS in the delta file format, which creates the _delta_log folder:

airlines.write.mode("overwrite").format("delta").save("dbfs:/airlines/")
We can view the location where the data is written in delta format:
display(dbutils.fs.ls("dbfs:/airlines/"))
Inside the _delta_log folder, we can find one JSON file:

display(dbutils.fs.ls("dbfs:/airlines/_delta_log/"))
Next, we will create a delta table on top of this data using the %sql magic command. The name of the delta table created is airlines_delta_table. A count operation on the newly created delta table returns the number of records in the table:

%sql
DROP TABLE IF EXISTS airlines_delta_table;
CREATE TABLE airlines_delta_table USING DELTA LOCATION "dbfs:/airlines/";

%sql
SELECT COUNT(*) as count FROM airlines_delta_table
Next, we will perform a DELETE operation on the delta table. This will delete all the rows where the Month column equals 10, removing 448,620 rows from the delta table:

%sql
DELETE FROM airlines_delta_table WHERE Month = '10'
We will now perform an UPDATE operation on the delta table. This transaction will update the Dest column and replace all the SFO values with San Francisco. We can also see that 7,575 rows received updates in the table:

%sql
UPDATE airlines_delta_table SET Dest = 'San Francisco' WHERE Dest = 'SFO'
Listing the delta location again shows that new Parquet files were added after the two transactions (DELETE and UPDATE) were carried out:

display(dbutils.fs.ls("dbfs:/airlines/"))
The _delta_log folder now contains two more JSON files, one for each transaction:

display(dbutils.fs.ls("dbfs:/airlines/_delta_log/"))
Running the DESCRIBE HISTORY command on the delta table returns a list of all the versions of the table:

%sql
-- Time travel
DESCRIBE HISTORY airlines_delta_table
We can query older versions of the table by applying VERSION AS OF to the delta table. First, we'll try to query the data based on the condition that got updated. For instance, after the update operation, no record should have the SFO value. Hence, we get a count of 0:

%sql
-- Return count of rows where Dest = 'SFO' in current version, that is, version 2
SELECT COUNT(*) FROM airlines_delta_table WHERE Dest = 'SFO'
Querying version 1 of the table (the version before the update) instead returns the original count of SFO records:

%sql
-- Return count of rows where Dest = 'SFO' in version 1
SELECT COUNT(*) FROM airlines_delta_table VERSION AS OF 1 WHERE Dest = 'SFO'
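Time travel can also be done from Python; the following is a hedged sketch using the versionAsOf reader option against the same delta path:

airlines_v1 = (spark.read
               .format("delta")
               .option("versionAsOf", 1)
               .load("dbfs:/airlines/"))

# Count of SFO records in version 1 (before the update)
print(airlines_v1.filter("Dest = 'SFO'").count())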
Let's recap what we've covered in this first chapter.
In this chapter, we learned about the fundamentals of Spark, got an introduction to Databricks, and explored Delta Lake. We were introduced to Azure Databricks and its important workspace components. We learned how to create an Azure Databricks instance, a notebook in the Databricks workspace, Spark clusters, and Databricks jobs. We also covered the important big data file formats, such as Parquet, Avro, and ORC, before learning the fundamentals of Delta Lake and going through a worked-out example.
In the next chapter, we will dive deeper into the concepts of batch and stream processing in Azure Databricks. We'll also see more examples in the chapter to practice working hands-on in a Databricks environment.