TensorFlow Enterprise can easily access data sources in Google Cloud Storage as well as BigQuery. Either of these data sources can host gigabytes to terabytes of data. Reading training data of this size into the JupyterLab runtime all at once is out of the question, however. Instead, data is streamed in batches during training. The tf.data API is the way to build a data ingestion pipeline that aggregates data from files in a distributed system. After this step, the data object can go through transformation steps and evolve into a new data object for training.
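To make the idea of streaming in batches concrete, here is a minimal plain-Python sketch. It is not the tf.data API itself: the helper name stream_batches and the toy record source are assumptions made for illustration. It only shows that a consumer can work through fixed-size batches without ever holding the full dataset in memory.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def stream_batches(records: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield lists of up to batch_size records without materializing the source."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(records)
    while True:
        # Pull at most batch_size items from the underlying stream.
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# range() is lazy, so each loop iteration only holds the current batch.
batch_sizes = []
for batch in stream_batches(range(10), batch_size=4):
    batch_sizes.append(len(batch))

print(batch_sizes)
```

The last batch may be smaller than the others, which is also what a batching step does by default when the number of records is not a multiple of the batch size.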
In this section, we are going to learn basic coding patterns for the following tasks:
- Reading data from a Cloud Storage bucket
- Reading data from a BigQuery table
- Writing data into a Cloud Storage bucket
- Writing data into a BigQuery table
After this, you will have a good grasp of reading data from and writing data to Google Cloud's storage options, and of persisting the data or objects produced by your TensorFlow runtime.
Cloud Storage Reader
Cloud Storage Reader is integrated with tf.data, so a tf.data object can easily access data in Google Cloud Storage. For example, the following code snippet demonstrates how to read a tfrecord dataset:
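# TFRecordDataset does not expand wildcards, so list the matching files first.
my_train_files = tf.data.Dataset.list_files('gs://<BUCKET_NAME>/<FILE_NAME>*.tfrecord')
my_train_dataset = tf.data.TFRecordDataset(my_train_files)
my_train_dataset = my_train_dataset.repeat()
# batch() requires a batch size; 32 is only an example value.
my_train_dataset = my_train_dataset.batch(32)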
…
model.fit(my_train_dataset, …)
In the preceding example, the files stored in the bucket are serialized in the tfrecord format, which is a binary representation of your original data. This is a very common way of storing and serializing large amounts of data or files in the cloud for TensorFlow consumption, because the format enables more efficient reads when data is streamed over a network. We will discuss tfrecord in more detail in a future chapter.
BigQuery Reader
Likewise, BigQuery Reader is also integrated into the TensorFlow Enterprise environment, so training data or derived datasets stored in BigQuery can be consumed by TensorFlow Enterprise.
There are three commonly used methods to read a table stored in a BigQuery data warehouse. The first way is the %%bigquery magic command. The second way is using the BigQuery API in a general Python runtime, and the third way is to use TensorFlow I/O. Each has its advantages.
The BigQuery magic command
This method is perfect for running SQL statements directly in a JupyterLab cell. This is equivalent to switching the cell's command interpreter. The %%bigquery interpreter executes a standard SQL query and the results are returned as a pandas DataFrame.
The following code snippet shows how to use the %%bigquery interpreter and assign a pandas DataFrame name to the result. Each step is a JupyterLab cell:
- Specify a project ID. This JupyterLab cell uses a default interpreter. Therefore, this is a Python variable. If the BigQuery table is in the same project, then you don't need to specify the project ID:
project_id = '<PROJECT-XXXXX>'
- Invoke the %%bigquery magic command, and assign the project ID and a DataFrame name to hold the result:
%%bigquery --project $project_id mydataframe
SELECT * from `bigquery-public-data.covid19_jhu_csse.summary` limit 5
If the table is in the same project as the one you are currently running in, you don't need the --project argument.
- Verify the result is a pandas DataFrame:
type(mydataframe)
- Show the DataFrame:
mydataframe
The complete code snippet for this example is as follows:
Figure 1.23 – Code snippet for BigQuery and Python runtime integration
Here are the key takeaways:
- It is required to have a project ID in order to use the BigQuery API.
- You may pass a Python variable such as the project ID as a value into the cell that runs the %%bigquery interpreter using the $ prefix.
- In order for the result to be reusable further by the Python preprocessing functionality or for TensorFlow consumption, you need to specify a name for the DataFrame that will hold the query result.
The Python BigQuery API
The second method by which we can invoke the BigQuery API is through Google Cloud's BigQuery client. This will give us direct access to the data, execute the query, and allow us to receive the results right away. This method does not require the user to know about the table schema. In fact, it simply wraps a SQL statement inside the BigQuery client instantiated through a library call.
This code snippet demonstrates how to invoke the BigQuery API and use it to return the results in a pandas DataFrame:
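from google.cloud import bigquery
project_id = 'project-xxxxx'
client = bigquery.Client(project=project_id)
# Target number of rows for the random sample.
sample_count = 1000
# Count the rows first so the sampling fraction can be computed.
row_count = client.query('''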
SELECT
COUNT(*) as total
FROM `bigquery-public-data.covid19_jhu_csse.summary`''').to_dataframe().total[0]
# Keep each row with probability sample_count/row_count.
df = client.query('''
SELECT
*
FROM
`bigquery-public-data.covid19_jhu_csse.summary`
WHERE RAND() < %d/%d
''' % (sample_count, row_count)).to_dataframe()
print('Full dataset has %d rows' % row_count)
The output of the preceding code is as follows:
Figure 1.24 – Code output
Let's take a closer look at the preceding code:
- An import of the BigQuery library is required to create a BigQuery client.
- The project ID is required for using this API to create a BigQuery client.
- This client wraps a SQL statement and executes it.
- The returned data can be easily converted to a pandas DataFrame right away.
The pandas DataFrame rendition of the BigQuery table has the following columns:
Figure 1.25 – The pandas DataFrame rendition of the BigQuery table
This is ready for further consumption. It is now a pandas DataFrame that occupies memory space in your Python runtime.
This method is very straightforward. It can help you explore the data schema and do simple aggregation and filtering, and because it is basically a SQL statement wrapper, it is very easy to get the data out of the warehouse and start using it. You don't have to know much about the table schema to do this.
However, this approach breaks down when the table is too large to fit in memory. TensorFlow I/O can help solve this problem.
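The WHERE RAND() < sample_count/row_count filter in the preceding query keeps each row independently with that probability, so the sample size comes out close to sample_count rather than exactly equal to it. The following plain-Python sketch simulates the same filter locally. The function name and the row count of 100,000 are assumptions for illustration, not values taken from the BigQuery table.

```python
import random


def bernoulli_sample(row_count: int, sample_count: int, seed: int = 0) -> list:
    """Keep each row index with probability sample_count / row_count."""
    rng = random.Random(seed)  # seeded so the sketch is repeatable
    fraction = sample_count / row_count
    return [i for i in range(row_count) if rng.random() < fraction]


sampled = bernoulli_sample(row_count=100_000, sample_count=1000)
print(len(sampled))  # close to 1000, but usually not exactly 1000
```

If an exact sample size matters, a different technique is needed, such as ordering by a random value and applying a limit, at the cost of a more expensive query.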
TensorFlow I/O
For TensorFlow consumption of BigQuery data, it is better to use TensorFlow I/O to invoke the BigQuery API. TensorFlow I/O provides a dataset object that represents the query results, rather than loading the entire result set into memory as the previous method does. A dataset object is the means to stream training data to a model during training, so not all training data has to be in memory at once. This complements mini-batch training, which is arguably the most common implementation of gradient descent optimization used in deep learning. However, this method is a bit more complicated than the previous one, because it requires you to know the schema of the table. This example uses a public dataset hosted by Google Cloud.
We need to start with the columns of interest from the table. We can use the previous method to examine the column names and datatypes, and then create a session definition:
- Load the required libraries and set up the variables as follows:
import tensorflow as tf
from tensorflow_io.bigquery import BigQueryClient
PROJECT_ID = 'project-xxxxx' # This is from what you created in your Google Cloud Account.
DATASET_GCP_PROJECT_ID = 'bigquery-public-data'
DATASET_ID = 'covid19_jhu_csse'
TABLE_ID = 'summary'
- Instantiate a BigQuery client and specify the batch size:
batch_size = 2048
client = BigQueryClient()
- Use the client to create a read session and specify the columns and datatypes of interest. Notice that when using the BigQuery client, you need to know the correct column names and their respective datatypes:
read_session = client.read_session(
'projects/' + PROJECT_ID,
DATASET_GCP_PROJECT_ID, TABLE_ID, DATASET_ID,
['province_state',
'country_region',
'confirmed',
'deaths',
'date',
'recovered'
],
[tf.string,
tf.string,
tf.int64,
tf.int64,
tf.int32,
tf.int64],
requested_streams=10
)
- Now we can use the session object created to execute a read operation:
dataset = read_session.parallel_read_rows(sloppy=True).batch(batch_size)
- Let's take a look at the dataset with type():
type(dataset)
Here's the output:
Figure 1.26 – Output
- In order to actually see the data, we need to convert the dataset op to a Python iterator and use next() to see the content of the first batch:
itr = tf.compat.v1.data.make_one_shot_iterator(
dataset
)
next(itr)
The output of the preceding command shows it is organized as an ordered dictionary, where the keys are column names and the values are Tensors:
Figure 1.27 – Raw data as an iterator
Here are the key takeaways:
- TensorFlow I/O's BigQuery Client requires setting up a read session, which consists of column names from your table of interest.
- This client then executes a read operation that also includes data batching.
- The output of the read operation is a TensorFlow dataset op.
- This op can be converted to a Python iterator, which outputs the actual data read by the op.
- This improves the efficiency of memory use during training, as data is sent for training in batches.
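The batch structure described above, an ordered dictionary keyed by column name, can be imitated in plain Python. This sketch is an analogy only: it uses lists where TensorFlow I/O returns Tensors, and the helper name columnar_batches and the sample rows are made up for illustration.

```python
from collections import OrderedDict
from typing import Any, Dict, Iterable, Iterator, List, Sequence


def columnar_batches(
    rows: Iterable[Dict[str, Any]],
    columns: Sequence[str],
    batch_size: int,
) -> Iterator["OrderedDict[str, List[Any]]"]:
    """Group row dicts into batches laid out column by column."""
    batch = OrderedDict((name, []) for name in columns)
    for row in rows:
        for name in columns:
            batch[name].append(row[name])
        if len(batch[columns[0]]) == batch_size:
            yield batch
            batch = OrderedDict((name, []) for name in columns)
    if batch[columns[0]]:
        yield batch  # final, possibly smaller batch


# Made-up rows that only mimic the shape of the selected columns.
rows = [
    {"country_region": "A", "confirmed": 1},
    {"country_region": "B", "confirmed": 2},
    {"country_region": "C", "confirmed": 3},
]
first_batch = next(columnar_batches(rows, ["country_region", "confirmed"], 2))
print(first_batch)
```

Each key maps to one value per row in the batch, which is the same layout a model input function would receive, one entry per feature column.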
Persisting data in BigQuery
We have looked at how to read data stored in Google Cloud storage solutions, such as Cloud Storage buckets or a BigQuery data warehouse, and how to make that data available to AI Platform's TensorFlow Enterprise instance running in JupyterLab. Now let's take a look at some ways to write data back, or persist our working data, into cloud storage.
Our first example writes a file stored in the JupyterLab runtime's directory (which some TensorFlow Enterprise documentation refers to as a local file) to BigQuery. The general process is as follows:
- For convenience, execute a BigQuery SQL read command on a table from a public dataset.
- Store the result locally as a comma-separated file (CSV).
- Write the CSV file to a table in our BigQuery dataset.
Each step is a code cell. The following step-by-step code snippet applies to JupyterLab in any of the three AI platforms (AI Notebook, AI Deep Learning VM, and Deep Learning Container):
- Designate a project ID:
project_id = 'project1-190517'
- Execute the BigQuery SQL command and assign the result to a pandas DataFrame:
%%bigquery --project $project_id mydataframe
SELECT * from `bigquery-public-data.covid19_jhu_csse.summary`
The BigQuery results come back as a pandas DataFrame by default. In this case, we designate the DataFrame name to be mydataframe.
- Write the pandas DataFrame to a CSV file in a local directory. In this case, we used the /home directory of this JupyterLab runtime:
import pandas as pd
mydataframe.to_csv('my_new_data.csv')
- Designate a dataset name:
dataset_id = 'my_new_dataset'
- Use the BigQuery command-line tool to create an empty dataset in this project. This command starts with !bq:
!bq --location=US mk --dataset $dataset_id
This command creates a new dataset. This dataset doesn't have any tables yet. We are going to write a new table into this dataset in the next step.
- Write the local CSV file to a new table:
!bq \
--location=US \
load \
--autodetect \
--skip_leading_rows=1 \
--source_format=CSV \
{dataset_id}.my_new_data_table \
'my_new_data.csv'
In this command, since the CSV file is stored in the current directory, its filename of 'my_new_data.csv' will suffice. Otherwise, a full path is required. Also, {dataset_id}.my_new_data_table indicates the dataset and the table name that we want to write the CSV file into.
- Now you can navigate to the BigQuery portal, and you will find the dataset and the table:
Figure 1.28 – The BigQuery portal and navigation to the dataset
In this case, we have one dataset, which contains one table.
- Then, execute a simple query, as follows:
Figure 1.29 – A query for examining the table
This is a very simple query where we just want to show 1,000 randomly selected rows. Executing it produces the following output, which shows the data from the BigQuery table we just created:
Figure 1.30 – Example table output
Here are the key takeaways:
- Data generated during the TensorFlow workflow in the AI Platform's JupyterLab runtime can be seamlessly persisted as a table in BigQuery.
- Persisting data in a structured format, such as a pandas DataFrame or a CSV file, in BigQuery can easily be done using the BigQuery command-line tool.
- When you need to move a data object (such as a table) between the JupyterLab runtime and BigQuery, use the BigQuery command-line tool with !bq to save time and effort.
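Outside a notebook cell, the same bq load invocation could be assembled as an argument list and handed to subprocess.run. The sketch below only builds and prints that list, using the flags from the command shown earlier in this section. The helper name build_bq_load_args is hypothetical, and actually running the command assumes the bq tool is installed and authenticated.

```python
import shlex
from typing import List


def build_bq_load_args(
    dataset_id: str,
    table_name: str,
    csv_path: str,
    location: str = "US",
) -> List[str]:
    """Return the bq load command as a list of arguments."""
    return [
        "bq",
        f"--location={location}",
        "load",
        "--autodetect",
        "--skip_leading_rows=1",
        "--source_format=CSV",
        f"{dataset_id}.{table_name}",
        csv_path,
    ]


args = build_bq_load_args("my_new_dataset", "my_new_data_table", "my_new_data.csv")
print(shlex.join(args))
# To execute it for real: subprocess.run(args, check=True)
```

Passing a list instead of a single shell string avoids quoting problems when a path or table name contains spaces or special characters.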
Persisting data in a storage bucket
In the previous Persisting data in BigQuery section, we saw how a structured data source such as a CSV file or a pandas DataFrame can be persisted in a BigQuery dataset as a table. In this section, we are going to see how to persist working data such as a NumPy array. In this case, the suitable target storage is a Google Cloud Storage bucket.
The workflow for this demonstration is as follows:
- For convenience, read a NumPy array from tf.keras.datasets.
- Save the NumPy array as a pickle (pkl) file. (FYI: The pickle file format, while convenient and easy to use for serializing Python objects, has its downsides. First, it may be slow and may create a larger object than the original. Second, loading a pickle file can execute arbitrary code, so a file from an untrusted source is a security risk for any process that opens it. It is used only for convenience here.)
- Use the !gsutil storage command-line tool to transfer files from JupyterLab's /home directory (in some documentation, this is referred to as the local directory) to the storage bucket.
- Use !gsutil to transfer the content in the bucket back to the JupyterLab runtime. Since this workflow mixes Python code with !gsutil commands, we keep them in separate cells.
Follow these steps to complete the workflow:
- Let's use the IMDB dataset because it is already provided in NumPy format:
import tensorflow as tf
import pickle as pkl
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.imdb.load_data(
path='imdb.npz',
num_words=None,
skip_top=0,
maxlen=None,
seed=113,
start_char=1,
oov_char=2,
index_from=3
)
with open('/home/jupyter/x_train.pkl','wb') as f:
    pkl.dump(x_train, f)
x_train, y_train, x_test, and y_test are returned as NumPy arrays. Let's use x_train for the purposes of this demonstration. The x_train array is going to be saved as a pkl file in the JupyterLab runtime.
The preceding code opens the IMDB movie review dataset that is distributed as a part of TensorFlow. This dataset is formatted as tuples of NumPy arrays and separated as training and test partitions. Then we proceed to save the x_train array as a pickle file in the runtime's /home directory. This pickle file will then be persisted in a storage bucket in the next step.
- Designate a name for the new storage bucket:
bucket_name = 'ai-platform-bucket'
- Create a new bucket with the designated name:
!gsutil mb gs://{bucket_name}/
- Use !gsutil to move the pkl file from the runtime to the storage bucket:
!gsutil cp /home/jupyter/x_train.pkl gs://{bucket_name}/
- Read the pkl file back:
!gsutil cp gs://{bucket_name}/x_train.pkl /home/jupyter/x_train_readback.pkl
- Now let's inspect the Cloud Storage bucket:
Figure 1.31 – Serializing an object in a bucket from the workflow in AI Platform
Here are the key takeaways:
- Working data generated during the TensorFlow workflow can be persisted as a serialized object in the storage bucket.
- Google AI Platform's JupyterLab environment provides seamless integration between the TensorFlow runtime and the Cloud Storage command-line tool, gsutil.
- When you need to transfer content between Google Cloud Storage and AI Platform, use the !gsutil command-line tool.
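Copying the file back with gsutil does not by itself confirm that the object survived the round trip. A minimal check, sketched here with a small stand-in list instead of the IMDB array and a temporary directory instead of /home/jupyter, is to load the pickle and compare it with the original. Only unpickle files from sources you trust.

```python
import os
import pickle as pkl
import tempfile

# Stand-in for x_train: a few variable-length sequences of integers.
original = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "x_train.pkl")
    # Serialize the object to disk, as in the workflow above.
    with open(path, "wb") as f:
        pkl.dump(original, f)
    # Load it back and keep the restored copy for comparison.
    with open(path, "rb") as f:
        restored = pkl.load(f)

print(restored == original)
```

In the actual workflow, the same comparison would be made between the object loaded from x_train_readback.pkl and the x_train array still held in memory.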