我的位置:首页 > 技术文章 > 正文

腾讯实时计算平台Oceanus建设实践

2019-04-16 分类:Oceanus

引言:2019年4月1-2日,Flink Forward 2019 San Francisco会议在旧金山召开。Flink Forward会议邀请了来自Google, Uber, Netflix和Alibaba等公司在实时计算领域的顶尖专家和一线实践者,深入讨论了Flink社区的最新进展和发展趋势,以及Flink在业界的应用实践。随着近年来对Flink技术的广泛应用以及对Flink社区的活跃贡献,腾讯也受邀参加了会议并以主题Developing and Operating Real-Time Applications at Tencent介绍了腾讯大数据在实时计算平台建设上的工作。

 

一、背景介绍

Slice-4

近年来,实时计算在腾讯得到了越来越广泛的应用。在腾讯内部,实时计算应用主要分为以下四类:

  • (1)ETL:ETL应该是目前实时计算最普遍的应用场景。例如在TDBank的数据链路中,TDSort读取消息缓存系统Tube中的消息,通过流数据处理系统将消息队列中的数据进行实时分拣,并落地到HDFS接口机集群,并将最终分拣后的数据由加载到TDW中。
  • (2)监控系统:随着服务数量和机器规模的不断增长,线上环境日益复杂,对监控和报警系统也提出了更高的要求。监控系统需要能够对产品和服务进行多维度的监控,对指标数据进行实时的聚合和分析,并支持方便灵活的报警规则设置。
  •  (3)实时BI:实时的业务报表对产品运营有着非常大的帮助,能够帮助我们的运营人员实时掌握产品数据,及时制定运营策略,通过更好的时效性获取竞争优势。
  •  (4)在线学习:实时计算目前在推荐、广告和搜索等产品中也有着十分广泛的应用。一般来说,用户兴趣会在多个时间维度上持续变化。通过对用户行为进行实时检测,我们能够及时获取用户的当前兴趣并提供更精准的用户行为预测。

目前腾讯的实时计算的规模已经十分庞大。数据平台部实时计算团队每天需要处理超过了17万亿条数据,其中每秒接入的数据峰值达到了2.1亿条。

 

二、Oceanus简介

Slice-7

