Data Processing with Optimus

By Dr. Argenis Leon , Luis Aguirre
    Advance your knowledge in tech with a Packt subscription

  • Instant online access to over 7,500+ books and videos
  • Constantly updated with 100+ new titles each month
  • Breadth and depth in over 1,000+ technologies
  1. Chapter 1: Hi Optimus!

About this book

Optimus is a Python library that works as a unified API for data cleaning, processing, and merging data. It can be used for handling small and big data on your local laptop or on remote clusters using CPUs or GPUs.

The book begins by covering the internals of Optimus and how it works in tandem with the existing technologies to serve your data processing needs. You'll then learn how to use Optimus for loading and saving data from text data formats such as CSV and JSON files, exploring binary files such as Excel, and for columnar data processing with Parquet, Avro, and OCR. Next, you'll get to grips with the profiler and its data types - a unique feature of Optimus Dataframe that assists with data quality. You'll see how to use the plots available in Optimus such as histogram, frequency charts, and scatter and box plots, and understand how Optimus lets you connect to libraries such as Plotly and Altair. You'll also delve into advanced applications such as feature engineering, machine learning, cross-validation, and natural language processing functions and explore the advancements in Optimus. Finally, you'll learn how to create data cleaning and transformation functions and add a hypothetical new data processing engine with Optimus.

By the end of this book, you'll be able to improve your data science workflow with Optimus easily.

Publication date:
September 2021
Publisher
Packt
Pages
300
ISBN
9781801079563

 

Chapter 1: Hi Optimus!

Optimus is a Python library that loads, transforms, and saves data, and also focuses on wrangling tabular data. It provides functions that were designed specially to make this job easier for you; it can use multiple engines as backends, such as pandas, cuDF, Spark, and Dask, so that you can process both small and big data efficiently.

Optimus is not a DataFrame technology: it is not a new way to organize data in memory, such as arrow, or a way to handle data in GPUs, such as cuDF. Instead, Optimus relies on these technologies to load, process, explore, and save data.

Having said that, this book is for everyone, mostly data and machine learning engineers, who want to simplify writing code for data processing tasks. It doesn't matter if you want to process small or big data, on your laptop or in a remote cluster, if you want to load data from a database or from remote storage – Optimus provides all the tools you will need to make your data processing task easier.

In this chapter, we will learn about how Optimus was born and all the DataFrame technologies you can use as backends to process data. Then, we will learn about what features separate Optimus from all the various kinds of DataFrame technologies. After that, we will install Optimus and Jupyter Lab so that we will be prepared to code in Chapter 2, Data Loading, Saving, and File Formats.

Finally, we will analyze some of Optimus's internal functions to understand how it works and how you can take advantage of some of the more advanced features.

A key point: this book will not try to explain how every DataFrame technology works. There are plenty of resources on the internet that explain the internals and the day-to-day use of these technologies. Optimus is the result of an attempt to create an expressive and easy to use data API and give the user most of the tools they need to complete the data preparation process in the easiest way possible.

The topics we will be covering in this chapter are as follows:

  • Introducing Optimus
  • Installing everything you need to run Optimus
  • Using Optimus
  • Discovering Optimus internals
 

Technical requirements

To take full advantage of this chapter, please ensure you implement everything specified in this section.

Optimus can work with multiple backend technologies to process data, including GPUs. For GPUs, Optimus uses RAPIDS, which needs an NVIDIA card. For more information about the requirements, please go to the GPU configuration section.

To use RAPIDS on Windows 10, you will need the following:

  • Windows 10 version 2004 (OS build 202001.1000 or later)
  • CUDA version 455.41 in CUDA SDK v11.1

You can find all the code for this chapter in https://github.com/PacktPublishing/Data-Processing-with-Optimus.

 

Introducing Optimus

Development of Optimus began with work being conducted for another project. In 2016, Alberto Bonsanto, Hugo Reyes, and I had an ongoing big data project for a national retail business in Venezuela. We learned how to use PySpark and Trifacta to prepare and clean data and to find buying patterns.

But problems soon arose for both technologies: the data had different category/product names over the years, a 10-level categorization tree, and came from different sources, including CSV files, Excel files, and databases, which added an extra process to our workflow and could not be easily wrangled. On the other hand, when we tried Trifacta, we needed to learn its unique syntax. It also lacked some features we needed, such as the ability to remove a single character from every column in the dataset. In addition to that, the tool was closed source.

We thought we could do better. We wanted to write an open source, user-friendly library in Python that would let any non-experienced user apply functions to clean, prepare, and plot big data using PySpark.

From this, Optimus was born.

After that, we integrated other technologies. The first one we wanted to include was cuDF, which supports processing data 20x faster; soon after that, we also integrated Dask, Dask-cuDF, and Ibis. You may be wondering, why so many DataFrame technologies? To answer that, we need to understand a little bit more about how each one works.

Exploring the DataFrame technologies

