详解Spark Sql在UDF中如何引用外部数据

 更新时间:2023年02月01日 11:02:15   作者:KYs_Daddy  
这篇文章主要为大家介绍了详解Spark Sql在UDF中如何引用外部数据示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

前言

Spark Sql可以通过UDF来对DataFrame的Column进行自定义操作。在特定场景下定义UDF可能需要用到Spark Context以外的资源或数据。比如从List或Map中取值,或是通过连接池从外部的数据源中读取数据,然后再参与Column的运算。

Excutor中每个task的工作线程都会对UDF的call进行调用,外部资源的使用发生在Excutor端,而资源加载既能发生在Driver端,也可以发生在Excutor端。如果外部资源对象能序列化,我们可以在Driver端进行初始化,然后广播(broadcast)到Excutor端参与运算。对于不能进行序列化的对象,如JedisPool(redis连接池),只能在Excutor端进行初始化。

因此,在UDF中引用外部资源有以下两类方法:

  • 能序列化:在Driver端进行初始化,然后通过spark的broadcast方法广播到Excutor上进行使用;
  • 不能序列化:在Excutor端进行初始化然后使用。

下面我们将用一个实际例子对上述两种方法进行详细介绍。

本文使用环境:Spark-2.3.0,Java 8。

场景介绍

我们以一个DataFrame(两个字段node_1、node_2)作为原始数据;一棵二叉搜索树(BST)作为Spark外部被引用数据;目标是定义一个UDF来判断:BST中是否刚好存在一个父节点,它的左右子节点值与node_1、node_2两个字段值相同。然后将判断结果输出到新列is_bro。其中DataFrame:

BST:

输出DataFrame:

二叉树的定义与判断是否为父节点的左右子节点的逻辑如下:

import java.io.Serializable;
/**
 * @author wangjiahui
 * @create 2021-03-14-10:57
 */
public class TreeNode implements Serializable{
    private Integer val;
    private TreeNode left;
    private TreeNode right;
    public TreeNode() {
    }
    public TreeNode(Integer val) {
        this.val = val;
    }
    public TreeNode(Integer val, TreeNode left, TreeNode right) {
        this.val = val;
        this.left = left;
        this.right = right;
    }
    public Integer getVal() {
        return val;
    }
    public void setVal(Integer val) {
        this.val = val;
    }
    public TreeNode getLeft() {
        return left;
    }
    public void setLeft(TreeNode left) {
        this.left = left;
    }
    public TreeNode getRight() {
        return right;
    }
    public void setRight(TreeNode right) {
        this.right = right;
    }
    /**
     * 判断是否刚好有一个父节点的左、右子节点值与num1、num2相同
     * @param num1
     * @param num2
     * @return
     */
    public Boolean isBro( Integer num1, Integer num2) {
        if (null == getLeft()||null == getRight()) {
            return false;
        }
        if (getLeft().getVal().compareTo(num1)==0 && getRight().getVal().compareTo(num2)==0) {
            return true;
        }
        return getLeft().isBro(num1, num2) || getRight().isBro(num1, num2);
    }
}

生成上图所示BST的方法createTree()如下:

public static TreeNode createTree(){
    TreeNode[] treeNodes = new TreeNode[8];
    for(int i=1; i<=7; i++){
        treeNodes[i] =  new TreeNode(i);
    }
    treeNodes[2].setLeft(treeNodes[1]);
    treeNodes[2].setRight(treeNodes[3]);
    treeNodes[6].setLeft(treeNodes[5]);
    treeNodes[6].setRight(treeNodes[7]);
    treeNodes[4].setLeft(treeNodes[2]);
    treeNodes[4].setRight(treeNodes[6]);
    return treeNodes[4];
}

方法一 Driver端加载

在Driver端完成初始化并定义UDF

JavaSparkContext javaSparkContext = new JavaSparkContext(spark.sparkContext());
//  初始化树
TreeNode tree = createTree();
//  broadcast
Broadcast<TreeNode> broadcastTree = javaSparkContext.broadcast(tree);
//  lambda表达式定义udf
UserDefinedFunction udf = functions.udf((Integer num1, Integer num2) -> {
    return broadcastTree.getValue().isBro(num1,num2);
}, BooleanType);
//  注册udf
spark.udf().register("isBro",udf);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

方法二 Excutor端加载

如果我们直接在call中进行初始化会存在问题:由于多个task的线程会在同一时刻对UDF中的call进行调用,导致资源对象在同一时刻被初始化多次,造成Excutor内存资源浪费。此外,如果外部资源为连接池对象,在同一时刻初始化多次会建立多个连接,增加外部数据源的访问压力。