为了提高用户流计算任务持续集成和持续发布的效率,实时计算团队从2017年开始围绕Flink打造了Oceanus (http://data.qq.com),一个集开发、测试、部署和运维于一体的一站式可视化实时计算平台。Oceanus集成了应用管理、计算引擎和资源管理等功能,提供了三种不同的应用开发方式,包括画布,SQL和Jar,来满足不同用户的开发需求,同时通过日志、监控、运维等周边服务打通了应用的整个生命周期。

 

Oceanus还研发了Oceanus-ML来提高在线学习任务的开发效率。Oeanus-ML提供端到端的在线机器学习,涵盖数据接入,数据处理,特征工程,算法训练,模型评估,模型部署整个机器学习流程。通过Oceanus-ML,用户可以方便地利用完备的数据处理函数,丰富的在线学习算法来构建自己的在线学习任务,轻松地完成模型训练和评估,同时可以一键部署模型,管理模型的整个生命周期。

 

在完成作业开发之后,用户可以通过Oceanus对作业进行测试、配置和部署。Oceanus为用户程序提供了一系列的工具来协助作业测试。用户既可以使用Oceanus提供的一键生成功能产生测试数据,也可以自己向Oceanus上传自己的的测试数据,通过对比预期结果和实际结果来验证应用逻辑的正确性。Oceanus依托腾讯内部的资源调度系统Gaia来进行资源管理和作业部署。用户可以通过Oceanus配置作业所需要的CPU和内存资源,并指定作业需要部署的集群。当用户完成配置之后,Oceanus会向Gaia申请对应的资源并将作业提交到Gaia上运行。

 

Oceanus对Flink作业运行时的多个运行指标进行采集,包括Task Manger的内存,I/O和GC等。通过这些丰富的运行指标,用户能够很好的了解应用运行的情况,并在出现异常时能协助用户及时的定位问题。运维人员则可以通过这些采集到的指标,设置报警策略并实现精细化的运营。

Slice-8

大部分Oceanus的用户可以使用画布方便的构建他们的实时计算应用。Oceanus提供了常见的流计算算子。在开发实时计算应用时,用户将需要的算子拖拽到画布上,配置这些算子的属性并将这些算子连接,这样就构建好了一个流计算应用。这种构建方式十分简单,不需要用户了解底层实现的细节,也不需要掌握SQL等语言的语法,使得用户能够专注于业务逻辑。

Slice-9

Oceanus同样也提供了SQL和Jar的方式来开发实时计算作业。在使用SQL和Jar进行开发时,一个比较麻烦的地方就是作业的配置问题。例如SQL脚本没有提供任何方式来允许用户进行作业资源配置。尽管Flink的DataStream API为用户提供了接口来修改并发度和资源等配置,但为了灵活修改这些配置,用户常常需要自己通过外部配置文件的进行处理。为了用户能够在使用SQL和Jar进行开发时也能方便的进行作业配置,Oceanus会首先对用户提交的SQL脚本和JAR包进行解析和编译,生成作业执行的JobGraph,并可视化在页面上。用户之后可以根据可视化的JobGraph来进行作业配置。通过提供可视化的配置方式,用户作业开发的效率可以得到极大的提高。

 

配置好的JobGraph之后将被提交到Yarn集群上执行。为了能够更准确的获取更多的作业信息,Oceanus放弃了Flink默认的提交方式,而通过增强过的ClusterClient接口来提交作业。通过ClusterClient,Oceanus可以获得作业的JobID, Yarn Application ID等信息,并利用这些信息来提供作业的生命周期管理。

Slice-10

Oceanus对正在运行的作业采集了大量的指标,通过这些指标来监控作业运行情况,并在发生故障时定位原因。为了提高运维效率,我们根据长期积累的经验对运行指标进行了筛选,并对Flink UI进行了重构来合理展示这些指标。

 

每个task输入和输出队列的使用率是在实际生产中非常有用的运行指标。通常来说,当一个task的输入队列被占满,而输出队列为空时,说明这个task的上游的数据产生速度已经超过了这个task的处理能力,导致了这个task的输入出现了堆积。当一个作业中出现这样的task时,我们就需要通过性能优化或者增加并发度的方式来提高这个task的处理性能。

 

输入和输出的TPS也是在作业运行中的关键指标。通常来说,一个task的输出TPS和输入TPS之间的比例并不会随着并发度的变化而变化。我们利用这个性质来确定作业运行时的并发度。当确定作业并发度时,我们首先将所有task的并发度设置为1并启动作业。此时这个作业显然是无法处理上游的数据的,因此大部分task的单机处理能力会被打满,其输入和输出TPS可以达到最大值。根据需要的TPS和单机最大TPS,我们可以估算出每个task的并发度,并重新启动。之后根据前面提到的输入输出队列的使用率,我们对作业并发度进行一定的调整来去除作业中的性能瓶颈。一般通过几次调整之后,我们就可以得到较为理想的作业并发度配置。

 

在后面,我们希望能够实现自动化脚本或者优化器来简化作业并发的配置。不过这仍然是一个非常有挑战的工作。一个主要的难点在于窗口算子的处理。很多窗口算子在平时较为空闲,但在窗口触发时会一下子发送大量的结果。在那一瞬间,如果窗口算子的并发度不够就会出现一定的结果延迟。如何平衡窗口算子在空闲和触发时的并发度目前看来仍然需要很多的trade-off。

 

当一个task的最大和最小TPS之间出现较大的差值时,一般就意味着作业中出现了负载倾斜。负载倾斜会对作业的性能造成较大的影响,同时也很难通过增加并发度的方式来提高性能。为了减少负载倾斜对作业性能的影响,我们引入了Local Keyed Streams。相关工作将在后面的第三部分进行介绍。

Slice-11

在即将发布的新版Oceanus中,我们还对TaskExecutor的线程信息进行了采集。这些线程信息能够很好地帮助用户定位发生的问题。例如checkpoint可能会由于多种多样的原因而超时。当用户实现的source function在被IO或者网络堵塞时并没有释放checkpoint锁,那么正在执行的checkpoint可能就会由于无法及时获取锁而超时。用户也有可能实现了一个堵塞的checkpoint函数,由于较慢的HDFS写入或者其他原因而导致checkpoint超时。通过观察线程信息,我们就可以容易的知道checkpoint超时的原因。

 

这些采集的线程信息也能对程序的性能优化提供很多帮助。一般而言,当一个task线程的cpu使用率达到100%时,就说明这个task的执行并没有受到加锁,I/O或者网络等操作的影响。在上图中,我们展示了一个Word Count程序的Task Executor的线程信息。在Word Count程序,我们有一个source task在持续不断的发送word,还有一个map task对出现的word进行计数。可以看到,在Task Executor中,map线程的cpu使用率几乎达到了100%,这说明其的执行是没有太大问题的。而source线程的cpu使用率仅仅只有80%,这说明其的性能受到了影响。观察线程堆栈,我们可以发现source线程时常会堵塞在数据的发送上。这是很好理解的,因为每产生一个word,map线程都需要比source线程执行更多的指令。也就是说,map线程的数据处理能力比source线程的生产能力要低。为了提高这个Word Count程序的性能,我们就需要保证map线程的数目比source线程的数目多一点。

 

三、Flink内核改进

 

Slice-12

除了在Oceanus上提供方便强大的接口和工具之外,我们还对Flink内核进行了大量的改进来提高其可用性和可靠性。这些改进主要包括以下几个部分:

  • (1)作业管理相关:我们对Flink的作业管理的改进主要以提高作业执行的可靠性为主,包括对分布式环境下的leader选举的重构和无需作业重启的Job Master恢复机制等。同时,我们也正在研究和开发细粒度恢复机制来减少发生故障时需要重启的task数目。
  • (2)资源调度相关:我们对Flink的资源调度,特别是在Yarn集群上的资源调度进行了重构,以提供更好的资源使用率。同时,我们也正在研究如何使用分布式和异步的资源调度框架来提高超大并发度的作业的资源调度效率。
  • (3)可用性相关:我们在Flink中提供了多个算子,包括local keyby, incremental windows, dim join等。这些算子能够很好的提高用户开发程序的效率和程序执行的性能。

 3.1 分布式Leader选举

Slice-13

Flink的master负责资源申请、任务调度、checkpoint协调,并响应用户请求。在任何时刻,一个集群中都只可以有一个master节点可以在工作状态。当集群中出现多个master节点时,就需要通过leader选举确定工作的master节点。

在分布式环境下进行Leader选举是分布式系统中的一个经典问题。目前Flink依赖Zookeeper进行leader选举,并将当选的leader的信息保存在Zookeeper上以实现服务发现。但在复杂的集群环境中,Flink当前的实现并不能很好的保证leader选举和发布的正确性。

如上图左侧所示,当JM1获得leader之后,其需要在Zookeeper发布其地址以供其他节点来发现自己。但如果在其发布地址之前,JM1发生了Full GC,那么集群就可以陷入混乱之中。其长时间的GC可能会导致其丢失leader以及和Yarn之间的心跳连接。此时一个新的master节点, JM2, 可能会被Yarn拉起。JM2在获得leader之后会将其地址发布在集群中。当如果此时JM1从Full GC中恢复过来,并继续执行之前的代码,将其地址发布在集群中,那么JM1的地址将会覆盖JM2的地址导致集群混乱。

另一个由于leader选举导致的常见问题是checkpoint的并发访问。当一个master丢失leader节点之后,其需要立即停止其所有正在进行的工作并退出。但是如果此时旧master的Checkpoint Coordinator正在完成checkpoint,那么退出方法将无法获取到锁而执行。此时,在已经丢失了leader的情况下,旧master仍然有机会完成一个新的checkpoint。而此时,新master却会从一个较旧的checkpoint进行恢复。目前Flink使用了许多tricky的方法来保证多个master节点对checkpoint的并发访问不会导致作业无法从故障中恢复,但这些方法也导致我们目前无法对失败的checkpoint进行有效的脏数据清理。

Slice-14

为了上述问题,我们对Flink的leader选举和发布进行了重构。我们要求每个master节点在竞争leader时都创建一个EMPHEMERAL和SEQUENTIAL的latch节点。之后所有master节点会检查latch目录下所有的节点,序列号最小的那个节点将会被选举为leader。

Zookeeper的实现保证了创建的latch节点的序列号是递增的。所以如果一个master节点被选为leader之后,只要它的latch节点仍然存在,就意味着它的序列号仍然是所有master节点中最小的,它仍然是集群中的leader。从而我们就可以通过检查一个master的latch节点是否存在来判断这个master是否已经丢失leader。通过将leader地址的发布以及对checkpoint的修改等更新操作和对latch节点的检查放置在一个Zookeeper事务中,我们可以保证只有保有leader的master节点才可以对作业执行状态进行修改。

3.2 无需作业重启的master恢复机制

Slice-15

Master节点会由于多种不同的原因而发生故障。目前在master重启时,Flink会重启所有正在执行的task,重新开始执行作业。在Zookeeper连接出现抖动时,集群中所有task都会重启,对HDFS, Zookeeper和YARN这些集群基本组件带来较大的压力,使得集群环境进一步恶化。

为了减少master恢复的开销,我们实现了无需作业重启的master恢复机制。首先,我们使用Zookeeper和心跳等手段来对master的状态进行监控。当master发生故障时,我们立即拉起一个新的master。新master在启动时,并不会像第一次执行时那样申请资源并调度任务,而是会进入到reconcile阶段,等待task的汇报。

在另一边,task executor在丢失了和master节点的连接之后,也不会立即杀死这个master负责的task。相反,它将等待一段时间来发现新master的地址。如果在这段时间内发现了新master的地址,那么task executor将把其执行的task的信息汇报给新master。

新master通过task executor汇报上来的信息来重建其execution graph和slot pool。当所有task完成汇报,并且所有task在master恢复的这段时间内没有出现故障,那么master就可以直接切换作业状态到running,并继续作业的执行。如果有task未能在规定时间内汇报,或者有task在这段时间内发生故障,那么master将切换到failover状态并通过重启恢复执行。

3.3 细粒度资源分配

Slice-16

目前Oceanus依赖YARN来进行资源申请和任务调度。但现有Flink在YARN上资源分配的实现有着较大的问题,对作业可靠性带来了一定的风险。

在现在Flink的实现中,每个task executor都有着一定数目的slot。这些slot的数目是在task executor启动时根据配置得到的。当为任务分配资源时,task会按照可用slot的数目分配到空闲的task executor上,一个task占据一个slot。在这个过程中,Flink并不会考虑task实际使用的资源量以及task executor剩余可用的资源量。

这种资源分配的方式是十分危险的,会导致task executor向YARN申请的资源量和实际task使用的资源量不匹配。在集群资源紧张的时候,由于YARN会杀死那些超用资源的container,作业就会进入不断重启的状态之中。

这种资源分配的方式也会导致较严重的资源浪费。在实际中每个算子所需的资源使用量是不同的。有的算子需要较多的CPU资源,而有的算子需要较少的内存资源。由于现在的配置中所有task executor具有相同的slot数目,所有slot都具有相同的资源,因此导致较为严重的资源碎片,无法充分利用集群资源。

为了避免由于资源分配导致的不稳定,我们修改了Flink在YARN上的资源申请协议。我们不再使用静态的slot配置,而是根据task申请动态的创建和销毁slot。首先,我们要求用户能够为每个operator设置其所需的资源量。这样我们就可以根据slot中执行的operator来得到每个slot所需的资源量。当Master节点请求一个slot时,我们遍历所有的task executor并在空余资源量能够满足slot请求的task executor上创建一个新的slot提供给master节点。当这个slot中的task完成执行之后,这个slot也将被删除并将其资源归还给task executor。这种动态的slot申请方式可以使得Flink的资源利用率极大的提高。

3.4 Local Keyed Streams

现实中,很多数据具有幂律分布。在处理这类数据时,作业执行性能就会由于负载倾斜而急剧下降。

Slice-17

以WordCount程序作为示例。为了统计每个出现word的次数,我们需要将每个word送到对应的aggregator上进行统计。当有部分word出现的次数远远超过其他word时,那么将只有少数的几个aggregator在执行,而其他的aggregator将空闲。当我们增加更多的aggregator时,因为绝大部分word仍然只会被发送到少数那几个aggregator上,程序性能也不会得到任何提高。

为了解决负载倾斜的问题,我们提供了Local Keyby算子,允许用户在task本地对数据流进行划分。划分得到的Local keyed streams和一般的Keyed streams是类似的。用户可以通过RuntimeContext访问keyed state,也可以在数据流上执行窗口操作。利用Local keyed streams,我们就可以在数据发送一端就进行本地的预聚合,统计一定时间段内word在当前task出现的次数。这些预聚合的结果然后被发送给下游,通过合并得到最终的结果。

Slice-18

但Local keyed streams的数据划分和分发和keyed streams不同。在keyed streams中,数据流会划分成多个key group,每个task都会负责一部分key group的处理。每个task之间的key group是没有任何交集的。而由于local keyed streams是在task本地对数据流进行划分,因此每个task上的key group range都是key group全集。即如果数据流总共有3个key group,那么每个task的local key group range都为[1, 3]。

当并发度改变时,这些local key group将按照数据均匀分给新的task。例如当task并发度从3变为2时,那么第一个task将分配到5个local key group,而第二个task将被分配到4个。在这种情况下,同一个task将会被分配到多个具有相同id的local key group。这些具有相同id的local key group将会被合并起来。当合并完成之后,所有task上的local key group range将仍然是[1, 3]。对于Reducing State, Aggregating State以及List State来说,它们的合并是比较简单的。而对于Value State, MapState和Folding State等类型的数据而说,则需要用户提供自定义的合并函数来实现local key group的合并。

由于在一定时间段内发送给下游的数据量不过超过上游的并发度,下游的负载倾斜可以有效缓解。同时由于数据在上游一般没有较为严重的倾斜,程序性能不会由于负载倾斜而严重降低。我们测试了WordCount程序在不同数据倾斜程度下的吞吐。可以看到,在没有使用local keyed streams的情况下,程序性能随着倾斜程度而迅速下降,而使用local keyed streams之后,程序性能几乎不受影响。

3.5 可用性提升

Slice-19

为了方便用户开发画布和SQL程序,我们实现了超过30个的Table API和SQL函数。用户可以利用这些内置函数极大地提高实时计算应用的开发效率。此外,我们也对数据流和外部维表的join进行了大量优化,并补充了Flink还未支持的Top N功能。我们还提供了incremental window功能,允许用户能够在窗口未触发时得到窗口的当前结果。Incremental window在多个应用场景中有着广泛的应用。例如用户可以利用incremental window统计活跃用户数目在一天内的增长情况。

四、后续工作

我们后续的工作主要包括以下几个方面:

  • (1)我们将继续研究提高Flink任务调度效率的方法。目前Flink使用单线程模型执行任务调度。在作业并发度较高的情况下,Flink的任务调度效率较低。我们将尝试使用分布式和异步的任务调度模型来提高任务调度效率。
  • (2)我们还将继续研究批流融合的checkpoint机制。目前Flink基于chandy-lamport算法来为流作业进行checkpoint,而使用upstream restart的方式来将批作业从故障中恢复。我们可以将两者结合起来,提供一种统一的checkpoint机制,使得在流作业的恢复可以利用缓存的中间结果来减少所需重启的task数目,而在批作业中,通过对长时间运行的任务进行checkpoint来避免在发生故障时从头开始重新执行。
  • (3)我们还将在制定执行计划时考虑数据在空间和时间维度上划分。例如在系统资源不足以支持对数据流式处理时,我们可以将数据在时间维度上进行划分,依次对划分好的数据进行处理。
  • (4)腾讯大数据产品矩阵即将发布的SuperSQL项目,利用Flink的计算能力来满足跨数据中心,跨数据源的联合分析需求。它可以做到:数据源SQL下推,避免集群带宽资源浪费;单DC内CBO(基于代价优化),生成最优的执行计划;跨DC CBO,根据DC负载和资源选择最佳DC执行计算,从而获得更好的资源利用和更快的查询性能。

TOP
返回顶部
在线咨询