云原生与Spark的融合:开启数据分析新篇章

发表时间: 2023-10-18 11:15

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

随着数据规模的迅速增长和数据处理需求的不断演进,云原生架构和湖仓分析成为了现代数据处理的重要趋势。在这个数字化时代,企业面临着海量数据的挑战和机遇,而构建可扩展、灵活且高效的数据分析平台成为了迫切的需求。

文章主要介绍了火山引擎湖仓一体分析服务 LAS(下文以 LAS 指代)基于 Spark 的云原生湖仓分析实践,利用 Spark 的强大功能和云原生技术的优势,构建高效、可扩展、灵活的数据分析平台,满足现代企业对数据洞察的迫切需求,为企业提供了强大的解决方案。文末更有专属彩蛋,新人优惠购福利,等着你来解锁!

本篇文章提纲如下:

  • Spark on K8S
  • Kyuubi:Spark SQL Gateway
  • CatalogService:湖仓一体元数据架构实践
  • LAS Batch Scheduler:云原生批处理调度器
  • UIService:云原生 Spark History Server
  • Falcon:Remote Shuffle Service
  • 总结

Spark on K8S

作为当今云原生基础设施的事实标准,Kubernetes 在 LAS Spark 中扮演着重要的角色。我们首先分享下 LAS Spark 基于 Kubernetes 的实践优化工作。

Kubernetes(常简称为 k8s)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。它提供了一个强大的容器编排和管理系统,可以简化应用程序的部署、扩展和管理过程。

Kubernetes 最初由 Google 开发,并于 2014 年开源。它基于 Google 内部的 Borg 系统的经验和技术,并吸收了社区的贡献和反馈,逐渐成为容器编排领域的事实标准。

Kubernetes 的架构是高度可扩展化的,它由一组核心组件和插件组成。开发者可以通过插件机制扩展和增强 Kubernetes 的功能。Kubernetes 被广泛应用于云原生应用程序的部署和管理。它提供了强大的功能和灵活性,使开发人员和运维团队能够更轻松地构建、部署和管理容器化应用程序,并实现高可用性、可扩展性和弹性伸缩等目标。

火山引擎 LAS 借助 Kubernetes 作为基础架构,结合一系列经过深度自研的可扩展插件,成功打造了 Serverless Spark 的能力,从而实现了云原生湖仓一体服务能力。

LAS Spark 采用了 Spark Operator 在 Kubernetes 上管理每个 Spark 作业的执行。Operator 是 Kubernetes 的一种扩展机制,它利用自定义资源(custom resource)来管理应用程序及其组件。Operator 遵循 Kubernetes 控制器的设计理念。

Operator 模式的概念允许在不修改 Kubernetes 核心代码的情况下,通过为自定义资源关联控制器来扩展集群的功能。Operator 充当了 Kubernetes API 的客户端,同时也是自定义资源的控制器。

部署 Operator 的常见方法是将自定义资源及其关联的控制器添加到集群中。与部署容器化应用程序类似,控制器通常运行在控制面板(Control Panel)之外,例如可以将控制器作为 Deployment 在集群中运行。

Spark Operator 的作用是将 Spark 作业描述为自定义资源,用户或程序可以通过纯 Kubernetes 接口的方式提交Spark 作业并查看作业的运行状态。这使得管理 Spark 作业与管理其他 Kubernetes 资源一样简单,将 Spark 作业转化为标准的 Kubernetes 作业负载类型。

用户或程序通过 SparkApplication CRD(custom resource definition)向 Kubernetes 集群提交一个 Spark 作业。Spark Operator 订阅了集群中所有 SparkApplication 的状态更新,通过调用 spark-submit 向 Kubernetes 集群提交作业,并维护对应 Spark 作业的整个生命周期。

实际上,火山引擎 LAS 在底座上使用的是火山的容器服务 VKE(Volcengine Kubernetes Engine)。VKE 是基于 Kubernetes 的企业级容器云管理平台

通过在 VKE 上构建云原生集群,LAS Spark 提供了多租户隔离的运行环境。在逻辑层面,LAS 通过队列的设计实现了用户资源的切分,而在物理层面,则通过容器的隔离策略保障了租户作业运行时的隔离能力。

更进一步,LAS 基于火山引擎 VCI(Volcengine Container Instance)提供了沙箱容器安全隔离的能力。VCI 是一种 Serverless 和容器化的计算服务,可与容器服务 VKE 托管版无缝集成,提供 Kubernetes 编排能力。

LAS 在 VKE/VCI 的基础上构建了潮汐 Quota 的能力,通过对集群层面资源用量的整体监控,实现了资源的削峰填谷。基于 VCI POD 粒度的弹性执行能力,LAS 会在后续进一步增强弹性扩缩容的能力,在 Spark 作业粒度提供完全无损的实时弹性扩缩容能力。

Kyuubi:Spark SQL Gateway

基于前面的介绍,我们已经成功实现了 Spark 的云原生化。为了进一步实现 Spark 的输出能力,LAS Spark 利用 Apache Kyuubi 封装了完整的 Spark 引擎功能。Kyuubi 是一个分布式和多租户网关,主要用于在数据仓库和数据湖上提供入口服务。它能满足企业内不同大数据场景的需求,如 ETL、BI 报表等。Kyuubi 提供了标准的 ODBC/JDBC 接口,使用户能够使用 SQL 语言查询各种数据源。它具备多租户、安全性和高可用性等特性,使其适用于高并发的企业级大数据查询分析等场景。

  • Server Discovery/Load Balance:使用 ZK/ETCD 进行服务发现和负载均衡,客户在提交作业的过程中会通过ZK/ETCD 的负载均衡策略路由到一台 KyuubiServer 上管理作业执行。
  • Servers:支持多个 KyuubiServer,启动过程中会注册到 ZK/ETCD,方便进行服务发现和负载均衡。多个 Server也实现了冷备的 HA。
  • Engine Discovery:客户端请求在 KyuubiServer 中会通过 Engine Discovery 找到自己的 Engine,然后将请求提到对应的 Engine 中。
  • Engines:具体的执行引擎,例如 Spark、Trino 等引擎。

LAS 基于 Kyuubi 构建了 Spark 的近实时查询分析能力,有效的支持了多租户、高并发场景。Kyuubi 支持Connection、User、Group 等不同级别的隔离能力,通过和 LAS 租户队列能力的结合,充分实现了资源隔离,确保了不同租户的 Spark 任务之间的资源公平分配。基于 Kyuubi,LAS 提供了简单易用的接口,用户可以通过 JDBC/ODBC 客户端或者 LAS Console 进行交互,可以轻松地在 LAS 上运行 Spark SQL 查询。

为了能够适配更多类型的引擎(比如 Presto),LAS 在 Kyuubi 之外进行了深度的自研扩展,提供了统一 SQL 的能力(code name:ByteQuery),并在解析层完成了大量的优化。在这里由于篇幅有限,我们后续再给大家做进一步分享。

CatalogService:湖仓一体元数据架构实践

作为湖仓一体分析服务,LAS 面临的下一个挑战是如何为 Spark 引擎屏蔽元数据的差异性。为了解决这个问题,LAS 自研了统一的元数据服务 CatalogService。CatalogService 提供与 HMS(Hive Metastore)兼容的接口,并为所有查询引擎提供统一的元数据视图,解决了异构数据源的元数据管理问题。

CatalogService 整体分三层,第一层是 Catalog Federation,提供统一的视图和跨地域的数据访问能力。以及提供了对源数据请求的路由能力,可以根据元数据请求的类型,支持通过 Mapping 的方式,来路由不同的服务请求对应的底层元数据服务实例。

第二层是 CatalogService 下层的具体元数据服务的实现,比如 Hive MetaStore Service,ByteLake MetaStore Service 以及其他元数据服务。这些元数据服务与 CatalogService 进行对接,以统一向上层引擎提供元数据服务。

最后一层是 MetaStore 的存储层,它通过插件式的方式来提供不同的存储引擎,来满足上层不同元数据服务实例的存储要求。

LAS Batch Scheduler:云原生批处理调度器

Spark 作业提交到 Kubernetes 集群之后,如何高效的进行资源调度成为 LAS Spark 需要解决的下一个课题。Kubernetes default-scheduler 最初是为了容器编排服务设计的,虽然社区后续也做了大量的改进,但是在调度功能和吞吐性能上并不是批处理作业的最佳选择。因此,LAS 在云原生的基础上完善了 Spark 作业的资源调度能力。

LAS Batch Scheduler 提供了Gang Scheduling,FIFO/Fair Scheduling,min/maxQuota,优先级抢占,超卖,CPU/GPU混合调度等批处理作业依赖的所有调度能力,并通过全局调度缓存提升了批处理调度的性能。在架构上,我们采用了组合模式的设计,具有高度的扩展性,方便在后续基于批处理调度器进行进一步的改进。

