Chapter 11. Batch Processing

A system cannot be successful if it is too strongly influenced by a single person. Once the initial design is complete and fairly robust, the real test begins as people with many different viewpoints undertake their own experiments.

Donald Knuth

Much of this book so far has talked about requests and queries, and the corresponding responses or results. This style of data processing is assumed in many modern data systems: you ask for something, or you send an instruction, and the system tries to give you an answer as quickly as possible.

A web browser requesting a page, a service calling a remote API, databases, caches, search indexes, and many other systems work this way. We call these online systems. Response time is usually their primary measure of performance, and they often require fault tolerance to ensure high availability.

However, sometimes you need to run a bigger computation or process larger amounts of data than you can do in an interactive request. Maybe you need to train an AI model, or transform lots of data from one form into another, or compute analytics over a very large dataset. We call these tasks batch processing jobs, or sometimes offline systems.

A batch processing job takes some input data (which is read-only), and produces some output data (which is generated from scratch every time the job runs). It typically does not mutate data in the way a read/write transaction would. The output is therefore derived from the input (as discussed in “Systems of Record and Derived Data”): if you don’t like the output, you can just delete it, adjust the job logic, and run it again. By treating inputs as immutable and avoiding side effects (such as writing to external databases), batch jobs not only achieve good performance but also become much easier to maintain:

  • If you introduce a bug into the code and the output is wrong or corrupted, you can simply roll back to a previous version of the code and rerun the job, and the output will be correct again. Or, even simpler, you can keep the old output in a different directory and simply switch back to it. Databases with read-write transactions do not have this property: if you deploy buggy code that writes bad data to the database, then rolling back the code will do nothing to fix the data in the database. The idea of being able to recover from buggy code has been called human fault tolerance [1].
  • As a consequence of this ease of rolling back, feature development can proceed more quickly than in an environment where mistakes could mean irreversible damage. This principle of minimizing irreversibility is beneficial for Agile software development [2].
  • The same set of files can be used as input for various different jobs, including monitoring jobs that calculate metrics and evaluate whether a job’s output has the expected characteristics (for example, by comparing it to the output from the previous run and measuring discrepancies).
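A minimal sketch of the "keep the old output and switch back" idea, assuming each run writes to its own versioned directory and a small pointer file (called `CURRENT` here, a hypothetical convention) tells readers which version to use. The job never touches its input, and rolling back is just re-pointing: nothing has to be recomputed. `os.replace` swaps the pointer atomically on POSIX filesystems.

```python
import os
import tempfile
from pathlib import Path

def run_job(input_lines, out_root: Path, version: str, transform) -> Path:
    """Derive output from read-only input into a fresh, versioned directory."""
    out_dir = out_root / version
    out_dir.mkdir(parents=True)          # fails if this version already exists
    with open(out_dir / "part-0000", "w") as f:
        for line in input_lines:
            f.write(transform(line) + "\n")
    return out_dir

def publish(out_root: Path, version: str) -> None:
    """Point readers at `version` by atomically swapping a small pointer file."""
    tmp = out_root / "CURRENT.tmp"
    tmp.write_text(version)
    os.replace(tmp, out_root / "CURRENT")   # atomic rename on POSIX filesystems

def read_current(out_root: Path) -> list[str]:
    version = (out_root / "CURRENT").read_text()
    return (out_root / version / "part-0000").read_text().splitlines()

root = Path(tempfile.mkdtemp())
inputs = ["alpha", "beta"]                       # never modified by any run

run_job(inputs, root, "v1", str.upper)           # correct logic
publish(root, "v1")
run_job(inputs, root, "v2", lambda s: s[::-1])   # a buggy deploy
publish(root, "v2")
buggy = read_current(root)                       # ['ahpla', 'ateb']
publish(root, "v1")                              # roll back: nothing is recomputed
restored = read_current(root)                    # ['ALPHA', 'BETA']
```

Because old outputs are kept side by side, the same directories can also feed a monitoring job that compares consecutive versions.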

Batch data processing also presents challenges. With most frameworks, output can only be processed by other jobs after the whole job finishes. Batch processing can also be inefficient: any change to input data—even a single byte—means the batch job must reprocess the entire input dataset. Despite these limitations, batch processing has proven useful in a wide range of use cases, which we’ll revisit in “Batch Use Cases”.

A batch job may take a long time to run: minutes, hours, or even days. Jobs may be scheduled to run periodically (for example, once per day). The primary measure of performance is usually throughput: how much data the job can process per unit time. Some batch systems handle faults by simply aborting and restarting the whole job, while others have fault tolerance so that a job can complete successfully despite some of its nodes crashing.

Note

An alternative to batch processing is stream processing, in which the job doesn’t finish running when it has processed the input, but instead continues watching the input and processes changes in the input shortly after they happen. We will turn to stream processing in Chapter 12.

The boundary between online and batch processing systems is not always clear: a long-running database query looks quite like a batch process. But batch processing also has some particular characteristics that make it a useful building block for building reliable, scalable, and maintainable applications. For example, it often plays a role in data integration, i.e., composing multiple data systems to achieve things that one system alone cannot do. ETL, as discussed in “Data Warehousing”, is an example of this.

Modern batch processing has been heavily influenced by MapReduce, a batch processing algorithm that was published by Google in 2004 [3], and subsequently implemented in various open source data systems, including Hadoop, CouchDB, and MongoDB. MapReduce is a fairly low-level programming model, and less sophisticated than the parallel query execution engines found, for example, in data warehouses [4,5]. When it was new, MapReduce was a step forward in terms of the scale of processing that could be achieved on commodity hardware, but now it is largely obsolete, and no longer used at Google [6,7].

Batch processing today is more often done using frameworks such as Spark or Flink, or data warehouse query engines. Like MapReduce, they rely heavily on sharding (see Chapter 7) and parallel execution to allow them to scale to very large datasets. As these systems have matured, operational concerns have been largely solved, so focus has shifted toward usability. New processing models such as dataflow APIs, query languages, and DataFrame APIs are now widely supported. Job and workflow orchestration has also matured. Hadoop-centric workflow schedulers have been replaced with ones that support a wide array of batch processing frameworks and cloud data warehouses.

Cloud computing has grown ubiquitous. Batch storage layers are shifting from distributed filesystems (DFSs) like HDFS, GlusterFS, and CephFS to object storage systems such as S3. Scalable cloud data warehouses like BigQuery and Snowflake are blurring the line between data warehouses and batch processing.

To build an intuition of what batch processing is about, we will start this chapter with an example that uses standard Unix tools on a single machine. We will then investigate how we can extend data processing to multiple machines in a distributed system. We will see that, much like an operating system, distributed batch processing frameworks have a scheduler and a filesystem. We will then explore various processing models that we use to write batch jobs. Finally, we will discuss common batch processing use cases.

Batch Processing with Unix Tools

Say you have a web server that appends a line to a log file every time it serves a request. For example, using the nginx default access log format, one line of the log might look like this:

(That is actually one line; it’s only broken onto multiple lines here for readability.) There’s a lot of information in that line. In order to interpret it, you need to look at the definition of the log format, which is as follows:

    $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"

So, this one line of the log indicates that on June 27, 2025, at 17:55 UTC, the server received a request for the file /css/typography.css from the client IP address 216.58.210.78. The user was not authenticated, so $remote_user is set to a hyphen (-). The response status was 200 (i.e., the request was successful), and the response was 3,377 bytes in size. The web browser was Chrome 137, and it loaded the file because it was referenced in the page at the URL https://martin.kleppmann.com/.
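To make the field layout concrete, here is a small parser for nginx’s default (combined) access log format. The regular expression is our own sketch rather than anything nginx provides, and the sample line is hypothetical: it uses the values described above, but the seconds, the HTTP method and version, and the exact user-agent string are invented. The last line shows why the later examples use the seventh whitespace-separated field: splitting on whitespace happens to put the URL there.

```python
import re

# One named group per variable in nginx's default "combined" log format.
COMBINED = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) '
    r'\[(?P<time_local>[^\]]+)\] "(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)

# Hypothetical sample line (seconds, protocol, and user agent are invented).
line = ('216.58.210.78 - - [27/Jun/2025:17:55:11 +0000] '
        '"GET /css/typography.css HTTP/1.1" 200 3377 '
        '"https://martin.kleppmann.com/" '
        '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
        '(KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"')

entry = COMBINED.match(line).groupdict()
method, url, protocol = entry["request"].split()

print(url, entry["status"], entry["body_bytes_sent"])
# /css/typography.css 200 3377
print(line.split()[6])   # the seventh whitespace-separated field
# /css/typography.css
```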

Simple Log Analysis

Various tools can take these log files and produce pretty reports about your website traffic, but for the sake of exercise, let’s build our own, using basic Unix tools. For example, say you want to find the five most popular pages on your website. You can do this in a Unix shell as follows:

Read the log file. (Strictly speaking, cat is unnecessary here, as the input file could be given directly as an argument to awk. However, the linear pipeline is more apparent when written like this.)

Split each line into fields by whitespace, and output only the seventh such field from each line, which happens to be the requested URL. In our example line, this request URL is /css/typography.css.

Alphabetically sort the list of requested URLs. If some URL has been requested n times, then after sorting, the file contains the same URL repeated n times in a row.

The uniq command filters out repeated lines in its input by checking whether two adjacent lines are the same. The -c option tells it to also output a counter: for every distinct URL, it reports how many times that URL appeared in the input.

The second sort sorts by the number (-n) at the start of each line, which is the number of times the URL was requested. It then returns the results in reverse (-r) order, i.e., with the largest number first.

Finally, head outputs just the first five lines (-n 5) of input, and discards the rest.

The output of that series of commands looks something like this:

Although the preceding command line likely looks a bit obscure if you’re unfamiliar with Unix tools, it is incredibly powerful. It will process gigabytes of log files in a matter of seconds, and you can easily modify the analysis to suit your needs. For example, if you want to omit CSS files from the report, change the awk argument to '$7 !~ /\.css$/ {print $7}'. If you want to count top client IP addresses instead of top pages, change the awk argument to '{print $1}'. And so on.
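To make the structure of the pipeline explicit, here is a Python sketch that mirrors it stage by stage: extract the seventh field, sort, collapse runs of adjacent identical lines into counts (what `uniq -c` does), sort numerically in reverse, and keep the first five. The function name and the `fake_line` helper are hypothetical, and unlike the real `sort`, this version holds everything in memory. The `skip_css` flag corresponds to the `'$7 !~ /\.css$/ {print $7}'` variant mentioned above.

```python
from itertools import groupby

def top_pages_by_sorting(lines, n=5, skip_css=False):
    # awk '{print $7}', or with skip_css: awk '$7 !~ /\.css$/ {print $7}'
    urls = [line.split()[6] for line in lines]
    if skip_css:
        urls = [u for u in urls if not u.endswith(".css")]
    urls.sort()                      # sort: identical URLs become adjacent
    # uniq -c: one (count, url) pair per run of adjacent identical URLs
    counted = [(len(list(run)), url) for url, run in groupby(urls)]
    counted.sort(key=lambda pair: pair[0], reverse=True)   # sort -r -n
    return counted[:n]                                      # head -n 5

def fake_line(url):
    # Hypothetical helper: a minimal combined-format line for trying things out.
    return f'1.2.3.4 - - [27/Jun/2025:17:55:11 +0000] "GET {url} HTTP/1.1" 200 1 "-" "-"'

logs = [fake_line("/a")] * 3 + [fake_line("/b.css")] * 5 + [fake_line("/c")]
print(top_pages_by_sorting(logs))                  # [(5, '/b.css'), (3, '/a'), (1, '/c')]
print(top_pages_by_sorting(logs, skip_css=True))   # [(3, '/a'), (1, '/c')]
```

One small difference from the shell version: Python’s stable sort keeps URLs with equal counts in alphabetical order, whereas `sort -r -n` may break such ties differently.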

We don’t have space in this book to explore Unix tools in detail, but they are very much worth learning about. Surprisingly many data analyses can be done in a few minutes using some combination of awk, sed, grep, sort, uniq, and xargs, and they perform surprisingly well [8].

Chain of Commands Versus Custom Program

Instead of the chain of Unix commands, you could write a simple program to do the same thing. For example, in Python, it might look something like this:

counts is a hash table that keeps a counter for the number of times we’ve seen each URL. A counter is zero by default.

From each line of the log, we take the URL to be the seventh whitespace-separated field (the index is 6 because Python’s lists are zero-indexed).

Increment the counter for the URL in the current line of the log.

Sort the hash table contents by counter value (descending), and take the top five entries.

Print out those top five entries.

This program is not as concise as the chain of Unix pipes, but it’s fairly readable, and which of the two you prefer is partly a matter of taste. However, besides the superficial syntactic differences between the two, there is a big difference in the execution flow, which becomes apparent if you run this analysis on a large file.
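A sketch of a program that follows the steps just described, written as a function over any iterable of lines (such as an open file) so that it does not depend on a particular log path; the function name and sample lines are our own. Note that the hash table gets one entry per distinct URL, so its memory use grows with the number of distinct URLs rather than with the number of log lines.

```python
from collections import defaultdict

def top_pages_in_memory(lines, n=5):
    counts = defaultdict(int)        # hash table: URL -> count, 0 for unseen URLs
    for line in lines:
        url = line.split()[6]        # seventh whitespace-separated field
        counts[url] += 1
    # Sort by count, descending, and keep the top n entries.
    top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:n]
    return [(count, url) for url, count in top]

# With a real log file (the path is hypothetical):
#     with open("access.log") as f:
#         for count, url in top_pages_in_memory(f):
#             print(count, url)
logs = [
    '1.2.3.4 - - [27/Jun/2025:17:55:11 +0000] "GET /x HTTP/1.1" 200 1 "-" "-"',
    '1.2.3.4 - - [27/Jun/2025:17:55:12 +0000] "GET /y HTTP/1.1" 200 1 "-" "-"',
    '5.6.7.8 - - [27/Jun/2025:17:55:13 +0000] "GET /x HTTP/1.1" 200 1 "-" "-"',
]
print(top_pages_in_memory(logs))   # [(2, '/x'), (1, '/y')]
```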

Sorting Versus In-memory Aggregation

The Python script keeps an in-memory hash table of URLs, where each URL is mapped to the number of times it has been seen. The Unix pipeline example does not have such a hash table, but instead relies on sorting a list of URLs in which multiple occurrences of the same URL are simply repeated.

Which approach is better? It depends on how many different URLs you have. For most small to mid-sized websites, you can probably fit all distinct URLs, and a counter for each URL, in (say) 1 GB of memory. In this example, the working set of the job (the amount of memory to which the job needs random access) depends only on the number of distinct URLs: if there are a million log entries for a single URL, the space required in the hash table is still just one URL plus the size of the counter. If this working set is small enough, an in-memory hash table works fine, even on a laptop.

On the other hand, if the job’s working set is larger than the available memory, the sorting approach has the advantage that it can make efficient use of disks. It’s the same principle as we discussed in “Log-Structured Storage”: chunks of data can be sorted in memory and written out to disk as segment files, and then multiple sorted segments can be merged into a larger sorted file. Mergesort has sequential access patterns that perform well on disks (see “Sequential vs. Random Writes on SSDs”).
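A sketch of that sort-based approach for data that does not fit in memory: sort fixed-size chunks in memory, spill each one to a temporary file as a sorted run, then stream a k-way merge of all runs with `heapq.merge` and count adjacent duplicates as `uniq -c` would. The tiny `chunk_size` is an arbitrary assumption chosen to force spilling; GNU sort’s real buffer sizing and merge strategy are more sophisticated. Items are assumed not to contain newlines, since each run stores one item per line.

```python
import heapq
import tempfile
from itertools import groupby, islice

def _spill(sorted_chunk):
    """Write one sorted run to a temporary file and rewind it for reading."""
    f = tempfile.TemporaryFile("w+")
    f.writelines(item + "\n" for item in sorted_chunk)
    f.seek(0)
    return f

def external_sort(items, chunk_size=1000):
    """Yield items in sorted order, holding at most chunk_size items in memory per run."""
    it = iter(items)
    runs = []
    while chunk := list(islice(it, chunk_size)):
        chunk.sort()                     # in-memory sort of one segment
        runs.append(_spill(chunk))       # write it out as a sorted run
    try:
        # heapq.merge reads every run sequentially: a k-way merge of sorted files.
        yield from heapq.merge(*((line.rstrip("\n") for line in run) for run in runs))
    finally:
        for run in runs:
            run.close()

def count_sorted(sorted_items):
    """The equivalent of `uniq -c`: count runs of adjacent equal items."""
    for item, run in groupby(sorted_items):
        yield sum(1 for _ in run), item

urls = ["/b", "/a", "/c", "/a", "/b", "/a"]
print(list(count_sorted(external_sort(urls, chunk_size=2))))
# [(3, '/a'), (2, '/b'), (1, '/c')]
```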

The sort utility in GNU Coreutils (Linux) automatically handles larger-than-memory datasets by spilling to disk, and automatically parallelizes sorting across multiple CPU cores [9]. This means that the simple chain of Unix commands we saw earlier easily scales to large datasets, without running out of memory. The bottleneck is likely to be the rate at which the input file can be read from disk.

A limitation of Unix tools is that they run only on a single machine. Datasets that are too large to fit in memory or local disk present a problem—and that’s where distributed batch processing frameworks come in.

Batch Processing in Distributed Systems

The machine that runs our Unix tool example has a number of components that work together to process the log data:

  • Storage devices that are accessed through the operating system’s filesystem interface.
  • A scheduler that determines when processes get to run, and how to allocate CPU resources to them.
  • A series of Unix programs whose stdin and stdout are connected together by pipes.

These same components exist in distributed data processing frameworks. In fact, you can think of a distributed processing framework as a distributed operating system; they have filesystems, job schedulers, and programs that send data to each other through the filesystem or other communication channels.

Distributed Filesystems

The filesystem provided by your operating system is composed of several layers:

  • At the lowest level, block device drivers speak directly to the disk, and allow the layers above to read and write raw blocks.
  • Above the block layer sits a page cache that keeps recently accessed blocks in memory for faster access.
  • The block API is wrapped in a filesystem layer that breaks up large files into blocks, and tracks file metadata such as inodes, directories, and files. ext4 and XFS are two common implementations on Linux, for example.
  • Finally, the operating system exposes different filesystems to applications through a common API called the virtual file system (VFS). The VFS is what allows applications to read and write in a standard way regardless of the underlying filesystem.
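The page cache in the second layer can be pictured as a map from block number to block contents, sitting in front of a slower device and evicting the least recently used block when full. In this sketch the "device" is just a `bytes` object and capacity is counted in blocks; the class name and the plain LRU policy are illustrative assumptions (real kernels use more elaborate eviction schemes), and the hit/miss counters are there only to show the effect.

```python
from collections import OrderedDict

class PageCache:
    """Caches recently read fixed-size blocks; evicts the least recently used one."""

    def __init__(self, device: bytes, block_size: int = 4096, capacity: int = 2):
        self.device = device
        self.block_size = block_size
        self.capacity = capacity
        self.blocks = OrderedDict()      # block number -> bytes, least recent first
        self.hits = self.misses = 0

    def _read_block(self, n: int) -> bytes:
        if n in self.blocks:
            self.hits += 1
            self.blocks.move_to_end(n)               # now the most recently used
            return self.blocks[n]
        self.misses += 1
        start = n * self.block_size
        data = self.device[start:start + self.block_size]   # the "slow" device read
        self.blocks[n] = data
        if len(self.blocks) > self.capacity:
            self.blocks.popitem(last=False)          # evict the least recently used
        return data

    def read(self, offset: int, length: int) -> bytes:
        """Read a byte range by going through the cache one block at a time."""
        first = offset // self.block_size
        last = (offset + length - 1) // self.block_size
        buf = b"".join(self._read_block(n) for n in range(first, last + 1))
        start = offset - first * self.block_size
        return buf[start:start + length]

device = bytes(range(256)) * 64                   # 16 KiB of sample data
cache = PageCache(device, block_size=4096, capacity=2)
print(cache.read(10, 5) == device[10:15])         # True (miss on block 0)
print(cache.read(4090, 10) == device[4090:4100])  # True (hit on 0, miss on 1)
print(cache.hits, cache.misses)                   # 1 2
```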

Distributed filesystems work in much the same way. Files are broken up into blocks, which are distributed across many machines. DFS blocks are typically much larger than local blocks: HDFS (Hadoop Distributed File System) defaults to 128MB, while JuiceFS and many object stores use 4MB blocks—much larger than ext4’s 4096 bytes. Large blocks mean less per-block metadata to keep track of, which makes a big difference on petabyte-sized datasets.

Most physical storage devices can’t write partial blocks, so operating systems require writes to use an entire block even if the data doesn’t take up the whole block. Since distributed filesystems have larger blocks and are usually implemented on top of operating system filesystems, they don’t have this requirement. For example, a 900MB file stored with 128MB blocks would have 7 blocks that use 128MB and 1 block that uses 4MB.
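The arithmetic behind the 900MB example, and the metadata argument for large blocks, can be checked in a few lines. For simplicity this treats MB as MiB (2^20 bytes), which is an assumption; with decimal megabytes the 900MB example gives the same result (seven full blocks plus a 4MB remainder).

```python
MiB = 1024 ** 2
PiB = 1024 ** 5

def split_into_blocks(file_size: int, block_size: int) -> list[int]:
    """Sizes of the blocks a file is split into; only the last one may be partial."""
    full, remainder = divmod(file_size, block_size)
    return [block_size] * full + ([remainder] if remainder else [])

blocks = split_into_blocks(900 * MiB, 128 * MiB)
print(len(blocks), [b // MiB for b in blocks])
# 8 [128, 128, 128, 128, 128, 128, 128, 4]

# Number of blocks (and so of per-block metadata entries) for 1 PiB of data:
print(PiB // (128 * MiB))   # 8388608 with 128 MiB blocks
print(PiB // 4096)          # 274877906944 with 4 KiB blocks
```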

DFS blocks are read by making network requests to a machine in the cluster that stores the block. Each machine runs a daemon, exposing an API that allows remote processes to read and write blocks as files on its local filesystem. HDFS refers to these daemons as DataNodes, while GlusterFS calls them glusterfsd processes. We’ll call them data nodes in this book.

Distributed filesystems also implement the distributed equivalent of a page cache. Since DFS blocks are stored as files on data nodes, reads and writes go through each data node’s operating system, which includes an in-memory page cache. This keeps frequently read data blocks in-memory on the data nodes. Some distributed filesystems also implement more caching tiers such as the client-side and local-disk caching found in JuiceFS.

Filesystems such as ext4 and XFS keep track of storage metadata including free space, file block locations, directory structures, permission settings, and more. Distributed filesystems also need a way to track file locations spread across machines, permission settings, and so on. Hadoop has a service called the NameNode, which maintains metadata for the cluster. DeepSeek’s 3FS has a metadata service that persists its data to a key-value store such as FoundationDB.
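A toy version of the kind of state a NameNode-style metadata service keeps: for each file path, the ordered list of block IDs, and for each block, which data nodes hold a replica. The class and method names, round-robin placement, and default replication factor of 3 are all hypothetical simplifications. Real services also persist this state, take rack topology into account, and track permissions.

```python
from dataclasses import dataclass, field
from itertools import count

@dataclass
class MetadataService:
    data_nodes: list[str]
    replication: int = 3
    files: dict[str, list[int]] = field(default_factory=dict)      # path -> block IDs
    locations: dict[int, list[str]] = field(default_factory=dict)  # block ID -> replica nodes
    _ids: count = field(default_factory=count)                     # block ID generator
    _next_node: int = 0

    def __post_init__(self) -> None:
        if self.replication > len(self.data_nodes):
            raise ValueError("need at least as many data nodes as replicas")

    def create(self, path: str, num_blocks: int) -> None:
        block_ids = []
        for _ in range(num_blocks):
            block_id = next(self._ids)
            # Round-robin placement onto `replication` distinct data nodes.
            n = len(self.data_nodes)
            self.locations[block_id] = [
                self.data_nodes[(self._next_node + i) % n] for i in range(self.replication)
            ]
            self._next_node += 1
            block_ids.append(block_id)
        self.files[path] = block_ids

    def lookup(self, path: str) -> list[tuple[int, list[str]]]:
        """What a client asks before reading: which data nodes hold each block."""
        return [(b, self.locations[b]) for b in self.files[path]]

meta = MetadataService(data_nodes=["dn1", "dn2", "dn3", "dn4"])
meta.create("/logs/2025-06-27.log", num_blocks=2)
print(meta.lookup("/logs/2025-06-27.log"))
# [(0, ['dn1', 'dn2', 'dn3']), (1, ['dn2', 'dn3', 'dn4'])]
```

The client would then fetch each block from one of the listed data nodes; the metadata service itself never serves file contents.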

Above the filesystem sits the VFS. A close analogue in batch processing is a distributed filesystem’s protocol. Distributed filesystems must expose a protocol or interface so that batch processing systems can read and write files. This protocol acts as a pluggable interface: any DFS may be used so long as it implements the protocol. For example, Amazon S3’s API has been widely adopted by other storage systems such as MinIO, Cloudflare’s R2, Tigris, Backblaze’s B2, and many others. Batch processing systems with S3 support can use any of these storage systems.
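The pluggability argument can be sketched with a structural interface: a batch job written against a small `get`/`put` protocol runs unchanged on any backend that implements it. The `BlobStore` protocol, both backends, and the job are hypothetical. A real S3-compatible backend would implement the same two methods on top of an S3 client library.

```python
import tempfile
from pathlib import Path
from typing import Protocol

class BlobStore(Protocol):
    def get(self, key: str) -> bytes: ...
    def put(self, key: str, data: bytes) -> None: ...

class InMemoryStore:
    def __init__(self) -> None:
        self._objects: dict[str, bytes] = {}

    def get(self, key: str) -> bytes:
        return self._objects[key]

    def put(self, key: str, data: bytes) -> None:
        self._objects[key] = bytes(data)

class LocalDirStore:
    """Maps each key to a file under a root directory."""
    def __init__(self, root: Path) -> None:
        self.root = root

    def get(self, key: str) -> bytes:
        return (self.root / key).read_bytes()

    def put(self, key: str, data: bytes) -> None:
        path = self.root / key
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(data)

def line_count_job(store: BlobStore, input_key: str, output_key: str) -> None:
    """A batch job that relies only on the protocol, not on a specific backend."""
    n = len(store.get(input_key).splitlines())
    store.put(output_key, str(n).encode())

for store in (InMemoryStore(), LocalDirStore(Path(tempfile.mkdtemp()))):
    store.put("input/log.txt", b"a\nb\nc\n")
    line_count_job(store, "input/log.txt", "output/count.txt")
    print(type(store).__name__, store.get("output/count.txt"))
# InMemoryStore b'3'
# LocalDirStore b'3'
```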

Some DFSs implement POSIX-compliant filesystems that appear to the operating system’s VFS like any other filesystem. Filesystem in Userspace (FUSE) or the Network File System (NFS) protocol are often used to integrate into the VFS. NFS is perhaps the most well known distributed filesystem protocol. The protocol was originally developed to allow multiple clients to read and write data on a single server. More recently, filesystems such as AWS’s Elastic File System (EFS) and Archil provide NFS-compatible distributed filesystem implementations that are far more scalable. NFS clients still connect to one endpoint, but underneath, these systems communicate with distributed metadata services and data nodes to read and write data.

Many distributed filesystems are built on commodity hardware, which is less expensive but has higher failure rates than enterprise-grade hardware. In order to tolerate machine and disk failures, file blocks are replicated on multiple machines. Replication may mean simply several copies of the same data on multiple machines, as in Chapter 6, or an erasure coding scheme such as Reed–Solomon codes, which allows lost data to be recovered with lower storage overhead than full replication [10,11,12]. The techniques are similar to RAID, which provides redundancy across several disks attached to the same machine; the difference is that in a distributed filesystem, file access and replication are done over a conventional datacenter network without special hardware.
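Reed–Solomon codes are too involved for a short example, so this sketch swaps in the simplest erasure code instead: a single XOR parity block, as in RAID 5. It is a weaker technique (it survives the loss of any one block, whereas Reed–Solomon can be configured to tolerate several), but it shows the storage trade-off: four data blocks plus one parity block cost 1.25× the raw size, compared with 3× for three full replicas. The block contents and sizes are arbitrary.

```python
from functools import reduce
from typing import Optional

def xor_blocks(blocks: list[bytes]) -> bytes:
    """Bytewise XOR of equal-length blocks."""
    return bytes(reduce(lambda a, b: a ^ b, column) for column in zip(*blocks))

def encode(data_blocks: list[bytes]) -> list[bytes]:
    """Store the data blocks plus one parity block (the XOR of all of them)."""
    return data_blocks + [xor_blocks(data_blocks)]

def recover(stored: list[Optional[bytes]]) -> list[bytes]:
    """Rebuild at most one lost block (None) from the survivors; return the data blocks."""
    missing = [i for i, b in enumerate(stored) if b is None]
    if len(missing) > 1:
        raise ValueError("a single parity block can only recover one lost block")
    stored = list(stored)
    if missing:
        # The XOR of all surviving blocks (data and parity) equals the missing one.
        stored[missing[0]] = xor_blocks([b for b in stored if b is not None])
    return stored[:-1]                       # drop the parity block

data = [b"AAAA", b"BBBB", b"CCCC", b"DDDD"]  # four equal-sized data blocks
stored = encode(data)
stored[2] = None                             # the disk holding block 2 (zero-indexed) fails
print(recover(stored) == data)               # True
print(len(stored) / len(data))               # 1.25 (vs. 3.0 for three full replicas)
```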

Object Stores

Object storage services such as Amazon S3, Google Cloud Storage, Azure Blob Storage, and OpenStack Swift have become a popular alternative to distributed filesystems for batch processing jobs. In fact, the line between the two is somewhat blurry. As we saw in the previous section and “Databases backed by object storage”, Filesystem in Userspace (FUSE) drivers allow users to treat object stores such as S3 as a filesystem. Some DFS implementations such as JuiceFS and Ceph offer both object storage and filesystem APIs. However, their APIs, performance, and consistency guarantees are very different.

Each object in an object store has a URL such as s3://my-photo-bucket/2025/04/01/birthday.png. The host portion of the URL (my-photo-bucket) names the bucket where the object is stored, and the part after the slash that follows the bucket name is the object’s key (2025/04/01/birthday.png in our example). A bucket has a globally unique name, and each object’s key must be unique within its bucket.

Objects are read using a get call and written using a put call. Unlike files, objects are immutable once written. To update an object, you must rewrite it entirely with a put call, much as in a key-value store. Azure Blob Storage and S3 Express One Zone support appends, but most other stores do not. There are no file handle APIs with functions like fopen and fseek.

Keys often appear to have a directory structure, which is somewhat confusing. The path structure is simply a convention, and the slashes are part of the object’s key. This convention allows you to perform something similar to a directory listing by requesting a list of objects with a particular prefix. However, listing objects by prefix differs from a filesystem directory listing in two ways:

  • A prefix list operation behaves like a recursive ls -R call on a Unix system: it returns all objects that start with the prefix, including objects in subpaths.
  • Empty directories are not possible: if you were to remove all objects underneath s3://my-photo-bucket/2025/04/01, then 01 would no longer appear when you call list on s3://my-photo-bucket/2025/04.
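Both differences fall out naturally if the store is modeled as what it is: a flat map from keys to bytes. The `ToyObjectStore` class below is hypothetical and stands in for a single bucket; listing by prefix is just a string comparison, which is why it is recursive, and a "directory" exists only as long as some key contains it. The second key (`cake.png`) is made up for the example.

```python
class ToyObjectStore:
    """A single bucket: a flat map from keys to bytes. '/' is only a naming convention."""

    def __init__(self) -> None:
        self.objects: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self.objects[key] = data                 # whole-object write, no partial updates

    def delete(self, key: str) -> None:
        del self.objects[key]

    def list_objects(self, prefix: str) -> list[str]:
        # Every key that starts with the prefix, at any depth (like `ls -R`).
        return sorted(k for k in self.objects if k.startswith(prefix))

    def child_names(self, prefix: str) -> list[str]:
        # Fake a one-level "directory listing" by cutting keys at the next '/'.
        return sorted({k[len(prefix):].split("/", 1)[0] for k in self.list_objects(prefix)})

store = ToyObjectStore()
store.put("2025/04/01/birthday.png", b"...")
store.put("2025/04/02/cake.png", b"...")        # hypothetical second object
print(store.list_objects("2025/04/"))
# ['2025/04/01/birthday.png', '2025/04/02/cake.png']
print(store.child_names("2025/04/"))            # ['01', '02']
store.delete("2025/04/01/birthday.png")
print(store.child_names("2025/04/"))            # ['02']: "01" has vanished
```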

DFS implementations often support many common filesystem operations such as hard links, symbolic links, file locking, and atomic renames. Such features are missing from object stores. Linking and locks are typically not supported, while renames are non-atomic; they’re accomplished by copying the object to the new key, and then deleting the old object. If you want to rename a directory, you have to individually rename every object within it, since the directory name is a part of the key.
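Renaming a "directory" in an object store therefore turns into one copy and one delete per object. In the sketch below, a plain dict stands in for a bucket (a hypothetical simplification), and the `on_step` callback exists only to observe intermediate states. It shows why the operation is not atomic: a reader listing keys partway through sees some objects under the old prefix and some under the new one.

```python
def rename_prefix(bucket: dict[str, bytes], old: str, new: str, on_step=None) -> int:
    """Move every object under `old` to `new`, one copy-then-delete at a time."""
    moved = 0
    for key in sorted(k for k in bucket if k.startswith(old)):   # snapshot of keys
        bucket[new + key[len(old):]] = bucket[key]   # copy to the new key
        del bucket[key]                              # then delete the old one
        moved += 1
        if on_step:
            on_step(dict(bucket))   # what a concurrent reader could observe now
    return moved

bucket = {"2025/04/01/a.png": b"1", "2025/04/01/b.png": b"2"}
snapshots = []
moved = rename_prefix(bucket, "2025/04/01/", "2025/04/birthday/", on_step=snapshots.append)
print(sorted(snapshots[0]))
# ['2025/04/01/b.png', '2025/04/birthday/a.png']  (a half-renamed state)
print(sorted(bucket))
# ['2025/04/birthday/a.png', '2025/04/birthday/b.png']
```

A crash between the two steps can also leave duplicates or a partially moved prefix, which is why batch frameworks that write to object stores tend to avoid relying on renames for committing output.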

The key-value stores we discussed in Chapter 4 are optimized for small values (typically kilobytes) and frequent, low-latency reads/writes. In contrast, distributed filesystems and object stores are generally optimized for large objects (megabytes to gigabytes) and less frequent, larger reads. Recently, however, object stores have begun to add support for frequent and smaller reads/writes. For example, S3 Express One Zone now offers single-digit-millisecond latency and a pricing model that is more similar to that of key-value stores.

Another difference between distributed filesystems and object stores is that DFSs often allow computing tasks to be run on the machine that stores a copy of a particular file. This allows the task to read that file without having to send it over the network, which saves bandwidth if the executable code of the task is smaller than the file it needs to read. On the other hand, object stores usually keep storage and computation separate. That may use more bandwidth, but since modern datacenter networks are very fast, this is often acceptable.
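A rough sketch of locality-aware task placement on a DFS: for each input block, prefer a node that already stores a replica and still has a free task slot; otherwise fall back to any node with capacity and read the block over the network. The greedy policy, slot counts, and node names are hypothetical simplifications of what real schedulers do.

```python
def assign_tasks(block_replicas: dict[str, list[str]], free_slots: dict[str, int]):
    """Return {block: (node, is_local)} using a greedy, locality-first policy."""
    slots = dict(free_slots)
    placement = {}
    for block, replicas in block_replicas.items():
        local = [n for n in replicas if slots.get(n, 0) > 0]
        if local:
            node, is_local = local[0], True          # read the block from local disk
        else:
            candidates = [n for n, s in slots.items() if s > 0]
            if not candidates:
                raise RuntimeError(f"no free slot for {block}")
            node, is_local = candidates[0], False    # remote read over the network
        slots[node] -= 1
        placement[block] = (node, is_local)
    return placement

replicas = {"blk0": ["dn1", "dn2"], "blk1": ["dn1", "dn3"], "blk2": ["dn1"]}
print(assign_tasks(replicas, {"dn1": 1, "dn2": 1, "dn3": 1}))
# {'blk0': ('dn1', True), 'blk1': ('dn3', True), 'blk2': ('dn2', False)}
```

Note that the greedy order misses an all-local assignment here (blk0 could have run on dn2, leaving dn1 free for blk2), which is one reason production schedulers use more elaborate policies, such as briefly waiting for a local slot to free up.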

Distributed Job Orchestration

Our operating system analogy also applies to job orchestration. When you execute a Unix batch job, something needs to actually run the awk, sort, uniq, and head processes. Data needs to be transferred from one process’s output to another process’s input, memory must be allocated for each process, instructions from each process must be scheduled fairly and executed on the CPU, memory and I/O boundaries must be enforced, and so on. On a single machine, an operating system’s kernel is responsible for such work. In a distributed environment, this is the role of a job orchestrator.

Batch processing frameworks send a request to an orchestrator’s scheduler to run a job. Requests to start a job contain metadata such as:

  • the number of tasks to execute,
  • the amount of memory, CPU, and disk needed for each task,
  • a job identifier,
  • access credentials,
  • job parameters such as input and output data,
  • required hardware details such as GPUs or disk types, and
  • where the job’s executable code is located.
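The request metadata listed above might be modeled as a record like the following. Every field name is hypothetical: Kubernetes and YARN each define their own, much richer schemas, and this is neither of them. Storing a reference to a secret instead of the credentials themselves is also our own design assumption.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass(frozen=True)
class ResourceSpec:
    memory_mb: int
    cpus: float
    disk_gb: int
    gpu_type: Optional[str] = None     # a specific accelerator model, if required
    disk_type: Optional[str] = None    # e.g. "ssd"

@dataclass(frozen=True)
class JobRequest:
    job_id: str
    num_tasks: int
    per_task: ResourceSpec
    executable: str                    # where the job's code lives (image, archive, ...)
    credentials_ref: str               # a reference to a secret, not the secret itself
    params: dict[str, str] = field(default_factory=dict)   # e.g. input/output locations

    def total_memory_mb(self) -> int:
        return self.num_tasks * self.per_task.memory_mb

req = JobRequest(
    job_id="top-pages-2025-06-27",
    num_tasks=10,
    per_task=ResourceSpec(memory_mb=4096, cpus=2, disk_gb=20, disk_type="ssd"),
    executable="registry.example.com/top-pages:1.0",
    credentials_ref="secret/top-pages-reader",
    params={"input": "s3://logs/2025/06/27/", "output": "s3://reports/top-pages/"},
)
print(req.total_memory_mb())   # 40960
```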

Orchestrators such as Kubernetes and Hadoop YARN (Yet Another Resource Negotiator) [13] combine this information with cluster metadata to execute the job using the following components:

Task executors

An executor daemon such as YARN’s NodeManager or Kubernetes’s kubelet runs on each node in the cluster. Executors are responsible for running job tasks, sending heartbeats to signal their liveness, and tracking task status and resource allocation on the node. When a task-start request is sent to an executor, it retrieves the job’s executable code and runs a command to start the task. The executor then monitors the process until it finishes or fails, at which point it updates the task status metadata accordingly.

Many executors also work with the operating system to provide both security and performance isolation. YARN and Kubernetes both use Linux cgroups, for example. This prevents tasks from accessing data without permission, or from negatively affecting the performance of other tasks on the node by using excessive resources.

Resource manager

An orchestrator’s resource manager stores metadata about each node, including available hardware (CPUs, GPUs, memory, disks, and so on), task statuses, network location, node status, and other relevant information. Thus, the manager provides a global view of the cluster’s current state. The centralized nature of the resource manager can lead to both scalability and availability bottlenecks. YARN uses ZooKeeper and Kubernetes uses etcd to store cluster state (see “Coordination Services”).

Scheduler

Orchestrators usually have a centralized scheduler subsystem, which receives requests to start, stop, or check on the status of a job. For example, a scheduler might receive a request to start a job with 10 tasks using a specific Docker image on nodes that have a specific type of GPU. The scheduler uses the information from the request and state of the resource manager to determine which tasks to run on which nodes. The task executors are then informed of their assigned work and begin execution.
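As a rough sketch of what "determine which tasks to run on which nodes" can mean, the following greedy first-fit placement assigns tasks to nodes that have enough free cores and the requested GPU type. `Node`, `place_tasks`, and the all-or-nothing behavior are hypothetical simplifications; real schedulers weigh many more factors (memory, locality, priorities, quotas).

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Node:
    name: str
    free_cpus: int
    gpu_type: Optional[str] = None  # None means the node has no GPU


def place_tasks(nodes: List[Node], num_tasks: int, cpus_per_task: int,
                gpu_type: Optional[str] = None) -> Optional[Dict[int, str]]:
    """Greedy first-fit: map each task index to a node name.

    All-or-nothing: returns None (and reserves nothing) if any task cannot be
    placed; otherwise deducts the reserved cores from the nodes.
    """
    free = {n.name: n.free_cpus for n in nodes}   # tentative view of free cores
    assignment = {}
    for task in range(num_tasks):
        for n in nodes:
            if gpu_type is not None and n.gpu_type != gpu_type:
                continue                          # node lacks the required hardware
            if free[n.name] >= cpus_per_task:
                free[n.name] -= cpus_per_task
                assignment[task] = n.name
                break
        else:
            return None                           # no node can host this task
    for n in nodes:                               # commit the reservation
        n.free_cpus = free[n.name]
    return assignment


cluster = [Node("n1", 8, "A100"), Node("n2", 8), Node("n3", 4, "A100")]
plan = place_tasks(cluster, num_tasks=10, cpus_per_task=1, gpu_type="A100")
```

Here the ten single-core tasks land on `n1` (eight) and `n3` (two); `n2` is skipped because it has no GPU of the requested type.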

Though each orchestrator uses different terminology, you will find these components in nearly all orchestration systems.

Note

Scheduling decisions sometimes require application-specific schedulers that can take into account particular requirements, such as auto-scaling read replicas when a certain query threshold is reached. The centralized scheduler and application-specific schedulers work together to determine how to best execute tasks. YARN refers to its sub-schedulers as ApplicationMasters, while Kubernetes calls them operators.

Resource Allocation

Schedulers have a particularly challenging role in job orchestration: they must figure out how to best allocate the cluster’s limited resources amongst jobs with competing needs. Fundamentally, their decisions must balance fairness and efficiency.

Imagine a small cluster with five nodes that has a total of 160 CPU cores available. The cluster’s scheduler receives two job requests, each wanting 100 cores (say, 100 single-core tasks) to complete its work. What’s the best way to schedule the workload?

  • The scheduler could decide to run 80 tasks for each job, starting the remaining 20 tasks for each job as earlier tasks complete.
  • The scheduler could run all of one job’s tasks, and begin running the second job’s tasks only when 100 cores are available, a strategy known as gang scheduling.
  • If one job request arrives before the other, the scheduler has to decide whether to allocate all 100 cores to that job, or hold some back in anticipation of future jobs.
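The first two strategies can be compared with a small sketch, assuming each task uses exactly one core. `fifo_allocation` grants cores in arrival order, while `max_min_fair_allocation` is a simple water-filling form of max-min fairness, which is only one possible interpretation of "fair"; both function names are hypothetical.

```python
def fifo_allocation(capacity, demands):
    """Give each job, in arrival order, as many cores as it wants until cores run out."""
    alloc = []
    for d in demands:
        grant = min(d, capacity)
        alloc.append(grant)
        capacity -= grant
    return alloc


def max_min_fair_allocation(capacity, demands):
    """Water-filling: repeatedly split the remaining cores equally among unsatisfied jobs."""
    alloc = [0] * len(demands)
    remaining = capacity
    active = [i for i, d in enumerate(demands) if d > 0]
    while remaining > 0 and active:
        share = remaining // len(active)
        if share == 0:
            break  # fewer cores left than active jobs; leave the remainder unassigned
        for i in list(active):  # iterate over a copy so we can remove satisfied jobs
            grant = min(share, demands[i] - alloc[i])
            alloc[i] += grant
            remaining -= grant
            if alloc[i] == demands[i]:
                active.remove(i)
    return alloc


fifo = fifo_allocation(160, [100, 100])           # first job gets all it asked for
fair = max_min_fair_allocation(160, [100, 100])   # both jobs get an equal share
```

With two 100-core jobs on 160 cores, FIFO yields `[100, 60]` and the fair split yields `[80, 80]`; in both cases the remaining tasks start as earlier ones finish.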

This is a very simple example, but we already see many difficult trade-offs. In the gang-scheduling scenario, for example, if the scheduler reserves CPU cores until all 100 are available at the same time, nodes will sit idle. The cluster’s resource utilization will drop and a deadlock might occur if other jobs also attempt to reserve CPU cores.

On the other hand, if the scheduler simply waits for 100 cores to become available, other jobs might grab the cores in the meantime. The cluster might not have 100 cores available for a very long time, which leads to starvation. The scheduler could decide to preempt some of the first job’s tasks, killing them to make room for the second job. Task preemption decreases cluster efficiency as well, since the killed tasks will need to be restarted later and re-run.

Now imagine a scheduler that must make allocation decisions for hundreds or even millions of such job requests. Finding an optimal solution seems intractable. In fact, the problem is NP-hard, which means that it is prohibitively slow to calculate an optimal solution for all but the smallest examples [14,15].

In practice, schedulers therefore use heuristics to make non-optimal but reasonable decisions. Several algorithms are commonly used, including first-in first-out (FIFO), dominant resource fairness (DRF), priority queues, capacity or quota-based scheduling, and various bin-packing algorithms. The details for such algorithms are beyond the scope of this book, but they’re a fascinating area of research.
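As one example of these heuristics, dominant resource fairness can be sketched as progressive filling: repeatedly launch one task for the user whose dominant share (its largest fraction of any single resource) is lowest, as long as that task still fits. This is a simplified, single-shot sketch with a hypothetical `drf_allocate` function and illustrative numbers, not the algorithm as implemented in YARN or any other scheduler.

```python
def drf_allocate(capacity, demands):
    """Progressive-filling DRF sketch.

    capacity: dict resource -> total amount, e.g. {"cpu": 9, "mem": 18}
    demands:  dict user -> per-task demand vector (same resource keys)
    Returns dict user -> number of tasks launched.
    Assumes every per-task demand is positive for at least one resource;
    otherwise the loop would never terminate.
    """
    used = {r: 0 for r in capacity}
    tasks = {u: 0 for u in demands}

    def dominant_share(user):
        # A user's dominant share is its largest fractional use of any resource.
        return max(tasks[user] * demands[user][r] / capacity[r] for r in capacity)

    def fits(user):
        return all(used[r] + demands[user][r] <= capacity[r] for r in capacity)

    while True:
        candidates = [u for u in demands if fits(u)]
        if not candidates:
            return tasks
        # Launch one task for the eligible user with the lowest dominant share.
        user = min(candidates, key=dominant_share)
        for r in capacity:
            used[r] += demands[user][r]
        tasks[user] += 1


allocation = drf_allocate(
    {"cpu": 9, "mem": 18},
    {"A": {"cpu": 1, "mem": 4},   # memory-heavy tasks: dominant resource is mem
     "B": {"cpu": 3, "mem": 1}},  # CPU-heavy tasks: dominant resource is cpu
)
```

User A ends up with 3 tasks (12/18 of memory) and user B with 2 tasks (6/9 of CPU), so both dominant shares are equal at two thirds.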

Scheduling Workflows

The Unix tools example at the start of this chapter involved a chain of several commands, connected by Unix pipes. The same pattern arises in distributed batch processes: often the output from one job needs to become the input to one or more other jobs, and each job may have several inputs that are produced by other jobs. This is called a workflow or directed acyclic graph (DAG) of jobs.

Note

In “Durable Execution and Workflows” we saw workflow engines that offer durable execution of a sequence of steps, typically performing RPCs. In the context of batch processing, “workflow” has a different meaning: it’s a sequence of batch processes, each taking input data and producing output data, but normally not making RPCs to external services.

There are several reasons why a workflow of multiple jobs might be needed:

  • If the output of one job needs to become the input to several other jobs, which are maintained by different teams, it’s best for the first job to first write its output to a location where all the other jobs can read it. Those consuming jobs can then be scheduled to run every time that data has been updated, or on some other schedule.
  • You might want to transfer data from one processing tool to another. For example, a Spark job might output its data to HDFS, then a Python script might trigger a Trino SQL query that does further processing on the HDFS files and outputs to S3.
  • Some data pipelines internally require multiple processing stages. For example, if one stage needs to shard the data by one key, and the next stage needs to shard by a different key, the first stage can output data sharded in the way that is required by the second stage.

In the Unix tools example, the pipe that connects the output of one command to the input of another uses only a small in-memory buffer, and doesn’t write the data to a file. If that buffer fills up, the producing process needs to wait until the consuming process has read some data from the buffer before it can output more, a form of backpressure. Spark, Flink, and other batch execution engines support a similar model where the output of one task is directly passed to another task (over the network if the tasks are running on different machines).
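A bounded buffer between two threads shows the same backpressure behavior as a pipe. This sketch uses Python's `queue.Queue(maxsize=...)`, whose `put()` blocks while the queue is full; the hypothetical `run_pipeline` helper, the buffer size of 4, and the doubling "processing" step are arbitrary choices for illustration.

```python
import queue
import threading

SENTINEL = object()  # unique end-of-stream marker


def run_pipeline(n_items, buffer_size=4):
    """Producer -> bounded buffer -> consumer, loosely like `cmd1 | cmd2`."""
    buffer = queue.Queue(maxsize=buffer_size)
    results = []
    max_depth = 0

    def producer():
        for i in range(n_items):
            buffer.put(i)   # blocks while the buffer is full: backpressure
        buffer.put(SENTINEL)

    def consumer():
        nonlocal max_depth
        while True:
            max_depth = max(max_depth, buffer.qsize())
            item = buffer.get()
            if item is SENTINEL:
                return
            results.append(item * 2)  # stand-in for real processing

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results, max_depth


results, depth = run_pipeline(100, buffer_size=4)
```

No matter how fast the producer is, at most four items are ever waiting in the buffer, because the producer is paused until the consumer catches up.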

However, in a workflow it is more usual for one job to write its output to a distributed filesystem or object store, and for the next job to read it from there. This decouples the jobs from each other, allowing them to run at different times. If a job has several inputs, a workflow scheduler typically waits until all of the jobs that produce its inputs have completed successfully before running the job that consumes those inputs.
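A workflow scheduler's core rule, run a job only once every job producing its inputs has succeeded, maps naturally onto a topological traversal of the DAG. The sketch below uses Python's standard `graphlib.TopologicalSorter` (Python 3.9+); `run_workflow` and the job names are hypothetical, and jobs run one at a time here, whereas a real scheduler would launch all ready jobs in parallel and persist its progress.

```python
from graphlib import TopologicalSorter


def run_workflow(dependencies, run_job):
    """Run jobs so each starts only after all jobs producing its inputs have succeeded.

    dependencies: dict job -> set of upstream jobs whose output it reads
    run_job: callable(job); raising an exception aborts the workflow
    Returns the jobs in the order they were run.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()                 # raises CycleError if the graph is not a DAG
    order = []
    while sorter.is_active():
        ready = sorter.get_ready()   # jobs whose inputs are all available
        for job in sorted(ready):    # a real scheduler could run these in parallel
            run_job(job)             # if this raises, downstream jobs never start
            order.append(job)
            sorter.done(job)         # unblocks jobs that consume this job's output
    return order


deps = {
    "clean_logs": set(),
    "load_users": set(),
    "join_users": {"clean_logs", "load_users"},
    "age_by_url": {"join_users"},
    "daily_report": {"join_users"},
}
order = run_workflow(deps, lambda job: None)
```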

Schedulers found in orchestration frameworks such as YARN’s ResourceManager or Spark’s built-in scheduler do not manage entire workflows; they do scheduling on a per-job basis. To handle these dependencies between job executions, various workflow schedulers have been developed, including Airflow, Dagster, and Prefect. Workflow schedulers have management features that are useful when maintaining a large collection of batch jobs. Workflows consisting of 50 to 100 jobs are common in many data pipelines, and in a large organization, many different teams may be running different jobs or workflows that read each other’s output. Tool support is important for managing such complex dataflows.

Handling Faults

Batch jobs often run for long periods of time. Long-running jobs with many parallel tasks are likely to experience at least one task failure along the way. As discussed in “Hardware and Software Faults” and “Unreliable Networks”, there are many reasons why this could happen, including hardware faults (especially on commodity hardware), or network interruptions.

Another reason why a task might not finish running is that the scheduler may intentionally preempt (kill) it. Preemption is particularly useful if you have multiple priority levels: low-priority tasks that are cheaper to run, and high-priority tasks that cost more. Low-priority tasks can run whenever there is spare computing capacity, but they run the risk of being preempted at any moment if a higher-priority task arrives. Such cheaper, low-priority virtual machines are called spot instances on Amazon EC2 and Azure, and preemptible instances on Google Cloud [16].

Since batch processing is not time-sensitive, it is well suited for using low-priority tasks and spot instances to reduce the cost of running jobs. Essentially, those jobs can use spare computing resources that would otherwise be idle, and thereby increase the utilization of the cluster. However, this also means that those tasks are more likely to be killed by the scheduler: preemptions can be much more frequent than hardware faults [17].

Since batch jobs regenerate their output from scratch every time they are run, task failures are easier to handle than in online systems: the system can just delete the partial output from the failed execution, and schedule it to run again on another machine. It would be wasteful to rerun the entire job due to a single task failure, though. MapReduce and its successors therefore keep the execution of parallel tasks independent from each other, so that they can retry work at the granularity of an individual task [3].
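The following sketch shows task-granularity retries under simplified assumptions: tasks run sequentially in one process, a failed attempt's partial output is discarded before retrying, and output is only "committed" once an attempt succeeds. `run_job`, `flaky_task`, and the simulated preemption of shard `'b'` are hypothetical.

```python
def run_job(shards, task_fn, max_attempts=3):
    """Run one task per input shard; retry only the tasks that fail.

    task_fn(shard, attempt, out) appends results to `out`; it may raise midway,
    leaving partial output that is discarded before the retry.
    """
    outputs, attempts = {}, {}
    for shard in shards:  # a real framework runs these in parallel on many machines
        for attempt in range(1, max_attempts + 1):
            partial = []
            try:
                task_fn(shard, attempt, partial)
            except RuntimeError:
                partial.clear()        # delete the failed attempt's partial output
                continue
            outputs[shard] = partial   # commit only after the attempt succeeded
            attempts[shard] = attempt
            break
        else:
            raise RuntimeError(f"task for shard {shard!r} failed {max_attempts} times")
    return outputs, attempts


shard_data = {"a": ["x", "y"], "b": ["p", "q", "r"], "c": ["z"]}


def flaky_task(shard, attempt, out):
    """Hypothetical task: uppercases records; shard 'b' is preempted on its first attempt."""
    for i, record in enumerate(shard_data[shard]):
        if shard == "b" and attempt == 1 and i == 1:
            raise RuntimeError("preempted")
        out.append(record.upper())


outputs, attempts = run_job(list(shard_data), flaky_task)
```

Only shard `'b'` is re-run; the completed outputs of shards `'a'` and `'c'` are kept.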

Fault tolerance is trickier when the output of one task becomes the input to another task as part of a workflow. MapReduce solves this by always writing such intermediate data back to the distributed filesystem, and waiting for the writing task to complete successfully before allowing other tasks to read the data. This works, even in an environment where preemption is common, but it means a lot of writes to the DFS, which can be inefficient.

Spark keeps intermediate data in memory and only writes the final result to the DFS. It also keeps track of how the intermediate data was computed, allowing Spark to recompute it in case it is lost [18]. Flink uses a different approach based on periodically checkpointing a snapshot of tasks [19]. We will return to this topic in “Dataflow Engines”.
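The idea of recomputing lost data from how it was derived can be illustrated with a toy cache that records, for each dataset, its parent and a deterministic function. This is a loose analogy for Spark's lineage-based recovery, not its actual API: Spark tracks lineage per partition, whereas this hypothetical `LineageCache` works on whole in-memory datasets.

```python
class LineageCache:
    """Toy model of recomputing lost intermediate data from its lineage.

    Each dataset is either a source (loaded from durable storage) or derived
    from a parent dataset by a deterministic function. Cached results may be
    lost (e.g. an executor dies); they are recomputed from the parent on demand.
    """

    def __init__(self):
        self.lineage = {}        # name -> (parent name or None, function)
        self.cache = {}          # name -> materialized data ("in memory")
        self.compute_count = {}  # name -> how many times it was computed

    def source(self, name, loader):
        self.lineage[name] = (None, loader)

    def derive(self, name, parent, fn):
        self.lineage[name] = (parent, fn)

    def get(self, name):
        if name not in self.cache:
            parent, fn = self.lineage[name]
            data = fn() if parent is None else fn(self.get(parent))
            self.cache[name] = data
            self.compute_count[name] = self.compute_count.get(name, 0) + 1
        return self.cache[name]

    def lose(self, name):
        self.cache.pop(name, None)  # simulate losing in-memory data


lc = LineageCache()
lc.source("logs", lambda: ["GET /a", "GET /b", "GET /a"])
lc.derive("urls", "logs", lambda lines: [line.split()[1] for line in lines])
lc.derive("a_only", "urls", lambda urls: [u for u in urls if u == "/a"])

first = lc.get("a_only")
lc.lose("urls")
lc.lose("a_only")
second = lc.get("a_only")  # recomputed from "logs", which is still cached
```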

Batch Processing Models

We have seen how batch jobs are scheduled in a distributed environment. Let us now turn our attention to how batch processing frameworks actually process data. The two most common models are MapReduce and dataflow engines. Although dataflow engines have largely replaced MapReduce in practice, it is useful to understand how MapReduce works, since it influenced many modern batch processing frameworks.

MapReduce and dataflow engines have evolved to support multiple programming models including low-level programmatic APIs, relational query languages, and DataFrame APIs. A variety of options enable application engineers, analytics engineers, business analysts, and even non-technical employees to process company data for various use cases, which we’ll discuss in “Batch Use Cases”.

MapReduce

The pattern of data processing in MapReduce is very similar to the web server log analysis example in “Simple Log Analysis”:

  1. Read a set of input files, and break them up into records. In the web server log example, each record is one line in the log (that is, \n is the record separator). In Hadoop’s MapReduce, the input files are stored in a distributed filesystem like HDFS or an object store like S3. Various file formats are used, such as Apache Parquet (a columnar format, see “Column-Oriented Storage”) or Apache Avro (a row-based format, see “Avro”).
  2. Call the mapper function to extract a key and value from each input record. In the Unix tool example, the mapper function is awk '{print $7}': it extracts the URL ($7) as the key, and leaves the value empty.
  3. Sort all of the key-value pairs by key. In the log example, this is done by the first sort command.
  4. Call the reducer function to iterate over the sorted key-value pairs. If there are multiple occurrences of the same key, the sorting has made them adjacent in the list, so it is easy to combine those values without having to keep a lot of state in memory. In the Unix tool example, the reducer is implemented by the command uniq -c, which counts the number of adjacent records with the same key.
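The four steps can be mirrored in a few lines of single-process Python. The log lines are made-up samples in the layout assumed by `awk '{print $7}'`, where the seventh whitespace-separated field is the URL; `itertools.groupby` plays the role of `uniq -c` because, like `uniq`, it only groups adjacent equal keys.

```python
from itertools import groupby

# Made-up sample log lines; field 7 (index 6) is the requested URL.
LOG = """\
1.2.3.4 - - [01/Jun/2024:10:00:00 +0000] "GET /index.html HTTP/1.1" 200 512
5.6.7.8 - - [01/Jun/2024:10:00:01 +0000] "GET /about.html HTTP/1.1" 200 256
1.2.3.4 - - [01/Jun/2024:10:00:02 +0000] "GET /index.html HTTP/1.1" 200 512
"""

# Step 1: break the input into records (one record per line).
records = LOG.splitlines()

# Step 2: mapper -- extract the URL as the key; the value is left empty.
pairs = [(line.split()[6], None) for line in records]

# Step 3: sort all key-value pairs by key (like the first `sort`).
pairs.sort(key=lambda kv: kv[0])

# Step 4: reducer -- count adjacent records with the same key (like `uniq -c`).
counts = [(url, sum(1 for _ in group))
          for url, group in groupby(pairs, key=lambda kv: kv[0])]
print(counts)  # [('/about.html', 1), ('/index.html', 2)]
```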

Those four steps can be performed by one MapReduce job. Steps 2 (map) and 4 (reduce) are where you write your custom data processing code. Step 1 (breaking files into records) is handled by the input format parser. Step 3, the sort step, is implicit in MapReduce—you don’t have to write it, because the output from the mapper is always sorted before it is given to the reducer. This sorting step is a foundational batch processing algorithm, which we’ll revisit in “Shuffling Data”.

To create a MapReduce job, you need to implement two callback functions, the mapper and reducer, which behave as follows:

Mapper

The mapper is called once for every input record, and its job is to extract the key and value from the input record. For each input, it may generate any number of key-value pairs (including none). It does not keep any state from one input record to the next, so each record is handled independently.

Reducer

The MapReduce framework takes the key-value pairs produced by the mappers, collects all the values belonging to the same key, and calls the reducer with an iterator over that collection of values. The reducer can produce output records (such as the number of occurrences of the same URL).

In the web server log example, we had a second sort command in step 5, which ranked URLs by number of requests. In MapReduce, if you need a second sorting stage, you can implement it by writing a second MapReduce job and using the output of the first job as input to the second job. Viewed like this, the role of the mapper is to prepare the data by putting it into a form that is suitable for sorting, and the role of the reducer is to process the data that has been sorted.
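A single-process sketch can make the mapper and reducer contracts, and the chaining of a second job for the ranking step, concrete. `run_mapreduce` is a hypothetical helper, not Hadoop's API; it has none of the distribution or fault tolerance of a real framework, but it keeps the same shape: mappers see one record at a time, and the reducer is called once per key with an iterator over that key's values. The sample log lines are made up.

```python
from itertools import groupby
from operator import itemgetter


def run_mapreduce(records, mapper, reducer):
    """mapper(record) -> iterable of (key, value) pairs (zero or more);
    reducer(key, values) -> iterable of output records."""
    # Map: each record is handled independently; no state between records.
    pairs = [kv for record in records for kv in mapper(record)]
    # Sort: implicit in the framework; brings equal keys together.
    pairs.sort(key=itemgetter(0))
    output = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        values = (v for _, v in group)  # iterator over this key's values
        output.extend(reducer(key, values))
    return output


# Job 1: count requests per URL.
def url_mapper(line):
    yield line.split()[6], 1


def count_reducer(url, ones):
    yield url, sum(ones)


# Job 2: rank URLs by request count. Using -count as the key makes the
# framework's ascending sort produce descending counts.
def rank_mapper(url_count):
    url, count = url_count
    yield -count, url


def rank_reducer(neg_count, urls):
    for url in urls:
        yield url, -neg_count


log = [f'1.1.1.1 - - [01/Jun/2024:10:00:0{i} +0000] "GET {url} HTTP/1.1" 200 10'
       for i, url in enumerate(["/c", "/b", "/c", "/a", "/c", "/b"])]
counts = run_mapreduce(log, url_mapper, count_reducer)      # output of job 1 ...
ranked = run_mapreduce(counts, rank_mapper, rank_reducer)   # ... is input to job 2
```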

Implementing a complex processing job using the raw MapReduce APIs is actually quite hard and laborious—for instance, any join algorithms used by the job would need to be implemented from scratch [20]. MapReduce is also quite slow compared to more modern batch processors. One reason is that its file-based I/O prevents job pipelining, i.e., processing output data in a downstream job before the upstream job is complete.

Dataflow Engines

In order to fix some of MapReduce’s problems, several new execution engines for distributed batch computations were developed, the most well known of which are Spark [18,21] and Flink [19]. There are various differences in the way they are designed, but they have one thing in common: they handle an entire workflow as one job, rather than breaking it up into independent subjobs.

Since they explicitly model the flow of data through several processing stages, these systems are known as dataflow engines. Like MapReduce, they support a low-level API that repeatedly calls a user-defined function to process one record at a time, but they also offer higher-level operators such as join and group by. They parallelize work by sharding inputs, and they copy the output of one task over the network to become the input to another task. Unlike in MapReduce, operators need not take the strict roles of alternating map and reduce, but instead can be assembled in more flexible ways.

These dataflow APIs generally use relational-style building blocks to express a computation: joining datasets on the value of some field; grouping tuples by key; filtering by some condition; and aggregating tuples by counting, summing, or other functions. Internally, these operations are implemented using the shuffle algorithms that we discuss in the next section.

This style of processing engine is based on research systems like Dryad [22] and Nephele [23], and it offers several advantages compared to the MapReduce model:

  • Expensive work such as sorting need only be performed in places where it is actually required, rather than always happening by default between every map and reduce stage.
  • When there are several operators in a row that don’t change the sharding of the dataset (such as map or filter), they can be combined into a single task, reducing data copying overheads.
  • Because all joins and data dependencies in a workflow are explicitly declared, the scheduler has an overview of what data is required where, so it can make locality optimizations. For example, it can try to place the task that consumes some data on the same machine as the task that produces it, so that the data can be exchanged through a shared memory buffer rather than having to copy it over the network.
  • It is usually sufficient for intermediate state between operators to be kept in memory or written to local disk, which requires less I/O than writing it to a distributed filesystem or object store (where it must be replicated to several machines and written to disk on each replica). MapReduce already uses this optimization for mapper output, but dataflow engines generalize the idea to all intermediate state.
  • Operators can start executing as soon as their input is ready; there is no need to wait for the entire preceding stage to finish before the next one starts.
  • Existing processes can be reused to run new operators, reducing startup overheads compared to MapReduce (which launches a new JVM for each task).
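Chained Python generators give a loose single-process analogy for two of these points: narrow operators such as map and filter can be fused so that each record flows through all of them without materializing an intermediate dataset, and downstream work can start before the upstream input is exhausted. The hypothetical `fused_pipeline` function, its comma-separated `url,status` input format, and the `trace` list (there only to make the interleaving visible) are assumptions for illustration.

```python
def fused_pipeline(lines, trace):
    """Chain narrow operators (map, filter, map) as generators: records stream
    through all of them one at a time, with no intermediate dataset."""
    def source():
        for line in lines:
            trace.append(("read", line))
            yield line

    parsed = (line.split(",") for line in source())               # map: parse
    errors = (fields for fields in parsed if fields[1] == "500")  # filter
    urls = (fields[0] for fields in errors)                        # map: project
    for url in urls:
        trace.append(("emit", url))
        yield url


trace = []
out = list(fused_pipeline(["/a,200", "/b,500", "/c,500"], trace))
```

The trace shows `/b` being emitted before `/c,500` is even read, i.e. each record passes through the whole operator chain before the next one is read.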

You can use dataflow engines to implement the same computations as MapReduce workflows, and they usually execute significantly faster due to the optimizations described here.

Shuffling Data

We saw that both the Unix tools example at the beginning of the chapter and MapReduce are based on sorting. Batch processors need to be able to sort datasets petabytes in size, which are too large to fit on a single machine. They therefore require a distributed sorting algorithm where both the input and the output are sharded. Such an algorithm is called a shuffle.

Shuffle is not random

The term shuffle is confusing. When you shuffle a deck of cards, you end up with a random order. In contrast, the shuffle we’re talking about here produces a sorted order, with no randomness.

Shuffling is a foundational algorithm for batch processors, where it is used for joins and aggregations. MapReduce, Spark, Flink, Daft, Dataflow, and BigQuery [24] all implement scalable and performant shuffle algorithms in order to handle large datasets. We’ll use the shuffle in Hadoop MapReduce [25] for illustration purposes, but the concepts in this section translate to other systems as well.

Figure 11-1 shows the dataflow in a MapReduce job. We assume that the input to the job is sharded, and the shards are labelled m1, m2, and m3. For example, each shard may be a separate file on HDFS or a separate object in an object store, and all the shards belonging to the same dataset are grouped into the same HDFS directory or have the same key prefix in an object store bucket.

[Figure 11-1: Dataflow in a MapReduce job]

The framework starts a separate map task for each input shard. A task reads its assigned file, passing one record at a time to the mapper callback. The reduce side of the computation is also sharded. While the number of map tasks is determined by the number of input shards, the number of reduce tasks is configured by the job author (it can be different from the number of map tasks).

The output of the mapper consists of key-value pairs, and the framework needs to ensure that if two different mappers output the same key, those key-value pairs end up being processed by the same reducer task. To achieve this, each mapper creates a separate output file on its local disk for every reducer (for example, the file m1, r2 in Figure 11-1 is the file created by mapper 1 containing the data destined for reducer 2). When the mapper outputs a key-value pair, a hash of the key typically determines which reducer file it is written to (similarly to “Sharding by Hash of Key”).
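The mapper-side routing can be sketched as follows. `partition_for` and `map_side_output` are hypothetical names, and CRC32 modulo the number of reducers stands in for whatever hash function a given framework uses (Hadoop's default partitioner, for instance, is based on the key's `hashCode()`). The per-reducer "files" are in-memory lists, sorted by key as described in the next paragraph.

```python
import zlib


def partition_for(key: str, num_reducers: int) -> int:
    """Pick the reducer for a key from a hash of the key.

    CRC32 is deterministic across processes; Python's built-in hash() of str is
    randomized per process, so different mappers could disagree if we used it.
    """
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def map_side_output(pairs, num_reducers):
    """Split one mapper's output into one key-sorted 'file' (list) per reducer."""
    files = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        files[partition_for(key, num_reducers)].append((key, value))
    for f in files:
        f.sort(key=lambda kv: kv[0])  # each per-reducer file is sorted by key
    return files


m1_files = map_side_output([("/a", 1), ("/b", 1), ("/a", 1)], num_reducers=3)
m2_files = map_side_output([("/b", 1), ("/c", 1)], num_reducers=3)
```

Because both mappers use the same deterministic function, every `/b` pair lands in the file for the same reducer, regardless of which mapper produced it.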

While a mapper is writing these files, it also sorts the key-value pairs within each file. This can be done using the techniques we saw in “Log-Structured Storage”: batches of key-value pairs are first collected in a sorted data structure in memory, then written out as sorted segment files, and smaller segment files are progressively merged into larger ones.

After each mapper finishes, reducers connect to it and copy the appropriate file of sorted key-value pairs to their local disk. Once the reduce task has its share of the output from all of the mappers, it merges these files together, preserving the sort order, mergesort-style. Key-value pairs with the same key are now consecutive, even if they came from different mappers. The reducer function is then called once per key, each time with an iterator that returns all the values for that key.
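On the reduce side, merging several already-sorted files is a k-way merge. The sketch below uses the standard library's `heapq.merge`, which lazily yields items from sorted inputs in sorted order, followed by `groupby` to call the reducer once per key. `reduce_task` and `count_reducer` are hypothetical, and the in-memory lists stand in for the files copied from each mapper.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def reduce_task(sorted_runs, reducer):
    """Merge per-mapper sorted runs (mergesort-style), then call reducer once per key."""
    merged = heapq.merge(*sorted_runs, key=itemgetter(0))  # lazily yields in key order
    output = []
    for key, group in groupby(merged, key=itemgetter(0)):
        output.extend(reducer(key, (v for _, v in group)))
    return output


def count_reducer(key, values):
    yield key, sum(values)


# The sorted files destined for one reducer, one from each of three mappers.
runs = [
    [("/a", 1), ("/c", 1)],   # from mapper 1
    [("/a", 1), ("/b", 1)],   # from mapper 2
    [("/c", 1)],              # from mapper 3
]
result = reduce_task(runs, count_reducer)
```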

Any records output by the reducer function are sequentially written to a file, with one file per reduce task. These files (r1, r2, r3 in Figure 11-1) become the shards of the job’s output dataset, and they are written back to the distributed filesystem or object store.

Though MapReduce executes the shuffle step between its map and reduce steps, modern dataflow engines and cloud data warehouses are more sophisticated. Systems such as BigQuery have optimized their shuffle algorithms to keep data in memory and to write data to external sorting services [24]. Such services speed up shuffling and replicate shuffled data to provide resilience.

JOIN and GROUP BY

Let’s look at how sorted data simplifies distributed joins and aggregations. We’ll continue with MapReduce for illustration purposes, though these concepts apply to most batch processing systems.

A typical example of a join in a batch job is illustrated in Figure 11-2. On the left is a log of events describing the things that logged-in users did on a website (known as activity events or clickstream data), and on the right is a database of users. You can think of this example as being part of a star schema (see “Stars and Snowflakes: Schemas for Analytics”): the log of events is the fact table, and the user database is one of the dimensions.

[Figure 11-2: A log of user activity events joined with a database of user profiles]

If you want to perform an analysis of the activity events that takes into account information from the user database (for example, find out whether certain pages are more popular with younger or older users, using the date of birth field in the user profile), you need to compute a join between these two tables. How would you compute that join, assuming both tables are so large that they have to be sharded?

You can use the fact that in MapReduce, the shuffle brings together all the key-value pairs with the same key to the same reducer, no matter which shard they were on originally. Here, the user ID can serve as the key. You can therefore write a mapper that goes over the user activity events, and emits page view URLs keyed by user ID, as illustrated in Figure 11-3. Another mapper goes over the user database row by row, extracting the user ID as the key and the user’s date of birth as the value.

[Figure 11-3]

The shuffle then ensures that a reducer function can access a particular user’s date of birth and all of that user’s page view events at the same time. The MapReduce job can even arrange the records to be sorted such that the reducer always sees the record from the user database first, followed by the activity events in timestamp order—this technique is known as a secondary sort [25].

The reducer can then perform the actual join logic easily. The first value is expected to be the date of birth, which the reducer stores in a local variable. It then iterates over the activity events with the same user ID, outputting each viewed URL along with the viewer’s date of birth. Since the reducer processes all of the records for a particular user ID in one go, it only needs to keep one user record in memory at any one time, and it never needs to make any requests over the network. This algorithm is known as a sort-merge join, since mapper output is sorted by key, and the reducers then merge together the sorted lists of records from both sides of the join.
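The join described above can be simulated in a single process to show its moving parts. This is a minimal sketch and does not use Hadoop's API. The function names (`map_activity`, `map_user`, `shuffle`, `reduce_join`) and the sample rows are hypothetical, and a single in-memory sort stands in for the distributed shuffle. A tag inside the composite sort key makes each user record sort ahead of that user's activity events, which is the secondary-sort technique.

```python
from itertools import groupby

# Hypothetical sample rows standing in for the two large, sharded inputs.
activity_events = [
    {"user_id": 2, "timestamp": 105, "url": "/b"},
    {"user_id": 1, "timestamp": 103, "url": "/c"},
    {"user_id": 1, "timestamp": 101, "url": "/a"},
]
users = [
    {"user_id": 1, "date_of_birth": "1990-04-01"},
    {"user_id": 2, "date_of_birth": "1972-11-23"},
]

# Tags make the user record sort ahead of that user's activity events.
USER_RECORD, ACTIVITY_EVENT = 0, 1


def map_activity(event):
    # Composite sort key (user_id, tag, timestamp); the value carries its tag.
    key = (event["user_id"], ACTIVITY_EVENT, event["timestamp"])
    yield key, (ACTIVITY_EVENT, event["url"])


def map_user(user):
    key = (user["user_id"], USER_RECORD, 0)
    yield key, (USER_RECORD, user["date_of_birth"])


def shuffle(pairs):
    # Sort on the full composite key (secondary sort), but group only on
    # user_id, so each reducer call sees one user's records in order.
    ordered = sorted(pairs, key=lambda kv: kv[0])
    for user_id, group in groupby(ordered, key=lambda kv: kv[0][0]):
        yield user_id, [value for _, value in group]


def reduce_join(user_id, values):
    date_of_birth = None  # stays None if the user record is missing
    for tag, payload in values:
        if tag == USER_RECORD:
            date_of_birth = payload  # the only per-user state kept in memory
        else:
            yield payload, date_of_birth  # (viewed URL, viewer's date of birth)


def run_join(events, user_rows):
    pairs = [kv for e in events for kv in map_activity(e)]
    pairs += [kv for u in user_rows for kv in map_user(u)]
    return [row for user_id, values in shuffle(pairs)
            for row in reduce_join(user_id, values)]
```

Between records, the reducer holds only `date_of_birth`, so its memory use does not grow with the number of events per user. Each value carries a tag instead of relying on its position alone. This also lets the sketch emit `None` for events whose user record is missing.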

The next MapReduce job in the workflow can then calculate the distribution of viewer ages for each URL. To do so, the job would first shuffle the data using the URL as key. Once sorted, the reducers would then iterate over all the page views (with viewer birth date) for a single URL, keep a counter for the number of views by each age group, and increment the appropriate counter for each page view. This way you can implement a group by operation and aggregation.
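The follow-up job can be sketched the same way. This is also a hypothetical, single-process illustration. Sorting on the URL plays the role of the shuffle, while the decade-wide age buckets and the fixed `reference_year` are assumptions made for the example.

```python
from collections import Counter
from itertools import groupby

# Output of the previous (join) job: (url, viewer's date of birth) pairs.
joined = [
    ("/a", "1990-04-01"),
    ("/c", "1990-04-01"),
    ("/b", "1972-11-23"),
    ("/a", "1972-11-23"),
]


def age_group(date_of_birth, reference_year):
    # Hypothetical bucketing by decade, using the birth year only.
    if date_of_birth is None:
        return "unknown"
    age = reference_year - int(date_of_birth[:4])
    low = age // 10 * 10
    return f"{low}-{low + 9}"


def age_distribution_by_url(pairs, reference_year=2025):
    # Shuffle: bring all records for the same URL together by sorting on it.
    ordered = sorted(pairs, key=lambda kv: kv[0])
    result = {}
    for url, group in groupby(ordered, key=lambda kv: kv[0]):
        # Reduce: one counter per age group, incremented once per page view.
        result[url] = dict(Counter(age_group(dob, reference_year) for _, dob in group))
    return result
```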

Query languages

Over the years, execution engines for distributed batch processing have matured. By now, the infrastructure has become robust enough to store and process many petabytes of data on clusters of over 10,000 machines. As the problem of physically operating batch processes at such scale has been considered more or less solved, attention has turned to improving the programming model.

MapReduce, dataflow engines, and cloud data warehouses have all embraced SQL as the lingua franca for batch processing. It’s a natural fit: legacy data warehouses used SQL, data analytics and ETL tools already support SQL, and all developers and analysts know it.
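Here is the same join and aggregation expressed as a single SQL query, which makes the contrast with handwritten map and reduce functions concrete. For illustration, Python's built-in `sqlite3` module stands in for a distributed engine, and the table and column names are hypothetical. Spark SQL or Trino would accept a similar query, although dialect details such as `substr` and integer division may differ.

```python
import sqlite3

# In-memory SQLite stands in for a distributed SQL engine (illustration only).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (user_id INTEGER PRIMARY KEY, date_of_birth TEXT);
    CREATE TABLE activity_events (user_id INTEGER, ts INTEGER, url TEXT);
""")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(1, "1990-04-01"), (2, "1972-11-23")])
conn.executemany("INSERT INTO activity_events VALUES (?, ?, ?)",
                 [(1, 101, "/a"), (1, 103, "/c"), (2, 105, "/b"), (2, 107, "/a")])

# Join plus GROUP BY in one declarative query; the engine decides how to run it.
# Integer division buckets ages by decade (e.g. 35 -> 30).
QUERY = """
WITH joined AS (
    SELECT e.url AS url,
           (2025 - CAST(substr(u.date_of_birth, 1, 4) AS INTEGER)) / 10 * 10 AS age_group
    FROM activity_events AS e
    JOIN users AS u ON e.user_id = u.user_id
)
SELECT url, age_group, COUNT(*) AS views
FROM joined
GROUP BY url, age_group
ORDER BY url, age_group
"""
rows = conn.execute(QUERY).fetchall()
```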

Besides the obvious advantage of requiring less code than handwritten MapReduce jobs, these query language interfaces also allow interactive use, in which you write analytical queries and run them from a terminal or GUI. This style of interactive querying is an efficient and natural way for business analysts, product managers, sales and finance teams, and others to explore data in a batch processing environment. Although interactive exploration is not a classic form of batch processing, SQL support has made distributed batch processing systems well suited to exploratory queries.

High-level query languages not only make the humans using the system more productive, but they also improve the job execution efficiency at a machine level. As we saw in “Cloud Data Warehouses”, query engines are responsible for converting SQL queries into batch jobs to be executed in a cluster. This translation step from query to syntax tree to physical operators allows the engine to optimize queries. Query engines such as Hive, Trino, Spark, and Flink have cost-based query optimizers that can analyze the properties of join inputs and automatically decide which algorithm would be most suitable for the task at hand. Optimizers might even change the order of joins so that the amount of intermediate state is minimized [19,26,27,28].

While SQL is the most popular general-purpose batch processing query language, other languages remain in use for niche use cases. Apache Pig was a language based on relational operators that allowed data pipelines to be specified step by step, rather than as one big SQL query. DataFrames (see next section) have similar characteristics, and Morel is a more modern language influenced by Pig. Other users have adopted JSON query languages such as jq, JMESPath, or JsonPath.

In “Graph-Like Data Models” we discussed using graphs for modeling data, and using graph query languages to traverse the edges and vertices in a graph. Many graph processing frameworks also support batch computation through query languages such as Apache TinkerPop’s Gremlin. We will look at graph processing use cases in more detail in “Batch Use Cases”.

DataFrames

As data scientists and statisticians began using distributed batch processing frameworks for machine learning use cases, they found existing processing models cumbersome, as they were used to working with the DataFrame data model found in R and Pandas (see “DataFrames, Matrices, and Arrays”). A DataFrame is similar to a table in a relational database: it is a collection of rows, and all the values in the same column have the same type. Instead of writing one big SQL query, users call functions corresponding to relational operators to perform filters, joins, sorting, group by, and other operations.

Originally, DataFrame manipulation typically occurred locally, in memory. Consequently, DataFrames were limited to datasets that fit on a single machine. Data scientists wanted to interact with the large datasets found in batch processing environments using the DataFrame APIs they were used to. Distributed data processing frameworks such as Spark, Flink, and Daft have adopted DataFrame APIs to meet this need.

DataFrame APIs appear similar to dataflow APIs, but implementations vary. Pandas executes each operation immediately when the corresponding DataFrame method is called. Apache Spark instead translates all the DataFrame API calls into a query plan and runs query optimization before executing the workflow on top of its distributed dataflow engine. Because Spark sees the whole workflow before running any of it, it can optimize across operations, which improves performance.
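A toy class can illustrate the difference between eager and lazy execution. `LazyFrame` below is hypothetical and far simpler than Spark's DataFrame. Calling `filter` or `select` only records a step in a plan. `collect()` first rewrites the plan, using a single rule that fuses adjacent filters into one pass, and only then touches the data.

```python
class LazyFrame:
    """Toy lazy DataFrame (hypothetical, not Spark's API): builds a plan, runs on collect()."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = tuple(plan)

    def filter(self, predicate):
        # No work happens here; the operation is only appended to the plan.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, *columns):
        return LazyFrame(self._rows, self._plan + (("select", columns),))

    def optimized_plan(self):
        # One toy rewrite rule: fuse adjacent filters into a single pass.
        plan = []
        for op, arg in self._plan:
            if op == "filter" and plan and plan[-1][0] == "filter":
                first = plan[-1][1]
                plan[-1] = ("filter", lambda row, p=first, q=arg: p(row) and q(row))
            else:
                plan.append((op, arg))
        return plan

    def collect(self):
        # Execution happens only now, over the optimized plan.
        rows = self._rows
        for op, arg in self.optimized_plan():
            if op == "filter":
                rows = [row for row in rows if arg(row)]
            else:
                rows = [{col: row[col] for col in arg} for row in rows]
        return rows


people = [
    {"name": "Ana", "age": 34, "country": "DE"},
    {"name": "Bo", "age": 15, "country": "DE"},
    {"name": "Cy", "age": 40, "country": "US"},
]
query = (LazyFrame(people)
         .filter(lambda r: r["age"] >= 18)
         .filter(lambda r: r["country"] == "DE")
         .select("name"))
```

Here, three recorded operations become two passes over the data. A real optimizer applies many more rules, but it relies on the same property: nothing runs until the whole plan is known.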

Frameworks such as Daft even support both client and server-side computation. Smaller, in-memory operations are executed on the client while larger datasets and computation are executed on the server. Columnar storage formats such as Apache Arrow offer a unified data model that both client and server-side execution engines can share.

Batch Use Cases

Now that we’ve seen how batch processing works, let’s see how it is applied to a range of different applications. Batch jobs are excellent for processing large datasets in bulk, but they aren’t good for low latency use cases. Consequently, you’ll find batch jobs wherever there’s a lot of data and data freshness isn’t important. This might sound limiting, but it turns out that a significant amount of data processing fits this model:

  • Accounting and inventory reconciliation, where companies verify that transactions line up with their bank accounts and inventory, are often done in batch [29].
  • In manufacturing, demand forecasting is computed in periodic batch jobs [30].
  • Ecommerce, media, and social media companies train their recommendation models using batch jobs [31,32].
  • Many financial systems are batch-based, as well. For example, the United States’s banking network runs almost entirely on batch jobs [33].

In the following sections, we’ll discuss some of the batch processing use cases you’ll find in nearly every industry.

Extract–Transform–Load (ETL)

“Data Warehousing” introduced the idea of ETL, where a data processing pipeline extracts data from a production database, transforms it, and loads results into a downstream system. Batch jobs are often used for ETL workloads, especially when the downstream system is a data warehouse.

The parallel nature of batch jobs makes them a great fit for data transformation. Much of data transformation involves “embarrassingly parallel” workloads. Filtering data, projecting fields, and many other common data warehouse transformations can all be done in parallel.
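An embarrassingly parallel transformation can be written as a function of a single shard, which is what lets a framework run one task per shard. The sketch below assumes that records are dictionaries with hypothetical `user_id`, `url`, and `status` fields. A thread pool stands in for the cluster only to keep the example self-contained; in CPython, threads do not speed up CPU-bound work.

```python
from concurrent.futures import ThreadPoolExecutor


def transform_shard(shard):
    # Filter (keep successful requests) and project (keep two fields).
    # Each shard is processed without looking at any other shard.
    return [{"user_id": rec["user_id"], "url": rec["url"]}
            for rec in shard if rec["status"] == 200]


def run_parallel(shards, max_workers=4):
    # A real engine would spread these tasks across machines.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(transform_shard, shards))  # one output per input shard


shards = [
    [{"user_id": 1, "url": "/a", "status": 200},
     {"user_id": 2, "url": "/x", "status": 404}],
    [{"user_id": 3, "url": "/b", "status": 200}],
]
```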

Batch processing environments also come with robust workflow schedulers, which make it easy to schedule, orchestrate, and debug ETL data pipeline jobs. When a failure occurs, schedulers often retry jobs to mitigate transient issues that might occur. A job that fails repeatedly will be marked as failed, which helps developers easily see which job in their data pipeline stopped working. Schedulers like Airflow even come with built-in source, sink, and query operators for MySQL, PostgreSQL, Snowflake, Spark, Flink, and dozens of other popular systems. A tight integration between schedulers and data processing systems simplifies data integration.
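A small helper can sketch the retry behavior described above. `run_with_retries` and `make_flaky_task` are hypothetical. Real schedulers such as Airflow configure retries per task rather than through a function like this. The injectable `sleep` parameter exists only so the example can run without waiting.

```python
import time


def run_with_retries(task, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Hypothetical scheduler helper: retry a task, then mark it as failed."""
    for attempt in range(1, max_attempts + 1):
        try:
            return {"status": "succeeded", "attempts": attempt, "result": task()}
        except Exception as err:  # any error is treated as possibly transient
            if attempt == max_attempts:
                return {"status": "failed", "attempts": attempt, "error": repr(err)}
            sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff: 0.5s, 1s, ...


def make_flaky_task(failures):
    # Hypothetical task that raises on its first `failures` calls.
    calls = {"count": 0}

    def task():
        calls["count"] += 1
        if calls["count"] <= failures:
            raise IOError("transient error")
        return "done"

    return task
```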

We’ve also seen that batch jobs are easy to troubleshoot and fix when things go awry. This feature is invaluable when debugging data pipelines. The input and output files of a failed job can easily be inspected to see what went wrong, and ETL batch jobs can be fixed and rerun. For example, an input file might no longer contain a field that a transformation batch job intends to use. Data engineers will see that the field is missing, and update the transformation logic or the job that produced the input.

Data pipelines used to be managed by a single data engineering team, as it was considered unfair to ask other teams working on product features to write and manage complex batch data pipelines. Recently, improvements in batch processing models and metadata management have made it much easier for engineers across an organization to contribute to and manage their own data pipelines. Data mesh [34,35], data contract [36], and data fabric [37] practices provide standards and tools to help teams safely publish their data for consumption by anybody in the organization.

Data pipelines and analytic queries have begun to share not only processing models, but execution engines as well. Many batch ETL jobs now run on the same systems as the analytic queries that read their output. It is not uncommon to see data pipeline transformations and analytic queries both run as SparkSQL, Trino, or DuckDB queries. Such an architecture further blurs the line between application engineering, data engineering, analytics engineering, and business analysis.

Analytics

In “Analytical versus Operational Systems”, we saw that analytic queries (OLAP) often scan over a large number of records, performing groupings and aggregations. It is possible to run such workloads in a batch processing system, alongside other batch processing workloads. Analysts write SQL queries that execute atop a query engine, which reads and writes from a distributed file system or object store. Table metadata such as table to file mappings, names, and types are managed with table formats such as Apache Iceberg and catalogs such as Unity (see “Cloud Data Warehouses”). This architecture is known as a data lakehouse [38].

As with ETL, improvements in SQL query interfaces mean many organizations now use batch frameworks such as Spark for analytics. Such query patterns come in two styles:

  • Pre-aggregation queries, where data is rolled up into OLAP cubes or data marts. Pre-aggregated data is queried in the warehouse or pushed to purpose-built realtime OLAP systems such as Apache Druid or Apache Pinot. Pre-aggregation normally takes place at a scheduled interval. The workflow schedulers discussed in “Scheduling Workflows” are used to manage these workloads.
  • Ad hoc queries that users run to answer specific business questions, investigate user behavior, debug operational issues, and much more. Response times are important for this use case. Analysts run queries iteratively as they get responses and learn more about the data they’re investigating. Batch processing frameworks with fast query execution help reduce waiting times for analysts.
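A pre-aggregation job reduces raw events to one row per combination of dimension values, which is roughly what a slice of an OLAP cube holds. The sketch below is a minimal, single-process illustration. The event fields (`date`, `country`, `revenue_cents`) are hypothetical, and revenue is kept in integer cents to avoid floating-point rounding.

```python
from collections import defaultdict


def roll_up(events, dimensions):
    """Pre-aggregate raw events into one row per combination of dimension values."""
    cube = defaultdict(lambda: {"views": 0, "revenue_cents": 0})
    for event in events:
        key = tuple(event[d] for d in dimensions)
        cube[key]["views"] += 1
        cube[key]["revenue_cents"] += event["revenue_cents"]
    return dict(cube)


events = [
    {"date": "2025-01-01", "country": "DE", "revenue_cents": 500},
    {"date": "2025-01-01", "country": "DE", "revenue_cents": 0},
    {"date": "2025-01-01", "country": "US", "revenue_cents": 1200},
    {"date": "2025-01-02", "country": "DE", "revenue_cents": 300},
]
```

A scheduler would run such a job at a fixed interval and load the much smaller result into the warehouse or into a realtime OLAP system.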

SQL support enables batch processing frameworks to integrate with spreadsheets and data visualization tools such as Tableau, Power BI, Looker, and Apache Superset. For example, Tableau offers SparkSQL and Presto connectors, while Apache Superset supports Trino, Hive, Spark SQL, Presto, and many other systems that ultimately execute batch jobs to query data.
SQL 支持使批处理框架能够与电子表格和数据可视化工具(如 Tableau、Power BI、Looker 和 Apache Superset)集成。例如,Tableau 提供 SparkSQL 和 Presto 连接器,而 Apache Superset 支持 Trino、Hive、Spark SQL、Presto 以及其他许多系统,这些系统最终会执行批处理作业来查询数据。

Machine learning

Machine learning (ML) makes frequent use of batch processing. Data scientists, ML engineers, and AI engineers use batch processing frameworks to investigate data patterns, transform data, and train machine learning models. Common uses include:

  • Feature engineering: Raw data is filtered and transformed into data that models can be trained on. Predictive models often need numeric data, so engineers must transform other forms of data (such as text or discrete values) into the required format.
  • Model training: The training data is the input to the batch process, and the weights of the trained model are the output.
  • Batch inference: A trained model can then be used to make predictions in bulk if datasets are large and realtime results are not required. This includes evaluating the model’s predictions on a test dataset.
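One common feature engineering step for discrete values is one-hot encoding. It replaces a categorical column with one numeric 0/1 column per category. This is a minimal sketch on in-memory rows with hypothetical column names. In a distributed job, the set of categories would typically be computed in one pass and then shared with every task, so that all shards produce the same columns.

```python
def one_hot_encode(rows, column):
    """Replace a categorical column with one numeric 0/1 column per category."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        # Keep every other column unchanged.
        out = {key: value for key, value in row.items() if key != column}
        for category in categories:
            out[f"{column}={category}"] = 1 if row[column] == category else 0
        encoded.append(out)
    return categories, encoded


rows = [
    {"age": 34, "device": "ios"},
    {"age": 51, "device": "android"},
    {"age": 22, "device": "web"},
]
```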

Batch processing frameworks provide tools explicitly for these use cases. For example, Apache Spark’s MLlib and Apache Flink’s FlinkML come with a wide variety of feature engineering tools, statistical functions, and classifiers.

Machine learning applications such as recommendation engines and ranking systems also make heavy use of graph processing (see “Graph-Like Data Models”). Many graph algorithms are expressed by traversing one edge at a time, joining one vertex with an adjacent vertex in order to propagate some information, and repeating until some condition is met—for example, until there are no more edges to follow, or until some metric converges.

The bulk synchronous parallel (BSP) model of computation [39] has become popular for batch processing graphs. Among others, it is implemented by Apache Giraph [20], Spark’s GraphX API, and Flink’s Gelly API [40]. It is also known as the Pregel model, as Google’s Pregel paper popularized this approach for processing graphs [41].
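Connected components illustrate the BSP style well. Every vertex starts with its own ID as a label. In each superstep, the vertices whose label changed send it to their neighbors, and each vertex keeps the smallest label it receives. This is a single-process sketch and does not use the Giraph, GraphX, or Gelly API. The inbox is fully built before any vertex updates, which plays the role of the barrier between supersteps. The loop stops when no label changes, or after `max_supersteps`, a hypothetical safety limit.

```python
from collections import defaultdict


def connected_components(edges, max_supersteps=100):
    # Undirected adjacency lists.
    neighbors = defaultdict(set)
    for a, b in edges:
        neighbors[a].add(b)
        neighbors[b].add(a)
    label = {v: v for v in neighbors}  # each vertex starts with its own ID
    active = set(neighbors)            # vertices that send messages this superstep
    for _ in range(max_supersteps):
        # Communication phase: active vertices send their label along each edge.
        inbox = defaultdict(list)
        for v in active:
            for n in neighbors[v]:
                inbox[n].append(label[v])
        # Barrier passed: every vertex now processes its complete inbox.
        active = set()
        for v, messages in inbox.items():
            smallest = min(messages)
            if smallest < label[v]:
                label[v] = smallest
                active.add(v)
        if not active:  # no label changed, so every vertex votes to halt
            break
    return label
```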

Batch processing is also an integral part of large language model (LLM) data preparation and training. Raw text input data, such as web pages, typically resides in a DFS or object store. This data must be preprocessed to make it suitable for training. Preprocessing steps that are well suited to batch processing frameworks include:

  • Plain text must be extracted from HTML and malformed text must be fixed.
  • Low quality, irrelevant, and duplicate documents must be detected and removed.
  • Text must be tokenized (split into words) and converted into embeddings, which are numeric representations of each word.
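Two of these steps can be sketched briefly. The code below assumes that documents are plain strings. It removes only exact duplicates, by hashing each document after normalizing case and whitespace, and it uses a naive word-level tokenizer that maps tokens to integer IDs. Production LLM pipelines typically also detect near-duplicates and use subword tokenizers such as byte-pair encoding. The embedding step itself, which looks up a learned vector per token ID, is omitted.

```python
import hashlib
import re


def normalize(text):
    # Collapse case and whitespace so trivially different copies hash equally.
    return " ".join(text.lower().split())


def deduplicate(documents):
    seen = set()
    unique = []
    for doc in documents:
        digest = hashlib.sha256(normalize(doc).encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(doc)  # keep the first copy only
    return unique


def tokenize(text):
    # Naive tokenizer: runs of word characters, or single punctuation marks.
    return re.findall(r"\w+|[^\w\s]", text.lower())


def to_token_ids(tokens, vocab):
    # Give each previously unseen token the next integer ID.
    return [vocab.setdefault(token, len(vocab)) for token in tokens]
```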

Batch processing frameworks such as Kubeflow, Flyte, and Ray are purpose-built for such workloads. OpenAI uses Ray as part of its ChatGPT training process, for example [42]. These frameworks have built-in integrations for LLM and AI libraries such as PyTorch, TensorFlow, XGBoost, and many others. They also offer built-in support for feature engineering, model training, batch inference, and fine-tuning (adjusting a foundation model for specific use cases).

Finally, data scientists often experiment with data in interactive notebooks such as Jupyter or Hex. Notebooks are made up of cells, which are small chunks of markdown, Python, or SQL. Cells are executed sequentially to produce spreadsheets, graphs, or data. Many notebooks use batch processing via DataFrame APIs or query such systems using SQL.

Bulk data imports

Batch jobs are often used to build pre-computed or derived datasets such as product recommendations, user-facing reports, and features for machine learning models. These datasets are typically served from a production database, key-value store, or search engine. Regardless of the system used, the pre-computed data needs to make its way from the batch processor’s distributed filesystem or object store back into the database that’s serving live traffic.

The most obvious choice might be to use the client library for your favorite database directly within a batch job, and to write directly to the database server, one record at a time. This will work (assuming your firewall rules allow direct access from your batch processing environment to your production databases), but it is a bad idea for several reasons:

  • Making a network request for every single record is orders of magnitude slower than the normal throughput of a batch task. Even if the client library supports batching, performance is likely to be poor.
  • Batch processing frameworks often run many tasks in parallel. If all the tasks concurrently write to the same output database at the rate expected of a batch process, that database can easily be overwhelmed, and its performance for queries is likely to suffer. This can in turn cause operational problems in other parts of the system [43].
  • Normally, batch jobs provide a clean all-or-nothing guarantee for job output: if a job succeeds, the result is the output of running every task exactly once, even if some tasks failed and had to be retried along the way; if the entire job fails, no output is produced. However, writing to an external system from inside a job produces externally visible side effects that cannot be hidden in this way. Thus, you have to worry about the results from partially completed jobs being visible to other systems. If a task fails and is restarted, it may duplicate output from the failed execution.

A better solution is to have batch jobs push pre-computed datasets to streams such as Kafka topics, which we discuss further in Chapter 12. Search engines like Elasticsearch, realtime OLAP systems like Apache Pinot and Apache Druid, derived datastores like Venice [44], and cloud data warehouses like ClickHouse all have the built-in ability to ingest data from Kafka into their systems. Pushing data through a streaming system addresses several of the problems discussed above:

  • Streaming systems are optimized for sequential writes, which makes them well suited to the bulk write workload of a batch job.
  • Streaming systems can also act as a buffer between the batch job and the production databases. Downstream systems can throttle their read rate to ensure they can continue to comfortably serve production traffic.
  • The output of a single batch job can be consumed by multiple downstream systems.
  • Streaming systems can serve as a security boundary between batch processing environments and production networks: they can be deployed in a so-called DMZ (demilitarized zone) network that sits between the batch processing network and production network.

Pushing data through streams doesn’t inherently solve the all-or-nothing guarantee issue we discussed above. To make this work, batch jobs must send a notification to downstream systems that their job is complete and the data can now be served. Consumers of the stream need to be able to keep data they receive invisible to queries, like an uncommitted transaction with read committed isolation (see “Read Committed”), until they are notified that it is complete.
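On the consuming side, the visibility rule described above could look like the sketch below. `StagedConsumer` and its message format are hypothetical and are not part of Kafka or any particular datastore; the format consists of `record`, `complete`, and `abort` messages, each tagged with a `run_id`. Records are staged per job run and become visible only when the completion notification arrives. Because staged records are keyed, a retried task that resends a record overwrites the earlier copy instead of duplicating it.

```python
class StagedConsumer:
    """Hypothetical downstream store that hides a job run's output until it completes."""

    def __init__(self):
        self._visible = {}  # what queries can see
        self._staged = {}   # run_id -> {key: value}, invisible to queries

    def on_message(self, message):
        run_id = message["run_id"]
        if message["type"] == "record":
            # Keyed write: a duplicate from a retried task overwrites, not appends.
            self._staged.setdefault(run_id, {})[message["key"]] = message["value"]
        elif message["type"] == "complete":
            # Publish the whole run at once, like committing a transaction.
            self._visible.update(self._staged.pop(run_id, {}))
        elif message["type"] == "abort":
            self._staged.pop(run_id, None)  # discard a failed run's partial output

    def get(self, key):
        return self._visible.get(key)
```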

Another pattern that is more common when bootstrapping databases is to build a brand-new database inside the batch job and bulk load those files directly into the database from a distributed filesystem, object store, or local filesystem. Many data systems offer bulk import tools such as TiDB’s Lightning tool, or Apache Pinot’s and Apache Druid’s Hadoop import jobs. RocksDB also offers an API to bulk import SSTs from batch jobs.

Building databases in batch and bulk importing the data is very fast, and makes it easier for systems to atomically switch between dataset versions. On the other hand, it can be challenging to incrementally update datasets from batch jobs that build brand-new databases. It’s common to take a hybrid approach in situations where both bootstrapping and incremental loads are needed. Venice, for example, supports hybrid stores that allow for batch row-based updates and full dataset swaps.
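The atomic switch between dataset versions can be sketched as a store that loads each complete version off to the side and then changes a single pointer. `VersionedStore` is hypothetical and keeps everything in memory. A real system would load files such as SSTs from the batch job's output, but both the switch and a rollback to an earlier version reduce to the same pointer change.

```python
import threading


class VersionedStore:
    """Hypothetical read-only store that serves one complete dataset version at a time."""

    def __init__(self):
        self._lock = threading.Lock()
        self._versions = {}
        self._current = None

    def load_version(self, version, rows):
        # Bulk-load a complete new version off to the side; readers don't see it yet.
        built = dict(rows)
        with self._lock:
            self._versions[version] = built

    def activate(self, version):
        # Atomically switch all readers to another fully loaded version.
        with self._lock:
            if version not in self._versions:
                raise KeyError(version)
            self._current = version

    def get(self, key):
        with self._lock:
            data = self._versions.get(self._current, {})
        return data.get(key)
```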

Summary

In this chapter, we explored the design and implementation of batch processing systems. We began with the classic Unix toolchain (awk, sort, uniq, etc.), to illustrate fundamental batch processing primitives such as sorting and counting.

We then scaled up to distributed batch processing systems. We saw that batch-style I/O processes immutable, bounded input datasets to produce output data, allowing reruns and debugging without side effects. To process files, we saw that batch frameworks have three main components: an orchestration layer that determines where and when jobs run, a storage layer to persist data, and a computation layer that processes the actual data.

We looked at how distributed filesystems and object stores manage large files through block-based replication, caching, and metadata services, and how modern batch frameworks interact with these systems using pluggable APIs. We then discussed how orchestrators schedule tasks, allocate resources, and handle faults in large clusters, and compared job orchestrators, which schedule individual jobs, with workflow orchestrators, which manage the lifecycle of a collection of jobs that run in a dependency graph.

We surveyed batch processing models, starting with MapReduce and its canonical map and reduce functions. Next, we turned to dataflow engines like Spark and Flink, which offer simpler-to-use dataflow APIs and better performance. To understand how batch jobs scale, we covered the shuffle algorithm, a foundational operation that enables grouping, joining, and aggregation.

As batch systems matured, focus shifted to usability. You learned about high-level query languages like SQL and DataFrame APIs, which make batch jobs more accessible and easier to optimize. Query optimizers translate declarative queries into efficient execution plans.

We finished the chapter with common batch processing use cases:

  • ETL pipelines, which extract, transform, and load data between different systems using scheduled workflows;
  • Analytics, where batch jobs support both pre-aggregated dashboards and ad hoc queries;
  • Machine learning, where batch jobs prepare and process large training datasets;
  • Bulk imports, which populate production-facing systems from batch outputs, often via streams or bulk loading tools.

In the next chapter, we will turn to stream processing, in which the input is unbounded: you still have a job, but its inputs are never-ending streams of data. In this case, a job is never complete, because at any time there may still be more work coming in. We shall see that stream and batch processing are similar in some respects, but the assumption of unbounded streams also changes a lot about how we build systems.

Footnotes

References

[1] Nathan Marz. How to Beat the CAP Theorem. nathanmarz.com, October 2011. Archived at perma.cc/4BS9-R9A4

[2] Molly Bartlett Dishman and Martin Fowler. Agile Architecture. At O’Reilly Software Architecture Conference, March 2015.

[3] Jeffrey Dean and Sanjay Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. At 6th USENIX Symposium on Operating System Design and Implementation (OSDI), December 2004.

[4] Shivnath Babu and Herodotos Herodotou. Massively Parallel Databases and MapReduce Systems. Foundations and Trends in Databases, volume 5, issue 1, pages 1–104, November 2013. doi:10.1561/1900000036

[5] David J. DeWitt and Michael Stonebraker. MapReduce: A Major Step Backwards. Originally published at databasecolumn.vertica.com, January 2008. Archived at perma.cc/U8PA-K48V

[6] Henry Robinson. The Elephant Was a Trojan Horse: On the Death of Map-Reduce at Google. the-paper-trail.org, June 2014. Archived at perma.cc/9FEM-X787

[7] Urs Hölzle.R.I.P. MapReduce. After having served us well since 2003, today we removed the remaining internal codebase for good.twitter.com, September 2019. Archived at perma.cc/B34T-LLY7

[8] Adam Drake. Command-Line Tools Can Be 235x Faster than Your Hadoop Cluster. aadrake.com, January 2014. Archived at perma.cc/87SP-ZMCY

[9] sort: Sort text files. GNU Coreutils 9.7 Documentation, Free Software Foundation, Inc., 2025.

[10] Michael Ovsiannikov, Silvius Rus, Damian Reeves, Paul Sutter, Sriram Rao, and Jim Kelly. The Quantcast File System. Proceedings of the VLDB Endowment, volume 6, issue 11, pages 1092–1101, August 2013. doi:10.14778/2536222.2536234

[11] Andrew Wang, Zhe Zhang, Kai Zheng, Uma Maheswara G., and Vinayakumar B. Introduction to HDFS Erasure Coding in Apache Hadoop. blog.cloudera.com, September 2015. Archived at archive.org

[12] Andy Warfield. Building and operating a pretty big storage system called S3. allthingsdistributed.com, July 2023. Archived at perma.cc/7LPK-TP7V

[13] Vinod Kumar Vavilapalli, Arun C. Murthy, Chris Douglas, Sharad Agarwal, Mahadev Konar, Robert Evans, Thomas Graves, Jason Lowe, Hitesh Shah, Siddharth Seth, Bikas Saha, Carlo Curino, Owen O’Malley, Sanjay Radia, Benjamin Reed, and Eric Baldeschwieler. Apache Hadoop YARN: Yet Another Resource Negotiator. At 4th Annual Symposium on Cloud Computing (SoCC), October 2013. doi:10.1145/2523616.2523633

[14] Richard M. Karp. Reducibility Among Combinatorial Problems. Complexity of Computer Computations. The IBM Research Symposia Series. Springer, 1972. doi:10.1007/978-1-4684-2001-2_9

[15] J. D. Ullman. NP-Complete Scheduling Problems. Journal of Computer and System Sciences, volume 10, issue 3, June 1975. doi:10.1016/S0022-0000(75)80008-0

[16] Gilad David Maayan. The complete guide to spot instances on AWS, Azure and GCP. datacenterdynamics.com, March 2021. Archived at archive.org

[17] Abhishek Verma, Luis Pedrosa, Madhukar Korupolu, David Oppenheimer, Eric Tune, and John Wilkes. Large-Scale Cluster Management at Google with Borg. At 10th European Conference on Computer Systems (EuroSys), April 2015. doi:10.1145/2741948.2741964

[18] Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, and Ion Stoica. Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing. At 9th USENIX Symposium on Networked Systems Design and Implementation (NSDI), April 2012.

[19] Paris Carbone, Stephan Ewen, Seif Haridi, Asterios Katsifodimos, Volker Markl, and Kostas Tzoumas. Apache Flink™: Stream and Batch Processing in a Single Engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, volume 38, issue 4, December 2015. Archived at perma.cc/G3N3-BKX5

[20] Mark Grover, Ted Malaska, Jonathan Seidman, and Gwen Shapira. Hadoop Application Architectures. O’Reilly Media, 2015. ISBN: 978-1-491-90004-8

[21] Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee. Learning Spark, 2nd Edition. O’Reilly Media, 2020. ISBN: 978-1-492-05004-9

[22] Michael Isard, Mihai Budiu, Yuan Yu, Andrew Birrell, and Dennis Fetterly. Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks. At 2nd European Conference on Computer Systems (EuroSys), March 2007. doi:10.1145/1272996.1273005

[23] Daniel Warneke and Odej Kao. Nephele: Efficient Parallel Data Processing in the Cloud. At 2nd Workshop on Many-Task Computing on Grids and Supercomputers (MTAGS), November 2009. doi:10.1145/1646468.1646476

[24] Hossein Ahmadi. In-memory query execution in Google BigQuery. cloud.google.com, August 2016. Archived at perma.cc/DGG2-FL9W

[25] Tom White. Hadoop: The Definitive Guide, 4th edition. O’Reilly Media, 2015. ISBN: 978-1-491-90163-2

[26] Fabian Hüske. Peeking into Apache Flink’s Engine Room. flink.apache.org, March 2015. Archived at perma.cc/44BW-ALJX

[27] Mostafa Mokhtar. Hive 0.14 Cost Based Optimizer (CBO) Technical Overview. hortonworks.com, March 2015. Archived at archive.org

[28] Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, and Matei Zaharia. Spark SQL: Relational Data Processing in Spark. At ACM International Conference on Management of Data (SIGMOD), June 2015. doi:10.1145/2723372.2742797

[29] Ammar Chalifah. Tracking payments at scale. bolt.eu.com, June 2025. Archived at perma.cc/Q4KX-8K3J

[30] Nafi Ahmet Turgut, Hamza Akyıldız, Hasan Burak Yel, Mehmet İkbal Özmen, Mutlu Polatcan, Pinar Baki, and Esra Kayabali. Demand forecasting at Getir built with Amazon Forecast. aws.amazon.com, May 2023. Archived at perma.cc/H3H6-GNL7

[31] Jason (Siyu) Zhu. Enhancing homepage feed relevance by harnessing the power of large corpus sparse ID embeddings. linkedin.com, August 2023. Archived at archive.org

[32] Avery Ching, Sital Kedia, and Shuojie Wang. Apache Spark @Scale: A 60 TB+ production use case. engineering.fb.com, August 2016. Archived at perma.cc/F7R5-YFAV

[33] Edward Kim. How ACH works: A developer perspective – Part 1. engineering.gusto.com, April 2014. Archived at perma.cc/F67P-VBLK

[34] Zhamak Dehghani. How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh. martinfowler.com, May 2019. Archived at perma.cc/LN2L-L4VC

[35] Chris Riccomini. What the Heck is a Data Mesh?! cnr.sh, June 2021. Archived at perma.cc/NEJ2-BAX3

[36] Chad Sanderson. What Are Data Contracts? What Leaders Need to Know. gable.ai, January 2024. Archived at perma.cc/C6CJ-YC8B

[37] Daniel Abadi. Data Fabric vs. Data Mesh: What’s the Difference? starburst.io, November 2021. Archived at perma.cc/RSK3-HXDK

[38] Michael Armbrust, Ali Ghodsi, Reynold Xin, and Matei Zaharia. Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics. At 11th Annual Conference on Innovative Data Systems Research (CIDR), January 2021.

[39] Leslie G. Valiant. A Bridging Model for Parallel Computation. Communications of the ACM, volume 33, issue 8, pages 103–111, August 1990. doi:10.1145/79173.79181

[40] Stephan Ewen, Kostas Tzoumas, Moritz Kaufmann, and Volker Markl. Spinning Fast Iterative Data Flows. Proceedings of the VLDB Endowment, volume 5, issue 11, pages 1268–1279, July 2012. doi:10.14778/2350229.2350245

[41] Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert, Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. Pregel: A System for Large-Scale Graph Processing. At ACM International Conference on Management of Data (SIGMOD), June 2010. doi:10.1145/1807167.1807184

[42] Richard MacManus. OpenAI Chats about Scaling LLMs at Anyscale’s Ray Summit. thenewstack.io, September 2023. Archived at perma.cc/YJD6-KUXU

[43] Jay Kreps. Why Local State is a Fundamental Primitive in Stream Processing. oreilly.com, July 2014. Archived at perma.cc/P8HU-R5LA

[44] Félix GV. Open Sourcing Venice – LinkedIn’s Derived Data Platform. linkedin.com, September 2022. Archived at archive.org