Zwei Haupt Phasen:
Im Beispiel sollen die Buchstaben in den Daten gezählt werden:
Die Elemente im einzelnen:
Client:
InputFormater:
Mapper:
Shuffle-And-Sort:
Combiner:
Partitioner:
Sort:
Reducer:
OutputFormat:
Mit Hadoop 2 hat sich die API geändert, es muss daher das Package „org.apache.hadoop.mapreduce“ verwendet werden.
Ziel ist es den Job mit dem JDeveloper auf einem Windows7 Rechner zu erstellen.
Größte Herausforderung ist dabei eine laufähige Umgebung unter Win7 einzurichten.
⇒ Mit MS Windows Clients mit Hadoop arbeiten
Für den lokalen Test kann ist die Implementierung des „hadoop Tool interfaces“ notwendig!
Beispiel siehe ⇒ http://hadoopi.wordpress.com/2013/06/05/hadoop-implementing-the-tool-interface-for-mapreduce-driver/
hadoop jar <jar file> <startClass> -fs file:/// -jt local <local input Dir> <local output Dir>
Es soll das in Oracle typische Beispiel, wie viele Mitarbeiter arbeiten in welcher Abteilung umgesetzt werden:
SELECT COUNT(*), deptno FROM emp /
Zur Zeit allerdings erstmal das Sum Beispiel implementiert um einen ersten einfachen Test zumzuseten.
Die Daten werden im dem typischem emp format als CSV Liste im HDFS abgelegt.
Die klassischen Testdaten erzeugen:
SET linesize 1000 SET trimspool ON SET pagesize 0 SET feedback off spool /tmp/emp.csv SELECT empno || ':' || ename || ':' || job || ':' || mgr || ':' || to_char(hiredate,'dd.mm.yyyy')|| ':' || sal || ':' || comm || ':' || deptno FROM scott.emp ORDER BY ename / spool off
CSV auf das HDFS kopieren
hdfs dfs -put /tmp/emp.csv /user/gpipperr hdfs dfs -cat /user/gpipperr/emp.csv 7499:ALLEN:SALESMAN:7698:20.02.1981:1600:300:30 7782:CLARK:MANAGER:7839:09.06.1981:2450::10 7566:JONES:MANAGER:7839:02.04.1981:2975::20 7839:KING:PRESIDENT::17.11.1981:5000::10 7654:MARTIN:SALESMAN:7698:28.09.1981:1250:1400:30 7934:MILLER:CLERK:7782:23.01.1982:1300::10 7844:TURNER:SALESMAN:7698:08.09.1981:1500:0:30 7521:WARD:SALESMAN:7698:22.02.1981:1250:500:30
package gpi.hadoop; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; // input Key - input Value - output Key - output Value public class DeptCountMapper extends Mapper<Object, Text, Text, IntWritable> { static IntWritable oneValue = new IntWritable(1); @Override // input Key - input Value - output Value public void map(Object key, Text value, Context contex) throws IOException, InterruptedException { /* input: 0 1 2 3 4 5 6 7 7876:ADAMS:CLERK:7788:12.01.1983:1100:300:20 Split in Key/Values pairs 20,1 */ // read on row String[] emprow = value.toString().split(":"); String deptno = emprow[7]; contex.write(new Text(deptno), oneValue); } }
package gpi.hadoop; import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class DeptCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable totalWordCount = new IntWritable(); @Override public void reduce(Text deptno, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException { //logic /* 30,1,1,1, 50,1,1 Result should look like this 30 3 60 2 */ int deptcount = 0; for (IntWritable count : counts) { deptcount += 1; } context.write(deptno, new IntWritable(deptcount)); } }
package gpi.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class DeptCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 3) { System.err.println("Usage: DeptCount [input] [output]"); System.exit(2); } else { System.out.println("Call DeptCount with Parameter 1::"+args[1]+" Parameter 2::"+args[2]); } // create a new Configuration Job job = Job.getInstance(conf); job.setJobName(args[0]); // Mapper job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(DeptCountMapper.class); // Reducer job.setReducerClass(DeptCountReducer.class); job.setOutputFormatClass(TextOutputFormat.class); // Input and Output Path to the data FileInputFormat.setInputPaths(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); //main driver Class job.setJarByClass(DeptCountT.class); //set Output Class job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.submit(); } }
Beispiel für die Verwendung des Tool Interfaces für die Startklasse:
package gpi.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class DeptCountT extends Configured implements Tool { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new DeptCountT(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { //get Config object Configuration conf = this.getConf(); // create a new Configuration Job job = Job.getInstance(conf); job.setJobName(args[0]); // Mapper job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(DeptCountMapper.class); // Reducer job.setReducerClass(DeptCountReducer.class); job.setOutputFormatClass(TextOutputFormat.class); // Input and Output Path to the data FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //main driver Class job.setJarByClass(DeptCount.class); //set Output Class job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // Execute job and return status return job.waitForCompletion(true) ? 0 : 1; } }
Jar file erzeugen und auf den Hadoop Server kopieren
⇒ Jar File im Oracle JDeveloper
Jar File aufrufen:
yarn jar GpiHadoopExamples.jar gpi.hadoop.DeptCount /user/gpipperr/emp.csv /user/gpipperr/empRun1 yarn application -list hdfs dfs -cat /user/gpipperr/empRun1/part-r-00000 10 3 20 1 30 4
Hinweise
Wird mit yarn jar <class.jar> <class_name> <in> <out>„ der Job aufgerufen, ist der Parameter 1 der Klassenamen, der Parameter 2 das in Verzeichnis und der Parameter 3 das out Verzeichnis.
D.h. passt aber nicht zu der Verwendung der Eingabe Parameter in den verbreiteten Dokumentationen und Beispielen.
Fehler:
istsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://quickstart.cloudera:8020/user/gpipperr/emp.csv already exists
Nur wenn in der Jar Datei die Main Klasse definiert wird und ein Aufruf nach diesem Muster durchgeführt wird, passt das im Detail: yarn jar <class.jar> <class_name> <in> <out>„
Ansonsten muss bei Fehler mit den Aufrufparamentern (Typischerweise „Output directory xxx already exists“ ) der Index der Parameter im Code entsprechend angepasst werden.
Gute weitere Beispiele:
MapAndReduce Patterns:
Dell Zhang
Buch: