MapReduce实现TopN效果示例解析

 更新时间:2023年07月18日 08:51:46   作者:huan1993  
这篇文章主要为大家介绍了MapReduce实现TopN效果示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

1、背景

最近在学习Hadoop的MapReduce,此处记录一下如何实现 TopN 的效果,以及在MapReduce中如何实现 自定义分组。

2、需求

我们有一份数据,数据中存在如下3个字段,订单编号,订单项和订单项价格。 输出的数据,需求如下:

  • 订单编号与订单编号之间需要正序输出。
  • 输出每个订单价格最高的2个订单项。

3、分析

  • 订单编号与订单编号之间需要正序输出,那么订单编号必须要作为Key,因为只有Key才有排序操作。
  • 输出每个订单价格最高的2个订单项: 这个输出是在reduce阶段,并且是每个订单,因此需要根据订单编号进行分组操作(前后2个key比较,相同则为一组),而分组也只有Key才有,因此就需要JavaBean(订单编号、订单项、订单项价格)来作为组合Key。
  • 订单编号与订单编号之间需要正序输出 \&& 输出每个订单价格最高的2个订单项: 可以看出在Key中的排序规则为:根据订单编号升序,然后根据订单项价格倒序排序, 并且是根据订单编号来分组。
  • 我们知道默认MapReduce中默认的分区规则是,根据key的hascode来进行分区,而 分区 下是有多个 分组,每个分组调用一次reduce方法。 而我们上方的思路是,根据订单编号来进行分组,当我们Key是JavaBean组合Key时,相同的订单编号所在的JavaBean会被分在一个分组吗,这个不一定,因为JavaBean的hashcode不一定一致,因此就需要我们自定义分区(继承Partitioner类)。此处我们job.setNumReduceTasks设置为1个,因此不考虑这个分区的问题。
  • 一个分区下有多个分组,每个分组调用一次reduce方法。

4、准备数据

4.1 准备数据

20230713000010  item-101    10
20230713000010  item-102    30
20230713000015  item-151    10
20230713000015  item-152    20
20230713000010  item-103    20
20230713000015  item-153    30
20230713000012  item-121    50
20230713000012  item-122    10
20230713000012  item-123    30

4.2 每行数据格式

订单编号          订单项      订单项价格
20230713000012  item-123    30

每行数据的分隔符为空格

4.3 期望输出结果

20230713000010  item-102    30
20230713000010  item-103    20
20230713000012  item-121    50
20230713000012  item-123    30
20230713000015  item-153    30
20230713000015  item-152    20

5、编码实现

5.1 引入jar包

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>3.2.2</version>
            <configuration>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <classpathPrefix>lib/</classpathPrefix>
                        <mainClass>com.huan.hadoop.mr.TopNDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
        </plugin>
    </plugins>
</build>

5.2 编写实体类

package com.huan.hadoop.mr;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * 订单数据
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:20
 */
@Getter
@Setter
public class OrderVo implements WritableComparable<OrderVo> {
    /**
     * 订单编号
     */
    private long orderId;
    /**
     * 订单项
     */
    private String itemId;
    /**
     * 订单项价格
     */
    private long price;
    @Override
    public int compareTo(OrderVo o) {
        // 排序: 根据 订单编号 升序, 如果订单编号相同,则根据 订单项价格 倒序
        int result = Long.compare(this.orderId, o.orderId);
        if (result == 0) {
            // 等于0说明 订单编号 相同,则需要根据 订单项价格 倒序
            result = -Long.compare(this.price, o.price);
        }
        return result;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        // 序列化
        out.writeLong(orderId);
        out.writeUTF(itemId);
        out.writeLong(price);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        // 反序列化
        this.orderId = in.readLong();
        this.itemId = in.readUTF();
        this.price = in.readLong();
    }
    @Override
    public String toString() {
        return this.getOrderId() + "\t" + this.getItemId() + "\t" + this.getPrice();
    }
}
  • 此处需要实现 WritableComparable接口
  • 需要编写 排序和序列化方法

5.3 编写分组方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 分组: 订单编号相同说明是同一组,否则是不同的组
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:30
 */
public class TopNGroupingComparator extends WritableComparator {
    public TopNGroupingComparator() {
        // 第二个参数为true: 表示可以通过反射创建实例
        super(OrderVo.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // 订单编号 相同说明是同一个对象,否则是不同的对象
        return ((OrderVo) a).getOrderId() == ((OrderVo) b).getOrderId() ? 0 : 1;
    }
}
  • 实现 WritableComparator接口,自定义分组规则。
  • 分组是发生在reduce阶段,前后2个key比较,相同则为一组,一组调用一次reduce方法。

5.4 编写 map 方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * map 操作: 输出的key为OrderVo, 输出的value为: price
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:28
 */
public class TopNMapper extends Mapper<LongWritable, Text, OrderVo, LongWritable> {
    private final OrderVo outKey = new OrderVo();
    private final LongWritable outValue = new LongWritable();
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderVo, LongWritable>.Context context) throws IOException, InterruptedException {
        // 获取一行数据 20230713000010  item-101    10
        String row = value.toString();
        // 根据 \t 进行分割
        String[] cells = row.split("\\s+");
        // 获取订单编号
        long orderId = Long.parseLong(cells[0]);
        // 获取订单项
        String itemId = cells[1];
        // 获取订单项价格
        long price = Long.parseLong(cells[2]);
        // 设置值
        outKey.setOrderId(orderId);
        outKey.setItemId(itemId);
        outKey.setPrice(price);
        outValue.set(price);
        // 写出
        context.write(outKey, outValue);
    }
}
  • map 操作: 输出的key为OrderVo, 输出的value为: price

