2024年,不懂大数据开发将成落伍者!入门必备知识解析

发表时间: 2024-06-04 17:16

Kappa架构与架构实例

Lambda架构为开发大数据量下的实时应用提供了一种切实有效的通用模式。

通过将数据和处理分为批处理层、快速计算层和服务层3个相对独立的层次,Lambda架构降低了大数据在持续更新过程中问题的复杂性,并能够实时获得在全部数据集合上的查询结果。

不过Lambda架构也存在一些问题,其中最主要的就是,对于同一个查询目标,需要分别为批处理层和快速计算层开发不同的算法实现。也就是说,对于同一套大体相同的逻辑,需要开发两种完全不同的代码,这给开发、测试和运维都带来一定的复杂性和额外工作量。

Kappa架构

为了解决Lambda架构中因为批处理层和快速计算层“异质”带来的复杂性问题,LinkedIn的Jay Kreps在Lambda架构的基础上提出了Kappa架构。

Kappa架构的核心思路是将批处理层用快速处理层的流计算技术替换。这样一来,批处理层和快速处理层均使用相同的流处理逻辑,在开发、测试和运维上都有一个更统一的框架,从而降低了开发、测试和运维的成本。

最初的Kappa架构建立在Kafka的基础上,这大概就是这种架构名字的首字母为K的原因。Kappa这个名字给部分刚接触这种架构模式的开发人员(如笔者)带来一个很微妙的疑惑。对于像笔者这样从传统批处理转向流处理(特别是在大数据领域)的开发人员来说,其或多或少都会对Kafka能不能存储HDFS量级的离线数据持一定的怀疑态度,不确定这种做法是否合理。这种怀疑有一定道理,毕竟我们可以在Kafka存储1周、1个月,甚至1个季度的数据,但是如果时间线真的很长,如三年五载,再加上数据量真的超级大,如T级甚至P级,这种数据就这样“裸”着放在Kafka中,是不是真的合适?现在笔者敢说,这样做是合理的。

这种直接管理和存储流式数据的功能正是Kafka最初的设计目标之一。

即便没有Kafka,这种直接管理和存储流式“大”数据的“数据库系统”也是非常有用的,所以一定会有人开发这样的“流式数据库系统”。如果你对Kafka不满意,但又实在想要一个这样的流式数据库,那么,话说自己造一个轮子也是有趣的事呢!

如果我们从数据处理的角度来看Kappa架构的离线处理部分,还会有更加清晰的认识。对于主流流计算框架,如Flink和Spark Streaming等,“流数据”和“块数据”的区别已经开始逐渐模糊。在Flink中,块数据处理相关API的底层就是用流数据来实现的,而且在未来的Flink开发计划中还将进一步地去掉块数据处理相关API,最终统一为流处理API。

在Spark Streaming中,众所周知的概念就是“流”是由一系列的“块”组成的,流数据的处理最终转化为块数据的处理。不管是“流”就是“块”,还是“块”就是“流”,这都说明以“流”这种统一的方式来处理数据已经是各主流大数据处理框架的共识。换言之,“流”已然是大势所趋。所以,即便没有像Kafka这样的消息中间件,我们也可以先将流数据以块的方式存储在HDFS上,然后以“流”的方式对其进行读取和处理,这样同样达到了将批处理层替换为流计算的目的,统一了Lambda架构中的批处理和快速处理层的开发界面,减少了开发、测试和运维的复杂程度。

所以,在诸如Kafka和Pulsar等新一代流式大数据存储方案,以及Flink和Spark Streaming等新一代流计算框架的双重加持下,用Kappa架构取代Lambda架构成了自然而然的选择。Kappa架构如图7-3所示。

图7-3 Kappa架构

从图7-3可以看出,Kappa架构本质上依旧是Lambda架构的一种,只是原本用作离线的批处理层被流计算取代了。在使用Kappa架构时,不管是用Kafka等“流数据库”存储数据,还是用HDFS等分布式文件系统存储数据(没错,未尝不可,记住Flink能够用流的方式处理HDFS上的块数据),对于离线部分的数据,保存所需分析时间窗口的数据,过期数据可以(或者说必须)通过设置过期策略进行淘汰。例如,如果要分析的数据是过去3天的数据,就设置超期时间为3天,只保留3天的数据。然后设置定时任务,定期对时间窗口内的全量数据进行处理,处理的结果保存到数据库中。当新的离线计算结果出来后,旧的离线计算结果就可以删除了。

由于快速处理层同样采用流计算方式,所以批处理层和快速处理层可以用完全相同的流计算框架和大体相同的流计算代码来实现,无非两者设定的时间窗口、滑动周期有所不同而已。例如,离线计算部分的时间窗口为3天,计算耗时20分钟,我们设置这个离线任务每30分钟调度一次。同时,设置实时计算部分的窗口为30分钟,计算耗时很短,如10秒,设置每15秒执行一次。将两者每次计算的结果都存入数据库,然后就可以为服务层实时提供(最多有15秒时延)最近3天的全量数据的查询结果了。相比没有快速计算层时每次查询都至少有20分钟的时延,采用Lambda架构能够将查询时延降低到15秒,而采用Kappa架构后不仅时延减少到了15秒,而且只需要开发一套代码就可以同时实现离线部分和实时部分的处理逻辑。

Kappa架构实例

正所谓“光说不练假把式”,下面我们就来使用Flink实现Kappa架构。假设现在需要统计“过去3天每种商品的销售量”,我们在Kappa架构中将这个计算任务分为离线处理层和快速处理层两层。离线处理层的实现如下:

DataStream counts = stream

// 将字符串的数据解析为JSON对象

