Spark-UDF函数+SQL初步实践

在Spark中,也支持Hive中的自定义函数。自定义函数大致可以分为三种:

  • UDF(User-Defined-Function),即最基本的自定义函数,类似to_char,to_date等
  • UDAF(User- Defined Aggregation Funcation),用户自定义聚合函数,类似在group by之后使用的sum,avg等
  • UDTF(User-Defined Table-Generating Functions),用户自定义生成函数,有点像stream里面的flatMap

下面来看一个真实案例:我需要统计MongoDB-BuyPayGift 这张表中某天总充值额(所有realMoney相加);充值人数(uuid去重统计);新增付费人数(这些uuid在某日之前从来没付过费,今天付费了),根据unionId进行分组排序。表中所有数据字段结构如下:

结构解释
realMoney:模拟充值金额
uuid:模拟用户唯一ID
addRmb:充值的金币
totalPay:历史一共支付金额
unionId:渠道号
_date:日期

首先我们要先创建SparkSession,通过Session获取MongoDB-DataFrame,获得DF后通过withPipeline 过滤查询某日所有数据,如图我这里是2019-01-18,所以我这里先过滤数据,把不是今天数据过滤走(不推荐把数据直接filter,我们应该直接使用数据库过滤查询)

val buyPayGiftRdd = SparkMongoTool.getMongoRDD(session, args(1), "buyPayGift")
val buyPayGiftPip = buyPayGiftRdd.withPipeline(Seq(Document.parse("{ $match:{'_date':'2019-01-18 '}}")))

我们获取到buyPayGiftPip是过滤后2019-01-18所有的数据,下面我先对这组数据分组一下,这里可以执行Spark SQL 求出今天购买过所有人的UUID并且根据unionId 进行去重 (因为groupby后会生成一个unionId对应多个payuuids,然后就报错了…之后谷歌了一下,原来是time这一列在 group by的时候有多个查询结果,需要使用collect_set()一下)

select unionId,collect_set(distinct(uuid)) as payuuids from buyPayGift group by unionId

然后我们通过Show 方法打印 可以看到结构如下

然后我们现在知道今天每个unionId的支付过的ID了,下面我要知道历史的每个unionId支付过的用户列表,然后对比payuuids 求出他们的差集人数,就是今天新付费的用户了~我们先通过SQL语句+Map+GroupBy把历史的unionId和用户列表拿出来

db.getCollection(‘buyPayGift’).find({“_date”:{“$lt”:”2019-01-18″}})

这句话意思是取出2019-01-18前所有数据 建议大家加上project进行过滤~无需查询没必要的字段浪费网络传输

拿出来后,我们通过UDF函数对比Scala 2个Seq集合的差集

定义一个UDF函数:

def funIntersection(a: Seq[Long], b: Seq[Long]): Seq[Long] = (a.intersect(b))

注册UDF函数和使用:

import org.apache.spark.sql.functions._
val udfIntersection = udf(funIntersection _)
val resultDF = payUsersDF.join(unionPayUserDF, Seq("unionId"))
  .select(col("unionid"), col("uuids"), col("payuuids"),
    udfIntersection(col("uuids"), col("payuuids")))
对比结果

然后我们获取这个resultDF的列长度,就是新增付费用户了

val resultSizeDF = resultDF.map(x => (x.getString(0), x.getSeq(3).size)).toDF("unionId", "firstPay")

我们可以看看结果

对于数组为空的就是0,有则进行合并

我们还需要对其他字段进行一些统计,SparkSQL 默认提供了许多开窗函数

常用开窗函数:
1.为每条数据显示聚合信息.(聚合函数() over())
2.为每条数据提供分组的聚合函数结果(聚合函数() over(partition by 字段) as 别名)
–按照字段分组,分组后进行计算
3.与排名函数一起使用(row number() over(order by 字段) as 别名)
常用分析函数:(最常用的应该是1.2.3 的排序)
1、row_number() over(partition by … order by …)
2、rank() over(partition by … order by …)
3、dense_rank() over(partition by … order by …)
4、count() over(partition by … order by …)
5、max() over(partition by … order by …)
6、min() over(partition by … order by …)
7、sum() over(partition by … order by …)
8、avg() over(partition by … order by …)
9、first_value() over(partition by … order by …)
10、last_value() over(partition by … order by …)
11、lag() over(partition by … order by …)
12、lead() over(partition by … order by …)
lag 和lead 可以 获取结果集中,按一定排序所排列的当前行的上下相邻若干offset 的某个行的某个列(不用结果集的自关联);
lag ,lead 分别是向前,向后;
lag 和lead 有三个参数,第一个参数是列名,第二个参数是偏移的offset,第三个参数是 超出记录窗口时的默认值

下面我来演示一下具体如何使用Spark已经提供的函数,相信写过SQL语句小伙伴们对这条语句十分熟悉了

select unionId,sum(realMoney) as realMoney ,sum(addRmb) as addRmb,count(*) as payCount,count(distinct(uuid)) as userCount from buyPayGift group by unionId

最后和我们的刚刚的SQL大表生成数据和刚刚单独的数据JOIN一下即可

分别是unionID,总充值金额,总充值金币,支付次数,支付人数,首次支付人数

最后我们可以通过MongoSpark.save方法回写到指定的集合中

通过这次学习,知道了这些function + column自带 函数 + udf 可以满足你 99.99%的需求,剩下的0.01%,就是不适合用spark sql 来做了!建议大家多多浏览官方API,里面的函数 还是 要通篇读下,挨个试试,地址:
https://spark.apache.org/docs/2.3.2/api/java/org/apache/spark/sql/functions.html