Implementing a Decision Tree Model Project in Scala on Spark

I. Project Background

Within the group's data-mining programme, the user-profiling project has to build many basic tags, such as age, gender, marital status and asset level. These tags cannot be produced by simply post-processing each of the group's data sources and taking that output as the final answer: data quality varies from source to source, and so does the confidence of the result derived from each source.
We therefore combine the sources with models such as decision trees plus XGBoost to predict a user's tags. For example, the decision tree for the age-group tag looks like the figure below:

II. Model Abstraction

At first I planned to follow the machine-learning PMML standard to implement the decision-tree abstraction, but after a full day with the official PMML documentation and a pile of PMML articles I concluded that the approach was far too complex for our case. PMML files are mostly generated automatically by model code, which does not fit our scenario, where we would be hand-writing the conversion of a decision tree into a PMML file ourselves. That would require mastering the various PMML tag definitions, knowing how to load the file in a Spark environment, and making sure the loaded model really carries the logic we intend; the follow-up work could get very complicated.
So I had no choice but to think about how to implement the decision-tree abstraction ourselves, for our own requirements.
It involves at least the following challenges:

1. How to abstract the decision tree
2. How to design a PMML-like configuration file
3. How to parse the configuration file
4. How to translate the tree into code Spark can execute

Now for the main course~

III. Implementation

From the cases analysed above, the in-memory model lends itself naturally to a binary tree, while the configuration file can be expressed in XML or JSON: XML can be parsed with an XML utility class, and JSON with Scala's json4s or Alibaba's fastjson. I chose a JSON configuration file, parsed with json4s since I know it best. From there the plan is clear; the implementation steps are:

1. Parse the JSON configuration file into a binary tree.
2. Bind data to each tree node according to the expressions in the JSON.
3. Translate the data-bound tree into a Spark physical execution plan.
4. Submit it to the Spark cluster and write the results to the target location.

The flow chart is as follows:

The Main class:

package com.yang

import com.yang.decision.Source
import com.yang.decision.build.{Binder, Executor, Parser}
import com.yang.utils.{FileUtils, SparkUtils}
import org.slf4j.LoggerFactory

/**
 * Application entry point of the decision tree model project.
 * 1. Parse the input args: process day (processDay),
 *    configuration file path (configFilePath) and parallelism (parallelism).
 * 2. Read the configuration file from HDFS and extract the schema,
 *    the data source class (source), the output table (output) and the rule set (rules).
 * 3. Parse the rules into a binary tree and bind a rule to every node.
 * 4. Traverse the parsed tree (parsedTree) and, following each node's rule,
 *    bind a dataset (dataFrame) to every node, producing the bound tree (boundTree).
 * 5. Traverse all leaf nodes (Calculation); the data of a leaf is the result set of
 *    its rule, so the union of all leaves is the final result set.
 * The job prints the parsed tree, the bound tree and the physical plan while running.
 * A user has to provide two things:
 * 1. A configuration file; see AgeGroupRule.json for the format.
 * 2. A custom data source, i.e. an implementation of [[com.yang.decision.Source]].
 * To support a new model type, implement [[com.yang.decision.model.Model]] to extend
 * the set of models the decision tree can call.
 *
 * @author yangfan
 * @since 2021/3/18
 * @version 1.0.0
 */
object DecisionTreeApplication {

  private final val logger = LoggerFactory.getLogger(this.getClass)

  private def checkInputArgs(args: Array[String]): (String, String, Int) = {
    if (null == args || args.length != 3) {
      throw new IllegalArgumentException(
        s"Input args exception, args length must be 3 but got `${args.mkString(",")}` !")
    }
    logger.error("== args ==\n{}", args.mkString(","))
    (args(0), args(1), args(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    /**
     * processDay: process day
     * configFilePath: configuration file path
     * parallelism: parallelism
     */
    val (processDay, configFilePath, parallelism) = checkInputArgs(args)
    //val configText = FileUtils.readFromLocal(configFilePath)
    val configText = FileUtils.readFromHDFS(configFilePath)
    logger.error("== config text ==\n{}", configText)
    /**
     * Parse the JSON into a binary tree and bind a rule to every node.
     */
    val (sc, parsedTree) = Parser.parse(configText)
    logger.error("== schema and source ==\n{}", sc)
    logger.error("== parsed tree ==\n{}", parsedTree)
    val spark = SparkUtils.initSession(isLocal = false, this.getClass.getSimpleName)
    /**
     * Pre-order traversal of parsedTree: bind each child's dataset according to the
     * node's rule, producing the bound tree boundTree.
     */
    val boundTree = Binder.bind(parsedTree,
      Source.loadData(sc.sourceClass, spark, processDay, parallelism))
    logger.error("== bound tree ==\n{}", boundTree)
    /**
     * Traverse all leaf nodes (Calculation); the data of a leaf is the result set of
     * its rule, and the union of all leaves is the final result set.
     */
    val df = Executor.execute(boundTree)
    df.explain(extended = true)
    //df.show()
    Executor.executeAndSaveResultData(spark, sc, processDay, boundTree)
    spark.close()
  }
}
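
The Source abstraction used above (Source.loadData(sc.sourceClass, spark, processDay, parallelism)) is not listed in this post. Purely as an illustration, here is a minimal sketch of what it could look like, assuming the companion object instantiates the configured class reflectively and that implementations share a (SparkSession, String, Int) constructor; the real class may differ:

package com.yang.decision

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Sketch of the data-source abstraction (assumed shape, not the original code).
 */
abstract class Source(spark: SparkSession,
                      processDay: String,
                      parallelism: Int) extends Serializable {

  /**
   * Load the root dataset the decision tree will consume.
   */
  def loadData(): DataFrame
}

object Source {

  /**
   * Instantiate the configured Source implementation reflectively and load its data.
   */
  def loadData(sourceClass: String,
               spark: SparkSession,
               processDay: String,
               parallelism: Int): DataFrame =
    Class.forName(sourceClass)
      .getConstructor(classOf[SparkSession], classOf[String], classOf[Int])
      .newInstance(spark, processDay, Int.box(parallelism))
      .asInstanceOf[Source]
      .loadData()
}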

Step 1:

How do we parse a JSON configuration file into a binary tree that serves as the in-memory model? This part is fairly simple.
First, a small sample configuration file:

{
  "schema": "id, jr_age, xinan_age, jl_age, umc_age, yc_age, vector",
  "source": "com.yang.decision.source.AgeGroupSource",
  "output": "hdp_jinrong_tech_ods.ods_decision_tree_output_data, age_group",
  "rules": {
    "rule": "jr_age != null",
    "left": {
      "rule": "GeneralRule, jr_age, 1.0"
    },
    "right": {
      "rule": "xinan_age != null",
      "left": {
        "rule": "GeneralRule, xinan_age, 0.997349184"
      },
      "right": {
        "rule": "jl_age != null || umc_age != null || yc_age != null",
        "left": {
          "rule": "jl_age == umc_age || jl_age == yc_age || umc_age == yc_age",
          "left": {
            "rule": "jl_age != null",
            "left": {
              "rule": "GeneralRule, jl_age, 0.992448605"
            },
            "right": {
              "rule": "GeneralRule, umc_age, 0.992448605"
            }
          },
          "right": {
            "rule": "jl_age != null",
            "left": {
              "rule": "GeneralRule, jl_age, 0.982582546"
            },
            "right": {
              "rule": "umc_age != null",
              "left": {
                "rule": "GeneralRule, umc_age, 0.974128879"
              },
              "right": {
                "rule": "GeneralRule, yc_age, 0.920175899"
              }
            }
          }
        },
        "right": {
          "rule": "vector != null",
          "left": {
            "rule": "ModelRule, com.yang.decision.model.XGBoostPipelineModel, /home/hdp_jinrong_tech/resultdata/huangren/feature_mining/age/age-applist-xgb-0901, null"
          }
        }
      }
    }
  }
}

Parsing the configuration file, straight to the code:

package com.yang.decision.build

import com.yang.decision.ModelProperties
import com.yang.decision.tree._
import org.apache.spark.sql.DataFrame
import org.json4s.jackson.JsonMethods
import org.json4s.{DefaultFormats, JValue}

/**
 * Parser: turns the configuration file into a binary tree.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
object Parser {

  implicit val formats: DefaultFormats.type = DefaultFormats

  def parse(text: String): (ModelProperties, Tree[DataFrame]) = try {
    val jv = JsonMethods.parse(text)
    val schema = (jv \ "schema").extract[String]
    val arr = schema.split(",").map(_.trim)
    val source = (jv \ "source").extract[String].trim
    val output = (jv \ "output").extract[String].split(",").map(_.trim)
    val rules = jv \ "rules"
    (ModelProperties(arr, source, output), parse2Tree(rules, null))
  } catch {
    case _: Exception =>
      throw new RuntimeException(
        s"`parse` method Exception, config text parse failed !")
  }

  def parse2Tree(jv: JValue,
                 parent: Tree[DataFrame]): Tree[DataFrame] = parse2Node(jv) match {
    case (rule, l, r) if null == l && null == r =>
      new Calculation(parent, rule)
    case (rule, l, r) =>
      new Condition(parent, rule,
        parse2Tree(l, null), parse2Tree(r, null))
    case _ => new Calculation(parent, null)
  }

  def parse2Node(jv: JValue): (String, JValue, JValue) = jv match {
    case _: JValue => (
      try {
        (jv \ "rule").extract[String]
      } catch {
        case _: Exception =>
          throw new RuntimeException(
            s"`parse2Node` method Exception, `rule` must not be null !")
      },
      if (containsKey(jv, "left")) jv \ "left" else null,
      if (containsKey(jv, "right")) jv \ "right" else null
    )
    case _ => null
  }

  def containsKey(jv: JValue, key: String): Boolean = {
    jv.findField(_._1.equals(key)).nonEmpty
  }
}

Above, parse(text: String): (ModelProperties, Tree[DataFrame]) extracts schema, source, output and rules; at that point rules is still a JValue, not a Tree. So parse2Tree(jv: JValue, parent: Tree[DataFrame]): Tree[DataFrame] is called to turn the JValue into a Tree via a recursive, depth-first traversal. Once parsing is done I call the result the ParsedTree.
Below is a partial before-and-after comparison diagram of the parsing:
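
ModelProperties, the small container returned by parse, is not listed in this post either. A minimal sketch, consistent with how the object is used in the Main class and the Executor (sc.sourceClass, sc.output(0), sc.output(1)) and with the "== schema and source ==" log output below; the field name schema is my assumption:

package com.yang.decision

/**
 * Parsed header of the configuration file. `sourceClass` and `output`
 * match how the object is used elsewhere; `schema` is an assumed name.
 */
case class ModelProperties(schema: Array[String],
                           sourceClass: String,
                           output: Array[String]) {

  override def toString: String =
    s"schema: ${schema.mkString(",")}\n" +
      s"source: $sourceClass\n" +
      s"output: ${output.mkString(",")}"
}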

Step 2:

After step 1, the parsed binary tree (parsedTree) looks like this:

== schema and source ==
schema: id,jr_age,xinan_age,jl_age,umc_age,yc_age,vector
source: com.bj58.decision.source.AgeGroupSource
output: hdp_jinrong_tech_ods.ods_decision_tree_output_data,age_group
== parsed tree ==
Branch[DataFrame]: {
 DataFrame: null
 Condition: jr_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: jr_age, confidence: 1.0]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: xinan_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: xinan_age, confidence: 0.997349184]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: jl_age != null || umc_age != null || yc_age != null
 LeftNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: jl_age == umc_age || jl_age == yc_age || umc_age == yc_age
 LeftNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: jl_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: jl_age, confidence: 0.992448605]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: umc_age, confidence: 0.992448605]
 }
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: jl_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: jl_age, confidence: 0.982582546]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: umc_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: umc_age, confidence: 0.974128879]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: null
 GeneralRule[fieldName: yc_age, confidence: 0.920175899]
 }
 }
 }
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: null
 Condition: vector != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: null
 ModelRule[modelClass: com.bj58.decision.model.XGBoostPipelineModel, modelFilePath: /home/hdp_jinrong_tech/resultdata/huangren/feature_mining/age/age-applist-xgb-0901, config: null]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: null
 null
 }
 }
 }
 }
}

So how is data bound to each node?
This part is more involved. First, a partial before-and-after comparison of the binding:

As the diagram shows, every node has been bound to the dataset it needs (note: the dataset here is a DataFrame abstraction, not the materialized data itself; anyone familiar with Spark RDDs will know what I mean, so I won't dwell on it).
So how is this step done?
First, the nodes of a binary tree split into branch nodes (Branch) and leaf nodes (Leaf). In our scenario, every branch node carries a conditional expression whose result decides whether a record goes to the left or the right child, and every leaf node is a node whose result has to be computed from a rule. So I defined two classes: Condition, a conditional node that extends Branch, and Calculation, a computing node that extends Leaf.
Why have Condition and Calculation extend Branch and Leaf rather than Tree directly? My lead asked me exactly that, and at first I thought the extra layer was indeed unnecessary. But after going back through the Spark SQL source I am comfortable with it: Spark SQL's TreeNode hierarchy also has abstract classes such as UnaryNode and BinaryNode, and only beneath them sit the concrete classes like Join, CoGroup and SetOperation.
If the data-binding logic here is unclear, look at how the Spark SQL class org.apache.spark.sql.execution.QueryExecution turns an Unresolved Logical Plan into a Resolved Logical Plan: the SQL statement is parsed by ANTLR4 into an Unresolved Logical Plan, and the analyzer then binds it against the catalog (which stores the metadata) to produce the Resolved Logical Plan. My implementation borrows that idea.
The code:
1. The binding process:

package com.yang.decision.build

import com.yang.decision.tree._
import org.apache.spark.sql.DataFrame

/**
 * Binder: binds a dataset to every node.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
object Binder {

  def bind(parsedTree: Tree[DataFrame], dataFrame: DataFrame): Tree[DataFrame] = {
    parsedTree.data = dataFrame
    bindChild(parsedTree)
  }

  def bindChild(parsedTree: Tree[DataFrame]): Tree[DataFrame] = parsedTree match {
    case leaf: Calculation if null != leaf.rule =>
      leaf.data = leaf.rule.execute(leaf.data)
      leaf
    case branch: Condition =>
      branch.left.data = branch.leftDF
      branch.right.data = branch.rightDF
      bindChild(branch.left)
      bindChild(branch.right)
      branch
    case other => other
  }
}

As shown above, binding proceeds in these steps:

1. Pass in the parsed tree parsedTree and the root dataset rootDataFrame.
2. Bind the root dataset: parsedTree.data = dataFrame.
3. Recursively walk the left and right children and bind each child's dataset.
   If the node is a branch node (Condition), bind the left and right children's datasets from the conditional expression, then recursively bind those children and their subtrees, and return the node:
   branch.left.data = branch.leftDF
   branch.right.data = branch.rightDF
   If the node is a leaf node (Calculation), compute the node's dataset from its rule and return the node:
   leaf.data = leaf.rule.execute(leaf.data)
4. If the node is neither a branch node nor a leaf node, return it as-is without binding.

2. The binary tree abstraction:

package com.yang.decision.tree

/**
 * Binary tree abstraction.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
abstract class Tree[T](parent: Tree[T]) extends Serializable {

  var data: T = _

  /**
   * Number of nodes.
   */
  def nodeNumber: Int = go(this, 0)

  private def go[A](tree: Tree[A], t: Int): Int = tree match {
    case Leaf(_) => t + 1
    case Branch(_, left, right) => t + 1 + go(left, 0) + go(right, 0)
    case _ => 0
  }

  /**
   * Depth of the tree.
   */
  val depth: Int = deepFirstSearch(this)(_ => 1)(_ max _ + 1)

  /**
   * Transformation: map every node's data with f, keeping the tree shape.
   */
  def map[A, B](tree: Tree[A])(f: A => B): Tree[B] = tree match {
    case l @ Leaf(_) =>
      val leaf = Leaf[B](null)
      leaf.data = f(l.data)
      leaf
    case b @ Branch(_, left, right) =>
      val branch = Branch[B](null, map(left)(f), map(right)(f))
      branch.data = f(b.data)
      branch
  }

  /**
   * Depth-first traversal: fold every node's data with f, combining results with g.
   */
  def deepFirstSearch[A, B](tree: Tree[A])(f: A => B)(g: (B, B) => B): B = tree match {
    case l @ Leaf(_) => f(l.data)
    case b @ Branch(_, left, right) =>
      g(g(deepFirstSearch(left)(f)(g), deepFirstSearch(right)(f)(g)), f(b.data))
  }

  override def toString: String = toString(0)

  def toString(depth: Int): String = super.toString
}

/**
 * Leaf node.
 *
 * @since 2021/3/16
 * @version 1.0.0
 */
case class Leaf[T](parent: Tree[T]) extends Tree[T](parent)

/**
 * Branch node.
 *
 * @since 2021/3/16
 * @version 1.0.0
 */
case class Branch[T](parent: Tree[T], left: Tree[T], right: Tree[T]) extends Tree[T](parent)
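
A quick sanity check of the abstraction above (a throwaway example with made-up values, not part of the project): nodeNumber counts nodes, and deepFirstSearch folds every node's data.

package com.yang.decision.tree

object TreeSpec {

  def main(args: Array[String]): Unit = {
    // Build a tiny tree: a root branch with two leaves.
    val left = Leaf[Int](null)
    val right = Leaf[Int](null)
    val root = Branch[Int](null, left, right)
    left.data = 1
    right.data = 2
    root.data = 3

    println(root.nodeNumber)                                   // 3 nodes
    println(root.deepFirstSearch(root)((x: Int) => x)(_ + _))  // 6: sum of all node data
  }
}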

3. The conditional node:

package com.yang.decision.tree

import com.ql.util.express.{DefaultContext, ExpressRunner}
import com.yang.express.EqualsOperator
import org.apache.spark.sql.{DataFrame, Row}

/**
 * Conditional node.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
class Condition(parent: Tree[DataFrame],
                ruleText: String,
                left: Tree[DataFrame],
                right: Tree[DataFrame])
  extends Branch[DataFrame](parent, left, right) {

  lazy val runner: ExpressRunner = {
    val runner = new ExpressRunner()
    /**
     * Replace the semantics of the `==` operator:
     * before: null == null returns true
     * after:  null == null returns false
     */
    runner.replaceOperator("==", new EqualsOperator())
    runner
  }

  lazy val fields: Array[String] = runner.getOutVarNames(ruleText)

  lazy val leftDF: DataFrame = data.filter(r => isLeft(r))

  lazy val rightDF: DataFrame = data.filter(!isLeft(_))

  def isLeft(row: Row): Boolean = {
    val context = new DefaultContext[String, AnyRef]
    fields.foreach(field =>
      context.put(field, row.getAs(field).asInstanceOf[AnyRef])
    )
    try {
      runner.execute(ruleText, context,
        null, true, false)
        .asInstanceOf[Boolean]
    } catch {
      case _: Exception =>
        throw new RuntimeException(
          s"`isLeft` method Exception, condition expression compute failed !")
    }
  }

  override def toString(depth: Int): String = {
    val prefix = Seq.fill(depth)('\t').mkString
    s"""Branch[DataFrame]: {
       |${prefix + '\t'}DataFrame: $data
       |${prefix + '\t'}Condition: $ruleText
       |${prefix + '\t'}LeftNode: ${if (null == left) "null" else left.toString(depth + 1)}
       |${prefix + '\t'}RightNode: ${if (null == right) "null" else right.toString(depth + 1)}
       |$prefix}""".stripMargin
  }
}
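
The custom com.yang.express.EqualsOperator plugged into the ExpressRunner above is not listed in the post. Purely as a hedged sketch of the idea (assuming QLExpress's usual extension point of subclassing com.ql.util.express.Operator and overriding executeInner; the real class may differ), an operator that makes any comparison involving null evaluate to false could look like this:

package com.yang.express

import com.ql.util.express.Operator

/**
 * Replacement for the default `==` operator so that any comparison
 * involving null (including null == null) evaluates to false.
 * Sketch only; the executeInner signature is assumed from the QLExpress docs.
 */
class EqualsOperator extends Operator {

  override def executeInner(list: Array[AnyRef]): AnyRef = {
    // Expect exactly two operands; anything involving null is "not equal".
    if (null == list || list.length != 2 || null == list(0) || null == list(1)) {
      java.lang.Boolean.FALSE
    } else {
      java.lang.Boolean.valueOf(list(0) == list(1))
    }
  }
}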

4. The computing node:

package com.yang.decision.tree

import com.yang.decision._
import org.apache.spark.sql.DataFrame

/**
 * Computing node.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
class Calculation(parent: Tree[DataFrame],
                  ruleText: String)
  extends Leaf[DataFrame](parent) {

  val rule: Rule = {
    if (null == ruleText) {
      null
    } else {
      val arr: Array[String] = ruleText.split(",").map(_.trim)
      arr(0) match {
        case "GeneralRule" => GeneralRule(arr(1), arr(2).toDouble)
        case "ModelRule" => ModelRule(arr(1), arr(2), null)
        case par =>
          throw new RuntimeException(s"`Calculation` class Exception, " +
            s"`rule` must be `GeneralRule` or `ModelRule` but got `$par` !")
      }
    }
  }

  override def toString(depth: Int): String = {
    val prefix = Seq.fill(depth)('\t').mkString
    s"""Leaf[DataFrame]: {
       |${prefix + '\t'}DataFrame: $data
       |${prefix + '\t'}$rule
       |$prefix}""".stripMargin
  }
}

Here the computing node's rule is the important object: it describes which rule the node applies when its result set has to be computed, and it is returned as a Rule. Rule is an abstraction with a single method, execute(data: DataFrame): DataFrame, and that method is what actually runs once the node's work is submitted to the cluster. Two rules are implemented so far: a general rule and a model rule. The code:

package com.yang.decision

import com.yang.decision.model.Model
import org.apache.spark.sql.{DataFrame, Encoders}

/**
 * Computation rule abstraction.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
trait Rule {

  def execute(data: DataFrame): DataFrame
}

/**
 * General rule: take a field's value directly, with a fixed confidence.
 *
 * @since 2021/3/16
 * @version 1.0.0
 */
case class GeneralRule(fieldName: String, confidence: Double) extends Rule {

  override def execute(data: DataFrame): DataFrame = {
    data.map(row =>
      OutputData(
        row.getAs[String]("id"),
        row.getAs[String](fieldName),
        confidence
      )
    )(Encoders.product[OutputData]).toDF()
  }

  override def toString: String = s"GeneralRule[fieldName: $fieldName, confidence: $confidence]"
}

/**
 * Model rule: delegate the computation to a model.
 *
 * @since 2021/3/16
 * @version 1.0.0
 */
case class ModelRule(modelClass: String,
                     modelFilePath: String,
                     config: Map[String, String]) extends Rule {

  lazy val model: Model = Class.forName(modelClass)
    .getConstructor(classOf[String], classOf[Map[String, String]])
    .newInstance(modelFilePath, config)
    .asInstanceOf[Model]

  override def execute(data: DataFrame): DataFrame = model.execute(data)

  override def toString: String = s"ModelRule[modelClass: $modelClass, " +
    s"modelFilePath: $modelFilePath, config: $config]"
}

Inside the model rule the result set comes from model.execute(data), so the real computation logic lives in the individual models, for example an XGBoost model or any other model. The model object is therefore still an abstraction: Model is an abstract class, and to compute with a different model you implement its execute(data: DataFrame): DataFrame. The code:
1. The model abstract class (two members to implement: val model: Any, the model object, and def execute(data: DataFrame): DataFrame, the model's computation logic):

package com.yang.decision.model

import org.apache.spark.sql.DataFrame

/**
 * Model abstraction.
 *
 * @author yangfan
 * @since 2021/3/8
 * @version 1.0.0
 */
abstract class Model(modelFilePath: String,
                     config: Map[String, String]) extends Serializable {

  val model: Any

  def execute(data: DataFrame): DataFrame
}

2. The XGBoost model implementation:

package com.yang.decision.model

import com.yang.decision.OutputData
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, Encoders}

import scala.collection.mutable

/**
 * XGBoost model.
 *
 * @author yangfan
 * @since 2021/3/10
 * @version 1.0.0
 */
class XGBoostPipelineModel(modelFilePath: String,
                           config: Map[String, String])
  extends Model(modelFilePath, config) {

  override lazy val model: PipelineModel = PipelineModel.load(modelFilePath)

  override def execute(data: DataFrame): DataFrame = {
    if (null == model)
      throw new NullPointerException(
        s"`execute` method exception, `model` must not be null !")
    if (null == data)
      throw new NullPointerException(
        s"`execute` method exception, `data` must not be null !")
    val labeledData = data.map(l => {
      LabeledPoint(0d,
        Vectors.dense(
          l.getAs[mutable.WrappedArray[Double]]("vector").toArray
        ),
        l.getAs[String]("id"))
    })(Encoders.product[LabeledPoint])
    model.transform(labeledData)
      .select("id", "probability", "predictedLabel")
      .map(r => {
        val v = r.getAs[Vector](1)
        OutputData(r.getString(0), r.getString(2), v(v.argmax))
      })(Encoders.product[OutputData])
      .toDF()
  }
}

case class LabeledPoint(label: Double,
                        features: Vector,
                        id: String)

Below is a partial diagram of the computation logic:


Step 3:

After steps 1 and 2, the data-bound tree (boundTree) looks like this:

== bound tree ==
Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: jr_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: jr_age, confidence: 1.0]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: xinan_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: xinan_age, confidence: 0.997349184]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: jl_age != null || umc_age != null || yc_age != null
 LeftNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: jl_age == umc_age || jl_age == yc_age || umc_age == yc_age
 LeftNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: jl_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: jl_age, confidence: 0.992448605]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: umc_age, confidence: 0.992448605]
 }
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: jl_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: jl_age, confidence: 0.982582546]
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: umc_age != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: umc_age, confidence: 0.974128879]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 GeneralRule[fieldName: yc_age, confidence: 0.920175899]
 }
 }
 }
 }
 RightNode: Branch[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 Condition: vector != null
 LeftNode: Leaf[DataFrame]: {
 DataFrame: [id: string, featureValue: string ... 1 more field]
 ModelRule[modelClass: com.bj58.decision.model.XGBoostPipelineModel, modelFilePath: /home/hdp_jinrong_tech/resultdata/huangren/feature_mining/age/age-applist-xgb-0901, config: null]
 }
 RightNode: Leaf[DataFrame]: {
 DataFrame: [id: string, jr_age: string ... 5 more fields]
 null
 }
 }
 }
 }
}

So how do we turn a tree that already has its data bound into a physical execution plan Spark can run?
If the previous steps made sense, this is the easiest part. The code:

package com.yang.decision.build

import com.yang.decision.ModelProperties
import com.yang.decision.tree._
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Executor: turns the Tree into a physical plan Spark can execute.
 *
 * @author yangfan
 * @since 2021/3/16
 * @version 1.0.0
 */
object Executor {

  def executeAndSaveResultData(spark: SparkSession,
                               sc: ModelProperties,
                               processDay: String,
                               tree: Tree[DataFrame]): Unit = {
    execute(tree).createOrReplaceTempView("output_data_temp_view")
    spark.sql(
      s"""
         |insert overwrite table ${sc.output(0)}
         |partition(dt = '$processDay', model = '${sc.output(1)}')
         |select
         |    id
         |    ,featureValue
         |    ,confidenceScore
         |from output_data_temp_view
      """.stripMargin
    )
  }

  def execute(boundTree: Tree[DataFrame]): DataFrame = boundTree match {
    case leaf: Calculation if null != leaf.rule => leaf.data
    case branch: Condition => union(execute(branch.left), execute(branch.right))
    case _ => null
  }

  def union(l: DataFrame, r: DataFrame): DataFrame = (l, r) match {
    case _ if null == l => r
    case _ if null == r => l
    case _ => l.union(r)
  }
}

In fact, after the previous steps every node already has the dataset it needs bound to it, so the final result we want is simply the data under every leaf (computing) node. The last step is therefore just to walk the leaves recursively and union all of their datasets into the final result set. As soon as the resulting DataFrame triggers computation, the physical plan is generated and run on the Spark cluster; here the trigger is, of course, the insert statement.
Finally, the physical plan (Physical Plan) and the monitoring during execution:

== Physical Plan ==
*Union
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#210, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#211, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#212]
: +- *MapElements <function1>, obj#209: com.bj58.decision.OutputData
: +- *Filter <function1>.apply
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#208: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#224, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#225, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#226]
: +- *MapElements <function1>, obj#223: com.bj58.decision.OutputData
: +- *Filter (<function1>.apply && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#222: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#242, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#243, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#244]
: +- *MapElements <function1>, obj#241: com.bj58.decision.OutputData
: +- *Filter ((((<function1>.apply && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#240: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#254, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#255, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#256]
: +- *MapElements <function1>, obj#253: com.bj58.decision.OutputData
: +- *Filter ((((<function1>.apply && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#252: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#268, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#269, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#270]
: +- *MapElements <function1>, obj#267: com.bj58.decision.OutputData
: +- *Filter ((((<function1>.apply && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#266: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#282, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#283, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#284]
: +- *MapElements <function1>, obj#281: com.bj58.decision.OutputData
: +- *Filter (((((<function1>.apply && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#280: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
:- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#294, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#295, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#296]
: +- *MapElements <function1>, obj#293: com.bj58.decision.OutputData
: +- *Filter (((((<function1>.apply && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply) && <function1>.apply)
: +- *DeserializeToObject createexternalrow(id#29.toString, jr_age#30.toString, xinan_age#31.toString, jl_age#32.toString, umc_age#33.toString, yc_age#34.toString, staticinvoke(class scala.collection.mutable.WrappedArray$, ObjectType(interface scala.collection.Seq), make, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, DoubleType, true), vector#28, None).array, true, false), StructField(id,StringType,true), StructField(jr_age,StringType,true), StructField(xinan_age,StringType,true), StructField(jl_age,StringType,true), StructField(umc_age,StringType,true), StructField(yc_age,StringType,true), StructField(vector,ArrayType(DoubleType,true),true)), obj#292: org.apache.spark.sql.Row
: +- *Exchange RoundRobinPartitioning(300)
: +- *Project [coalesce(userid#121, userid#27) AS id#29, cast(jr_age#22 as string) AS jr_age#30, cast(xinan_age#23 as string) AS xinan_age#31, cast(jl_age#24 as string) AS jl_age#32, cast(umc_age#25 as string) AS umc_age#33, cast(yc_age#26 as string) AS yc_age#34, vector#28]
: +- *SortMergeJoin [userid#121], [userid#27], FullOuter
: :- *SortAggregate(key=[userid#121], functions=[max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, jr_age#22, xinan_age#23, jl_age#24, umc_age#25, yc_age#26])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#121, 500, false)
: : +- *SortAggregate(key=[userid#121], functions=[partial_max(CASE WHEN (ds#2 = jinrong) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = xinan) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = jianli) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = umc) THEN feature_value#1 ELSE null END), partial_max(CASE WHEN (ds#2 = yingcai) THEN feature_value#1 ELSE null END)], output=[userid#121, max#507, max#508, max#509, max#510, max#511])
: : +- *Sort [userid#121 ASC NULLS FIRST], false, 0
: : +- *Union
: : :- *Project [cast(user_id#39L as string) AS userid#121, birthday#43 AS feature_value#1, xinan AS ds#2]
: : : +- *Filter ((((((((isnotnull(user_id#39L) && isnotnull(birthday#43)) && isnotnull(cast(birthday#43 as int))) && (cast(birthday#43 as int) > 19200101)) && (cast(birthday#43 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#43, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (cast(user_id#39L as string) = 0)) && (cast(cast(user_id#39L as string) as bigint) > 0)) && NOT cast(user_id#39L as string) RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#43, user_id#39L], HiveTableRelation `hdp_ubu_xxzl_defaultdb`.`dw_realname_userinfo`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#37, access_party_id#38, user_id#39L, name_md5#40, idno_md5#41, gender#42, birthday#43, address#44], [dt#45], [isnotnull(dt#45), (dt#45 = 20210321)]
: : :- *Project [wb_id#46 AS userid#5, birthday#3 AS feature_value#6, jinrong AS ds#7]
: : : +- *Filter ((((((isnotnull(rank1#4) && isnotnull(birthday#3)) && (rank1#4 = 1)) && isnotnull(cast(birthday#3 as int))) && (cast(birthday#3 as int) > 19200101)) && isnotnull(unix_timestamp(birthday#3, yyyyMMdd, Some(Asia/Shanghai)))) && (cast(birthday#3 as int) < 20170101))
: : : +- *Window [row_number() windowspecdefinition(wb_id#46, bill_no#50 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank1#4], [wb_id#46], [bill_no#50 DESC NULLS LAST]
: : : +- *Sort [wb_id#46 ASC NULLS FIRST, bill_no#50 DESC NULLS LAST], false, 0
: : : +- *Exchange hashpartitioning(wb_id#46, 500, false)
: : : +- *Project [wb_id#46, CASE WHEN ((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) THEN substring(idcard#47, 7, 8) WHEN ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))) THEN concat(19, substring(idcard#47, 7, 6)) END AS birthday#3, bill_no#50]
: : : +- *Filter ((((isnotnull(wb_id#46) && (((length(idcard#47) = 18) && isnotnull(cast(substring(idcard#47, 7, 8) as int))) || ((length(idcard#47) = 15) && isnotnull(cast(substring(idcard#47, 7, 6) as int))))) && NOT (wb_id#46 = 0)) && (cast(wb_id#46 as bigint) > 0)) && NOT wb_id#46 RLIKE ^0.*)
: : : +- *HiveTableScan [bill_no#50, idcard#47, wb_id#46], HiveTableRelation `hdp_jinrong_qiangui_defaultdb`.`itf_58haojie_user_baihang`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [wb_id#46, idcard#47, name#48, phone#49, bill_no#50]
: : :- *Project [userid#51, CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END AS feature_value#10, umc AS ds#11]
: : : +- *Filter (((((((((isnotnull(userid#51) && (length(birthday#66) > 5)) && (((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) || (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1))) && isnotnull(cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int))) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) > 19200101)) && (cast(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END as int) < 20170101)) && isnotnull(unix_timestamp(CASE WHEN ((instr(trim(birthday#66, None), CST) > 0) || (instr(trim(birthday#66, None), CDT) > 0)) THEN from_unixtime(unix_timestamp(trim(birthday#66, None), EEEMMMddHH:mm:sszzzyyyy, Some(Asia/Shanghai)), yyyyMMdd, Some(Asia/Shanghai)) WHEN (length(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0)) > 1) THEN regexp_replace(regexp_extract(birthday#66, ^d{4}-d{2}-d{2}, 0), -, ) END, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#51 = 0)) && (cast(userid#51 as bigint) > 0)) && NOT userid#51 RLIKE ^0.*)
: : : +- *HiveTableScan [birthday#66, userid#51], HiveTableRelation `hdp_teu_dpd_defaultdb`.`ds_umc_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#51, register_name#52, register_email#53, register_mobile#54, verified_mobile#55, verified_realname#56, verified_business#57, reg_time#58, reg_ip#59, reg_cityid#60, reg_platform#61, locked#62, md5_realname#63, nick_name#64, sex#65, birthday#66, md5_qq#67, md5_msn#68, md5_mobile#69, md5_phone#70, postzip#71, face#72, verified_face#73, address#74, ... 17 more fields], [ds#92], [isnotnull(ds#92), (ds#92 = umc)]
: : :- *Project [userid#93, concat(regexp_replace(feature_value#95, -, ), 01) AS feature_value#12, jianli AS ds#13]
: : : +- *Filter ((((((((((isnotnull(userid#93) && isnotnull(feature_key#94)) && (feature_key#94 = zp_avg_birthday_rsm_val_1d)) && (length(feature_value#95) = 7)) && isnotnull(cast(regexp_replace(feature_value#95, -, ) as int))) && (cast(regexp_replace(feature_value#95, -, ) as int) > 192001)) && (cast(regexp_replace(feature_value#95, -, ) as int) < 201701)) && isnotnull(unix_timestamp(concat(regexp_replace(feature_value#95, -, ), 01), yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#93 = 0)) && (cast(userid#93 as bigint) > 0)) && NOT userid#93 RLIKE ^0.*)
: : : +- *FileScan parquet hdp_jinrong_tech_dw.stf_taie_feature_topic[userid#93,feature_key#94,feature_value#95,service_type#98,period#99,task#100,dt#101] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/resultdata/hdp_jinrong_tech_dw/..., PartitionCount: 1, PartitionFilters: [isnotnull(service_type#98), isnotnull(task#100), isnotnull(dt#101), isnotnull(period#99), (dt#10..., PushedFilters: [IsNotNull(userid), IsNotNull(feature_key), EqualTo(feature_key,zp_avg_birthday_rsm_val_1d), Not(..., ReadSchema: struct<userid:string,feature_key:string,feature_value:string>
: : +- *Project [userid#14, feature_value#15, ds#16]
: : +- *Filter (count(1)#117L = 1)
: : +- *SortAggregate(key=[userid#102], functions=[max(birthday#103), count(1)], output=[userid#14, feature_value#15, ds#16, count(1)#117L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Exchange hashpartitioning(userid#102, 500, false)
: : +- *SortAggregate(key=[userid#102], functions=[partial_max(birthday#103), partial_count(1)], output=[userid#102, max#166, count#167L])
: : +- *Sort [userid#102 ASC NULLS FIRST], false, 0
: : +- *Project [userid#102, birthday#103]
: : +- *Filter ((((((((isnotnull(birthday#103) && isnotnull(userid#102)) && isnotnull(cast(birthday#103 as int))) && (cast(birthday#103 as int) > 19200101)) && (cast(birthday#103 as int) < 20170101)) && isnotnull(unix_timestamp(birthday#103, yyyyMMdd, Some(Asia/Shanghai)))) && NOT (userid#102 = 0)) && (cast(userid#102 as bigint) > 0)) && NOT userid#102 RLIKE ^0.*)
: : +- *FileScan parquet hdp_jinrong_tech_ods.user_chinahr_bir_20181231[userid#102,birthday#103,dt#104] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[viewfs://58-cluster/home/hdp_jinrong_tech/warehouse/hdp_jinrong_tech_dw/d..., PartitionCount: 1, PartitionFilters: [isnotnull(dt#104), (dt#104 = 20181231)], PushedFilters: [IsNotNull(birthday), IsNotNull(userid), Not(EqualTo(userid,0))], ReadSchema: struct<userid:string,birthday:string>
: +- *Sort [userid#27 ASC NULLS FIRST], false, 0
: +- *Exchange hashpartitioning(userid#27, 500, true)
: +- *Project [UDF:des_encoder(userid#105, decode) AS userid#27, UDF:vector_trans(applistall#110, 4399游戏盒|代练通|b612咔叽|wifi万能钥匙|掌上wegame|翼支付|个人所得税|掌上英雄联盟|比心|饿了么|三国杀|知到|inshot-视频编辑|手机淘宝|美图秀秀|节奏大师|识货|部落冲突|qq|前程无忧51job|百度极速版|美柚|一甜相机|虎牙直播|迷你世界|云班课|轻颜相机|tt语音|火山小视频|驾考宝典|买单吧|宝宝树孕育|一起小学学生| 360手机卫士|家长通|同桌游戏|vsco|意见反馈|picsart美易照片编辑|哔哩哔哩|爱奇艺|一起作业学生|安全教育平台|小猿口算|熊猫优选|y2002电音|滴滴车主|百度网盘|dnf助手|作业帮|剪映|毒|微光|纳米盒|qq音乐|欢乐斗地主|汽车之家|uu加速器|最右|steam|内涵段子|小天才|百度手机助手|店长直聘|美篇|好看视频|uki|云课堂智慧职教|一键锁屏|蓝墨云班课|360手机助手|学习通|掌上飞车|抖音插件|支付宝|萤石云视频|全民小视频|小红书|手机营业厅|yy|铃声多多|腾讯动漫|来分期|快手|发现精彩|今日头条极速版|捷信金融|交易猫|faceu激萌|天天酷跑|蘑菇街|中国大学mooc|斗米|同花顺|分期乐|企鹅电竞|一起学|学而思网校|抖音短视频|快手直播伴侣|芒果tv|酷狗音乐|平安口袋银行|应用宝|绝地求生刺激战场|网易云音乐|易班|qq飞车|好游快爆|微博|王者营地|影视大全|球球大作战|boss直聘|皮皮虾|作业盒子小学|天天p图|小盒家长|皮皮搞笑|u净|全球购骑士特权|小猿搜题|mix|momo陌陌|掌上穿越火线|今日头条|超级课程表|穿越火线:枪战王者|掌通家园|学小易|王者荣耀|pubgmobile|工银融e联|快手小游戏|知乎|小恩爱|心悦俱乐部|平安普惠|百度贴吧|优酷|keep|美团|得物(毒)|百词斩|找靓机|晓黑板|到梦空间|美团外卖|智联招聘|华为主题动态引擎|情侣空间|抖音火山版|闲鱼|中国建设银行|今日校园|玩吧|相册管家|便签|智学网|浦发信用卡|熊猫直播|交管12123|掌上生活|腾讯新闻|猫咪|迅雷|探探|腾讯欢乐麻将|qq同步助手|凤凰新闻|平安好车主|和平精英|比心陪练|优酷视频|快猫|狼人杀|第五人格|快看漫画|斗鱼直播|途虎养车|搜狗输入法|西瓜视频|快影|人人视频|学习强国|倒数日|韩剧tv|音遇|soul|菜鸟裹裹|云电脑|无他相机|迅游手游加速器|租号玩|平安金管家|qq安全中心|360清理大师|葫芦侠|王者荣耀助手|taptap|全民k歌|腾讯视频|唱鸭|掌上道聚城|百度, \|) AS vector#28]
: +- *HiveTableScan [applistall#110, userid#105], HiveTableRelation `hdp_jinrong_tech_dw`.`dw_taie_tb_app_action_applist_info`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [userid#105, lastdate#106, applistlast#107, applist180d#108, applist365d#109, applistall#110], [daystr#111], [isnotnull(daystr#111), (daystr#111 = 20210321)]
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).id, true, false) AS id#472, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.bj58.decision.OutputData, true]).featureValue, true, false) AS featureValue#473, assertnotnull(input[0, com.bj58.decision.OutputData, true]).confidenceScore AS confidenceScore#474]
 +- *MapElements <function1>, obj#471: com.bj58.decision.OutputData
 +- *DeserializeToObject createexternalrow(id#385.toString, newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize, predictedLabel#454.toString, StructField(id,StringType,true), StructField(probability,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true), StructField(predictedLabel,StringType,true)), obj#470: org.apache.spark.sql.Row
 +- *Project [id#385, UDF(_probability#389) AS probability#406, UDF(knownotnull(UDF(_probability#389))) AS predictedLabel#454]
 +- *Scan ExistingRDD[label#383,features#384,id#385,classIndex#386,indexedFeatures#387,_rawPrediction#388,_probability#389]

DAG execution diagram:

Stage execution diagram:

4. Optimizing the computation

1. Optimizing the parsing and evaluation of condition expressions

To parse the various condition expressions that users define, I introduced Alibaba's open-source expression engine QLExpress. It can evaluate all kinds of mathematical and logical expressions and supports replacing or customizing operators. Source code: https://github.com/alibaba/QL...
When I first used it, performance was noticeably poor. It turned out to be caused by the way I had written the code; part of it is shown below:

import com.ql.util.express.{DefaultContext, ExpressRunner}
import com.yang.express.EqualsOperator
import org.apache.spark.sql.{DataFrame, Row}

// `ruleText` (the node's condition expression) and `data` (the node's DataFrame)
// come from the enclosing class.
lazy val runner: ExpressRunner = {
  val runner = new ExpressRunner()
  /**
   * Replace the built-in `==` operator:
   * before: null == null returns true
   * after:  null == null returns false
   */
  runner.replaceOperator("==", new EqualsOperator())
  runner
}

lazy val fields: Array[String] = runner.getOutVarNames(ruleText)

lazy val leftDF: DataFrame = data.filter(r => isLeft(r))

lazy val rightDF: DataFrame = data.filter(r => !isLeft(r))

def isLeft(row: Row): Boolean = {
  val context = new DefaultContext[String, AnyRef]
  fields.foreach(field =>
    context.put(field, row.getAs(field).asInstanceOf[AnyRef])
  )
  try {
    // execute(expressString, context, errorList, isCache, isTrace)
    runner.execute(ruleText, context, null, true, false)
      .asInstanceOf[Boolean]
  } catch {
    case e: Exception =>
      throw new RuntimeException(
        "`isLeft` failed: condition expression evaluation error!", e)
  }
}

With the initial implementation, the problem is easy to spot: for every row processed, ruleText is handed to the runner and the expression is parsed all over again, so throughput inevitably drops. The fix is simple: turn on QLExpress's expression cache so the compiled instruction set is reused across rows.
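
Below is a minimal, self-contained sketch of the cached call (my own illustration, not code from the project; the rule text and field value are made up). The fourth argument of ExpressRunner#execute is QLExpress's isCache flag, so setting it to true compiles the expression once and reuses the cached instruction set on subsequent calls:

import com.ql.util.express.{DefaultContext, ExpressRunner}

object ExpressCacheSketch {
  def main(args: Array[String]): Unit = {
    val runner = new ExpressRunner()
    val ruleText = "jr_age != null"   // made-up rule, for illustration only
    val context = new DefaultContext[String, AnyRef]
    context.put("jr_age", "19900101")

    // execute(expressString, context, errorList, isCache, isTrace):
    // with isCache = true the compiled instruction set for ruleText is reused,
    // instead of re-parsing the expression on every call.
    val result = runner.execute(ruleText, context, null, true, false)
    println(result)   // true, because jr_age is bound to a non-null value
  }
}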
The second problem: under Hive and Spark semantics, `==` must return false whenever either side is null, yet QLExpress returns true for `null == null`. So I redefined the `==` operator myself and swapped out the built-in logic with runner.replaceOperator("==", new EqualsOperator()). The code is as follows:

package com.yang.express;

import com.ql.util.express.Operator;
import com.ql.util.express.OperatorOfNumber;

/**
 * Overrides the `==` operator of Alibaba's open-source QLExpress.
 * Before: null == null returns true
 * After:  null == null returns false
 *
 * @author yangfan
 * @since 2021/3/19
 * @version 1.0.0
 */
public class EqualsOperator extends Operator {

    @Override
    public Boolean executeInner(Object[] objects) {
        return executeInner(objects[0], objects[1]);
    }

    private Boolean executeInner(Object op1, Object op2) {
        // Any null operand short-circuits to false, matching Hive/Spark `=` semantics.
        if (null == op1 || null == op2) {
            return false;
        }
        int compareResult;
        // Character operands are compared numerically against numbers, otherwise by equals().
        if (op1 instanceof Character || op2 instanceof Character) {
            if (op1 instanceof Character && op2 instanceof Character) {
                return op1.equals(op2);
            }
            if (op1 instanceof Number) {
                compareResult = OperatorOfNumber.compareNumber((Number) op1, Integer.valueOf((Character) op2));
                return compareResult == 0;
            }
            if (op2 instanceof Number) {
                compareResult = OperatorOfNumber.compareNumber(Integer.valueOf((Character) op1), (Number) op2);
                return compareResult == 0;
            }
        }
        // Numeric operands are compared by value; everything else falls back to equals().
        if (op1 instanceof Number && op2 instanceof Number) {
            compareResult = OperatorOfNumber.compareNumber((Number) op1, (Number) op2);
            return compareResult == 0;
        }
        return op1.equals(op2);
    }
}
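
A quick check of the replaced operator (illustrative only, not from the project): after registering EqualsOperator, a null-to-null comparison evaluates to false, in line with Hive/Spark SQL:

import com.ql.util.express.{DefaultContext, ExpressRunner}
import com.yang.express.EqualsOperator

object EqualsOperatorCheck {
  def main(args: Array[String]): Unit = {
    val runner = new ExpressRunner()
    runner.replaceOperator("==", new EqualsOperator())

    val ctx = new DefaultContext[String, AnyRef]
    ctx.put("a", null)   // both operands are null
    ctx.put("b", null)

    // With the default operator this prints true; with EqualsOperator it prints false,
    // matching Hive/Spark `=` semantics.
    println(runner.execute("a == b", ctx, null, true, false))
  }
}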

2. Optimizing the array vector passed to XGBoost

This is explained in detail in a separate article: https://segmentfault.com/a/11...
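
For context, here is a minimal sketch of what a vector_trans-style UDF could look like (my own guess based on the physical plan above, not the project's actual implementation nor the optimization described in that article): it one-hot encodes the pipe-separated app list against a fixed dictionary, producing the array<double> column that the model stage later consumes.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, udf}

object VectorTransSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("VectorTransSketch").getOrCreate()
    import spark.implicits._

    // Hypothetical dictionary; the real one in the plan above contains a few hundred app names.
    val dict = Array("qq", "支付宝", "微博")

    // One-hot encode the pipe-separated app list against the dictionary.
    val vectorTrans = udf { (appList: String, sep: String) =>
      val apps = Option(appList).map(_.split(sep).toSet).getOrElse(Set.empty[String])
      dict.map(app => if (apps.contains(app)) 1.0 else 0.0)
    }

    val df = Seq(("u1", "qq|微博"), ("u2", "支付宝")).toDF("userid", "applistall")
    df.withColumn("vector", vectorTrans($"applistall", lit("\\|"))).show(false)

    spark.stop()
  }
}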

5. Project summary

Project source code: https://github.com/dafanpangg...
To be updated~

Author: farAway. Original article: https://segmentfault.com/a/1190000039689046