There are many different well-known DataFrame technologies available today. Optimus can process data using one or many of those available technologies, including pandas, Dask, cuDF, Dask-cuDF, Spark, Vaex, or Ibis.

Let's look at some of the ones that work with Optimus:

  • pandas is, without a doubt, one of the more popular DataFrame technologies. If you work with data in Python, you probably use pandas a lot, but it has an important caveat: pandas cannot handle multi-core processing. This means that you cannot use all the power that modern CPUs can give you, which means you need to find a hacky way to use all the cores with pandas. Also, you cannot process data volumes greater than the memory available in RAM, so you need to write code to process your data in chunks.
  • Dask came out to help parallelize Python data processing. In Dask, we have the Dask DataFrame, an equivalent to the pandas DataFrame, that can be processed in parallel using multiple cores, as well as with nodes in a cluster. This gives the user the power to scale out data processing to hundreds of machines in a cluster. You can start 100 machines, process your data, and shut them down, quickly, easily, and cheaply. Also, it supports out-of-core processing, which means that it can process data volumes greater than the memory available in RAM.
  • At the user level, cuDF and Dask-cuDF work in almost the same way as pandas and Dask, but up to 20x faster for most operations when using GPUs. Although GPUs are expensive, they give you more value for money compared to CPUs because they can process data much faster.
  • Vaex is growing in relevance in the DataFrame landscape. It can process data out-of-core, is easier to use than Dask and PySpark, and is optimized to process string and stats in parallel because of its underlying C language implementation.
  • Ibis is gaining traction too. The amazing thing about Ibis is that it can use multiple engines (like Optimus but focused on SQL engines) and you can write code in Python that can be translated into SQL to be used in Impala, MySQL, PostgreSQL, and so on.

The following table provides a quick-glance comparison of several of these technologies:

Figure 1.1 – DataFrame technologies and capabilities available in Optimus

(*) Depends on the engine that's been configured

Figure 1.1 – DataFrame technologies and capabilities available in Optimus

There are some clear guidelines regarding when to use each engine:

  • Use pandas if the DataFrame fits comfortably in memory, or cuDF if you have GPUs and the data fits in memory. This is almost always faster than using distributed DataFrame technologies under the same circumstances. This works best for real-time or near-real-time data processing.
  • Use Dask if you need to process data greater than memory, and Dask-cuDF if you have data larger than memory and a multi-core and/or multi-node GPU infrastructure.
  • Use Vaex if you have a single machine and data larger than memory, or Spark if you need to process data at terabyte scale. This is slow for small datasets/datasets that fit in memory.

Now that you understand this, you can unleash Optimus's magic and start preparing data using the same Optimus API in any of the engines available.

Examining Optimus design principles

A key point about Optimus is that we are not trying to create a new DataFrame technology. As we've already seen, there are actually many amazing options that cover almost any use case. The purpose of Optimus is to simplify how users handle data and give that power to people who may not have any technical expertise. For that reason, Optimus follows three principles:

  • One API to rule them all.
  • Knowing the technology is optional.
  • Data types should be as rich as possible.

What do these mean? Let's look at each in detail.

One API to rule them all

Almost all DataFrame technologies try to mimic the pandas API. However, there are subtle differences regarding what the same function can do, depending on how you apply it; with Optimus, we want to abstract all this.

We'll go into more detail about this later, but here's a quick example: you can calculate a column square root using the .cols accessor, like so:

from optimus import Optimus
op = Optimus("dask")
df = op.create.dataframe({"A":[0,-1,2,3,4,5]})
df = df.cols.sqrt("A")

If you want to switch from Dask to any other engine, you can use any of the following values. Each one will instantiate a different class of the Optimus DataFrame:

  • "pandas" to use Pandas. This will instantiate a pandas DataFrame.
  • "dask" to use Dask. This will instantiate a DaskDataFrame.
  • "cudf" to use cuDF. This will instantiate a CUDFDataFrame.
  • "dask_cudf" to use Dask-cuDF. This will instantiate a DaskCUDFDataFrame.
  • "spark" to use PySpark. This will instantiate a SparkDataFrame.
  • "vaex" to use Vaex. This will instantiate a VaexDataFrame.
  • "ibis" to use Ibis. This will instantiate an IbisDataFrame.

An amazing thing about this flexibility is that you can process a sample of the data on your laptop using pandas, and then send a job to Dask-cuDF or a Spark cluster to process it using the faster engine.

Knowing the technical details is optional

pandas is complex. Users need to handle technical details such as rows, index, series, and masks, and you need to go low level and use NumPy/Numba to get all the power from your CPU/GPU.

With Numba, users can gain serious speed improvements when processing numerical data. It translates Python functions into optimized machine code at runtime. This simply means that we can write faster functions on CPU or GPU. For example, when we request a histogram using Optimus, the minimum and maximum values of a column are calculated in a single pass.

In Optimus, we try to take the faster approach for every operation, without requiring extensive knowledge of the technical details, to take full advantage of the technology. That is Optimus's job.

