将消费的kafka数据同步到Kudu中
已经将kafka消费的数据转换成了OggMessageBean或者CanalMessageBean对象,每条数据对应着某张表的操作记录。
接下来需要根据具体的Table列,将数据转换成对应的JavaBean对象
具体步骤如下:
- 定义oracle和mysql表与实体类的映射关系
- 创建解析工具类 - 为了将OggMessageBean或者CanalMessageBean对象转换成具体的POJO对象
- 将OggMessageBean或者CanalMessageBean对象转换成具体的POJO对象
- 扩展自定义POJO的隐式转换
- 转换OGG和Canal对应的主题数据为具体的POJO对象
- 实现Kudu表的自动创建工具类
- 将数据写入到kudu数据库中
一、导入表名映射关系类
实现步骤:
- 在公共模块的scala目录下的common程序包下创建 `TableMapping` 类
- 根据Oracle和Mysql数据库的表名定义属性,每个属性对应一个表名
- 使用Map对象存储表名与表对应的实体类的映射关系
参考代码:
package cn.it.logistics.common
/**
 * Table-name constants.
 *
 * Each member holds the physical name of one source table; the member name is
 * the camel-cased table name without the `tbl_` prefix.
 */
class TableDefineMapping {
  val empInfoMap: String = "tbl_emp_info_map"
  val driver: String = "tbl_driver"
  val emp: String = "tbl_emp"
  val warehouseTransportTool: String = "tbl_warehouse_transport_tool"
  val chargeStandard: String = "tbl_charge_standard"
  val company: String = "tbl_company"
  val companyDotMap: String = "tbl_company_dot_map"
  val companyTransportRouteMa: String = "tbl_company_transport_route_ma"
  val companyWarehouseMap: String = "tbl_company_warehouse_map"
  val courier: String = "tbl_courier"
  val deliverRegion: String = "tbl_deliver_region"
  val deliveryRecord: String = "tbl_delivery_record"
  val department: String = "tbl_department"
  val fixedArea: String = "tbl_fixed_area"
  val goodsRack: String = "tbl_goods_rack"
  val job: String = "tbl_job"
  val outWarehouseDetail: String = "tbl_out_warehouse_detail"
  val pkg: String = "tbl_pkg"
  val postalStandard: String = "tbl_postal_standard"
  val pushWarehouseDetail: String = "tbl_push_warehouse_detail"
  val serviceEvaluation: String = "tbl_service_evaluation"
  val storeGrid: String = "tbl_store_grid"
  val vehicleMonitor: String = "tbl_vehicle_monitor"
  val warehouseRackMap: String = "tbl_warehouse_rack_map"
  val warehouseReceiptDetail: String = "tbl_warehouse_receipt_detail"
  val waybillLine: String = "tbl_waybill_line"
  val waybillStateRecord: String = "tbl_waybill_state_record"
  val workTime: String = "tbl_work_time"
  val areas: String = "tbl_areas"
  val deliverPackage: String = "tbl_deliver_package"
  val customer: String = "tbl_customer"
  val codes: String = "tbl_codes"
  val warehouse: String = "tbl_warehouse"
  val consumerAddressMap: String = "tbl_consumer_address_map"
  val warehouseReceipt: String = "tbl_warehouse_receipt"
  val warehouseSendVehicle: String = "tbl_warehouse_send_vehicle"
  val warehouseVehicleMap: String = "tbl_warehouse_vehicle_map"
  val dot: String = "tbl_dot"
  val transportTool: String = "tbl_transport_tool"
  val dotTransportTool: String = "tbl_dot_transport_tool"
  val address: String = "tbl_address"
  val route: String = "tbl_route"
  val pushWarehouse: String = "tbl_push_warehouse"
  val outWarehouse: String = "tbl_out_warehouse"
  val warehouseEmp: String = "tbl_warehouse_emp"
  val expressPackage: String = "tbl_express_package"
  val expressBill: String = "tbl_express_bill"
  val consumerSenderInfo: String = "tbl_consumer_sender_info"
  val collectPackage: String = "tbl_collect_package"
  val waybill: String = "tbl_waybill"
  val transportRecord: String = "tbl_transport_record"
}
/** Singleton access point for the table-name constants declared in [[TableDefineMapping]]. */
object TableMapping extends TableDefineMapping
二、编写数据解析器根据表名解析成具体的POJO对象
实现步骤:
- 在etl模块的parser目录下创建 `DataParser` 类
package cn.it.logistics.etl.parser
import java.util
import java.util.Objects
import cn.it.logistics.common.beans.crm._
import cn.it.logistics.common.beans.logistics._
import cn.it.logistics.common.beans.parser.{CanalMessageBean, MessageBean, OggMessageBean}
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializerFeature
import org.apache.commons.collections.CollectionUtils
/**
 * Data parser: turns the column data carried by a [[MessageBean]] into the
 * JavaBean of the table it belongs to.
 *
 * Every public `toXxx` method follows one of two recipes:
 *  - OGG messages: serialize `getValue` to JSON and parse it back as the target bean;
 *  - Canal messages: serialize `getData` to JSON, parse it as a list of the target
 *    bean and keep the first element.
 *
 * In both cases the message's operation type is normalized with `getOpType` and
 * stored on the resulting bean through its `setOpType` setter.
 */
class DataParser {

  /**
   * Narrows a [[MessageBean]] to an [[OggMessageBean]].
   *
   * @param bean the message to narrow
   * @return the same instance, typed as [[OggMessageBean]]
   * @throws scala.MatchError if `bean` is null or is not an [[OggMessageBean]]
   */
  private def getOggMessageBean(bean: MessageBean): OggMessageBean =
    bean match {
      case ogg: OggMessageBean => ogg
    }

  /**
   * Narrows a [[MessageBean]] to a [[CanalMessageBean]].
   *
   * @param bean the message to narrow
   * @return the same instance, typed as [[CanalMessageBean]]
   * @throws scala.MatchError if `bean` is null or is not a [[CanalMessageBean]]
   */
  private def getCanalMessageBean(bean: MessageBean): CanalMessageBean =
    bean match {
      case canal: CanalMessageBean => canal
    }

  /**
   * Maps the OGG (`I`/`U`/`D`) and Canal (`INSERT`/`UPDATE`/`DELETE`) operation
   * codes onto one shared vocabulary.
   *
   * @param opType the raw operation code taken from the message
   * @return `"insert"`, `"update"` or `"delete"`
   */
  private def getOpType(opType: String): String =
    opType match {
      case "I" | "INSERT" => "insert"
      case "U" | "UPDATE" => "update"
      case "D" | "DELETE" => "delete"
      // Any other value (null included) falls back to "insert".
      case _ => "insert"
    }

  /**
   * Shared recipe for OGG-sourced tables.
   *
   * The intermediate JSON is only parsed back immediately, so it is written
   * without pretty-printing.
   *
   * @param bean      the message; must be an [[OggMessageBean]]
   * @param beanClass the target bean class
   * @param setOpType how to store the normalized operation type on the bean
   * @return the parsed bean, or null when the JSON parser yields null
   */
  private def fromOgg[T <: AnyRef](bean: MessageBean, beanClass: Class[T])
                                  (setOpType: (T, String) => Unit): T = {
    val ogg = getOggMessageBean(bean)
    val parsed = JSON.parseObject(JSON.toJSONString(ogg.getValue), beanClass)
    if (Objects.nonNull(parsed)) {
      setOpType(parsed, getOpType(ogg.getOp_type))
    }
    parsed
  }

