Hadoop - Group corresponding keys and values
I have a use case where I need to write MapReduce code that groups all values corresponding to the same key:
Input:
a,b
a,c
b,a
b,d
Output:
a {b,c}
b {a,d}
I have written the code below:
import java.io.ioexception; import java.util.stringtokenizer; /* * org.apache.hadoop packaged can imported using jar present in lib * directory of java project. */ import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.mapper; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.input.textinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; import org.apache.hadoop.mapreduce.lib.output.textoutputformat; public class groupkeyvalues { public static class map extends mapper { public void map(longwritable key, text value, context con) throws ioexception, interruptedexception { text mykey = new text(); text myval = new text(); string line = value.tostring(); stringtokenizer st = new stringtokenizer(line); while (st.hasmoretokens()) { string thish = st.nexttoken(); string[] splitdata = thish.split(","); mykey.set(splitdata[0]); myval.set(splitdata[1]); } con.write(mykey, myval); } } @suppresswarnings("deprecation") public static void main(string[] args) throws exception { configuration conf = new configuration(); @suppresswarnings("deprecation") job job = new job(conf, "groupkeyvalues"); job.setjarbyclass(groupkeyvalues.class); job.setmapperclass(map.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(text.class); job.setinputformatclass(textinputformat.class); job.setoutputformatclass(textoutputformat.class); path outputpath = new path(args[1]); fileinputformat.addinputpath(job, new path(args[0])); fileoutputformat.setoutputpath(job, new path(args[1])); outputpath.getfilesystem(conf).delete(outputpath); system.exit(job.waitforcompletion(true) ? 0 : 1); } }
You are missing a reducer that aggregates the values into a single "row" value. For example, you can use `ArrayWritable` like this:
public static class aggregatingreducer extends reducer<text, text, text, arraywritable> { private arraywritable result = new arraywritable(text.class); public void reduce(text key, iterable<text> values, context context) throws ioexception, interruptedexception { list<text> list = new arraylist<>(); (text value : values) { list.add(value); } result.set(list.toarray(new text[list.size()])); context.write(key, result); } }
In the job setup, make sure to add this:
job.setReducerClass(aggregatingreducer.class);
// The mapper still emits Text values. Declare the map output value type explicitly;
// otherwise it defaults to the job output value class (ArrayWritable) and the map task
// rejects the Text values with a type mismatch.
job.setMapOutputValueClass(Text.class);
job.setOutputValueClass(ArrayWritable.class); // instead of Text.class
Alternatively (depending on your needs), you can concatenate the values in the reducer with a `StringBuilder` and emit `Text`, instead of accumulating the values and emitting an `ArrayWritable`.
Update: here is an example that uses a `StringBuilder` with a comma delimiter:
public static class aggregatingreducer extends reducer<text, text, text, text> { private text result = new text(); public void reduce(text key, iterable<text> values, context context) throws ioexception, interruptedexception { stringbuilder sb = new stringbuilder(); (text value : values) { if (sb.length() != 0) { sb.append(','); } sb.append(value); } result.set(sb.tostring()); context.write(key, result); } }
In the driver, the output value type then needs to be changed back to `Text`:
job.setOutputValueClass(Text.class); // the StringBuilder reducer emits Text values
Comments
Post a Comment