Some other DataFrame features that are abstracted in Optimus include indices, series, and masks (the exception is PySpark). In Optimus, you only have columns and rows; the intention is to use familiar concepts from spreadsheets so that you can have a soft landing when you start using Optimus.

In Optimus, you have two main accessors, .cols and .rows, that provide most of the transformation methods that are available. For example, you can use df.cols.lower to transform all the values of a column into lowercase, while you can use df.rows.drop_duplicates to drop duplicated rows in the entire dataset. Examples of these will be addressed later in this book.

Data types should be as rich as possible

All DataFrame technologies have data types to represent integers, decimals, time, and dates. In pandas and Dask, you can use NumPy data types to assign diverse types of integers such as int8, int16, int32, and int64, or different decimals types, such as float32, float64, and so on.

This gives the user a lot of control to optimize how the data is saved and reduces the total size of data in memory and on disk. For example, if you have 1 million rows with values between 1 and 10, you can save the data as uint8 instead of inf64 to reduce the data size.

Besides this internal data representation, Optimus can infer and detect a richer set of data types so that you can understand what data in a column matches a specific data type (URL, email, date, and so on) and then apply specific functions to handle it.

In Optimus, we use the term quality to express three data characteristics:

  • Number of values that match the data type being inferred
  • Number of values that differ from the data type being inferred
  • Number of missing values

Using the df.cols.quality method, Optimus can infer the data type of every loaded column and return how many values in the column match its data types. In the case of date data types, Optimus can infer the date format.

The following list shows all the data types that Optimus can detect:

  • Integer.
  • Strings.
  • Email.
  • URL.
  • Gender.
  • Boolean.
  • US ZIP code.
  • Credit card.
  • Time and date format.
  • Object.
  • Array.
  • Phone number.
  • Social security number.
  • HTTP code.

Many of these data types have special functions that you can use, as follows:

  • URL: Schemas, domains, extensions, and query strings
  • Date: Year, month, and day
  • Time: Hours, minutes, and seconds
  • Email: domains and domain extensions

The best part of this is that you can define your own data types and create special functions for them. We will learn more about this later in this book. We will also learn about the functions we can use to process or remove matches, mismatches, and missing values.

Now that we've had a look at how Optimus works, let's get it running on your machine.

 

Installing everything you need to run Optimus

To start using Optimus, you will need a laptop with Windows, Ubuntu, or macOS installed with support for Python, PIP packages, and Conda. If you are new to Python, PIP is the main package manager. It allows Python users to install and manage packages that expand the Python standard library's functionality.

The easiest way to install Optimus is through PIP. It will allow us to start running examples in just a few minutes. Later in this section, we will see some examples of Optimus running on a notebook, on a shell terminal, and on a file read by Python, but before that, we will need to install Optimus and its dependencies.

First, let's install Anaconda.

Installing Anaconda

Anaconda is a free and open source distribution of the Python and R programming languages. The distribution comes with the Python interpreter, Conda, and various packages related to machine learning and data science so that you can start easier and faster.

To install Anaconda on any system, you can use an installer or install it through a system package manager. In the case of Linux and macOS, you can install Anaconda using APT or Homebrew, respectively.

On Linux, use the following command:

sudo apt-get install anaconda # on Linux

For macOS and Windows, go to https://www.anaconda.com/products/individual. Download the Windows file that best matches your system and double-click the file after downloading it to start the installation process:

brew cask install anaconda # on macOS

With Anaconda now installed, let's install Optimus.

Installing Optimus

With Anaconda installed, we can use Conda to install Optimus:

As stated on the Conda website, Conda is provides "package, dependency, and environment management for any language." With Conda, we can manage multiple Python environments without polluting our system with dependencies. For example, you could create a Conda environment that uses Python 3.8 and pandas 0.25, and another with Python 3.7 and pandas 1.0. Let's take a look:

  1. To start, we need to open the Anaconda Prompt. This is just the command-line interface that comes with Conda:
    • For Windows: From the Start menu, search for and open Anaconda Prompt.
    • For macOS: Open Launchpad and click the Terminal icon.
    • For Linux: Open a Terminal window.
  2. Now, in the terminal, we are going to create a Conda environment named Optimus to create a clean Optimus installation:
    conda create -n optimus python=3.8
  3. Now, you need to change from the (base) environment to the (optimus) environment using the following command:
    conda activate optimus
  4. Running the following command on your terminal will install Optimus with its basic features, ready to be tested:
    pip install pyoptimus
  5. If you have done this correctly, running a simple test will tell us that everything is correct:
    python -c 'import optimus; optimus.__version__'

    Now, we are ready to use Optimus!

We recommend using Jupyter Notebook, too.

Installing JupyterLab

If you have not been living under a rock the last 5 years, you probably know about Jupyter Notebook. JupyterLab is the next generation of Jupyter Notebook: it is a web-based interactive development environment for coding. Jupyter (for short) will help us test and modify our code easily and will help us try out our code faster. Let's take a look:

  1. To install JupyterLab, go to the Terminal, as explained in the Installing Optimus section, and run the following command:
    conda install -c conda-forge jupyterlab
  2. At this point, you could simply run Jupyter. However, we are going to install a couple of handy extensions to debug Dask and track down GPU utilization and RAM:
    conda install nodejs
    conda install -c conda-forge dask-labextension
    jupyter labextension install dask-labextension
    jupyter serverextension enable dask_labextension
  3. Now, let's run Jupyter using the following command:
    jupyter lab --ip=0.0.0.0
  4. You can access Jupyter using any browser:
Figure 1.2 – JupyterLab UI

Figure 1.2 – JupyterLab UI

Next, let's look at how to install RAPIDS.

Installing RAPIDS

There are some extra steps you must take if you want to use a GPU engine with Optimus.

RAPIDS is a set of libraries developed by NVIDIA for handling end-to-end data science pipelines using GPUs; cuDF and Dask-cuDF are among these libraries. Optimus can use both to process data in a local and distributed way.

For RAPIDS to work, you will need a GPU, NVIDIA Pascal™ or better, with a compute capability of 6.0+. You can check the compute capability by looking at the tables on the NVIDIA website: bit.ly/cc_gc.

First, let's install RAPIDS on Windows.

Installing RAPIDS on Windows 10

RAPIDS is not fully supported at the time of writing (December 2020), so you must use the Windows Subsystem for Linux version 2 (WSL2). WSL is a Windows 10 feature that enables you to run native Linux command-line tools directly on Windows.

You will need the following:

  • Windows 10 version 2004 (OS build 202001.1000 or later). You must sign up to get Windows Insider Preview versions, specifically to the Developer Channel. This is required for the WSL2 VM to have GPU access: https://insider.windows.com/en-us/.
  • CUDA version 455.41 in CUDA SDK v11.1. You must use a special version of the NVIDA CUDA drivers, which you can get by downloading them from NVIDIA's site. You must join the NVIDIA Developer Program to get access to the version; searching for WSL2 CUDA Driver should lead you to it.

Here are the steps:

  1. Install the developer preview version of Windows. Make sure that you click the checkbox next to Update to install other recommended updates too.
  2. Install the Windows CUDA driver from the NVIDIA Developer Program.
  3. Enable WSL 2 by enabling the Virtual Machine Platform optional feature. You can find more steps here: https://docs.microsoft.com/en-us/windows/wsl/install-win10.
  4. Install WSL from the Windows Store (Ubuntu-20.04 is confirmed to be working).
  5. Install Python on the WSL VM, tested with Anaconda.
  6. Go to the Installing RAPIDS section of this chapter.

Installing RAPIDS on Linux

First, you need to install the CUDA and NVIDIA drivers. Pay special attention if your machine is running code that depends on a specific CUDA version. For more information about the compatibility between the CUDA and NVIDIA drivers, check out bit.ly/cuda_c.

If you do not have a compatible GPU, you can use a cloud provider such as Google Cloud Platform, Amazon, or Azure.

In this case, we are going to use Google Cloud Platform. As of December 2020, you can get an account with 300 USD on it to use. After creating an account, you can set up a VM instance to install RAPIDS.

To create a VM instance on Google Cloud Platform, follow these steps:

  1. First, go to the hamburger menu, click Compute Engine, and select VM Instances.
  2. Click on CREATE INSTANCE. You will be presented with a screen that looks like this:
    Figure 1.3 – Google Cloud Platform instance creation

    Figure 1.3 – Google Cloud Platform instance creation

  3. Select a region that can provide a GPU. Not all zones have GPUs available. For a full list, check out https://cloud.google.com/compute/docs/gpus.
  4. Make sure you choose N1 series from the dropdown.
  5. Be sure to select an OS that's compatible with the CUDA drivers (check the options available here: https://developer.nvidia.com/cuda-downloads). After the installation, you will be using 30 GB of storage space, so make sure you assign enough disk space:
    Figure 1.4 – Google Cloud Platform OS selection

    Figure 1.4 – Google Cloud Platform OS selection

  6. Check the Allow HTTP traffic option:
    Figure 1.5 – Google Cloud Platform OS selection

    Figure 1.5 – Google Cloud Platform OS selection

  7. To finish, click the Create button at the bottom of the page:
Figure 1.6 – Google Cloud instance creation

Figure 1.6 – Google Cloud instance creation

Now, you are ready to install RAPIDS.

Installing RAPIDS

After checking that your GPU works with Optimus, go to https://rapids.ai/start.html. Select the options that match your requirements and copy the output from the command section to your command-line interface:

Figure 1.7 – Google Cloud Platform OS selection

Figure 1.7 – Google Cloud Platform OS selection

After the installation process is complete, you can test RAPIDS by importing the library and getting its version:

python -c 'import cudf; cudf.__version__'

Next, let's learn how to install Coiled for easier setups.