  /**
   * Shared recipe for Canal-sourced tables.
   *
   * Only the first element of the parsed list is used.
   *
   * @param bean      the message; must be a [[CanalMessageBean]]
   * @param beanClass the target bean class
   * @param setOpType how to store the normalized operation type on the bean
   * @return the first parsed bean, or `None` when the parsed list is null or empty
   */
  private def fromCanal[T <: AnyRef](bean: MessageBean, beanClass: Class[T])
                                    (setOpType: (T, String) => Unit): Option[T] = {
    val canal = getCanalMessageBean(bean)
    val rows: util.List[T] = JSON.parseArray(JSON.toJSONString(canal.getData), beanClass)
    if (CollectionUtils.isEmpty(rows)) {
      None
    } else {
      val first = rows.get(0)
      setOpType(first, getOpType(canal.getType))
      Some(first)
    }
  }

  /** Converts a tbl_areas OGG message into an [[AreasBean]] (may be null). */
  def toAreas(bean: MessageBean): AreasBean =
    fromOgg(bean, classOf[AreasBean])(_.setOpType(_))

  /** Converts a Canal message into an [[AddressBean]]; null when the message has no data. */
  def toAddress(bean: MessageBean): AddressBean =
    fromCanal(bean, classOf[AddressBean])(_.setOpType(_)).orNull

  /** Converts a Canal message into a [[CustomerBean]]; null when the message has no data. */
  def toCustomer(bean: MessageBean): CustomerBean =
    fromCanal(bean, classOf[CustomerBean])(_.setOpType(_)).orNull

  /**
   * Converts a Canal message into a [[CustomerAddressBean]].
   *
   * NOTE(review): unlike `toAddress` and `toCustomer`, a message with no data
   * yields a freshly constructed bean instead of null. That existing behavior
   * is kept as is; confirm which of the two conventions callers expect.
   */
  def toCustomerAddress(bean: MessageBean): CustomerAddressBean =
    fromCanal(bean, classOf[CustomerAddressBean])(_.setOpType(_))
      .getOrElse(new CustomerAddressBean)

  // ---- OGG-sourced tables: each method returns the parsed bean, or null ----

  def toChargeStandard(bean: MessageBean): ChargeStandardBean =
    fromOgg(bean, classOf[ChargeStandardBean])(_.setOpType(_))

  def toCodes(bean: MessageBean): CodesBean =
    fromOgg(bean, classOf[CodesBean])(_.setOpType(_))

  def toCollectPackage(bean: MessageBean): CollectPackageBean =
    fromOgg(bean, classOf[CollectPackageBean])(_.setOpType(_))

  def toCompany(bean: MessageBean): CompanyBean =
    fromOgg(bean, classOf[CompanyBean])(_.setOpType(_))

  def toCompanyDotMap(bean: MessageBean): CompanyDotMapBean =
    fromOgg(bean, classOf[CompanyDotMapBean])(_.setOpType(_))

  def toCompanyTransportRouteMa(bean: MessageBean): CompanyTransportRouteMaBean =
    fromOgg(bean, classOf[CompanyTransportRouteMaBean])(_.setOpType(_))

  def toCompanyWarehouseMap(bean: MessageBean): CompanyWarehouseMapBean =
    fromOgg(bean, classOf[CompanyWarehouseMapBean])(_.setOpType(_))

  def toConsumerSenderInfo(bean: MessageBean): ConsumerSenderInfoBean =
    fromOgg(bean, classOf[ConsumerSenderInfoBean])(_.setOpType(_))

  def toCourier(bean: MessageBean): CourierBean =
    fromOgg(bean, classOf[CourierBean])(_.setOpType(_))

  def toDeliverPackage(bean: MessageBean): DeliverPackageBean =
    fromOgg(bean, classOf[DeliverPackageBean])(_.setOpType(_))

  def toDeliverRegion(bean: MessageBean): DeliverRegionBean =
    fromOgg(bean, classOf[DeliverRegionBean])(_.setOpType(_))

  def toDeliveryRecord(bean: MessageBean): DeliveryRecordBean =
    fromOgg(bean, classOf[DeliveryRecordBean])(_.setOpType(_))

  def toDepartment(bean: MessageBean): DepartmentBean =
    fromOgg(bean, classOf[DepartmentBean])(_.setOpType(_))

  def toDot(bean: MessageBean): DotBean =
    fromOgg(bean, classOf[DotBean])(_.setOpType(_))

  def toDotTransportTool(bean: MessageBean): DotTransportToolBean =
    fromOgg(bean, classOf[DotTransportToolBean])(_.setOpType(_))

  def toDriver(bean: MessageBean): DriverBean =
    fromOgg(bean, classOf[DriverBean])(_.setOpType(_))

  def toEmp(bean: MessageBean): EmpBean =
    fromOgg(bean, classOf[EmpBean])(_.setOpType(_))

  def toEmpInfoMap(bean: MessageBean): EmpInfoMapBean =
    fromOgg(bean, classOf[EmpInfoMapBean])(_.setOpType(_))

  def toExpressBill(bean: MessageBean): ExpressBillBean =
    fromOgg(bean, classOf[ExpressBillBean])(_.setOpType(_))

  def toExpressPackage(bean: MessageBean): ExpressPackageBean =
    fromOgg(bean, classOf[ExpressPackageBean])(_.setOpType(_))

  def toFixedArea(bean: MessageBean): FixedAreaBean =
    fromOgg(bean, classOf[FixedAreaBean])(_.setOpType(_))

  def toGoodsRack(bean: MessageBean): GoodsRackBean =
    fromOgg(bean, classOf[GoodsRackBean])(_.setOpType(_))

  def toJob(bean: MessageBean): JobBean =
    fromOgg(bean, classOf[JobBean])(_.setOpType(_))

  def toOutWarehouse(bean: MessageBean): OutWarehouseBean =
    fromOgg(bean, classOf[OutWarehouseBean])(_.setOpType(_))

  def toOutWarehouseDetail(bean: MessageBean): OutWarehouseDetailBean =
    fromOgg(bean, classOf[OutWarehouseDetailBean])(_.setOpType(_))

  def toPkg(bean: MessageBean): PkgBean =
    fromOgg(bean, classOf[PkgBean])(_.setOpType(_))

  def toPostalStandard(bean: MessageBean): PostalStandardBean =
    fromOgg(bean, classOf[PostalStandardBean])(_.setOpType(_))

  def toPushWarehouse(bean: MessageBean): PushWarehouseBean =
    fromOgg(bean, classOf[PushWarehouseBean])(_.setOpType(_))

  def toPushWarehouseDetail(bean: MessageBean): PushWarehouseDetailBean =
    fromOgg(bean, classOf[PushWarehouseDetailBean])(_.setOpType(_))

  def toRoute(bean: MessageBean): RouteBean =
    fromOgg(bean, classOf[RouteBean])(_.setOpType(_))

  def toServiceEvaluation(bean: MessageBean): ServiceEvaluationBean =
    fromOgg(bean, classOf[ServiceEvaluationBean])(_.setOpType(_))

  def toStoreGrid(bean: MessageBean): StoreGridBean =
    fromOgg(bean, classOf[StoreGridBean])(_.setOpType(_))

  def toTransportTool(bean: MessageBean): TransportToolBean =
    fromOgg(bean, classOf[TransportToolBean])(_.setOpType(_))

