算法必知 --- LFU缓存淘汰算法
写在前LRU缓存机制(Least Recently Used)(看时间)在缓存满的时候,删除缓存里最久未使用的数据,然后再放入新元素;数据的访问时间很重要,访问时间距离现在越近,就越不容易被删除;就是喜新厌旧,淘汰在缓存里呆的时间最久的元素。在删除元素的时候,只看「时间」这一个维度。LFU缓存机制(Least Frequently Used)(看访问次数)在缓存满的时候,删除缓存里使用次数最少的元素,然后在缓存中放入新元素;数据的访问次数很重要,访问次数越多,就越不容易被删除,即淘汰访问次数最少的;核心思想:先考虑访问次数,在访问次数相同的情况下,再考虑缓存的时间。算法描述请你为 最不经常使用(LFU)缓存算法设计并实现数据结构。实现 LFUCache 类:LFUCache(int capacity) - 用数据结构的容量 capacity 初始化对象int get(int key) - 如果键存在于缓存中,则获取键的值,否则返回 -1。void put(int key, int value) - 如果键已存在,则变更其值;如果键不存在,请插入键值对。当缓存达到其容量时,则应该在插入新项之前,使最不经常使用的项无效。在此问题中,当存在平局(即两个或更多个键具有相同使用频率)时,应该去除 最近最久未使用 的键。注意「项的使用次数」就是自插入该项以来对其调用 get 和 put 函数的次数之和。使用次数会在对应项被移除后置为 0 。为了确定最不常使用的键,可以为缓存中的每个键维护一个 使用计数器 。使用计数最小的键是最久未使用的键。当一个键首次插入到缓存中时,它的使用计数器被设置为 1 (由于 put 操作)。对缓存中的键执行 get 或 put 操作,使用计数器的值将会递增。注意哦,get 和 put 方法必须都是 O(1) 的时间复杂度!示例:输入:
["LFUCache", "put", "put", "get", "put", "get", "get", "put", "get", "get", "get"]
[[2], [1, 1], [2, 2], [1], [3, 3], [2], [3], [4, 4], [1], [3], [4]]
输出:
[null, null, null, 1, null, -1, 3, null, -1, 3, 4]
解释:
// cnt(x) = 键 x 的使用计数
// cache=[] 将显示最后一次使用的顺序(最左边的元素是最近的)
LFUCache lFUCache = new LFUCache(2);
lFUCache.put(1, 1); // cache=[1,_], cnt(1)=1
lFUCache.put(2, 2); // cache=[2,1], cnt(2)=1, cnt(1)=1
lFUCache.get(1); // 返回 1
// cache=[1,2], cnt(2)=1, cnt(1)=2
lFUCache.put(3, 3); // 去除键 2 ,因为 cnt(2)=1 ,使用计数最小
// cache=[3,1], cnt(3)=1, cnt(1)=2
lFUCache.get(2); // 返回 -1(未找到)
lFUCache.get(3); // 返回 3
// cache=[3,1], cnt(3)=2, cnt(1)=2
lFUCache.put(4, 4); // 去除键 1 ,1 和 3 的 cnt 相同,但 1 最久未使用
// cache=[4,3], cnt(4)=1, cnt(3)=2
lFUCache.get(1); // 返回 -1(未找到)
lFUCache.get(3); // 返回 3
// cache=[3,4], cnt(4)=1, cnt(3)=3
lFUCache.get(4); // 返回 4
// cache=[3,4], cnt(4)=2, cnt(3)=3算法设计首先需要维护一个链表,链表的结构如下。ps:注意是双端链表,图示有误,但意思明确,具体见代码。注意:首先我们在LRU中定义两个永久节点,head作为头节点、tail作为尾节点,就作为了我们链表的头节点和尾节点。我们插入的每一个缓存都是链表中的一个Node节点。因为链表频数大的靠近head,频数小的靠近tail。这种情况实际上是将我们的链表按照频数划分成了不同的区域如下图算法需要实现的三个操作:新增节点:如果新增节点的频数为1,所以我们需要找到当前链表频数为1的部分的第一个节点(头结点),在他前面插入新元素(如果不存在频数为1那么就是在tail节点前插入)修改节点(删除+移动):首先需要将节点的值进行修改这个很简单,然后就是移动节点在链表中的位置了,假设节点的频数为 a ,那么节点首先需要从频数为a的区域中删除,这就分为了以下几种情况:频数为a的区域只有一个节点,那么a节点频数修改后,频数为a的区域将会消失(注意这个说法,后面会讲实现),然后将当前暂时从链表中移除频数为a的区域的头节点是当前节点,那么将频数为a的节点的头节点改为当前节点的后一个元素即可,然后将当前节点暂时从链表中移除频数为a的区域的头节点不是当前节点,那么直接将当前节点暂时从频数为a的区域删除即可。从频数为a的区域删除后,下面一步就是插入到频数为a+1的区域的头部:频数为a+1的区域不存在,那么将当前节点插入到上一步更新后的频数为a的头节点的前面即可频数为a+1的区域存在,直接插入到频数为a+1的区域的头部即可删除节点:因为tail节点的前一个就是我们使用次数最少且最不常使用的缓存,我们直接删除tail节点的前一个节点即可,单数删除的时候需要注意。假设需要删除的节点的频数为a,这个操作相当于将指定节点从频数为a的区域删除,这和我们修改阶段的第一步是类似的。总结:上述算法实现的重点,如何获得频数为a的区域的头节点(使用一个Map来维护链表中频数为a的区域的头节点)。代码实现首先定义双端链表类(包括数据和记录前驱/后继节点的指针)class DLinkedNode {
int key;
int value;
// 记录当前key被调用的次数
int count;
DLinkedNode pre;
DLinkedNode next;
public DLinkedNode() {};
public DLinkedNode(int key, int value) {
this.key = key;
this.value = value;
this.count = count;
}
}双向链表需要提供一些接口api,便于我们操作,主要就是链表的一些操作,画图理解!private void renewNode(DLinkedNode node) {
int oldCnt = node.count;
int newCnt = oldCnt + 1;
DLinkedNode next = null;
if (cntMap.get(oldCnt) == node) {
//当前节点是oldCnt频数的头结点(两种情况:还有其他节点/只有一个节点)
// 更新oldCnt频数头结点的映射
if (node.next.count == node.count) {
cntMap.put(oldCnt, node.next);
} else {
cntMap.remove(oldCnt);
}
// 更新newCnt频数头结点的映射(不存在直接加入,存在找到对应频数的头结点)
if (cntMap.get(newCnt) == null) {
cntMap.put(newCnt, node);
node.count++;
return;
} else {
removeFromList(node);
next = cntMap.get(newCnt);
}
} else {
// 当前节点不是某个频数的头结点(我们不需要维护频数头结点的映射,直接找到对应频数的头结点即可)
removeFromList(node);
if (cntMap.get(newCnt) == null) {
next = cntMap.get(oldCnt);
} else {
next = cntMap.get(newCnt);
}
}
node.count++;
cntMap.put(newCnt, node);
// 插入节点(连接节点),其中next是频数的头结点
insertToList(node, next);
}
private void removeFromList(DLinkedNode node) {
node.pre.next = node.next;
node.next.pre = node.pre;
}
private void insertToList(DLinkedNode node, DLinkedNode next) {
next.pre.next = node;
node.pre = next.pre;
node.next = next;
next.pre = node;
}
// 缓存容量满了,删除一个最少且最久没使用的节点
private void deleteCache() {
DLinkedNode delNode = tail.pre;
DLinkedNode pre = delNode.pre;
if (cntMap.get(delNode.count) == delNode) {
// 删除节点是某个频数的头结点
cntMap.remove(delNode.count);
}
// 实际删除的节点
pre.next = tail;
tail.pre = pre;
cache.remove(delNode.key);
--size;
}确定LRU缓存类的成员变量(链表长度、缓存容量和map映射等)和构造函数。注意:定义虚拟头尾结点便于在头部插入元素或者寻找尾部元素!并在构造函数初始化。// cnt - node : 增加了频数与头节点的映射
private Map<Integer, DLinkedNode> cntMap = new HashMap<>();
// key - node
private Map<Integer, DLinkedNode> cache = new HashMap<>();
// 缓存中目前存储的数据量
private int size;
private int capacity;
private DLinkedNode head, tail;
public LFUCache(int capacity) {
this.size = 0;
this.capacity = capacity;
head = new DLinkedNode();
tail = new DLinkedNode();
head.next = tail;
tail.pre = head;
}核心代码:get和put方法,都是先根据key获取这个映射,根据映射节点的情况(有无)进行操作。注意:get和put都在使用,所以数据要提前!put操作如果改变了双端链表长度(不是仅改变值),需要先判断是否达到最大容量!public int get(int key) {
DLinkedNode node = cache.get(key);
if (capacity == 0 || node == null) {
return -1;
}
// node节点的调用次数+1,应该更新他的位置
renewNode(node);
return node.value;
}
public void put(int key, int value) {
if (capacity == 0) {
return;
}
DLinkedNode node = cache.get(key);
if (node != null) {
node.value = value;
// 将这个节点的频数cnt+1,更新位置
renewNode(node);
} else {
if (cache.size() == capacity) {
deleteCache();
--size;
}
DLinkedNode newNode = new DLinkedNode(key, value, 1);
DLinkedNode next = cntMap.get(1);
if (next == null) {
next = tail;
}
// 将新建的节点插入到链表,并更新映射
insertToList(newNode, next);
cntMap.put(1, newNode);
cache.put(key, newNode);
++size;
}
}完整代码如下:class LFUCache {
class DLinkedNode {
int key;
int value;
// 记录当前key被调用的次数(即node节点的频数)
int count;
DLinkedNode pre;
DLinkedNode next;
public DLinkedNode() {};
public DLinkedNode(int key, int value, int count) {
this.key = key;
this.value = value;
this.count = count;
}
}
// cnt - node : 增加了频数与头节点的映射
private Map<Integer, DLinkedNode> cntMap = new HashMap<>();
// key - node
private Map<Integer, DLinkedNode> cache = new HashMap<>();
// 缓存中目前存储的数据量
private int size;
private int capacity;
private DLinkedNode head, tail;
public LFUCache(int capacity) {
this.size = 0;
this.capacity = capacity;
head = new DLinkedNode();
tail = new DLinkedNode();
head.next = tail;
tail.pre = head;
}
public int get(int key) {
DLinkedNode node = cache.get(key);
if (capacity == 0 || node == null) {
return -1;
}
// node节点的调用次数+1,应该更新他的位置
renewNode(node);
return node.value;
}
public void put(int key, int value) {
if (capacity == 0) {
return;
}
DLinkedNode node = cache.get(key);
if (node != null) {
node.value = value;
// 将这个节点的频数cnt+1,更新位置
renewNode(node);
} else {
if (cache.size() == capacity) {
deleteCache();
--size;
}
DLinkedNode newNode = new DLinkedNode(key, value, 1);
DLinkedNode next = cntMap.get(1);
if (next == null) {
next = tail;
}
// 将新建的节点插入到链表,并更新映射
insertToList(newNode, next);
cntMap.put(1, newNode);
cache.put(key, newNode);
++size;
}
}
private void renewNode(DLinkedNode node) {
int oldCnt = node.count;
int newCnt = oldCnt + 1;
DLinkedNode next = null;
if (cntMap.get(oldCnt) == node) {
//当前节点是oldCnt频数的头结点(两种情况:还有其他节点/只有一个节点)
// 更新oldCnt频数头结点的映射
if (node.next.count == node.count) {
cntMap.put(oldCnt, node.next);
} else {
cntMap.remove(oldCnt);
}
// 更新newCnt频数头结点的映射(不存在直接加入,存在找到对应频数的头结点)
if (cntMap.get(newCnt) == null) {
cntMap.put(newCnt, node);
node.count++;
return;
} else {
removeFromList(node);
next = cntMap.get(newCnt);
}
} else {
// 当前节点不是某个频数的头结点(我们不需要维护频数头结点的映射,直接找到对应频数的头结点即可)
removeFromList(node);
if (cntMap.get(newCnt) == null) {
next = cntMap.get(oldCnt);
} else {
next = cntMap.get(newCnt);
}
}
node.count++;
cntMap.put(newCnt, node);
// 插入节点(连接节点),其中next是频数的头结点
insertToList(node, next);
}
private void removeFromList(DLinkedNode node) {
node.pre.next = node.next;
node.next.pre = node.pre;
}
private void insertToList(DLinkedNode node, DLinkedNode next) {
next.pre.next = node;
node.pre = next.pre;
node.next = next;
next.pre = node;
}
// 缓存容量满了,删除一个最少且最久没使用的节点
private void deleteCache() {
DLinkedNode delNode = tail.pre;
DLinkedNode pre = delNode.pre;
if (cntMap.get(delNode.count) == delNode) {
// 删除节点是某个频数的头结点
cntMap.remove(delNode.count);
}
// 实际删除的节点
pre.next = tail;
tail.pre = pre;
cache.remove(delNode.key);
--size;
}
}总结与补充上述代码在LRU基础上进行的。主要区别是:节点类中引入了count变量,记录key出现的频数LFU成员变量中增加了cntMap:key的频数与这个频数区间头结点的映射注意:同样的,我们要注意维护cntMap映射和节点的频数
客快物流大数据项目(六十七):客户主题(一)
客户主题一、背景介绍客户主题主要是通过分析用户的下单情况构建用户画像二、指标明细三、表关联关系1、事实表2、维度表3、关联关系用户表与维度表的关联关系如下:四、客户数据拉宽开发1、拉宽后的字段2、SQL语句SELECT
TC."id" ,
TC."name" ,
TC."tel",
TC."mobile",
TC."email",
TC."type",
TC."is_own_reg",
TC."reg_dt",
TC."reg_channel_id",
TC."state",
TC."cdt",
TC."udt",
TC."last_login_dt",
TC."remark",
customercodes."code_desc",
sender_info.first_cdt AS first_sender_cdt ,
sender_info.last_cdt AS last_sender_cdt,
sender_info.billCount AS billCount,
sender_info.totalAmount AS totalAmount
FROM "tbl_customer" tc
LEFT JOIN (
SELECT
"ciid", min(sender_info."id") first_id, max(sender_info."id") last_id, min(sender_info."cdt") first_cdt, max(sender_info."cdt") last_cdt,COUNT(sender_info."id" ) billCount,sum(express_package."actual_amount") totalAmount
FROM "tbl_consumer_sender_info" sender_info
LEFT JOIN "tbl_express_package" express_package
ON SENDER_INFO."pkg_id" =express_package."id"
GROUP BY sender_info."ciid"
) sender_info
ON tc."id" = sender_info."ciid"
LEFT JOIN "tbl_codes" customercodes ON customercodes."type" =16 AND tc."type" =customercodes."code"3、Spark实现实现步骤:在dwd目录下创建 CustomerDWD 单例对象,继承自OfflineApp特质初始化环境的参数,创建SparkSession对象获取客户表(tbl_customer)数据,并缓存数据判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)获取客户寄件信息表(tbl_consumer_sender_info)数据,并缓存数据获取客户包裹表(tbl_express_package)数据,并缓存数据获取物流字典码表(tbl_codes)数据,并缓存数据根据以下方式拉宽仓库车辆明细数据根据客户id,在客户表中获取客户数据根据包裹id,在包裹表中获取包裹数据根据客户类型id,在物流字典码表中获取客户类型名称数据创建客户明细宽表(若存在则不创建)将客户明细宽表数据写入到kudu数据表中删除缓存数据3.1、初始化环境变量初始化客户明细拉宽作业的环境变量package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
/**
* 客户主题数据的拉宽操作
*/
object CustomerDWD extends OfflineApp {
//定义应用的名称
val appName = this.getClass.getSimpleName
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}3.2、加载客户相关的表并缓存加载客户表的时候,需要指定日期条件,因为客户主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)//导入隐士转换
import sparkSession.implicits._
val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)3.3、定义表的关联关系为了在DWS层任务中方便的获取每日增量客户表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd代码如下://TODO 4)定义维度表与事实表的关联关系
val left_outer = "left_outer"
/**
* 获取每个用户的首尾单发货信息及发货件数和总金额
*/
val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
.groupBy(customerSenderInfoDF("ciid"))
.agg(min(customerSenderInfoDF("id")).alias("first_id"),
max(customerSenderInfoDF("id")).alias("last_id"),
min(expressPageageDF("cdt")).alias("first_cdt"),
max(expressPageageDF("cdt")).alias("last_cdt"),
count(customerSenderInfoDF("id")).alias("totalCount"),
sum(expressPageageDF("actualAmount")).alias("totalAmount")
)
val customerDetailDF: DataFrame = customerDF
.join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
.join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
.sort(customerDF("cdt").asc)
.select(
customerDF("id"),
customerDF("name"),
customerDF("tel"),
customerDF("mobile"),
customerDF("type").cast(IntegerType),
customerTypeDF("codeDesc").as("type_name"),
customerDF("isownreg").as("is_own_reg"),
customerDF("regdt").as("regdt"),
customerDF("regchannelid").as("reg_channel_id"),
customerDF("state"),
customerDF("cdt"),
customerDF("udt"),
customerDF("lastlogindt").as("last_login_dt"),
customerDF("remark"),
customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
customerSenderDetailInfoDF("totalCount"), //寄件总次数
customerSenderDetailInfoDF("totalAmount") //总金额
)3.4、创建客户明细宽表并将客户明细数据写入到kudu数据表中客户明细宽表数据需要保存到kudu中,因此在第一次执行客户明细拉宽操作时,客户明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建实现步骤:在CustomerDWD 单例对象中调用save方法实实现过程:在CustomerDWD 单例对象Main方法中调用save方法save(customerDetailDF, OfflineTableDefine.customerDetail)3.5、删除缓存数据为了释放资源,客户明细宽表数据计算完成以后,需要将缓存的源表数据删除。//移除缓存
customerDetailDF.unpersist
codesDF.unpersist
expressPackageDF.unpersist
customerSenderDF.unpersist
customerDF.unpersist3.6、完整代码package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
/**
* 客户主题数据的拉宽操作
*/
object CustomerDWD extends OfflineApp {
//定义应用的名称
val appName = this.getClass.getSimpleName
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//导入隐士转换
import sparkSession.implicits._
val customerSenderInfoDF: DataFrame = getKuduSource(sparkSession, TableMapping.consumerSenderInfo, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val customerDF = getKuduSource(sparkSession, TableMapping.customer, true).persist(StorageLevel.DISK_ONLY_2)
val expressPageageDF = getKuduSource(sparkSession, TableMapping.expressPackage, true).persist(StorageLevel.DISK_ONLY_2)
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
val customerTypeDF = codesDF.where($"type" === CodeTypeMapping.CustomType)
//TODO 4)定义维度表与事实表的关联关系
val left_outer = "left_outer"
/**
* 获取每个用户的首尾单发货信息及发货件数和总金额
*/
val customerSenderDetailInfoDF: DataFrame = customerSenderInfoDF.join(expressPageageDF, expressPageageDF("id") === customerSenderInfoDF("pkgId"), left_outer)
.groupBy(customerSenderInfoDF("ciid"))
.agg(min(customerSenderInfoDF("id")).alias("first_id"),
max(customerSenderInfoDF("id")).alias("last_id"),
min(expressPageageDF("cdt")).alias("first_cdt"),
max(expressPageageDF("cdt")).alias("last_cdt"),
count(customerSenderInfoDF("id")).alias("totalCount"),
sum(expressPageageDF("actualAmount")).alias("totalAmount")
)
val customerDetailDF: DataFrame = customerDF
.join(customerSenderDetailInfoDF, customerDF("id") === customerSenderInfoDF("ciid"), left_outer)
.join(customerTypeDF, customerDF("type") === customerTypeDF("code").cast(IntegerType), left_outer)
.sort(customerDF("cdt").asc)
.select(
customerDF("id"),
customerDF("name"),
customerDF("tel"),
customerDF("mobile"),
customerDF("type").cast(IntegerType),
customerTypeDF("codeDesc").as("type_name"),
customerDF("isownreg").as("is_own_reg"),
customerDF("regdt").as("regdt"),
customerDF("regchannelid").as("reg_channel_id"),
customerDF("state"),
customerDF("cdt"),
customerDF("udt"),
customerDF("lastlogindt").as("last_login_dt"),
customerDF("remark"),
customerSenderDetailInfoDF("first_id").as("first_sender_id"), //首次寄件id
customerSenderDetailInfoDF("last_id").as("last_sender_id"), //尾次寄件id
customerSenderDetailInfoDF("first_cdt").as("first_sender_cdt"), //首次寄件时间
customerSenderDetailInfoDF("last_cdt").as("last_sender_cdt"), //尾次寄件时间
customerSenderDetailInfoDF("totalCount"), //寄件总次数
customerSenderDetailInfoDF("totalAmount") //总金额
)
save(customerDetailDF, OfflineTableDefine.customerDetail)
// 5.4:将缓存的数据删除掉
customerDF.unpersist()
customerSenderInfoDF.unpersist()
expressPageageDF.unpersist()
customerTypeDF.unpersist()
sparkSession.stop()
}
}
客快物流大数据项目(六十六):车辆主题(下)
2、SQL语句2.1、拉宽网点车辆表SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
TTL ."given_load" ,
TTL ."load_cn_unit" ,
TTL ."load_en_unit" ,
TTL ."buy_dt" ,
TTL ."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
dot."id" AS dot_id,
dot."dot_name" ,
dot."dot_number" ,
dot."dot_addr" ,
dot."dot_gis_addr" ,
dot."dot_tel" ,
dot."manage_area_id" ,
dot."manage_area_gis" ,
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_transport_tool" ttl
LEFT JOIN "tbl_dot_transport_tool" tdtl ON ttl."id" = tdtl."transport_tool_id"
LEFT JOIN "tbl_dot" dot ON DOT ."id" = TDTL ."dot_id"
LEFT JOIN "tbl_company_dot_map" companydot ON companydot."dot_id" = TDTL ."dot_id"
LEFT JOIN "tbl_company" company ON company."id" = companydot."company_id"2.2、拉宽仓库车辆表SELECT
ttl."id" ,
ttl."brand" ,
ttl."model" ,
ttl."type" ,
ttl."given_load" ,
ttl."load_cn_unit" ,
ttl."load_en_unit" ,
ttl."buy_dt" ,
ttl."license_plate" ,
ttl."state" ,
ttl."cdt" ,
ttl."udt" ,
ttl."remark" ,
warehouse."id" ,
warehouse."name",
warehouse."addr",
warehouse."addr_gis",
warehouse."employee_id",
warehouse."type",
warehouse."area",
warehouse."is_lease",
COMPANY ."id" AS company_id,
COMPANY ."company_name",
COMPANY ."company_number",
COMPANY ."city_id",
COMPANY ."company_addr",
COMPANY ."company_addr_gis",
COMPANY ."company_tel",
COMPANY ."is_sub_company"
FROM "tbl_warehouse_transport_tool" twt
LEFT JOIN "tbl_transport_tool" ttl ON twt."transport_tool_id" = ttl."id"
LEFT JOIN "tbl_warehouse" warehouse ON WAREHOUSE ."id" = twt."warehouse_id"
LEFT JOIN "tbl_company_warehouse_map" warehouse_map ON warehouse_map."warehouse_id" = warehouse."id"
LEFT JOIN "tbl_company" company ON company."id" = warehouse_map."company_id"3、Spark实现实现步骤:在dwd目录下创建 TransportToolDWD 单例对象,继承自OfflineApp特质初始化环境的参数,创建SparkSession对象获取运输工具表(tbl_transport_tool)数据,并缓存数据判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)获取网点运输工具关联表(tbl_dot_transport_tool)数据,并缓存数据获取网点表(tbl_dot)数据,并缓存数据获取公司网点关联表(tbl_company_dot_map)数据,并缓存数据获取仓库运输工具关联表(tbl_warehouse_transport_tool)数据,并缓存数据获取公司仓库关联表(tbl_company_warehouse_map)数据,并缓存数据获取仓库表(tbl_warehouse)数据,并缓存数据获取公司表(tbl_company)数据,并缓存数据根据以下方式拉宽仓库车辆明细数据根据交通工具id,在交通工具表中获取交通工具数据根据网点id,在网点表中获取网点数据根据公司id,在公司表中获取公司数据根据仓库id,在仓库表中获取仓库数据创建网点车辆明细宽表(若存在则不创建)创建仓库车辆明细宽表(若存在则不创建)将仓库车辆明细宽表数据写入到kudu数据表中删除缓存数据3.1、初始化环境变量初始化运单明细拉宽作业的环境变量package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
} 3.2、加载运输工具表及车辆相关的表并缓存加载运输工具表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))3.3、定义网点车辆宽表的关联关系为了在DWS层任务中方便的获取每日增量网点车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd代码如下://TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)3.4、创建网点车辆明细宽表并将网点车辆明细数据写入到kudu数据表中网点车辆明细宽表数据需要保存到kudu中,因此在第一次执行网点车辆明细拉宽操作时,网点车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建实现步骤:在TransportToolDWD 单例对象中调用save方法实现过程:在TransportToolDWD 单例对象Main方法中调用save方法//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)3.5、定义仓库车辆宽表的关联关系为了在DWS层任务中方便的获取每日增量仓库车辆表数据(根据日期),因此在DataFrame基础上动态增加列(day),指定日期格式为yyyyMMdd代码如下:// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)3.6、创建仓库车辆明细宽表并将仓库车辆明细数据写入到kudu数据表中仓库车辆明细宽表数据需要保存到kudu中,因此在第一次执行仓库车辆明细拉宽操作时,仓库车辆明细宽表是不存在的,因此需要实现自动判断宽表是否存在,如果不存在则创建实现步骤:在TransportToolDWD 单例对象中调用save方法实现过程:在TransportToolDWD 单例对象Main方法中调用save方法save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)3.7、删除缓存数据为了释放资源,车辆明细宽表数据计算完成以后,需要将缓存的源表数据删除。//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()3.8、完整代码package cn.it.logistics.offline.dwd
import cn.it.logistics.common.{CodeTypeMapping, Configuration, OfflineTableDefine, SparkUtils, TableMapping}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.date_format
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 车辆主题开发
* 将车辆相关的表进行关联拉宽,将拉宽后的数据写入到车辆宽表中
* 1)网点车辆关联表->派送网点所拥有的车辆(三轮车)
* 2)仓库车辆关联表->仓库与仓库之间的运输工具(货车、货机)
*/
object TransportToolDWD extends OfflineApp{
//定义应用的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)初始化sparkConf对象
* 2)创建sparkSession对象
* 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
* 4)定义维度表与事实表的关联
* 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
* 5.1:创建车辆明细宽表的schema表结构
* 5.2:创建车辆宽表(判断宽表是否存在,如果不存在则创建)
* 5.3:将数据写入到kudu中
* 6)将缓存的数据删除掉
* 7)停止任务
*/
//1)初始化sparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//2)创建sparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//数据处理
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3)加载kudu中的事实表和维度表的数据(将加载后的数据进行缓存)
//加载车辆表数据(事实表)
val ttDF: DataFrame = getKuduSource(sparkSession, TableMapping.transportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆表数据
val ttDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dotTransportTool, true).persist(StorageLevel.DISK_ONLY_2)
//加载网点表的数据
val dotDF: DataFrame = getKuduSource(sparkSession, TableMapping.dot, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司网点关联表的数据
val companyDotDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyDotMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载公司表的数据
val companyDF: DataFrame = getKuduSource(sparkSession, TableMapping.company, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库车辆关联表数据(事实表)
val ttWsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouseTransportTool, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//加载仓库公司关联表
val companyWareHouseMapDF: DataFrame = getKuduSource(sparkSession, TableMapping.companyWarehouseMap, true).persist(StorageLevel.DISK_ONLY_2)
//加载仓库表数据
val wsDF: DataFrame = getKuduSource(sparkSession, TableMapping.warehouse, true).persist(StorageLevel.DISK_ONLY_2)
//加载物流码表数据
val codesDF: DataFrame = getKuduSource(sparkSession, TableMapping.codes, true).persist(StorageLevel.DISK_ONLY_2)
import sparkSession.implicits._
//获取运输工具类型
val transportTypeDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportType).select($"code".as("ttType"), $"codeDesc".as("ttTypeName"))
//获取运输工具状态
val transportStatusDF: DataFrame = codesDF.where($"type" === CodeTypeMapping.TransportStatus).select($"code".as("ttStatus"), $"codeDesc".as("ttStateName"))
//TODO 4)定义维度表与事实表的关联
val left_outer: String = "left_outer"
// 4.1:拉宽网点车辆表
val ttDotDetailDF = ttDotDF.join(ttDF, ttDotDF.col("transportToolId") === ttDF.col("id"), left_outer) //网点车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(dotDF, dotDF.col("id") === ttDotDF.col("dotId"), left_outer) //网点车辆表关联网点
.join(companyDotDF, ttDotDF.col("dotId") === companyDotDF.col("dotId"), left_outer) //网点车辆管连网点公司关联表
.join(companyDF, companyDotDF.col("companyId") === companyDF.col("id"), left_outer) //网点车辆表关联公司表
.withColumn("day", date_format(ttDotDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").cast(IntegerType).as("given_load"), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表state对应字典表类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
dotDF("id").as("dot_id"), //网点表dot_id
dotDF("dotNumber").as("dot_number"), //网点表dot_number
dotDF("dotName").as("dot_name"), //网点表dot_name
dotDF("dotAddr").as("dot_addr"), //网点表dot_addr
dotDF("dotGisAddr").as("dot_gis_addr"), //网点表dot_gis_addr
dotDF("dotTel").as("dot_tel"), //网点表dot_tel
dotDF("manageAreaId").as("manage_area_id"), //网点表manage_area_id
dotDF("manageAreaGis").as("manage_area_gis"), //网点表manage_area_gis
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
// 4.2:拉宽仓库车辆表
// 拉宽仓库车辆表
val ttWsDetailDF = ttWsDF.join(ttDF, ttWsDF.col("transportToolId") === ttDF.col("id"), left_outer) //仓库车辆表关联车辆表
.join(transportTypeDF, transportTypeDF("ttType") === ttDF("type"), left_outer) //车辆表类型关联字典表类型
.join(transportStatusDF, transportStatusDF("ttStatus") === ttDF("state"), left_outer) //车辆表状态管理字典表状态
.join(wsDF, wsDF.col("id") === ttWsDF.col("warehouseId"), left_outer) //仓库车辆表关联仓库
.join(companyWareHouseMapDF, ttWsDF.col("warehouseId") === companyWareHouseMapDF.col("warehouseId"), left_outer) //仓库车辆管连仓库公司关联表
.join(companyDF, companyDF.col("id") === companyWareHouseMapDF.col("companyId"), left_outer)
.withColumn("day", date_format(ttWsDF("cdt"), "yyyyMMdd"))//虚拟列,可以根据这个日期列作为分区字段,可以保证同一天的数据保存在同一个分区中
.sort(ttDF.col("cdt").asc)
.select(
ttDF("id"), //车辆表id
ttDF("brand"), //车辆表brand
ttDF("model"), //车辆表model
ttDF("type").cast(IntegerType), //车辆表type
transportTypeDF("ttTypeName").as("type_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("givenLoad").as("given_load").cast(IntegerType), //车辆表given_load
ttDF("loadCnUnit").as("load_cn_unit"), //车辆表load_cn_unit
ttDF("loadEnUnit").as("load_en_unit"), //车辆表load_en_unit
ttDF("buyDt").as("buy_dt"), //车辆表buy_dt
ttDF("licensePlate").as("license_plate"), //车辆表license_plate
ttDF("state").cast(IntegerType), //车辆表state
transportStatusDF("ttStateName").as("state_name"), // 车辆表type对应字典表车辆类型的具体描述
ttDF("cdt"), //车辆表cdt
ttDF("udt"), //车辆表udt
ttDF("remark"), //车辆表remark
wsDF("id").as("ws_id"), //仓库表id
wsDF("name"), //仓库表name
wsDF("addr"), //仓库表addr
wsDF("addrGis").as("addr_gis"), //仓库表addr_gis
wsDF("employeeId").as("employee_id"), //仓库表employee_id
wsDF("type").as("ws_type").cast(IntegerType), //仓库表type
wsDF("area"), //仓库表area
wsDF("isLease").as("is_lease").cast(IntegerType), //仓库表is_lease
companyDF("id").alias("company_id"), //公司表id
companyDF("companyName").as("company_name"), //公司表company_name
companyDF("cityId").as("city_id"), //公司表city_id
companyDF("companyNumber").as("company_number"), //公司表company_number
companyDF("companyAddr").as("company_addr"), //公司表company_addr
companyDF("companyAddrGis").as("company_addr_gis"), //公司表company_addr_gis
companyDF("companyTel").as("company_tel"), //公司表company_tel
companyDF("isSubCompany").as("is_sub_company"), //公司表is_sub_company
$"day"
)
//TODO 5)将拉宽后的数据再次写回到kudu数据库中(DWD明细层)
save(ttDotDetailDF, OfflineTableDefine.dotTransportToolDetail)
save(ttWsDetailDF, OfflineTableDefine.warehouseTransportToolDetail)
//TODO 6)将缓存的数据删除掉
ttDF.unpersist()
ttDotDF.unpersist()
dotDF.unpersist()
companyDotDF.unpersist()
companyDF.unpersist()
ttWsDF.unpersist()
companyWareHouseMapDF.unpersist()
wsDF.unpersist()
codesDF.unpersist()
sparkSession.stop()
}
}五、车辆数据指标开发1、计算的字段2、Spark实现实现步骤:在dws目录下创建 TransportToolDWS 单例对象,继承自OfflineApp特质初始化环境的参数,创建SparkSession对象根据指定的日期获取拉宽后的车辆主题宽表(tbl_dot_transport_tool_detail、tbl_dot_transport_tool_detail)增量数据,并缓存数据判断是否是首次运行,如果是首次运行的话,则全量装载数据(含历史数据)指标计算各网点发车次数各网点最大发车次数各网点最小发车次数各网点平均发车次数各区域发车次数各区域最大发车次数各区域最小发车次数各区域平均发车次数各公司发车次数各公司最大发车次数各公司最小发车次数各公司平均发车次数获取当前时间yyyyMMddHH构建要持久化的指标数据(需要判断计算的指标是否有值,若没有需要赋值默认值)通过StructType构建指定Schema创建车辆主题指标数据表(若存在则不创建)持久化指标数据到kudu表2.1、初始化环境变量package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* 车辆主题指标开发
*/
object TransportToolDWS extends OfflineApp{
//定义应用程序的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)创建SparkConf对象
* 2)创建SparkSession对象
* 3)读取车辆明细宽表的数据
* 4)对车辆明细宽表的数据进行指标的计算
* 5)将计算好的指标数据写入到kudu数据库中
* 5.1:定义指标结果表的schema信息
* 5.2:组织需要写入到kudu表的数据
* 5.3:判断指标结果表是否存在,如果不存在则创建
* 5.4:将数据写入到kudu表中
* 6)删除缓存数据
* 7)停止任务,退出sparksession
*/
//TODO 1)创建SparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//TODO 2)创建SparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//处理数据
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
sparkSession.stop()
}
}2.2、加载车辆宽表增量数据并缓存加载车辆宽表的时候,需要指定日期条件,因为车辆主题最终需要Azkaban定时调度执行,每天执行一次增量数据,因此需要指定日期。//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)2.3、指标计算//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
//导入隐式转换
import sparkSession.implicits._
//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
//各网点的发车次数(西三旗:10,西二旗:20)
val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各网点的总发车次数
val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
//各网点的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
// 各区域发车次数
val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
// 各公司发车次数
val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
)
dotRows.append(rowInfo)
println(rowInfo)
ttDotDetailByDayDF.unpersist()
ttDotTotalCountDF.unpersist()
areaDotTotalCountDF.unpersist()
companyDotTotalCountDF.unpersist()
})
//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各仓库的总发车次数
val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
//各仓库的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
// 各区域发车次数
val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
// 各公司发车次数
val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
)
wsRows.append(rowInfo)
println(rowInfo)
ttWsDetailByDayDF.unpersist()
areaTransportToolTotalCountDF.unpersist()
companyTransportToolTotalCountDF.unpersist()
whTransportToolTotalCountDF.unpersist()
})2.4、通过StructType构建指定Schema//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("ttDotTotalCount", LongType, true, Metadata.empty), //各网点发车次数
StructField("maxTtDotTotalCount", LongType, true, Metadata.empty), //各网点最大发车次数
StructField("minTtDotTotalCount", LongType, true, Metadata.empty), //各网点最小发车次数
StructField("avgTtDotTotalCount", LongType, true, Metadata.empty), //各网点平均发车次数
StructField("areaDotTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyDotTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))
//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("whTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库发车次数
StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最大发车次数
StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最小发车次数
StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库平均发车次数
StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))2.5、持久化指标数据到kudu表//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)2.6、完整代码package cn.it.logistics.offline.dws
import cn.it.logistics.common.{Configuration, DateHelper, OfflineTableDefine, SparkUtils}
import cn.it.logistics.offline.OfflineApp
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, LongType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
/**
* 车辆主题指标开发
*/
object TransportToolDWS extends OfflineApp{
//定义应用程序的名称
val appName = this.getClass.getSimpleName
/**
* 入口函数
* @param args
*/
def main(args: Array[String]): Unit = {
/**
* 实现步骤:
* 1)创建SparkConf对象
* 2)创建SparkSession对象
* 3)读取车辆明细宽表的数据
* 4)对车辆明细宽表的数据进行指标的计算
* 5)将计算好的指标数据写入到kudu数据库中
* 5.1:定义指标结果表的schema信息
* 5.2:组织需要写入到kudu表的数据
* 5.3:判断指标结果表是否存在,如果不存在则创建
* 5.4:将数据写入到kudu表中
* 6)删除缓存数据
* 7)停止任务,退出sparksession
*/
//TODO 1)创建SparkConf对象
val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
SparkUtils.sparkConf(appName)
)
//TODO 2)创建SparkSession对象
val sparkSession: SparkSession = SparkUtils.getSparkSession(sparkConf)
sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)
//处理数据
execute(sparkSession)
}
/**
* 数据处理
*
* @param sparkSession
*/
override def execute(sparkSession: SparkSession): Unit = {
//TODO 3)读取车辆明细宽表的数据
val ttDotDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.dotTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
val ttWarehouseDetailDF: DataFrame = getKuduSource(sparkSession, OfflineTableDefine.warehouseTransportToolDetail, Configuration.isFirstRunnable).persist(StorageLevel.DISK_ONLY_2)
//根据网点车辆的日期进行分组
val ttDotDetailGroupByDayDF: DataFrame = ttDotDetailDF.select("day").groupBy("day").count().cache()
//导入隐式转换
import sparkSession.implicits._
//定义计算好的指标结果集合对象
val dotRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历网点车辆每个日期的车辆明细宽表数据
ttDotDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttDotDetailByDayDF: DataFrame = ttDotDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
//各网点的发车次数(西三旗:10,西二旗:20)
val ttDotTotalCountDF: DataFrame = ttDotDetailByDayDF.groupBy($"dot_id").agg(count("dot_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各网点的总发车次数
val ttDotTotalCount: Row = ttDotTotalCountDF.agg(sum("cnt")).first()
//各网点的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgTTDotTotalCount: Row = ttDotTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(0)
val minTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(1)
val avgTTDotTotalCount: Any = maxAndMinAndAvgTTDotTotalCount(2)
// 各区域发车次数
val areaDotTotalCountDF = ttDotDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaDotTotalCount = areaDotTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaDotTotalCount = areaDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(0)
val minAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(1)
val avgAreaDotTotalCount = maxAndMinAndAvgAreaDotTotalCount(2)
// 各公司发车次数
val companyDotTotalCountDF = ttDotDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyDotTotalCount = companyDotTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyDotTotalCount = companyDotTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(0)
val minCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(1)
val avgCompanyDotTotalCount = maxAndMinAndAvgCompanyDotTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(ttDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTTDotTotalCount==null) 0L else maxTTDotTotalCount.asInstanceOf[Number].longValue(),
if(minTTDotTotalCount==null) 0L else minTTDotTotalCount.asInstanceOf[Number].longValue(),
if(avgTTDotTotalCount==null) 0L else avgTTDotTotalCount.asInstanceOf[Number].longValue(),
if(areaDotTotalCount.isNullAt(0)) 0L else areaDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaDotTotalCount==null) 0L else maxAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(minAreaDotTotalCount==null) 0L else minAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaDotTotalCount==null) 0L else avgAreaDotTotalCount.asInstanceOf[Number].longValue(),
if(companyDotTotalCount.isNullAt(0)) 0L else ttDotTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyDotTotalCount==null) 0L else maxCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyDotTotalCount==null) 0L else minCompanyDotTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyDotTotalCount==null) 0L else avgCompanyDotTotalCount.asInstanceOf[Number].longValue()
)
dotRows.append(rowInfo)
println(rowInfo)
ttDotDetailByDayDF.unpersist()
ttDotTotalCountDF.unpersist()
areaDotTotalCountDF.unpersist()
companyDotTotalCountDF.unpersist()
})
//根据仓库车辆的日期进行分组
val ttWsDetailGroupByDayDF: DataFrame = ttWarehouseDetailDF.select("day").groupBy("day").count().cache()
//定义计算好的指标结果集合对象
val wsRows: ArrayBuffer[Row] = ArrayBuffer[Row]()
//循环遍历仓库车辆每个日期的车辆明细宽表数据
ttWsDetailGroupByDayDF.collect().foreach(row=> {
//获取到要处理的数据所在的日期
val day: String = row.getAs[String](0)
//返回指定日期的仓库明细数据
val ttWsDetailByDayDF: DataFrame = ttWarehouseDetailDF.where(col("day") === day).toDF().persist(StorageLevel.DISK_ONLY_2)
val whTransportToolTotalCountDF: DataFrame = ttWsDetailByDayDF.groupBy($"ws_id").agg(count("ws_id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
//计算各仓库的总发车次数
val whTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(sum("cnt")).first()
//各仓库的最大发车次数、最小发车次数、平均发车次数
val maxAndMinAndAvgWhTransportToolTotalCount: Row = whTransportToolTotalCountDF.agg(max("cnt"), min("cnt"), avg("cnt")).first()
val maxTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(0)
val minTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(1)
val avgTransportToolCount: Any = maxAndMinAndAvgWhTransportToolTotalCount(2)
// 各区域发车次数
val areaTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("city_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val areaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各区域最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgAreaTransportToolTotalCount = areaTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxAreaTransportToolTotalCount: Any = maxAndMinAndAvgAreaTransportToolTotalCount(0)
val minAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(1)
val avgAreaTransportToolTotalCount = maxAndMinAndAvgAreaTransportToolTotalCount(2)
// 各公司发车次数
val companyTransportToolTotalCountDF = ttWsDetailByDayDF.groupBy("company_id").agg(count($"id").alias("cnt")).persist(StorageLevel.DISK_ONLY_2)
val companyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(sum($"cnt")).first()
// 各公司最大发车次数、最小发车次数和平均发车次数
val maxAndMinAndAvgCompanyTransportToolTotalCount = companyTransportToolTotalCountDF.agg(max($"cnt"), min($"cnt"), avg($"cnt")).first()
val maxCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(0)
val minCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(1)
val avgCompanyTransportToolTotalCount = maxAndMinAndAvgCompanyTransportToolTotalCount(2)
//将计算好的指标数据封装到row对象中
val rowInfo: Row = Row(
day,
if(whTransportToolTotalCount.isNullAt(0)) 0L else whTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxTransportToolCount==null) 0L else maxTransportToolCount.asInstanceOf[Number].longValue(),
if(minTransportToolCount==null) 0L else minTransportToolCount.asInstanceOf[Number].longValue(),
if(avgTransportToolCount==null) 0L else avgTransportToolCount.asInstanceOf[Number].longValue(),
if(areaTransportToolTotalCount.isNullAt(0)) 0L else areaTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxAreaTransportToolTotalCount==null) 0L else maxAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minAreaTransportToolTotalCount==null) 0L else minAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgAreaTransportToolTotalCount==null) 0L else avgAreaTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(companyTransportToolTotalCount.isNullAt(0)) 0L else companyTransportToolTotalCount.get(0).asInstanceOf[Number].longValue(),
if(maxCompanyTransportToolTotalCount==null) 0L else maxCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(minCompanyTransportToolTotalCount==null) 0L else minCompanyTransportToolTotalCount.asInstanceOf[Number].longValue(),
if(avgCompanyTransportToolTotalCount==null) 0L else avgCompanyTransportToolTotalCount.asInstanceOf[Number].longValue()
)
wsRows.append(rowInfo)
println(rowInfo)
ttWsDetailByDayDF.unpersist()
areaTransportToolTotalCountDF.unpersist()
companyTransportToolTotalCountDF.unpersist()
whTransportToolTotalCountDF.unpersist()
})
//定义指标结果表的shema信息
//网点车辆相关的表结构数据
val schemaDot: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("ttDotTotalCount", LongType, true, Metadata.empty), //各网点发车次数
StructField("maxTtDotTotalCount", LongType, true, Metadata.empty), //各网点最大发车次数
StructField("minTtDotTotalCount", LongType, true, Metadata.empty), //各网点最小发车次数
StructField("avgTtDotTotalCount", LongType, true, Metadata.empty), //各网点平均发车次数
StructField("areaDotTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaDotTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaDotTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyDotTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyDotTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyDotTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))
//仓库车辆相关的表结构数据
val schemaWs: StructType = StructType(Array(
StructField("id", StringType, false, Metadata.empty),
StructField("whTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库发车次数
StructField("maxWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最大发车次数
StructField("minWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库最小发车次数
StructField("avgWhTransportToolTotalCount", LongType, true, Metadata.empty), //各仓库平均发车次数
StructField("areaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域发车次数
StructField("maxAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最大发车次数
StructField("minAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域最小发车次数
StructField("avgAreaTransportToolTotalCount", LongType, true, Metadata.empty), //各区域平均发车次数
StructField("companyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司发车次数
StructField("maxCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最大发车次数
StructField("minCompanyTransportToolTotalCount", LongType, true, Metadata.empty), //各公司最小发车次数
StructField("avgCompanyTransportToolTotalCount", LongType, true, Metadata.empty) //各公司平均发车次数
))
//TODO 5)将计算好的指标数据写入到kudu数据库中
val dotRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(dotRows)
val wsRDD: RDD[Row] = sparkSession.sparkContext.makeRDD(wsRows)
val dotDataFrame: DataFrame = sparkSession.createDataFrame(dotRDD, schemaDot)
val wsDataFrame: DataFrame = sparkSession.createDataFrame(wsRDD, schemaWs)
save(dotDataFrame, OfflineTableDefine.ttDotSummary)
save(wsDataFrame, OfflineTableDefine.ttWsSummary)
// 6)删除缓存数据
ttDotDetailDF.unpersist()
ttWarehouseDetailDF.unpersist()
ttDotDetailGroupByDayDF.unpersist()
ttWsDetailGroupByDayDF.unpersist()
// 7)停止任务,退出sparksession
sparkSession.stop()
}
}