Hadoop Commands

# test code
cat testfile | ./mapper.py | sort | ./reducer.py

# run a job
hs mapper.py reducer.py input_folder output_folder

# view the results
hadoop fs -cat output_folder/part-00000 | less

# retrieve the results
hadoop fs -get output_folder/part-00000 results.txt

# delete a folder
hadoop fs -rm -r delete_folder
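
# For reference, `hs` above is a course-provided shell alias around Hadoop Streaming.
# A roughly equivalent full invocation would look like the following (the streaming
# jar path is an assumption and differs between Hadoop distributions):
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -mapper mapper.py -reducer reducer.py \
    -file mapper.py -file reducer.py \
    -input input_folder -output output_folder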

Filtering Exercise

In [1]:
#!/usr/bin/python
import sys
import csv

# To run this code on the actual data, please download the additional dataset.
# You can find instructions in the course materials (wiki) and in the instructor notes.
# There are some things in this data file that are different from what you saw
# in Lesson 3. The dataset is more complicated and closer to what you might
# see in the real world. It was generated by exporting data from a SQL database.
# 
# The data in at least one of the fields (the body field) can include newline
# characters, and all the fields are enclosed in double quotes. Therefore, we
# will need to process the data file in a way other than using split(","). To do this, 
# we have provided sample code for using the csv module of Python. Each 'line'
# will be a list that contains each field in sequential order.
# 
# In this exercise, we are interested in the field 'body' (which is the 5th field, 
# line[4]). The objective is to count the number of forum nodes where 'body' either 
# contains none of the three punctuation marks: period ('.'), exclamation point ('!'), 
# question mark ('?'), or else 'body' contains exactly one such punctuation mark as the 
# last character. There is no need to parse the HTML inside 'body'. Also, do not pay
# special attention to newline characters.

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)

    for line in reader:

        # YOUR CODE HERE
        # keep the row only if the body (line[4]) contains no '.', '!' or '?'
        # anywhere except possibly as its very last character
        if ('.' not in line[4][:-1] and '!' not in line[4][:-1] and '?' not in line[4][:-1]):
            writer.writerow(line)



test_text = """\"\"\t\"\"\t\"\"\t\"\"\t\"This is one sentence\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"Also one sentence!\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"Hey!\nTwo sentences!\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"One. Two! Three?\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"One Period. Two Sentences\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"Three\nlines, one sentence\n\"\t\"\"
"""

# This function allows you to test the mapper with the provided test string
def main():
    import StringIO
    sys.stdin = StringIO.StringIO(test_text)
    mapper()
    sys.stdin = sys.__stdin__

if __name__ == "__main__":
    main()
""	""	""	""	"This is one sentence"	""
""	""	""	""	"Also one sentence!"	""
""	""	""	""	"Three
lines, one sentence
"	""

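To run the same check over the real data locally, swap the StringIO test harness for a plain
mapper() call and pipe the file through the script. The filename below is an assumption (use
whatever the downloaded forum data file is called). Note that wc -l would over-count, because
surviving bodies can span several physical lines, so the records are counted with the csv module:

cat forum_node.tsv | ./mapper.py | python -c "import sys, csv; print sum(1 for _ in csv.reader(sys.stdin, delimiter='\t'))"
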
Top 10

In [2]:
#!/usr/bin/python
"""
Your mapper function should print out 10 lines containing the longest posts, sorted in
ascending order from shortest to longest.
Please do not use global variables and do not change the "main" function.
"""
import sys
import csv


def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)
    
    # simple approach: use a list to store the posts, with 0 being the longest and 9 being the shortest
    top10 = []

    for line in reader:

        # YOUR CODE HERE
        top10.append(line)
        if len(top10) > 10:
            # keep only the 10 posts with the longest bodies seen so far
            top10.sort(key=lambda x: len(x[4]), reverse=True)
            del top10[10]

    # sort once more so the output is correct even when 10 or fewer lines were read
    top10.sort(key=lambda x: len(x[4]), reverse=True)
    for line in reversed(top10):
        writer.writerow(line)



test_text = """\"\"\t\"\"\t\"\"\t\"\"\t\"333\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"88888888\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"1\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"11111111111\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"1000000000\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"22\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"4444\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"666666\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"55555\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"999999999\"\t\"\"
\"\"\t\"\"\t\"\"\t\"\"\t\"7777777\"\t\"\"
"""

# This function allows you to test the mapper with the provided test string
def main():
    import StringIO
    sys.stdin = StringIO.StringIO(test_text)
    mapper()
    sys.stdin = sys.__stdin__

main()
""	""	""	""	"22"	""
""	""	""	""	"333"	""
""	""	""	""	"4444"	""
""	""	""	""	"55555"	""
""	""	""	""	"666666"	""
""	""	""	""	"7777777"	""
""	""	""	""	"88888888"	""
""	""	""	""	"999999999"	""
""	""	""	""	"1000000000"	""
""	""	""	""	"11111111111"	""

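As an aside, the same selection can be written with Python's heapq module instead of repeatedly
re-sorting the list. A minimal alternative sketch, under the same assumptions about the input
(tab-separated rows with the post body in field 5):

#!/usr/bin/python
import sys
import csv
import heapq

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)
    # nlargest returns the 10 rows with the longest bodies, longest first;
    # reverse to print them from shortest to longest
    for line in reversed(heapq.nlargest(10, reader, key=lambda x: len(x[4]))):
        writer.writerow(line)
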
Inverted Index

Mapper

In [3]:
#!/usr/bin/python
import sys
import csv
import re

# split on white space and: .,!?:;"()<>[]#$=-/

reader = csv.reader(sys.stdin, delimiter='\t')
writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)

for line in reader:
    words = re.split('\s*[,.!?:;"()<>\[\]#$=\-/\s]+\s*',line[4].lstrip().lower()+' ')
    #print words
    for word in words[:-1]:
        #writer.writerow([word, line[0]])
        print word, '\t', line[0]
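
A concrete (made-up) example of what the split step produces:

# re.split('\s*[,.!?:;"()<>\[\]#$=\-/\s]+\s*', 'is this a "great" question? yes! ')
# --> ['is', 'this', 'a', 'great', 'question', 'yes', '']
# the trailing empty string comes from the appended ' ' and is dropped by words[:-1]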

Reducer

In [4]:
#!/usr/bin/python

import sys

# the input is of the format: word \t node_id

old_word = None
count = 0
id_set = set()

for line in sys.stdin:
    data = line.strip().split("\t")
    if len(data) != 2:
        # Something has gone wrong. Skip this line.
        continue

    # same word --> add to the count
    if old_word and old_word == data[0]:
        count += 1
        id_set.add(data[1])
        
    # new word
    else:
        # not the first word
        if old_word:
            print old_word, '\t', count, '\t', sorted(id_set)
            
        # reset the old_word, the count, and the id set (keep this first node id)
        old_word = data[0]
        count = 1
        id_set = set([data[1]])

# the last word
if old_word != None:
    print old_word, '\t', count, '\t', sorted(id_set)

Finding Mean

Mapper

In [5]:
#!/usr/bin/python
import sys
from datetime import datetime

# weekday = datetime.strptime(date, '%Y-%m-%d').weekday()

for line in sys.stdin:
    data = line.strip().split('\t')
    if len(data) == 6:
        # print the weekday and the sale amount
        weekday = datetime.strptime(data[0], '%Y-%m-%d').weekday()
        print "{0}\t{1}".format(weekday, data[4])
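
As a sanity check, with the purchases file assumed to hold six tab-separated fields
(date, time, store, category, cost, payment), a made-up input line maps as follows
(weekday() counts Monday as 0, so Sunday is 6):

# input:   2012-01-01	12:00	San Jose	Music	214.05	Amex
# output:  6	214.05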

Reducer

In [6]:
#!/usr/bin/python

import sys

# the input is of the format: day_of_week \t sale_amount

old_day = None
count = 0
sales = 0

for line in sys.stdin:
    data = line.strip().split("\t")
    if len(data) != 2:
        # Something has gone wrong. Skip this line.
        continue

    # same day --> add to the sales count
    if old_day and old_day == data[0]:
        count += 1
        sales += float(data[1])
        
    # new day
    else:
        # not the first day
        if old_day:
            print old_day, '\t', sales/count
            
        # reset old_day, the count, and the sales
        old_day = data[0]
        count = 1
        sales = float(data[1])

# the last day
if old_day != None:
    print old_day, '\t', sales/count

Combiners

Mapper

In [7]:
#!/usr/bin/python
import sys
from datetime import datetime

# weekday = datetime.strptime(date, '%Y-%m-%d').weekday()

for line in sys.stdin:
    data = line.strip().split('\t')
    if len(data) == 6:
        # print the weekday and the sale amount
        weekday = datetime.strptime(data[0], '%Y-%m-%d').weekday()
        print "{0}\t{1}".format(weekday, data[4])

Reducer & Combiner

In [8]:
#!/usr/bin/python

import sys

# the input is of the format: day_of_week \t sale_amount

old_day = None
sales = 0

for line in sys.stdin:
    data = line.strip().split("\t")
    if len(data) != 2:
        # Something has gone wrong. Skip this line.
        continue

    # same day --> add to the sales
    if old_day and old_day == data[0]:
        sales += float(data[1])
        
    # new day
    else:
        # not the first day
        if old_day:
            # emit key and value separated by a single tab (no extra spaces), so
            # the output format matches the mapper's when this script is reused
            # as a combiner
            print "{0}\t{1}".format(old_day, sales)

        # reset old_day and the sales
        old_day = data[0]
        sales = float(data[1])

# the last day
if old_day != None:
    print "{0}\t{1}".format(old_day, sales)
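
Because this reducer only accumulates a sum, which is associative, the same script can serve as
both the combiner and the reducer (which is why it emits plain key-tab-value lines). With a plain
Hadoop Streaming invocation the combiner is passed with -combiner; the jar path and folder names
below are placeholders:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -mapper mapper.py -combiner reducer.py -reducer reducer.py \
    -file mapper.py -file reducer.py \
    -input purchases_folder -output output_folder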

Combine Datasets

Mapper

In [9]:
#!/usr/bin/python
"""
Your goal for this task is to write mapper and reducer code 
that will combine some of the forum and user data. 
In relational algebra, this is known as a join operation. 
At the moment this is not an auto-gradable exercise, but instructions are given below on how to test your code on your own machine. 

The goal is to have the output from the reducer with the following fields for each forum post: 

"id"  "title"  "tagnames"  "author_id"  "node_type"  "parent_id"  "abs_parent_id"  "added_at" 
"score"  "reputation"  "gold"  "silver"  "bronze"

FROM forum_node: 0,1,2,3, 5,6,7,8,9
    "id"	"title"	"tagnames"	"author_id"	"BODY"	"node_type"	"parent_id"	"abs_parent_id"	"added_at"	
    "score"

FROM forum_users:
    "user_ptr_id"	"reputation"	"gold"	"silver"	"bronze"

Note that for each post we have taken some of the information describing the post, 
and joined it with user information. The body of the post is not included in the final output. 
The reason is that it is difficult to handle a multiline body, as it might be split across separate 
lines during the intermediate steps Hadoop performs (shuffle and sort).

Your mapper code should take in records from both forum_node and forum_users. 
It needs to keep, for each record, those fields that are needed for the final output given above. 
In addition, the mapper needs to add a marker (e.g. "A" and "B") to indicate where each output 
record comes from (user information vs. forum post information). Example output from the mapper:

"12345"  "A"  "11"  "3"  "4"  "1"
"12345"  "B"   "6336" "Unit 1: Same Value Q"  "cs101 value same"  "question"  "\N"  "\N"  "2012-02-25 08:09:06.787181+00"  "1" 
  
The first line originally comes from the forum_users input. It is from a student with user id 12345 (the mapper key). 
The next field is the marker A, specifying that the record comes from forum_users. 
What follows is the remaining user information. 

The second line originally comes from the forum_node input. 
It also starts with the student id (the mapper key), followed by the marker B and the information about the forum post.  
   
The mapper key for both types of records is the student ID: 
"user_ptr_id" from the "forum_users" file or "author_id" from the "forum_node" file. 
Remember that during the sort and shuffle phases records will be grouped based on the student ID (12345 in our example). 
You can use that to process and join the records appropriately in the reduce phase. 
"""

import sys
import csv

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)

    for line in reader:
        # user data: key on user_ptr_id and mark the record with 'A'
        if len(line) == 5:
            if line[0] != 'user_ptr_id':
                writer.writerow(line[0:1] + ['A'] + line[1:])

        # post data: key on author_id (the student id), mark the record with 'B',
        # keep id/title/tagnames plus fields 5-9, and drop the multiline body (line[4])
        else:
            if line[0] != 'id':
                writer.writerow(line[3:4] + ['B'] + line[0:3] + line[5:10])
        
if __name__=='__main__':
    mapper()

Reducer

In [10]:
#!/usr/bin/python
# Here you combine the values that come from the two sources.
# Records whose marker is "A" carry the user data.
# Records whose marker is "B" carry the forum node data.

'''
Mapper output FROM forum_node (keyed on the author id):
    "author_id"	"B"	"id"	"title"	"tagnames"	"node_type"	"parent_id"	"abs_parent_id"	"added_at"	"score"

Mapper output FROM forum_users (keyed on the user id):
    "user_ptr_id"	"A"	"reputation"	"gold"	"silver"	"bronze"
'''

import sys
import csv

def reducer():
    reader = csv.reader(sys.stdin, delimiter='\t')
    writer = csv.writer(sys.stdout, delimiter='\t', quotechar='"', quoting=csv.QUOTE_ALL)
    user = []

    for data in reader:
        if len(data) != 6 and len(data) != 10:
            # something is wrong -- skip this record
            continue

        # user record (marker "A"): within each key it sorts ahead of the post
        # records, so remember it for the posts that follow
        if data[1] == 'A':
            user = data
            continue

        # post record (marker "B"): join it with the matching user record and
        # output id, title, tagnames, author_id, node_type, parent_id,
        # abs_parent_id, added_at, score, reputation, gold, silver, bronze
        if user and user[0] == data[0]:
            writer.writerow(data[2:5] + data[0:1] + data[5:] + user[2:])


if __name__=='__main__':
    reducer()
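
The join can be tested locally the same way as the other exercises, by simulating Hadoop's
shuffle and sort with sort. The filenames here are assumptions based on the course dataset names:

cat forum_users.tsv forum_node.tsv | ./mapper.py | sort | ./reducer.py > joined_output.tsv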