  def toVehicleMonitor(bean: MessageBean): VehicleMonitorBean =
    fromOgg(bean, classOf[VehicleMonitorBean])(_.setOpType(_))

  def toWarehouse(bean: MessageBean): WarehouseBean =
    fromOgg(bean, classOf[WarehouseBean])(_.setOpType(_))

  def toWarehouseEmp(bean: MessageBean): WarehouseEmpBean =
    fromOgg(bean, classOf[WarehouseEmpBean])(_.setOpType(_))

  def toWarehouseRackMap(bean: MessageBean): WarehouseRackMapBean =
    fromOgg(bean, classOf[WarehouseRackMapBean])(_.setOpType(_))

  def toWarehouseReceipt(bean: MessageBean): WarehouseReceiptBean =
    fromOgg(bean, classOf[WarehouseReceiptBean])(_.setOpType(_))

  def toWarehouseReceiptDetail(bean: MessageBean): WarehouseReceiptDetailBean =
    fromOgg(bean, classOf[WarehouseReceiptDetailBean])(_.setOpType(_))

  def toWarehouseSendVehicle(bean: MessageBean): WarehouseSendVehicleBean =
    fromOgg(bean, classOf[WarehouseSendVehicleBean])(_.setOpType(_))

  def toWarehouseTransportTool(bean: MessageBean): WarehouseTransportToolBean =
    fromOgg(bean, classOf[WarehouseTransportToolBean])(_.setOpType(_))

  def toWarehouseVehicleMap(bean: MessageBean): WarehouseVehicleMapBean =
    fromOgg(bean, classOf[WarehouseVehicleMapBean])(_.setOpType(_))

  def toWaybill(bean: MessageBean): WaybillBean =
    fromOgg(bean, classOf[WaybillBean])(_.setOpType(_))

  def toWaybillLine(bean: MessageBean): WaybillLineBean =
    fromOgg(bean, classOf[WaybillLineBean])(_.setOpType(_))

  def toWaybillStateRecord(bean: MessageBean): WaybillStateRecordBean =
    fromOgg(bean, classOf[WaybillStateRecordBean])(_.setOpType(_))

  def toWorkTime(bean: MessageBean): WorkTimeBean =
    fromOgg(bean, classOf[WorkTimeBean])(_.setOpType(_))

  def toTransportRecordBean(bean: MessageBean): TransportRecordBean =
    fromOgg(bean, classOf[TransportRecordBean])(_.setOpType(_))
}
/** Singleton instance of [[DataParser]], so the `toXxx` converters can be called as `DataParser.toXxx(...)`. */
object DataParser extends DataParser
三、扩展自定义POJO的隐式转换实现
实现步骤:
- 在公共模块的scala目录的common程序包下创建 `BeanImplicit` 类
package cn.it.logistics.common
import cn.it.logistics.common.beans.crm.{AddressBean, CustomerAddressBean, CustomerBean}
import cn.it.logistics.common.beans.logistics._
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import org.apache.spark.sql.{Encoder, Encoders}
/**
 * Implicit Spark encoders for the project's JavaBean types.
 *
 * Each member is a bean encoder obtained from `Encoders.bean` for the class
 * named in its type; bringing `BeanImplicit._` into scope makes them available
 * wherever an `Encoder` of one of these beans is required.
 */
object BeanImplicit {

  /** Thin wrapper around `Encoders.bean`, used to keep the declarations below short. */
  private def beanEncoder[T](beanClass: Class[T]): Encoder[T] = Encoders.bean(beanClass)

  // Message beans
  implicit val OggMessageBeanEncoder: Encoder[OggMessageBean] = beanEncoder(classOf[OggMessageBean])
  implicit val CanalMessageBeanEncoder: Encoder[CanalMessageBean] = beanEncoder(classOf[CanalMessageBean])

  // Logistics beans
  implicit val AreasBeanEncoder: Encoder[AreasBean] = beanEncoder(classOf[AreasBean])
  implicit val ChargeStandardBeanEncoder: Encoder[ChargeStandardBean] = beanEncoder(classOf[ChargeStandardBean])
  implicit val CodesBeanEncoder: Encoder[CodesBean] = beanEncoder(classOf[CodesBean])
  implicit val CollectPackageBeanEncoder: Encoder[CollectPackageBean] = beanEncoder(classOf[CollectPackageBean])
  implicit val CompanyBeanEncoder: Encoder[CompanyBean] = beanEncoder(classOf[CompanyBean])
  implicit val CompanyDotMapBeanEncoder: Encoder[CompanyDotMapBean] = beanEncoder(classOf[CompanyDotMapBean])
  implicit val CompanyTransportRouteMaBeanEncoder: Encoder[CompanyTransportRouteMaBean] = beanEncoder(classOf[CompanyTransportRouteMaBean])
  implicit val CompanyWarehouseMapBeanEncoder: Encoder[CompanyWarehouseMapBean] = beanEncoder(classOf[CompanyWarehouseMapBean])
  implicit val ConsumerSenderInfoBeanEncoder: Encoder[ConsumerSenderInfoBean] = beanEncoder(classOf[ConsumerSenderInfoBean])
  implicit val CourierBeanEncoder: Encoder[CourierBean] = beanEncoder(classOf[CourierBean])
  implicit val DeliverPackageBeanEncoder: Encoder[DeliverPackageBean] = beanEncoder(classOf[DeliverPackageBean])
  implicit val DeliverRegionBeanEncoder: Encoder[DeliverRegionBean] = beanEncoder(classOf[DeliverRegionBean])
  implicit val DeliveryRecordBeanEncoder: Encoder[DeliveryRecordBean] = beanEncoder(classOf[DeliveryRecordBean])
  implicit val DepartmentBeanEncoder: Encoder[DepartmentBean] = beanEncoder(classOf[DepartmentBean])
  implicit val DotBeanEncoder: Encoder[DotBean] = beanEncoder(classOf[DotBean])
  implicit val DotTransportToolBeanEncoder: Encoder[DotTransportToolBean] = beanEncoder(classOf[DotTransportToolBean])
  implicit val DriverBeanEncoder: Encoder[DriverBean] = beanEncoder(classOf[DriverBean])
  implicit val EmpBeanEncoder: Encoder[EmpBean] = beanEncoder(classOf[EmpBean])
  implicit val EmpInfoMapBeanEncoder: Encoder[EmpInfoMapBean] = beanEncoder(classOf[EmpInfoMapBean])
  implicit val ExpressBillBeanEncoder: Encoder[ExpressBillBean] = beanEncoder(classOf[ExpressBillBean])
  implicit val ExpressPackageBeanEncoder: Encoder[ExpressPackageBean] = beanEncoder(classOf[ExpressPackageBean])
  implicit val FixedAreaBeanEncoder: Encoder[FixedAreaBean] = beanEncoder(classOf[FixedAreaBean])
  implicit val GoodsRackBeanEncoder: Encoder[GoodsRackBean] = beanEncoder(classOf[GoodsRackBean])
  implicit val JobBeanEncoder: Encoder[JobBean] = beanEncoder(classOf[JobBean])
  implicit val OutWarehouseBeanEncoder: Encoder[OutWarehouseBean] = beanEncoder(classOf[OutWarehouseBean])
  implicit val OutWarehouseDetailBeanEncoder: Encoder[OutWarehouseDetailBean] = beanEncoder(classOf[OutWarehouseDetailBean])
  implicit val PkgBeanEncoder: Encoder[PkgBean] = beanEncoder(classOf[PkgBean])
  implicit val PostalStandardBeanEncoder: Encoder[PostalStandardBean] = beanEncoder(classOf[PostalStandardBean])
  implicit val PushWarehouseBeanEncoder: Encoder[PushWarehouseBean] = beanEncoder(classOf[PushWarehouseBean])
  implicit val PushWarehouseDetailBeanEncoder: Encoder[PushWarehouseDetailBean] = beanEncoder(classOf[PushWarehouseDetailBean])
  implicit val RouteBeanEncoder: Encoder[RouteBean] = beanEncoder(classOf[RouteBean])
  implicit val ServiceEvaluationBeanEncoder: Encoder[ServiceEvaluationBean] = beanEncoder(classOf[ServiceEvaluationBean])
  implicit val StoreGridBeanEncoder: Encoder[StoreGridBean] = beanEncoder(classOf[StoreGridBean])
  implicit val TransportToolBeanEncoder: Encoder[TransportToolBean] = beanEncoder(classOf[TransportToolBean])
  implicit val VehicleMonitorBeanEncoder: Encoder[VehicleMonitorBean] = beanEncoder(classOf[VehicleMonitorBean])
  implicit val WarehouseBeanEncoder: Encoder[WarehouseBean] = beanEncoder(classOf[WarehouseBean])
  implicit val WarehouseEmpBeanEncoder: Encoder[WarehouseEmpBean] = beanEncoder(classOf[WarehouseEmpBean])
  implicit val WarehouseRackMapBeanEncoder: Encoder[WarehouseRackMapBean] = beanEncoder(classOf[WarehouseRackMapBean])
  implicit val WarehouseReceiptBeanEncoder: Encoder[WarehouseReceiptBean] = beanEncoder(classOf[WarehouseReceiptBean])
  implicit val WarehouseReceiptDetailBeanEncoder: Encoder[WarehouseReceiptDetailBean] = beanEncoder(classOf[WarehouseReceiptDetailBean])
  implicit val WarehouseSendVehicleBeanEncoder: Encoder[WarehouseSendVehicleBean] = beanEncoder(classOf[WarehouseSendVehicleBean])
  implicit val WarehouseTransportToolBeanEncoder: Encoder[WarehouseTransportToolBean] = beanEncoder(classOf[WarehouseTransportToolBean])
  implicit val WarehouseVehicleMapBeanEncoder: Encoder[WarehouseVehicleMapBean] = beanEncoder(classOf[WarehouseVehicleMapBean])
  implicit val WaybillBeanEncoder: Encoder[WaybillBean] = beanEncoder(classOf[WaybillBean])
  implicit val WaybillLineBeanEncoder: Encoder[WaybillLineBean] = beanEncoder(classOf[WaybillLineBean])
  implicit val WaybillStateRecordBeanEncoder: Encoder[WaybillStateRecordBean] = beanEncoder(classOf[WaybillStateRecordBean])
  implicit val WorkTimeBeanEncoder: Encoder[WorkTimeBean] = beanEncoder(classOf[WorkTimeBean])
  implicit val TransportRecordBeanEncoder: Encoder[TransportRecordBean] = beanEncoder(classOf[TransportRecordBean])