Using Coiled

Coiled is a deployment-as-a-service library for scaling Python that facilitates Dask and Dask-cuDF clusters for users. It takes the DevOps out of the data role to enable data professionals to spend less time setting up networking, managing fleets of Docker images, creating AWS IAM roles, and other setups they would have to handle otherwise, so that they can spend more time on their real job.

To use a Coiled cluster on Optimus, we can just pass minimal configuration to our Optimus initialization function and include our token provided by Coiled in a parameter; to get this token, you must create an account at https://cloud.coiled.io and get the token from your dashboard, like so:

op = Optimus(coiled_token="<your token here>", n_workers=2)

In this example, we initialized Optimus using a Coiled token, and set the number of workers to 2. Optimus will initialize a Dask DataFrame and handle the connection to the cluster that was created by Coiled. After this, Optimus will work as normal.

When using Coiled, it's important to maintain the same versions between the packages in the remote cluster and the packages in your local machine. For this, you can install a Coiled software environment as a local conda environment using its command-line tool. To use Optimus, we will use a specific software environment called optimus/default:

coiled install optimus/default 
conda activate coiled-optimus-default

In the preceding example, we told coiled install to create the conda environment and then used conda activate to start using it.

Using a Docker container

If you know how to use Docker and you have it installed on your system, you can use it to quickly set up Optimus in a working environment.

To use Optimus in a Docker environment, simply run the following command:

docker run -p 8888:8888 --network="host" optimus-df/optimus:latest

This will pull the latest version of the Optimus image from Docker Hub and run a notebook process inside it. You will see something like the following:

To access the notebook, open this file in a browser:
    file://...
Or copy and paste one of these URLs:
    http://127.0.0.1:8888/?token=<GENERATED TOKEN>

Just copy the address and paste it into your browser, making sure it has the same token, and you'll be using a notebook with Optimus installed in its environment.

 

Using Optimus

Now that we have Optimus installed, we can start using it. In this section, we'll run through some of the main features and how to use them.

The Optimus instance

You use the Optimus instance to configure the engine, as well as load and save data. Let's see how this works.

Once Optimus has been installed on your system, you can use it in a Python environment. Let's import the Optimus class and instantiate an object of it:

from optimus import Optimus
op = Optimus(engine=pandas)

In Optimus, we call a DataFrame technology an engine. In the preceding example, we're setting up Optimus using Pandas as the base engine. Very easy!

Now, let's instantiate Optimus using Dask in a remote cluster. For this, we'll have to pass the configuration in the arguments to the Optimus function – specifically, the session argument – which allows us to pass a Dask client:

from dask.distributed import Client
client = Client("127.0.0.105")
op = Optimus(engine="dask", session=client)

In the preceding code, we instantiated a Dask distributed client and passed it to the Optimus initialization.

To initialize with a different number of workers, you can pass a named argument as well:

op = Optimus(engine="dask", n_workers=2)

This will create a local client automatically, instead of passing just one, as in the previous example.

Using Dask, you can now access more than 100 functions to transform strings, as well as filter and merge data.

Saving and loading data from any source

Using the Optimus instance, you can easily load DataFrames from files or databases. To load a file, simply call one of the available methods for different formats (.csv, .parquet, .xlsx, and more) or simply the generic file method, which will infer the file format and other parameters:

op.load.csv("path/to/file.csv")
op.load.file("path/to/file.csv")

For databases or external buckets, Optimus can handle connections as different instances, which allows us to maintain operations and clean any credentials and addresses that may or may not repeat on different loading routines:

db = op.connect.database( *db_args )
op.load.database_table("table name", connection=db)
conn = op.connect.s3( *s3_args )
op.load.file("relative/path/to/file.csv", connection=conn)

On the other hand, to save to a file or to the table of a database, you can use the following code:

df.save.csv("relative/path/to/file.csv", connection=conn)
df.save.database_table("table_name", db=db)

Now that we have started our engine and have our data ready, let's see how we can process it using the Optimus DataFrame.

The Optimus DataFrame

One of the main goals of Optimus is to try and provide an understandable, easy-to-remember API, along with all the tools needed to clean and shape your data. In this section, we are going to highlight the main features that separate Optimus from the available DataFrame technologies.

Using accessors

Optimus DataFrames are made to be modified in a natural language, dividing the different methods available into components. For example, if we want to modify a whole column, we may use the methods available in the .cols accessor, while if we want to filter rows by the value of a specific column, we may use a method in the .rows accessor, and so on.

An example of an operation could be column renaming:

df.cols.rename("function", "job")

In this case, we are simply renaming the function column to "job", but the modified DataFrame is not saved anywhere, so the right way to do this is like so:

df = df.cols.rename("function", "job") 

In addition, most operations return a modified version of the DataFrame so that those methods can be called, chaining them:

df = df.cols.upper("name").cols.lower("job") 

