Azure Databricks Cookbook

Product type Book
Published in Sep 2021
Publisher Packt
ISBN-13 9781789809718
Pages 452 pages
Edition 1st Edition
Authors (2): Phani Raj, Vinod Jaiswal

Table of Contents (12) Chapters

Preface
1. Chapter 1: Creating an Azure Databricks Service
2. Chapter 2: Reading and Writing Data from and to Various Azure Services and File Formats
3. Chapter 3: Understanding Spark Query Execution
4. Chapter 4: Working with Streaming Data
5. Chapter 5: Integrating with Azure Key Vault, App Configuration, and Log Analytics
6. Chapter 6: Exploring Delta Lake in Azure Databricks
7. Chapter 7: Implementing Near-Real-Time Analytics and Building a Modern Data Warehouse
8. Chapter 8: Databricks SQL
9. Chapter 9: DevOps Integrations and Implementing CI/CD for Azure Databricks
10. Chapter 10: Understanding Security and Monitoring in Azure Databricks
11. Other Books You May Enjoy

Reading and writing data from and to an Azure SQL database using native connectors

Reading and writing data from and to an Azure SQL database is an important step in most data ingestion pipelines. A typical pipeline includes a step that either loads the transformed data into Azure SQL or reads raw data from Azure SQL to perform some transformations.

In this recipe, you will learn how to read and write data using SQL Server JDBC Driver and the Apache Spark connector for Azure SQL.

Getting ready

At the time of writing, the Apache Spark connector for Azure SQL supports only Spark 2.4.x and 3.0.x clusters; this might change in the future. SQL Server JDBC Driver supports both Spark 2.4.x and 3.0.x clusters. Before we start working on the recipe, we need to create a Spark 2.4.x or 3.0.x cluster. You can follow the steps mentioned in the Creating a cluster from the UI to create 2.x clusters recipe from Chapter 1, Creating an Azure Databricks Service.

We used Databricks Runtime Version 7.3 LTS, which includes Spark 3.0.1 and Scala 2.12, for this recipe. The code was also tested with Databricks Runtime Version 6.4, which includes Spark 2.4.5 and Scala 2.11.

You need to create an Azure SQL database. To do so, follow the steps at this link:

https://docs.microsoft.com/en-us/azure/azure-sql/database/single-database-create-quickstart?tabs=azure-portal

After your Azure SQL database is created, connect to the database and create the following table in the newly created database:

CREATE TABLE [dbo].[CUSTOMER](
     [C_CUSTKEY] [int] NULL,
     [C_NAME] [varchar](25) NULL,
     [C_ADDRESS] [varchar](40) NULL,
     [C_NATIONKEY] [smallint] NULL,
     [C_PHONE] [char](15) NULL,
     [C_ACCTBAL] [decimal](18, 0) NULL,
     [C_MKTSEGMENT] [char](10) NULL,
     [C_COMMENT] [varchar](117) NULL
) ON [PRIMARY]
GO

Once the table is created, you can proceed with the steps mentioned in the How to do it… section. You can follow along with the steps in the notebook 2_4.Reading and Writing from and to Azure SQL Database.ipynb.
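
As a rough guide to how the columns of dbo.CUSTOMER line up with a Spark schema, the following sketch maps a T-SQL column type to the name of a Spark SQL type. The helper name spark_type_for and the lookup table are illustrative assumptions that cover only the types appearing in the table above; this is not a complete type mapping.

```python
import re

# Illustrative lookup for the column types used in dbo.CUSTOMER only.
_SQL_TO_SPARK = {
    "int": "IntegerType()",
    "smallint": "ShortType()",
    "varchar": "StringType()",
    "char": "StringType()",
}

# Matches a type name with an optional (precision) or (precision, scale) suffix.
_TYPE_PATTERN = re.compile(r"\s*(\w+)\s*(?:\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?\s*")


def spark_type_for(sql_type: str) -> str:
    """Return the Spark SQL type name for a T-SQL type such as 'varchar(25)'."""
    match = _TYPE_PATTERN.fullmatch(sql_type.lower())
    if match is None:
        raise ValueError(f"Unrecognised type: {sql_type!r}")
    base, precision, scale = match.groups()
    if base == "decimal":
        # Keep precision and scale; T-SQL defaults are precision 18, scale 0.
        return f"DecimalType({precision or 18},{scale or 0})"
    if base not in _SQL_TO_SPARK:
        raise ValueError(f"No mapping defined for type: {sql_type!r}")
    return _SQL_TO_SPARK[base]


print(spark_type_for("decimal(18, 0)"))
```

Note that the table declares C_ACCTBAL as decimal(18, 0), so the scale of the DataFrame column written to it is worth checking against your data.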

How to do it…

You will learn how to use SQL Server JDBC Driver and the Apache Spark connector for Azure SQL to read and write data to and from an Azure SQL database. You will learn how to install the Spark connector for Azure SQL in a Databricks cluster.

Here are the steps to write data to and read data from an Azure SQL database using SQL Server JDBC Driver:

  1. First, create variables for the connection string and the table that we will write data to and read data from. We will load the csv files from ADLS Gen2 that we saw in the Reading and writing data from and to ADLS Gen2 recipe:
    # Details about connection string
    logicalServername = "demologicalserver.database.windows.net"
    databaseName = "demoDB"
    tableName = "CUSTOMER"
    userName = "sqladmin"
    password = "Password@Strong12345" # Please specify password here
    jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(logicalServername, 1433, databaseName)
    connectionProperties = {
      "user" : userName,
      "password" : password,
      "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
  2. As you can see from the preceding step, the driver we are using is called SQLServerDriver, which comes installed as part of Databricks Runtime.
  3. Create a schema for the csv files, which are stored in ADLS Gen-2, and make sure the storage is mounted to DBFS. Follow the steps mentioned in the third recipe, Reading and writing data from and to ADLS Gen2, to learn how to mount storage to DBFS:
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
    #Creating a schema which can be passed while creating the DataFrame
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DecimalType(18,2)),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  4. Once a schema is created, we will read the csv files in a DataFrame:
    # Reading customer csv files in a DataFrame. This Dataframe will be written to Customer table in Azure SQL DB
    df_cust= spark.read.format("csv").option("header",True).schema(cust_schema).load("dbfs:/mnt/Gen2/Customer/csvFiles")
  5. After the preceding step is executed, we will write the DataFrame to the dbo.CUSTOMER table that we have already created as part of the Getting ready section:
    df_cust.write.jdbc(jdbcUrl,  
                       mode ="append", 
                       table=tableName, 
                       properties=connectionProperties)
  6. After loading the data, we will read the table to count the number of records inserted in the table:
    df_jdbcRead= spark.read.jdbc(jdbcUrl,  
                       table=tableName, 
                       properties=connectionProperties)
    # Counting number of rows
    df_jdbcRead.count()
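
The connection details in step 1 keep the password in the notebook as plain text. A minimal sketch of one alternative is shown below, assuming the password is supplied through an environment variable named SQL_PASSWORD (a hypothetical name). The helpers build_jdbc_url and build_connection_properties are illustrative and are not part of the recipe's notebook.

```python
import os


def build_jdbc_url(server: str, database: str, port: int = 1433) -> str:
    """Build a SQL Server JDBC URL in the same shape as jdbcUrl in step 1."""
    return f"jdbc:sqlserver://{server}:{port};database={database}"


def build_connection_properties(user: str, password_env: str = "SQL_PASSWORD") -> dict:
    """Build the properties dict for spark.read.jdbc / DataFrame.write.jdbc.

    The password is read from an environment variable (assumed name) so it
    does not have to appear in the notebook source.
    """
    password = os.environ.get(password_env)
    if not password:
        raise RuntimeError(f"Environment variable {password_env} is not set")
    return {
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }


print(build_jdbc_url("demologicalserver.database.windows.net", "demoDB"))
```

The returned values can be passed to the same read and write calls used in steps 5 and 6 in place of jdbcUrl and connectionProperties. On Databricks, the same idea is more commonly expressed with a secret scope read through dbutils.secrets.get.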

Here are the steps to read data from and write data to an Azure SQL database using the Apache Spark connector for Azure SQL.

Table 2.1 - Compatible connectors for Spark 2.4.x and Spark 3.0.x clusters

You can also download the connector from https://search.maven.org/search?q=spark-mssql-connector.
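
The pairing used in this recipe (artifact spark-mssql-connector_2.12 with release 1.1.0 for Spark 3.0.x, and artifact spark-mssql-connector with release 1.0.2 for Spark 2.4.x) can be sketched as a small lookup. The function name connector_coordinates is hypothetical, and the com.microsoft.azure group ID is an assumption that is not stated in the text; verify both against Maven Central before relying on them.

```python
def connector_coordinates(spark_version: str) -> str:
    """Return Maven coordinates for the connector matching a Spark version."""
    group_id = "com.microsoft.azure"  # assumption: group ID on Maven Central
    if spark_version.startswith("3.0."):
        return f"{group_id}:spark-mssql-connector_2.12:1.1.0"
    if spark_version.startswith("2.4."):
        return f"{group_id}:spark-mssql-connector:1.0.2"
    raise ValueError(f"No connector version listed for Spark {spark_version}")


print(connector_coordinates("3.0.1"))
```

Raising an error for any other Spark version mirrors the compatibility note in the Getting ready section rather than guessing at a version that might work.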

  1. After a Spark 2.4.x or 3.0.x cluster is created, you need to install Spark connector for Azure SQL DB from Maven. Make sure you use the coordinates as mentioned in the preceding table. Go to the Databricks Clusters page and click on Libraries. Then, click on Install New and select the library source as Maven. Now, click on Search Packages and search for spark-mssql-connector:
    Figure 2.13 – Installing Spark connector on Azure SQL database

  2. Under Search Packages, select Maven Central and search for spark-mssql-connector. Because we are using a Spark 3.0.1 cluster, select the artifact ID spark-mssql-connector_2.12 with release 1.1.0 and click on Select. You can use a later version if one is available when you go through the recipe. If you are using a Spark 2.4.x cluster, you must use the artifact ID spark-mssql-connector with release 1.0.2.
    Figure 2.14 – Installing Spark connector on Azure SQL database (continued)

  3. After you select the package, it is installed and the status changes to Installed:
    Figure 2.15 – Spark connector to Azure SQL database installed

  4. After the Spark connector for Azure SQL is installed, run the following code to set the connection string for Azure SQL:
    # logicalServername is the variable defined in the JDBC steps earlier in this recipe
    server_name = f"jdbc:sqlserver://{logicalServername}"
    database_name = "demoDB"
    url = server_name + ";" + "databaseName=" + database_name + ";"
    table_name = "dbo.Customer"
    username = "sqladmin"
    password = "xxxxxx" # Please specify password here
  5. After the Spark connector is installed, we will read the records from the dbo.CUSTOMER table using the newly installed Spark connector for Azure SQL:
    sparkconnectorDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password).load()
  6. Run the following code to check the schema of the DataFrame created as part of the preceding step:
    # printSchema() prints the schema itself and returns None, so it is not wrapped in display()
    sparkconnectorDF.printSchema()
  7. To view a few records from the DataFrame, run the following code:
    display(sparkconnectorDF.limit(10))
  8. Create a schema for the csv files, which are stored in ADLS Gen-2, and make sure the storage account is mounted to DBFS. Follow the steps mentioned in the Reading and writing data from and to ADLS Gen2 recipe to learn how to mount ADLS Gen-2 Storage Account to DBFS:
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
    #Creating a schema which can be passed while creating the DataFrame
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DecimalType(18,2)),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  9. Once a schema is created, we will load the csv files in a DataFrame by running the following code:
    df_cust= spark.read.format("csv").option("header",True).schema(cust_schema).load("dbfs:/mnt/Gen2/Customer/csvFiles")
  10. Next, write the DataFrame to the Azure SQL database table using append mode:
    #Appending records to the existing table
    #table_name and username are the variables defined in step 4
    try:
        df_cust.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .save()
    except Exception as error:
        # Spark write failures are not raised as ValueError, so catch Exception
        print("Connector write failed", error)
  11. The preceding code will append the data in the existing table; if no table exists, then it will throw an error. The following code will overwrite the existing data in the table:
    #Overwriting the existing data; table_name and username are defined in step 4
    try:
        df_cust.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("overwrite") \
            .option("truncate", True) \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .save()
    except Exception as error:
        # Spark write failures are not raised as ValueError, so catch Exception
        print("Connector write failed", error)
  12. As the last step, we will read the data loaded in the customer table to ensure the data is loaded properly:
    #Read the data from the table
    sparkconnectorDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password).load()
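
Step 12 reads the table back but does not compare it with the source. One way to make that check explicit is to compare row counts, sketched here under the assumption that the table was last loaded in overwrite mode, so it holds only the rows from df_cust. The helper verify_row_counts is hypothetical; it relies only on the count() method that Spark DataFrames provide.

```python
def verify_row_counts(source_df, target_df) -> int:
    """Compare row counts of two DataFrames and return the count if they match."""
    source_rows = source_df.count()
    target_rows = target_df.count()
    if source_rows != target_rows:
        raise AssertionError(
            f"Row count mismatch: source={source_rows}, target={target_rows}"
        )
    return target_rows


# In the notebook this would be called as:
#   verify_row_counts(df_cust, sparkconnectorDF)
```

A matching count does not prove the column values are identical, so treat this as a quick sanity check rather than full validation. After an append, the table also contains earlier rows, so the counts would be expected to differ.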

How it works…

The Apache Spark connector works with the latest versions of Spark 2.4.x and Spark 3.0.x. It can be used with both SQL Server and Azure SQL Database and is optimized for performing big data analytics efficiently against them. The following document outlines the benefits of using the Spark connector and provides a performance comparison between the JDBC connector and the Spark connector:

https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15

When we overwrite the data using the Spark connector in overwrite mode, the connector by default drops the table and recreates it with a schema based on the source DataFrame schema, so any indexes that were present on the table are lost. To keep the indexes with overwrite mode, we need to set the truncate option to True. With this option, the existing table is truncated instead of being dropped and recreated, so its definition and indexes are preserved.

To append data to an existing table, we use append mode, in which the existing table is not dropped or recreated. If the table is not found, the write throws an error. This mode is used when we are just inserting data into a raw table. If it's a staging table that we want to truncate before loading, then we need to use overwrite mode with the truncate option.
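
The choice between the two modes can be sketched as a small helper. This is an illustration under stated assumptions rather than part of the recipe: the function names and the "raw" and "staging" labels are hypothetical, and the writer argument is expected to behave like df_cust.write, whose format, mode, and option methods each return the writer.

```python
def connector_write_settings(table_kind: str) -> dict:
    """Pick the write mode and options for a raw or staging table."""
    if table_kind == "raw":
        # Insert into the existing table without dropping or recreating it.
        return {"mode": "append", "options": {}}
    if table_kind == "staging":
        # Truncate before load using overwrite mode with the truncate option.
        return {"mode": "overwrite", "options": {"truncate": "true"}}
    raise ValueError(f"Unknown table kind: {table_kind!r}")


def apply_write_settings(writer, settings: dict):
    """Apply the chosen mode and options to a DataFrameWriter-like object."""
    writer = writer.format("com.microsoft.sqlserver.jdbc.spark").mode(settings["mode"])
    for key, value in settings["options"].items():
        writer = writer.option(key, value)
    return writer


print(connector_write_settings("staging"))
```

In a notebook, the returned writer would still need the url, dbtable, user, and password options shown in the recipe before calling save().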