  // CRM beans
  implicit val CustomerBeanEncoder: Encoder[CustomerBean] = beanEncoder(classOf[CustomerBean])
  implicit val AddressBeanEncoder: Encoder[AddressBean] = beanEncoder(classOf[AddressBean])
  implicit val CustomerAddressBeanEncoder: Encoder[CustomerAddressBean] = beanEncoder(classOf[CustomerAddressBean])
}
四、转换Ogg和Canal对应主题的数据为具体的POJO对象
实现步骤:
- 在etl模块的realtime目录下 KuduStreamApp 单例对象的 execute 方法中:
  - 导入自定义POJO的隐式转换
  - 转换Ogg和Canal对应数据为具体的POJO对象
  - 测试转换是否成功
参考代码:
// Bring Spark's built-in implicit conversions into scope.
import sparkSession.implicits._
// Bring the custom bean encoders into scope.
import cn.it.logistics.common.BeanImplicit._

// For every table: keep only the messages whose table name matches, then turn
// each message into that table's JavaBean and expose the result as a DataFrame.

// Logistics tables
val areasDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.areas)
  .map(msg => DataParser.toAreas(msg))(AreasBeanEncoder).toDF()
val warehouseSendVehicleDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseSendVehicle)
  .map(msg => DataParser.toWarehouseSendVehicle(msg))(WarehouseSendVehicleBeanEncoder).toDF()
val waybillLineDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.waybillLine)
  .map(msg => DataParser.toWaybillLine(msg))(WaybillLineBeanEncoder).toDF()
val chargeStandardDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.chargeStandard)
  .map(msg => DataParser.toChargeStandard(msg))(ChargeStandardBeanEncoder).toDF()
val codesDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.codes)
  .map(msg => DataParser.toCodes(msg))(CodesBeanEncoder).toDF()
val collectPackageDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.collectPackage)
  .map(msg => DataParser.toCollectPackage(msg))(CollectPackageBeanEncoder).toDF()
val companyDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.company)
  .map(msg => DataParser.toCompany(msg))(CompanyBeanEncoder).toDF()
val companyDotMapDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.companyDotMap)
  .map(msg => DataParser.toCompanyDotMap(msg))(CompanyDotMapBeanEncoder).toDF()
val companyTransportRouteMaDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.companyTransportRouteMa)
  .map(msg => DataParser.toCompanyTransportRouteMa(msg))(CompanyTransportRouteMaBeanEncoder).toDF()
val companyWarehouseMapDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.companyWarehouseMap)
  .map(msg => DataParser.toCompanyWarehouseMap(msg))(CompanyWarehouseMapBeanEncoder).toDF()
val consumerSenderInfoDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.consumerSenderInfo)
  .map(msg => DataParser.toConsumerSenderInfo(msg))(ConsumerSenderInfoBeanEncoder).toDF()
val courierDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.courier)
  .map(msg => DataParser.toCourier(msg))(CourierBeanEncoder).toDF()
val deliverPackageDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.deliverPackage)
  .map(msg => DataParser.toDeliverPackage(msg))(DeliverPackageBeanEncoder).toDF()
val deliverRegionDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.deliverRegion)
  .map(msg => DataParser.toDeliverRegion(msg))(DeliverRegionBeanEncoder).toDF()
val deliveryRecordDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.deliveryRecord)
  .map(msg => DataParser.toDeliveryRecord(msg))(DeliveryRecordBeanEncoder).toDF()
val departmentDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.department)
  .map(msg => DataParser.toDepartment(msg))(DepartmentBeanEncoder).toDF()
val dotDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.dot)
  .map(msg => DataParser.toDot(msg))(DotBeanEncoder).toDF()
val dotTransportToolDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.dotTransportTool)
  .map(msg => DataParser.toDotTransportTool(msg))(DotTransportToolBeanEncoder).toDF()
val driverDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.driver)
  .map(msg => DataParser.toDriver(msg))(DriverBeanEncoder).toDF()
val empDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.emp)
  .map(msg => DataParser.toEmp(msg))(EmpBeanEncoder).toDF()
val empInfoMapDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.empInfoMap)
  .map(msg => DataParser.toEmpInfoMap(msg))(EmpInfoMapBeanEncoder).toDF()
val expressBillDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.expressBill)
  .map(msg => DataParser.toExpressBill(msg))(ExpressBillBeanEncoder).toDF()
val expressPackageDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.expressPackage)
  .map(msg => DataParser.toExpressPackage(msg))(ExpressPackageBeanEncoder).toDF()
val fixedAreaDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.fixedArea)
  .map(msg => DataParser.toFixedArea(msg))(FixedAreaBeanEncoder).toDF()
val goodsRackDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.goodsRack)
  .map(msg => DataParser.toGoodsRack(msg))(GoodsRackBeanEncoder).toDF()
val jobDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.job)
  .map(msg => DataParser.toJob(msg))(JobBeanEncoder).toDF()
val outWarehouseDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.outWarehouse)
  .map(msg => DataParser.toOutWarehouse(msg))(OutWarehouseBeanEncoder).toDF()
val outWarehouseDetailDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.outWarehouseDetail)
  .map(msg => DataParser.toOutWarehouseDetail(msg))(OutWarehouseDetailBeanEncoder).toDF()
val pkgDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.pkg)
  .map(msg => DataParser.toPkg(msg))(PkgBeanEncoder).toDF()
val postalStandardDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.postalStandard)
  .map(msg => DataParser.toPostalStandard(msg))(PostalStandardBeanEncoder).toDF()
val pushWarehouseDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.pushWarehouse)
  .map(msg => DataParser.toPushWarehouse(msg))(PushWarehouseBeanEncoder).toDF()
val pushWarehouseDetailDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.pushWarehouseDetail)
  .map(msg => DataParser.toPushWarehouseDetail(msg))(PushWarehouseDetailBeanEncoder).toDF()
val routeDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.route)
  .map(msg => DataParser.toRoute(msg))(RouteBeanEncoder).toDF()
val serviceEvaluationDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.serviceEvaluation)
  .map(msg => DataParser.toServiceEvaluation(msg))(ServiceEvaluationBeanEncoder).toDF()
val storeGridDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.storeGrid)
  .map(msg => DataParser.toStoreGrid(msg))(StoreGridBeanEncoder).toDF()
val transportToolDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.transportTool)
  .map(msg => DataParser.toTransportTool(msg))(TransportToolBeanEncoder).toDF()
val vehicleMonitorDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.vehicleMonitor)
  .map(msg => DataParser.toVehicleMonitor(msg))(VehicleMonitorBeanEncoder).toDF()
val warehouseDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouse)
  .map(msg => DataParser.toWarehouse(msg))(WarehouseBeanEncoder).toDF()
val warehouseEmpDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseEmp)
  .map(msg => DataParser.toWarehouseEmp(msg))(WarehouseEmpBeanEncoder).toDF()
val warehouseRackMapDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseRackMap)
  .map(msg => DataParser.toWarehouseRackMap(msg))(WarehouseRackMapBeanEncoder).toDF()
val warehouseReceiptDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseReceipt)
  .map(msg => DataParser.toWarehouseReceipt(msg))(WarehouseReceiptBeanEncoder).toDF()
val warehouseReceiptDetailDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseReceiptDetail)
  .map(msg => DataParser.toWarehouseReceiptDetail(msg))(WarehouseReceiptDetailBeanEncoder).toDF()
val warehouseTransportToolDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseTransportTool)
  .map(msg => DataParser.toWarehouseTransportTool(msg))(WarehouseTransportToolBeanEncoder).toDF()
val warehouseVehicleMapDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.warehouseVehicleMap)
  .map(msg => DataParser.toWarehouseVehicleMap(msg))(WarehouseVehicleMapBeanEncoder).toDF()
val waybillDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.waybill)
  .map(msg => DataParser.toWaybill(msg))(WaybillBeanEncoder).toDF()
val waybillStateRecordDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.waybillStateRecord)
  .map(msg => DataParser.toWaybillStateRecord(msg))(WaybillStateRecordBeanEncoder).toDF()
val workTimeDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.workTime)
  .map(msg => DataParser.toWorkTime(msg))(WorkTimeBeanEncoder).toDF()
val transportRecordDF: DataFrame = logsticsMessageBean
  .filter(msg => msg.getTable == TableMapping.transportRecord)
  .map(msg => DataParser.toTransportRecordBean(msg))(TransportRecordBeanEncoder).toDF()

// CRM tables
val addressDF: DataFrame = crmMessageBean
  .filter(msg => msg.getTable == TableMapping.address)
  .map(msg => DataParser.toAddress(msg))(AddressBeanEncoder).toDF()
val customerDF: DataFrame = crmMessageBean
  .filter(msg => msg.getTable == TableMapping.customer)
  .map(msg => DataParser.toCustomer(msg))(CustomerBeanEncoder).toDF()
val consumerAddressMapDF: DataFrame = crmMessageBean
  .filter(msg => msg.getTable == TableMapping.consumerAddressMap)
  .map(msg => DataParser.toCustomerAddress(msg))(CustomerAddressBeanEncoder).toDF()
五、实现Kudu表的自动创建工具类
将OGG数据和Canal数据转换成具体的POJO对象以后,需要将数据写入到KUDU数据库中,写入的前提是Kudu数据库中必须要有一个同名的表,然后才可以存储输入端的源数据,因此编写工具类实现表是否存在的判断逻辑,如果表不存在则在kudu数据库中创建表
实现步骤:
- 在公共模块的common程序包下创建 `Tools` 类
- 创建方法:实现kudu中表不存在则创建的逻辑
参考代码:
package cn.it.logistics.common
import java.util
import org.apache.commons.lang3.StringUtils
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{Metadata, StructField, StructType}
/**
 * Utility helpers for working with Kudu.
 */
