<>Preface
In many business scenarios, after the raw data has been read and analyzed, the results need to be sorted by specified business fields before they are output. This makes it easier for upstream applications to display or consume the results, and saves them the cost of a secondary sort.
Hadoop's MapReduce provides client APIs for customizing how output is sorted.
<>MapReduce sorting
* By default, both MapTask and ReduceTask sort data by key
* The default order is dictionary (lexicographic) order, and the sorting algorithm used is quicksort
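Dictionary order can be surprising when numeric values are stored as text, since keys are compared character by character rather than numerically. A minimal plain-Java sketch (not part of the Hadoop job; the sample keys are made up) illustrating the lexicographic comparison that string keys get by default:

```java
import java.util.Arrays;

public class DictOrderDemo {
    public static void main(String[] args) {
        // numeric values stored as strings, as they would appear in text keys
        String[] keys = {"9", "10", "100", "2"};
        // String.compareTo is lexicographic, i.e. dictionary order
        Arrays.sort(keys);
        System.out.println(Arrays.toString(keys)); // [10, 100, 2, 9]
        // "10" and "100" sort before "2" and "9" because '1' < '2' < '9'
    }
}
```

This is exactly why the case below puts the numeric traffic fields into a custom comparable object instead of sorting them as text.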
<>Types of MapReduce sorting
1. Partial sort
MapReduce sorts the dataset by the keys of the input records, guaranteeing that each output file is internally ordered.
2. Total sort
The final output is a single, internally ordered file. This is achieved by configuring only one ReduceTask, but for very large files it is inefficient, because it gives up MapReduce's ability to process tasks in parallel.
3. Auxiliary sort (grouping)
Keys are grouped on the Reduce side. For example, when the received key is a bean object and you want all records whose one or more fields are equal to enter the same reduce method, grouping can be used.
4. Secondary sort
In a custom sort, when compareTo uses two or more fields as its comparison conditions, it is a secondary sort.
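The difference between a single-field sort and a secondary sort can be seen in a plain compareTo sketch. The `Record` class below is hypothetical (independent of Hadoop, with made-up field names): the second comparison only kicks in when the primary fields are equal, which is what makes it a secondary sort.

```java
import java.util.Arrays;

public class SecondarySortDemo {
    // hypothetical record with a primary and a secondary sort field
    static class Record implements Comparable<Record> {
        final long total;
        final long up;
        Record(long total, long up) { this.total = total; this.up = up; }

        @Override
        public int compareTo(Record o) {
            // primary condition: total, descending
            int cmp = Long.compare(o.total, this.total);
            if (cmp != 0) {
                return cmp;
            }
            // secondary condition: up, descending --
            // only consulted when the totals tie
            return Long.compare(o.up, this.up);
        }

        @Override
        public String toString() { return total + "/" + up; }
    }

    public static void main(String[] args) {
        Record[] records = {
            new Record(100, 30), new Record(200, 10), new Record(100, 70)
        };
        Arrays.sort(records);
        System.out.println(Arrays.toString(records)); // [200/10, 100/70, 100/30]
    }
}
```

The two records with total 100 keep a deterministic order because the tie is broken by the second field.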
<>Custom sorting example
Recall the serialization case that summed the upstream and downstream traffic per phone number. Here we take that case's output file directly as the input data and sort the results by total traffic.
The expected output format is as follows:
1. Define a custom Bean object that implements the WritableComparable interface
After implementing this interface, override the compareTo method; the sorting logic for the relevant fields goes into compareTo.
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneSortBean implements WritableComparable<PhoneSortBean> {

    // upstream traffic
    private long upFlow;
    // downstream traffic
    private long downFlow;
    // total traffic
    private long sumFlow;

    @Override
    public int compareTo(PhoneSortBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            return 0;
        }
    }

    // no-argument constructor
    public PhoneSortBean() {
    }

    // getters and setters for the three fields
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public void setSumFlow() { this.sumFlow = this.upFlow + this.downFlow; }

    // serialization and deserialization; note that the field order must be the same
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // override toString
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
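The "same order" requirement for write/readFields exists because the serialized stream carries no field names, only raw bytes, so fields are recovered purely by read order. A small sketch using plain java.io streams (DataOutputStream/DataInputStream standing in for Hadoop's DataOutput/DataInput; the traffic values are made up) shows the round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FieldOrderDemo {
    public static void main(String[] args) throws IOException {
        long upFlow = 100, downFlow = 200, sumFlow = 300;

        // serialize: write the three longs in a fixed order
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);

        // deserialize: the reads must happen in exactly the same order,
        // otherwise the values end up in the wrong fields
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(in.readLong()); // 100 (upFlow)
        System.out.println(in.readLong()); // 200 (downFlow)
        System.out.println(in.readLong()); // 300 (sumFlow)
    }
}
```

Swapping any two reads would silently assign values to the wrong fields; there is no runtime error.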
2. Custom Mapper
Since the data is to be sorted, the key emitted by the Map phase should be the custom comparable object, i.e. the bean above; the value is the phone number.
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.LinkedList;

public class SortPhoneMapper extends Mapper<LongWritable, Text, PhoneSortBean, Text> {

    private Text outV = new Text();
    private PhoneSortBean outK = new PhoneSortBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // split the line into fields
        String[] splits = line.split("\t");
        LinkedList<String> linkedList = new LinkedList<>();
        for (String str : splits) {
            if (StringUtils.isNotEmpty(str)) {
                linkedList.add(str.trim());
            }
        }
        // extract the required fields: phone number, upstream traffic, downstream traffic
        String phone = linkedList.get(0);
        String max = linkedList.get(1);
        String mine = linkedList.get(2);
        // populate outK and outV
        outV.set(phone);
        outK.setUpFlow(Long.parseLong(max));
        outK.setDownFlow(Long.parseLong(mine));
        outK.setSumFlow();
        // write outK and outV
        context.write(outK, outV);
    }
}
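The split-and-filter logic inside map can be exercised on its own. A plain-Java sketch (the sample line is made up, and `String.isEmpty` stands in for commons-lang's `StringUtils.isNotEmpty` so the example has no external dependency) showing how a tab-separated record is reduced to the phone, upstream and downstream fields:

```java
import java.util.LinkedList;

public class ParseDemo {
    public static void main(String[] args) {
        // hypothetical input line: phone, upstream, downstream,
        // tab-separated, with a stray empty field in the middle
        String line = "13500001111\t300\t\t200";
        LinkedList<String> fields = new LinkedList<>();
        for (String str : line.split("\t")) {
            // drop empty fields so the positions of the real fields stay stable
            if (!str.isEmpty()) {
                fields.add(str.trim());
            }
        }
        String phone = fields.get(0);
        long up = Long.parseLong(fields.get(1));
        long down = Long.parseLong(fields.get(2));
        System.out.println(phone + " " + (up + down)); // 13500001111 500
    }
}
```

Filtering empty strings before indexing is what lets the mapper use fixed positions 0, 1 and 2 even if the input contains consecutive tabs.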
3. Custom Reducer
The Reduce phase still outputs the phone number as the key, with the sorted custom bean as the value.
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortPhoneReducer extends Reducer<PhoneSortBean, Text, Text, PhoneSortBean> {

    @Override
    protected void reduce(PhoneSortBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // swap back: the phone number becomes the key, the bean the value
            context.write(value, key);
        }
    }
}
4. Custom Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortPhoneJob {

    public static void main(String[] args) throws Exception {
        // 1. get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. associate the Driver class
        job.setJarByClass(SortPhoneJob.class);
        // 3. associate the Mapper and Reducer
        job.setMapperClass(SortPhoneMapper.class);
        job.setReducerClass(SortPhoneReducer.class);
        // 4. set the Map-side output KV types
        job.setMapOutputKeyClass(PhoneSortBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5. set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneSortBean.class);
        // 6. set the input and output paths
        String inPath = "F:\\Net disk\\csv\\phone_out_bean.txt";
        String outPath = "F:\\Net disk\\csv\\phone_out_sort";
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        // 7. submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
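One practical note: if the output directory already exists from a previous run, the job fails on submission with a FileAlreadyExistsException. A common guard (a sketch using Hadoop's FileSystem API, placed before waitForCompletion; not part of the original code) deletes the stale directory first:

```java
// requires: import org.apache.hadoop.fs.FileSystem;
// place before job.waitForCompletion(true)
FileSystem fs = FileSystem.get(conf);
Path out = new Path(outPath);
if (fs.exists(out)) {
    fs.delete(out, true); // true = delete recursively
}
```

Whether to delete automatically is a judgment call; in production it may be safer to fail fast than to overwrite previous results.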
Run the program above and observe the output: the results are sorted by total traffic from largest to smallest.
Notice that in the last 3 rows the total traffic is the same. Suppose a new requirement arises: when the total traffic is equal, sort further by upstream traffic. What should we do?
In fact, we only need to add the extra sorting logic to the custom bean's compareTo method.
public int compareTo(PhoneSortBean o) {
    if (this.sumFlow > o.sumFlow) {
        return -1;
    } else if (this.sumFlow < o.sumFlow) {
        return 1;
    } else {
        // total traffic is equal: sort by upstream traffic
        if (this.upFlow > o.upFlow) {
            return -1;
        } else if (this.upFlow < o.upFlow) {
            return 1;
        } else {
            return 0;
        }
    }
}
<>Sorting within partitions
Building on the case above, a new requirement: phone numbers with different prefixes should ultimately be written to different files. On top of the previous code, we also need to add custom partitioning logic.
Two changes are needed:
* Add a custom partitioner that assigns a partition number according to the business rules
* Modify the Driver class: register the custom partitioner and set the number of ReduceTasks
1. Add a custom partitioner
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartioner extends Partitioner<MyPhoneBean, Text> {

    @Override
    public int getPartition(MyPhoneBean myPhoneBean, Text text, int partion) {
        // the phone number is in the value
        String phone = text.toString();
        if (phone.startsWith("135")) {
            return 0;
        } else if (phone.startsWith("136")) {
            return 1;
        } else if (phone.startsWith("137")) {
            return 2;
        } else {
            return 3;
        }
    }
}
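The routing rule can be checked without running a job. A plain-Java sketch of the same prefix-to-partition mapping (the method name mirrors the partitioner's, but this is ordinary code, not Hadoop; the sample numbers are made up):

```java
public class PartitionDemo {
    // same routing rule as MyPartioner, on a plain String
    static int getPartition(String phone) {
        if (phone.startsWith("135")) {
            return 0;
        } else if (phone.startsWith("136")) {
            return 1;
        } else if (phone.startsWith("137")) {
            return 2;
        } else {
            return 3; // every other prefix goes to the last partition
        }
    }

    public static void main(String[] args) {
        System.out.println(getPartition("13512340000")); // 0
        System.out.println(getPartition("13698765432")); // 1
        System.out.println(getPartition("13700000000")); // 2
        System.out.println(getPartition("18800000000")); // 3
    }
}
```

Because the rule can return 0 through 3, the job needs at least 4 ReduceTasks; with fewer, records routed to a missing partition would make the job fail.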
2. Modify the Driver class
The rest of the logic stays the same as above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {

    public static void main(String[] args) throws Exception {
        // 1. get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. associate the Driver class
        job.setJarByClass(MyDriver.class);
        // 3. associate the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // 4. set the Map-side output KV types
        job.setMapOutputKeyClass(MyPhoneBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5. set the final output KV types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyPhoneBean.class);
        // 6. use 4 ReduceTasks and the custom partitioner so the output
        //    is split into 4 files
        job.setNumReduceTasks(4);
        job.setPartitionerClass(MyPartioner.class);
        // 7. set the input and output paths
        String inPath = "F:\\Net disk\\csv\\phone_out_bean.txt";
        String outPath = "F:\\Net disk\\csv\\phone_out_sort";
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        // 8. submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Run the program above, then open a couple of the files to verify the requirement: the output is written to 4 partition files, and within each partition file the total traffic is ordered from high to low.