When Optimus instantiates a DataFrame, it makes an abstraction of the core DataFrame that was made using the selected engine. There is a DataFrame class for every engine that's supported. For example, a Dask DataFrame is saved in a class called DaskDataFrame that contains all the implementations from Optimus. Details about the internals of this will be addressed later in this book.

As we mentioned previously, to use most of these methods on an Optimus DataFrame, it's necessary to use accessors, which separate different methods that may have distinct behaviors, depending on where they're called:

df.cols.drop("name")

The preceding code will drop the entire "name" column. The following command returns a different DataFrame:

df.rows.drop(df["name"]==MEGATRON)

The preceding code will drop the rows with values in the "name" column that match the MEGATRON value.

Obtaining richer DataFrame data

Optimus aims to give the user important information when needed. Commonly, you will use head() or show() to print DataFrame information. Optimus can provide additional useful information when you use display:

df.display()

This produces the following output:

Figure 1.8 – Optimus DataFrame display example

Figure 1.8 – Optimus DataFrame display example

In the previous screenshot, we can see information about the requested DataFrame, such as the number of columns and rows, all the columns, along with their respective data types, some values, as well as the number of partitions and the type of the queried DataFrame (in this case, a DaskDataFrame). This information is useful when you're transforming data to make sure you are on the right track.

Automatic casting when operating

Optimus will cast the data type of a column based on the operation you apply. For example, to calculate min and max values, Optimus will convert the column into a float and ignore non-numeric data:

dfn = op.create.dataframe({"A":["1",2,"4","!",None]}) 
dfn.cols.min("A"), df.cols.max("A")
(1.0, 4.0)

In the preceding example, Optimus ignores the "!" and None values and only returns the lower and higher numeric values, respectively, which in this case are 1.0 and 4.0.

Managing output columns

For most column methods, we can choose to make a copy of every input column and save it to another so that the operation does not modify the original column. For example, to save an uppercase copy of a string type column, we just need to call the same df.cols.upper with an extra argument called output_cols:

df.cols.capitalize("name", output_cols="cap_name")

This parameter is called output_cols and is plural because it supports multiple names when multiple input columns are passed to the method:

df.cols.upper(["a", "b"], 
              output_cols=["upper_a", "upper_b"])

In the preceding example, we doubled the number of columns in the resulting DataFrame, one pair untouched and another pair with its values transformed into uppercase.

Profiling

To get an insight into the data being transformed by Optimus, we can use df.profile(), which provides useful information in the form of a Python dictionary:

df = op.create.dataframe({"A":["1",2,"4","!",None],
                          "B":["Optimus","Bumblebee", 
                               "Eject", None, None]}) 
df.profile(bins=10)

This produces the following output:

Figure 1.9 – Profiler output

In the preceding screenshot, we can see data, such as every column, along with its name, missing values, and mismatch values, its inferred data type, its internal data type, a histogram (or values by frequency in categorical columns), and unique values. For the DataFrame, we have the name of the DataFrame, the name of the file (if the data comes from one), how many columns and rows we have, the total count of data types, how many missing rows we have, and the percentage of values that are missing.

Visualization

One of the most useful features of Optimus is its ability to plot DataFrames in a variety of visual forms, including the following:

  • Frequency charts
  • Histograms
  • Boxplots
  • Scatterplots

To achieve this, Optimus uses Matplotlib and Seaborn, but you can also get the necessary data in Python Dictionary format to use with any other plotting library or service.

Python Dictionary output

By default, every output operation in Optimus will get us a dictionary (except for some cases, such as aggregations, which get us another DataFrame by default). Dictionaries can easily be transformed and saved into a JSON file, in case they are needed for a report or to provide data to an API:

df.columns_sample("*")

String, numeric, and encoding tools

Optimus tries to provide out-of-the-box tools to process strings and numbers, and gives you tools for the data preparation process so that you can create machine learning models.

String clustering

String clustering refers to the operation of grouping different values that might be alternative representations of the same thing. A good example of this are the strings "NYC" and "New York City". In this case, they refer to the same thing.

Processing outliers

Finding outliers is one of the most common applications for statistical DataFrames. When finding outliers, there are various possible methods that will get us different results, such as z-score, fingerprint, n-gram fingerprint, and more. With Optimus, these methods are provided as alternatives so that you can adapt to most cases.

Encoding techniques

Encoding is useful for machine learning models since they require all the data going out or coming in to be numeric. In Optimus, we have methods such as string_to_index, which allows us to transform categorical data into numerical data.

Technical details

When dealing with distributed DataFrame technologies, there are two concepts that arise that are an integral part of how Optimus is designed. These are lazy and eager execution.

Let's explore how this works in Optimus.

Distributed engines process data in a lazy way

In Dask, for example, when you apply a function to a DataFrame, it is not applied immediately like it would be in pandas. You need to trigger this computation explicitly, by calling df.execute(), or implicitly, when calling other operations that trigger this processing.

