Introduction

Big Data

Big data is a term used to refer to data that is so large and complex that it cannot be stored or processed using traditional database software.

Big Data Analytics

Big data analytics is the process of analyzing big data to discover useful information.

Problems with Big Data

  • Storing exponentially growing large volumes of data (E.g., Social media data)
  • Processing data having complex structure (unstructured data)
  • Processing data faster

Apache Hadoop

Apache Hadoop is an open-source framework that allows us to store and process big data in a parallel and distributed fashion on computer clusters.

Hadoop has three main components.

  • Hadoop Distributed File System (HDFS – storage)

    It allows us to store any kind of data across the cluster.

  • Map / Reduce (Processing)

    It allows parallel processing of the data stored in HDFS.

  • YARN

    It is the resource management and job scheduling technology for distributed processing.


Running Map / Reduce Programs in Java

Hadoop was originally written to support Map / Reduce programming in Java, but it also allows Map / Reduce programs to be written in other languages using a utility called Hadoop Streaming. Let’s test some Hadoop Map / Reduce programs on the Data Science Unit (DSU) cluster.

Word Count Program (Java)

It reads a text file and counts how often words occur.
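
As a rough illustration of the three phases a Map / Reduce word count goes through (map, shuffle, reduce), here is a minimal single-process sketch in Python. It does not use Hadoop at all, and the function names are made up for this example.

```python
from collections import defaultdict


def map_phase(line):
    """Emit a (word, 1) pair for every whitespace-separated token."""
    return [(word, 1) for word in line.split()]


def shuffle_phase(pairs):
    """Group the emitted values by key, as the framework does between map and reduce."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_phase(key, values):
    """Sum the counts collected for one word."""
    return key, sum(values)


def word_count(lines):
    """Run the three phases in sequence on an in-memory list of lines."""
    pairs = [pair for line in lines for pair in map_phase(line)]
    return dict(reduce_phase(key, values) for key, values in shuffle_phase(pairs).items())


if __name__ == "__main__":
    print(word_count(["big data is big", "data is everywhere"]))
```

On a cluster, the map and reduce functions run on many machines at once and the shuffle moves data between them; the sketch only shows the order of the steps.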

Let’s run a wordcount.java program using the DSU-Hadoop Cluster


Steps to run the program are given below:

  • Get a text file (sample_text.txt) and save it on the Desktop

  • Put the file in the HDFS.

    dsu@master:~$ hadoop fs -put /home/dsu/Desktop/sample_text.txt /user/WordCount

    Here, /user/WordCount is the directory in HDFS where your file will be stored.

  • Create a jar file for wordcount.java and save it on the Desktop

    There are several ways to create a jar file (E.g. Using Eclipse). You may find details online for creating a jar file.

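  • Run the program

    dsu@master:~$ hadoop jar Desktop/wordcount.jar /user/WordCount/sample_text.txt /user/WordCount/wordcount_java

    The first argument is the input file and the second is the output directory (wordcount_java is just an example name). The output will be stored inside the “/user/WordCount/wordcount_java” directory. The program deletes the output directory before it runs, so the output directory must not contain the input file.

  • View the output

    dsu@master:~$ hadoop fs -cat /user/WordCount/wordcount_java/part-r-00000

    By default, Hadoop names the output file part-r-00000.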


The word count program in Java is given below.


package wordcount;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class WordCount {
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
   	 
   	 public void map(LongWritable key, Text value, Context context) throws
   	 IOException, InterruptedException
   	 {
   		 String line = value.toString();
   		 StringTokenizer tokenizer = new StringTokenizer(line);
   		 while (tokenizer.hasMoreTokens())
   		 {
   			 value.set(tokenizer.nextToken());
   			 context.write(value, new IntWritable(1));
   		 }
   	 }
    }
    
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
   	 
   	 public void reduce (Text key, Iterable<IntWritable> values, Context context)
   	 throws IOException, InterruptedException {
   		 int sum=0;
   		 for (IntWritable x: values)
   		 {
   			 sum += x.get();
   		 }
   		 context.write(key, new IntWritable(sum));
   	 }
    }
    
    public static void main (String [] args) throws Exception {
   	 
   	 //Reads the default configuration of cluster from the configuration xml files
   	 Configuration conf = new Configuration();
   	 
   	 //Initializing the job with the default configuration of the cluster
   	 Job job = Job.getInstance(conf, "WordCount");
   	 
   	 //Assigning the driver class name
   	 job.setJarByClass(WordCount.class);
   	 
   	 
   	 //Defining the mapper class name
   	 job.setMapperClass(Map.class);
   	 
   	 //Defining the reducer class name
   	 job.setReducerClass(Reduce.class);
   	 
   	 //Key type coming out of mapper
   	 job.setMapOutputKeyClass(Text.class);
   	 
   	 //value type coming out of mapper
   	 job.setMapOutputValueClass(IntWritable.class);
   	 
   	 //Defining input Format class which is responsible to parse the dataset into a key value pair
   	 job.setInputFormatClass(TextInputFormat.class);
   	 
   	 //Defining output Format class which is responsible for writing the key value pairs to the output files
   	 job.setOutputFormatClass(TextOutputFormat.class);
   	 
   	 //Setting the second argument as a path in a path variable
   	 Path OutputPath = new Path(args[1]);
   	 
   	 //Configuring the input path from the file system into the job
   	 FileInputFormat.addInputPath(job,  new Path(args[0]));
   	 
   	 //Configuring the output path from the file system into the job
   	 FileOutputFormat.setOutputPath(job,  new Path(args[1]));
   	 
   	 //Deleting the output path automatically from hdfs so that we don't have to delete it explicitly
   	 OutputPath.getFileSystem(conf).delete(OutputPath, true);
   	 
   	 //Running the job and exiting with status 0 if it succeeds and 1 if it fails
   	 System.exit(job.waitForCompletion(true) ? 0:1);
   	
    }
}

Hadoop Streaming

Hadoop Streaming is a utility that comes with the Hadoop distribution. It allows us to write Map / Reduce programs in any language that can read from standard input and write to standard output.


Running Map / Reduce Programs in Python using Hadoop Streaming

Let’s code the word count program in Python and run it using Hadoop Streaming.


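mapper.py

#!/usr/bin/env python
import sys

# Read lines from standard input and emit one "word<TAB>1" record per word.
for line in sys.stdin:
    line = line.strip()
    words = line.split()

    for word in words:
        # The parenthesized form of print works in both Python 2 and Python 3.
        print('%s\t%s' % (word, 1))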

reducer.py

#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# Hadoop sorts the mapper output by key, so all records for a word arrive together.
for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        # Skip records whose count is not a number.
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# Emit the count for the last word (nothing is printed for empty input).
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))

In the previous example, we uploaded the sample text file to the HDFS. Save the mapper.py and reducer.py in a folder (E.g., /home/dsu/Desktop/mapper.py, /home/dsu/Desktop/reducer.py).
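
Before submitting the job, the logic of the two scripts can be checked on a single machine. The sketch below is a hypothetical stand-in written for this tutorial, not part of Hadoop Streaming: it mimics the mapper, uses `sorted()` in place of the shuffle-and-sort step that Hadoop runs between mapper and reducer, and then mimics the reducer.

```python
from itertools import groupby


def run_mapper(lines):
    """Mimic mapper.py: emit one 'word<TAB>1' record per token."""
    for line in lines:
        for word in line.strip().split():
            yield "%s\t%s" % (word, 1)


def run_reducer(sorted_records):
    """Mimic reducer.py: sum the counts of consecutive records that share a key."""
    parsed = (record.split("\t", 1) for record in sorted_records)
    for word, group in groupby(parsed, key=lambda pair: pair[0]):
        yield "%s\t%s" % (word, sum(int(count) for _, count in group))


def simulate_streaming(lines):
    """Chain mapper, sort and reducer the way the streaming job chains them."""
    # sorted() stands in for the sort that Hadoop performs on the mapper output.
    return list(run_reducer(sorted(run_mapper(lines))))


if __name__ == "__main__":
    for record in simulate_streaming(["big data is big", "data is everywhere"]):
        print(record)
```

The reducer only compares each record with the previous one, which is why the sort step matters: without it, records for the same word would not be adjacent and the word would be reported more than once.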

Run the program as follows.

dsu@master:~$ hadoop jar /home/dsu/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar -file /home/dsu/Desktop/mapper.py -mapper mapper.py -file /home/dsu/Desktop/reducer.py -reducer reducer.py -input /user/WordCount/sample_text.txt -output /user/WordCount/wordcount_python

The output will be stored as part-00000 by default inside the /user/WordCount/wordcount_python directory in HDFS.

You can view the output as follows.

dsu@master:~$ hadoop fs -cat /user/WordCount/wordcount_python/part-00000

Hadoop Streaming is the standard way to code map-reduce programs in languages other than Java, but other libraries such as mrjob, Hadoopy and Pydoop also support map-reduce programming in Python.


Running Map / Reduce Programs in R using Hadoop Streaming

In this section, we will code the above “word count” program in R using Hadoop Streaming. You can also use an API called RHadoop to code map-reduce programs in R.


mapper.R

#! /usr/bin/env Rscript
# mapper.R - Wordcount program in R
# script for Mapper (R-Hadoop integration)
 
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))
 
con <- file("stdin", open = "r")
 
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
 
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)

reducer.R

#! /usr/bin/env Rscript
# reducer.R - Wordcount program in R
# script for Reducer (R-Hadoop integration)
 
trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
 
splitLine <- function(line) {
    val <- unlist(strsplit(line, "\t"))
    list(word = val[1], count = as.integer(val[2]))
}
 
env <- new.env(hash = TRUE)
con <- file("stdin", open = "r")
 
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    split <- splitLine(line)
    word <- split$word
    count <- split$count
 
    if (exists(word, envir = env, inherits = FALSE)) {
        oldcount <- get(word, envir = env)
        assign(word, oldcount + count, envir = env)
    }
    else assign(word, count, envir = env)
}
close(con)
 
for (w in ls(env, all = TRUE))
    cat(w, "\t", get(w, envir = env), "\n", sep = "")

Run the code as follows.

dsu@master:~$ hadoop jar /home/dsu/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.1.0.jar -file /home/dsu/Desktop/mapper.R -mapper mapper.R -file /home/dsu/Desktop/reducer.R -reducer reducer.R -input /user/WordCount/sample_text.txt -output /user/WordCount/wordcount_R

You can view the output as we explained earlier.


Hadoop Ecosystem

The Hadoop ecosystem includes other tools to address particular needs. As of now, DSU Cluster accommodates Apache Pig, Apache Hive and Apache Flume.


Apache Pig

Apache Pig provides a simpler way to write map-reduce programs using a language called Pig Latin. Non-programmers can easily learn this language and code map-reduce programs in it. A Pig Latin script is typically much shorter than the equivalent Java program; for example, about two hundred lines of Java code can often be replaced by about 10 lines of Pig Latin. Pig includes built-in operations such as join, group, filter, sort and more, so we don’t have to write code for those operations. Pig sits on top of Hadoop and uses the Pig Engine to convert Pig Latin scripts into map-reduce jobs before execution.

Let’s code the “word count” program in Pig (word_count.pig).

lines = LOAD '/input_file.txt' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) as word;
grouped = GROUP words BY word;
wordcount = FOREACH grouped GENERATE group, COUNT(words);
DUMP wordcount;

Command to execute it.

dsu@master:~$pig word_count.pig

The output will be displayed on the terminal.


Apache Hive

Apache Hive is a data warehousing package built on top of Hadoop for data analysis. Hive is targeted towards users comfortable with Structured Query Language (SQL). Hive uses a language similar to SQL called HiveQL. Hive abstracts the complexity of Hadoop so users do not need to write map-reduce programs. Hive queries are automatically converted into map-reduce jobs.

SQL is designed for tables that reside on a single machine. In HDFS, table data is distributed across multiple machines. HiveQL is used to analyze data in distributed storage.

Hive is read-based and therefore not suitable for transaction processing, which typically involves a higher percentage of write operations (e.g., ATM transactions). It is also not suitable for applications that need very fast response times, as Hadoop is intended for long sequential scans.

The following Demos use Pig and Hive.


Demo 1: Apache Hive

Load data from Local File System to HDFS and HDFS to Hive Table.


Step 1: Creating a table in Hive

Step 2: Copying the data from local file system to HDFS

Step 3: Loading the data from HDFS to Hive table


These steps are demonstrated in detail.


Step 1: Creating a table in Hive

(Inside Hive Shell)

hive> create database hive_demo;


List down all the available databases.

hive> show databases;


Use the newly created database

hive> use hive_demo;

Create a table inside the newly created database (hive_demo).


hive> create table user_details (
    > user_id int,
    > name string,
    > age int,
    > country string,
    > gender string
    > ) row format delimited fields terminated by ','
    > stored as textfile;

Check whether the table is created using the following commands.

hive> show tables;

hive> desc user_details;
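
The table definition does not change the stored file; it describes how each text line is split and typed when the table is read. The sketch below is a rough Python analogy of that idea, not Hive's implementation. The names `USER_DETAILS_SCHEMA` and `parse_row` are hypothetical, and the sample line is an assumed example of a comma-delimited record. Using `None` for a field that cannot be converted is meant to mirror the NULL that Hive typically returns in that situation.

```python
# Column names and Python types standing in for the Hive column types.
USER_DETAILS_SCHEMA = [
    ("user_id", int),
    ("name", str),
    ("age", int),
    ("country", str),
    ("gender", str),
]


def parse_row(line, schema=USER_DETAILS_SCHEMA, delimiter=","):
    """Split one text line on the delimiter and cast each field to its column type."""
    fields = line.rstrip("\n").split(delimiter)
    row = {}
    for index, (column, cast) in enumerate(schema):
        try:
            row[column] = cast(fields[index])
        except (IndexError, ValueError):
            # A missing or unparseable field becomes None (the analogue of NULL).
            row[column] = None
    return row


if __name__ == "__main__":
    print(parse_row("1,Dilan,20,SL,M"))
    print(parse_row("2,Nisha,not-a-number,SL,F"))
```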


Step 2: Copying the data from local file system to HDFS

hadoop fs -put /home/dsu/Desktop/dsu_review/userdata /


Step 3: Loading the data from HDFS to Hive table

hive> load data inpath '/userdata' into table user_details;


Now we can analyse the data using Hive Query Language.

hive> select * from user_details;

hive> select name from user_details where age >= 18;


Demo 2: Clickstream Analysis using Apache Hive and Apache Pig

Our task is to list the top 3 websites viewed by people under 16.


User Details (table name = user_details)
1,Dilan,20,SL,M
2,Nisha,15,SL,F
3,Vani,14,SL,F
4,Steno,12,SL,F
5,Ajith,17,SL,M

Website Visited (table name = clickstream)
1,www.bbc.com
1,www.facebook.com
1,www.gmail.com
2,www.cnn.com
2,www.facebook.com
2,www.gmail.com
2,www.stackoverflow.com
2,www.bbc.com
3,www.facebook.com
3,www.stackoverflow.com
3,www.gmail.com
3,www.cnn.com
4,www.facebook.com
4,www.abc.com
4,www.stackoverflow.com
5,www.gmail.com
5,www.stackoverflow.com
5,www.facebook.com

Here, there are two tables (user_details and clickstream).

The clickstream table lists the websites viewed by each user.

The user_details table was created in the previous demo.

We have to create the clickstream table.

Please follow the steps below.


hive> create table clickstream (userid int, url string)
    > row format delimited fields terminated by ','
    > stored as textfile;

hive> show tables;

hive> desc clickstream;

$hadoop fs -put /home/dsu/Desktop/dsu_review/clickstream /

hive> load data inpath '/clickstream' into table clickstream;

hive> select * from clickstream;

Task 1: Analyze the data using Hive

hive> select c.url, count(c.url) as clicks from user_details u join clickstream c on (u.user_id = c.userid) where u.age < 16 group by c.url order by clicks desc limit 3;

Answer:

www.stackoverflow.com	3
www.facebook.com	3
www.gmail.com	2

Task 2: Analyze the data using Apache Pig

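Load the user_details and clickstream files to HDFS again. (The Hive “load data inpath” command moves the files out of their original HDFS location into the Hive warehouse.)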

$hadoop fs -put /home/dsu/Desktop/dsu_review/userdata /

$hadoop fs -put /home/dsu/Desktop/dsu_review/clickstream /


Program (pig_demo.pig)

users = load '/userdata' using PigStorage(',') as (user_id, name, age:int, country, gender);
filtered = filter users by age < 16;
pages = load '/clickstream' using PigStorage(',') as (userid, url);
joined = join filtered by user_id, pages by userid;
grouped = group joined by url;
summed = foreach grouped generate group , COUNT(joined) as clicks;
sorted = order summed by clicks desc;
top3 = limit sorted 3;
dump top3;

Command to execute it.

$cd /home/dsu/Desktop/dsu_review

$pig pig_demo.pig


Answer:

(www.facebook.com,3)
(www.stackoverflow.com,3)
(www.gmail.com,2)
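
The result can be cross-checked with a few lines of plain Python that follow the same steps as the Hive query and the Pig script: filter the users, join on the user id, group by URL, count, sort and limit. This is only a sketch over the sample data of this demo; the function names are made up. Note that www.gmail.com and www.cnn.com both have 2 clicks, so which of them fills third place depends on how ties are broken, and sorting by count alone does not fix an order among equal counts. The sketch breaks ties alphabetically by URL.

```python
from collections import Counter

USERS = [
    (1, "Dilan", 20, "SL", "M"),
    (2, "Nisha", 15, "SL", "F"),
    (3, "Vani", 14, "SL", "F"),
    (4, "Steno", 12, "SL", "F"),
    (5, "Ajith", 17, "SL", "M"),
]

CLICKS = [
    (1, "www.bbc.com"), (1, "www.facebook.com"), (1, "www.gmail.com"),
    (2, "www.cnn.com"), (2, "www.facebook.com"), (2, "www.gmail.com"),
    (2, "www.stackoverflow.com"), (2, "www.bbc.com"),
    (3, "www.facebook.com"), (3, "www.stackoverflow.com"),
    (3, "www.gmail.com"), (3, "www.cnn.com"),
    (4, "www.facebook.com"), (4, "www.abc.com"), (4, "www.stackoverflow.com"),
    (5, "www.gmail.com"), (5, "www.stackoverflow.com"), (5, "www.facebook.com"),
]


def clicks_by_under_16(users, clicks):
    """Filter users younger than 16, join on user id and count clicks per URL."""
    under_16 = {user_id for user_id, _name, age, _country, _gender in users if age < 16}
    return Counter(url for user_id, url in clicks if user_id in under_16)


def top_sites(counts, n=3):
    """Sort by descending count, then by URL so that ties are broken deterministically."""
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))[:n]


if __name__ == "__main__":
    for url, clicks in top_sites(clicks_by_under_16(USERS, CLICKS)):
        print(url, clicks)
```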

Apache Flume

Apache Flume is an ecosystem tool used for streaming log data from applications into HDFS (e.g., downloading tweets and storing them in HDFS).


Demo 3: Downloading Tweets from Twitter using Apache Flume

Steps in brief:

Step 1: Create a Twitter application from apps.twitter.com.


Step 2: Get the following credentials.

  • Consumer key
  • Consumer secret
  • Access token
  • Access token secret


Step 3: Prepare a configuration file to download tweets

Provide all the necessary details and the type of tweets that you want to download.


Step 4: Run the configuration file

flume/bin$ ./flume-ng agent -n TwitterAgent -c conf -f ../conf/twitter.conf -Dtwitter4j.http.proxyHost=cachex.pdn.ac.lk -Dtwitter4j.http.proxyPort=3128

It will download tweets and store them in HDFS.


Apache HBase


SQL vs NoSQL Databases

SQL - Structured Query Language

NoSQL - Not Only SQL


Relational Database Management Systems (RDBMS) use SQL. Simple SQL queries (e.g., select * from [table name]) are used to retrieve data. Traditional RDBMS typically do not incorporate distributed storage (storing data across multiple computers).

NoSQL databases address distributed storage. They typically do not have a native SQL interface.


What is HBase?

HBase is a NoSQL database built on top of the Hadoop Distributed File System (HDFS). Data is stored as key-value pairs. Both keys and values are stored as byte arrays.


Difference between HBase and RDBMS

 

HBase | RDBMS
Column-oriented | Row-oriented (mostly)
Flexible schema, columns can be added dynamically | Fixed schema
Good with sparse tables | Not optimized for sparse tables (too many null values)
Joins using MapReduce, not optimized | Optimized for joins
Leverages batch processing with MapReduce distributed processing | No
Good for semi-structured data as well as structured data | Good for structured data
Scales linearly and automatically with new nodes | Usually scales vertically by adding more hardware resources

When to use HBase?
  • Processing huge data (in Terabytes / Petabytes usually)
  • Column-oriented data
  • Unstructured data
  • High throughput (millions of inputs/transactions per second)
  • Variable columns
  • Versioned data – stores multiple versions of data using timestamps
  • Need random reads and writes
  • Generating data from Mapreduce workflow

Difference between HBase and Hive

 

HBase | Hive
Well suited for CRUD (Create, Read, Update, Delete) | Not well suited for CRUD
Maintains versions | Not supported
Less support for aggregations (e.g., finding the max/min/avg of a column) | Good support for aggregations
No table joins | Supports table joins
Lookups are very fast, so read activity is very fast | Not fast

Two ways to use HBase
  • Using Command Line called HBase shell (create table, update data, etc)
  • Common way – interact with HBase through Java programs

HBase has a Java API, which is its only first-class interface. There are other programmatic interfaces (e.g., a REST API) as well.

HBase does not have a built-in SQL interface. There are non-native SQL interfaces available for HBase (e.g., Apache Phoenix, Impala, Presto, and Hive).


HBase Demo using DSU Cluster


We are going to demonstrate how to create a table, store, update, retrieve & delete data, and drop a table using both of the above mentioned ways.

The following table will be used for the demonstration.

Employee (table name); personal and professional are column families
eid (row key) | personal:name | personal:gender | professional:experience | professional:salary
'eid01' | 'Rahul' | 'male' | 5 | 80000
'eid02' | 'Priya' | 'female' | 2 | 50000

A column family groups a set of columns together. Column families are not available in an RDBMS.
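
To make the layout more concrete, the sketch below models a table as a dictionary keyed by (row key, column family, column name), with everything stored as bytes. It is a toy stand-in written for this tutorial, not the HBase client API; `ToyTable` and `cell_key` are hypothetical names. It assumes that column families are fixed when the table is created while column names are supplied freely when a value is written.

```python
def cell_key(row_key, family, qualifier):
    """Build the (row, family, qualifier) coordinates of one cell as bytes."""
    return (row_key.encode("utf-8"), family.encode("utf-8"), qualifier.encode("utf-8"))


class ToyTable:
    """Hypothetical in-memory stand-in for an HBase table."""

    def __init__(self, *families):
        self.families = {family.encode("utf-8") for family in families}
        # Only cells that were written exist, so sparse rows take no extra space.
        self.cells = {}

    def put(self, row_key, column, value):
        """Write one cell; `column` has the form 'family:qualifier'."""
        family, qualifier = column.split(":", 1)
        key = cell_key(row_key, family, qualifier)
        if key[1] not in self.families:
            raise KeyError("unknown column family: %s" % family)
        self.cells[key] = str(value).encode("utf-8")

    def get(self, row_key, family=None):
        """Return {b'family:qualifier': value} for one row, optionally for one family."""
        row = row_key.encode("utf-8")
        wanted = family.encode("utf-8") if family else None
        return {
            fam + b":" + qual: value
            for (r, fam, qual), value in self.cells.items()
            if r == row and (wanted is None or fam == wanted)
        }


if __name__ == "__main__":
    table = ToyTable("personal", "professional")
    table.put("eid01", "personal:name", "Rahul")
    table.put("eid01", "professional:salary", 80000)
    print(table.get("eid01", "personal"))
```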


CRUD using HBase Shell

You need to connect to the DSU master node and type ‘hbase shell’ in a terminal. It will open the hbase shell.

dsu@master:~$ hbase shell

hbase(main)>


Create a table

When creating a table, we only need to provide table-name and column-families. Column names are not required.


hbase(main)> create 'table-name', 'column-family-1', 'column-family-2', ..., 'column-family-n'

hbase(main)> create 'employee', 'personal', 'professional'

To check whether the table is created, use ‘list’ command. ‘List’ command will display all the tables that are present in HBase.

hbase(main)>list


Insert Values

‘Put’ command is used to insert data.

Column names are specified when values are inserted.

hbase(main)> put 'table-name', 'row-id', 'column-family:column-name', 'value'


hbase(main)> put 'employee', 'eid01', 'personal:name', 'Rahul'

hbase(main)> put 'employee', 'eid01', 'personal:gender', 'male'

hbase(main)> put 'employee', 'eid01', 'professional:experience', '5'

hbase(main)> put 'employee', 'eid01', 'professional:salary', '80000'

hbase(main)> put 'employee', 'eid02', 'personal:name', 'Priya'

hbase(main)> put 'employee', 'eid02', 'personal:gender', 'female'

hbase(main)> put 'employee', 'eid02', 'professional:experience', '2'

hbase(main)> put 'employee', 'eid02', 'professional:salary', '50000'


Scan all records

hbase(main)> scan 'table-name'

hbase(main)> scan 'employee'


Get Values

Get all the records of ‘eid01’

hbase(main)> get 'employee', 'eid01'


Get the personal details of ‘eid01’

hbase(main)> get 'employee', 'eid01', 'personal'


Get the name of ‘eid01’

hbase(main)> get 'employee', 'eid01', 'personal:name'


Update a value

The ‘put’ command performs an upsert (update or insert).

If the row_id exists, its content will be updated.

If the row_id does not exist, it will be inserted.


Let’s change ‘Rahul’ to ‘Rajiv’ and run the command. It will be updated.

hbase(main)> put 'employee', 'eid01', 'personal:name', 'Rajiv'
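
The upsert behaviour, together with the timestamped versions that HBase can keep for a value, can be sketched as follows. This is a hypothetical model of a single cell written for illustration, not HBase code. The counter stands in for the server timestamp, and `max_versions=3` is an arbitrary choice; the real number of retained versions depends on the column family settings.

```python
import itertools


class VersionedCell:
    """Hypothetical model of one cell that keeps timestamped versions."""

    _clock = itertools.count(1)  # stand-in for the server timestamp

    def __init__(self, max_versions=3):
        self.max_versions = max_versions
        self.versions = []  # newest first: [(timestamp, value), ...]

    def put(self, value):
        """Upsert: the first put creates the value, later puts add a newer version."""
        self.versions.insert(0, (next(self._clock), value))
        # Drop versions beyond the configured limit, oldest first.
        del self.versions[self.max_versions:]

    def get(self):
        """Return the newest value, or None if nothing was ever written."""
        return self.versions[0][1] if self.versions else None


if __name__ == "__main__":
    name = VersionedCell()
    name.put("Rahul")   # insert
    name.put("Rajiv")   # update: same call, newer version
    print(name.get())
    print([value for _timestamp, value in name.versions])
```

A read returns the newest version, which is why a second put looks like an update even though the older value may still be stored.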


Delete value

Delete a particular cell (one column of one row).

hbase(main)> delete 'employee', 'eid01', 'personal:gender'


Drop a Table

Disable the table first and then drop it.

hbase(main)> disable 'employee'

hbase(main)> drop 'employee'


Now, we will do the above operations using Java API.


CRUD using Java API


The demonstration is illustrated using Eclipse – an integrated development environment for Java.

Eclipse is installed in DSU/HBase master node.

Please follow these steps carefully.


Step 1: Create a new Java project in Eclipse, name it as you wish and click ‘Finish’.

Step 2: Add the External Jars from /dsu/home/hbase-1.48/lib.

To do that, Right click the folder you created → ‘Build Path’ → ‘Configure Build Path’ → ‘Libraries’ → Add External jars → select all → apply and close.

These jars contain the hbase classes.

Some classes and their functionalities are listed below.

Class | Functionality
HBaseAdmin | Creates a table, checks whether a table exists, disables a specific table, drops a table
HTableDescriptor | Responsible for handling tables
HColumnDescriptor | Handles column families
HBaseConfiguration | Identifies the HBase configurations
HTable | Responsible for interacting with an HBase table for DML operations. It has several methods, such as get, put and delete

Now you are ready to write HBase programs.


Create a table

Create a java class ‘HBaseDDL.java’ inside src. You can use whatever class name you want.


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseDDL {
	
	public static void main(String[] args) throws IOException{
		// create a configuration object
		// it will store all the default configuration of hbase
		Configuration conf = HBaseConfiguration.create();
		
		HBaseAdmin admin = new HBaseAdmin(conf);
		
		HTableDescriptor des = new HTableDescriptor(Bytes.toBytes("employee"));
		
		des.addFamily(new HColumnDescriptor("personal"));
		des.addFamily(new HColumnDescriptor("professional"));
		
		if(admin.tableExists("employee")) {
			System.out.println("Table Already exists!");
			admin.disableTable("employee");
			admin.deleteTable("employee");
			System.out.println("Table: employee deleted");
		}
		
		admin.createTable(des);
		System.out.println("Table: employee successfully created");
		
		// Release the connection held by the admin object
		admin.close();
	}

}

Insert Values

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutDemo {
	public static void main(String[] args)
	{
		Configuration conf = HBaseConfiguration.create();
		
		try {
			HTable table = new HTable(conf, "employee");
			
			// Row eid01: one put can carry columns from several column families
			Put put = new Put(Bytes.toBytes("eid01"));
			put.add(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("Rahul"));
			put.add(Bytes.toBytes("professional"), Bytes.toBytes("experience"), Bytes.toBytes("5"));
			table.put(put);
			System.out.println("inserted record eid01 to table employee ok.");
			
			// Row eid02
			put = new Put(Bytes.toBytes("eid02"));
			put.add(Bytes.toBytes("personal"), Bytes.toBytes("name"), Bytes.toBytes("Priya"));
			put.add(Bytes.toBytes("professional"), Bytes.toBytes("experience"), Bytes.toBytes("2"));
			table.put(put);
			System.out.println("inserted record eid02 to table employee ok.");
			
			// Release the resources held by the table
			table.close();
		}
		
		catch (IOException e)
		{
			e.printStackTrace();
		}
		
	}

}

Update a Value

In the above PutDemo class, change the name of ‘eid01’ from ‘Rahul’ to ‘Rajiv’ and rerun it. It will be updated.


Get Values

 

import java.io.IOException;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetDemo {
	public static void main(String [] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "employee");
		Get get = new Get(Bytes.toBytes("eid01"));
		Result rs = table.get(get);
		
		for(KeyValue kv: rs.raw()) {
			System.out.print(new String(kv.getRow()) + " ");
			System.out.print(new String(kv.getFamily()) + ":");
			System.out.print(new String(kv.getQualifier()) + " ");
			System.out.print(kv.getTimestamp() + " ");
			System.out.println(new String(kv.getValue()));
		}
	}

}

Delete a Row

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteDemo {
	public static void main(String[] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "employee");
		Delete del = new Delete(Bytes.toBytes("eid01"));
		table.delete(del);
		System.out.println("Row eid01 deleted");
	}

}

Scan all records

 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

public class ScanDemo {
	public static void main(String[] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		HTable table = new HTable(conf, "employee");
		Scan sc = new Scan();
		ResultScanner rs = table.getScanner(sc);
		System.out.println("Get all records\n");
		
		for(Result r:rs) {
			for(KeyValue kv: r.raw()) {
				System.out.print(new String(kv.getRow()) + " ");
				System.out.print(new String(kv.getFamily()) + ":");
				System.out.print(new String(kv.getQualifier()) + " ");
				System.out.print(kv.getTimestamp() + " ");
				System.out.println(new String(kv.getValue()));
				
			}
		}
	}

}

Apache Sqoop

Apache Sqoop is a tool in the Hadoop ecosystem that is designed for efficiently transferring data between Hadoop and Relational Database Management Systems (RDBMS) such as MySQL, Microsoft SQL Server, Oracle DB, and others.

Sqoop addresses the difficulty of writing mapreduce programs to transfer data between Hadoop and RDBMS. When a sqoop command is executed, sqoop internally generates mapreduce code.

SQOOP = [SQ]L + HAD[OOP]

Sqoop Demo

MySQL database is used for the following demonstration. An overview of the demonstration is listed below.

  1. Creating a table in MySQL
  2. Connect to MySQL via SQOOP
  3. Import data from MySQL into HDFS
  4. Export data from HDFS to MySQL
  5. Import data from MySQL to Hive

1. Creating a table in MySQL

MySQL is installed in the master node of DSU Hadoop Cluster. Once you are connected to the cluster, you need to log in to the MySQL server.

dsu@master:~$ mysql -u root -p

It will prompt you to enter the password. The MySQL credentials will be issued when you create an account at DSU.

mysql>

Now, you are logged into MySQL.

Let’s create a database.

mysql> create database faculty;

List the databases that are present in the MySQL server.

mysql> show databases;

Switch to the newly created database.

mysql> use faculty;


Let’s create a table called ‘student’ and populate it with the values given below.

Student
student_id | name | age | gender | hometown
1001 | Dilshan | 21 | Male | Colombo
1002 | Ajith | 23 | Male | Mannar
1003 | Dulani | 20 | Female | Kandy
1004 | Samantha | 24 | Female | Kandy
1005 | Rohit | 22 | Male | Galle

mysql> create table student (
    -> student_id int not null,
    -> name varchar(50) not null,
    -> age int,
    -> gender varchar(50),
    -> hometown varchar(50));


To check whether the table is created properly:

mysql> show tables;

It will display all the tables available inside the database ‘faculty’.


mysql> describe student;

It will describe the schema of the table ‘student’.


Let’s insert the values into the table.

mysql> insert into student (student_id, name, age, gender, hometown) values (1001, 'Dilshan', 21, 'male', 'Colombo');

mysql> insert into student (student_id, name, age, gender, hometown) values (1002, 'Ajith', 23, 'male', 'Mannar');

mysql> insert into student (student_id, name, age, gender, hometown) values (1003, 'Dulani', 20, 'female', 'Kandy');

mysql> insert into student (student_id, name, age, gender, hometown) values (1004, 'Samantha', 24, 'female', 'Kandy');

mysql> insert into student (student_id, name, age, gender, hometown) values (1005, 'Rohit', 22, 'male', 'Mannar');


Let’s say you have mistakenly typed ‘Mannar’ instead of ‘Galle’ as the hometown of ‘Rohit’. We can correct it as follows.

mysql> update student set hometown = 'Galle' where student_id = 1005;


Now the table is created inside MySQL. To check whether all the values are inserted properly:

mysql> select * from student;

2. Connect to MySQL via SQOOP

We have seen how to display the available databases in the MySQL command line interface (CLI) using “show databases;”.

Let’s connect to MySQL via Sqoop and list the databases from Sqoop’s CLI.

Please refer to the following official user guide from Apache Sqoop for more details about the notations that will be used throughout the demonstration.

http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html

First, check whether MySQL server is running in the local (master) machine.

dsu@master:~$service mysql status

If it is active, you will get “active (running)” message in green color. If not please contact the DSU.

Press ‘q’ to exit the message.


List down all the available databases inside the MySQL via Sqoop

To do that we have to make a connection to the MySQL server from Sqoop using JDBC driver. The complete command is given below.

dsu@master:~$ sqoop list-databases --connect jdbc:mysql://localhost --username root -P

It will ask you to enter the password.

After the password is given, Sqoop will connect to the MySQL server through JDBC and display all the available databases.


List down all the available tables inside a particular database via Sqoop

dsu@master:~$ sqoop list-tables --connect jdbc:mysql://localhost/faculty --username root -P

It will display all the tables that are present inside the ‘faculty’ database. It will display only ‘student’ table as we have not created other tables.


Run simple SQL queries via Sqoop CLI

Display all the records of student table:

dsu@master:~$ sqoop eval --connect jdbc:mysql://localhost/faculty --username root -P --query "select * from faculty.student"

Display selected records (students aged 23 or older):

dsu@master:~$ sqoop eval --connect jdbc:mysql://localhost/faculty --username root -P --query "select * from faculty.student where faculty.student.age >= 23"


3. Import data from MySQL into HDFS

So far, we have queried the data in the MySQL server via Sqoop’s CLI. In this section, we will import data from the MySQL database into the Hadoop cluster. Please note that the MySQL server runs on a single machine as a local server and is not part of our Hadoop cluster.

Import the ‘student’ table into HDFS

dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student -m 1

The ‘student’ table will be stored under the default HDFS location, in “/user/dsu/student”. We can also specify the location where the table should be stored, as follows.

dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student --target-dir /sqooptest
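
By default, Sqoop writes the imported rows as comma-delimited text files (for example part-m-00000 when a single mapper is used). The sketch below shows one way to read such a file back into records. The sample text and the column names are assumptions for illustration; in practice the text would come from something like “hadoop fs -cat /user/dsu/student/part-m-00000”.

```python
import csv
import io

# Hypothetical contents of an imported part file (one row per line).
sample = "1,Amal,22,male,Colombo\n2,Nimali,23,female,Kandy\n"

# Assumed column order, taken from the student_new table definition.
COLUMNS = ["student_id", "name", "age", "gender", "hometown"]

def parse_records(text):
    """Turn comma-delimited lines into dictionaries with typed numeric fields."""
    records = []
    for fields in csv.reader(io.StringIO(text)):
        record = dict(zip(COLUMNS, fields))
        record["student_id"] = int(record["student_id"])
        record["age"] = int(record["age"])
        records.append(record)
    return records

students = parse_records(sample)
print(students[0])
```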


4. Export data from HDFS to MySQL

We have already stored ‘student’ table data inside HDFS (/user/dsu/student). Let’s export the table data back to MySQL and store it in a new table called ‘student_new’.

Create a ‘student_new’ table.

mysql> create table student_new (student_id int not null primary key, name varchar(50) not null, age int, gender varchar(50), hometown varchar(50));

Now we can export the data from HDFS to the newly created table.

dsu@master:~$ sqoop export --connect jdbc:mysql://localhost/faculty --username root -P --table student_new --export-dir /user/dsu/student -m 1

mysql> select * from student_new;

You will see that the new table is populated with the data exported from HDFS.


5. Import data from MySQL to Hive

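dsu@master:~$ sqoop import --connect jdbc:mysql://localhost/faculty --username root -P --table student --target-dir /hive_student --hive-import --create-hive-table --hive-table hive_student

Here, --target-dir is only the HDFS directory used during the import, while --hive-table sets the name of the Hive table (without it, the Hive table takes the name of the source table, ‘student’).

Now a Hive table named hive_student is created and populated.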

Display the content of the newly created hive table

Go to the hive interface and execute the following command.

dsu@master:~$ hive

hive> select * from hive_student;


Apache Mahout

Apache Mahout is a library of scalable machine-learning algorithms that runs on top of Hadoop using the MapReduce paradigm.

Mahout widely supports the following machine-learning tasks.

  • Collaborative filtering

    Mines user behavior patterns for recommendations (e.g., Amazon product recommendations)

  • Clustering

    Groups objects based on their characteristics (e.g., categorizing newspaper articles as politics, sports, entertainment, and so on).

  • Classification

    Classify new observations into a set of existing classes based on a training set of data containing observations whose classes are known (e.g., decide whether a new email is spam).

An example of each of the above use cases is illustrated step by step using the DSU cluster.


Use Case 1: Movie Recommendations

This example recommends movies that a user may be interested in, based on the ratings she has given to other movies.

We utilize a publicly available dataset called “MovieLens 20M” from the following website.

https://grouplens.org/datasets/movielens/

The MovieLens 20M dataset carries 20 million ratings on 27,000 movies by 138,000 users.

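This dataset contains several files. This example needs only the ratings file, referred to here as “u.data”. (Note that the file named u.data ships with the MovieLens 100K release; the 20M release stores its ratings in ratings.csv.)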

The structure of the u.data file is as follows.

[user id] [movie id] [rating] [timestamp]
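
To make the record layout concrete, here is a minimal sketch that parses lines in this four-field format and groups the ratings by user. The sample lines are made up for illustration, and whitespace-separated fields are assumed.

```python
from collections import defaultdict

# Hypothetical lines in the [user id] [movie id] [rating] [timestamp] layout.
sample_lines = [
    "1\t10\t4\t881250949",
    "1\t20\t5\t881250950",
    "2\t10\t3\t881250951",
]

def parse_rating(line):
    """Split one line into typed fields."""
    user_id, movie_id, rating, timestamp = line.split()
    return int(user_id), int(movie_id), float(rating), int(timestamp)

# user id -> {movie id: rating}
ratings_by_user = defaultdict(dict)
for line in sample_lines:
    user, movie, rating, _ = parse_rating(line)
    ratings_by_user[user][movie] = rating

print(dict(ratings_by_user))
```

The recommender only needs the first three fields; the timestamp is ignored here.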


Step 1:

Download the MovieLens 20M dataset and extract it.


Step 2:

Copy u.data into HDFS.

$hadoop fs -put /home/dsu/Desktop/ml-20m/u.data /movielens


Step 3:

Run the item-based recommender algorithm on u.data with appropriate parameters and values. You may use a different algorithm.

$mahout recommenditembased --input /movielens/u.data --output /movielens/itemrecomds --similarityClassname SIMILARITY_PEARSON_CORRELATION --numRecommendations 5

# recommenditembased – the machine learning algorithm we are using

# --input – location of input file

# --output – location where the output is to be stored

# --similarityClassname – a desired similarity measure that is available in Mahout

# --numRecommendations – number of movies recommended per user
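
The similarity measure chosen above is the Pearson correlation. As a rough illustration of what that measure computes between two movies, here is a minimal textbook sketch over the users who rated both. It is not Mahout's implementation, and the function name and the ratings are hypothetical.

```python
from math import sqrt

def pearson(ratings_a, ratings_b):
    """Pearson correlation of two items over the users who rated both.

    Each argument maps user id -> rating. Returns 0.0 when the correlation
    is undefined (fewer than two common users, or no variance).
    """
    common = sorted(set(ratings_a) & set(ratings_b))
    if len(common) < 2:
        return 0.0
    xs = [ratings_a[u] for u in common]
    ys = [ratings_b[u] for u in common]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return 0.0
    return cov / sqrt(var_x * var_y)

# Hypothetical ratings: users 1-3 rated all three movies, user 4 only movie_b.
movie_a = {1: 5, 2: 3, 3: 1}
movie_b = {1: 4, 2: 3, 3: 2, 4: 5}
movie_c = {1: 1, 2: 3, 3: 5}

print(pearson(movie_a, movie_b))  # users agree on the ordering
print(pearson(movie_a, movie_c))  # users rate the two movies in opposite order
```

Movies whose ratings rise and fall together across users get a similarity close to 1, and an item-based recommender favors movies similar to those a user already rated highly.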


The output location is a directory, so display the part files inside it:

$hadoop fs -cat /movielens/itemrecomds/part-*

The format of the output file is as follows.

[user id] [movie id:predicted rating of that user for that movie, ...]
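
If you want to post-process the recommendations, the sketch below parses one output line into a user id and a list of (movie id, predicted rating) pairs. It assumes the user id and the bracketed list are separated by a tab and that pairs are comma-separated; the sample line is made up.

```python
def parse_recommendations(line):
    """Parse 'user<TAB>[movie:score,movie:score,...]' into (user, [(movie, score)])."""
    user_part, items_part = line.strip().split("\t")
    recommendations = []
    for pair in items_part.strip("[]").split(","):
        if not pair:
            continue  # user with an empty recommendation list
        movie_id, score = pair.split(":")
        recommendations.append((int(movie_id), float(score)))
    return int(user_part), recommendations

# Hypothetical output line.
user, recs = parse_recommendations("3\t[101:4.5,205:4.2]")
print(user, recs)
```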


Use Case 2: K-Means Clustering

Here, we group similar types of news articles together. For example, all news articles related to politics are grouped together.


Step 1:

Create multiple text files where each text file contains a news article. Keep them in a single folder and upload this folder to HDFS.

Assume we have a folder called sl_news on the Desktop containing six text files. Each text file contains a news article (2 political news, 2 sports news, and 2 entertainment news).

$hadoop fs -put /home/dsu/Desktop/sl_news/* /sl_news/


Step 2:

Convert the text files to sequence files using the command “seqdirectory”. Sequence files are binary files containing key-value pairs.

$mahout seqdirectory -i /sl_news/ -o /sl_news/kmeansseqfiles -ow

# -i input directory

# -o output directory (sequence file)

# -ow If set, overwrite the output directory


Step 3:

Convert sequence files into tf-idf vectors.

$mahout seq2sparse -i /sl_news/kmeansseqfiles -o /sl_news/kmeanstfidffiles -ow

# seq2sparse – a command that converts sequence files into tf-idf vectors
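
To give an intuition for what a tf-idf vector is, here is a minimal textbook sketch: the weight of a term in a document is its count in that document multiplied by log(N / df), where N is the number of documents and df is the number of documents containing the term. This is an illustration only; Mahout's own tokenization and weighting formula differ in detail, and the three tiny documents are made up.

```python
import math
from collections import Counter

def tfidf(docs):
    """Return one {term: weight} dictionary per document."""
    tokenized = [doc.lower().split() for doc in docs]
    n_docs = len(tokenized)
    # Document frequency: in how many documents each term appears.
    df = Counter()
    for tokens in tokenized:
        df.update(set(tokens))
    vectors = []
    for tokens in tokenized:
        tf = Counter(tokens)
        vectors.append({t: c * math.log(n_docs / df[t]) for t, c in tf.items()})
    return vectors

# Hypothetical mini-corpus.
docs = [
    "the election vote vote",
    "the cricket match",
    "the cricket election",
]
vectors = tfidf(docs)
print(vectors[0])
```

A word that appears in every document (“the”) gets weight 0, while a word concentrated in one document (“vote”) gets a high weight, which is what makes these vectors useful for clustering.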


Step 4:

Perform clustering using the ‘kmeans’ command.

$mahout kmeans -i /sl_news/kmeanstfidffiles/tfidf-vectors/ -c /sl_news/kmeanscentroids -o /sl_news/kmeansclusters -k 3 -ow -x 50 -cl -dm org.apache.mahout.common.distance.CosineDistanceMeasure

# -c destination directory for centroids

# -k number of clusters

# -x maximum number of iterations

# -cl assign the input vectors to clusters after the iterations (writes the clusteredPoints directory used below)

# -dm distance measure

Here, we set k=3 as we have three types of news (politics, sports, and entertainment).
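
The idea behind k-means with a cosine distance measure can be sketched in a few lines: assign every vector to its nearest centroid, recompute each centroid as the mean of its members, and repeat. The sketch below is a simplified single-machine illustration, not Mahout's implementation: it uses dense toy vectors, takes the initial centroids as an argument instead of sampling them, and stops when the assignments no longer change.

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity; 0 means the vectors point in the same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm if norm else 1.0

def kmeans(points, initial_centroids, max_iter=50):
    centroids = [list(c) for c in initial_centroids]
    assignment = None
    for _ in range(max_iter):
        # Assignment step: index of the nearest centroid for every point.
        new_assignment = []
        for p in points:
            distances = [cosine_distance(p, c) for c in centroids]
            new_assignment.append(distances.index(min(distances)))
        if new_assignment == assignment:
            break  # converged: no point changed cluster
        assignment = new_assignment
        # Update step: each centroid becomes the mean of its members.
        for idx in range(len(centroids)):
            members = [p for p, a in zip(points, assignment) if a == idx]
            if members:
                centroids[idx] = [sum(col) / len(members) for col in zip(*members)]
    return assignment, centroids

# Six hypothetical documents over three terms (two per topic).
points = [[5, 1, 0], [4, 0, 1], [0, 5, 1], [1, 4, 0], [0, 1, 5], [1, 0, 4]]
assignment, centroids = kmeans(points, [points[0], points[2], points[4]])
print(assignment)
```

With k=3, the two documents of each topic end up in the same cluster, which mirrors what we expect for the six news articles.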


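Dump the resulting clusters to a local text file using “clusterdump”. The final cluster directory is named clusters-N-final, where N is the last iteration; replace clusters-1-final below with the name produced by your run.

$mahout clusterdump -d /sl_news/kmeanstfidffiles/dictionary.file-0 -dt sequencefile -i /sl_news/kmeansclusters/clusters-1-final -n 20 -b 100 -o /home/dsu/Desktop/dump-file.txt -p /sl_news/kmeansclusters/clusteredPoints/

You can view the clusters by typing:

$cat /home/dsu/Desktop/dump-file.txt

In our case, there are three clusters, and the top twenty terms of each cluster are displayed.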


Use Case 3: Classify emails into ham and spam using Naïve Bayes classifier

A publicly available dataset is used for this demonstration. The dataset is called “Enron”. It can be downloaded from https://www.cs.cmu.edu/~./enron/.

This dataset has two folders: one for ham and the other for spam. Each email is stored as a separate text file inside these folders.


Step 1:

Copy the dataset into HDFS.

$hadoop fs -mkdir /enron

$hadoop fs -mkdir /enron/spam

$hadoop fs -mkdir /enron/ham

$hadoop fs -copyFromLocal /home/dsu/Desktop/enron/ham/* /enron/ham

$hadoop fs -copyFromLocal /home/dsu/Desktop/enron/spam/* /enron/spam


Step 2:

Convert the text data into sequence file format.

$mahout seqdirectory -i /enron -o /enron/nbseqfiles


Step 3:

Convert sequence data into tf-idf vectors.

$mahout seq2sparse -i /enron/nbseqfiles -o /enron/nbsparse


Step 4:

Split the data into training and test datasets.

# -i input directory (should be the tfidf-vectors directory)

# --trainingOutput (directory for training data)

# --testOutput (directory for test data)

# --randomSelectionPct (percentage of the data randomly selected as test data)

# --overwrite (overwrite existing output)

# --sequenceFiles (indicates that the input files are sequence files)

# -xm (execution method: sequential or mapreduce)


$mahout split -i /enron/nbsparse/tfidf-vectors --trainingOutput /enron/nbTrain --testOutput /enron/nbTest --randomSelectionPct 20 --overwrite --sequenceFiles -xm sequential
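
Conceptually, the split step shuffles the records and holds out a percentage of them for testing. Here is a minimal sketch of that idea, assuming the percentage refers to the held-out test share; the function name and the fixed seed are illustrative choices, not part of Mahout.

```python
import random

def split_dataset(items, test_pct, seed=42):
    """Randomly hold out test_pct percent of the items as test data."""
    rng = random.Random(seed)  # fixed seed so the split is reproducible
    shuffled = list(items)
    rng.shuffle(shuffled)
    n_test = round(len(shuffled) * test_pct / 100)
    return shuffled[n_test:], shuffled[:n_test]

# Ten hypothetical document ids, 20 percent held out.
train, test = split_dataset(range(10), 20)
print(len(train), len(test))
```

Every record lands in exactly one of the two sets, so the model is later evaluated on emails it has never seen during training.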


Step 5:

Build the Naïve Bayes (NB) model.

# -i training data directory

# -li path to store the label index

# -o path to store the model

# -ow overwrite

# -c train complementary naive bayes


$mahout trainnb -i /enron/nbTrain -li /enron/nbLabels -o /enron/nbmodel -ow -c
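
To illustrate what training a Naïve Bayes text classifier involves, here is a minimal sketch of the standard multinomial variant with add-one smoothing on raw word counts. It is a simplification under stated assumptions: Mahout trains on the tf-idf vectors, and the -c flag above selects the complementary variant, which is not shown here. The function names and the four training emails are hypothetical.

```python
import math
from collections import Counter, defaultdict

def train_nb(labeled_docs):
    """Count documents per class and words per class."""
    class_counts = Counter()
    word_counts = defaultdict(Counter)
    vocab = set()
    for label, text in labeled_docs:
        tokens = text.lower().split()
        class_counts[label] += 1
        word_counts[label].update(tokens)
        vocab.update(tokens)
    return class_counts, word_counts, vocab

def classify(model, text):
    """Return the class with the highest log-probability for the text."""
    class_counts, word_counts, vocab = model
    total_docs = sum(class_counts.values())
    best_label, best_score = None, -math.inf
    for label in class_counts:
        score = math.log(class_counts[label] / total_docs)  # class prior
        total_words = sum(word_counts[label].values())
        for token in text.lower().split():
            if token not in vocab:
                continue  # ignore words never seen in training
            # Add-one (Laplace) smoothing avoids zero probabilities.
            score += math.log(
                (word_counts[label][token] + 1) / (total_words + len(vocab))
            )
        if score > best_score:
            best_label, best_score = label, score
    return best_label

# Hypothetical training emails.
training = [
    ("spam", "win free prize now"),
    ("spam", "free money offer"),
    ("ham", "meeting schedule for monday"),
    ("ham", "project report attached"),
]
model = train_nb(training)
print(classify(model, "free prize offer"))
print(classify(model, "project meeting monday"))
```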


Step 6:

Testing the model using the testing dataset.

# -i test data directory

# -m model directory

# -l label index

# -ow overwrite

# -o predictions directory

# -c complementary naive bayes


$mahout testnb -i /enron/nbTest -m /enron/nbmodel -l /enron/nbLabels -ow -o /enron/nbpredictions -c

The performance metrics of this model are displayed on the terminal.
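
As a reminder of how typical classification metrics are derived, here is a minimal sketch that computes a confusion matrix, accuracy, precision, and recall from actual and predicted labels. The labels below are made up, and the exact set of metrics printed by Mahout may differ.

```python
def evaluate(actual, predicted, positive="spam"):
    """Compute confusion-matrix counts and basic metrics for one positive class."""
    pairs = list(zip(actual, predicted))
    tp = sum(1 for a, p in pairs if a == positive and p == positive)
    fp = sum(1 for a, p in pairs if a != positive and p == positive)
    fn = sum(1 for a, p in pairs if a == positive and p != positive)
    tn = sum(1 for a, p in pairs if a != positive and p != positive)
    return {
        "tp": tp, "fp": fp, "fn": fn, "tn": tn,
        "accuracy": (tp + tn) / len(pairs),
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }

# Hypothetical test results for eight emails.
actual = ["spam", "spam", "spam", "ham", "ham", "ham", "ham", "ham"]
predicted = ["spam", "spam", "ham", "ham", "ham", "ham", "spam", "ham"]
metrics = evaluate(actual, predicted)
print(metrics)
```

Here one spam email was missed and one ham email was wrongly flagged, so the accuracy is 6 out of 8.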


The DSU is working on adding more tools from the Hadoop ecosystem in the near future. Please contact us for further details.