Tim's Prior

Streaming Hive with R: An easy MapReduce Tutorial

September 10

The two tools in most analysts’ and data scientists’ toolkits today are SQL and R: SQL to pull data out of a database, and R to reshape, visualize and ultimately analyze it. Those of us working in the Hadoop world, which at this point is nearly synonymous with Big Data, have the option of writing large, complicated MapReduce jobs in Java to get our data out. Alternatively, we can use one of the higher-level languages written to help analysts and data scientists get data out of Hadoop easily and get on with their lives: Pig Latin for Pig, and HQL (HiveQL) for Hive. HQL is very close to MySQL’s dialect of SQL. Hive takes an HQL query and translates it into MapReduce jobs, which makes our lives easier.

One of the cool things you can do with Hadoop is Hadoop Streaming, which lets you write MapReduce jobs in any language that can read from standard input and write to standard output. For the uninitiated, MapReduce is a programming model popularized by Google that processes line-partitioned data in parallel using key-value pairs, and it scales out by adding machines to the cluster. It has made it possible to analyze huge datasets that were out of reach only a few years ago.

There are many tutorials on writing MapReduce jobs with Hadoop Streaming, including one or two where the Map and Reduce functions are written in R. Today I want to show readers how they can easily (I hope) write a MapReduce job in Hive, using HQL and plugging in mappers and reducers written in R, since these are the two languages most data analysts and scientists use. I believe many people are interested in doing this, and to my knowledge there aren’t any examples of how to do it anywhere online. I was motivated to write this because I went looking for such a tutorial myself and came up empty.

We’ll start with a very simple example. How simple? Simple enough that, with a small dataset, you could do it in two lines of R, and simple enough that, if your data were too big for that, you could still crunch it with a simple SQL-like query in Hive. Think of it instead as boilerplate code for your next, more interesting application.

Okay, on to the data. I created a very small, made-up dataset of only 10,000 lines: 100 users, each of whom made 50 ratings on each of two sites, site A and site B (100 × 50 × 2 = 10,000). Site A accepts ratings anywhere from 0 to 100, whereas site B accepts ratings only from 0 to 50. We know there are 3 different types of users (for each user I drew ratings at random from one of 3 different Gaussians), and ultimately we want to classify each user with K-means or something similar. To do so, we want each user’s mean rating, after scaling the ratings from site A so that they are comparable to those from site B. It’s a very easy problem.

The first step is to get the data. You can load it from my public Dropbox folder (this is also part of the setup.txt file you can find on GitHub):

load(url('http://dl.dropbox.com/u/6132890/hive_df.Rdata'))
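Once loaded, the whole computation fits comfortably in memory. For comparison with the Hive version, here is a Python sketch of that in-memory calculation (an illustration in another language, not the post’s R code). The function name mean_ratings and the (users, rating, grp) tuple layout are assumptions chosen to mirror the three columns of hive.df.

```python
from collections import defaultdict


def mean_ratings(rows):
    """Return {user: mean rating}, halving site A ratings (0-100) onto site B's 0-50 scale.

    rows: iterable of (users, rating, grp) tuples, where grp is 'A' or 'B'.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for user, rating, site in rows:
        scaled = rating / 2 if site == "A" else rating  # same rule as the mapper
        totals[user] += scaled
        counts[user] += 1
    return {user: totals[user] / counts[user] for user in totals}


if __name__ == "__main__":
    sample = [(1, 80.0, "A"), (1, 30.0, "B"), (2, 10.0, "B")]
    print(mean_ratings(sample))  # {1: 35.0, 2: 10.0}
```

This is the result the Hive job below should reproduce, one row per user.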

Write the hive.df data frame out as a tab-delimited text file in a directory on a machine where you interact with Hadoop (yes, access to a Hadoop cluster with Hive installed is a prerequisite here).

write.table(hive.df, file = '/directory/demo.txt', quote = FALSE, sep = '\t', row.names = FALSE, col.names = FALSE)
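The Dropbox link above may no longer resolve. If so, a stand-in file with the same layout can be generated instead. The Python sketch below is hypothetical: the three cluster means and standard deviations are made-up assumptions, not the parameters behind the original file, but the shape follows the description (100 users, 50 ratings per user on each site, 10,000 tab-delimited lines with no header, row names or quotes, like the write.table() call).

```python
import random
import sys

# Hypothetical (mean, sd) pairs on site B's 0-50 scale, one per user type.
# These are illustrative assumptions, not the parameters of the original data.
CLUSTERS = [(10.0, 3.0), (25.0, 3.0), (40.0, 3.0)]


def make_rows(n_users=100, n_ratings=50, seed=0):
    """Yield (users, rating, grp) rows: n_ratings per site for every user."""
    rng = random.Random(seed)
    for user in range(1, n_users + 1):
        mu, sd = rng.choice(CLUSTERS)  # each user draws from one of 3 Gaussians
        # Site A uses a 0-100 scale (twice site B's); site B uses 0-50
        for site, scale, top in (("A", 2.0, 100.0), ("B", 1.0, 50.0)):
            for _ in range(n_ratings):
                rating = rng.gauss(mu, sd) * scale
                yield user, round(min(max(rating, 0.0), top), 2), site


def to_tsv_lines(rows):
    """Format rows tab-separated with no header, row names or quotes."""
    for user, rating, site in rows:
        yield f"{user}\t{rating}\t{site}\n"


if __name__ == "__main__":
    sys.stdout.writelines(to_tsv_lines(make_rows()))
```

Redirecting the output to a file (for example, python make_demo.py > demo.txt, where make_demo.py is a hypothetical file name) produces a demo.txt that the remaining steps can use unchanged.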

Now copy the file into your home directory on HDFS.

hadoop dfs -mkdir /user/tabraham/demo
hadoop dfs -put demo.txt /user/tabraham/demo/

If you’ve gotten this far, you should be in good shape the rest of the way! Next, make the data accessible from Hive by creating an external Hive table over it.

hive  # opens the Hive command-line interface
DROP TABLE IF EXISTS my_hive_r_demo; 
CREATE EXTERNAL TABLE my_hive_r_demo(users int, rating double, grp string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE 
LOCATION '/user/tabraham/demo/';

Finally, check to see if it all worked. You should see the column headings and 10 lines of data:

set hive.cli.print.header=true;
select * from my_hive_r_demo limit 10;

Next, we’ll write our Map and Reduce functions in R.

map.R

Remember that MapReduce works on line-partitioned data, so your function must process its input one line at a time: Hive streams each row of the table to the script’s standard input as a single line, with the columns separated by tabs. This is different from reading data into R for analysis with read.table or read.delim. Next, you parse the fields within each line. In this case there are 3 fields: the user ID, the rating and the site. A simple if/else statement maps field 2 (the rating) to half its value if field 3 is A and leaves it alone if field 3 is B, so that all ratings fall in the range 0 to 50. After applying that logic, we write the mapped fields back out, delimited by tabs, with a line break at the end of each line.

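#! /usr/bin/env Rscript
# map.R: Hive streams each row to stdin as one tab-delimited line:
# users <TAB> rating <TAB> grp
con <- file('stdin', open = 'r')
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(line, '\t'))
  # Site A ratings run 0-100, so halve them to match site B's 0-50 scale
  if (identical(fields[[3]], 'A')) {
    rating <- as.numeric(fields[[2]]) / 2
  } else {
    rating <- as.numeric(fields[[2]])
  }
  # sep = '' stops cat() from padding every tab with extra spaces
  cat(fields[[1]], '\t', rating, '\t', fields[[3]], '\n', sep = '')
}
close(con)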
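Because the mapper only reads standard input and writes standard output, its logic is easy to check outside Hadoop. As a cross-check, here is a Python sketch of an equivalent streaming mapper (an illustration in another language, not the author’s script); the helper name map_line is hypothetical.

```python
import sys


def map_line(line):
    """Map one 'users<TAB>rating<TAB>grp' line to its rescaled form."""
    user, rating, site = line.rstrip("\n").split("\t")
    value = float(rating)
    if site == "A":
        value /= 2  # site A is on a 0-100 scale; bring it onto site B's 0-50
    return f"{user}\t{value}\t{site}"


def main():
    # One tab-delimited row in per line, one tab-delimited row out per line
    for line in sys.stdin:
        if line.strip():
            print(map_line(line))


if __name__ == "__main__":
    main()
```

Note that Python writes 40.0 where R’s cat() writes 40; both parse to the same DOUBLE downstream.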

reduce.R

So the map stage was pretty straightforward. We just streamed through the lines in the Hive table, passing the user ID and the site through unchanged (an identity mapping) and applying a simple if/else to halve some of the ratings. For the reduce step, we want to take each user and calculate her mean rating. To do so, we need a key. Here the key is the user ID, since for each user ID we want to collect all of the mapped ratings and then apply the mean function to them. The reducer looks at each line and checks whether its key is the same as the key it is currently reducing. If it is, the rating is added to a vector that holds all the ratings for that user; if it isn’t, the reducer writes out the mean for the previous user and starts a new vector. This only works if all of a user’s lines arrive together, so before passing the data to the reducer we cluster it by key, as you’ll see in the HQL script: CLUSTER BY sends all rows with the same key to the same reducer and sorts them by that key.

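#! /usr/bin/env Rscript
# reduce.R: rows arrive grouped and sorted by key (CLUSTER BY users):
# users <TAB> rating
con <- file('stdin', open = 'r')
ratings <- numeric(0)  # ratings collected for the current user
lastKey <- ""
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(line, '\t'))
  key <- fields[[1]]
  user.rating <- as.numeric(fields[[2]])

  if (!identical(lastKey, "") && !identical(lastKey, key)) {
    # The key changed: emit the finished user's mean and start a new group
    cat(lastKey, '\t', mean(ratings), '\n', sep = '')
    ratings <- user.rating
  } else {
    ratings <- c(ratings, user.rating)
  }
  lastKey <- key
}
# Emit the last group (skipped if this reducer received no input)
if (!identical(lastKey, "")) {
  cat(lastKey, '\t', mean(ratings), '\n', sep = '')
}
close(con)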
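Here is the same reducer logic as a Python sketch (again an illustration, not the author’s code). One design difference, which is my choice rather than something the post requires: instead of growing a vector of ratings per user, it keeps a running sum and count, so memory stays constant however many ratings one user has. Like reduce.R, it depends on CLUSTER BY having placed each user’s rows next to each other.

```python
import sys


def reduce_sorted(lines):
    """Yield 'users<TAB>mean' for input lines already grouped by user.

    Keeps only a running sum and count for the current user, so memory use
    does not grow with the number of ratings per user.
    """
    last_key, total, count = None, 0.0, 0
    for line in lines:
        if not line.strip():
            continue
        key, rating = line.rstrip("\n").split("\t")[:2]
        if last_key is not None and key != last_key:
            yield f"{last_key}\t{total / count}"  # key changed: emit finished group
            total, count = 0.0, 0
        last_key = key
        total += float(rating)
        count += 1
    if last_key is not None:  # emit the final group, if any input arrived
        yield f"{last_key}\t{total / count}"


if __name__ == "__main__":
    for out_line in reduce_sorted(sys.stdin):
        print(out_line)
```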

Putting It All Together

OK, to summarize what we’ve done so far:

  • Loaded our data into Hadoop and created a Hive table with it.
  • Wrote a mapper in R to divide site A’s ratings by 2.
  • Wrote a reducer in R to collect each user’s ratings and calculate their mean.

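Now we want to run everything together. First we create a new table with a schema that fits the reduced data, so we can INSERT OVERWRITE the results into it. Then we add the two R scripts so Hive ships them to the nodes that run the tasks (make sure all your files are in the same directory, or just git clone mine). Because each task runs the scripts locally, R must be installed on every node in the cluster, and the scripts should be executable (chmod +x map.R reduce.R). Last, we run this script, which looks very similar to SQL: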

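-- Output table: one row per user holding that user's mean rating
DROP TABLE IF EXISTS tim_hive_r_demo_new;
CREATE TABLE tim_hive_r_demo_new (
  users INT,
  rating DOUBLE)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

-- Ship the R scripts to the nodes that run the map and reduce tasks
ADD FILE map.R;
ADD FILE reduce.R;

-- Map with map.R, group and sort rows by user (CLUSTER BY), reduce with reduce.R
FROM (
  MAP users, rating, grp
  USING 'map.R'
  AS users, rating, grp
  FROM my_hive_r_demo
  CLUSTER BY users
) a
INSERT OVERWRITE TABLE tim_hive_r_demo_new
REDUCE a.users, a.rating
USING 'reduce.R'
AS users, rating;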

Running this script with “hive -f” starts the MapReduce job and writes the new data to the specified table. You can then run k-means clustering, or whatever you like, on the crunched dataset. Hadoop scales out nicely, so the same job works on datasets far too large for one machine’s memory, where you might hit a wall in R with the aggregate() function or something from plyr.
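Before submitting a job with “hive -f”, it can help to dry-run the logic locally on a sample of the data. The Python sketch below simulates the three stages in one process: map each line, sort the mapped rows by user (standing in for CLUSTER BY), then reduce each run of equal keys to a mean. It is a simplification: on a real cluster, CLUSTER BY also spreads the keys across several reducers, each of which sees its own sorted subset. The function names map_row and run_local are hypothetical.

```python
from itertools import groupby


def map_row(line):
    """Mapper stage: rescale site A ratings onto site B's 0-50 range."""
    user, rating, site = line.split("\t")
    value = float(rating) / 2 if site == "A" else float(rating)
    return user, value


def run_local(lines):
    """Simulate MAP ... CLUSTER BY users ... REDUCE on a single machine."""
    mapped = [map_row(l.rstrip("\n")) for l in lines if l.strip()]
    # Sorting by key plays the role of CLUSTER BY: equal keys become adjacent
    mapped.sort(key=lambda pair: pair[0])
    result = {}
    for user, group in groupby(mapped, key=lambda pair: pair[0]):
        values = [v for _, v in group]
        result[user] = sum(values) / len(values)  # reducer stage: per-user mean
    return result


if __name__ == "__main__":
    sample = ["2\t20\tB", "1\t80\tA", "2\t60\tA", "1\t30\tB"]
    print(run_local(sample))  # {'1': 35.0, '2': 25.0}
```

Keys stay strings here, as they do in a streaming job; Hive only casts them when it writes into the INT column of the output table.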

Hopefully all these steps are pretty straightforward and take away some of the mystique surrounding MapReduce, Hive and Hadoop. For me, an easy example in languages I’m familiar with is extremely helpful, and it lets me iterate easily as I adapt it to harder problems. I encourage you to adapt this code for your own jobs, share them in the comments section, and of course let me know if I screwed anything up. For the record, we could easily have written this whole job in HQL with one subquery:

hive -e "SELECT users, AVG(mapped_rating) FROM (SELECT users, CASE WHEN grp = 'A' THEN rating / 2 ELSE rating END AS mapped_rating FROM my_hive_r_demo) a GROUP BY users;"

That query gives the same result without bothering with any of this fancy streaming, but the streaming syntax is what I wanted to stress here. With this framework in place, less trivial computations shouldn’t be much more difficult.

This Entry Was Written by timmy