Optimus makes use of this functionality to trigger all the computation when explicitly requested. For example, when we request the profile of a dataset, every operation, such as histogram calculation, top-n values, and data types inference, is pushed to a cluster in a directed acyclic graph (DAG) and executed in parallel.

The following representation shows multiple operations being executed in parallel being visualized in a DAG:

Figure 1.10 – Visualizing a DAG in Dask

Figure 1.10 – Visualizing a DAG in Dask

Aggregations return computed operations (eager execution)

As we mentioned earlier, distributed engines process aggregations in a lazy way. Optimus triggers aggregation so that you can always visualize the result immediately.

Triggering an execution

Optimus is capable of immediately executing an operation if requested. This only applies to engines that support delayed operations, such as Dask and PySpark. This way, we reduce the computation time when you know some operations will change on your workflow:

df = df.cols.replace("address", "MARS PLANET",
                     "mars").execute()

In the preceding example, we're replacing every match with "MARS PLANET" on the address column with "mars" and then saving a cache of this operation.

However, there are some operations or methods that will also trigger all the delayed operations that were made previously. Let's look at some of these cases:

  • Requesting a sample: For example, calling df.display() after any delayed function will require the final data to be calculated. For this reason, Optimus will trigger all the delayed operations before requesting any kind of output; this will also happen when we call any other output function.
  • Requesting a profile: When calling df.profile(), some aggregations are done in the background, such as counting for unique, mismatched, and missing values. Also, getting the frequent values or the histogram calculation of every column will require a delayed function to have been executed previously.

When using a distributed DataFrame technology, when an operation is executed, the data is executed and stored in every worker. In the case of Dask, this function would be cache, which is called when we call execute on our Optimus DataFrame. Note that if we call compute directly on a Dask DataFrame instead, all the data will be brought to the client, which might cause memory issues in our system if the data is too big.

 

Discovering Optimus internals

Optimus is designed to be easy to use for non-technical users and developers. Once you know how some of the internals work, you'll know how some transformations work, and hopefully how to avoid any unexpected behavior. Also, you'll be able to expand Optimus or make more advanced or engine-specific transformations if the situation requires it.

Engines

Optimus handles all the details that are required to initialize any engine. Although pandas, Vaex, and Ibis won't handle many configuration parameters because they are non-distributed engines, Dask and Spark handle many configurations, some of which are mapped and some of which are passed via the *args or **kwargs arguments.

Optimus always keeps a reference to the engine you initialize. For example, if you want to get the Dask client from the Optimus instance, you can use the following command:

op.client

This will show you the following information:

Figure 1.11 – Dask client object inside Optimus

Figure 1.11 – Dask client object inside Optimus

One interesting thing about Optimus is that you can use multiple engines at the same time. This might seem weird at first, but it opens up amazing opportunities if you get creative. For example, you can combine Spark, to load data from a database, and pandas, to profile a data sample in real time, or use pandas to load data and use Ibis to output the instructions as a set of SQL instructions.

At the implementation level, all the engines inherit from BaseEngine. Let's wrap all the engine functionality to make three main operations:

  • Initialization: Here, Optimus handles all the initialization processes for the engine you select.
  • Dataframe creation: op.create.dataframe maps to the DataFrame's creation, depending on the engine that was selected.
  • Data Loading: op.load handles file loading and databases.

The DataFrame behind the DataFrame

The Optimus DataFrame is a wrapper that exposes and implements a set of functions to process string and numerical data. Internally, when Optimus creates a DataFrame, it creates it using the engine you select to keep a reference in the .data property. The following is an example of this:

op = Optimus("pandas")
df = op.load.csv("foo.txt", sep=",")
type(df.data)

This produces the following result:

Pandas.core.frame.DataFrame

A key point is that Optimus always keeps the data representation as DataFrames and not as a Series. This is important because in pandas, for example, some operations return a Series as result.

In pandas, use the following code:

import pandas as pd
type(pd.DataFrame({"A":["A",2,3]})["A"].str.lower())
pandas.core.series.Series

In Optimus, we use the following code:

from optimus import Optimus
op = Optimus("pandas")
type(op.create.dataframe({"A":["A",2,3]}).cols.lower().data)
pandas.core.frame.DataFrame

As you can see, both values have the same types.

Meta

Meta is used to keep some data that does not belong in the core dataset, but can be useful for some operations, such as saving the result of a top-N operation in a specific column. To achieve this, we save metadata in our DataFrames. This can be accessed using df.meta. This metadata is used for three main reasons. Let's look at each of them.

Saving file information

If you're loading a DataFrame from a file, it saves the file path and filename, which can be useful for keeping track of the data being handled:

from optimus import Optimus 
op = Optimus("pandas") 
df = op.load.csv("foo.txt", sep=",")
df.meta

You will get the following output:

{'file_name': 'foo.txt', 'name': 'foo.txt'}

Data profiling