object Tools {
  /**
   * Creates the Kudu table `tableName` from the schema of `dataFrame` if it does not exist yet.
   *
   * The Kudu table gets the same columns as the DataFrame. The primary key column is marked
   * non-nullable (every other column is nullable), the table is hash-partitioned on the primary
   * key into 3 buckets and created with a single replica. When the table already exists only a
   * message is printed.
   *
   * @param tableName    name of the Kudu table to check / create
   * @param dataFrame    DataFrame whose schema is used as the table definition
   * @param primaryField name of the primary key column; a null/blank value falls back to "id"
   * @param showSchema   when true, prints the source schema and the adjusted Kudu schema
   * @throws IllegalArgumentException if the primary key column is not part of the DataFrame schema
   */
  def autoCreateKuduTable(tableName: String, dataFrame: DataFrame, primaryField: String = "id", showSchema: Boolean = false): Unit = {
    // 1) Obtain the Kudu context
    val kuduContext: KuduContext = new KuduContext(Configuration.kuduRpcAddress, dataFrame.sqlContext.sparkContext)

    // 2) Only create the table when Kudu does not know it yet
    if (kuduContext.tableExists(tableName)) {
      println(s"${tableName}表已经存在,无需创建!")
    } else {
      // 3) Resolve the primary key column. The original code only printed that "id" would be
      //    used and then looked up the empty name in the schema, which always failed.
      val keyColumn: String =
        if (StringUtils.isBlank(primaryField)) {
          println(s"没有为${tableName}指定主键字段,将使用默认【id】作为主键列,如果表中存在该字段则创建成功,否则抛出异常退出程序!")
          "id"
        } else {
          primaryField
        }

      // Fail early, with a readable message, when the key column is missing from the data
      require(
        dataFrame.schema.fieldNames.contains(keyColumn),
        s"无法创建kudu表${tableName}:数据中不存在主键列【${keyColumn}】"
      )

      if (showSchema) {
        println("=========原始数据中的schema==============")
        // Prints a tree such as:
        //   root
        //    |-- citycode: string (nullable = true)
        //    |-- id: long (nullable = true)
        //    |-- ...
        dataFrame.printSchema()
      }

      // 4) Build the Kudu schema from the DataFrame schema. Every DataFrame column is nullable,
      //    but a Kudu primary key column must not be, so the key column is switched to non-null.
      val schema: StructType = new StructType(dataFrame.schema.map { field =>
        StructField(field.name, field.dataType, field.name != keyColumn, Metadata.empty)
      }.toArray)

      if (showSchema) {
        println("=========kudu表设置主键列以后的schema==============")
        schema.printTreeString()
      }

      // 5) Create the table: one replica, hash partitioning on the primary key into 3 buckets
      val createTableOptions = new CreateTableOptions()
      createTableOptions.setNumReplicas(1)
      createTableOptions.addHashPartitions(util.Arrays.asList(keyColumn), 3)
      kuduContext.createTable(tableName, schema, Array(keyColumn), createTableOptions)
    }
  }
}
六、将数据写入到kudu数据库
实现步骤:
- 在etl模块的realtime目录下的 **KuduStreamApp** 单例对象中重写 **save** 方法
- 在execute方法中调用save方法实现数据写入
实现过程:
- 在realtime目录下的 **KuduStreamApp** 单例对象中重写 **save** 方法
/**
 * Starts a streaming query that writes `dataFrame` into the Kudu table `tableName`.
 *
 * @param dataFrame         streaming DataFrame to persist
 * @param tableName         name of the target Kudu table
 * @param isAutoCreateTable when true, the table is created first if Kudu does not have it yet
 */
override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
  // Make sure the target table exists. According to the original note, writing to a missing
  // table fails with: the table does not exist: table_name: "tbl_areas"
  if (isAutoCreateTable) {
    Tools.autoCreateKuduTable(tableName, dataFrame, "id")
  }

  // Connection settings for the Kudu data source
  val kuduOptions: Map[String, String] = Map(
    "kudu.master" -> Configuration.kuduRpcAddress,
    "kudu.table" -> tableName
  )
  // One named query per table
  val sinkQueryName = s"${tableName}-${Configuration.SPARK_KUDU_FORMAT}"

  // Start the streaming write into Kudu
  dataFrame.writeStream
    .format(Configuration.SPARK_KUDU_FORMAT)
    .options(kuduOptions)
    .outputMode(OutputMode.Update())
    .queryName(sinkQueryName)
    .start()
}
- 在execute方法中调用save方法实现数据写入
// Save to Kudu: each DataFrame is paired with its target table name and the sinks are
// started in the listed order (isAutoCreateTable keeps its default value)
Seq(
  (areasDF, TableMapping.areas),
  (warehouseSendVehicleDF, TableMapping.warehouseSendVehicle),
  (waybillLineDF, TableMapping.waybillLine),
  (chargeStandardDF, TableMapping.chargeStandard),
  (codesDF, TableMapping.codes),
  (collectPackageDF, TableMapping.collectPackage),
  (companyDF, TableMapping.company),
  (companyDotMapDF, TableMapping.companyDotMap),
  (companyTransportRouteMaDF, TableMapping.companyTransportRouteMa),
  (companyWarehouseMapDF, TableMapping.companyWarehouseMap),
  (consumerSenderInfoDF, TableMapping.consumerSenderInfo),
  (courierDF, TableMapping.courier),
  (deliverPackageDF, TableMapping.deliverPackage),
  (deliverRegionDF, TableMapping.deliverRegion),
  (deliveryRecordDF, TableMapping.deliveryRecord),
  (departmentDF, TableMapping.department),
  (dotDF, TableMapping.dot),
  (dotTransportToolDF, TableMapping.dotTransportTool),
  (driverDF, TableMapping.driver),
  (empDF, TableMapping.emp),
  (empInfoMapDF, TableMapping.empInfoMap),
  (expressBillDF, TableMapping.expressBill),
  (expressPackageDF, TableMapping.expressPackage),
  (fixedAreaDF, TableMapping.fixedArea),
  (goodsRackDF, TableMapping.goodsRack),
  (jobDF, TableMapping.job),
  (outWarehouseDF, TableMapping.outWarehouse),
  (outWarehouseDetailDF, TableMapping.outWarehouseDetail),
  (pkgDF, TableMapping.pkg),
  (postalStandardDF, TableMapping.postalStandard),
  (pushWarehouseDF, TableMapping.pushWarehouse),
  (pushWarehouseDetailDF, TableMapping.pushWarehouseDetail),
  (routeDF, TableMapping.route),
  (serviceEvaluationDF, TableMapping.serviceEvaluation),
  (storeGridDF, TableMapping.storeGrid),
  (transportToolDF, TableMapping.transportTool),
  (vehicleMonitorDF, TableMapping.vehicleMonitor),
  (warehouseDF, TableMapping.warehouse),
  (warehouseEmpDF, TableMapping.warehouseEmp),
  (warehouseRackMapDF, TableMapping.warehouseRackMap),
  (warehouseReceiptDF, TableMapping.warehouseReceipt),
  (warehouseReceiptDetailDF, TableMapping.warehouseReceiptDetail),
  (warehouseTransportToolDF, TableMapping.warehouseTransportTool),
  (warehouseVehicleMapDF, TableMapping.warehouseVehicleMap),
  (waybillDF, TableMapping.waybill),
  (waybillStateRecordDF, TableMapping.waybillStateRecord),
  (workTimeDF, TableMapping.workTime),
  (transportRecordDF, TableMapping.transportRecord),
  (addressDF, TableMapping.address),
  (customerDF, TableMapping.customer),
  (consumerAddressMapDF, TableMapping.consumerAddressMap)
).foreach { case (tableDF, kuduTable) => save(tableDF, kuduTable) }
七、完整代码
package cn.it.logistics.etl.realtime
import cn.it.logistics.common.beans.parser.{CanalMessageBean, OggMessageBean}
import cn.it.logistics.common.{Configuration, SparkUtils, TableMapping, Tools}
import cn.it.logistics.etl.parser.DataParser
import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
/**
 * Kudu data pipeline application: real-time ETL from the Kafka topics into Kudu.
 */
object KuduStreamApp extends StreamApp {
  /**
   * Entry point: builds the Spark configuration and runs the pipeline.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Build the SparkConf for this application
    val sparkConf: SparkConf = SparkUtils.autoSettingEnv(
      SparkUtils.sparkConf(this.getClass.getSimpleName)
    )
    // Run the data processing
    execute(sparkConf)
  }

  /**
   * Runs the streaming pipeline:
   *  1. create the SparkSession
   *  2. read the logistics and CRM topics from Kafka
   *  3. parse the JSON messages into OggMessageBean / CanalMessageBean
   *  4. split the messages per table and convert them into the table specific beans
   *  5. start one Kudu sink per table and block until a query terminates
   *
   * @param sparkConf Spark configuration used to create the session
   */
  override def execute(sparkConf: SparkConf): Unit = {
    // 1) Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    sparkSession.sparkContext.setLogLevel(Configuration.LOG_OFF)

    // 2) Sources
    // 2.1: logistics system data
    val logisticsDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaLogisticsTopic)
    // 2.2: customer relationship system data
    val crmDF: DataFrame = getKafkaSource(sparkSession, Configuration.kafkaCrmTopic)

