Spark Doris Connector的实践
三寸九州 发布于2021-04 浏览:6182 回复:0
0
收藏

Doris线下沙龙完美收官!本次沙龙邀请了来自百度、美团、京东的技术大牛带来实战分享。了解更多详情请关注Doris官方公众号。公众号后台回复“1222”立即get现场录像及嘉宾PPT。

 


2019年12月22日,Doris线下沙龙在百度大厦顺利举办。本次邀请了来自美团、京东、百度的技术大牛来分享实战经验,快来跟随小编一起回顾吧!

来自百度大数据部的张文歆为大家带来了通过Spark(百度数据工厂Pingo)查询Doris的最佳实践。


文歆,2015年进入百度,现任百度大数据部资深研发工程师。百度数据计算引擎QE和百度数据工厂Pingo的主力研发人员。百度数据工厂结构化元数据服务负责人。

 

Spark Doris Connector(Doris-Spark)

 

Spark Doris Connector 是Doris在0.12版本中推出的新功能。用户可以使用该功能,直接通过Spark对Doris中存储的数据进行查询。

从Doris角度看,将其数据引入Spark,可以使用Spark一系列丰富的生态产品,拓宽了产品的想象力,也使得Doris和其他数据源的联合查询成为可能。

 

 

技术选型

 

在早期的方案中,我们直接将Doris的JDBC接口提供给Spark。对于JDBC这个Datasource,Spark侧的工作原理为,Spark的Driver通过JDBC协议,访问Doris的FE,以获取对应Doris表的Schema。然后,按照某一字段,将查询分位多个Partition子查询任务,下发给多个Spark的Executors。Executors将所负责的Partition转换成对应的JDBC查询,直接访问Doris的FE接口,获取对应数据。这种方案几乎无需改动代码,但是因为Spark无法感知Doris的数据分布,会导致打到Doris的查询压力非常大。

 

于是我们开发了针对Doris的新的Datasource,Spark-Doris-Connector。这种方案下,Doris可以暴露Doris数据分布给Spark。Spark的Driver访问Doris的FE获取Doris表的Schema和底层数据分布。之后,依据此数据分布,合理分配数据查询任务给Executors。最后,Spark的Executors分别访问不同的BE进行查询。大大提升了查询的效率。

 

 

使用方法

 

在Doris的代码库的 extension/spark-doris-connector/ 目录下编译生成doris-spark-1.0.0-SNAPSHOT.jar,将这个jar包加入Spark的ClassPath中,即可使用Spark-on-Doris功能了

 

SQL方式:

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris


RDD方式:

import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)

dorisSparkRDD.collect()


DataFrame方式:

val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()

dorisSparkDF.show(5)


更多的配置参数及使用方式,可以参阅使用文档:

https://github.com/apache/incubator-doris/tree/master/extension/spark-doris-connector

 

 

适用场景

 

处理历史数据变更

 

在没有Spark Doris Connector前,Doris修改数据的成本很高,但数据的修改和删除需求在真实业务中时常出现。

 

Spark Doris Connector之前:

方案一:之前导入的错误数据不要删除,采用replace的方式,将错误的数据全部倒入一份负值的,从而将value刷成0,再将正确的数据导入进去。

方案二:把错误数据删除,然后再将正确数据insert进来。

 

上述方案都存在一个问题,即总有一段时间窗口内数据value为0。这对于外部系统来说是不能容忍的。例如广告主需要查看自己的账户信息,如果因数据变更问题而导致账户显示为0,将是难以接受的,很不友好。

 

Spark Doris Connector方案:

有了Spark Doris Connector,处理历史数据变更将会更加便捷。

 

如上图所示,第一行是错误数据,第二行是正确数据。Spark可以链接两条流,一条流使用Spark Doris Connector连接Doris,一条流连接外部的正确数据(例如业务部门生成的Parquet文件)。在Spark中做diff操作,将所有value算出diff值,即图中最后一行的结果。将其导入进Doris即可。这样的好处是可以消除中间的时间窗口,同时也便于平时经常使用Spark的业务方来进行操作,非常友好。

 

使用Spark对Doris中的数据和其他数据源进行联合分析

 

很多业务部门会将自己的数据放在不同的存储系统上,比如一些在线分析、报表的数据放在Doris中,一些结构化检索数据放在Elasticsearch中、一些需要事物的数据放在MySQL中,等等。业务往往需要跨多个存储源进行分析,通过Spark Doris Connector打通Spark和Doris后,业务可以直接使用Spark,将Doris中的数据与多个外部数据源做联合查询计算。

 

 

 

 

技术实现

 

架构一览

 

Doris对外提供更多能力

 

Doris FE

对外开放了获取内部表的元数据信息、单表查询规划和部分统计信息的接口。

所有的Rest API接口都需要进行HttpBasic认证,用户名和密码是登录数据库的用户名和密码,需要注意权限的正确分配。

// 获取表schema元信息
GET api/{database}/{table}/_schema

// 获取对单表的查询规划模版
POST api/{database}/{table}/_query_plan
{
"sql": "select k1, k2 from {database}.{table}"
}

// 获取表大小
GET api/{database}/{table}/_count


Doris BE


通过Thrift协议,直接对外提供数据的过滤、扫描和裁剪能力。