Data cleaning is an iterative process; maybe you want to calculate the histogram or top-N values in the dataset to spot some data that you want to remove or modify. When you calculate profiling for data using df.profile(), Optimus will calculate a histogram or frequency chart, depending on the data type. The idea is that while working with the Actions data, we can identify when the histogram or top-N values should be recalculated. Next, you will see how Actions work.

Actions

As we saw previously, Optimus tries to cache certain operations to ensure that you do not waste precious compute time rerunning tasks over data that has not changed.

To optimize the cache usage and reconstruction, Optimus handles multiple internal Actions to operate accordingly.

You can check how Actions are saved by trying out the following code:

from optimus import Optimus 
op = Optimus("pandas") 
df = op.load.csv("foo.txt", sep=",")
df = df.cols.upper("*")

To check the actions you applied to the DataFrame, use the following command:

df.meta["transformations"]

You will get a Python dictionary with the action name and the column that's been affected by the action:

{'actions': [[{'upper': ['name']}], [{'upper': ['function']}]]}

A key point is that different actions have different effects on how the data is profiled and how the DataFrame's metadata is handled. Every Optimus operation has a unique Action name associated with it. Let's look at the five Actions that are available in Optimus and what effect they have on the DataFrame:

  • Columns: These actions are triggered when operations are applied to entire Optimus columns; for example, df.cols.lower() or df.cols.sqrt().
  • Rows: These actions are triggered when operations are applied to any row in an Optimus column; for example, df.rows.set()or df.rows.drop_duplicate().
  • Copy: Triggered only for a copy column operation, such as df.cols.copy(). Internally, it just creates a new key on the dict meta with the source metadata column. If you copy an Optimus column, a profiling operation is not triggered over it.
  • Rename: Triggered only for a rename column operation, such as df.cols.rename(). Internally, it just renames a key in the meta dictionary. If you copy an Optimus column, a profiling operation is not triggered over it.
  • Drop: Triggered only for a rename column operation, such as df.cols.drop(). Internally, it removes a key in the meta dictionary. If you copy an Optimus column, a profiling operation is not triggered over it.

Dummy functions

There are some functions that do not apply to all the DataFrame technologies. Functions such as .repartition(), .cache(), and compute() are used in distributed DataFrames such as Spark and Dask to trigger operations in the workers, but these concepts do not exist in pandas or cuDF. To preserve the API's cohesion in all the engines, we can simply use pass or return the same DataFrame object.

Diagnostics

When you use Dask and Spark as your Optimus engine, you have access to their respective diagnostics dashboards. For very complex workflows, it can be handy to understand what operations have been executed and what could be slowing down the whole process.

Let's look at how this works in the case of Dask. To gain access to the diagnostic panel, you can use the following command:

op.client()

This will provide you with information about the Dask client:

Figure 1.12 – Dask client information

In this case, you can point to http://192.168.86.249:39011/status in your browser to see the Dask Diagnostics dashboard:

Figure 1.13 – Dask Diagnostics dashboard

Figure 1.13 – Dask Diagnostics dashboard

An in-depth discussion about diagnostics is beyond the scope of this book. To find out more about this topic, go to https://docs.dask.org/en/latest/diagnostics-distributed.html.

 

Summary

In this chapter, we learned about Optimus's basic capabilities and which of the available engines is the most suitable, based on the infrastructure you're using. You also learned why it is beneficial to use Optimus instead of a vanilla DataFrame and what features separate Optimus from the DataFrame technology that's available. Then, we learned how to install Optimus on Windows, macOS, and Linux, both on the cloud and with external services such as Coiled.

Finally, we took a deep dive into the internals of Optimus so that you have a better understanding of how it works and how it allows you to get creative and take full advantage of Optimus.

In the next chapter, we will learn how to load and save data from files, databases, and remote locations such as Amazon S3.

About the Authors

  • Dr. Argenis Leon

    Argenis Leon created Optimus, an open-source python library built over PySpark aimed to provide an easy-to-use API to clean, process, and merge data at scale. Since 2012, Argenis has been working on big data-related projects using Postgres, MongoDB, Elasticsearch for social media data collection and analytics. In 2015, he started working on Machine learning projects in Retail, AdTech, and Real Estate in Venezuela and Mexico. In 2019 he created Bumblebee, a low-code open-source web platform to clean and wrangle big data using CPU and GPUs using NVIDIA RAPIDS. Nowadays Argenis is Co-founder and CTO of boitas (backed by YCombinator) a wholesale marketplace for SMB in Latin America.

    Browse publications by this author
  • Luis Aguirre

    Luis Aguirre began working with web development projects for Mood Agency in 2018, creating sites for brands from all across Latin America. One year later he started working on Bumblebee, a low-code web platform to transform data that uses Optimus. In mid-2020 he started participating in the Optimus project as a core developer; focusing on creating the easiest-to-use experience for both projects. In 2021 he started working on the Optimus REST API, a tool to allow requests from the web focused on data wrangling.

    Browse publications by this author
Data Processing with Optimus
Unlock this book and the full library for $5 a month*
Start now