针对不同规模 Spark 作业的执行特点,LAS 在 Batch Scheduler 的基础上,实现了服务态 workload 层面的资源 Quota 实时打通,从而实现服务态热启动的中小规模 Spark 作业和冷启动的大规模 Spark 作业在租户队列层面能够实现实时 Quota 共享。此外,LAS在多租户、安全性、实时弹性等等方面进行了众多优化,我们会在后续合适的机会再做进一步的展开。

UIService:云原生 Spark History Server

在完成资源调度之后,Spark 作业正式进入执行阶段。在 Spark 作业的执行阶段,LAS 进行了大量的优化工作,其中的详细内容将在其他专题分享文章中展开介绍。而在本文中,我们将重点分享 LAS 自主研发的全新云原生 Spark 历史服务—— UIService。相比开源的 SHS(Spark History Server),UIService 存储占用和访问延迟均降低 90% 以上。

原生的 Spark History Service 建立在 Spark 事件(Spark Event)体系之上,在 Spark 任务运行期间会产生大量包含运行信息的 SparkListenerEvent,例如 ApplicationStart / StageCompleted / MetricsUpdate 等等,都有对应的 SparkListenerEvent 实现。所有的 event 会发送到 ListenerBus 中,被注册在 ListenerBus 中的所有 listener 监听。其中 EventLoggingListener 是专门用于生成 event log 的监听器。它会将 event 序列化为 Json 格式的 event log 文件,写到文件系统中(如 HDFS)。

在 History Server 侧,核心逻辑在 FsHistoryProvider 中。FsHistoryProvider 会维持一个线程间歇扫描配置好的 event log 存储路径,遍历其中的 event log 文件,提取其中概要信息(主要是 appliaction_id, user, status, start_time, end_time, event_log_path),维护一个列表。当用户访问 UI,会从列表中查找请求所需的任务,如果存在,就完整读取对应的 event log 文件,进行解析。解析的过程就是一个回放过程(replay)。Event log 文件中的每一行是一个序列化的 event,将它们逐行反序列化,并使用 ReplayListener 将其中信息反馈到 KVStore 中,还原任务的状态。

无论运行时还是 History Server,任务状态都存储在有限几个类的实例中,而它们则存储在 KVStore 中,KVStore 是 Spark 中基于内存的 KV 存储,可以存储任意的类实例。前端会从 KVStore 查询所需的对象,实现页面的渲染。

原生的 Spark History Service 存在以下问题:

  1. 存储空间开销大

Spark 的事件体系非常详细,导致 event log 记录的事件数量非常大,对于 UI 显示来说,大部分 event 是无用的。并且 event log 一般使用 Json 明文存储,空间占用较大。

  1. 回放效率差,延迟高

History Server 采用回放解析 event log 的方式还原 Spark UI,有大量的计算开销,当任务较大就会有明显的响应延迟,大型作业结束之后,用户可能要等十几分钟甚至半小时才能通过 History Server 看到作业历史,非常影响用户体验。

  1. 扩展性差

History Server 的 FsHistoryProvider 在回放解析文件之前,需要先扫描配置的 event log 路径,遍历其中的 event log,将所有文件的元信息加载到内存中,这使得原生服务成为了有状态的服务。因此每次服务重启,都需要重新加载整个路径,才能对外服务。每个任务在完成后,也需要等待下一轮扫描才能被访问到。难以方便的进行水平扩展。

  1. 非云原生

Spark History Server 并非是云原生的服务,不同租户的 workload 差异很大,在公有云场景下改造和维护成本高。

为了解决前面的几个问题,我们尝试对 History Server 进行改造。

无论运行中 Spark Driver 还是 History Server,都是通过监听 event,将其中包含的任务变化信息反映到几种 UI 相关的类的实例中,然后存入 KVStore 供 UI 渲染。也就是说,KVStore 中存储着 UI 显示所需的完备信息。对于 History Server 的用户来说,绝大多数情况下我们只关心任务的最终状态,而无需关心引起状态变化的具体 event。因此,我们可以只将 KVStore 持久化下来,而不需要存储大量冗余的 event 信息。此外,KVStore 原生支持了 Kryo 序列化,性能明显于 Json 序列化。我们基于此思想重写了一套新的 History Serve 系统,命名为 UIService。

我们将 KVStore 中和 UI 相关的所有类实例,我们将这些类统称为 UIMeta 类。具体包括 AppStatusStore 和SQLAppStatusStore 中的信息(如下所列)。我们定义一个类 UIMetaStore 来抽象,一个 UIMetaStore 即一个任务所有 UI 信息的集合。