    // 3) The value column is a JSON string; convert it into bean objects.
    //    The iterators are returned lazily: the previous `.toList` buffered a whole partition
    //    in memory before emitting the first row.
    // 3.1: logistics messages
    val logsticsMessageBean: Dataset[OggMessageBean] = logisticsDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      iters.map(row => {
        // Value column (JSON string)
        val jsonStr: String = row.getAs[String](0)
        // JSON string -> bean
        JSON.parseObject(jsonStr, classOf[OggMessageBean])
      })
    })(Encoders.bean(classOf[OggMessageBean]))

    // 3.2: CRM messages
    val crmMessageBean: Dataset[CanalMessageBean] = crmDF.filter(!_.isNullAt(0)).mapPartitions(iters => {
      // Besides insert/update/delete, canal also emits "truncate table" events; drop those
      iters.filter(row => {
        val line: String = row.getAs[String](0)
        // Keep the row unless its value is non-null and mentions TRUNCATE
        !(line != null && line.toUpperCase().contains("TRUNCATE"))
      }).map(row => {
        // Value column (JSON string)
        val jsonStr: String = row.getAs[String](0)
        // JSON string -> bean
        JSON.parseObject(jsonStr, classOf[CanalMessageBean])
      })
    })(Encoders.bean(classOf[CanalMessageBean]))

    // Spark implicit conversions
    import sparkSession.implicits._
    // Implicit encoders of the custom POJOs
    import cn.it.logistics.common.BeanImplicit._

    // 4) Filter each table's records out of the topic and convert them into the table's POJO
    val areasDF: DataFrame = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.areas).map(bean => DataParser.toAreas(bean))(AreasBeanEncoder).toDF()
    val warehouseSendVehicleDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseSendVehicle).map(bean => DataParser.toWarehouseSendVehicle(bean))(WarehouseSendVehicleBeanEncoder).toDF()
    val waybillLineDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.waybillLine).map(bean => DataParser.toWaybillLine(bean))(WaybillLineBeanEncoder).toDF()
    val chargeStandardDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.chargeStandard).map(bean => DataParser.toChargeStandard(bean))(ChargeStandardBeanEncoder).toDF()
    val codesDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.codes).map(bean => DataParser.toCodes(bean))(CodesBeanEncoder).toDF()
    val collectPackageDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.collectPackage).map(bean => DataParser.toCollectPackage(bean))(CollectPackageBeanEncoder).toDF()
    val companyDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.company).map(bean => DataParser.toCompany(bean))(CompanyBeanEncoder).toDF()
    val companyDotMapDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.companyDotMap).map(bean => DataParser.toCompanyDotMap(bean))(CompanyDotMapBeanEncoder).toDF()
    val companyTransportRouteMaDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.companyTransportRouteMa).map(bean => DataParser.toCompanyTransportRouteMa(bean))(CompanyTransportRouteMaBeanEncoder).toDF()
    val companyWarehouseMapDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.companyWarehouseMap).map(bean => DataParser.toCompanyWarehouseMap(bean))(CompanyWarehouseMapBeanEncoder).toDF()
    val consumerSenderInfoDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.consumerSenderInfo).map(bean => DataParser.toConsumerSenderInfo(bean))(ConsumerSenderInfoBeanEncoder).toDF()
    val courierDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.courier).map(bean => DataParser.toCourier(bean))(CourierBeanEncoder).toDF()
    val deliverPackageDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.deliverPackage).map(bean => DataParser.toDeliverPackage(bean))(DeliverPackageBeanEncoder).toDF()
    val deliverRegionDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.deliverRegion).map(bean => DataParser.toDeliverRegion(bean))(DeliverRegionBeanEncoder).toDF()
    val deliveryRecordDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.deliveryRecord).map(bean => DataParser.toDeliveryRecord(bean))(DeliveryRecordBeanEncoder).toDF()
    val departmentDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.department).map(bean => DataParser.toDepartment(bean))(DepartmentBeanEncoder).toDF()
    val dotDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.dot).map(bean => DataParser.toDot(bean))(DotBeanEncoder).toDF()
    val dotTransportToolDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.dotTransportTool).map(bean => DataParser.toDotTransportTool(bean))(DotTransportToolBeanEncoder).toDF()
    val driverDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.driver).map(bean => DataParser.toDriver(bean))(DriverBeanEncoder).toDF()
    val empDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.emp).map(bean => DataParser.toEmp(bean))(EmpBeanEncoder).toDF()
    val empInfoMapDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.empInfoMap).map(bean => DataParser.toEmpInfoMap(bean))(EmpInfoMapBeanEncoder).toDF()
    val expressBillDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.expressBill).map(bean => DataParser.toExpressBill(bean))(ExpressBillBeanEncoder).toDF()
    val expressPackageDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.expressPackage).map(bean => DataParser.toExpressPackage(bean))(ExpressPackageBeanEncoder).toDF()
    val fixedAreaDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.fixedArea).map(bean => DataParser.toFixedArea(bean))(FixedAreaBeanEncoder).toDF()
    val goodsRackDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.goodsRack).map(bean => DataParser.toGoodsRack(bean))(GoodsRackBeanEncoder).toDF()
    val jobDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.job).map(bean => DataParser.toJob(bean))(JobBeanEncoder).toDF()
    val outWarehouseDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.outWarehouse).map(bean => DataParser.toOutWarehouse(bean))(OutWarehouseBeanEncoder).toDF()
    val outWarehouseDetailDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.outWarehouseDetail).map(bean => DataParser.toOutWarehouseDetail(bean))(OutWarehouseDetailBeanEncoder).toDF()
    val pkgDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.pkg).map(bean => DataParser.toPkg(bean))(PkgBeanEncoder).toDF()
    val postalStandardDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.postalStandard).map(bean => DataParser.toPostalStandard(bean))(PostalStandardBeanEncoder).toDF()
    val pushWarehouseDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.pushWarehouse).map(bean => DataParser.toPushWarehouse(bean))(PushWarehouseBeanEncoder).toDF()
    val pushWarehouseDetailDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.pushWarehouseDetail).map(bean => DataParser.toPushWarehouseDetail(bean))(PushWarehouseDetailBeanEncoder).toDF()
    val routeDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.route).map(bean => DataParser.toRoute(bean))(RouteBeanEncoder).toDF()
    val serviceEvaluationDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.serviceEvaluation).map(bean => DataParser.toServiceEvaluation(bean))(ServiceEvaluationBeanEncoder).toDF()
    val storeGridDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.storeGrid).map(bean => DataParser.toStoreGrid(bean))(StoreGridBeanEncoder).toDF()
    val transportToolDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.transportTool).map(bean => DataParser.toTransportTool(bean))(TransportToolBeanEncoder).toDF()
    val vehicleMonitorDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.vehicleMonitor).map(bean => DataParser.toVehicleMonitor(bean))(VehicleMonitorBeanEncoder).toDF()
    val warehouseDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouse).map(bean => DataParser.toWarehouse(bean))(WarehouseBeanEncoder).toDF()
    val warehouseEmpDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseEmp).map(bean => DataParser.toWarehouseEmp(bean))(WarehouseEmpBeanEncoder).toDF()
    val warehouseRackMapDF: DataFrame = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseRackMap).map(bean => DataParser.toWarehouseRackMap(bean))(WarehouseRackMapBeanEncoder).toDF()
    val warehouseReceiptDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseReceipt).map(bean => DataParser.toWarehouseReceipt(bean))(WarehouseReceiptBeanEncoder).toDF()
    val warehouseReceiptDetailDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseReceiptDetail).map(bean => DataParser.toWarehouseReceiptDetail(bean))(WarehouseReceiptDetailBeanEncoder).toDF()
    val warehouseTransportToolDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseTransportTool).map(bean => DataParser.toWarehouseTransportTool(bean))(WarehouseTransportToolBeanEncoder).toDF()
    val warehouseVehicleMapDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.warehouseVehicleMap).map(bean => DataParser.toWarehouseVehicleMap(bean))(WarehouseVehicleMapBeanEncoder).toDF()
    val waybillDF: DataFrame = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.waybill).map(bean => DataParser.toWaybill(bean))(WaybillBeanEncoder).toDF()
    val waybillStateRecordDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.waybillStateRecord).map(bean => DataParser.toWaybillStateRecord(bean))(WaybillStateRecordBeanEncoder).toDF()
    val workTimeDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.workTime).map(bean => DataParser.toWorkTime(bean))(WorkTimeBeanEncoder).toDF()
    val transportRecordDF = logsticsMessageBean.filter(bean => bean.getTable == TableMapping.transportRecord).map(bean => DataParser.toTransportRecordBean(bean))(TransportRecordBeanEncoder).toDF()
    val addressDF = crmMessageBean.filter(bean => bean.getTable == TableMapping.address).map(bean => DataParser.toAddress(bean))(AddressBeanEncoder).toDF()
    val customerDF = crmMessageBean.filter(bean => bean.getTable == TableMapping.customer).map(bean => DataParser.toCustomer(bean))(CustomerBeanEncoder).toDF()
    val consumerAddressMapDF = crmMessageBean.filter(bean => bean.getTable == TableMapping.consumerAddressMap).map(bean => DataParser.toCustomerAddress(bean))(CustomerAddressBeanEncoder).toDF()

    // 5) Save to Kudu: each DataFrame is paired with its target table name and the sinks are
    //    started in the listed order (isAutoCreateTable keeps its default value)
    Seq(
      (areasDF, TableMapping.areas),
      (warehouseSendVehicleDF, TableMapping.warehouseSendVehicle),
      (waybillLineDF, TableMapping.waybillLine),
      (chargeStandardDF, TableMapping.chargeStandard),
      (codesDF, TableMapping.codes),
      (collectPackageDF, TableMapping.collectPackage),
      (companyDF, TableMapping.company),
      (companyDotMapDF, TableMapping.companyDotMap),
      (companyTransportRouteMaDF, TableMapping.companyTransportRouteMa),
      (companyWarehouseMapDF, TableMapping.companyWarehouseMap),
      (consumerSenderInfoDF, TableMapping.consumerSenderInfo),
      (courierDF, TableMapping.courier),
      (deliverPackageDF, TableMapping.deliverPackage),
      (deliverRegionDF, TableMapping.deliverRegion),
      (deliveryRecordDF, TableMapping.deliveryRecord),
      (departmentDF, TableMapping.department),
      (dotDF, TableMapping.dot),
      (dotTransportToolDF, TableMapping.dotTransportTool),
      (driverDF, TableMapping.driver),
      (empDF, TableMapping.emp),
      (empInfoMapDF, TableMapping.empInfoMap),
      (expressBillDF, TableMapping.expressBill),
      (expressPackageDF, TableMapping.expressPackage),
      (fixedAreaDF, TableMapping.fixedArea),
      (goodsRackDF, TableMapping.goodsRack),
      (jobDF, TableMapping.job),
      (outWarehouseDF, TableMapping.outWarehouse),
      (outWarehouseDetailDF, TableMapping.outWarehouseDetail),
      (pkgDF, TableMapping.pkg),
      (postalStandardDF, TableMapping.postalStandard),
      (pushWarehouseDF, TableMapping.pushWarehouse),
      (pushWarehouseDetailDF, TableMapping.pushWarehouseDetail),
      (routeDF, TableMapping.route),
      (serviceEvaluationDF, TableMapping.serviceEvaluation),
      (storeGridDF, TableMapping.storeGrid),
      (transportToolDF, TableMapping.transportTool),
      (vehicleMonitorDF, TableMapping.vehicleMonitor),
      (warehouseDF, TableMapping.warehouse),
      (warehouseEmpDF, TableMapping.warehouseEmp),
      (warehouseRackMapDF, TableMapping.warehouseRackMap),
      (warehouseReceiptDF, TableMapping.warehouseReceipt),
      (warehouseReceiptDetailDF, TableMapping.warehouseReceiptDetail),
      (warehouseTransportToolDF, TableMapping.warehouseTransportTool),
      (warehouseVehicleMapDF, TableMapping.warehouseVehicleMap),
      (waybillDF, TableMapping.waybill),
      (waybillStateRecordDF, TableMapping.waybillStateRecord),
      (workTimeDF, TableMapping.workTime),
      (transportRecordDF, TableMapping.transportRecord),
      (addressDF, TableMapping.address),
      (customerDF, TableMapping.customer),
      (consumerAddressMapDF, TableMapping.consumerAddressMap)
    ).foreach { case (tableDF, kuduTable) => save(tableDF, kuduTable) }

    // Additionally echo the raw topic data to the console
    logisticsDF.writeStream.outputMode(OutputMode.Update())
      .format("console").queryName("logistics").start()
    crmDF.writeStream.outputMode(OutputMode.Update())
      .format("console").queryName("crm").start()

    // Print the active streaming queries, then block until any of them terminates
    val stream = sparkSession.streams
    stream.active.foreach(query => println(s"准备启动的查询:${query.name}"))
    stream.awaitAnyTermination()
  }

  /**
   * Starts a streaming query that writes `dataFrame` into the Kudu table `tableName`.
   *
   * @param dataFrame         streaming DataFrame to persist
   * @param tableName         name of the target Kudu table
   * @param isAutoCreateTable when true, the table is created first if Kudu does not have it yet
   */
  override def save(dataFrame: DataFrame, tableName: String, isAutoCreateTable: Boolean = true): Unit = {
    // Make sure the target table exists. According to the original note, writing to a missing
    // table fails with: the table does not exist: table_name: "tbl_areas"
    if (isAutoCreateTable) {
      Tools.autoCreateKuduTable(tableName, dataFrame, "id")
    }

    // Connection settings for the Kudu data source
    val kuduOptions: Map[String, String] = Map(
      "kudu.master" -> Configuration.kuduRpcAddress,
      "kudu.table" -> tableName
    )
    // One named query per table
    val sinkQueryName = s"${tableName}-${Configuration.SPARK_KUDU_FORMAT}"

    // Start the streaming write into Kudu
    dataFrame.writeStream
      .format(Configuration.SPARK_KUDU_FORMAT)
      .options(kuduOptions)
      .outputMode(OutputMode.Update())
      .queryName(sinkQueryName)
      .start()
  }
}
八、测试效果展示
实现步骤:
- 启动ETL模块的KuduStreamApp单例对象
- 启动数据生成器
- 查询kudu表中是否有数据
实现过程:
- 启动ETL模块的KuduStreamApp单例对象
- 启动数据生成器
- 查询kudu表中是否有数据(随便打开一张表)
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。