spark dataframe全局排序id与分组后保留最大值行

 更新时间:2023年02月09日 08:49:25   作者:算法全栈之路  
这篇文章主要为大家介绍了spark dataframe全局排序id与分组后保留最大值行实现详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

正文

作为一个算法工程师,日常学习和工作中,不光要 训练模型关注效果 ,更多的 时间 是在 准备样本数据与分析数据 等,而这些过程 都与 大数据 spark和hadoop生态 的若干工具息息相关。

今天我们就不在更新 机器学习算法模型 相关的内容,分享两个 spark函数 吧,以前也在某种场景中使用过但没有保存收藏,哎!! 事前不搜藏,临时抱佛脚 的感觉 真是 痛苦,太耽误干活了

so,把这 两个函数 记在这里 以备不时 之需~

(1) 得到 spark dataframe 全局排序ID

这个函数的 应用场景 就是:根据某一列的数值对 spark 的 dataframe 进行排序, 得到全局多分区排序的全局有序ID,新增一列保存这个rank id ,并且保留别的列的数据无变化

有用户会说,这不是很容易吗 ,直接用 orderBy 不就可以了吗,但是难点是:orderBy完记录下全局ID 并且 保持原来全部列的DF数据

多说无益,遇到这个场景 直接copy 用起来 就知道 有多爽 了,同类问题 我们可以 用下面 这个函数 解决 ~

scala 写的 spark 版本代码:

def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String ="rank_id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}

函数调用我们可以用这行代码调用: val ranked_df = dfZipWithIndex(raw_df.orderBy($"predict_score".desc)), 直接复制过去就可以~

python写的 pyspark 版本代码:

from pyspark.sql.types import LongType, StructField, StructType
def dfZipWithIndex (df, offset=1, colName="rank_id"):
    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )
    zipped_rdd = df.rdd.zipWithIndex()
    new_rdd = zipped_rdd.map(lambda (row,rowId): ([rowId +offset] + list(row)))
    return spark.createDataFrame(new_rdd, new_schema)

调用 同理 , 这里我就不在进行赘述了。

(2)分组后保留最大值行

这个函数的 应用场景 就是: 当我们使用 spark 或则 sparkSQL 查找某个 dataframe 数据的时候,在某一天里,任意一个用户可能有多条记录,我们需要 对每一个用户,保留dataframe 中 某列值最大 的那行数据

其中的 关键点 在于:一次性求出对每个用户分组后,求得每个用户的多行记录中,某个值最大的行进行数据保留

当然,经过 简单修改代码,不一定是最大,最小也是可以的,平均都ok

scala 写的 spark 版本代码:

// 得到一天内一个用户多个记录里面时间最大的那行用户的记录
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions
val w = Window.partitionBy("user_id")
val result_df = raw_df
    .withColumn("max_time",functions.max("time").over(w))
    .where($"time" === $"max_time")
    .drop($"max_time")

python写的 pyspark 版本代码:

# pyspark dataframe 某列值最大的元素所在的那一行 
# GroupBy 列并过滤 Pyspark 中某列值最大的行 
# 创建一个Window 以按A列进行分区,并使用它来计算每个组的最大值。然后过滤出行,使 B 列中的值等于最大值 
from pyspark.sql import Window
w = Window.partitionBy('user_id')
result_df = spark.sql(raw_df).withColumn('max_time', fun.max('time').over(w))\
    .where(fun.col('time') == fun.col('time'))
    .drop('max_time')

我们可以看到: 这个函数的关键就是运用了 spark 的 window 函数 ,灵活运用 威力无穷 哦 !

到这里,spark利器2函数之dataframe全局排序id与分组后保留最大值行 的全文 就写完了 ,更多关于spark dataframe全局排序的资料请关注脚本之家其它相关文章!

相关文章

  • python如何正确使用yield

    python如何正确使用yield

    在 Python 开发中,yield 关键字的使用其实较为频繁,例如大集合的生成,简化代码结构、协程与并发都会用到它。但是,你是否真正了解 yield 的运行过程呢?这篇文章,我们就来看一下 yield 的运行流程,以及在开发中哪些场景适合使用yield
    2021-05-05
  • 使用python在校内发人人网状态(人人网看状态)

    使用python在校内发人人网状态(人人网看状态)

    人人网怎么发状态?下面使用python实现这个功能,大家参考使用吧
    2014-02-02
  • Python networkx中获取图的邻接矩阵方式

    Python networkx中获取图的邻接矩阵方式

    这篇文章主要介绍了Python networkx中获取图的邻接矩阵方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • pandas缺失值np.nan, np.isnan, None, pd.isnull, pd.isna

    pandas缺失值np.nan, np.isnan, None, pd.isnull,&n

    本文主要介绍了pandas缺失值np.nan, np.isnan, None, pd.isnull, pd.isna
    2024-04-04
  • Python实现批量转换文件编码的方法

    Python实现批量转换文件编码的方法

    这篇文章主要介绍了Python实现批量转换文件编码的方法,涉及Python针对文件的遍历及编码转换实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-07-07
  • pycharm重置设置,恢复默认设置的方法

    pycharm重置设置,恢复默认设置的方法

    今天小编就为大家分享一篇pycharm重置设置,恢复默认设置的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-10-10
  • OpenCV实战之实现手势虚拟缩放效果

    OpenCV实战之实现手势虚拟缩放效果

    本篇将会以HandTrackingModule为模块,实现通过手势对本人的博客海报进行缩放。文中的示例代码讲解详细,具有一定的借鉴价值,需要的可以参考一下
    2022-11-11
  • python基础教程之udp端口扫描

    python基础教程之udp端口扫描

    开发一个程序,用于获取局域网中开启snmp服务的主机ip地址列表,并写入相应文件以便其它程序使用。下面是实现方法
    2014-02-02
  • Python协程的四种实现方式总结

    Python协程的四种实现方式总结

    今天继续给大家介绍Python关知识,本文主要内容是Python协程的四种实现方式。文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-01-01
  • 使用PyV8在Python爬虫中执行js代码

    使用PyV8在Python爬虫中执行js代码

    PyV8是chrome用来执行javascript的引擎,据说是最快的js引擎,通过pyv8的封装,可以在python中使用。下面这篇文章主要介绍了使用PyV8在Python爬虫中执行js代码的相关资料,需要的朋友可以参考下。
    2017-02-02

最新评论