Anomaly Detection

The predictive/prescriptive AI life cycle of a device starts with data collection design. Data is analyzed for factors such as correlation and variance. Then the devices start being manufactured. Other than a small number of sample devices, there are usually no device failures from which to build machine learning models. To compensate for this, most manufacturers use duty cycle thresholds to determine whether a device is in a good or bad state. These duty cycle standards may be that the device is running too hot, or an arbitrary alert value placed on a sensor. But the data quickly needs more advanced analysis. The sheer volume of data can be daunting for an individual: an analyst may need to look through millions of records to find the proverbial needle in a haystack. Using an analyst-in-the-middle approach with anomaly detection can...

Using Z-Spikes on a Raspberry Pi and Sense HAT

Spikes or sudden changes on an individual device can warrant an alert. IoT devices are often subject to movement and weather. They can be affected by the time of day or the season of the year, and the fleet of devices may be spread throughout the world. Trying to get clear insights across the entire fleet can be challenging. Using a machine learning algorithm that evaluates each device on its own data, rather than modeling the whole fleet at once, enables us to treat each device separately.

Use cases for Z-Spikes include a sudden battery discharge or a sudden temperature increase. People use Z-Spikes to tell whether something has been jostled or is suddenly vibrating. Z-Spikes can be used on pumps to see whether there is a blockage. Because Z-Spikes do so well across non-homogeneous environments, they are often a great candidate for edge deployments.

Getting ready

In this recipe, we are going to deploy Z-Spikes on a Raspberry Pi with a Sense HAT. The hardware itself is a fairly common development board and sensor setup for people learning about IoT. In fact, students can send their code to the International Space Station to be run on its Raspberry Pi and Sense HAT. If you do not have the equipment, there is alternative code in the GitHub repository that simulates the device.

Once you have powered on your Raspberry Pi and attached your Sense HAT, you will need to install SciPy. In Python, you can usually install everything you need with pip, but in this case, you will need to install it through the Linux operating system. To do this, run the following commands in a terminal window:

sudo apt update
apt-cache show python3-scipy
sudo apt install -y python3-scipy

You will then need to pip install numpy, kafka-python, and sense_hat. You will also need to set up Kafka on a PC. There are instructions in Chapter 1, Setting up the IoT and...

How to do it...

The steps for this recipe are as follows:

  1. Import the libraries:
from scipy import stats
import numpy as np
from sense_hat import SenseHat
import json
from kafka import KafkaProducer
import time
  2. Wait for the Sense HAT to register with the OS:
time.sleep(60)
  3. Initialize the variables:
device = "Pi1"
server = "[the address of the kafka server]:9092"
producer = KafkaProducer(bootstrap_servers=server)
sense = SenseHat()
sense.set_imu_config(False, True, True)
gyro = []
accel = []
  4. Create a Z-score helper function:
def zscore(data):
    return np.abs(stats.zscore(np.array(data)))[0]
  5. Create a sendAlert helper function:
def sendAlert(latestGyro, latestAccel):
    alert = {'Gyro': latestGyro, 'Accel': latestAccel}
    message = json.dumps(alert)
    producer.send(device+'alerts', key=bytes("alert", encoding='utf-8'),
                  value=bytes(message, encoding='utf-8...

How it works...

This algorithm checks whether the latest record is more than 4 standard deviations (σ) from the mean of the preceding 1,000 values. At a 4σ threshold, we should see an anomaly roughly once in every 15,787 readings, or about once every 4 hours at one reading per second. If we were to raise the threshold to 4.5σ, it would be roughly once every 40 hours.
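As a quick sanity check on those numbers (this snippet is not part of the recipe), the expected alert frequency follows directly from the normal distribution, assuming roughly one reading per second:

from scipy import stats

# two-tailed probability that a reading falls more than 4 sigma from the mean
p_4 = 2 * stats.norm.sf(4)
print(1 / p_4)            # ~15,787 readings per expected anomaly
print(1 / p_4 / 3600)     # ~4.4 hours at one reading per second

# raising the threshold to 4.5 sigma makes false alarms much rarer
p_45 = 2 * stats.norm.sf(4.5)
print(1 / p_45 / 3600)    # ~41 hours at one reading per second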

We import scipy for our Z-score evaluation and numpy for data manipulation. We then add the script to the Raspberry Pi startup so that it starts automatically whenever there is a power reset. The machine needs to wait for peripherals such as the Sense HAT to initialize; the 60-second delay allows the OS to become aware of the Sense HAT before we try to use it. Then we initialize our variables: the device name, the address of the Kafka server, and the Sense HAT itself. Next, we configure the Sense HAT's inertial measurement unit (IMU): we disable the compass and enable the gyroscope and accelerometer. Finally, we create two arrays to put the data...
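The rest of the recipe (truncated above) reads the sensors in a loop, keeps a rolling window of readings, and raises an alert when the newest value is an outlier. A minimal sketch of that loop, assuming the zscore and sendAlert helpers from the recipe and a window of 1,000 readings (the window handling and axis combination here are assumptions, not the book's exact code):

window = 1000    # number of past readings to compare against (assumed)
threshold = 4    # alert when the newest reading is more than 4 sigma out

while True:
    # read the raw gyroscope and accelerometer values and reduce each to one number
    g, a = sense.gyro_raw, sense.accel_raw
    gyro.insert(0, g['x'] + g['y'] + g['z'])
    accel.insert(0, a['x'] + a['y'] + a['z'])

    # keep only the most recent readings
    gyro, accel = gyro[:window], accel[:window]

    # zscore() returns the Z-score of the first (newest) element
    if len(gyro) == window and (zscore(gyro) > threshold or
                                zscore(accel) > threshold):
        sendAlert(gyro[0], accel[0])

    time.sleep(1)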

Using autoencoders to detect anomalies in labeled data

If you have labeled data, you can train a model to detect whether the data is normal or abnormal. For example, reading the current of an electric motor can show when extra drag is put on the motor by things such as failing ball bearings or other failing hardware. In IoT, anomalies can be a previously known phenomenon or a new event that has not been seen before. As the name suggests, autoencoders take in data and learn to encode and reconstruct it. For anomaly detection, we check whether the model can determine that data is non-anomalous. In this recipe, we are going to use a Python outlier detection library called pyod.

Getting ready

In this recipe, we are going to use data gathered from the motion sensors on our Sense HAT. The final recipe in this chapter shows how to generate this dataset. We have also put this labeled dataset in the GitHub repository for this book. We are going to use a Python outlier detection framework called pyod, or Python Outlier Detection. It wraps libraries such as TensorFlow and scikit-learn and implements various machine learning algorithms, such as autoencoders and isolation forests.

How to do it...

The steps for this recipe are as follows:

  1. Import the libraries:
from pyod.models.auto_encoder import AutoEncoder
from pyod.utils.data import generate_data
from pyod.utils.data import evaluate_print
import numpy as np
import pickle
  2. Load the text files into our notebook as NumPy arrays:
X_train = np.loadtxt('X_train.txt', dtype=float)
y_train = np.loadtxt('y_train.txt', dtype=float)
X_test = np.loadtxt('X_test.txt', dtype=float)
y_test = np.loadtxt('y_test.txt', dtype=float)
  3. Use the autoencoder algorithm to fit the model to the dataset:
clf = AutoEncoder(epochs=30)
clf.fit(X_train)
  4. Get the prediction scores:
y_test_pred = clf.predict(X_test)  # outlier labels (0 or 1)
y_test_scores = clf.decision_function(X_test)  # outlier scores
evaluate_print('AutoEncoder', y_test, y_test_scores)
  5. Save the model:
pickle.dump(clf, open("autoencoder.p", "wb"))

How it works...

First, we import pyod, our Python outlier detection library. Then we import numpy for data manipulation and pickle for saving our model. Next, we use numpy to load our data. Then we train our model and get the prediction scores. Finally, we save our model.

An autoencoder takes data as input and passes it through a smaller hidden layer, which forces it to reduce the dimensionality. The target output for an autoencoder is its input. This allows us to use machine learning to train a model on what non-anomalous data looks like. We can then measure how far a new value falls from what the trained model expects; values that fall far away are anomalous. The following diagram shows conceptually how data is encoded into a set of inputs, has its dimensionality reduced in the hidden layer, and is finally output to a larger set of outputs:

There's more...

After training our model, we need to know at what level to send the alert. When training, the contamination parameter (see the following code) tells the model what proportion of the data to treat as outliers; this sets the decision threshold that triggers the alerting function:

AutoEncoder(epochs=30, contamination=0.2)

We could also change the regularizer, as in the following example. The regularizer is used to balance bias and variance to prevent overfitting and underfitting:

AutoEncoder(epochs=30, l2_regularizer=0.2)

We could also change the number of neurons, our loss function, or the optimizer. This is often referred to as changing or tuning the hyperparameters in data science. Tuning the hyperparameters allows us to affect our success metrics, thereby improving the model.
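For example, a sketch of a more heavily tuned model might look like the following. The layer sizes and values are illustrative only, and the exact parameter names may vary between pyod versions:

from pyod.models.auto_encoder import AutoEncoder

clf = AutoEncoder(
    hidden_neurons=[6, 3, 3, 6],   # bottleneck sized for our six sensor columns
    epochs=30,
    batch_size=32,
    dropout_rate=0.2,
    l2_regularizer=0.1,
    contamination=0.2,             # expected proportion of outliers
    optimizer='adam')
clf.fit(X_train)

# the contamination setting determines the decision threshold learned in fit()
print(clf.threshold_)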

Using isolation forest for unlabeled datasets

Isolation forest is a popular machine learning algorithm for anomaly detection. Isolation forests can assist with complex datasets that have overlapping values. An isolation forest is an ensemble of randomly built decision trees. Rather than using a clustering or distance-based approach like many other machine learning algorithms, it separates outlying data points from normal data points by randomly partitioning the data and scoring each point by how many splits are needed to isolate it. In other words, it counts the number of nodes it traverses before a point ends up on its own. Points in dense, normal regions of the data need many splits to isolate, while outliers are isolated after only a few.

Similar to the previous recipe, we are going to use pyod to easily train a model. We are going to use the Sense HAT dataset that is in the GitHub repository.

Getting ready

If you have completed the previous recipe on autoencoders, then you have everything you need. In this recipe, we are using pyod as our outlier detection library. The training dataset and the test dataset are in the GitHub repository for this book.

How to do it...

The steps for this recipe are as follows:

  1. Import the libraries:
from pyod.models.iforest import IForest
from pyod.utils.data import generate_data
from pyod.utils.data import evaluate_print
import numpy as np
import pickle
  2. Load the data:
X_train = np.loadtxt('X_train.txt', dtype=float)
y_train = np.loadtxt('y_train.txt', dtype=float)
X_test = np.loadtxt('X_test.txt', dtype=float)
y_test = np.loadtxt('y_test.txt', dtype=float)
  3. Train the model:
clf = IForest()
clf.fit(X_train)
  4. Evaluate against the test data:
y_test_pred = clf.predict(X_test)  # outlier labels (0 or 1)
y_test_scores = clf.decision_function(X_test)  # outlier scores
print(y_test_pred)

# evaluate and print the results
print("\nOn Test Data:")
evaluate_print('IForest', y_test, y_test_scores)
  5. Save the model:
pickle.dump(clf, open("IForest.p", "wb"))

How it works...

First, we import pyod. We then import numpy for data processing and pickle for saving our model. Next, we perform the isolation forest training. Then we evaluate our results. We get two different types of results: one is a 1 or 0 indicating whether each point is anomalous or normal, and the other is an outlier score for each test point. Finally, we save our model.

The isolation forest algorithm segments the data using a tree-based approach: the denser a region of data is, the more splits are needed to isolate any single point within it. The algorithm identifies data that is not part of a dense area by counting the number of splits it takes to isolate each point; outliers need far fewer.
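Because pyod's IForest wraps scikit-learn's implementation, the usual tree-ensemble parameters are available for tuning. A small sketch with a few commonly adjusted parameters (the values here are illustrative, not tuned for this dataset):

from pyod.models.iforest import IForest

clf = IForest(
    n_estimators=200,    # number of isolation trees in the ensemble
    max_samples=256,     # sub-sample size used to build each tree
    contamination=0.2,   # expected proportion of outliers in the data
    random_state=42)
clf.fit(X_train)

# points that are isolated after only a few splits get the highest outlier scores
print(clf.decision_function(X_test)[:10])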

There's more...

Anomaly detection is one of those analysis techniques where visualization can help us determine which hyperparameters and algorithms to use. scikit-learn has an example of how to do this on their website (https://scikit-learn.org/stable/auto_examples/miscellaneous/plot_anomaly_comparison.html). A reference to this is in the GitHub repository of this book. The diagram that follows is an example of anomaly detection using multiple algorithms and settings on a toy dataset. There is no right answer in anomaly detection, but only what works best for the problem at hand:

Detecting time series anomalies with Luminol

Luminol is a time series anomaly detection library released by LinkedIn. By default it uses a bitmap-based detection strategy, which is robust with datasets that tend to drift. It is also very lightweight and can handle large amounts of data.

In this example, we are going to use a publicly accessible IoT dataset from the city of Chicago. The city of Chicago has IoT sensors measuring the water quality of its lakes. Because the dataset needs some massaging before it is in the right format for anomaly detection, we will use the prepdata.py file to extract one measurement series from one location.

Getting ready

To get ready for this recipe, you will need to download the CSV file from the GitHub repository for this book. Next, you will need to install luminol:

pip install luminol

How to do it...

The steps involved in this recipe are as follows:

  1. Prep the data with prepdata.py:
import pandas as pd

df = pd.read_csv('Beach_Water_Quality_-_Automated_Sensors.csv',
                 header=0)

df = df[df['Beach Name'] == 'Rainbow Beach']
df = df[df['Water Temperature'] > -100]
df = df[df['Wave Period'] > -100]
df['Measurement Timestamp'] = pd.to_datetime(df['Measurement Timestamp'])

Turbidity = df[['Measurement Timestamp', 'Turbidity']]
Turbidity.to_csv('Turbidity.csv', index=False, header=False)
  2. Import the libraries in Luminol.py:
from luminol.anomaly_detector import AnomalyDetector
import time
  3. Perform anomaly detection:
my_detector = AnomalyDetector('Turbidity.csv')
score = my_detector.get_all_scores()
  4. Print the anomalies:
for (timestamp, value) in score.iteritems():
    t_str = time.strftime('%y-%m-%d %H...

How it works...

In the prepdata.py file, we only import pandas, which we use to read the CSV file into a pandas DataFrame. Once we have a DataFrame, we filter on Rainbow Beach (in our case, we are only looking at that one location). Then we remove clearly bad data, such as rows where the water temperature is below -100 degrees. Next, we convert the timestamp string into a datetime that pandas can work with, so that the output is in a standard time series format. Then we select only the two columns we need to analyze, Measurement Timestamp and Turbidity. Finally, we save the result in CSV format.

Next, we create the Luminol.py file. In it, we import Luminol's AnomalyDetector along with the time library. We then run the anomaly detector on the CSV file and retrieve all of the scores. Finally, we print a score only if its value is greater than 0; in other words, we only report readings where there is an anomaly.
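The end of that loop is truncated in the excerpt above; a minimal sketch of the filtering it describes, assuming the score object from step 3 (the exact timestamp formatting is an assumption):

for timestamp, value in score.iteritems():
    t_str = time.strftime('%y-%m-%d %H:%M:%S', time.localtime(timestamp))
    # a score of 0 means no anomaly, so only report positive scores
    if value > 0:
        print('{0} anomaly score: {1}'.format(t_str, value))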

There's more...

In addition to anomaly detection, Luminol can also perform correlation analysis. This can help the analyst determine whether two time series datasets are correlated with each other. For example, our dataset from the city of Chicago measures various aspects of water purity in its lakes. We could compare lakes against each other to see whether there was a common effect in two different lakes at the same time.
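A sketch of that kind of comparison using Luminol's Correlator is shown below. The second CSV file is hypothetical and would be prepared the same way as Turbidity.csv, just filtered on a different beach; this also assumes Correlator accepts CSV paths the same way AnomalyDetector does:

from luminol.correlator import Correlator

# compare turbidity at two different beaches (the second file is hypothetical)
correlator = Correlator('Turbidity.csv', 'Turbidity_OtherBeach.csv')

if correlator.is_correlated(threshold=0.8):
    print(correlator.get_correlation_result().coefficient)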

Detecting seasonality-adjusted anomalies

Data from a temperature sensor might trend upward throughout the day if the device is outdoors. Similarly, the internal temperature of an exterior device may be lower in the winter. Not all devices are affected by seasonality, but for the ones that are, choosing an algorithm that handles seasonality and trends is important. Seasonal ESD, described in a research paper from data scientists at Twitter (Automatic Anomaly Detection in the Cloud Via Statistical Learning), is an algorithm that takes seasonality and trends into account so that it can find anomalies regardless of them.

For this recipe, we are going to use the city of Chicago lake water purity dataset. We are going to pull in the data file we prepared in the Detecting time series anomalies with Luminol recipe.

Getting ready

To get ready, you will need the Seasonal ESD library. This can be installed simply with the following pip command:

pip install sesd

The dataset can be found in the GitHub repository of this book.

How to do it...

The steps to execute this recipe are as follows:

  1. Import the libraries:
import pandas as pd
import sesd
import numpy as np
  2. Import and manipulate the data:
df = pd.read_csv('Beach_Water_Quality_-_Automated_Sensors.csv',
                 header=0)
df = df[df['Beach Name'] == 'Rainbow Beach']
df = df[df['Water Temperature'] > -100]
df = df[df['Wave Period'] > -100]
waveheight = df[['Wave Height']].to_numpy()
  3. Perform anomaly detection:
outliers_indices = sesd.seasonal_esd(waveheight, hybrid=True,
                                     max_anomalies=2)
  4. Output the results:
for idx in outliers_indices:
    print("Anomaly index: {}, anomaly value: {}"\
          .format(idx, waveheight[idx]))

How it works...

In this recipe, we first imported numpy and pandas for data manipulation. We then imported sesd, our anomaly detection package. Next, we got the raw data ready for machine learning by removing data that clearly had an issue, such as readings from sensors that were not working properly. We then filtered the data down to a single column and ran that column through the Seasonal ESD algorithm.

Similar to the Z-score algorithm in the first recipe, this recipe uses an online approach. It uses Seasonal and Trend decomposition using Loess (STL) as a preprocessing step before doing anomaly detection. A data source may have both a trend and a seasonal component, as shown in the following graph:

Decomposition allows you to look at the trend and the seasonality independently (as shown in the following trend graph). This helps ensure that the anomaly detection is not thrown off by seasonality:
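One way to produce that kind of decomposition view of our own data is statsmodels' STL implementation. This is not part of the sesd package; the period of 24 assumes roughly hourly readings and is purely illustrative:

import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import STL

df = pd.read_csv('Beach_Water_Quality_-_Automated_Sensors.csv', header=0)
df = df[df['Beach Name'] == 'Rainbow Beach']
df = df[df['Water Temperature'] > -100]

# split the series into trend, seasonal, and residual components
result = STL(df['Water Temperature'].to_numpy(), period=24).fit()
result.plot()
plt.show()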

The Seasonal ESD algorithm is more complicated than the Z-score algorithm. For example, Z-score...

Detecting spikes with streaming analytics

Azure Stream Analytics is a tool that connects IoT Hub to other resources within Azure using a SQL-like interface. Stream Analytics can move data from IoT Hub to Cosmos DB, storage blobs, serverless functions, or a number of other scalable options. Stream Analytics has a few functions built in, and you can create more yourself using JavaScript; anomaly detection is one of the built-in functions. In this example, we are going to use a Raspberry Pi to stream gyroscope and accelerometer data to IoT Hub. Then we'll connect Stream Analytics and, using its SQL interface, output only the anomalous results.

Getting ready

For this experiment, you will need an IoT Hub instance. Next, you'll need to create a Stream Analytics job. To do this, go into the Azure portal and create a new Stream Analytics job through the Create new resource wizard. After you create the job, you will see that there are three main components on the Overview page: inputs, outputs, and queries. Inputs, as the name suggests, are the streams you want to read; in our case, we are inputting from IoT Hub. To connect to IoT Hub, click on Inputs, select the input type of IoT Hub, and then select the IoT Hub instance you created for this recipe. Next, you can create an output. This could be a database such as Cosmos DB, or a function app so that you can send alerts through any number of messaging systems. For the sake of simplicity, we are not going to specify an output for this recipe. For testing purposes, you can review the output in the Stream Analytics query editor.

...

How to do it...

The steps for this recipe are as follows:

  1. Import the libraries:
#device.py

import time
from azure.iot.device import IoTHubDeviceClient, Message
from sense_hat import SenseHat
import json
  2. Declare the variables:
client = IoTHubDeviceClient.create_from_connection_string("your device key here")
sense = SenseHat()
sense.set_imu_config(True, True, True)
# message template used in step 4 (assumed layout; any JSON with gyro and
# accel fields will work)
msg = '{{"gyro": {gyro}, "accel": {accel}}}'
  3. Get a combined device value:
def combined_value(data):
    return float(data['x']) + float(data['y']) + float(data['z'])
  4. Get and send the data:
while True:
    gyro = combined_value(sense.gyro_raw)
    accel = combined_value(sense.accel_raw)

    msg_txt_formatted = msg.format(gyro=gyro, accel=accel)
    message = Message(msg_txt_formatted)
    client.send_message(message)

    time.sleep(1)
  5. Create a SQL query that uses the AnomalyDetection_SpikeAndDip algorithm to detect anomalies:
SELECT
    EVENTENQUEUEDUTCTIME AS time,
    CAST(gyro AS float) AS gyro,
    AnomalyDetection_SpikeAndDip...

How it works...

To import the libraries on the Raspberry Pi, you will need to log in to the Raspberry Pi and use pip to install azure-iot-device and sense_hat. Next, you'll need to go onto that machine and create a file called device.py. Then you import the time, Azure IoT Hub, Sense HAT, and json libraries. Next, you'll need to go into IoT Hub, create a device through the portal, get your connection string, and enter it in the spot where it says your device key here. You then initialize the Sense HAT and enable its inertial measurement unit (IMU) sensors. Then you create a helper function that combines the x, y, and z data into a single value. Next, you get the data from the sensors and send it to IoT Hub. Finally, you wait for a second before sending the data again.

Next, go into the Stream Analytics job that you set up and click on Edit query. From here, create a common table expression. A common table expression allows you to make a complex query simpler. Then use...

Detecting anomalies on the edge

In this final recipe, we are going to use the Sense HAT on the Raspberry Pi to collect data, train a model on our local computer, and then deploy that machine learning model to the device. To avoid redundancy, after recording your data you will need to train a model using either the autoencoder or the isolation forest recipe from earlier in this chapter.

People use motion sensors in IoT to ensure shipping containers are safely transported aboard ships. For example, proving that a shipping container was dropped in a particular harbor would help with insurance claims. Motion sensors are also used for worker safety, to detect falls or unsafe behavior, and on devices that are prone to vibration when malfunctioning, such as washing machines, wind turbines, and cement mixers.

During the data collection phase, you will need to safely simulate falling or working unsafely. You could also put a sensor on a washing machine that is unbalanced....

Getting ready

To get ready for this you will need a Raspberry Pi with a Sense HAT. You will need a way of getting data from the Raspberry Pi. You can do this by enabling SSH or using a USB drive. On the Raspberry Pi, you will need to use pip to install sense_hat and numpy.

How to do it...

The steps for this recipe are as follows:

  1. Import the libraries:
#Gather.py

import numpy as np
from sense_hat import SenseHat
import json
import time
  2. Initialize the variables:
sense = SenseHat()
sense.set_imu_config(True, True, True)
readings = 1000
gyro, accel = sense.gyro_raw, sense.accel_raw
actions = ['normal', 'anomalous']
dat = np.array([gyro['x'], gyro['y'], gyro['z'],
                accel['x'], accel['y'], accel['z']])
x = 1
  3. Wait for the user input to start:
for user_input in actions:
    activity = input('Hit enter to record ' + user_input + \
                     ' activity')
  4. Gather the data:
    x = 1
    while x < readings:
        x = x + 1
        time.sleep(0.1)
        gyro, accel = sense.gyro_raw, sense.accel_raw
        dat = np.vstack([dat, [[gyro['x'], gyro['y'], gyro['z'],
                                accel['x'], accel...

How it works...

We create two files: one that gathers data (called Gather.py) and another that detects anomalies on the device (called AnomalyDetection.py). In the Gather.py file, we import the classes, initialize the Sense HAT, set a variable for the number of readings we will collect, take an initial gyroscope and accelerometer reading, and create an array with the two action labels, normal and anomalous. Then we loop through the actions and tell the user to press Enter when they want to record normal readings, and then to press Enter again when they want to record anomalous readings. From there, we gather the data and give feedback to the user so they know how many more data points remain to be gathered. At this point, you should be using the device in a way that is normal for its intended use, such as holding it close to your body for fall detection. Then, for the next loop of anomalous readings, you drop the device. Finally, we create...
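The on-device half, AnomalyDetection.py, is not shown in this excerpt. A minimal sketch of what it could look like, assuming the isolation forest model pickled in the earlier recipe (IForest.p) and the same six-column feature layout used in Gather.py; pyod would also need to be installed on the Raspberry Pi for the unpickling to work:

#AnomalyDetection.py

import pickle
import time
import numpy as np
from sense_hat import SenseHat

sense = SenseHat()
sense.set_imu_config(True, True, True)

# load the model trained on the desktop in the earlier recipe (assumed filename)
clf = pickle.load(open('IForest.p', 'rb'))

while True:
    gyro, accel = sense.gyro_raw, sense.accel_raw
    sample = np.array([[gyro['x'], gyro['y'], gyro['z'],
                        accel['x'], accel['y'], accel['z']]])
    # predict() returns 1 for anomalous readings and 0 for normal ones
    if clf.predict(sample)[0] == 1:
        print('Anomaly detected:', sample)
    time.sleep(0.1)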