service TDorisExternalService {
// 初始化查询执行器
TScanOpenResult open_scanner(1: TScanOpenParams params);

// 流式batch获取数据,Apache Arrow数据格式
TScanBatchResult get_next(1: TScanNextBatchParams params);

// 结束扫描
TScanCloseResult close_scanner(1: TScanCloseParams params);
}


Thrift相关结构体定义可参考:

https://github.com/apache/incubator-doris/blob/master/gensrc/thrift/DorisExternalService.thrift

 

Doris这些对外提供的接口,同样可以用于Flink On Doris等。

 

实现Spark Doris Datasource

 

Datasource V1 API

V1API 由一系列的抽象类和接口组成,它们位于 spark/sql/sources/interfaces.scala 文件中。主要的内容有:

trait RelationProvider {
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

abstract class BaseRelation {
def sqlContext: SQLContext
def schema: StructType
}

trait TableScan {
def buildScan(): RDD[Row]
}

通过实现 RelationProvider 接口,表明该类定义了一种新的数据源,可以供 Spark 读取数据所用。createRelation 方法的参数可以用来做初始化,如DorisFE的地址,用户等信息。BaseRelation 抽象类则用来定义数据源的表结构,在Doris Datasource中,表结构来源于Doris FE接口。该类还必须实现某个 Scan 接口,Spark 会调用 buildScan 方法来获取数据源的 RDD。

 

实现RDD

DorisDatasource定义好后,我们需要实现DorisRDD用于真正的读取Doris的数据到Spark中。

在Spark RDD中,最主要的接口为getPartitons()和compute(),getPartitions负责将数据切分成多个子集,用于并行计算。而compute则返回一个迭代器,迭代器会依次获取在子集上的结果序列。

override def getPartitions: Array[Partition] = {
// 1. 调用FE 获取对单表的查询规划模版接口
// 2. 为每个Tablet选择BE,并形成Seq>>。每一组Pair作为一个Partition
}

override def compute(split: Partition, context: TaskContext):
ScalaDorisRDDIterator[T] = {
// 返回一个获取结果数据的迭代器
new ScalaDorisRDDIterator(context, split.asInstanceOf[DorisPartition].dorisPartition)
}

// 迭代器内部实现
class ScalaDorisRDDIterator[T] {
def init: Unit = {
// 调用BE 初始化查询执行器,获取Context id;
}
override def hasNext: Boolean = {
if (没有缓存的数据) {
// 调用BE 流式batch获取数据API,将数据放入缓存;
// 返回API内的eos;
}
else {
// 返回true;
}
}
override def next(): T = {
// 读取缓存中的一行数据;
}
}

 


Doris Spark后续规划

 

扩展SparkSessionExtensions, 支持聚合如sum、count等聚合算子下推,充分利用Doris的物化视图模型。

 

持续优化数据传输协议,提高扫描速度

 

 

百度数据工厂Pingo介绍

 

百度数据工厂Pingo是基于Spark的数据工厂产品,是增强版的数据湖。Pingo拥有以下主要功能:

能够对异地、异构的非结构化数据、结构化数据、计算资源进行统一接入、访问和权限管理

由于核心引擎是Spark,因此查询语言支持SparkSQL,Dataframe。

提供WebUI、客户端、Restful接口三种交互方式

 

Pingo在百度内部提供离线计算服务。通过百度公有云、私有云的方式对外提供商业版本,并已经积累了很多行业案例。

 

架构一览

下面是Pingo的架构图,灰色部分是Pingo提供的功能组件,绿色部分为Pingo可挂载的外部组件。

 

主要特点

Pingo提供了比原生Spark更优异的性能和更丰富的数据处理能力,主要特点如下:

批流一体的计算引擎:Pingo提供Spark Dataframe,Spark Structed Streaming等业界流行的离线、流式计算框架,支持标准SQL访问。并自研了Spark Streaming SQL,统一批、流查询语言。

支持多种形式数据采集:Pingo提供数据传输服务,可以完成结构化数据库、非结构化数据的批量与增量采集。

多存储后端数据联合查询:Pingo不仅自身能够提供文件的存储能力,而且能够无缝的接入多种外部的存储系统。无需停机维护和数据迁移,用户可以直接将存储在BOS,S3,私有HDFS等存储资源上的数据接入到Pingo中。Pingo可实现多存储混合查询,与此同时,Pingo还能够缓存常用的数据,加速计算过程。

多结构化元信息联合查询:Pingo的元数据存储及管理模块可无缝对接用户已有的元数据存储系统(包括但不限于Hive Metastore, MySQL, Doris等)。可直接通过Pingo执行联合查询等离线计算任务。

多计算集群同时调度:Pingo自带资源调度模块。在提供默认计算资源的同时,用户也可通过简单操作挂载已有计算资源,从而无需在不同的计算集群中重新分配计算资源。

 

欢迎试用

目前Pingo已在百度云上线,欢迎使用:

https://cloud.baidu.com/product/pingo.html

 

 

 

 

欢迎扫码关注:

Apache Doris(incubating)官方公众号

 

相关链接:

Apache Doris官方网站:

http://doris.incubator.apache.org

Apache Doris Github:

https://github.com/apache/incubator-doris

Apache Doris Wiki:

https://github.com/apache/incubator-doris/wiki

Apache Doris 开发者邮件组:

dev@doris.apache.org

收藏
点赞
0
个赞
快速回复
TOP
切换版块