To follow this post, you will need to understand at least the basics of what a mapper and a reducer are. You may want to read the MapReduce tutorial from Apache.
We will use Maven to build the project. If you have no idea how to do this, read Building a JAR Executable with Maven and Spring. We will run the resulting JAR on Amazon Elastic MapReduce (EMR) and save the output in Amazon S3.
Here are the EMR-supported Hadoop versions. We will be using 1.0.3.
What we will do:
Assume we have a database called Company with a table called Employee that has two columns: id and title.
We will count the number of employees that share each title.
This is the same as the WordCount example you see in other tutorials, except that we fetch the input from a database.
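For example, suppose the Employee table holds three rows with the title Engineer and one row with the title Manager (sample values made up for illustration). The job's output would then be:

Engineer 3
Manager 1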
Install Hadoop Library
First, include the Hadoop library in your Java project's pom.xml file.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.0.3</version>
</dependency>
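Note that hadoop-core does not pull in a JDBC driver. Since this job reads from MySQL, the MySQL Connector/J driver must also be on the classpath at runtime. One option is to bundle it into the JAR by adding a dependency like the one below (the version is just an assumption; use the release that matches your server). This is also the fix for the ClassNotFoundException mentioned at the end of this post.

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.21</version>
</dependency>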
The File Structure
The program will be very basic and contain the following files. The filenames should be self-explanatory.
Main.java
Map.java
Reduce.java
EmployeeRecord.java
The mapred library VS the mapreduce library
When reading other Hadoop examples online, you will see them using either the mapred or the mapreduce library. mapred is the older API, while mapreduce is the cleaner, newer one. To upgrade from mapred to mapreduce, read Hadoop - mapred VS mapreduce libraries.
This example will use the org.apache.hadoop.mapreduce library.
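To make the difference concrete, here is a minimal sketch of the same map step written against both APIs (the class names OldStyleMap and NewStyleMap are made up for illustration):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// Old API: the mapper implements the org.apache.hadoop.mapred.Mapper interface
// and writes output through an OutputCollector, with a separate Reporter.
class OldStyleMap extends org.apache.hadoop.mapred.MapReduceBase
        implements org.apache.hadoop.mapred.Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value,
            org.apache.hadoop.mapred.OutputCollector<Text, IntWritable> output,
            org.apache.hadoop.mapred.Reporter reporter) throws IOException {
        output.collect(value, new IntWritable(1));
    }
}

// New API: the mapper extends the org.apache.hadoop.mapreduce.Mapper class;
// a single Context object replaces both OutputCollector and Reporter.
class NewStyleMap extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, IntWritable> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value, new IntWritable(1));
    }
}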
EmployeeRecord
We will need to serialize the object of our interest by implementing Writable and DBWritable, as shown below.
package com.mycompany.emr.model;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class EmployeeRecord implements Writable, DBWritable {
    // 1-based column indexes of the fields in the employee table.
    private static final int DB_ID_INDEX = 1;
    private static final int DB_TITLE_INDEX = 2;

    public Long id;
    public String title;

    // Writable: used when Hadoop serializes records internally.
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.id);
        out.writeUTF(this.title);
    }

    public void readFields(DataInput in) throws IOException {
        this.id = in.readLong();
        this.title = in.readUTF();
    }

    // DBWritable: used when reading from or writing to the database.
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(DB_ID_INDEX, this.id);
        statement.setString(DB_TITLE_INDEX, this.title);
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        this.id = resultSet.getLong(DB_ID_INDEX);
        this.title = resultSet.getString(DB_TITLE_INDEX);
    }
}
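Notice that EmployeeRecord carries two pairs of serialization methods: write(DataOutput) and readFields(DataInput) come from Writable and are used when Hadoop moves records around internally, while write(PreparedStatement) and readFields(ResultSet) come from DBWritable and are what DBInputFormat uses to populate the object from a JDBC row.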
The Mapper
package com.mycompany.emr;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.mycompany.emr.model.EmployeeRecord;

public class Map extends Mapper<LongWritable, EmployeeRecord, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);

    // Emit (title, 1) for every employee row read from the database.
    public void map(LongWritable key, EmployeeRecord value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value.title), one);
    }
}
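For an input row such as (7, "Engineer"), the mapper therefore emits the pair ("Engineer", 1). The id is ignored; only the titles matter for the count.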
The Reducer
package com.mycompany.emr;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Sum the counts for each title and emit (title, total).
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
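Since this reduce function simply sums integers, it is associative and commutative, so the same class could also serve as a combiner to cut down the data shuffled from the mappers to the reducers. This is optional and not part of the setup below, but if you want to try it, a single extra line in Main.java is enough:

job.setCombinerClass(Reduce.class);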
Main.java
Now we will hook everything up. The steps are simple:
Create a Job.
Set output format.
Set input format.
Set Mapper class.
Set Reducer class.
Set input. (In our case, it will be from the database)
Set output.
package com.mycompany.emr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.mycompany.emr.model.EmployeeRecord;

public class Main {
    private static final String JOB_NAME = "DB WordCount";
    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL = "jdbc:mysql://localhost/company";
    private static final String DB_USER = "{your_db_username}"; // replace with your username
    private static final String DB_PASS = "{your_db_password}"; // replace with your password
    private static final String TABLE_NAME = "employee";
    private static final String PRIMARY_ID = "id";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, JOB_NAME);
        job.setJarByClass(Map.class);

        // The reducer emits (title, count) pairs.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Read the input from the database; write the output as plain text.
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        DBConfiguration.configureDB(job.getConfiguration(), MYSQL_DRIVER, DB_URL, DB_USER, DB_PASS);
        String[] fields = { "id", "title" };
        DBInputFormat.setInput(job, EmployeeRecord.class, TABLE_NAME, null, PRIMARY_ID, fields);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
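args[0] is the output path, which is why the EMR step below passes the S3 output location as the only argument. If you want to smoke-test the job against a local MySQL instance before touching EMR, you can launch it with the hadoop command (the JAR name here is a placeholder):

hadoop jar {your_jar_name}.jar com.mycompany.emr.Main /tmp/wordcount-output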
Run the Job via the AWS EMR console
Compile the project and generate a self-contained JAR file. If you are using Maven, read Building a JAR Executable with Maven and Spring.
Upload your JAR file to your S3 bucket.
In the AWS EMR console, specify the location of the JAR file.
JAR location: {your_bucket_name}/{jar_name}
Arguments: s3n://{your_bucket_name}/output
The program above takes in the output location as an argument.
Read AWS - Elastic Map Reduce Tutorial for more details on how to create a job flow in EMR.
If you encounter the mysql driver missing error, read Amazon Elastic MapReduce (EMR) ClassNotFoundException: com.mysql.jdbc.Driver.