6月29日,Doris有幸得到中国信通院云大所、大数据技术标准推进委员会的支持,在中国信通院举行了0.11.0新版本预览线下沙龙。各位嘉宾都带来了干货满满的分享。关注Doris官方公众号,后台回复“0629”即可获取各位嘉宾分享PPT及现场录像。
今天是朱良昌同学代表百度智能云流式计算团队带来Spark Streaming对接Doris 设计与实现的分享。
业务场景
Spark Streaming(主要是Structured Streaming)在百度内部被广泛应用于实时计算,日志分析,ETL等业务场景。其中有很多业务方希望可以使用structured streaming读取上游数据源(例如:kafka、 hdfs、 database等),然后对数据进行处理后实时导入Doris以供查询分析。
为此流式计算团队专门开发了Doris sink的组件来适配Doris。Doris sink支持exactly-once语义,封装并对用户屏蔽了与Doris的交互细节,用户只需要关注用户逻辑和计算本身,经过简单配置即可非常方便的将流式数据导入到Doris中。
Structured Streaming介绍
Structured Streaming是Spark 2.3版本之后提出的新的流式计算引擎,具有以下特点:
1. 具备良好的扩展性和高容错性
2. 基于Spark-SQL引擎,使用DataFrame API,使用简单
3. 相比于Spark 2.2版本之前使用DStream API的spark Streaming模型,Structured Streaming支持更加丰富的流式语义。例如:基于eventTime的window、聚合计算,watermark,流和static DataFrame的join,流和流的简单join等
4. 端到端的exactly-once语义。非常适用于要求数据不重不丢的业务
Spark工作模型
Spark作业整体架构如上图所示:
Driver:作为程序的入口,执行用户代码,生成DAG(有限无环图),对整个app的资源进行协调和管理。
Executor: 执行用户逻辑,dataset计算(transformation和action)。一个executor中可以并行执行多个task,task的并发量由启动executor时指定的core数决定,而每个task负责对一个partition进行计算。
Cluster Manager: 百度内部使用的主要是Yarn。
Structured Streaming编程模型
Structured Streaming与传统意义上的流式计算系统不同,它是一个微批次(micro-batch)的流式计算系统。其主要原理是将源源不断的数据,切分成一个一个的小数据块,其中每一小数据块称之为一个batch。当每次触发计算时,系统会处理一个batch中的数据,而batch和batch之间则是串行执行的。一个batch的计算,可以看做是一个Spark-SQL job的一次执行,故一个Structured Streaming作业可以看做是无穷多个job组成的一个不会停止作业。
WordCount Demo
上图是一个Structured Streaming使用Complete output mode执行Wordcount的示例。
nc是指linux的netcat指令,input数据通过socket传入。在时间点1,系统接收到4条数据作为一个batch进行计算,产生了结果(cat, 1)(dog, 3)。在时间点2的时候又来了两条数据,这两条数据会成为一个新的batch进行计算,新的计算结果会加到最终的结果里(cat, 2)(dog,3)(owl,1)。以此类推,每来一批新数据,将这批数据作为一个batch进行计算,然后对结果进行更新。Structured Streaming就这样源源不断的将输入数据切分为一个一个小的batch,然后执行计算。
端到端Exactly-once语
Structured Streaming的exactly-once语义要求数据从读取->计算->写出的过程,实现端到端的不重不丢。因此对各模块有如下要求:
1. Source可回溯且可回放。简单来说就是可以重复消费,常用Source主要是kafka,bigpipe(百度内部以c++实现的类似kafka的消息队列)
2. Execution engine记录checkpoint。引擎会在处理每个batch之前,先写WAL来记录当前batch要读哪些数据。如果发生failover,可以利用checkpoint对batch的数据进行重新计算。故Source + Execution engine做到了at-least once语义,即数据的不丢
3. 支持幂等写的sink,对重复数据去重,从而保证了任何情况failover的exactly-once语义
Checkpoint & WAL:
上图是一个Batch的计算流程, 以此来讲解Execution engine如果做到数据不丢:
1. batch刚开始时,调用getOffset, 获取该batch要处理的数据范围,即offsetRange
2. 将offsetRange [startOffset, endOffset)存储在OffsetLog中
3. 调用getBatch,利用offsetRange构建Dataset, 提交batch
4. Executor对batch进行计算,并将结果sink到下游系统
5. batch运行结束,由driver写commit log,标识该batch运行完成
因此一个batch从执行开始到结束会写两个log,一个offsetlog,一个commitlog。通过这个两个log可以保证任何Failover场景下的数据不丢。
Failover case分析:
Case 1中,因为OffsetLog中记录的最新batchId和CommitLog中记录的最新batchId相等,所以Failover后,引擎发现第75个batch已经成功运行结束,且没有batch需要重放,则从第76个batch开始继续执行。
Case 2中,OffsetLog中最新的batchId是85, 而CommitLog中记录的最新的batchId是84,两者不相等,说明作业failover发生在batch 85执行过程中,此时需要重新执行batch 85。
Sink幂等写入:
Source + Execution-engine保证了数据的不丢,如果在此基础上希望实现端到端的exactly once,就需要Sink支持幂等写入以支持数据的去重。
Doris Sink的设计与实现
由上文介绍可知,要实现端到端的exacly-once语义,需要下游系统支持对数据的去重,所以在设计Doris Sink时,就要考虑Doris对数据的去重功能。Doris有一个很明显的特点:它的写入是唯一的,即对同一Database,对同一个label的导入是唯一的,同一个label只能被导入一次,不可以被多次导入,即一个label对应唯一一批数据。因此我们可以利用该特性来进行Doris sink的去重设计。
Label的生成逻辑
1. 每个structured streaming作业启动时都必须指定一个checkpointlocation,且每个作业的checkpoint必须是唯一的,不能混用。
2. batch是顺序执行的,因此每个batch的id是顺序递增且唯一的。
3. 每个batch实际上是一个普通的spark job,其中的每个数据分片,可以通过paritionId的来标识。
因此,由3元组(checkpointLocation + batchId + paritionId)组成的label可以唯一的确定一个structured streaming作业中的一段数据。那么只要确保在failover前后同一段数据对应的label相同,即可以此来去重以实现exactly-once语义。
val replace = tmp.replaceAll("[-|/|:|.]", "_") +
s"_${batchId}_${TaskContext.getPartitionId}"
// shrink multiple underscores to one
// e.g: label___test => label_test
val builder = new StringBuilder
for (index <- 0 to replace.length - 1) {
if (index == 0) {
builder += replace(0)
} else if (replace(index - 1) != '_' || replace(index) != '_') {
builder += replace(index)
}
}
val resultStr = builder.toString
val length = resultStr.length
if (length > 128) {
logWarning("palo label size larger than 128!, we truncate it!")
resultStr.substring(length - 128, length)
} else {
resultStr
}
以上是生成label代码的部分实现,我们会对特殊字符进行一些处理,且如果生成的label超过128个字符会截断,因为Doris 的label最多只支持128个字符。
Doris sink结构图
Spark本身提供了Sink接口,我们通过继承Sink接口来实现Doris sink组件。
这里重点关注DorisWriterTask,该task是executor中实际进行计算的task逻辑,它有3种实现,分布对应了Doris的3种Load模式(Bulk load, Broker load, Streaming load)。下面主要介绍Bulk load和Broker load。Streaming load将在近期实现。
Doris Bulk Load Task
1. 开始执行后,每个task会首先先将数据写入本地磁盘形成文件,文件则以上文提到的label来命名。
2. Task发送http请求,以bulk load的方式,向Doris发起load请求
3. Task轮询Doris,查询步骤2中的load是否结束
4. Finish load之后,删除步骤1中生成的本地文件
注意点:
1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算
2. 上述过程中步骤3的轮询的意义在于我们需要确保一个batch的数据成功导入后才能开始执行下一个batch,所以我们一定要通过query load info的方法,确保label对应的数据被成功load。
3. bulk load的模式,适用于单个partition数据不超过1GB的导入。
Doris DFS Load Task
1. 开始执行后,每个task会首先先将数据写入hdfs,文件则以上文提到的label来命名。
2. Task 想Doris发送broker load请求
3. Doris broker去hdfs load文件
4. Task轮询Doris,查询步骤2中的load是否结束。
5. Finish load之后,删除步骤1中生成的hdfs文件
注意点:
1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算
2. 适用于单个partition数据超过1GB的导入。
Doris 社区 Pull Request
https://github.com/apache/incubator-doris/pull/1332
此次沙龙我们有幸邀请到了来自一点资讯、京东、搜狐、百度智能云的技术大牛带来他们的应用实践和开发分享。
其他嘉宾的分享会在近日放出,欢迎关注Apache Doris(incubating)官方公众号,后台回复“0629”即可获取各位嘉宾分享PPT及现场录像。
欢迎扫码关注:
Apache Doris(incubating)官方公众号
相关链接:
Apache Doris官方网站:
http://doris.incubator.apache.org
Apache Doris Github:
https://github.com/apache/incubator-doris
Apache Doris Wiki:
https://github.com/apache/incubator-doris/wiki
Apache Doris 开发者邮件组:
dev@doris.apache.org
虽说一波爆发挺好的。。。但是估计得对下时间,这应该是去年的。。。