
Leveraging Python in the World of Big Data

  • 2015-09-07 00:00:00


We are generating more and more data every day. We have already generated more data this century than in the whole of the previous one, and we are only 15 years into it. Big data is the new buzzword, and everyone is talking about it. It brings new possibilities: Google Translate can translate between many languages thanks to big data, we are able to decode the human genome because of it, and we can predict the failure of a turbine and carry out the required maintenance in advance.

There are three Vs of big data and they are defined as follows:

  • Volume: This defines the size of the data. Facebook has petabytes of data on its users.
  • Velocity: This is the rate at which data is generated.
  • Variety: Data is not only in a tabular form. We can get data from text, images, and sound. Data comes in the form of JSON, XML, and other types as well.

Let's take a look at the following screenshot:

 leveraging-python-world-big-data-img-0

In this article by Samir Madhavan, author of Mastering Python for Data Science, we'll learn how to use Python in the world of big data by doing the following:

  • Understanding Hadoop
  • Writing a MapReduce program in Python
  • Using a Hadoop library


What is Hadoop?

According to the Apache Hadoop website, Hadoop stores data in a distributed manner and helps in computing over it. It is designed to scale from a single machine to a large cluster, with each machine contributing its own computing power and storage. Hadoop was created by Doug Cutting and Mike Cafarella in 2005 and was named after a toy elephant that belonged to Doug Cutting's son.

 leveraging-python-world-big-data-img-1

The programming model

MapReduce is a programming paradigm that expresses a large distributed computation as a sequence of distributed operations on large datasets of key-value pairs. The MapReduce framework uses a cluster of machines and executes MapReduce jobs across them. There are two phases in MapReduce: a mapping phase and a reduce phase. The input data to MapReduce is a set of key-value pairs.

During the mapping phase, Hadoop splits the data into smaller pieces, which are then fed to the mappers. These mappers are distributed across machines within the cluster. Each mapper takes the input key-value pairs and generates intermediate key-value pairs by invoking a user-defined function on them.

After the mapper phase, Hadoop sorts the intermediate dataset by key and generates a set of key-value tuples so that all the values belonging to a particular key are together.

During the reduce phase, the reducer takes in the intermediate key-value pairs and invokes a user-defined function, which then generates an output key-value pair. Hadoop distributes the reducers across the machines and assigns a set of key-value pairs to each of the reducers.
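The three steps described above (map, sort and group by key, reduce) can be imitated on a single machine. The following is a minimal sketch in plain Python, not Hadoop code; the function names map_phase, shuffle_phase, and reduce_phase are made up for this illustration, and a word count stands in for the user-defined functions:

```python
from collections import defaultdict


def map_phase(records, mapper):
    """Apply the user-defined mapper to every input key-value pair."""
    intermediate = []
    for key, value in records:
        intermediate.extend(mapper(key, value))
    return intermediate


def shuffle_phase(pairs):
    """Group the intermediate values by key and sort the keys."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_phase(grouped, reducer):
    """Apply the user-defined reducer to each key and its list of values."""
    return [reducer(key, values) for key, values in grouped]


def word_mapper(line_number, line):
    # Emit an intermediate (word, 1) pair for every word in the line
    return [(word, 1) for word in line.split()]


def count_reducer(word, counts):
    # Sum all the counts collected for one word
    return (word, sum(counts))


def run_word_count(lines):
    records = list(enumerate(lines))  # key = line number, value = line text
    intermediate = map_phase(records, word_mapper)
    grouped = shuffle_phase(intermediate)
    return reduce_phase(grouped, count_reducer)
```

Calling run_word_count(['dolly dolly max', 'max jack tim max']) returns one (word, count) pair per distinct word. In Hadoop, the same three steps run on different machines, and the framework handles the grouping step for us.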

[Figure: Data processing through MapReduce]

The MapReduce architecture

MapReduce has a master-slave architecture, where the JobTracker is the master and the TaskTrackers are the slaves. When a MapReduce program is submitted to Hadoop, the JobTracker assigns the map and reduce tasks to the TaskTrackers, which then take over executing the program.

The Hadoop DFS

Hadoop's distributed filesystem (HDFS) is designed to store very large datasets in a distributed manner. It was inspired by the Google File System, a proprietary distributed filesystem designed by Google. The data in HDFS is stored as a sequence of blocks, and all the blocks of a file are the same size except the last one. The block size is configurable in Hadoop.
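To make the block layout concrete, here is a small sketch of the arithmetic. The split_into_blocks function is a hypothetical helper written for this illustration, and the 128 MB block size used in the example is only an assumed value, since the actual size depends on the cluster configuration:

```python
def split_into_blocks(file_size, block_size):
    """Return the sizes (in bytes) of the blocks needed to store a file."""
    if file_size < 0 or block_size <= 0:
        raise ValueError("file_size must be >= 0 and block_size must be > 0")
    full_blocks, remainder = divmod(file_size, block_size)
    sizes = [block_size] * full_blocks
    if remainder:
        sizes.append(remainder)  # only the last block may be smaller
    return sizes


MB = 1024 * 1024
# A 300 MB file with an assumed 128 MB block size
blocks = split_into_blocks(300 * MB, 128 * MB)
```

Here, blocks holds two full 128 MB blocks followed by a final 44 MB block.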

Hadoop's DFS architecture

HDFS also has a master/slave architecture, where the NameNode is the master machine and the DataNodes are the slave machines. The actual data is stored on the DataNodes. The NameNode keeps track of where each piece of data is stored and whether it has the required replication. It also manages the filesystem by creating, deleting, and moving directories and files.
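The bookkeeping role of the NameNode can be pictured as a lookup table from blocks to the machines that hold them. The following is a toy sketch and not the way HDFS is implemented; the block IDs, the DataNode names, and the under_replicated_blocks function are all invented for illustration:

```python
def under_replicated_blocks(block_locations, replication_factor):
    """Return the IDs of blocks stored on fewer DataNodes than required."""
    return sorted(
        block_id
        for block_id, datanodes in block_locations.items()
        if len(set(datanodes)) < replication_factor
    )


# Hypothetical metadata: block ID -> DataNodes holding a copy of the block
block_locations = {
    "blk_1": ["datanode-a", "datanode-b", "datanode-c"],
    "blk_2": ["datanode-a", "datanode-c"],
    "blk_3": ["datanode-b"],
}

# With a replication factor of 3, two of the blocks need more copies
missing = under_replicated_blocks(block_locations, 3)
```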

Python MapReduce

Hadoop can be downloaded and installed from https://hadoop.apache.org/. We'll be using the Hadoop Streaming API to execute our Python MapReduce programs in Hadoop. The Hadoop Streaming API lets any program that reads from standard input and writes to standard output be used as a MapReduce program.

We'll be writing three MapReduce programs using Python. They are as follows:

  • A basic word count
  • Getting the sentiment score of each review
  • Getting the overall sentiment score from all the reviews

The basic word count

We'll start with the word count MapReduce. Save the following code in a word_mapper.py file:

#!/usr/bin/env python
import sys

for l in sys.stdin:
    # Leading and trailing whitespace is removed
    l = l.strip()

    # The line is split into words
    word_tokens = l.split()

    # Each word is output as a tab-separated key-value pair
    for w in word_tokens:
        print('%s\t%s' % (w, 1))

In the preceding mapper code, each line of the file is stripped of its leading and trailing whitespace. The line is then split into word tokens, and each token is output as a key-value pair whose key is the word and whose value is 1.

Save the following code in a word_reducer.py file:

#!/usr/bin/env python
import sys

current_word_token = None
current_counter = 0

# Input arrives on STDIN
for l in sys.stdin:
    # Leading and trailing whitespace is removed
    l = l.strip()

    # Input from the mapper is parsed
    word_token, counter = l.split('\t', 1)

    # The count is converted to int
    try:
        counter = int(counter)
    except ValueError:
        # If the count is not a number, ignore the line
        continue

    # Since Hadoop sorts the mapper output by key, the following
    # if/else statement works
    if current_word_token == word_token:
        current_counter += counter
    else:
        if current_word_token:
            print('%s\t%s' % (current_word_token, current_counter))

        current_counter = counter
        current_word_token = word_token

# The last word is output
if current_word_token is not None:
    print('%s\t%s' % (current_word_token, current_counter))

In the preceding code, we use the current_word_token variable to keep track of the word that is currently being counted. In the for loop, we use the word_token and counter variables to hold the key and the value of each key-value pair. We then convert the counter to an int type.

In the if/else statement, if the word_token value is the same as the previous one, which is held in current_word_token, we keep adding to its count. If a new word has arrived, we output the previous word and its count and start counting the new word. The last if statement outputs the final word.
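The same "sum each run of identical keys" logic can also be written with itertools.groupby, which relies on the input being sorted by key just as the reducer above does. This is an alternative sketch, not the reducer used in the rest of this article; the parse and reduce_counts names are made up for the example:

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from tab-separated mapper output."""
    for line in lines:
        word, _, count = line.strip().partition('\t')
        try:
            yield word, int(count)
        except ValueError:
            continue  # skip lines whose count is not a number


def reduce_counts(sorted_lines):
    """Sum the counts of each run of identical words in sorted input."""
    pairs = parse(sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


mapper_output = ['dolly\t1', 'dolly\t1', 'jack\t1', 'max\t1', 'max\t1', 'max\t1', 'tim\t1']
totals = list(reduce_counts(mapper_output))
```

Because groupby only merges adjacent items, unsorted input would produce several partial counts for the same word, which is why the sort step between the mapper and the reducer matters.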

We can check out if the mapper is working fine by using the following command:

$ echo 'dolly dolly max max jack tim max' | ./BigData/word_mapper.py

The output of the preceding command is shown as follows:

dolly	1
dolly	1
max	1
max	1
jack	1
tim	1
max	1

Now, we can check whether the reducer is also working fine by piping the sorted mapper output to the reducer:

$ echo "dolly dolly max max jack tim max" | ./BigData/word_mapper.py | sort -k1,1 | ./BigData/word_reducer.py

The output of the preceding command is shown as follows:

dolly	2
jack	1
max	3
tim	1

Now, let's apply the same code to a local file containing a summary of Moby Dick:

$ cat ./Data/mobydick_summary.txt | ./BigData/word_mapper.py | sort -k1,1 | ./BigData/word_reducer.py

The output of the preceding command is shown as follows:

a	28
A	2
abilities	1
aboard	3
about	2

A sentiment score for each review

We'll extend this to write a MapReduce program to determine the sentiment score for each review. Write the following code in the senti_mapper.py file:

#!/usr/bin/env python
import sys

# Word lists with one word per line
positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score += 1
        if w in neg_list: negative_score += 1

    return positive_score - negative_score

for l in sys.stdin:
    # Leading and trailing whitespace is removed
    l = l.strip()

    # Convert to lowercase
    l = l.lower()

    # Get the sentiment score
    score = sentiment_score(l, positive_words, negative_words)

    # The review and its score are output as a key-value pair
    print('%s\t%s' % (l, score))

In the preceding code, we used the sentiment_score function, which was designed to give the sentiment score as output. For each line, we strip the leading and trailing white spaces and then get the sentiment score for a review. Finally, we output a sentence and the score.

For this program, we don't require a reducer as we can calculate the sentiment in the mapper itself and we just have to output the sentiment score.
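To see how the score is computed, here is a self-contained sketch of the same scoring function run against two tiny, made-up word lists. The words in these lists are chosen only for illustration; the real dictionary files are far larger:

```python
def sentiment_score(text, pos_list, neg_list):
    """Return (number of positive words) - (number of negative words)."""
    positive_score = 0
    negative_score = 0
    for w in text.split(' '):
        if w in pos_list:
            positive_score += 1
        if w in neg_list:
            negative_score += 1
    return positive_score - negative_score


# Hypothetical word lists, far smaller than a real sentiment dictionary
positive_words = {'fine', 'entertaining', 'good'}
negative_words = {'bloated', 'problem', 'disaster'}

review = 'A perfectly fine movie and entertaining enough'
score = sentiment_score(review.lower(), positive_words, negative_words)

# Punctuation stays attached to a token, so 'disaster.' is not matched
unmatched = sentiment_score('destined for disaster.', positive_words, negative_words)
```

With these lists, score is 2 (two positive words, no negative ones) and unmatched is 0. The second result shows a limitation of splitting on spaces alone: a word followed directly by punctuation is not found in the dictionary.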

Let's test whether the mapper is working fine locally with a file containing the reviews for Jurassic World:

$ cat ./Data/jurassic_world_review.txt | ./BigData/senti_mapper.py

there is plenty here to divert, but little to leave you enraptored. such is the fate of the sequel: bigger. louder. fewer teeth.	0
if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.	-1
there's a problem when the most complex character in a film is the dinosaur	-2
not so much another bloated sequel as it is the fruition of dreams deferred in the previous films. too bad the genre dictates that those dreams are once again destined for disaster.	-2

We can see that our program is able to calculate the sentiment score well.

The overall sentiment score

To calculate the overall sentiment score, we would require the reducer and we'll use the same mapper but with slight modifications.

Here is the mapper code that we'll use stored in the overall_senti_mapper.py file:

#!/usr/bin/env python
import sys
import hashlib

# Word lists with one word per line
positive_words = open('./Data/positive-words.txt').read().split('\n')
negative_words = open('./Data/negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0

    for w in text.split(' '):
        if w in pos_list: positive_score += 1
        if w in neg_list: negative_score += 1
    return positive_score - negative_score

for l in sys.stdin:
    # Leading and trailing whitespace is removed
    l = l.strip()
    # Convert to lowercase
    l = l.lower()
    # Get the sentiment score
    score = sentiment_score(l, positive_words, negative_words)
    # Hash the review so that the digest can be used as the key
    hash_object = hashlib.md5(l.encode('utf-8'))
    # The digest and the score are output as a key-value pair
    print('%s\t%s' % (hash_object.hexdigest(), score))

This mapper code is similar to the previous mapper code, but here we hash each review with MD5 from the hashlib library and output the resulting digest as the key.
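As a short illustration of what the key looks like, the following sketch hashes a review on its own. The review_key function is a hypothetical helper, not part of the mapper. One plausible benefit of hashing, which is an assumption on our part, is that the key becomes a short fixed-length string that cannot contain a tab character:

```python
import hashlib


def review_key(review):
    """Return the MD5 hex digest of a review, usable as a fixed-length key."""
    return hashlib.md5(review.encode('utf-8')).hexdigest()


key = review_key("there's a problem when the most complex character in a film is the dinosaur")
```

The digest is always 32 hexadecimal characters long, and identical reviews always produce identical keys.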

Here is the reducer code that is used to determine the overall sentiment score of the movie. Store the following code in the overall_senti_reducer.py file:

#!/usr/bin/env python
import sys

total_score = 0

# Input arrives on STDIN
for l in sys.stdin:
    # Input from the mapper is parsed
    key, score = l.strip().split('\t', 1)
    # The score is converted to int
    try:
        score = int(score)
    except ValueError:
        # If the score is not a number, ignore the line
        continue

    # Update the total score
    total_score += score

print('%s' % (total_score,))

In the preceding code, we parse the score out of each line and keep adding it to the total_score variable. Finally, we output the total_score variable, which shows the overall sentiment of the movie.

Let's locally test the overall sentiment on Jurassic World, which is a good movie, and then test the sentiment for the movie, Unfinished Business, which was critically deemed poor:

$ cat ./Data/jurassic_world_review.txt | ./BigData/overall_senti_mapper.py | sort -k1,1 | ./BigData/overall_senti_reducer.py
19

$ cat ./Data/unfinished_business_review.txt | ./BigData/overall_senti_mapper.py | sort -k1,1 | ./BigData/overall_senti_reducer.py
-8

We can see that our code is working well. Jurassic World has a positive score, which suggests that reviewers liked it. In contrast, Unfinished Business has a negative score, which suggests that reviewers did not like it much.

Deploying the MapReduce code on Hadoop

We'll create a directory for data on Moby Dick, Jurassic World, and Unfinished Business in the HDFS tmp folder:

$ hadoop fs -mkdir /tmp/moby_dick
$ hadoop fs -mkdir /tmp/jurassic_world
$ hadoop fs -mkdir /tmp/unfinished_business

Let's check if the folders are created:

$ hadoop fs -ls /tmp/

Found 6 items
drwxrwxrwx   - mapred hadoop         0 2014-11-14 15:42 /tmp/hadoop-mapred
drwxr-xr-x   - samzer hadoop         0 2015-06-18 18:31 /tmp/jurassic_world
drwxrwxrwx   - hdfs   hadoop         0 2014-11-14 15:41 /tmp/mapred
drwxr-xr-x   - samzer hadoop         0 2015-06-18 18:31 /tmp/moby_dick
drwxr-xr-x   - samzer hadoop         0 2015-06-16 18:17 /tmp/temp635459726
drwxr-xr-x   - samzer hadoop         0 2015-06-18 18:31 /tmp/unfinished_business

Once the folders are created, let's copy the data files to the respective folders.

$ hadoop fs -copyFromLocal ./Data/mobydick_summary.txt /tmp/moby_dick
$ hadoop fs -copyFromLocal ./Data/jurassic_world_review.txt /tmp/jurassic_world
$ hadoop fs -copyFromLocal ./Data/unfinished_business_review.txt /tmp/unfinished_business

Let's verify that the files have been copied:

$ hadoop fs -ls /tmp/moby_dick
$ hadoop fs -ls /tmp/jurassic_world
$ hadoop fs -ls /tmp/unfinished_business

Found 1 items
-rw-r--r--   3 samzer hadoop       5973 2015-06-18 18:34 /tmp/moby_dick/mobydick_summary.txt

Found 1 items
-rw-r--r--   3 samzer hadoop       3185 2015-06-18 18:34 /tmp/jurassic_world/jurassic_world_review.txt

Found 1 items
-rw-r--r--   3 samzer hadoop       2294 2015-06-18 18:34 /tmp/unfinished_business/unfinished_business_review.txt

We can see that files have been copied successfully.

With the following command, we'll execute our mapper and reducer scripts in Hadoop. In this command, we define the mapper, reducer, input, and output file locations, and then use Hadoop streaming to execute our scripts.

Let's execute the word count program first:

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/word_mapper.py -mapper word_mapper.py -file ./BigData/word_reducer.py -reducer word_reducer.py -input /tmp/moby_dick/* -output /tmp/moby_output
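
Since the streaming command above is long and easy to mistype, it can help to assemble it programmatically. The following is a minimal sketch that only builds the argument list and does not run anything; the streaming_command function and its parameter names are made up for this example, and it uses only the options that appear in the command above:

```python
import os


def streaming_command(jar, mapper, input_path, output_path,
                      reducer=None, extra_files=()):
    """Assemble the argument list for a Hadoop streaming job."""
    args = ['hadoop', 'jar', jar]
    scripts = [mapper] + ([reducer] if reducer else [])
    for path in scripts + list(extra_files):
        args += ['-file', path]  # ship the local file with the job
    args += ['-mapper', os.path.basename(mapper)]
    if reducer:
        args += ['-reducer', os.path.basename(reducer)]
    args += ['-input', input_path, '-output', output_path]
    return args


command = streaming_command(
    '/usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar',
    './BigData/word_mapper.py',
    '/tmp/moby_dick/*',
    '/tmp/moby_output',
    reducer='./BigData/word_reducer.py',
)
# Joining the arguments gives a line that can be pasted into a shell
command_line = ' '.join(command)
```

Printing command_line shows the full command, so it can be reviewed before it is run.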

Let's verify that the word count MapReduce program is working successfully:

$ hadoop fs -cat /tmp/moby_output/*

The output of the preceding command is shown as follows:

(Queequeg	1
A	2
Africa	1
Africa,	1
After	1
Ahab	13
Ahab,	1
Ahab's	6
All	1
American	1
As	1
At	1
Bedford,	1
Bildad	1
Bildad,	1
Boomer,	2
Captain	1
Christmas	1
Day	1
Delight,	1
Dick	6
Dick,	2

The program is working as intended. Now, we'll deploy the program that calculates the sentiment score for each of the reviews. Note that we can ship the positive and negative dictionary files with the Hadoop streaming job by using the -file option:

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/senti_mapper.py -mapper senti_mapper.py -file ./Data/positive-words.txt -file ./Data/negative-words.txt -input /tmp/jurassic_world/* -output /tmp/jurassic_output

In the preceding code, we use the Hadoop command with the Hadoop streaming JAR file and then define the mapper and reducer files, and finally, the input and output directories in Hadoop.

Let's check the sentiment scores of the movie reviews:

$ hadoop fs -cat /tmp/jurassic_output/*

The output of the preceding command is shown as follows:

"jurassic world," like its predecessors, fills up the screen with roaring, slathering, earth-shaking dinosaurs, then fills in mere humans around the edges. it's a formula that works as well in 2015 as it did in 1993.	3
a perfectly fine movie and entertaining enough to keep you watching until the closing credits.	4
an angry movie with a tragic moral ... meta-adoration and criticism ends with a genetically modified dinosaur fighting off waves of dinosaurs.	-3
if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.	-1

This program is also working as intended. Now, we'll try out the overall sentiment of a movie:

$ hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-*streaming*.jar -file ./BigData/overall_senti_mapper.py -mapper

Let's verify the result:

$ hadoop fs -cat /tmp/unfinished_business_output/*

The output of the preceding command is shown as follows:

-8

We can see that the overall sentiment score comes out correctly from MapReduce. Here is a screenshot of the JobTracker status page:

 leveraging-python-world-big-data-img-3

The preceding image shows a portal where the jobs submitted to the JobTracker can be viewed along with their status. By default, the JobTracker web interface is available on port 50030 of the master system (port 50070 serves the NameNode web interface).

From the preceding image, we can see that a job is running, and the status above the image shows that the job has been completed successfully.

File handling with Hadoopy

Hadoopy is a Python library that provides an API to interact with Hadoop, manage files, and perform MapReduce on them. Hadoopy can be downloaded from http://www.hadoopy.com/en/latest/tutorial.html#installing-hadoopy.

Let's try to put a few files in Hadoop through Hadoopy in a directory created within HDFS, called data:

$ hadoop fs -mkdir data

Here is the code that puts the data into HDFS:

import hadoopy
import os

hdfs_path = 'data'

def read_local_dir(local_path):
    # Yield the path of every regular file in the local directory
    for fn in os.listdir(local_path):
        path = os.path.join(local_path, fn)
        if os.path.isfile(path):
            yield path

def main():
    local_path = './BigData/dummy_data'
    for file in read_local_dir(local_path):
        hadoopy.put(file, hdfs_path)
        print("The file %s has been put into hdfs" % (file,))

if __name__ == '__main__':
    main()

The output of the preceding code is shown as follows:

The file ./BigData/dummy_data/test9 has been put into hdfs
The file ./BigData/dummy_data/test7 has been put into hdfs
The file ./BigData/dummy_data/test1 has been put into hdfs
The file ./BigData/dummy_data/test8 has been put into hdfs
The file ./BigData/dummy_data/test6 has been put into hdfs
The file ./BigData/dummy_data/test5 has been put into hdfs
The file ./BigData/dummy_data/test3 has been put into hdfs
The file ./BigData/dummy_data/test4 has been put into hdfs
The file ./BigData/dummy_data/test2 has been put into hdfs

In the preceding code, we list all the files in a directory and then put each of the files into Hadoop using the put() method of Hadoopy.

Let's check if all the files have been put into HDFS:

$ hadoop fs -ls data

The output of the preceding command is shown as follows:

Found 9 items
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test1
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test2
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test3
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test4
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test5
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test6
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test7
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test8
-rw-r--r--   3 samzer hadoop         0 2015-06-23 00:19 data/test9

So, we have successfully been able to put files into HDFS.

Pig

Pig is a platform with a very expressive language for performing data transformations and querying. Code in Pig is written as a script, which is compiled into MapReduce programs that execute on Hadoop. The following image is the Pig logo:

[Figure: The Pig logo]

Pig helps in reducing the complexity of raw-level MapReduce programs, and enables the user to perform fast transformations.

Pig Latin is the textual language that can be learned from http://pig.apache.org/docs/r0.7.0/piglatin_ref2.html.

We'll cover how to find the top 10 most frequently occurring words with Pig, and then we'll see how to create a function in Python that can be used in Pig.

Let's start with the word count. Here is the Pig Latin code, which you can save in the pig_wordcount.pig file:

data = load '/tmp/moby_dick/';

word_token = foreach data generate flatten(TOKENIZE((chararray)$0)) as word;
group_word_token = group word_token by word;
count_word_token = foreach group_word_token generate COUNT(word_token) as cnt, group;
sort_word_token = ORDER count_word_token by cnt DESC;
top10_word_count = LIMIT sort_word_token 10;
DUMP top10_word_count;

In the preceding code, we can load the summary of Moby Dick, which is then tokenized line by line and is basically split into individual elements. The flatten function converts a collection of individual word tokens in a line to a row-by-row form. We then group by the words and then take a count of the words for each word. Finally, we sort the count of words in a descending order and then we limit the count of the words to the first 10 rows to get the top 10 most occurring words.
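For comparison, the same sequence of steps (tokenize, flatten, group, count, sort in descending order, limit) can be sketched on a single machine in plain Python. This is only an analogy to the Pig script: the top_words function is made up for this example, and str.split() separates tokens on whitespace only, so it will not tokenize exactly as Pig's TOKENIZE does:

```python
from collections import Counter


def top_words(lines, n=10):
    """Return the n most frequent tokens as (count, word) tuples."""
    counts = Counter()
    for line in lines:
        # Tokenize the line and add its words to the running counts
        counts.update(line.split())
    # most_common sorts by count in descending order and keeps n entries
    return [(cnt, word) for word, cnt in counts.most_common(n)]


sample = ['the whale and the sea', 'the captain and ahab']
top_two = top_words(sample, n=2)
```

Here, top_two holds the two most frequent words of the sample, in the same (count, word) order that the Pig script outputs.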

Let's execute the preceding pig script:

$ pig ./BigData/pig_wordcount.pig

The output of the preceding command is shown as follows:

(83,the)
(36,and)
(28,a)
(25,of)
(24,to)
(15,his)
(14,Ahab)
(14,Moby)
(14,is)
(14,in)

We are able to get our top 10 words. Let's now create a user-defined function with Python, which will be used in Pig.

We'll define two user-defined functions to score positive and negative sentiments of a sentence.

The following code is the UDF used to score the positive sentiment and it's available in the positive_sentiment.py file:

positive_words = [
    'a+', 'abound', 'abounds', 'abundance', 'abundant', 'accessable',
    'accessible', 'acclaim', 'acclaimed', 'acclamation',
    # ... (remaining words omitted)
]

@outputSchema("pnum:int")
def sentiment_score(text):
    positive_score = 0
    for w in text.split(' '):
        if w in positive_words: positive_score += 1
    return positive_score

In the preceding code, we define the positive word list, which is used by the sentiment_score() function. The function checks for the positive words in a sentence and finally outputs their total count. There is an outputSchema() decorator that is used to tell Pig what type of data is being outputted, which in our case is int.

Here is the code to score the negative sentiment, which is available in the negative_sentiment.py file. The code is almost identical to the positive sentiment code, except that it subtracts 1 for every negative word:

negative_words = [
    '2-faced', '2-faces', 'abnormal', 'abolish', 'abominable', 'abominably',
    'abominate', 'abomination', 'abort', 'aborted',
    # ... (remaining words omitted)
]

@outputSchema("nnum:int")
def sentiment_score(text):
    negative_score = 0
    for w in text.split(' '):
        if w in negative_words: negative_score -= 1
    return negative_score

The following code is used by Pig to score the sentiments of the Jurassic World reviews, and it's available in the pig_sentiment.pig file:

register 'positive_sentiment.py' using org.apache.pig.scripting.jython.JythonScriptEngine as positive;
register 'negative_sentiment.py' using org.apache.pig.scripting.jython.JythonScriptEngine as negative;

data = load '/tmp/jurassic_world/*';

feedback_sentiments = foreach data generate LOWER((chararray)$0) as feedback,
    positive.sentiment_score(LOWER((chararray)$0)) as psenti,
    negative.sentiment_score(LOWER((chararray)$0)) as nsenti;

average_sentiments = foreach feedback_sentiments generate feedback, psenti + nsenti;

dump average_sentiments;

In the preceding Pig script, we first register the Python UDF scripts using the register command and give them an appropriate name. We then load our Jurassic World review. We then convert our reviews to lowercase and score the positive and negative sentiments of a review. Finally, we add the score to get the overall sentiments of a review.

Let's execute the Pig script and see the results:

$ pig ./BigData/pig_sentiment.pig

The output of the preceding command is shown as follows:

(there is plenty here to divert, but little to leave you enraptored. such is the fate of the sequel: bigger. louder. fewer teeth.,0)
(if you limit your expectations for jurassic world to "more teeth," it will deliver on that promise. if you dare to hope for anything more-relatable characters, narrative coherence-you'll only set yourself up for disappointment.,-1)

(there's a problem when the most complex character in a film is the dinosaur,-2)

(not so much another bloated sequel as it is the fruition of dreams deferred in the previous films. too bad the genre dictates that those dreams are once again destined for disaster.,-2)

(a perfectly fine movie and entertaining enough to keep you watching until the closing credits.,4)

(this fourth installment of the jurassic park film series shows some wear and tear, but there is still some gas left in the tank. time is spent to set up the next film in the series. they will keep making more of these until we stop watching.,0)

We have successfully scored the sentiments of the Jurassic World review using the Python UDF in Pig.

Python with Apache Spark

Apache Spark is a computing framework that works on top of HDFS and provides an alternative way of computing that is similar to MapReduce. It was developed at the AMPLab at UC Berkeley. Spark does most of its computation in memory, which makes it much faster than MapReduce for many workloads, and it is well suited for machine learning because it handles iterative workloads well.

 leveraging-python-world-big-data-img-5

Spark uses the programming abstraction of RDDs (Resilient Distributed Datasets) in which data is logically distributed into partitions, and transformations can be performed on top of this data.
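The idea of partitioned data with transformations applied on top of it can be pictured with a toy example in plain Python. This sketch does not use Spark and does not reflect how Spark actually assigns records to partitions; the partition and map_partitions functions are invented for illustration, and round-robin assignment is just a simple stand-in:

```python
def partition(records, num_partitions):
    """Distribute records across num_partitions lists in round-robin order."""
    partitions = [[] for _ in range(num_partitions)]
    for index, record in enumerate(records):
        partitions[index % num_partitions].append(record)
    return partitions


def map_partitions(partitions, func):
    """Apply func to every record, one partition at a time."""
    return [[func(record) for record in part] for part in partitions]


reviews = ['Good', 'Bad', 'Fine', 'Poor', 'Great']
parts = partition(reviews, 2)
lowered = map_partitions(parts, str.lower)
```

Each inner list plays the role of one partition. Because the transformation touches each partition independently, the partitions could be processed on different machines, which is the property that Spark exploits.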

Python is one of the languages that can be used to interact with Apache Spark, and we'll create a program to perform the sentiment scoring for each review of Jurassic World as well as the overall sentiment.

You can install Apache Spark by following the instructions at https://spark.apache.org/docs/1.0.1/spark-standalone.html.

Scoring the sentiment

Here is the Python code to score the sentiment:

from __future__ import print_function
import sys
from pyspark import SparkContext

# Word lists with one word per line
positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0
    for w in text.split(' '):
        if w in pos_list: positive_score += 1
        if w in neg_list: negative_score += 1
    return positive_score - negative_score

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: sentiment <file>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonSentiment")
    lines = sc.textFile(sys.argv[1], 1)
    # Pair each review with its sentiment score
    scores = lines.map(lambda x: (x, sentiment_score(x.lower(), positive_words, negative_words)))
    output = scores.collect()
    for (key, score) in output:
        print("%s: %i" % (key, score))
    sc.stop()

In the preceding code, we define our standard sentiment_score() function, which we'll be reusing. The if statement checks that the path to the text file was passed as an argument to the Python script. The sc variable is a SparkContext object with the PythonSentiment app name. The filename in the argument is passed into Spark through the textFile() method of the sc variable. In the map() function of Spark, we define a lambda function that receives each line of the text file and returns the line together with its sentiment score. The output variable gets the result, and finally, we print the result on the screen.

Let's score the sentiment of each of the reviews of Jurassic World. Replace <hostname> with your hostname:

$ ~/spark-1.3.0-bin-cdh4/bin/spark-submit --master spark://<hostname>:7077 ./BigData/spark_sentiment.py hdfs://localhost:8020/tmp/jurassic_world/*

We'll get the following output for the preceding command:

There is plenty here to divert but little to leave you enraptured. Such is the fate of the sequel: Bigger, Louder, Fewer teeth: 0

If you limit your expectations for Jurassic World to more teeth, it will deliver on this promise. If you dare to hope for anything more—relatable characters or narrative coherence—you'll only set yourself up for disappointment:-1

We can see that our Spark program was able to score the sentiment for each of the reviews. The number at the end of each output line is the sentiment score, which shows whether the review is positive or negative: the higher the score, the more positive the review, and the lower the score, the more negative the review.

We use the Spark Submit command with the following parameters:

  • A master node of the Spark system
  • A Python script containing the transformation commands
  • An argument to the Python script

The overall sentiment

Here is a Spark program to score the overall sentiment of all the reviews:

from __future__ import print_function
import sys
from operator import add
from pyspark import SparkContext

# Word lists with one word per line
positive_words = open('positive-words.txt').read().split('\n')
negative_words = open('negative-words.txt').read().split('\n')

def sentiment_score(text, pos_list, neg_list):
    positive_score = 0
    negative_score = 0
    for w in text.split(' '):
        if w in pos_list: positive_score += 1
        if w in neg_list: negative_score += 1
    return positive_score - negative_score

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: Overall Sentiment <file>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="PythonOverallSentiment")
    lines = sc.textFile(sys.argv[1], 1)
    # Every score gets the same key, so reduceByKey adds them all together
    scores = lines.map(lambda x: ("Total", sentiment_score(x.lower(), positive_words, negative_words))) \
                  .reduceByKey(add)
    output = scores.collect()
    for (key, score) in output:
        print("%s: %i" % (key, score))
    sc.stop()

In the preceding code, we have added a reduceByKey() method, which reduces the value by adding the output values, and we have also defined the key as Total, so that all the scores are reduced based on a single key.
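The effect of reducing by a single key can be reproduced locally without Spark. The following is a plain Python analogy, not Spark's implementation; the reduce_by_key function is written for this illustration and the scores are made-up sample values:

```python
from functools import reduce
from operator import add


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like a local reduceByKey."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


# Hypothetical per-review scores that all share the "Total" key
scores = [("Total", 0), ("Total", -1), ("Total", 4), ("Total", -2)]
result = reduce_by_key(scores, add)
```

Since every pair shares the "Total" key, all the values are folded into one sum, which is 1 for these sample scores. With several different keys, each key would get its own sum.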

Let's try out the preceding code to get the overall sentiment of Jurassic World. Replace <hostname> with your hostname:

$ ~/spark-1.3.0-bin-cdh4/bin/spark-submit --master spark://<hostname>:7077 ./BigData/spark_overall_sentiment.py hdfs://localhost:8020/tmp/jurassic_world/*

The output of the preceding command is shown as follows:

Total: 19

We can see that Spark has given an overall sentiment score of 19.

The applications that get executed on Spark can be viewed in the browser on the 8080 port of the Spark master. Here is a screenshot of it:

 leveraging-python-world-big-data-img-6

We can see the number of Spark nodes, the applications that are currently being executed, and the applications that have already been executed.

Summary

In this article, you were introduced to big data, learned about how the Hadoop software works, and the architecture associated with it. You then learned how to create a mapper and a reducer for a MapReduce program, how to test it locally, and then put it into Hadoop and deploy it. You were then introduced to the Hadoopy library and using this library, you were able to put files into Hadoop. You also learned about Pig and how to create a user-defined function with it. Finally, you learned about Apache Spark, which is an alternative to MapReduce and how to use it to perform distributed computing.

With this article, we have come to an end in our journey, and you should be in a state to perform data science tasks with Python. From here on, you can participate in Kaggle Competitions at https://www.kaggle.com/ to improve your data science skills with real-world problems. This will fine-tune your skills and help understand how to solve analytical problems.

Also, you can sign up for the Andrew Ng course on Machine Learning at https://www.coursera.org/learn/machine-learning to understand the nuances behind machine learning algorithms.
