The two tools in most analysts' and data scientists' toolkits today are SQL and R: SQL to pull data from a database, and R to reshape, visualize, and ultimately analyze it. For those of us working in the Hadoop world, which is at this point synonymous with Big Data, one option is to write large, complicated MapReduce jobs in Java to get our data out. Alternatively, we can use one of the high-level languages written to help analysts and data scientists get data easily out of Hadoop and go on with their lives: Pig Latin for Pig, and HQL for Hive. HQL looks almost identical to MySQL; Hive takes an HQL query and translates it into a MapReduce job, making our lives easier.

One of the cool things you can do with Hadoop is Hadoop Streaming, which makes it easy to write MapReduce jobs in any language against data stored in Hadoop. For those uninitiated with MapReduce: it's a programming model popularized by Google that processes line-partitioned data in parallel using key-value pairs, and it scales out across many machines. It unlocked the ability to analyze huge datasets that, just a few years ago, we couldn't touch.

There have been many tutorials on writing MapReduce jobs with Hadoop Streaming, including one or two where the Map and Reduce functions are written in R. Today I want to show readers how they can easily (I hope) write a MapReduce job in Hive, using HQL and plugging in Mappers and Reducers written in R, since these are the two languages most data analysts and scientists use. I believe this is something many people are interested in doing, and to my knowledge there aren't any examples of how to do it anywhere online. I was motivated to write this because I went searching for such a tutorial myself and came up empty.
We'll start with a very simple example. How simple? Simple enough that if you had a small dataset you could accomplish this with two lines of code in R (shown at the end of this post). Also simple enough that if your data were too big for that, you could still crunch it with a plain SQL-like query in Hive. However, think of this as boilerplate code for your next cooler, more interesting application. Okay, let's get to the data. I created a very small dataset, only 10,000 lines. It's made-up data about 100 different users and 50 ratings they each made on two different sites, site A and site B. Site A takes ratings anywhere from 0 to 100, whereas site B only accepts ratings from 0 to 50. We know there are 3 different types of users (for each user I randomly pulled ratings from one of 3 different Gaussians), and ultimately we want to create clusters using k-means or something similar to classify each one. To do so, we want to find each user's mean rating, after scaling the ratings so that the two sites are comparable. It's a very easy problem. The first step is to get the data. You can take it from my public folder on Dropbox (it's part of the setup.txt file you can find on GitHub).
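If you'd rather simulate the data yourself than download it, here's a minimal sketch of how such a dataset might be generated. To be clear, the group means, the standard deviation, and the 50-ratings-per-user-per-site split below are my assumptions for illustration, not the exact recipe behind the downloadable file:

# Hypothetical recreation of the demo data; means, sd, and per-site split are assumptions.
set.seed(42)
n_users   <- 100
n_ratings <- 50                                   # ratings per user per site
grp_means <- c(15, 25, 35)                        # assumed cluster means on site B's 0-50 scale
user_grp  <- sample(1:3, n_users, replace = TRUE) # each user belongs to one of 3 Gaussians

simulate_site <- function(site) {
  scale <- if (site == "A") 2 else 1              # site A runs 0-100, site B runs 0-50
  data.frame(
    users  = rep(1:n_users, each = n_ratings),
    rating = unlist(lapply(user_grp, function(g)
      pmin(pmax(rnorm(n_ratings, mean = grp_means[g] * scale, sd = 5 * scale), 0), 50 * scale))),
    grp    = site
  )
}
hive.df <- rbind(simulate_site("A"), simulate_site("B"))  # 10,000 rows total

The data frame hive.df here matches the columns we'll give the Hive table below: a user id, a rating, and the site it came from.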
Put this in a directory where you interact with Hadoop (yes, having access to a Hadoop cluster with Hive installed is a prerequisite here). From R, write it out as tab-delimited text:
# write the data out as tab-delimited text, no quotes, no header row
write.table(hive.df, file = '/directory/demo.txt', quote = F, sep = '\t', row.names = F, col.names = F)
Now send this data to your home directory in Hadoop.
hadoop dfs -mkdir /user/tabraham/demo
hadoop dfs -put demo.txt /user/tabraham/demo/
If you've gotten this far, you should be in good shape the rest of the way! Next, you want this data to be accessible via Hive, so create a Hive table.
hive #(to open the hive command line interface)
DROP TABLE IF EXISTS my_hive_r_demo;
CREATE EXTERNAL TABLE my_hive_r_demo(users int, rating double, grp string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION '/user/tabraham/demo';
Finally, check to see if it all worked. You should see the column headings and 10 lines of data:
set hive.cli.print.header=true;
select * from my_hive_r_demo limit 10;
Next, we'll write our Map and Reduce functions in R.

map.R

Remember, MapReduce works on line-partitioned data, so your function must read lines; it's not like reading data into R for analysis using read.table or read.delim. Next, you want to parse the fields within each line. In this case there are 3 fields: the user id, the rating, and the site. We write a simple if/else statement that maps field 2 (the rating) to half its value if field 3 is A, and leaves it alone if field 3 is B. This way all ratings land in the range 0 to 50. After applying that logic, we write the mapped fields back out, delimited by tabs, ending each line with a line break.
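A minimal sketch of map.R along those lines, reading tab-delimited lines from stdin and writing tab-delimited key-value pairs to stdout:

#!/usr/bin/env Rscript
# map.R: read tab-delimited lines (user, rating, site) from stdin,
# halve site A's ratings so everything is on site B's 0-50 scale,
# and emit tab-delimited (user, rating) pairs on stdout.
con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(line, "\t"))
  rating <- as.numeric(fields[2])
  if (fields[3] == "A") rating <- rating / 2
  cat(fields[1], rating, sep = "\t")
  cat("\n")
}
close(con)

One common gotcha: the script needs its shebang line and execute permissions (chmod +x map.R), or you can invoke it through Rscript explicitly in the Hive query instead.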
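reduce.R

By the time data reaches the reducer, Hadoop has sorted it by key, so all of a given user's mapped ratings arrive on consecutive lines. The reducer just needs to collect each user's ratings and, when the key changes, emit the user along with the mean of what it collected. A minimal sketch along those lines:

#!/usr/bin/env Rscript
# reduce.R: input arrives sorted by user, so each user's ratings are contiguous.
# Accumulate ratings until the key changes, then emit (user, mean).
con <- file("stdin", open = "r")
current <- NULL
ratings <- c()
flush <- function(user, ratings) {
  cat(user, mean(ratings), sep = "\t")
  cat("\n")
}
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  fields <- unlist(strsplit(line, "\t"))
  if (!is.null(current) && fields[1] != current) {  # key changed: flush previous user
    flush(current, ratings)
    ratings <- c()
  }
  current <- fields[1]
  ratings <- c(ratings, as.numeric(fields[2]))
}
if (!is.null(current)) flush(current, ratings)      # don't forget the final user
close(con)

To recap, at this point we have: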
- Loaded our data into Hadoop and created a Hive table with it.
- Wrote a mapper in R to divide some of the ratings by 2 for site A.
- Wrote a reducer in R to collect each unique user's ratings and then calculate their mean.
Now we want to run it all together. We first need to create a new table with a schema that our reduced data will fit when we INSERT OVERWRITE into it. Then we add the two R scripts as resources (make sure all your files are in the same directory, or just git clone mine). Last, we'll run a script that looks very similar to SQL, sketched below.
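Here's a sketch of what that Hive script looks like. The results table name is one I've chosen for illustration; MAP and REDUCE are Hive's syntax for streaming rows through external scripts, and CLUSTER BY users is what guarantees each user's rows arrive at reduce.R grouped and sorted:

DROP TABLE IF EXISTS my_hive_r_demo_results;
CREATE TABLE my_hive_r_demo_results (users INT, mean_rating DOUBLE)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

ADD FILE map.R;
ADD FILE reduce.R;

FROM (
  FROM my_hive_r_demo
  MAP users, rating, grp
  USING 'map.R'
  AS users, rating
  CLUSTER BY users
) mapped
INSERT OVERWRITE TABLE my_hive_r_demo_results
REDUCE mapped.users, mapped.rating
USING 'reduce.R'
AS users, mean_rating;

Once it finishes, a quick select * from my_hive_r_demo_results limit 10; should show one mean rating per user.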
Hopefully all these steps are pretty straightforward and take away some of the mystique you might have regarding MapReduce, Hive, and Hadoop. For me, an easy example using languages I'm familiar with is extremely helpful, and it lets me iterate with ease and adapt it to harder problems. I encourage readers following this tutorial to edit this code and write their own jobs, share them in the comments section, and of course let me know if I screwed anything up. For the record, we easily could have written this whole job in HQL using one subquery:
hive -e "select users, avg(mapped_rating) from (select users, (case when grp = 'A' then rating/2 else rating end) as mapped_rating from my_hive_r_demo) a group by users;"
That would give the same result without bothering with any of this fancy streaming, but the streaming syntax is what I wanted to stress here. Less trivial computations shouldn't be much more difficult with this framework in place.
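And for completeness, here's the two-lines-of-R version promised at the top, for when the data fits in memory (assuming it has been read into a data frame with the same users/rating/grp columns as the Hive table):

# small-data equivalent: rescale site A's ratings, then average per user
hive.df$rating <- ifelse(hive.df$grp == "A", hive.df$rating / 2, hive.df$rating)
aggregate(rating ~ users, data = hive.df, FUN = mean)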