类似于 EventLoggingListener,为 UIMeta 开发了专用的 Listener —— UIMetaLoggingListener,用于监听事件,写 UIMeta 文件。

和 EventLoggingListener 进行对比:EventLoggingListener 每接受一个 event 都会触发写,写的是序列化的 event;而 UIMetaLoggingListener 只会被特定的 event 触发,目前是只会被 stageEnd,JobEnd 事件触发,但每次写操作是批量的写,将上一阶段的 UIMetaStore 的信息完整地持久化。

我们使用 UIMetaProvider 替换原先的 FsHistoryProvider,主要区别在于:

  • 将读取 event log 文件和回放生成 KVStore 的流程改为读取 UIMetaFile,反序列化出 UIMetaStore。
  • 去掉了 FsHistoryProvider 的路径扫描逻辑;每次 UI 访问,根据 appid 和路径规则,直接去读取 UIMetaFile 解析。这使得 UIService 无需预加载所有文件元信息,不需要随着任务数量增加提高服务器配置,方便了水平扩展。

通过构建 UIService,我们极大的节省了 Spark UI 相关 event 的存储空间,并有效的提升了 UI 访问延迟性能,在架构上我们也基于 UIService 实现了多租户访问隔离,云原生和弹性伸缩能力。

Falcon:Remote Shuffle Service

除了 UIService 之外,Shuffle 层面的优化也是一个值得重点分享的课题。Shuffle 是 Spark 作业中用于连接上下游数据交互的过程。提供 Shuffle 能力的服务称为 Shuffle Service。最初,Spark 内部实现了基于哈希的 Shuffle Service,后来引入了基于排序的 Shuffle Service。尽管 Spark 内部不断对 Shuffle 机制进行迭代和改进,但由于存储和计算之间的耦合限制,Spark 内部实现的 Shuffle 机制在某些场景下可用性受到限制。

为了解决这个问题,业界提出了将 Shuffle Service从Spark 中独立出来的 Shuffle 设计,通常称为 Remote Shuffle Service(RSS)。 RSS 允许 Shuffle Service 在 Spark 之外运行,解耦了存储和计算,提供更好的可用性和性能。

Falcon 是 LAS 上的 Remote Shuffle Service,采用高可用及存算分离的架构。它能够支持 Spark 引擎进行远程 Shuffle 数据的读取和写入,并可在云环境中部署和应用。

Falcon 实现了在 Reducer 端聚合数据的能力,解决了 Shuffle 阶段碎片读的问题,这大大降低了对磁盘 IOPS 的依赖,减少了 Spark Executor 运行时 OOM(内存溢出)的风险,保障了大 Shuffle 作业的稳定性。此外,Falcon 还提供了 Tiered Storage(分层存储)能力,根据作业规模的不同选择不同介质(内存/SSD/HDD)存储 Shuffle 数据,进一步提升了中小规模作业执行性能。

在 Falcon上,LAS 提供了 CRC 校验方案,以保障 Shuffle 数据的稳定性,避免了因为数据丢失导致的执行正确性问题。

总结

以上就是本文想要与大家分享的 LAS Spark 在云原生湖仓分析服务中的实践和优化内容。无论是面对大规模数据处理、实时分析还是复杂的人工智能任务,基于 Spark 的云原生湖仓分析实践为企业提供了一种强大的解决方案。通过本篇实践指南,希望能够帮助读者深入理解云原生湖仓一体分析服务 LAS 基于 Spark 引擎的一系列设计和实践,并在实际项目中应用 Spark 和云原生技术,为企业的数据驱动决策和创新提供支持。

考虑到篇幅限制,本文未能对许多实践细节进行详尽展开。我们会在后续的专题文章中对其进行更深入的解读。感兴趣的读者可以继续关注我们的后续分享。

湖仓一体分析服务 LAS(Lakehouse Analytics Service)是面向湖仓一体架构的 Serverless 数据处理分析服务,提供字节跳动最佳实践的一站式 EB 级海量数据存储计算和交互分析能力,兼容 Spark、Presto 生态,帮助企业轻松构建智能实时湖仓。新人优惠来袭!赠送给所有新人用户的专属福利来啦,LAS 数据中台新人特惠 1 元秒杀活动最新上线!更有超多叠加优惠等你来抢!感谢大家一直以来对我们的支持与厚爱,我们会一如既往地为您带来更好的内容。(点击链接,可顺滑体验)

链接:zjsms.com/jVCr5bp/