5.5 编写reduce方法

package com.huan.hadoop.mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现,
 * 即 订单编号 相同的认为一组
 *
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNReducer extends Reducer<OrderVo, LongWritable, OrderVo, NullWritable> {
    @Override
    protected void reduce(OrderVo key, Iterable<LongWritable> values, Reducer<OrderVo, LongWritable, OrderVo, NullWritable>.Context context) throws IOException, InterruptedException {
        int topN = 0;
        // 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的
        for (LongWritable price : values) {
            topN++;
            if (topN > 2) {
                break;
            }
            // 注意: 此处的key每次输出都不一样
            context.write(key, NullWritable.get());
        }
    }
}
  • reduce操作: Key(OrderVo)相同的分为一组, 此处 OrderVo 作为key, 分组是根据 TopNGroupingComparator 来实现,即 订单编号 相同的认为一组.
  • 随着每次遍历, key的 orderId 是相同的(因为是根据这个分组的),但是里面的itemId和price是不同的

5.6 编写driver类

package com.huan.hadoop.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @author huan.fu
 * @date 2023/7/13 - 14:29
 */
public class TopNDriver extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // 构建配置对象
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程序
        int status = ToolRunner.run(configuration, new TopNDriver(), args);
        // 退出程序
        System.exit(status);
    }
    @Override
    public int run(String[] args) throws Exception {
        // 构建Job对象实例 参数(配置对象,Job对象名称)
        Job job = Job.getInstance(getConf(), "topN");
        // 设置mr程序运行的主类
        job.setJarByClass(TopNDriver.class);
        // 设置mr程序运行的 mapper类型和reduce类型
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        // 指定mapper阶段输出的kv数据类型
        job.setMapOutputKeyClass(OrderVo.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce阶段输出的kv数据类型,业务mr程序输出的最终类型
        job.setOutputKeyClass(OrderVo.class);
        job.setOutputValueClass(NullWritable.class);
        // 配置本例子中的输入数据路径和输出数据路径,默认输入输出组件为: TextInputFormat和TextOutputFormat
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 先删除输出目录(方便本地测试)
        FileSystem.get(this.getConf()).delete(new Path(args[1]), true);
        // 设置分组
        job.setGroupingComparatorClass(TopNGroupingComparator.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
  • 需要设置分组 job.setGroupingComparatorClass(TopNGroupingComparator.class);

5.7 运行结果

完整代码

https://gitee.com/huan1993/spring-cloud-parent/tree/master/hadoop/mr-topn-group

以上就是MapReduce实现TopN的效果的详细内容,更多关于MapReduce TopN效果的资料请关注脚本之家其它相关文章!

相关文章

  • JDK8新特性之判空遍历写法

    JDK8新特性之判空遍历写法

    这篇文章主要介绍了JDK8新特性之判空遍历写法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-10-10
  • Elasticsearch 计数分词中的token使用实例

    Elasticsearch 计数分词中的token使用实例

    这篇文章主要为大家介绍了Elasticsearch 计数分词中的token使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • SpringBoot中使用Redis的完整实例

    SpringBoot中使用Redis的完整实例

    这篇文章主要给大家介绍了关于SpringBoot中使用Redis的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-09-09
  • jdbc实现连接和增删改查功能

    jdbc实现连接和增删改查功能

    这篇文章主要为大家详细介绍了jdbc实现连接和基本的增删改查功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-02-02
  • 浅析Java ReentrantLock锁的原理与使用

    浅析Java ReentrantLock锁的原理与使用

    这篇文章主要为大家详细介绍了Java中ReentrantLock锁的原理与使用,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以了解下
    2023-08-08
  • Java 中的vector和list的区别和使用实例详解

    Java 中的vector和list的区别和使用实例详解

    在大家还没有了解vector,list,deque的知识之前,我先给大家介绍下stl,本文重点给大家介绍vector和list的区别及使用,感兴趣的的朋友一起看看吧
    2017-09-09
  • Collections工具类_动力节点Java学院整理

    Collections工具类_动力节点Java学院整理

    Collections工具类提供了大量针对Collection/Map的操作。这篇文章主要介绍了Collections工具类_动力节点Java学院整理,需要的朋友可以参考下
    2017-04-04
  • mybatis一对一查询功能

    mybatis一对一查询功能

    所谓的一对一查询,就是说我们在查询一个表的数据的时候,需要关联查询其他表的数据。这篇文章主要介绍了mybatis一对一查询功能,需要的朋友可以参考下
    2017-02-02
  • 基于SpringBoot构建电商秒杀项目代码实例

    基于SpringBoot构建电商秒杀项目代码实例

    这篇文章主要介绍了基于SpringBoot构建电商秒杀项目代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Springboot详解如何整合使用Thymeleaf

    Springboot详解如何整合使用Thymeleaf

    这篇文章主要分享了Spring Boot整合使用Thymeleaf,Thymeleaf是新一代的Java模板引擎,类似于Velocity、FreeMarker等传统引擎,关于其更多相关内容,需要的小伙伴可以参考一下
    2022-06-06

最新评论