为此,我们可以借助单例模式中的懒汉式实现,让资源在每个Excutor中只被初始化一次。懒汉式的实现需要新建一个类(命名为IsBroUDF2)并实现UDF2<Integer, Integer, Boolean>接口,重写UDF2的call方法:

import org.apache.spark.sql.api.java.UDF2;
/**
 * @author wangjiahui
 * @create 2021-03-14-14:25
 */
public class IsBroUDF2 implements UDF2<Integer,Integer,Boolean> {
    // 定义静态的TreeNode成员变量
    private static volatile TreeNode treeNode;
    public IsBroUDF2() {
    }
    @Override
    public Boolean call(Integer num1, Integer num2) throws Exception {
//        懒汉式 二次判定
        if(null==treeNode){
            synchronized (IsBroUDF2.class){
                if(null==treeNode){
                    treeNode=createTree();
                }
            }
        }
        return treeNode.isBro(num1,num2);
    }
    // 辅助方法
    public static TreeNode createTree(){
        TreeNode[] treeNodes = new TreeNode[8];
        for(int i=1; i<=7; i++){
            treeNodes[i] =  new TreeNode(i);
        }
        treeNodes[2].setLeft(treeNodes[1]);
        treeNodes[2].setRight(treeNodes[3]);
        treeNodes[6].setLeft(treeNodes[5]);
        treeNodes[6].setRight(treeNodes[7]);
        treeNodes[4].setLeft(treeNodes[2]);
        treeNodes[4].setRight(treeNodes[6]);
        return treeNodes[4];
    }
}

然后注册和使用UDF

//  注册udf
spark.udf().register("isBro",new IsBroUDF2(), BooleanType);
//  使用udf
df = df.withColumn("is_bro",functions.expr("isBro(node_1, node_2)"));

在call方法中通过加锁可以实现TreeNode资源在同一个Excutor中只被初始化一次。除了上面介绍的这种懒汉式的写法之外,还可以通过静态内部类懒加载、枚举等方式实现TreeNode资源在Excutor端只被初始化一次。

小结

想要在Spark Sql的UDF中使用Spark外的资源和数据进行运算,我们既可以在Driver端预先进行初始化然后广播到各Excutor上(要求对象能序列化),也可以直接在Excutor端进行加载;如果在Excutor端加载要保证外部资源对象只被初始化一次。

以上就是详解Spark Sql在UDF中如何引用外部数据的详细内容,更多关于Spark Sql UDF引用外部数据的资料请关注脚本之家其它相关文章!

相关文章

  • 详解java中DelayQueue的使用

    详解java中DelayQueue的使用

    这篇文章主要介绍了java中DelayQueue的使用,帮助大家更好的理解和学习Java,感兴趣的朋友可以了解下
    2020-10-10
  • 在Java的Struts中判断是否调用AJAX及用拦截器对其优化

    在Java的Struts中判断是否调用AJAX及用拦截器对其优化

    这篇文章主要介绍了在Java的Struts中判断是否调用AJAX及用拦截器对其优化的方法,Struts框架是Java的SSH三大web开发框架之一,需要的朋友可以参考下
    2016-01-01
  • selenium+java环境搭建过程推荐

    selenium+java环境搭建过程推荐

    这篇文章主要介绍了selenium+java环境搭建过程推荐,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • java实现文件复制上传操作

    java实现文件复制上传操作

    这篇文章主要为大家详细介绍了java实现文件复制上传操作,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2016-11-11
  • Mybatis如何按顺序查询出对应的数据字段

    Mybatis如何按顺序查询出对应的数据字段

    这篇文章主要介绍了Mybatis如何按顺序查询出对应的数据字段,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • Java8中CompletableFuture的用法全解

    Java8中CompletableFuture的用法全解

    这篇文章主要给大家介绍了关于Java8中CompletableFuture用法的相关资料,文中通过实例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2022-01-01
  • Springboot自动扫描包路径来龙去脉示例详解

    Springboot自动扫描包路径来龙去脉示例详解

    这篇文章主要介绍了Springboot自动扫描包路径来龙去脉示例详解,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-12-12
  • 汇总java调用python方法

    汇总java调用python方法

    这篇文章主要为大家详细介绍了java调用python的方法,文章中介绍了三种java调用python方法,感兴趣的朋友可以参考一下
    2016-02-02
  • 基于注解的springboot+mybatis的多数据源组件的实现代码

    基于注解的springboot+mybatis的多数据源组件的实现代码

    这篇文章主要介绍了基于注解的springboot+mybatis的多数据源组件的实现,会使用到多个数据源,文中通过代码讲解的非常详细,需要的朋友可以参考下
    2021-04-04
  • 解决Springboot中Feignclient调用时版本问题

    解决Springboot中Feignclient调用时版本问题

    这篇文章主要介绍了解决Springboot中Feign client调用时版本问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03

最新评论