While a hierarchical data warehouse stores data in files and folders, a typical Hadoop-based system relies on a flat architecture to store your data. Without proper data governance, or a clear understanding of what your data is all about, there is an undeniable risk of turning data lakes into swamps, where an interesting dataset such as GDELT would be nothing more than a folder containing a vast amount of unstructured text files. For that reason, data classification is probably one of the most widely used machine learning techniques in large-scale organizations, as it allows users to properly categorize and label their data, publish these categories as part of their metadata solutions, and therefore access specific information in the most efficient way. Without a proper tagging mechanism executed upfront, ideally at ingest, finding all news articles about a specific topic would require parsing the entire dataset looking for specific...
You're reading from Mastering Spark for Data Science
Data classification is a supervised learning technique. This means that you can only predict the labels and categories you have learned from a training dataset. Because that training dataset has to be properly labeled, labeling becomes the main challenge, which we will be addressing in this chapter.
None of our data, within the context of news articles, has been properly labeled upfront; there is nothing we can learn from it directly. The common-sense approach for data scientists is to start labeling some input records manually, records that will serve as a training dataset. However, because the number of classes may be relatively large, at least in our case (hundreds of labels), the amount of data to label could be significant (thousands of articles) and would require tremendous effort. A first solution is to outsource this laborious task to a "Mechanical Turk", the term being used as a reference to one of the most famous hoaxes in history, where an automated chess player fooled...
Building a real-time application differs from batch processing in terms of architecture and components involved. While the latter can easily be built bottom-up, where programmers add functionalities and components when needed, the former usually needs to be built top-down with a solid architecture in place. In fact, due to the constraints of volume and velocity (or veracity in a streaming context), an inadequate architecture will prevent programmers from adding new functionalities. One always needs a clear understanding of how streams of data are interconnected, how and where they are processed, cached, and retrieved.
In terms of stream processing using Apache Spark, there are two emerging architectures that should be considered: Lambda architecture and Kappa architecture. Before we delve into the details of the two architectures, let's discuss the problems they are trying to solve, what they have in common, and in what context...
Similar to a batch processing job, we create a new Spark application using a SparkConf object and a context. In a streaming application, the context is created with a batch size parameter that applies to any incoming stream (both the GDELT and Twitter layers, being part of the same context, will be tied to the same batch size). Since GDELT data is published every 15 minutes, our batch size will naturally be 15 minutes, as we want to predict categories on a pseudo real-time basis:
val sparkConf = new SparkConf().setAppName("GZET")
val ssc = new StreamingContext(sparkConf, Minutes(15))
val sc = ssc.sparkContext
There are many ways of publishing external data into a Spark Streaming application. One could open a simple socket and publish data using the netcat utility, or stream data through a Flume agent monitoring an external directory. Production systems usually use Kafka as a default broker for both its high throughput and...
The second main constraint of using Twitter is noise. While most classification models are trained against dozens of different classes, we will be working with hundreds of thousands of distinct hashtags per day. We will be focusing on popular topics only, meaning the trending topics occurring within a defined batch window. However, because a 15-minute batch size on Twitter will not be sufficient to detect trends, we will apply a 24-hour moving window where all hashtags will be observed and counted, and where only the most popular ones will be kept.
Using this approach, we reduce the noise of unpopular hashtags, making our classifier much more accurate and scalable, and significantly reducing the number of articles to fetch as we only focus on trending URLs mentioned alongside popular topics. This allows us to save lots of time and resources spent analyzing irrelevant data (with regards...
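The windowed trend detection can be sketched with plain Scala collections. This is an illustration of the idea only; the hashtag names, timestamps, and the `trendingHashtags` helper are made up for this sketch, whereas the real pipeline would apply the equivalent logic with windowed operations on a DStream:

```scala
// A mention of a hashtag observed at a given batch time (epoch millis).
case class Mention(hashtag: String, batchTime: Long)

// Keep only mentions that fall inside the moving window, count each
// hashtag, and retain the topN most popular ones.
def trendingHashtags(
    mentions: Seq[Mention],
    now: Long,
    windowMillis: Long,
    topN: Int): Seq[(String, Int)] = {
  mentions
    .filter(m => now - m.batchTime <= windowMillis)
    .groupBy(_.hashtag)
    .map { case (tag, ms) => (tag, ms.size) }
    .toSeq
    .sortBy { case (_, count) => -count }
    .take(topN)
}
```

With a 24-hour window, unpopular hashtags naturally fall out of the top-N list while stale mentions age out of the window at every batch.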
We've already introduced web scrapers in a previous chapter, using the Goose library recompiled for Scala 2.11. We will create a method that takes a DStream as input instead of an RDD, and only keeps valid text content with at least 500 words. Finally, we will return a stream of text alongside the associated (popular) hashtags:
def fetchHtmlContent(tStream: DStream[(String, Array[String])]) = {
  tStream
    .reduceByKey((a, b) => (a ++ b).distinct)
    .mapPartitions { it =>
      val htmlFetcher = new HtmlHandler()
      val goose = htmlFetcher.getGooseScraper
      val sdf = new SimpleDateFormat("yyyyMMdd")
      it.map { case (url, tags) =>
          val content = htmlFetcher.fetchUrl(goose, url, sdf)
          (content, tags)
        }
        .filter { case (contentOpt, tags) =>
          contentOpt.isDefined &&
            contentOpt.get.body.isDefined &&
            contentOpt.get.body.get.split("\\s+").length >= 500
        }
        .map { case (contentOpt, tags) =>
          // keep only the extracted text content alongside its hashtags
          (contentOpt.get.body.get, tags)
        }
    }
}
Our ultimate goal is to train a new classifier at each batch (every 15 minutes). However, the classifier will be trained using more than just the few records downloaded within the current batch. We somehow have to cache the text content over a larger period of time (set to 24 hours) and retrieve it whenever we need to train a new classifier. With Larry Wall's quote in mind, we will try to be as lazy as possible in maintaining data consistency over this online layer. The basic idea is to use a time-to-live (TTL) parameter that will seamlessly drop any outdated record. The Cassandra database provides this feature out of the box (as do HBase and Accumulo), but Elasticsearch is already part of our core architecture and can easily be used for that purpose. We will create the following mapping for the gzet/twitter index with the _ttl parameter enabled:
$ curl -XPUT 'http://localhost:9200/gzet'
$ curl -XPUT 'http://localhost:9200/gzet/_mapping/twitter' -d...
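The request body elided above is not reproduced here; as a hypothetical sketch, on Elasticsearch 2.x (where _ttl was still supported; it was removed in later versions), a mapping enabling a 24-hour default TTL would look something like the following:

```bash
# Hypothetical mapping body (Elasticsearch 2.x _ttl syntax);
# the 24h default matches our moving-window duration.
$ curl -XPUT 'http://localhost:9200/gzet/_mapping/twitter' -d '
{
  "twitter": {
    "_ttl": { "enabled": true, "default": "24h" }
  }
}'
```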
The remaining part of our application is to start classifying data. As introduced earlier, the reason for using Twitter was to steal ground truth from external resources. We will train a Naive Bayes classification model using Twitter data while predicting categories of the GDELT URLs. A convenient side of the Kappa architecture approach is that we do not have to worry much about sharing common pieces of code across different applications or different environments. Even better, we do not have to export/import our model between a batch and a speed layer (both GDELT and Twitter, sharing the same Spark context, are part of the same physical layer). We could save our model to HDFS for auditing purposes, but we simply need to pass a reference to a Scala object between both classes.
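To make the classification step concrete, here is a toy multinomial Naive Bayes with add-one smoothing in plain Scala. This is a sketch of the underlying idea only, not the MLlib implementation the pipeline relies on; the labels, tokens, and helper function are all illustrative:

```scala
// Train a toy multinomial Naive Bayes on (label, tokens) pairs and
// predict the most likely label for a new document's tokens.
def trainAndPredict(
    training: Seq[(String, Seq[String])],
    document: Seq[String]): String = {
  val vocab = training.flatMap(_._2).toSet
  val byLabel = training.groupBy(_._1)
  val total = training.size.toDouble
  val scores = byLabel.map { case (label, docs) =>
    val words = docs.flatMap(_._2)
    val counts = words.groupBy(identity).map { case (w, ws) => (w, ws.size) }
    val denom = (words.size + vocab.size).toDouble
    // log prior + sum of log likelihoods with add-one (Laplace) smoothing
    val logPrior = math.log(docs.size / total)
    val logLikelihood = document.map { w =>
      math.log((counts.getOrElse(w, 0) + 1) / denom)
    }.sum
    (label, logPrior + logLikelihood)
  }
  scores.maxBy(_._2)._1
}
```

In the real pipeline, each article labeled with a trending hashtag plays the role of a training pair, and the model is rebuilt at every batch from the 24-hour cache.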
The accuracy of a classification algorithm should be measured against a test dataset, meaning a labeled dataset that was not included in the training phase. We do not have access to such a dataset (this is the reason we bootstrapped our model initially), hence we cannot compare the original versus predicted categories. Instead of the true accuracy, we can estimate an overall confidence level by visualizing our results. With all our data on Elasticsearch, we build a Kibana dashboard with an additional plugin for tag cloud visualizations (https://github.com/stormpython/tagcloud).
The following figure shows the number of GDELT articles that were analyzed and predicted on May 1, 2016. Around 18,000 articles were downloaded in less than 24 hours (in batch intervals of 15 minutes). At each batch, we observe no more than 100 distinct predicted hashtags; this is consistent with our design, as we only kept the top 100 popular hashtags occurring within a 24-hour time window. Besides, it gives...
Although we were impressed with the overall consistency of the model, we appreciate that we certainly did not build the most accurate classification system ever. Crowdsourcing this task to millions of users was an ambitious undertaking, and by far not the easiest way of getting clearly defined categories. However, this simple proof of concept shows us a few important things:
It technically validates our Spark Streaming architecture.
It validates our assumption of bootstrapping GDELT using an external dataset.
It made us lazy, impatient, and proud.
It learns without any supervision and should eventually get better with every batch.
No data scientist can build a fully functional and highly accurate classification system in just a few weeks, especially not on dynamic data; a proper classifier needs to be evaluated, trained, re-evaluated, tuned, and retrained for at least the first few months, and then re-evaluated every six months at the very least. Our goal here was to describe the components involved...