<> Preface

In many business scenarios, after the raw data has been read and analyzed, the results need to be sorted by specified business fields before they are output. This makes the result data easier for upper-layer applications to display or use, and saves them the cost of a second sort.

Hadoop MapReduce provides a client API for this kind of custom sorting.

<> Sorting in MapReduce

* By default, both MapTask and ReduceTask sort the data by key
* The default order is dictionary (lexicographic) order, and the sorting algorithm is quicksort
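
To illustrate what dictionary order means for keys, here is a minimal sketch in plain Java with no Hadoop dependency. The class name DictionaryOrderDemo and the sample values are hypothetical. It sorts the same values once as text and once as numbers, which shows why a numeric field such as traffic should be compared as a number rather than as a string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DictionaryOrderDemo {

    // Sorts the values as strings, i.e. character by character (dictionary order).
    static List<String> sortAsText(List<String> values) {
        List<String> copy = new ArrayList<>(values);
        Collections.sort(copy);
        return copy;
    }

    // Sorts the same values after parsing them as numbers.
    static List<Long> sortAsNumbers(List<String> values) {
        List<Long> parsed = new ArrayList<>();
        for (String value : values) {
            parsed.add(Long.parseLong(value));
        }
        Collections.sort(parsed);
        return parsed;
    }

    public static void main(String[] args) {
        List<String> flows = Arrays.asList("9", "100", "25");
        System.out.println(sortAsText(flows));    // [100, 25, 9]
        System.out.println(sortAsNumbers(flows)); // [9, 25, 100]
    }
}
```
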
<> Types of MapReduce sorting

1, Partial sort

MapReduce sorts the dataset by the keys of the input records, guaranteeing that the data inside each output file is in order

2, Total sort

The final output is a single file whose contents are in order. This is achieved by configuring only one ReduceTask. For very large files this approach is very inefficient, because a single ReduceTask gives up the parallel processing that MapReduce is designed for
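
The difference between partial and total sort can be sketched without a cluster. The following plain-Java simulation is an illustration only, not Hadoop code: the class name PartialVsTotalSortDemo and the sample keys are hypothetical. It spreads keys over a number of buckets using the same formula as Hadoop's default HashPartitioner and sorts each bucket independently, so with two "reducers" each output is ordered on its own, while a single "reducer" yields one globally ordered output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class PartialVsTotalSortDemo {

    // Spreads keys over numReducers buckets and sorts each bucket on its own,
    // mimicking one sorted output file per ReduceTask.
    static List<List<Integer>> sortPerReducer(List<Integer> keys, int numReducers) {
        List<List<Integer>> outputs = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            outputs.add(new ArrayList<>());
        }
        for (Integer key : keys) {
            // Hash partitioning: non-negative hash code modulo the reducer count.
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReducers;
            outputs.get(partition).add(key);
        }
        for (List<Integer> output : outputs) {
            Collections.sort(output);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(7, 2, 9, 4, 1, 8);
        // Partial sort: every "file" is ordered, but the files together are not.
        System.out.println(sortPerReducer(keys, 2)); // [[2, 4, 8], [1, 7, 9]]
        // Total sort: one reducer, one globally ordered "file".
        System.out.println(sortPerReducer(keys, 1)); // [[1, 2, 4, 7, 8, 9]]
    }
}
```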

3, Auxiliary sort

Keys are grouped on the Reduce side. For example, when the received key is a bean object and you want keys that share one or more fields (even though not all fields are equal) to enter the same reduce method call, you can use grouping sort
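
As a rough illustration of the grouping idea, the plain-Java sketch below sorts composite keys with one comparator and then groups adjacent keys with a second comparator that looks at a single field. Everything in it (GroupingSortDemo, OrderKey, the sample data) is hypothetical; in a real job the grouping comparator would be registered with Job.setGroupingComparatorClass rather than applied by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class GroupingSortDemo {

    // Hypothetical composite key with two fields.
    static final class OrderKey {
        final String orderId;
        final long price;

        OrderKey(String orderId, long price) {
            this.orderId = orderId;
            this.price = price;
        }

        @Override
        public String toString() {
            return orderId + ":" + price;
        }
    }

    // Sort order: orderId ascending, then price descending.
    static final Comparator<OrderKey> SORT = (a, b) -> {
        int byId = a.orderId.compareTo(b.orderId);
        return byId != 0 ? byId : Long.compare(b.price, a.price);
    };

    // Grouping order: only orderId is compared, price is ignored.
    static final Comparator<OrderKey> GROUP = (a, b) -> a.orderId.compareTo(b.orderId);

    // Sorts the keys, then starts a new group whenever GROUP sees a different key.
    static List<List<OrderKey>> group(List<OrderKey> keys) {
        List<OrderKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<OrderKey>> groups = new ArrayList<>();
        OrderKey previous = null;
        for (OrderKey key : sorted) {
            if (previous == null || GROUP.compare(previous, key) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(key);
            previous = key;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<OrderKey> keys = Arrays.asList(
                new OrderKey("A", 10), new OrderKey("B", 5),
                new OrderKey("A", 30), new OrderKey("B", 20));
        System.out.println(group(keys)); // [[A:30, A:10], [B:20, B:5]]
    }
}
```

Each inner list corresponds to the keys that one reduce call would receive, already in sorted order.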

4, Secondary sort

In a custom sort, when compareTo compares on two or more conditions, the result is a secondary sort

<> Custom sorting example

Recall the serialization example, which computed the upstream and downstream traffic of each mobile phone number. Here we take the output of that example directly as the input data and sort the result file by total traffic

Each line of the expected output contains the phone number followed by its upstream, downstream and total traffic, with the lines ordered by total traffic

1, Define a custom bean that implements the WritableComparable interface

After implementing the interface, override the compareTo method and put the comparison logic for the sort fields there
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneSortBean implements WritableComparable<PhoneSortBean> {

    // Upstream traffic
    private long upFlow;
    // Downstream traffic
    private long downFlow;
    // Total traffic
    private long sumFlow;

    // Sort by total traffic, largest first
    @Override
    public int compareTo(PhoneSortBean o) {
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            return 0;
        }
    }

    // No-argument constructor, needed so the bean can be instantiated during deserialization
    public PhoneSortBean() {
    }

    // Getters and setters for the three fields
    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // Computes the total from the upstream and downstream traffic
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization and deserialization; the field order must be the same in both methods
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // toString determines how the bean is written to the output file
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
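
The requirement that write and readFields use the same field order can be checked with the JDK alone, since DataOutputStream and DataInputStream implement the DataOutput and DataInput interfaces the bean uses. The sketch below is a stand-alone illustration under that assumption; the class name FieldOrderDemo and the sample values are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class FieldOrderDemo {

    // Writes the three fields in the order upFlow, downFlow, sumFlow.
    static byte[] write(long upFlow, long downFlow, long sumFlow) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the fields back in exactly the order they were written.
    static long[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long upFlow = in.readLong();
            long downFlow = in.readLong();
            long sumFlow = in.readLong();
            return new long[] {upFlow, downFlow, sumFlow};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] fields = read(write(100L, 400L, 500L));
        System.out.println(Arrays.toString(fields)); // [100, 400, 500]
    }
}
```

If the reads were issued in a different order than the writes, the values would still deserialize without an error but would end up in the wrong fields, which is why the order matters.
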
2, Custom Mapper

Since the framework sorts by key, the key output by the Map phase should be the custom comparable object, that is, the bean above, and the value is the mobile phone number
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.LinkedList;

public class SortPhoneMapper extends Mapper<LongWritable, Text, PhoneSortBean, Text> {

    private Text outV = new Text();
    private PhoneSortBean outK = new PhoneSortBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Split the line on tabs and keep only the non-empty fields
        String[] splits = line.split("\t");
        LinkedList<String> linkedList = new LinkedList<>();
        for (String str : splits) {
            if (StringUtils.isNotEmpty(str)) {
                linkedList.add(str.trim());
            }
        }
        // Pick out the required fields: phone number, upstream traffic, downstream traffic
        String phone = linkedList.get(0);
        String max = linkedList.get(1);
        String mine = linkedList.get(2);
        // Populate outK and outV
        outV.set(phone);
        outK.setUpFlow(Long.parseLong(max));
        outK.setDownFlow(Long.parseLong(mine));
        outK.setSumFlow();
        // Emit the bean as the key and the phone number as the value
        context.write(outK, outV);
    }
}
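
The field extraction in the Mapper can be tried in isolation. The plain-Java sketch below mirrors the split-and-filter step under the assumption that each input line is tab-separated with the phone number first; the class name LineParsingDemo and the sample line are hypothetical, and String.isEmpty stands in for StringUtils.isNotEmpty so that no extra library is needed.

```java
import java.util.ArrayList;
import java.util.List;

final class LineParsingDemo {

    // Splits a line on tabs, drops empty fields and trims the rest.
    static List<String> nonEmptyFields(String line) {
        List<String> fields = new ArrayList<>();
        for (String part : line.split("\t")) {
            if (!part.isEmpty()) {
                fields.add(part.trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        // Two consecutive tabs produce an empty field, which is skipped.
        String line = "13512345678\t\t100\t400\t500";
        System.out.println(nonEmptyFields(line)); // [13512345678, 100, 400, 500]
    }
}
```
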
3, Custom Reducer

The Reduce phase still outputs the mobile phone number as the key, and the value is the custom bean, which has already been sorted
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortPhoneReducer extends Reducer<PhoneSortBean, Text, Text, PhoneSortBean> {

    @Override
    protected void reduce(PhoneSortBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Beans that compare as equal arrive in the same call, so write every phone number
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
4, Custom Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortPhoneJob {

    public static void main(String[] args) throws Exception {
        //1 Get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2 Associate this Driver class
        job.setJarByClass(SortPhoneJob.class);
        //3 Associate the Mapper and Reducer
        job.setMapperClass(SortPhoneMapper.class);
        job.setReducerClass(SortPhoneReducer.class);
        //4 Set the key/value types of the Map output
        job.setMapOutputKeyClass(PhoneSortBean.class);
        job.setMapOutputValueClass(Text.class);
        //5 Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneSortBean.class);
        //6 Set the input and output paths of the program
        String inPath = "F:\\ Net disk \\csv\\phone_out_bean.txt";
        String outPath = "F:\\ Net disk \\csv\\phone_out_sort";
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        //7 Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Run the program and inspect the output: the records are sorted by total traffic from largest to smallest

In the output, the last 3 rows have the same total traffic. Suppose there is a further requirement: when the total traffic is equal, sort by upstream traffic. How should this be handled?

It is enough to add the extra comparison to the compareTo method of the custom bean
@Override
public int compareTo(PhoneSortBean o) {
    if (this.sumFlow > o.sumFlow) {
        return -1;
    } else if (this.sumFlow < o.sumFlow) {
        return 1;
    } else {
        // Same total traffic: fall back to upstream traffic, largest first
        if (this.upFlow > o.upFlow) {
            return -1;
        } else if (this.upFlow < o.upFlow) {
            return 1;
        } else {
            return 0;
        }
    }
}
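
The effect of the second condition can be verified outside Hadoop. The sketch below uses a hypothetical FlowRecord class as a stand-in for the bean, without the Writable parts, and expresses the same two-level comparison with Long.compare; the sample numbers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SecondarySortDemo {

    // Hypothetical stand-in for the bean, without the Hadoop Writable parts.
    static final class FlowRecord implements Comparable<FlowRecord> {
        final long upFlow;
        final long downFlow;
        final long sumFlow;

        FlowRecord(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        @Override
        public int compareTo(FlowRecord o) {
            // Primary condition: total traffic, descending.
            int bySum = Long.compare(o.sumFlow, this.sumFlow);
            if (bySum != 0) {
                return bySum;
            }
            // Secondary condition: upstream traffic, descending.
            return Long.compare(o.upFlow, this.upFlow);
        }

        @Override
        public String toString() {
            return upFlow + "/" + downFlow + "/" + sumFlow;
        }
    }

    // Returns a sorted copy, leaving the input list untouched.
    static List<FlowRecord> sorted(List<FlowRecord> records) {
        List<FlowRecord> copy = new ArrayList<>(records);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<FlowRecord> records = Arrays.asList(
                new FlowRecord(100, 400),
                new FlowRecord(300, 200),
                new FlowRecord(500, 500));
        // The two records with total 500 are ordered by upstream traffic.
        System.out.println(sorted(records)); // [500/500/1000, 300/200/500, 100/400/500]
    }
}
```
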
<> Sorting within partitions example

Building on the example above, suppose there is a new business requirement: records for different groups of mobile phone numbers must be written to different files. To do this, the sorting logic above has to be combined with a custom partitioner

Two changes are needed:

* Add a custom partitioner that assigns the partition number according to the business rules
* Modify the Driver class to register the custom partitioner and set the number of ReduceTasks
1, Add a custom partitioner
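import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class MyPartioner extends Partitioner<MyPhoneBean, Text> {

    @Override
    public int getPartition(MyPhoneBean myPhoneBean, Text text, int partion) {
        // The value is the phone number; choose the partition from its prefix
        String phone = text.toString();
        if (phone.startsWith("135")) {
            return 0;
        } else if (phone.startsWith("136")) {
            return 1;
        } else if (phone.startsWith("137")) {
            return 2;
        } else {
            // All other prefixes share the last partition
            return 3;
        }
    }
}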
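
The prefix rule itself is easy to test without Hadoop. The sketch below copies the rule into a plain static method; the class name PrefixPartitionDemo, the constant NUM_PARTITIONS and the sample numbers are hypothetical. It also checks that every returned partition number stays below the partition count, which has to match the number of ReduceTasks configured in the Driver.

```java
final class PrefixPartitionDemo {

    // Must equal the number of ReduceTasks configured for the job.
    static final int NUM_PARTITIONS = 4;

    // Same prefix rule as the partitioner: 135, 136, 137, everything else.
    static int partitionFor(String phone) {
        if (phone.startsWith("135")) {
            return 0;
        } else if (phone.startsWith("136")) {
            return 1;
        } else if (phone.startsWith("137")) {
            return 2;
        } else {
            return 3;
        }
    }

    public static void main(String[] args) {
        String[] phones = {"13512345678", "13612345678", "13712345678", "13812345678"};
        for (String phone : phones) {
            int partition = partitionFor(phone);
            // Every partition number must be in the range [0, NUM_PARTITIONS).
            if (partition < 0 || partition >= NUM_PARTITIONS) {
                throw new IllegalStateException("partition out of range: " + partition);
            }
            System.out.println(phone + " -> partition " + partition);
        }
    }
}
```
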
2, Modify the Driver class

The rest of the logic, including the bean, Mapper and Reducer (named MyPhoneBean, MyMapper and MyReducer here), stays the same as above
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {

    public static void main(String[] args) throws Exception {
        //1 Get the Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2 Associate this Driver class
        job.setJarByClass(MyDriver.class);
        //3 Associate the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        //4 Set the key/value types of the Map output
        job.setMapOutputKeyClass(MyPhoneBean.class);
        job.setMapOutputValueClass(Text.class);
        //5 Set the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyPhoneBean.class);
        //6 Use 4 ReduceTasks (one output file per partition) and the custom partitioner
        job.setNumReduceTasks(4);
        job.setPartitionerClass(MyPartioner.class);
        //7 Set the input and output paths of the program (the output directory must not already exist)
        String inPath = "F:\\ Net disk \\csv\\phone_out_bean.txt";
        String outPath = "F:\\ Net disk \\csv\\phone_out_sort";
        FileInputFormat.setInputPaths(job, new Path(inPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        //8 Submit the Job
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Run the program, then open any two of the output files to check that the requirement is met. The output is written to 4 partition files, and within each partition file the records are ordered by total traffic from high to low