.map(new MapFunction<String, Event>() {

@Override

public Event map(String s) throws Exception {

return JSONObject.parseObject(s, Event.class);

}

})

// 提取出每个事件中的商品,转化为商品计数事件

.map(new MapFunction<Event, CountedEvent>() {

@Override

public CountedEvent map(Event event) throws Exception {

return new CountedEvent(event.product, 1, event.timestamp);

}

})

.assignTimestampsAndWatermarks(new EventTimestampPeriodicWatermarks())

.keyBy("product")

// 对于批处理层,使用滑动窗口SlidingEventTimeWindows

.timeWindow(Time.days(3), Time.minutes(30))

// 最后是批处理窗口内的聚合计算

.reduce((e1, e2) -> {

CountedEvent countedEvent = new CountedEvent();

countedEvent.product = e1.product;

countedEvent.timestamp = e1.timestamp;

countedEvent.count = e1.count + e2.count;

countedEvent.minTimestamp = Math.min(e1.minTimestamp, e2.minTimestamp);

countedEvent.maxTimestamp = Math.min(e1.maxTimestamp, e2.maxTimestamp);

return countedEvent;

});;

在上面的批处理层实现中,我们采用了长度为3天、步长为30分钟的滑动时间窗口。也就是说,每30分钟会计算一次3天内各个商品的销售量。

快速处理层的实现如下:

DataStream counts = stream

// 将字符串的数据解析为JSON对象

.map(new MapFunction<String, Event>() {

@Override

public Event map(String s) throws Exception {

return JSONObject.parseObject(s, Event.class);

}

})

// 提取出每个事件中的商品,转化为商品计数事件

.map(new MapFunction<Event, CountedEvent>() {

@Override

public CountedEvent map(Event event) throws Exception {

return new CountedEvent(event.product, 1, event.timestamp);

}

})

.assignTimestampsAndWatermarks(new EventTimestampPeriodicWatermarks())

.keyBy(x -> x.product)

// 对于批处理层,使用翻转窗口TumblingEventTimeWindows

.window(TumblingEventTimeWindows.of(Time.seconds(15)))

// 最后是批处理窗口内的聚合计算

.reduce((e1, e2) -> {

CountedEvent countedEvent = new CountedEvent();

countedEvent.product = e1.product;

countedEvent.timestamp = e1.timestamp;

countedEvent.count = e1.count + e2.count;

countedEvent.minTimestamp = Math.min(e1.minTimestamp, e2.minTimestamp);

countedEvent.maxTimestamp = Math.min(e1.maxTimestamp, e2.maxTimestamp);

return countedEvent;

});;

在上面的快速处理层实现中,我们采用了长度为15秒的翻转时间窗口。也就是说,每15秒会计算一次15秒内各个商品的销售量。相信聪明的读者们看到这时已经发现Kappa架构的优势所在了——在上面批处理层和快速处理层的实现中,除了两个窗口的类型不一样以外,其他代码是完全一样的!是不是非常惊艳呢?要知道这给开发和运维减少了太多太多的工作量啊!

在批处理层和快速处理层各自计算出结果后,需要将计算结果存入数据库,具体如下:

public class JdbcWriter extends RichSinkFunction<CountedEvent> {

// 将每个窗口内的计算结果保存到数据库中

private String inset_sql = "INSERT INTO

table_counts(id,start,end,product,v_count,layer) VALUES(?,?,?,?,?,?) " +

"ON DUPLICATE KEY UPDATE start=?,end=?,product=?,v_count=?,layer=?;";

private long slideMS = 0;

private long slideNumberInWindow = 0;

private String layer = null; public JdbcWriter(long slideMS, long slideNumberInWindow, String layer) {

this.slideMS = slideMS;

this.slideNumberInWindow = slideNumberInWindow;

this.layer = layer;

}

@Override

public void invoke(CountedEvent value, Context context) throws Exception {

// 通过对滑动或翻滚的步长取整,以对齐时间窗口,从而方便后续合并离线部分和实时部分的计算

结果

long start = value.minTimestamp / slideMS;

long end = value.maxTimestamp / slideMS + slideNumberInWindow;

String product = value.product;

int v_count = value.count;

String layer = this.layer;

String id = DigestUtils.md5Hex(Joiner.on("&").

join(Lists.newArrayList(start, end, product, layer)));

preparedStatement.setString(1, id);

preparedStatement.setLong(2, start);

preparedStatement.setLong(3, end);

preparedStatement.setString(4, product);

preparedStatement.setInt(5, v_count);

preparedStatement.setString(6, layer);

preparedStatement.setLong(7, start);

preparedStatement.setLong(8, end);

preparedStatement.setString(9, product);

preparedStatement.setInt(10, v_count);

preparedStatement.setString(11, layer);

preparedStatement.executeUpdate();

}

}

在上面的实现中,我们将批处理层和快速处理层的结果都存入了数据库。最后服务层就只需要通过一条简单的SQL语句就能将两者的计算结果合并起来了,具体如下:

SELECT product, sum(v_count) as s_count from

(

SELECT * FROM table_counts WHERE start=? AND end=? AND layer='batch'

UNION

SELECT * FROM table_counts WHERE start>=? AND end<=? AND layer='fast'

) as union_table GROUP BY product;

在上面的代码中,我们使用UNION操作将批处理层和快速处理层的结果合并起来,然后在这个合并的表上,通过分组聚合计算即可非常方便并且精确地计算出“过去3天每种商品的销售量”了。

本篇文章给大家讲解的内容是Kappa架构与架构实例

下篇文章给大家讲解的内容是数据传输-消息中间件