Chapter 12. Stream Processing
A complex system that works is invariably found to have evolved from a simple system that works. The inverse proposition also appears to be true: A complex system designed from scratch never works and cannot be made to work.
John Gall, Systemantics (1975)
In Chapter 11 we discussed batch processing—techniques that read a set of files as input and produce a new set of output files. The output is a form of derived data; that is, a dataset that can be recreated by running the batch process again if necessary. We saw how this simple but powerful idea can be used to create search indexes, recommendation systems, analytics, and more.
However, one big assumption remained throughout Chapter 11: namely, that the input is bounded—i.e., of a known and finite size—so the batch process knows when it has finished reading its input. For example, the sorting operation that is central to MapReduce must read its entire input before it can start producing output: it could happen that the very last input record is the one with the lowest key, and thus needs to be the very first output record, so starting the output early is not an option.
In reality, a lot of data is unbounded because it arrives gradually over time: your users produced data yesterday and today, and they will continue to produce more data tomorrow. Unless you go out of business, this process never ends, and so the dataset is never “complete” in any meaningful way [1]. Thus, batch processors must artificially divide the data into chunks of fixed duration: for example, processing a day’s worth of data at the end of every day, or processing an hour’s worth of data at the end of every hour.
The problem with daily batch processes is that changes in the input are only reflected in the output a day later, which is too slow for many impatient users. To reduce the delay, we can run the processing more frequently—say, processing a second’s worth of data at the end of every second—or even continuously, abandoning the fixed time slices entirely and simply processing every event as it happens. That is the idea behind stream processing.
In general, a “stream” refers to data that is incrementally made available over time. The concept appears in many places: in the stdin and stdout of Unix, programming languages (lazy lists) [2], filesystem APIs (such as Java’s FileInputStream), TCP connections, delivering audio and video over the internet, and so on.
In this chapter we will look at event streams as a data management mechanism: the unbounded, incrementally processed counterpart to the batch data we saw in the last chapter. We will first discuss how streams are represented, stored, and transmitted over a network. In “Databases and Streams” we will investigate the relationship between streams and databases. And finally, in “Processing Streams” we will explore approaches and tools for processing those streams continually, and ways that they can be used to build applications.
Transmitting Event Streams
In the batch processing world, the inputs and outputs of a job are files (perhaps on a distributed filesystem). What does the streaming equivalent look like?
When the input is a file (a sequence of bytes), the first processing step is usually to parse it into a sequence of records. In a stream processing context, a record is more commonly known as an event, but it is essentially the same thing: a small, self-contained, immutable object containing the details of something that happened at some point in time. An event usually contains a timestamp indicating when it happened according to a time-of-day clock (see “Monotonic Versus Time-of-Day Clocks”).
For example, the thing that happened might be an action that a user took, such as viewing a page or making a purchase. It might also originate from a machine, such as a periodic measurement from a temperature sensor, or a CPU utilization metric. In the example of “Batch Processing with Unix Tools”, each line of the web server log is an event.
An event may be encoded as a text string, or JSON, or perhaps in some binary form, as discussed in Chapter 5. This encoding allows you to store an event, for example by appending it to a file, inserting it into a relational table, or writing it to a document database. It also allows you to send the event over the network to another node in order to process it.
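The following is a minimal sketch in Python. The `PageViewEvent` type and its field names are hypothetical, chosen only for illustration. It models an event as a small immutable record that carries a time-of-day timestamp, and uses JSON as one possible encoding for storing the event or sending it over the network:

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)  # frozen=True makes instances immutable, like events
class PageViewEvent:
    """Hypothetical event: a user viewed a page at some point in time."""
    user_id: str
    url: str
    timestamp_ms: int  # time-of-day clock reading, ms since the Unix epoch


def encode(event: PageViewEvent) -> bytes:
    """Encode an event as UTF-8 JSON, e.g. to append to a file or send over the network."""
    return json.dumps(asdict(event), sort_keys=True).encode("utf-8")


def decode(data: bytes) -> PageViewEvent:
    """Decode bytes produced by encode() back into an event."""
    return PageViewEvent(**json.loads(data.decode("utf-8")))


if __name__ == "__main__":
    event = PageViewEvent(user_id="u42", url="/home", timestamp_ms=1_700_000_000_000)
    wire = encode(event)
    print(wire)
    assert decode(wire) == event  # round-trips without loss
```

Freezing the dataclass mirrors the idea that an event records something that has already happened and is never modified in place. A binary encoding could replace JSON without changing the rest of the sketch.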
In batch processing, a file is written once and then potentially read by multiple jobs. Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients) [3]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.
In principle, a file or database is sufficient to connect producers and consumers: a producer writes every event that it generates to the datastore, and each consumer periodically polls the datastore to check for events that have appeared since it last ran. This is essentially what a batch process does when it processes a day’s worth of data at the end of every day.
However, when moving toward continual processing with low delays, polling becomes expensive if the datastore is not designed for this kind of usage. The more often you poll, the lower the percentage of requests that return new events, and thus the higher the overheads become. Instead, it is better for consumers to be notified when new events appear.
Databases have traditionally not supported this kind of notification mechanism very well: relational databases commonly have triggers, which can react to a change (e.g., a row being inserted into a table), but they are very limited in what they can do and have been somewhat of an afterthought in database design [4]. Instead, specialized tools have been developed for the purpose of delivering event notifications.
Messaging Systems
A common approach for notifying consumers about new events is to use a messaging system: a producer sends a message containing the event, which is then pushed to consumers. We touched on these systems previously in “Event-Driven Architectures”, but we will now go into more detail.
A direct communication channel like a Unix pipe or TCP connection between producer and consumer would be a simple way of implementing a messaging system. However, most messaging systems expand on this basic model. In particular, Unix pipes and TCP connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
Within this publish/subscribe model, different systems take a wide range of approaches, and there is no one right answer for all purposes. To differentiate the systems, it is particularly helpful to ask the following two questions:
- *What happens if the producers send messages faster than the consumers can process them?* Broadly speaking, there are three options: the system can drop messages, buffer messages in a queue, or apply backpressure (also known as flow control; i.e., blocking the producer from sending more messages). For example, Unix pipes and TCP use backpressure: they have a small fixed-size buffer, and if it fills up, the sender is blocked until the recipient takes data out of the buffer (see “Network congestion and queueing”).
If messages are buffered in a queue, it is important to understand what happens as that queue grows. Does the system crash if the queue no longer fits in memory, or does it write messages to disk? In the latter case, how does the disk access affect the performance of the messaging system [5], and what happens when the disk fills up [6]?
- *What happens if nodes crash or temporarily go offline? Are any messages lost?* As with databases, durability may require some combination of writing to disk and/or replication (see the sidebar “Replication and Durability”), which has a cost. If you can afford to sometimes lose messages, you can probably get higher throughput and lower latency on the same hardware.
Whether message loss is acceptable depends very much on the application. For example, with sensor readings and metrics that are transmitted periodically, an occasional missing data point is perhaps not important, since an updated value will be sent a short time later anyway. However, beware that if a large number of messages are dropped, it may not be immediately apparent that the metrics are incorrect [7]. If you are counting events, it is more important that they are delivered reliably, since every lost message means incorrect counters.
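Python's standard `queue.Queue` is enough to sketch the three options for a producer that outpaces its consumers. The `BoundedChannel` class and its policy names are hypothetical. A real messaging system implements these choices inside the broker or the transport protocol, not in a single in-process queue.

```python
import queue


class BoundedChannel:
    """Hypothetical channel illustrating three ways to handle a producer that
    sends faster than its consumer reads: drop, buffer, or backpressure."""

    POLICIES = ("drop", "buffer", "backpressure")

    def __init__(self, capacity: int, policy: str):
        if policy not in self.POLICIES:
            raise ValueError(f"unknown policy: {policy}")
        self.policy = policy
        # "buffer" uses an unbounded queue (maxsize=0 means no limit), so the
        # backlog can grow without bound; the other policies use a small
        # fixed-size buffer, like a Unix pipe.
        self._q = queue.Queue(maxsize=0 if policy == "buffer" else capacity)
        self.dropped = 0

    def send(self, msg, timeout=None) -> bool:
        """Return True if the message was accepted."""
        if self.policy == "drop":
            try:
                self._q.put_nowait(msg)
                return True
            except queue.Full:
                self.dropped += 1  # discarded without telling the consumer
                return False
        # With "backpressure", put() blocks the producer until the consumer
        # makes room (or the timeout expires); with "buffer" it never blocks.
        try:
            self._q.put(msg, timeout=timeout)
            return True
        except queue.Full:
            return False

    def receive(self):
        return self._q.get_nowait()

    def backlog(self) -> int:
        return self._q.qsize()
```

With capacity 2, the "drop" channel accepts the first two of five messages and silently loses the rest. The "buffer" channel accepts everything and lets the backlog grow. The "backpressure" channel makes the third `send` wait until `receive` frees a slot.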
A nice property of the batch processing systems we explored in Chapter 11 is that they provide a strong reliability guarantee: failed tasks are automatically retried, and partial output from failed tasks is automatically discarded. This means the output is the same as if no failures had occurred, which helps simplify the programming model. Later in this chapter we will examine how we can provide similar guarantees in a streaming context.
Direct messaging from producers to consumers
A number of messaging systems use direct network communication between producers and consumers without going via intermediary nodes:
- UDP multicast is widely used in the financial industry for streams such as stock market feeds, where low latency is important [8]. Although UDP itself is unreliable, application-level protocols can recover lost packets (the producer must remember packets it has sent so that it can retransmit them on demand).
- Brokerless messaging libraries such as ZeroMQ and nanomsg take a similar approach, implementing publish/subscribe messaging over TCP or IP multicast.
- Some metrics collection agents, such as StatsD [9], use unreliable UDP messaging to collect metrics from all machines on the network and monitor them. (In the StatsD protocol, counter metrics are only correct if all messages are received; using UDP makes the metrics approximate at best [10]. See also “TCP Versus UDP”.)
- If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request (see “Dataflow Through Services: REST and RPC”) to push messages to the consumer. This is the idea behind webhooks [11], a pattern in which one service registers a callback URL with another service, which then makes a request to that URL whenever an event occurs.
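The sketch below illustrates the application-level recovery described for UDP multicast. It simulates the network in memory so that packet loss is explicit. The producer numbers each packet and keeps it for retransmission, and the consumer uses the sequence numbers to detect gaps. Both classes are hypothetical. A real protocol would also need a way to eventually discard old packets from the producer's buffer.

```python
class RetransmittingProducer:
    """Hypothetical producer that numbers each packet and keeps a copy in
    memory so that it can retransmit it on request."""

    def __init__(self):
        self.sent = {}        # sequence number -> payload
        self.next_seq = 0

    def send(self, payload):
        seq = self.next_seq
        self.sent[seq] = payload
        self.next_seq += 1
        return seq, payload   # the "packet" handed to the (lossy) network

    def retransmit(self, seq):
        return seq, self.sent[seq]


class GapDetectingConsumer:
    """Hypothetical consumer that delivers packets in sequence order and uses
    sequence numbers to detect which packets were lost."""

    def __init__(self):
        self.expected = 0     # next sequence number to deliver
        self.pending = {}     # packets received out of order
        self.delivered = []

    def on_packet(self, seq, payload):
        if seq < self.expected or seq in self.pending:
            return            # duplicate; ignore
        self.pending[seq] = payload
        while self.expected in self.pending:
            self.delivered.append(self.pending.pop(self.expected))
            self.expected += 1

    def missing(self):
        """Sequence numbers known to be lost because a later packet arrived.
        (Loss of the most recent packet is not detectable this way.)"""
        if not self.pending:
            return []
        return [s for s in range(self.expected, max(self.pending)) if s not in self.pending]


if __name__ == "__main__":
    producer, consumer = RetransmittingProducer(), GapDetectingConsumer()
    packets = [producer.send(p) for p in ["a", "b", "c"]]
    consumer.on_packet(*packets[0])
    consumer.on_packet(*packets[2])          # packet 1 was lost in the network
    for seq in consumer.missing():           # ask the producer to resend the gap
        consumer.on_packet(*producer.retransmit(seq))
    print(consumer.delivered)
```

The producer's retransmission buffer lives only in its memory. This is why the approach breaks down if the producer crashes.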
Although these direct messaging systems work well in the situations for which they are designed, they generally require the application code to be aware of the possibility of message loss. The faults they can tolerate are quite limited: even if the protocols detect and retransmit packets that are lost in the network, they generally assume that producers and consumers are constantly online.
If a consumer is offline, it may miss messages that were sent while it is unreachable. Some protocols allow the producer to retry failed message deliveries, but this approach may break down if the producer crashes, losing the buffer of messages that it was supposed to retry.
Message brokers
A widely used alternative is to send messages via a message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams [12]. It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
By centralizing the data in the broker, these systems can more easily tolerate clients that come and go (connect, disconnect, and crash), and the question of durability is moved to the broker instead. Some message brokers only keep messages in memory, while others (depending on configuration) write them to disk so that they are not lost in case of a broker crash. Faced with slow consumers, they generally allow unbounded queueing (as opposed to dropping messages or backpressure), although this choice may also depend on the configuration.
A consequence of queueing is also that consumers are generally asynchronous: when a producer sends a message, it normally only waits for the broker to confirm that it has buffered the message and does not wait for the message to be processed by consumers. The delivery to consumers will happen at some undetermined future point in time—often within a fraction of a second, but sometimes significantly later if there is a queue backlog.
Message brokers compared to databases
Some message brokers can even participate in two-phase commit protocols using XA or JTA (see “Distributed Transactions Across Different Systems”). This feature makes them quite similar in nature to databases, although there are still important practical differences between message brokers and databases:
- Databases usually keep data until it is explicitly deleted, whereas most message brokers automatically delete a message when it has been successfully delivered to its consumers. Such message brokers are not suitable for long-term data storage.
- Since they quickly delete messages, most message brokers assume that their working set is fairly small (i.e., the queues are short). If the broker needs to buffer a lot of messages because the consumers are slow (perhaps spilling messages to disk if they no longer fit in memory), each individual message takes longer to process, and the overall throughput may degrade [5].
- Databases often support secondary indexes and various ways of searching for data using a query language, while message brokers often support some way of subscribing to a subset of topics matching some pattern. Both are essentially ways for a client to select the portion of the data that it wants to know about, but databases typically offer much more advanced query functionality.
- When querying a database, the result is typically based on a point-in-time snapshot of the data; if another client subsequently writes something to the database that changes the query result, the first client does not find out that its prior result is now outdated (unless it repeats the query, or polls for changes). By contrast, message brokers do not support arbitrary queries, but they do notify clients when data changes (i.e., when new messages become available).
This is the traditional view of message brokers, which is encapsulated in standards like JMS [13] and AMQP [14] and implemented in software like RabbitMQ, ActiveMQ, HornetQ, Qpid, TIBCO Enterprise Message Service, IBM MQ, Azure Service Bus, and Google Cloud Pub/Sub [15]. Although it is possible to use databases as queues, tuning them to get good performance is not straightforward [16].
Multiple consumers
When multiple consumers read messages in the same topic, two main patterns of messaging are used, as illustrated in Figure 12-1:
Load balancing
Each message is delivered to one of the consumers, so the consumers can share the work of processing the messages in the topic. The broker may assign messages to consumers arbitrarily. This pattern is useful when the messages are expensive to process, and so you want to be able to add consumers to parallelize the processing. (In AMQP, you can implement load balancing by having multiple clients consuming from the same queue, and in JMS it is called a shared subscription.)
Fan-out
Each message is delivered to all of the consumers. Fan-out allows several independent consumers to each “tune in” to the same broadcast of messages, without affecting each other—the streaming equivalent of having several different batch jobs that read the same input file. (This feature is provided by topic subscriptions in JMS, and exchange bindings in AMQP.)

The two patterns can be combined, for example using Kafka’s consumer groups feature. When a consumer group subscribes to a topic, each message in the topic is sent to one of the consumers in the group (load-balancing across the consumers in the group). If two separate consumer groups subscribe to the same topic, each message is sent to one consumer in each group (providing fan-out across consumer groups).
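The toy model below shows how consumer groups combine the two patterns. Every subscribed group receives each message (fan-out), and within a group exactly one member receives it (load balancing). For simplicity, this sketch assigns individual messages round-robin within a group. Kafka itself assigns whole partitions to group members, as described later in this chapter. The `Topic` class is hypothetical.

```python
from collections import defaultdict
from itertools import cycle


class Topic:
    """Hypothetical in-memory topic combining fan-out and load balancing."""

    def __init__(self):
        self.groups = {}                   # group name -> round-robin iterator over members
        self.received = defaultdict(list)  # consumer name -> messages it received

    def subscribe(self, group, consumers):
        self.groups[group] = cycle(consumers)

    def publish(self, message):
        # Fan-out: every consumer group gets the message...
        for members in self.groups.values():
            # ...load balancing: exactly one member of each group gets it.
            self.received[next(members)].append(message)
```

For example, if a "billing" group with members b1 and b2 and a "search" group with a single member s1 both subscribe, publishing m1, m2, m3 gives b1 the messages m1 and m3, b2 the message m2, and s1 all three.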
Acknowledgments and redelivery
Consumers may crash at any time, so it could happen that a broker delivers a message to a consumer but the consumer never processes it, or only partially processes it before crashing. In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
If the connection to a client is closed or times out without the broker receiving an acknowledgment, it assumes that the message was not processed, and therefore it delivers the message again to another consumer. (Note that it could happen that the message actually was fully processed, but the acknowledgment was lost in the network. Handling this case requires an atomic commit protocol, as discussed in “Exactly-once message processing”.)
When combined with load balancing, this redelivery behavior has an interesting effect on the ordering of messages. In Figure 12-2, the consumers generally process messages in the order they were sent by producers. However, consumer 2 crashes while processing message m3, at the same time as consumer 1 is processing message m4. The unacknowledged message m3 is subsequently redelivered to consumer 1, with the result that consumer 1 processes messages in the order m4, m3, m5. Thus, m3 and m4 are not delivered in the same order as they were sent by producer 1.

Even if the message broker otherwise tries to preserve the order of messages (as required by both the JMS and AMQP standards), the combination of load balancing with redelivery inevitably leads to messages being reordered. To avoid this issue, you can use a separate queue per consumer (i.e., not use the load balancing feature). Message reordering is not a problem if messages are completely independent of each other, but it can be important if there are causal dependencies between messages, as we shall see later in the chapter.
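The reordering from Figure 12-2 can be reproduced with a small simulation of an acknowledgment-based broker. The `Broker` class and the scenario function are hypothetical. The interleaving of consumers 1 and 2 is scripted by hand to follow the text, and the assignment of m1 and m2 is an assumption.

```python
from collections import deque


class Broker:
    """Hypothetical AMQP/JMS-style broker: a message is forgotten only when
    it is acknowledged, and unacknowledged messages are redelivered."""

    def __init__(self, messages):
        self.queue = deque(messages)
        self.in_flight = {}   # consumer -> message delivered but not yet acknowledged

    def deliver(self, consumer):
        msg = self.queue.popleft()
        self.in_flight[consumer] = msg
        return msg

    def ack(self, consumer):
        # The consumer has finished processing, so the broker can drop the message.
        return self.in_flight.pop(consumer)

    def connection_lost(self, consumer):
        # No acknowledgment arrived: assume the message was not processed and
        # put it back at the front of the queue for another consumer.
        self.queue.appendleft(self.in_flight.pop(consumer))


def figure_12_2_scenario():
    broker = Broker(["m1", "m2", "m3", "m4", "m5"])
    done_by_c1 = []
    broker.deliver("c1")
    done_by_c1.append(broker.ack("c1"))   # consumer 1 processes m1
    broker.deliver("c2")
    broker.ack("c2")                      # consumer 2 processes m2
    broker.deliver("c2")                  # consumer 2 starts m3...
    broker.deliver("c1")                  # ...while consumer 1 starts m4
    broker.connection_lost("c2")          # consumer 2 crashes; m3 is requeued
    done_by_c1.append(broker.ack("c1"))   # consumer 1 finishes m4
    while broker.queue:                   # m3 (redelivered), then m5
        broker.deliver("c1")
        done_by_c1.append(broker.ack("c1"))
    return done_by_c1


if __name__ == "__main__":
    print(figure_12_2_scenario())
```

Consumer 1 ends up processing m4 before m3, even though the broker preserves queue order and requeues m3 at the front.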
Redelivery can also result in wasted resources, resource starvation, or permanent blockages in a stream. A common scenario is a producer that improperly serializes a message; for example, by leaving out a required key in a JSON-encoded object. Any consumer that reads the message will expect the key, and fail if it’s missing. No acknowledgment is sent, so the broker will re-send the message, which will cause another consumer to fail. This loop repeats itself indefinitely. If the broker guarantees strong ordering, no further progress can be made. Brokers that allow message reordering can continue to make progress, but will waste resources on messages that will never be acknowledged.
Dead letter queues (DLQs) are used to handle this problem. Rather than keeping the message in the current queue and retrying forever, the broker moves the message to a different queue to unblock consumers [17,18]. Monitoring is usually set up on dead letter queues, since any message in such a queue indicates an error. Once a new message is detected, an operator can decide to permanently drop it, manually modify and re-produce the message, or fix the consumer code to handle the message appropriately. DLQs are common in most queueing systems, and log-based messaging systems such as Apache Pulsar and stream processing systems such as Kafka Streams now support them as well [19].
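The following is a minimal sketch of dead-letter handling. It assumes a consumer loop that retries a failing message a bounded number of times and then moves it aside. The function name and the `max_attempts` parameter are hypothetical; real brokers configure retry limits and DLQs in their own ways.

```python
import json
from collections import deque


def consume_with_dlq(messages, handler, max_attempts=3):
    """Hypothetical consumer loop: retry a message that fails to process up to
    max_attempts times, then move it to a dead letter queue so it no longer
    blocks or wastes consumers."""
    pending = deque((msg, 0) for msg in messages)  # (message, failed attempts so far)
    results, dead_letters = [], []
    while pending:
        msg, attempts = pending.popleft()
        try:
            results.append(handler(msg))           # success plays the role of an ack
        except Exception:
            if attempts + 1 >= max_attempts:
                dead_letters.append(msg)           # park it for an operator to inspect
            else:
                pending.append((msg, attempts + 1))  # redeliver later (this reorders)
    return results, dead_letters


if __name__ == "__main__":
    # The second message was produced without the required "user_id" key.
    msgs = ['{"user_id": 1}', '{"oops": 2}', '{"user_id": 3}']
    ok, dlq = consume_with_dlq(msgs, lambda m: json.loads(m)["user_id"])
    print(ok, dlq)
```

This sketch retries a failed message at the back of the queue, so it behaves like a broker that allows reordering. A broker that strictly preserved order would retry at the head. That is the situation in which a DLQ is needed to unblock the stream.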
Log-based Message Brokers
Sending a packet over a network or making a request to a network service is normally a transient operation that leaves no permanent trace. Although it is possible to record it permanently (using packet capture and logging), we normally don’t think of it that way. Even message brokers that durably write messages to disk quickly delete them again after they have been delivered to consumers, because they are built around a transient messaging mindset.
Databases and filesystems take the opposite approach: everything that is written to a database or file is normally expected to be permanently recorded, at least until someone explicitly chooses to delete it again.
This difference in mindset has a big impact on how derived data is created. A key feature of batch processes, as discussed in Chapter 11, is that you can run them repeatedly, experimenting with the processing steps, without risk of damaging the input (since the input is read-only). This is not the case with AMQP/JMS-style messaging: receiving a message is destructive if the acknowledgment causes it to be deleted from the broker, so you cannot run the same consumer again and expect to get the same result.
If you add a new consumer to a messaging system, it typically only starts receiving messages sent after the time it was registered; any prior messages are already gone and cannot be recovered. Contrast this with files and databases, where you can add a new client at any time, and it can read data written arbitrarily far in the past (as long as it has not been explicitly overwritten or deleted by the application).
Why can we not have a hybrid, combining the durable storage approach of databases with the low-latency notification facilities of messaging? This is the idea behind log-based message brokers.
Using logs for message storage
A log is simply an append-only sequence of records on disk. We previously discussed logs in the context of log-structured storage engines and write-ahead logs in Chapter 4, in the context of replication in Chapter 6, and as a form of consensus in Chapter 10.
The same structure can be used to implement a message broker: a producer sends a message by appending it to the end of the log, and a consumer receives messages by reading the log sequentially. If a consumer reaches the end of the log, it waits for a notification that a new message has been appended. The Unix tool tail -f, which watches a file for data being appended, essentially works like this.
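The log-as-broker idea can be sketched in a few lines. Appending returns the message's offset, and reading never removes anything. A consumer that has caught up blocks until a new message is appended, much like `tail -f`. This in-memory `Log` class is a hypothetical illustration; a real log-based broker persists the log to disk.

```python
import threading


class Log:
    """Hypothetical in-memory append-only log used as a message broker."""

    def __init__(self):
        self._records = []
        self._cond = threading.Condition()

    def append(self, message) -> int:
        """Producer side: append to the end and return the message's offset."""
        with self._cond:
            self._records.append(message)
            self._cond.notify_all()   # wake consumers waiting at the end of the log
            return len(self._records) - 1

    def read(self, offset: int, timeout=None) -> list:
        """Consumer side: return all messages from `offset` onward, waiting
        (like `tail -f`) if the consumer has already reached the end.
        Returns an empty list if the timeout expires first."""
        with self._cond:
            self._cond.wait_for(lambda: offset < len(self._records), timeout=timeout)
            return self._records[offset:]
```

Reads are non-destructive, so a newly added consumer could start from offset 0 and replay the entire history. An AMQP/JMS-style queue that deletes acknowledged messages cannot offer this.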
In order to scale to higher throughput than a single disk can offer, the log can be sharded (in the sense of Chapter 7). Different shards can then be hosted on different machines, making each shard a separate log that can be read and written independently from other shards. A topic can then be defined as a group of shards that all carry messages of the same type. This approach is illustrated in Figure 12-3.
Within each shard, which Kafka calls a partition, the broker assigns a monotonically increasing sequence number, or offset, to every message (in Figure 12-3, the numbers in boxes are message offsets). Such a sequence number makes sense because a partition (shard) is append-only, so the messages within a partition are totally ordered. There is no ordering guarantee across different partitions.
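The same idea extends to a sharded topic. In this sketch, which uses a hypothetical `ShardedTopic` class, each partition is an independent list with its own offsets. Offset 0 therefore exists in every partition, and offsets say nothing about order across partitions.

```python
class ShardedTopic:
    """Hypothetical topic made of independent append-only partitions.
    Offsets are assigned per partition, so they are only comparable
    within one partition."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition: int, message):
        log = self.partitions[partition]
        log.append(message)
        return partition, len(log) - 1   # (partition, offset within that partition)

    def read(self, partition: int, offset: int):
        # Messages within one partition are totally ordered by offset.
        return self.partitions[partition][offset:]
```

In a real system each partition could live on a different machine. Here they are simply separate lists.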

Apache Kafka [20] and Amazon Kinesis Streams are log-based message brokers that work like this. Google Cloud Pub/Sub is architecturally similar but exposes a JMS-style API rather than a log abstraction [15]. Even though these message brokers write all messages to disk, they are able to achieve throughput of millions of messages per second by sharding across multiple machines, and fault tolerance by replicating messages [21,22].
Logs compared to traditional messaging
The log-based approach trivially supports fan-out messaging, because several consumers can independently read the log without affecting each other—reading a message does not delete it from the log. To achieve load balancing across a group of consumers, instead of assigning individual messages to consumer clients, the broker can assign entire shards to nodes in the consumer group.
Each client then consumes all the messages in the shards it has been assigned. Typically, when a consumer has been assigned a log shard, it reads the messages in the shard sequentially, in a straightforward single-threaded manner. This coarse-grained load balancing approach has some downsides:
- The number of nodes sharing the work of consuming a topic can be at most the number of log shards in that topic, because messages within the same shard are delivered to the same node. (It’s possible to create a load balancing scheme in which two consumers share the work of processing a shard by having both read the full set of messages, but one of them only considers messages with even-numbered offsets while the other deals with the odd-numbered offsets. Alternatively, you could spread message processing over a thread pool, but that approach complicates consumer offset management. In general, single-threaded processing of a shard is preferable, and parallelism can be increased by using more shards.)
- If a single message is slow to process, it holds up the processing of subsequent messages in that shard (a form of head-of-line blocking; see “Describing Performance”).
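The first downside above can be seen in a minimal sketch of shard-based load balancing. It assumes a simple round-robin assignment of whole shards to the consumers in a group. Real brokers such as Kafka offer several pluggable assignment strategies, and the helper `assign_shards` is hypothetical. Because each shard goes to exactly one consumer, any consumers beyond the number of shards receive no work.

```python
def assign_shards(shards, consumers):
    """Round-robin whole shards onto the consumers of a group (hypothetical helper)."""
    assignment = {c: [] for c in consumers}
    for i, shard in enumerate(shards):
        # Each shard belongs to exactly one consumer, so its messages stay in order.
        assignment[consumers[i % len(consumers)]].append(shard)
    return assignment

# Four shards, three consumers: one consumer ends up with two shards.
print(assign_shards(["s0", "s1", "s2", "s3"], ["c1", "c2", "c3"]))
# {'c1': ['s0', 's3'], 'c2': ['s1'], 'c3': ['s2']}

# Four shards, six consumers: c5 and c6 sit idle.
print(assign_shards(["s0", "s1", "s2", "s3"], ["c1", "c2", "c3", "c4", "c5", "c6"]))
```

Adding consumers therefore only helps up to the shard count. Beyond that, parallelism comes from adding shards.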
Thus, in situations where messages may be expensive to process and you want to parallelize processing on a message-by-message basis, and where message ordering is not so important, the JMS/AMQP style of message broker is preferable. On the other hand, in situations with high message throughput, where each message is fast to process and where message ordering is important, the log-based approach works very well [23,24]. However, the distinction between the two architectures is being blurred: log-based messaging systems such as Kafka now also support JMS/AMQP-style shared consumption (Kafka calls these share groups), which allows multiple consumers to receive messages from the same shard [25,26].
Since sharded logs typically preserve message ordering only within a single shard, all messages that need to be ordered consistently need to be routed to the same shard. For example, an application may require that the events relating to one particular user appear in a fixed order. This can be achieved by choosing the shard for an event based on the user ID of that event (in other words, making the user ID the partition key).
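Here is a sketch of routing events by partition key. It assumes CRC-32 from Python's standard `zlib` module as the hash function. Kafka's default partitioner uses murmur2 instead, so the shard numbers are illustrative only. The property that matters is that the hash is deterministic across processes. Python's built-in `hash()` on strings is randomized per process, so it could send the same user to a different shard after a restart.

```python
import zlib

def shard_for(partition_key: str, num_shards: int) -> int:
    """Map a partition key (here, a user ID) to a shard deterministically."""
    # crc32 gives the same result in every process, unlike str hash().
    return zlib.crc32(partition_key.encode("utf-8")) % num_shards

events = [("user-42", "login"), ("user-7", "view"),
          ("user-42", "purchase"), ("user-42", "logout")]
shards = {}
for user_id, action in events:
    shards.setdefault(shard_for(user_id, 8), []).append((user_id, action))

# Other users may share the shard, but all of user-42's events are together, in order.
user_42 = [a for u, a in shards[shard_for("user-42", 8)] if u == "user-42"]
print(user_42)   # ['login', 'purchase', 'logout']
```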
Consumer offsets
Consuming a shard sequentially makes it easy to tell which messages have been processed: all messages with an offset less than a consumer’s current offset have already been processed, and all messages with a greater offset have not yet been seen. Thus, the broker does not need to track acknowledgments for every single message—it only needs to periodically record the consumer offsets. The reduced bookkeeping overhead and the opportunities for batching and pipelining in this approach help increase the throughput of log-based systems.
This offset is in fact very similar to the log sequence number that is commonly found in single-leader database replication, and which we discussed in “Setting Up New Followers”. In database replication, the log sequence number allows a follower to reconnect to a leader after it has become disconnected, and resume replication without skipping any writes. Exactly the same principle is used here: the message broker behaves like a leader database, and the consumer like a follower.
If a consumer node fails, another node in the consumer group is assigned the failed consumer’s shards, and it starts consuming messages at the last recorded offset. If the failed consumer had processed subsequent messages but not yet recorded their offset, those messages will be processed a second time by its replacement. We will discuss ways of dealing with this issue later in the chapter.
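A toy model of offset-based consumption makes the failover behavior concrete. It assumes an in-memory list stands in for one log shard and a dict stands in for the broker's record of committed offsets; all names are hypothetical. The consumer commits only every few messages rather than acknowledging each one. When it crashes, the replacement resumes at the last committed offset, so some messages are processed twice.

```python
log = [f"msg-{i}" for i in range(10)]   # one shard; list index = offset
committed = {"group-a": 0}               # broker's record of consumer offsets

def consume(group, processed, crash_after=None, commit_every=3):
    """Read sequentially from the committed offset, committing periodically."""
    offset = committed[group]
    count = 0
    while offset < len(log):
        processed.append(log[offset])    # the side effect of processing
        offset += 1
        count += 1
        if count % commit_every == 0:
            committed[group] = offset     # periodic checkpoint, no per-message acks
        if crash_after is not None and count == crash_after:
            return                        # crash: uncommitted progress is lost
    committed[group] = offset

processed = []
consume("group-a", processed, crash_after=5)   # commits offset 3, then dies after msg-4
consume("group-a", processed)                  # another node resumes at offset 3
print(processed)
# ['msg-0', 'msg-1', 'msg-2', 'msg-3', 'msg-4', 'msg-3', 'msg-4', 'msg-5', ...]
```

Committing more often narrows the window of duplicates but adds bookkeeping. The batching trade-off is the same one described above.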
Disk space usage
If you only ever append to the log, you will eventually run out of disk space. To reclaim disk space, the log is actually divided into segments, and from time to time old segments are deleted or moved to archive storage. (We’ll discuss a more sophisticated way of freeing disk space in “Log compaction”.)
This means that if a slow consumer cannot keep up with the rate of messages, and it falls so far behind that its consumer offset points to a deleted segment, it will miss some of the messages. Effectively, the log implements a bounded-size buffer that discards old messages when it gets full, also known as a circular buffer or ring buffer. However, since that buffer is on disk, it can be quite large.
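The following is a sketch of segment-based retention. It assumes fixed-size segments of four messages and a cap on the number of retained segments; both parameters and the `SegmentedLog` class are hypothetical, and real brokers typically configure retention by time or bytes. Because whole segments are dropped from the front, the log behaves like a ring buffer. A consumer whose offset falls into a deleted segment can no longer read from there.

```python
class SegmentedLog:
    def __init__(self, segment_size=4, max_segments=3):
        self.segment_size = segment_size
        self.max_segments = max_segments
        self.segments = [[]]      # each segment is a list of messages
        self.base_offset = 0      # offset of the oldest message still retained

    def append(self, msg):
        if len(self.segments[-1]) == self.segment_size:
            self.segments.append([])               # roll over to a new segment
            if len(self.segments) > self.max_segments:
                dropped = self.segments.pop(0)     # delete the oldest segment
                self.base_offset += len(dropped)
        self.segments[-1].append(msg)

    def read(self, offset):
        if offset < self.base_offset:
            raise LookupError(f"offset {offset} deleted; earliest is {self.base_offset}")
        flat = [m for seg in self.segments for m in seg]
        return flat[offset - self.base_offset:]

log = SegmentedLog()
for i in range(20):
    log.append(i)
print(log.base_offset)   # 8
print(log.read(15))      # [15, 16, 17, 18, 19]
```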
Let’s do a back-of-the-envelope calculation. At the time of writing, a typical large hard drive has a capacity of 20 TB and a sequential write throughput of 250 MB/s. If you are writing messages at the fastest possible rate, it takes about 22 hours until the drive is full and you need to start deleting the oldest messages. That means a disk-based log can always buffer at least 22 hours’ worth of messages, even if you have many disks with many machines (having more disks increases both the available space and the total write bandwidth). In practice, deployments rarely use the full write bandwidth of the disk, so the log can typically keep a buffer of several days’ or even weeks’ worth of messages.
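The arithmetic behind the 22-hour figure is shown below. It uses the drive parameters from the text in decimal units. The 10% utilization in the last line is an assumed example, not a measured figure.

```python
capacity_bytes = 20e12          # 20 TB
write_bytes_per_sec = 250e6     # 250 MB/s sequential write throughput

seconds_to_fill = capacity_bytes / write_bytes_per_sec
hours_to_fill = seconds_to_fill / 3600
print(f"{seconds_to_fill:.0f} s = {hours_to_fill:.1f} h")   # 80000 s = 22.2 h

# Assumed example: writing at 10% of the bandwidth, the same disk holds about 9 days.
print(f"{hours_to_fill / 0.10 / 24:.1f} days")
```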
Many log-based message brokers now store messages in object storage to increase their storage capacity, similarly to databases as we saw in “Databases backed by object storage”. Message brokers such as Apache Kafka and Redpanda serve older messages from object storage as part of their tiered storage. Others, such as WarpStream, Confluent Freight, and Bufstream, store all of their data in the object store. In addition to cost-efficiency, this architecture can also make data integration easier: some of these systems can store messages in object storage as Iceberg tables. This lets batch jobs and data warehouse queries run directly on the data without having to copy it into another system.
When consumers cannot keep up with producers
At the beginning of “Messaging Systems” we discussed three choices of what to do if a consumer cannot keep up with the rate at which producers are sending messages: dropping messages, buffering, or applying backpressure. In this taxonomy, the log-based approach is a form of buffering with a large but fixed-size buffer (limited by the available disk space).
If a consumer falls so far behind that the messages it requires are older than what is retained on disk, it will not be able to read those messages—so the broker effectively drops old messages that go back further than the size of the buffer can accommodate. You can monitor how far a consumer is behind the head of the log, and raise an alert if it falls behind significantly. As the buffer is large, there is enough time for a human operator to fix the slow consumer and allow it to catch up before it starts missing messages.
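A minimal monitoring check might look like the sketch below. It assumes you can obtain, for each shard, the latest (log-end) offset, the earliest retained offset, and the consumer's committed offset. A real broker exposes these through its admin tooling; `check_lag` is a hypothetical stand-in that takes plain numbers. Besides raw lag, it reports the headroom before the retention boundary, which is the point at which the consumer starts missing messages.

```python
def check_lag(log_end, earliest_retained, committed, max_lag):
    """Return (lag, headroom, alerts) for one shard; arguments are offsets."""
    alerts = []
    lag = log_end - committed                   # written but not yet consumed
    headroom = committed - earliest_retained    # messages left before data is lost
    if committed < earliest_retained:
        alerts.append("consumer has already missed messages")
    elif lag > max_lag:
        alerts.append(f"lag {lag} exceeds threshold {max_lag}")
    return lag, headroom, alerts

print(check_lag(log_end=1_000_000, earliest_retained=200_000, committed=990_000, max_lag=50_000))
# (10000, 790000, [])
print(check_lag(log_end=1_000_000, earliest_retained=200_000, committed=300_000, max_lag=50_000))
# (700000, 100000, ['lag 700000 exceeds threshold 50000'])
```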
Even if a consumer does fall too far behind and starts missing messages, only that consumer is affected; it does not disrupt the service for other consumers. This fact is a big operational advantage: you can experimentally consume a production log for development, testing, or debugging purposes, without having to worry much about disrupting production services. When a consumer is shut down or crashes, it stops consuming resources—the only thing that remains is its consumer offset.
This behavior also contrasts with traditional message brokers, where you need to be careful to delete any queues whose consumers have been shut down—otherwise they continue unnecessarily accumulating messages and taking away memory from consumers that are still active.
Replaying old messages
We noted previously that with AMQP- and JMS-style message brokers, processing and acknowledging messages is a destructive operation, since it causes the messages to be deleted on the broker. On the other hand, in a log-based message broker, consuming messages is more like reading from a file: it is a read-only operation that does not change the log.
The only side effect of processing, besides any output of the consumer, is that the consumer offset moves forward. But the offset is under the consumer’s control, so it can easily be manipulated if necessary: for example, you can start a copy of a consumer with yesterday’s offsets and write the output to a different location, in order to reprocess the last day’s worth of messages. You can repeat this any number of times, varying the processing code.
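Because reading is non-destructive, replaying amounts to starting a reader at an older offset. In this sketch the log is an in-memory list of `(timestamp, event)` pairs, and `read_from` and `offset_for_time` are hypothetical helpers. The existing consumer and a reprocessing copy read the same log independently. The copy starts at the first offset whose timestamp is at or after a chosen point ("yesterday") and writes its results to a separate output.

```python
import bisect

log = [(100, "a"), (200, "b"), (300, "c"), (400, "d"), (500, "e")]  # (timestamp, event)

def read_from(offset):
    """Reading never mutates the log; each reader only advances its own offset."""
    for i in range(offset, len(log)):
        yield i, log[i][1]

def offset_for_time(ts):
    """First offset whose timestamp is >= ts (timestamps assumed non-decreasing)."""
    return bisect.bisect_left([t for t, _ in log], ts)

live_output = [ev.upper() for _, ev in read_from(0)]           # existing consumer
replay_start = offset_for_time(300)                             # "yesterday"
replay_output = [ev * 2 for _, ev in read_from(replay_start)]   # new code, new output
print(live_output, replay_output)
# ['A', 'B', 'C', 'D', 'E'] ['cc', 'dd', 'ee']
```

The same idea gives fan-out for free: any number of readers can consume the log, each with its own offset.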
This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization [27].
Databases and Streams
We have drawn some comparisons between message brokers and databases. Even though they have traditionally been considered separate categories of tools, we saw that log-based message brokers have been successful in taking ideas from databases and applying them to messaging. We can also go in reverse: take ideas from messaging and streams, and apply them to databases.
One approach is to use an event stream as the system of record for storing data (see “Systems of Record and Derived Data”). This is what happens in event sourcing, which we discussed in “Event Sourcing and CQRS”: instead of storing data in a data model that is mutated by updating and deleting, you can model every state change as an immutable event, and write it to an append-only log. Any read-optimized materialized views are derived from these events. Log-based message brokers (configured to never delete old events) are well suited for event sourcing since they use append-only storage, and they can notify consumers about new events with low latency.
But you don’t have to go as far as adopting event sourcing; even with mutable data models, event streams are useful for databases. In fact, every write to a database is an event that can be captured, stored, and processed. The connection between databases and streams runs deeper than just the physical storage of logs on disk—it is quite fundamental.
For example, a replication log (see “Implementation of Replication Logs”) is a stream of database write events, produced by the leader as it processes transactions. The followers apply that stream of writes to their own copy of the database and thus end up with an accurate copy of the same data. The events in the replication log describe the data changes that occurred.
We also came across the state machine replication principle in “Using shared logs”, which states: if every event represents a write to the database, and every replica processes the same events in the same order, then the replicas will all end up in the same final state. (Processing an event is assumed to be a deterministic operation.) It’s just another case of event streams!
In this section we will first look at a problem that arises in heterogeneous data systems, and then explore how we can solve it by bringing ideas from event streams to databases.
Keeping Systems in Sync
As we have seen throughout this book, there is no single system that can satisfy all data storage, querying, and processing needs. In practice, most nontrivial applications need to combine several different technologies in order to satisfy their requirements: for example, using an OLTP database to serve user requests, a cache to speed up common requests, a full-text index to handle search queries, and a data warehouse for analytics. Each of these has its own copy of the data, stored in its own representation that is optimized for its own purposes.
As the same or related data appears in several different places, they need to be kept in sync with one another: if an item is updated in the database, it also needs to be updated in the cache, search indexes, and data warehouse. With data warehouses this synchronization is usually performed by ETL processes (see “Data Warehousing”), often by taking a full copy of a database, transforming it, and bulk-loading it into the data warehouse—in other words, a batch process. Similarly, we saw in “Batch Use Cases” how search indexes, recommendation systems, and other derived data systems might be created using batch processes.
If periodic full database dumps are too slow, an alternative that is sometimes used is dual writes, in which the application code explicitly writes to each of the systems when data changes: for example, first writing to the database, then updating the search index, then invalidating the cache entries (or even performing those writes concurrently).
However, dual writes have some serious problems, one of which is a race condition illustrated in Figure 12-4. In this example, two clients concurrently want to update an item X: client 1 wants to set the value to A, and client 2 wants to set it to B. Both clients first write the new value to the database, then write it to the search index. Due to unlucky timing, the requests are interleaved: the database first sees the write from client 1 setting the value to A, then the write from client 2 setting the value to B, so the final value in the database is B. The search index first sees the write from client 2, then client 1, so the final value in the search index is A. The two systems are now permanently inconsistent with each other, even though no error occurred.

Unless you have some additional concurrency detection mechanism, such as the version vectors we discussed in “Detecting Concurrent Writes”, you will not even notice that concurrent writes occurred—one value will simply silently overwrite another value.
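The interleaving from Figure 12-4 can be replayed deterministically, assuming plain dicts stand in for the database and the search index. Each client's pair of writes is correct on its own. The inconsistency comes purely from the two systems seeing the writes in different orders, and nothing raises an error.

```python
db, search_index = {}, {}

# The interleaving from Figure 12-4, written out as an explicit schedule.
schedule = [
    ("db", "X", "A"),       # client 1 writes the database
    ("db", "X", "B"),       # client 2 writes the database
    ("index", "X", "B"),    # client 2 writes the search index
    ("index", "X", "A"),    # client 1 writes the search index
]
targets = {"db": db, "index": search_index}
for system, key, value in schedule:
    targets[system][key] = value   # last write wins, independently in each system

print(db["X"], search_index["X"])   # B A
```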
Another problem with dual writes is that one of the writes may fail while the other succeeds. This is a fault-tolerance problem rather than a concurrency problem, but it also has the effect of the two systems becoming inconsistent with each other. Ensuring that they either both succeed or both fail is a case of the atomic commit problem, which is expensive to solve (see “Two-Phase Commit (2PC)”).
If you only have one replicated database with a single leader, then that leader determines the order of writes, so the state machine replication approach works among replicas of the database. However, in Figure 12-4 there isn’t a single leader: the database may have a leader and the search index may have a leader, but neither follows the other, and so conflicts can occur (see “Multi-Leader Replication”).
The situation would be better if there really was only one leader—for example, the database—and if we could make the search index a follower of the database. But is this possible in practice?
Change Data Capture
The problem with most databases’ replication logs is that they have long been considered to be an internal implementation detail of the database, not a public API. Clients are supposed to query the database through its data model and query language, not parse the replication logs and try to extract data from them.
For decades, many databases simply did not have a documented way of getting the log of changes written to them. For this reason it was difficult to take all the changes made in a database and replicate them to a different storage technology such as a search index, cache, or data warehouse.
More recently, there has been growing interest in change data capture (CDC), which is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems [28]. CDC is especially interesting if changes are made available as a stream, immediately as they are written.
For example, you can capture the changes in a database and continually apply the same changes to a search index. If the log of changes is applied in the same order, you can expect the data in the search index to match the data in the database. The search index and any other derived data systems are just consumers of the change stream.
Figure 12-5 shows how the concurrency problem of Figure 12-4 is solved with CDC. Even though the two requests to set X to A and B respectively arrive concurrently at the database, the database decides on some order in which to execute them, and writes them to its replication log in that order. The search index picks them up and applies them in the same order. If you need the data in another system, such as a data warehouse, you can simply add it as another consumer of the CDC event stream.
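The following sketch routes the same two writes through change data capture. The hypothetical `Database` class serializes the writes and appends each committed change to a log. Every derived system, modeled by the hypothetical `Follower` class, applies that log in order from its own offset. Because all followers replay the same sequence deterministically, they converge on the database's state, as in state machine replication.

```python
class Database:
    def __init__(self):
        self.data = {}
        self.change_log = []          # stands in for the replication log / CDC stream

    def write(self, key, value):
        # The database picks one order for concurrent writes and logs them in that order.
        self.data[key] = value
        self.change_log.append((key, value))

class Follower:
    """A derived system (search index, cache, warehouse) consuming the change log."""
    def __init__(self):
        self.data = {}
        self.offset = 0

    def catch_up(self, change_log):
        for key, value in change_log[self.offset:]:
            self.data[key] = value
        self.offset = len(change_log)

db = Database()
db.write("X", "A")   # client 1's write happened to be ordered first
db.write("X", "B")   # client 2's write second

search_index, warehouse = Follower(), Follower()
search_index.catch_up(db.change_log)
warehouse.catch_up(db.change_log)
print(db.data, search_index.data, warehouse.data)   # {'X': 'B'} {'X': 'B'} {'X': 'B'}
```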

Implementing change data capture
We can call the log consumers derived data systems, as discussed in “Systems of Record and Derived Data”: the data stored in the search index and the data warehouse is just another view onto the data in the system of record. Change data capture is a mechanism for ensuring that all changes made to the system of record are also reflected in the derived data systems so that the derived systems have an accurate copy of the data.
Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database to the derived systems, since it preserves the ordering of messages (avoiding the reordering issue of Figure 12-2).
Logical replication logs can be used to implement change data capture (see “Logical (row-based) log replication”), although doing so comes with challenges, such as handling schema changes and properly modeling updates. The Debezium open source project addresses these challenges. The project contains source connectors for MySQL, PostgreSQL, Oracle, SQL Server, Db2, Cassandra, and many other databases. These connectors attach to database replication logs and surface the changes in a standard event schema. Messages can then be transformed and written to downstream databases. The Kafka Connect framework also offers CDC connectors for various databases. Maxwell does something similar for MySQL by parsing the binlog [29], GoldenGate provides similar facilities for Oracle, and pgcapture does the same for PostgreSQL.
Like message brokers, change data capture is usually asynchronous: the system of record database does not wait for the change to be applied to consumers before committing it. This design has the operational advantage that adding a slow consumer does not affect the system of record too much, but it has the downside that all the issues of replication lag apply (see “Problems with Replication Lag”).
Initial snapshot
If you have the log of all changes that were ever made to a database, you can reconstruct the entire state of the database by replaying the log. However, in many cases, keeping all changes forever would require too much disk space, and replaying it would take too long, so the log needs to be truncated.
Building a new full-text index, for example, requires a full copy of the entire database—it is not sufficient to only apply a log of recent changes, since it would be missing items that were not recently updated. Thus, if you don’t have the entire log history, you need to start with a consistent snapshot, as previously discussed in “Setting Up New Followers”.
The snapshot of the database must correspond to a known position or offset in the change log, so that you know at which point to start applying changes after the snapshot has been processed. Some CDC tools integrate this snapshot facility, while others leave it as a manual operation. Debezium uses Netflix’s DBLog watermarking algorithm to provide incremental snapshots [30,31].
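Here is a sketch of bootstrapping a new derived system from a snapshot. It assumes the source can produce a consistent snapshot together with the change-log offset that the snapshot corresponds to; `take_snapshot` and `apply` are hypothetical, and here the snapshot is simulated by folding the log prefix. The new consumer loads the snapshot and then applies only the changes from that offset onward. Starting any later would silently skip changes that the snapshot does not contain.

```python
change_log = [("u1", "alice"), ("u2", "bob"), ("u1", "alicia"), ("u3", "carol"), ("u2", None)]

def apply(state, key, value):
    if value is None:
        state.pop(key, None)    # None marks a deletion
    else:
        state[key] = value

def take_snapshot(log, upto):
    """Simulated consistent snapshot: the state after log[:upto], plus its offset."""
    state = {}
    for key, value in log[:upto]:
        apply(state, key, value)
    return state, upto

# Older history may have been truncated; we only have a snapshot taken at offset 3.
snapshot, snapshot_offset = take_snapshot(change_log, 3)
index = dict(snapshot)
for key, value in change_log[snapshot_offset:]:   # resume exactly at the snapshot's offset
    apply(index, key, value)
print(index)   # {'u1': 'alicia', 'u3': 'carol'}
```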
Log compaction
If you can only keep a limited amount of log history, you need to go through the snapshot process every time you want to add a new derived data system. However, log compaction provides a good alternative.
We discussed log compaction previously in “Log-Structured Storage”, in the context of log-structured storage engines (see Figure 4-3 for an example). The principle is simple: the storage engine periodically looks for log records with the same key, throws away any duplicates, and keeps only the most recent update for each key. This might make log segments much smaller, so segments may also be merged as part of the compaction process, as shown in Figure 12-6. This process runs in the background.

In a log-structured storage engine, an update with a special null value (a tombstone) indicates that a key was deleted, and causes it to be removed during log compaction. But as long as a key is not overwritten or deleted, it stays in the log forever. The disk space required for such a compacted log depends only on the current contents of the database, not the number of writes that have ever occurred in the database. If the same key is frequently overwritten, previous values will eventually be garbage-collected, and only the latest value will be retained.
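Below is a sketch of key-based compaction over a single log. It assumes records are `(key, value)` pairs and `None` is the tombstone. `compact` keeps only the latest record per key and drops keys whose latest record is a tombstone. It also keeps the original offsets, so compaction leaves gaps rather than renumbering. Real brokers such as Kafka keep tombstones for a configurable period before removing them so that consumers still see the deletion; this sketch omits that.

```python
def compact(log):
    """Keep the newest record per key; drop keys whose newest record is a tombstone."""
    latest = {}
    for offset, (key, _) in enumerate(log):
        latest[key] = offset                      # remember the newest offset per key
    return [
        (offset, log[offset])
        for offset in sorted(latest.values())     # survivors keep their relative order
        if log[offset][1] is not None             # tombstone: the key disappears
    ]

def replay(records):
    state = {}
    for key, value in records:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state

log = [("mew", 1078), ("purr", 2103), ("mew", 1079), ("mew", 1080),
       ("purr", 2104), ("scratch", 1), ("scratch", None), ("mew", 1081)]
compacted = compact(log)
print(compacted)   # [(4, ('purr', 2104)), (7, ('mew', 1081))]

# Rebuilding from offset 0 of the compacted log gives the same state as the full log.
assert replay(rec for _, rec in compacted) == replay(log)
```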
The same idea works in the context of log-based message brokers and change data capture. If the CDC system is set up such that every change has a primary key, and every update for a key replaces the previous value for that key, then it’s sufficient to keep just the most recent write for a particular key.
Now, whenever you want to rebuild a derived data system such as a search index, you can start a new consumer from offset 0 of the log-compacted topic, and sequentially scan over all messages in the log. The log is guaranteed to contain the most recent value for every key in the database (and maybe some older values)—in other words, you can use it to obtain a full copy of the database contents without having to take another snapshot of the CDC source database.
This log compaction feature is supported by Apache Kafka. As we shall see later in this chapter, it allows the message broker to be used for durable storage, not just for transient messaging.
API support for change streams
Most popular databases now expose change streams as a first-class interface, rather than the retrofitted and reverse-engineered CDC efforts of the past. Relational databases such as MySQL and PostgreSQL typically send changes through the same replication log they use for their own replicas. Most cloud vendors offer CDC solutions for their products as well: for example, Datastream offers streaming data access for Google Cloud’s relational databases and data warehouses.
Even eventually consistent, quorum-based databases such as Cassandra now support change data capture. As we saw in “Linearizability and quorums”, clients must persist writes to a majority of nodes before they’re considered visible. CDC support for quorum writes is challenging because there’s no single source of truth to subscribe to. Whether the data is visible or not depends on each reader’s consistency preferences. Cassandra sidesteps this issue by exposing raw log segments for each node rather than providing a single stream of mutations. Systems that wish to consume the data must read the raw log segments for each node and decide how best to merge them into a single stream (much like a quorum reader does) [32].
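To make the merging step concrete, here is a hypothetical sketch; it does not use Cassandra's actual CDC format. It assumes each node's log yields `(write_timestamp, key, value)` mutations sorted by timestamp. With replication, the same mutation can appear in several nodes' logs. The consumer merges the per-node streams by timestamp, emits each mutation once, and applies its own visibility rule: here, a mutation must appear on at least `min_replicas` nodes. For simplicity it counts replicas up front rather than streaming.

```python
import heapq
from collections import Counter

def merge_replica_logs(node_logs, min_replicas):
    """Merge per-node mutation logs into one stream, emitting each mutation once.

    node_logs: lists of (timestamp, key, value), each sorted by timestamp.
    min_replicas: how many nodes must contain a mutation before it is emitted.
    """
    seen = Counter(m for node_log in node_logs for m in set(node_log))
    merged, emitted = [], set()
    for mutation in heapq.merge(*node_logs):      # k-way merge by timestamp
        if mutation in emitted or seen[mutation] < min_replicas:
            continue
        emitted.add(mutation)
        merged.append(mutation)
    return merged

node_a = [(1, "k1", "v1"), (3, "k2", "v2")]
node_b = [(1, "k1", "v1"), (2, "k3", "v3"), (3, "k2", "v2")]
node_c = [(2, "k3", "v3"), (4, "k4", "v4")]     # k4 reached only one node
print(merge_replica_logs([node_a, node_b, node_c], min_replicas=2))
# [(1, 'k1', 'v1'), (2, 'k3', 'v3'), (3, 'k2', 'v2')]
```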
Kafka Connect [33] integrates change data capture tools for a wide range of database systems with Kafka. Once the stream of change events is in Kafka, it can be used to update derived data systems such as search indexes, and also feed into stream processing systems as discussed later in this chapter.
Change data capture versus event sourcing
Let’s compare change data capture to event sourcing. Similarly to change data capture, event sourcing involves storing all changes to the application state as a log of change events. The biggest difference is that event sourcing applies the idea at a different level of abstraction:
- In change data capture, the application uses the database in a mutable way, updating and deleting records at will. The log of changes is extracted from the database at a low level (e.g., by parsing the replication log), which ensures that the order of writes extracted from the database matches the order in which they were actually written, avoiding the race condition in Figure 12-4.
- In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log. In this case, the event store is append-only, and updates or deletes of events are discouraged or prohibited. Events are designed to reflect things that happened at the application level, rather than low-level state changes.
Which one is better depends on your situation. Adopting event sourcing is a big change for an application that is not already doing it; it has a number of pros and cons, which we discussed in “Event Sourcing and CQRS”. In contrast, CDC can be added to an existing database with minimal changes—the application writing to the database might not even know that CDC is occurring.
Like with change data capture, replaying the event log allows you to reconstruct the current state of the system. However, log compaction needs to be handled differently:
- A CDC event for the update of a record typically contains the entire new version of the record, so the current value for a primary key is entirely determined by the most recent event for that primary key, and log compaction can discard previous events for the same key.
- On the other hand, with event sourcing, events are modeled at a higher level: an event typically expresses the intent of a user action, not the mechanics of the state update that occurred as a result of the action. In this case, later events typically do not override prior events, and so you need the full history of events to reconstruct the final state. Log compaction is not possible in the same way.
另一方面,在事件溯源中,事件在更高层次上进行建模:一个事件通常表达用户操作的意图,而不是操作导致的状态更新机制。在这种情况下,后续事件通常不会覆盖先前的事件,因此你需要完整的事件历史来重建最终状态。日志压缩不能以同样的方式实现。
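The contrast can be made concrete with a minimal Python sketch of key-based compaction. It assumes CDC events are modeled as hypothetical `(key, full_row)` pairs, with `None` as a deletion marker (tombstone). The names `compact` and `replay` are illustrative and do not belong to any system's API. Real implementations compact log segments in the background and treat tombstones with more care. This sketch simply keeps them.

```python
from typing import Optional

# A CDC event: (primary key, full new row value); None marks a deletion (tombstone).
CdcEvent = tuple[str, Optional[dict]]

def compact(log: list[CdcEvent]) -> list[CdcEvent]:
    """Keep only the most recent event for each key, preserving log order."""
    last_offset = {key: i for i, (key, _) in enumerate(log)}
    return [event for i, event in enumerate(log) if last_offset[event[0]] == i]

def replay(log: list[CdcEvent]) -> dict[str, dict]:
    """Rebuild the current state by applying every event in order."""
    state: dict[str, dict] = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)   # tombstone deletes the row
        else:
            state[key] = value     # each event carries the whole new row
    return state

log = [
    ("user:1", {"name": "Alice", "city": "Paris"}),
    ("user:2", {"name": "Bob", "city": "Oslo"}),
    ("user:1", {"name": "Alice", "city": "Berlin"}),
    ("user:2", None),
]
compacted = compact(log)
```

Replaying the compacted log yields the same state as replaying the full log. The same trick fails for event-sourced events such as a hypothetical `ItemAddedToCart`. A later event for the same cart does not supersede an earlier one, so discarding the earlier one would change the reconstructed state.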
Applications that use event sourcing typically have some mechanism for storing snapshots of the current state that is derived from the log of events, so they don’t need to repeatedly reprocess the full log. However, this is only a performance optimization to speed up reads and recovery from crashes; the intention is that the system is able to store all raw events forever and reprocess the full event log whenever required. We discuss this assumption in “Limitations of immutability”.
State, Streams, and Immutability
We saw in Chapter 11 that batch processing benefits from the immutability of its input files, so you can run experimental processing jobs on existing input files without fear of damaging them. This principle of immutability is also what makes event sourcing and change data capture so powerful.
We normally think of databases as storing the current state of the application—this representation is optimized for reads, and it is usually the most convenient for serving queries. The nature of state is that it changes, so databases support updating and deleting data as well as inserting it. How does this fit with immutability?
Whenever you have state that changes, that state is the result of the events that mutated it over time. For example, your list of currently available seats is the result of the reservations you have processed, the current account balance is the result of the credits and debits on the account, and the response time graph for your web server is an aggregation of the individual response times of all web requests that have occurred.
No matter how the state changes, there was always a sequence of events that caused those changes. Even as things are done and undone, the fact remains true that those events occurred. The key idea is that mutable state and an append-only log of immutable events do not contradict each other: they are two sides of the same coin. The log of all changes, the changelog, represents the evolution of state over time.
If you are mathematically inclined, you might say that the application state is what you get when you integrate an event stream over time, and a change stream is what you get when you differentiate the state by time, as shown in Figure 12-7 [37,38]. The analogy has limitations (for example, the second derivative of state does not seem to be meaningful), but it’s a useful starting point for thinking about data.
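Informally, and glossing over the fact that a log is discrete, the analogy can be written as follows. The notation is chosen here for illustration.

```latex
\mathit{state}(\mathit{now}) = \int_{t=0}^{\mathit{now}} \mathit{stream}(t)\,dt
\qquad\qquad
\mathit{stream}(t) = \frac{d\,\mathit{state}(t)}{dt}
```

For a discrete log, the integral corresponds to a fold that applies each event to the state in log order.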

If you store the changelog durably, that simply has the effect of making the state reproducible. If you consider the log of events to be your system of record, and any mutable state as being derived from it, it becomes easier to reason about the flow of data through a system. As Pat Helland puts it [39]:
Transaction logs record all the changes made to the database. High-speed appends are the only way to change the log. From this perspective, the contents of the database hold a caching of the latest record values in the logs. The truth is the log. The database is a cache of a subset of the log. That cached subset happens to be the latest value of each record and index value from the log.
Log compaction is one way of bridging the distinction between log and database state: it retains only the latest version of each record, and discards overwritten versions.
Advantages of immutable events
Immutability in databases is an old idea. For example, accountants have been using immutability for centuries in financial bookkeeping. When a transaction occurs, it is recorded in an append-only ledger, which is essentially a log of events describing money, goods, or services that have changed hands. The accounts, such as profit and loss or the balance sheet, are derived from the transactions in the ledger by adding them up [40].
If a mistake is made, accountants don’t erase or change the incorrect transaction in the ledger—instead, they add another transaction that compensates for the mistake, for example refunding an incorrect charge. The incorrect transaction still remains in the ledger forever, because it might be important for auditing reasons. If incorrect figures, derived from the incorrect ledger, have already been published, then the figures for the next accounting period include a correction. This process is entirely normal in accounting [39].
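As an illustration of this accounting discipline, the following Python sketch models a hypothetical append-only ledger. A mistake is corrected by a compensating entry rather than by an edit. The `Entry` type and the helper functions are assumptions made for this example. Amounts are kept in integer cents to avoid floating-point rounding.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Entry:
    """One immutable ledger line: a positive amount is a charge, negative a refund."""
    entry_id: int
    account: str
    amount_cents: int
    memo: str

ledger: list[Entry] = []

def append(account: str, amount_cents: int, memo: str) -> Entry:
    entry = Entry(len(ledger) + 1, account, amount_cents, memo)
    ledger.append(entry)   # append-only: existing entries are never modified
    return entry

def balance(account: str) -> int:
    """Derived state: the balance is just the sum of the ledger entries."""
    return sum(e.amount_cents for e in ledger if e.account == account)

wrong = append("acct-42", 5_000, "monthly fee")
# Mistake discovered: compensate with a new entry instead of editing `wrong`.
append("acct-42", -wrong.amount_cents, f"refund of entry {wrong.entry_id}")
append("acct-42", 2_000, "monthly fee (corrected)")
```

The incorrect entry stays in the ledger for auditing, while the derived balance reflects the correction.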
Although such auditability is particularly important in financial systems, it is also beneficial for many other systems that are not subject to such strict regulation. If you accidentally deploy buggy code that writes bad data to a database, recovery is much harder if the code is able to destructively overwrite data. With an append-only log of immutable events, it is much easier to diagnose what happened and recover from the problem.
Immutable events also capture more information than just the current state. For example, on a shopping website, a customer may add an item to their cart and then remove it again. Although the second event cancels out the first event from the point of view of order fulfillment, it may be useful to know for analytics purposes that the customer was considering a particular item but then decided against it. Perhaps they will choose to buy it in the future, or perhaps they found a substitute. This information is recorded in an event log, but would be lost in a database that deletes items when they are removed from the cart.
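The shopping-cart example can be sketched in a few lines of Python, assuming hypothetical `("add", item)` and `("remove", item)` events. The same event list supports two things. It feeds the fulfillment view, and it answers an analytics question that a cart table with delete-in-place semantics could not.

```python
events = [
    ("add", "shoes"),
    ("add", "socks"),
    ("remove", "shoes"),   # the customer changed their mind
]

def current_cart(events):
    """Mutable-state view: what order fulfillment needs."""
    cart = set()
    for action, item in events:
        if action == "add":
            cart.add(item)
        elif action == "remove":
            cart.discard(item)
    return cart

def considered_but_removed(events):
    """Analytics view: items added at some point that are no longer in the cart."""
    added = {item for action, item in events if action == "add"}
    return added - current_cart(events)
```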
Deriving several views from the same event log
Moreover, by separating mutable state from the immutable event log, you can derive several different read-oriented representations from the same log of events. This works just like having multiple consumers of a stream (Figure 12-5): for example, the analytic database Druid ingests directly from Kafka using this approach, and Kafka Connect sinks can export data from Kafka to various different databases and indexes [33].
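The following Python sketch shows the idea on a toy in-memory log. Two independent consumers, each with its own read offset, build different read-optimized views from the same hypothetical page-view events. It is an illustration under those assumptions, not a description of how Druid or Kafka Connect are implemented.

```python
from collections import Counter, defaultdict

# One shared, append-only event log (hypothetical page-view events).
log = [
    {"user": "u1", "page": "/home"},
    {"user": "u2", "page": "/pricing"},
    {"user": "u1", "page": "/pricing"},
    {"user": "u3", "page": "/home"},
]

class ViewsPerPage:
    """View 1: a counter for an analytics dashboard."""
    def __init__(self):
        self.offset = 0            # each consumer tracks its own position in the log
        self.counts = Counter()

    def catch_up(self, log):
        for event in log[self.offset:]:
            self.counts[event["page"]] += 1
        self.offset = len(log)

class PagesByUser:
    """View 2: a per-user index for a 'recently viewed' feature."""
    def __init__(self):
        self.offset = 0
        self.pages = defaultdict(list)

    def catch_up(self, log):
        for event in log[self.offset:]:
            self.pages[event["user"]].append(event["page"])
        self.offset = len(log)

views = ViewsPerPage()
by_user = PagesByUser()
views.catch_up(log)
by_user.catch_up(log)
```

Neither view writes to the log. A third view could therefore be added later and built by reading from offset 0, without touching the existing ones.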
Having an explicit translation step from an event log to a database makes it easier to evolve your application over time: if you want to introduce a new feature that presents your existing data in some new way, you can use the event log to build a separate read-optimized view for the new feature, and run it alongside the existing systems without having to modify them. Running old and new systems side by side is often easier than performing a complicated schema migration in an existing system. Once readers have switched to the new system and the old system is no longer needed, you can simply shut it down and reclaim its resources [41,42].
This idea of writing data in one write-optimized form, and then translating it into different read-optimized representations as needed, is the command query responsibility segregation (CQRS) pattern that we already encountered in “Event Sourcing and CQRS”. It doesn’t necessarily require event sourcing: you can just as well build multiple materialized views from a stream of CDC events [43].
The traditional approach to database and schema design is based on the fallacy that data must be written in the same form as it will be queried. Debates about normalization and denormalization (see “Normalization, Denormalization, and Joins”) become largely irrelevant if you can translate data from a write-optimized event log to read-optimized application state: it is entirely reasonable to denormalize data in the read-optimized views, as the translation process gives you a mechanism for keeping it consistent with the event log.
In “Case Study: Social Network Home Timelines” we discussed a social network’s home timelines, a cache of recent posts by the people a particular user is following (like a mailbox). This is another example of read-optimized state: home timelines are highly denormalized, since your posts are duplicated in all of the timelines of the people following you. However, the fan-out service keeps this duplicated state in sync with new posts and new following relationships, which keeps the duplication manageable.
Concurrency control
The biggest downside of CQRS is that the consumers of the event log are usually asynchronous, so there is a possibility that a user may make a write to the log, then read from a derived view and find that their write has not yet been reflected in the view. We discussed this problem and potential solutions previously in “Reading Your Own Writes”.
One solution would be to perform the updates of the read view synchronously with appending the event to the log. This either requires a distributed transaction across the event log and the derived view, or some way of waiting until an event has been reflected in the view. Both approaches are usually impractical, so views are normally updated asynchronously.
On the other hand, deriving the current state from an event log also simplifies some aspects of concurrency control. Much of the need for multi-object transactions (see “Single-Object and Multi-Object Operations”) stems from a single user action requiring data to be changed in several different places. With event sourcing, you can design an event such that it is a self-contained description of a user action. The user action then requires only a single write in one place—namely appending the event to the log—which is easy to make atomic.
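Consider a hypothetical money transfer. In a mutable database it would update two account rows, which calls for a multi-object transaction. Modeled as an event, it is a single self-contained record, and the balances are derived from the log. The `EventLog` class below is a toy stand-in for a real log, and all names are illustrative.

```python
import threading
from collections import defaultdict

class EventLog:
    """A toy append-only log; appending one record is the only write."""
    def __init__(self):
        self._events = []
        self._lock = threading.Lock()

    def append(self, event: dict) -> int:
        with self._lock:               # a single append is trivially atomic
            self._events.append(event)
            return len(self._events) - 1

    def read_from(self, offset: int) -> list:
        with self._lock:
            return self._events[offset:]

log = EventLog()
# One self-contained event describes the whole user action,
# even though it affects two accounts in the derived state.
log.append({"type": "transfer", "from": "alice", "to": "bob", "amount": 30})

def derive_balances(events, initial):
    """Derived state: apply each whole event to the balances."""
    balances = defaultdict(int, initial)
    for e in events:
        if e["type"] == "transfer":
            balances[e["from"]] -= e["amount"]
            balances[e["to"]] += e["amount"]
    return dict(balances)

balances = derive_balances(log.read_from(0), {"alice": 100, "bob": 0})
```

The transfer is recorded by one append. There is no log state in which only half of it has been written.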
If the event log and the application state are sharded in the same way (for example, processing an event for a customer in shard 3 only requires updating shard 3 of the application state), then a straightforward single-threaded log consumer needs no concurrency control for writes—by construction, it only processes a single event at a time (see also “Actual Serial Execution”). The log removes the nondeterminism of concurrency by defining a serial order of events in a shard [27]. If an event touches multiple state shards, a bit more work is required, which we will discuss in [Link to Come].
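A minimal Python sketch of this arrangement follows. It assumes events are routed to shards by a CRC32 hash of a hypothetical customer ID, and that each shard's state is touched only by that shard's consumer. Offset tracking and restart handling are omitted for brevity.

```python
import zlib
from collections import defaultdict

NUM_SHARDS = 4

def shard_for(customer_id: str) -> int:
    # Deterministic routing: the same customer always maps to the same shard.
    return zlib.crc32(customer_id.encode()) % NUM_SHARDS

# Each shard has its own log and its own slice of application state.
shard_logs = [[] for _ in range(NUM_SHARDS)]
shard_state = [defaultdict(int) for _ in range(NUM_SHARDS)]

def publish(event: dict) -> None:
    shard_logs[shard_for(event["customer"])].append(event)

def run_consumer(shard: int) -> None:
    """Single-threaded consumer: applies its shard's events strictly in log order,
    so updates to this shard's state never run concurrently (no locks needed)."""
    state = shard_state[shard]
    for event in shard_logs[shard]:
        state[event["customer"]] += event["points"]

for e in [{"customer": "c1", "points": 5},
          {"customer": "c2", "points": 3},
          {"customer": "c1", "points": -2}]:
    publish(e)

for s in range(NUM_SHARDS):
    run_consumer(s)   # could run on separate threads or machines, one per shard
```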
Many systems that don’t use an event-sourced model nevertheless rely on immutability for concurrency control: various databases internally use immutable data structures or multi-version data to support point-in-time snapshots (see “Indexes and snapshot isolation”). Version control systems such as Git, Mercurial, and Fossil also rely on immutable data to preserve version history of files.
Limitations of immutability
To what extent is it feasible to keep an immutable history of all changes forever? The answer depends on the amount of churn in the dataset. Some workloads mostly add data and rarely update or delete; they are easy to make immutable. Other workloads have a high rate of updates and deletes on a comparatively small dataset; in these cases, the immutable history may grow prohibitively large, fragmentation may become an issue, and the performance of compaction and garbage collection becomes crucial for operational robustness [44,45].
Besides the performance reasons, there may also be circumstances in which you need data to be deleted for administrative or legal reasons, despite the system's immutability. For example, privacy regulations such as the European General Data Protection Regulation (GDPR) require that a user’s personal information be deleted, and erroneous information be removed, on demand. Similarly, an accidental leak of sensitive information may need to be contained.
In these circumstances, it’s not sufficient to just append another event to the log to indicate that the prior data should be considered deleted—you actually want to rewrite history and pretend that the data was never written in the first place. For example, Datomic calls this feature excision [46], and the Fossil version control system has a similar concept called shunning [47].
Truly deleting data is surprisingly hard [48], since copies can live in many places: for example, storage engines, filesystems, and SSDs often write to a new location rather than overwriting in place [39], and backups are often deliberately immutable to prevent accidental deletion or corruption.
One way of enabling deletion of immutable data is crypto-shredding [49]: data that you may want to delete in the future is stored encrypted, and when you want to get rid of it, you forget the encryption key. The encrypted data is then still there, but nobody can use it. In some sense this only moves the problem around: the actual data is now immutable, but your key storage is mutable.
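The following Python sketch illustrates crypto-shredding with one key per user. That is one possible answer to the key-granularity question raised in the next paragraph.

The cipher is a toy keystream built from SHA-256, and it is not secure. Among other problems, it reuses the same keystream for every record of a user and provides no authentication. A real system would use a vetted authenticated cipher and a proper key-management service. All names here are hypothetical.

```python
import hashlib
import secrets

def keystream(key: bytes, length: int) -> bytes:
    """Toy keystream: SHA-256(key || counter) blocks. Illustration only, NOT secure."""
    out = bytearray()
    counter = 0
    while len(out) < length:
        out += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return bytes(out[:length])

def xor(data: bytes, key: bytes) -> bytes:
    return bytes(a ^ b for a, b in zip(data, keystream(key, len(data))))

key_store: dict[str, bytes] = {}              # mutable: one key per user
immutable_log: list[tuple[str, bytes]] = []   # append-only: only ciphertext

def append_personal_data(user: str, plaintext: str) -> None:
    key = key_store.get(user)
    if key is None:
        key = key_store[user] = secrets.token_bytes(32)
    immutable_log.append((user, xor(plaintext.encode(), key)))

def read_personal_data(user: str) -> list[str]:
    key = key_store.get(user)
    if key is None:
        return []   # key forgotten: the ciphertext remains but is unreadable
    return [xor(ct, key).decode() for u, ct in immutable_log if u == user]

def crypto_shred(user: str) -> None:
    key_store.pop(user, None)   # "delete" the data by forgetting the key
```

Usage: after `crypto_shred("alice")`, Alice's records are still in `immutable_log` but can no longer be decrypted, while other users' records are unaffected.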
Moreover, you have to decide up front which data is going to be encrypted with the same key, and when you are going to use different keys—an important decision, since you can later crypto-shred either all or none of the data encrypted with a particular key, but not some of it. Storing a separate key for every single data item would get too unwieldy, as the key storage would get as big as the primary data storage. More sophisticated schemes such as puncturable encryption [50] make it possible to selectively revoke a key’s decryption abilities, but they are not widely used.
Overall, deletion is more a matter of “making it harder to retrieve the data” than actually “making it impossible to retrieve the data.” Nevertheless, you sometimes have to try, as we shall see in [Link to Come].
Processing Streams
So far in this chapter we have talked about where streams come from (user activity events, sensors, and writes to databases), and we have talked about how streams are transported (through direct messaging, via message brokers, and in event logs).
What remains is to discuss what you can do with the stream once you have it—namely, you can process it. Broadly, there are three options:
- You can take the data in the events and write it to a database, cache, search index, or similar storage system, from where it can then be queried by other clients. As shown in Figure 12-5, this is a good way of keeping a database in sync with changes happening in other parts of the system—especially if the stream consumer is the only client writing to the database. Writing to a storage system is the streaming equivalent of what we discussed in “Batch Use Cases”.
- You can push the events to users in some way, for example by sending email alerts or push notifications, or by streaming the events to a real-time dashboard where they are visualized. In this case, a human is the ultimate consumer of the stream.
- You can process one or more input streams to produce one or more output streams. Streams may go through a pipeline consisting of several such processing stages before they eventually end up at an output (option 1 or 2).
In the rest of this chapter, we will discuss option 3: processing streams to produce other, derived streams. A piece of code that processes streams like this is known as an operator or a job. It is closely related to the Unix processes and MapReduce jobs we discussed in Chapter 11, and the pattern of dataflow is similar: a stream processor consumes input streams in a read-only fashion and writes its output to a different location in an append-only fashion.
The patterns for sharding and parallelization in stream processors are also very similar to those in MapReduce and the dataflow engines we saw in Chapter 11, so we won’t repeat those topics here. Basic mapping operations such as transforming and filtering records also work the same.
The one crucial difference from batch jobs is that a stream never ends. This difference has many implications: as discussed at the start of this chapter, sorting does not make sense with an unbounded dataset, and so sort-merge joins (see “JOIN and GROUP BY”) cannot be used. Fault-tolerance mechanisms must also change: with a batch job that has been running for a few minutes, a failed task can simply be restarted from the beginning, but with a stream job that has been running for several years, restarting from the beginning after a crash may not be a viable option.
Uses of Stream Processing
Stream processing has long been used for monitoring purposes, where an organization wants to be alerted if certain things happen. For example:
- Fraud detection systems need to determine if the usage patterns of a credit card have unexpectedly changed, and block the card if it is likely to have been stolen.
- Trading systems need to examine price changes in a financial market and execute trades according to specified rules.
- Manufacturing systems need to monitor the status of machines in a factory, and quickly identify the problem if there is a malfunction.
- Military and intelligence systems need to track the activities of a potential aggressor, and raise the alarm if there are signs of an attack.
These kinds of applications require quite sophisticated pattern matching and correlations. However, other uses of stream processing have also emerged over time. In this section we will briefly compare and contrast some of these applications.
Complex event processing
Complex event processing (CEP) is an approach developed in the 1990s for analyzing event streams, especially geared toward the kind of application that requires searching for certain event patterns [51]. Similarly to the way that a regular expression allows you to search for certain patterns of characters in a string, CEP allows you to specify rules to search for certain patterns of events in a stream.
CEP systems often use a high-level declarative query language like SQL, or a graphical user interface, to describe the patterns of events that should be detected. These queries are submitted to a processing engine that consumes the input streams and internally maintains a state machine that performs the required matching. When a match is found, the engine emits a complex event (hence the name) with the details of the event pattern that was detected [52].
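To illustrate what such an engine maintains internally, here is a hand-written Python sketch of one standing query. It is a hypothetical fraud rule: a purchase under $1 followed, on the same card and within 60 seconds, by a purchase over $500. A real CEP system would let you state this declaratively. Here the per-card dictionary plays the role of the state machine, and the returned dictionary stands in for the emitted complex event.

```python
from dataclasses import dataclass

@dataclass
class Purchase:
    card: str
    amount: float
    ts: float   # event timestamp in seconds

class SmallThenLargeRule:
    """Hypothetical standing query: a purchase under `small` followed, on the same
    card and within `within` seconds, by a purchase over `large`."""

    def __init__(self, small=1.0, large=500.0, within=60.0):
        self.small, self.large, self.within = small, large, within
        self.pending = {}   # per-card state: timestamp of the last small purchase

    def on_event(self, p: Purchase):
        start = self.pending.get(p.card)
        if start is not None and p.amount > self.large and p.ts - start <= self.within:
            del self.pending[p.card]
            # Emit a "complex event" describing the detected pattern.
            return {"pattern": "small-then-large", "card": p.card,
                    "first_ts": start, "second_ts": p.ts}
        if p.amount < self.small:
            self.pending[p.card] = p.ts      # now waiting for a large purchase
        elif start is not None and p.ts - start > self.within:
            del self.pending[p.card]         # timed out: reset this card's state
        return None

rule = SmallThenLargeRule()
stream = [
    Purchase("card-A", 0.50, 10.0),
    Purchase("card-B", 20.00, 12.0),
    Purchase("card-A", 899.00, 40.0),   # completes the pattern for card-A
    Purchase("card-B", 700.00, 50.0),   # no preceding small purchase
]
matches = [m for p in stream if (m := rule.on_event(p)) is not None]
```

The rule is stored long-term and each arriving event is checked against it, which reverses the usual relationship between queries and data.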
In these systems, the relationship between queries and data is reversed compared to normal databases. Usually, a database stores data persistently and treats queries as transient: when a query comes in, the database searches for data matching the query, and then forgets about the query when it has finished. CEP engines reverse these roles: queries are stored long-term; as each event arrives, the engine checks whether it has now seen an event pattern that matches any of its standing queries [53].
Implementations of CEP include Esper, Apama, and TIBCO StreamBase. Distributed stream processors like Flink and Spark Streaming also have SQL support for declarative queries on streams.
Stream analytics
Another area in which stream processing is used is for analytics on streams. The boundary between CEP and stream analytics is blurry, but as a general rule, analytics tends to be less interested in finding specific event sequences and is more oriented toward aggregations and statistical metrics over a large number of events—for example:
- Measuring the rate of some type of event (how often it occurs per time interval)
- Calculating the rolling average of a value over some time period
- Comparing current statistics to previous time intervals (e.g., to detect trends or to alert on metrics that are unusually high or low compared to the same time last week)
Such statistics are usually computed over fixed time intervals. For example, you might want to know the average number of queries per second to a service over the last 5 minutes, and their 99th percentile response time during that period. Averaging over a few minutes smooths out irrelevant fluctuations from one second to the next, while still giving you a timely picture of any changes in traffic pattern. The time interval over which you aggregate is known as a window, and we will look into windowing in more detail in “Reasoning About Time”.
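The following Python sketch computes the request rate and the 99th percentile latency for each fixed (tumbling) 5-minute window, keyed by each request's own timestamp. For brevity it processes a finished list. A streaming version would keep the open windows in memory and emit each result once its window is deemed complete. The nearest-rank percentile definition is one of several in common use.

```python
import math
from collections import defaultdict

WINDOW = 300  # 5-minute tumbling windows, in seconds

def window_start(ts: float) -> int:
    return int(ts // WINDOW) * WINDOW

def percentile(sorted_values: list[float], q: float) -> float:
    """Nearest-rank percentile of an already sorted list."""
    rank = max(1, math.ceil(q / 100 * len(sorted_values)))
    return sorted_values[rank - 1]

def summarize(requests: list[tuple[float, float]]) -> dict[int, dict]:
    """requests: (timestamp_seconds, response_time_ms) pairs."""
    buckets = defaultdict(list)
    for ts, latency in requests:
        buckets[window_start(ts)].append(latency)
    return {
        start: {
            "qps": len(latencies) / WINDOW,
            "p99_ms": percentile(sorted(latencies), 99),
        }
        for start, latencies in sorted(buckets.items())
    }
```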
Stream analytics systems sometimes use probabilistic algorithms, such as Bloom filters (which we encountered in “Bloom filters”) for set membership, HyperLogLog [54] for cardinality estimation, and various percentile estimation algorithms (see “Computing percentiles”). Probabilistic algorithms produce approximate results, but have the advantage of requiring significantly less memory in the stream processor than exact algorithms. This use of approximation algorithms sometimes leads people to believe that stream processing systems are always lossy and inexact, but that is wrong: there is nothing inherently approximate about stream processing, and using probabilistic algorithms is merely an optimization [55].
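As an example of this memory trade-off, here is a compact Python sketch of HyperLogLog. Three choices are assumptions of this sketch: 2^10 registers, a 64-bit BLAKE2b hash, and the omission of later refinements such as HyperLogLog++. It stores about a thousand small integers no matter how many distinct items it sees. The price is an expected relative error of roughly 1.04/√m, about 3% here.

```python
import hashlib
import math

class HyperLogLog:
    """Approximate distinct-count estimator (a minimal HyperLogLog sketch)."""

    def __init__(self, p: int = 10):
        self.p = p                        # number of index bits
        self.m = 1 << p                   # number of registers
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)   # bias constant for m >= 128

    def add(self, item: str) -> None:
        h = int.from_bytes(hashlib.blake2b(item.encode(), digest_size=8).digest(), "big")
        idx = h >> (64 - self.p)                  # top p bits choose a register
        rest = h & ((1 << (64 - self.p)) - 1)     # remaining 64 - p bits
        # Rank = 1-based position of the leftmost 1-bit in `rest`.
        rank = (64 - self.p) - rest.bit_length() + 1
        self.registers[idx] = max(self.registers[idx], rank)

    def estimate(self) -> float:
        z = sum(2.0 ** -r for r in self.registers)
        e = self.alpha * self.m * self.m / z
        zeros = self.registers.count(0)
        if e <= 2.5 * self.m and zeros > 0:
            e = self.m * math.log(self.m / zeros)  # small-range correction
        return e
```

Because each register only keeps a maximum, adding the same item again never changes the estimate. That is what makes the structure suitable for counting distinct users in a stream with repeats.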
Many open source distributed stream processing frameworks are designed with analytics in mind: for example, Apache Storm, Spark Streaming, Flink, Samza, and Kafka Streams [56]. Hosted services include Google Cloud Dataflow and Azure Stream Analytics.
Maintaining materialized views
We saw that a stream of changes to a database can be used to keep derived data systems, such as caches, search indexes, and data warehouses, up to date with a source database. These are examples of maintaining materialized views: deriving an alternative view onto some dataset so that you can query it efficiently, and updating that view whenever the underlying data changes [37].
Similarly, in event sourcing, application state is maintained by applying a log of events; here the application state is also a kind of materialized view. Unlike stream analytics scenarios, it is usually not sufficient to consider only events within some time window: building the materialized view potentially requires all events over an arbitrary time period, apart from any obsolete events that may be discarded by log compaction. In effect, you need a window that stretches all the way back to the beginning of time.
In principle, any stream processor could be used for materialized view maintenance, although the need to maintain events forever runs counter to the assumptions of some analytics-oriented frameworks that mostly operate on windows of a limited duration. Kafka Streams and Confluent’s ksqlDB support this kind of usage, building upon Kafka’s support for log compaction [57].
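The difference from windowed analytics shows up in a small Python sketch of a view of open orders per customer, built from hypothetical `placed` and `cancelled` events. Updating the view incrementally gives the same result as replaying the whole log from the beginning. Replaying only a recent window does not.

```python
from collections import Counter

def apply(view: Counter, event: dict) -> None:
    """Incrementally update the view 'open orders per customer' for one event."""
    if event["type"] == "placed":
        view[event["customer"]] += 1
    elif event["type"] == "cancelled":
        view[event["customer"]] -= 1
        if view[event["customer"]] == 0:
            del view[event["customer"]]

def rebuild(log: list[dict]) -> Counter:
    """Recompute the view from scratch by replaying the log from the beginning."""
    view = Counter()
    for event in log:
        apply(view, event)
    return view

log = [
    {"ts": 1, "type": "placed", "customer": "c1"},
    {"ts": 2, "type": "placed", "customer": "c2"},
    {"ts": 900, "type": "cancelled", "customer": "c1"},
    {"ts": 901, "type": "placed", "customer": "c1"},
]

live_view = Counter()
for event in log:          # the stream processor applies each event as it arrives
    apply(live_view, event)

# A view computed over only a recent window (ts >= 600) would be wrong:
windowed = rebuild([e for e in log if e["ts"] >= 600])
```

The windowed result misses c2's order entirely, and it sees c1's cancellation without the matching placement.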
Search on streams
Besides CEP, which allows searching for patterns consisting of multiple events, there is also sometimes a need to search for individual events based on complex criteria, such as full-text search queries.
For example, media monitoring services subscribe to feeds of news articles and broadcasts from media outlets, and search for any news mentioning companies, products, or topics of interest. This is done by formulating a search query in advance, and then continually matching the stream of news items against this query. Similar features exist on some websites: for example, users of real estate websites can ask to be notified when a new property matching their search criteria appears on the market. The percolator feature of Elasticsearch [61] is one option for implementing this kind of stream search.
Conventional search engines first index the documents and then run queries over the index. By contrast, searching a stream turns the processing on its head: the queries are stored, and the documents run past the queries, like in CEP. In the simplest case, you can test every document against every query, although this can get slow if you have a large number of queries. To optimize the process, it is possible to index the queries as well as the documents, and thus narrow down the set of queries that may match [62].
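The following Python sketch illustrates indexing the queries, assuming each stored query is a set of keywords that must all appear in a document. Each query is indexed under just one of its terms, here simply the alphabetically smallest. That is an arbitrary choice; picking the rarest term would prune more candidates. A document then only needs to be checked against queries whose indexed term it contains. This is not the Elasticsearch percolator API.

```python
import re
from collections import defaultdict

def tokenize(text: str) -> list[str]:
    return re.findall(r"[a-z0-9]+", text.lower())

class StreamSearch:
    """Standing keyword queries matched against a stream of documents.
    Each query is a non-empty set of terms that must all appear (AND semantics)."""

    def __init__(self):
        self.queries = {}                # query_id -> set of terms
        self.index = defaultdict(set)    # term -> ids of queries indexed under it

    def register(self, query_id: str, text: str) -> None:
        terms = set(tokenize(text))
        self.queries[query_id] = terms
        # A document can only match if it contains this term,
        # so queries indexed under other terms can be skipped.
        self.index[min(terms)].add(query_id)

    def match(self, document: str) -> set[str]:
        doc_terms = set(tokenize(document))
        candidates = set()
        for term in doc_terms:
            candidates |= self.index.get(term, set())
        return {q for q in candidates if self.queries[q] <= doc_terms}
```

Usage: after `register("q1", "acme merger")`, each incoming news item is passed to `match()`, which returns the IDs of the stored queries it satisfies.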
Event-Driven Architectures and RPC
In “Event-Driven Architectures” we discussed message-passing systems as an alternative to RPC—i.e., as a mechanism for services to communicate, as used for example in the actor model. Although these systems are also based on messages and events, we normally don’t think of them as stream processors:
- Actor frameworks are primarily a mechanism for managing concurrency and distributed execution of communicating modules, whereas stream processing is primarily a data management technique.
- Communication between actors is often ephemeral and one-to-one, whereas event logs are durable and multi-subscriber.
- Actors can communicate in arbitrary ways (including cyclic request/response patterns), but stream processors are usually set up in acyclic pipelines where every stream is the output of one particular job, and derived from a well-defined set of input streams.
That said, there is some crossover area between RPC-like systems and stream processing. For example, Apache Storm has a feature called distributed RPC, which allows user queries to be farmed out to a set of nodes that also process event streams; these queries are then interleaved with events from the input streams, and results can be aggregated and sent back to the user. (See also [Link to Come].)
It is also possible to process streams using actor frameworks. However, many such frameworks do not guarantee message delivery in the case of crashes, so the processing is not fault-tolerant unless you implement additional retry logic.
Reasoning About Time
Stream processors often need to deal with time, especially when used for analytics purposes, which frequently use time windows such as “the average over the last five minutes.” It might seem that the meaning of “the last five minutes” should be unambiguous and clear, but unfortunately the notion is surprisingly tricky.
In a batch process, the processing tasks rapidly crunch through a large collection of historical events. If some kind of breakdown by time needs to happen, the batch process needs to look at the timestamp embedded in each event. There is no point in looking at the system clock of the machine running the batch process, because the time at which the process is run has nothing to do with the time at which the events actually occurred.
A batch process may read a year’s worth of historical events within a few minutes; in most cases, the timeline of interest is the year of history, not the few minutes of processing. Moreover, using the timestamps in the events allows the processing to be deterministic: running the same process again on the same input yields the same result.
On the other hand, many stream processing frameworks use the local system clock on the processing machine (the processing time) to determine windowing [63]. This approach has the advantage of being simple, and it is reasonable if the delay between event creation and event processing is negligibly short. However, it breaks down if there is any significant processing lag—i.e., if the processing may happen noticeably later than the time at which the event actually occurred.
Event time versus processing time
There are many reasons why processing may be delayed: queueing, network faults, a performance issue leading to contention in the message broker or processor, a restart of the stream consumer, or reprocessing of past events while recovering from a fault or after fixing a bug in the code.
Moreover, message delays can also lead to unpredictable ordering of messages. For example, say a user first makes one web request (which is handled by web server A), and then a second request (which is handled by server B). A and B emit events describing the requests they handled, but B’s event reaches the message broker before A’s event does. Now stream processors will first see the B event and then the A event, even though they actually occurred in the opposite order.
If it helps to have an analogy, consider the Star Wars movies: Episode IV was released in 1977, Episode V in 1980, and Episode VI in 1983, followed by Episodes I, II, and III in 1999, 2002, and 2005, respectively, and Episodes VII, VIII, and IX in 2015, 2017, and 2019 [64]. If you watched the movies in the order they came out, the order in which you processed the movies is inconsistent with the order of their narrative. (The episode number is like the event timestamp, and the date when you watched the movie is the processing time.) As humans, we are able to cope with such discontinuities, but stream processing algorithms need to be specifically written to accommodate such timing and ordering issues.
Confusing event time and processing time leads to bad data. For example, say you have a stream processor that measures the rate of requests (counting the number of requests per second). If you redeploy the stream processor, it may be shut down for a minute and process the backlog of events when it comes back up. If you measure the rate based on the processing time, it will look as if there was a sudden anomalous spike of requests while processing the backlog, when in fact the real rate of requests was steady (Figure 12-8).
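To make the backlog effect concrete, here is a minimal Python sketch under a hypothetical workload: one request per second of event time for three minutes, with the processor down from second 60 to second 120 and draining its whole backlog at second 120. The function name `rate_per_second` and the workload are illustrative assumptions, not part of any framework.

```python
from collections import Counter

def rate_per_second(events, use_event_time=True):
    """Count requests per one-second bucket, keyed by the chosen timestamp."""
    counts = Counter()
    for event_ts, processing_ts in events:
        ts = event_ts if use_event_time else processing_ts
        counts[int(ts)] += 1  # round down to the whole second
    return counts

# Hypothetical workload: (event_time, processing_time) pairs in seconds.
# Events from t=60..119 sit in the backlog and are all processed at t=120.
events = []
for t in range(180):
    processed_at = 120 if 60 <= t < 120 else t
    events.append((t, processed_at))

by_event_time = rate_per_second(events, use_event_time=True)
by_processing_time = rate_per_second(events, use_event_time=False)
print(max(by_event_time.values()))  # 1: the real request rate is steady
print(by_processing_time[120])      # 61: an apparent spike while draining
```

Bucketing by processing time also shows a gap of zero requests between seconds 60 and 119, an artifact of the outage rather than of user behavior.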

Knowing when you’re ready
A tricky problem when defining windows in terms of event time is that you can never be sure when you have received all of the events for a particular window, or whether there are some events still to come.
For example, say you’re grouping events into one-minute windows so that you can count the number of requests per minute. You have counted some number of events with timestamps that fall in the 37th minute of the hour, and time has moved on; now most of the incoming events fall within the 38th and 39th minutes of the hour. When do you declare that you have finished the window for the 37th minute, and output its counter value?
You can time out and declare a window ready after you have not seen any new events for that window in a while. However, it could still happen that some events were buffered on another machine somewhere, delayed due to a network interruption. You need to be able to handle such straggler events that arrive after the window has already been declared complete. Broadly, you have two options [1]:
- Ignore the straggler events, as they are probably a small percentage of events in normal circumstances. You can track the number of dropped events as a metric, and alert if you start dropping a significant amount of data.
- Publish a correction, an updated value for the window with stragglers included. You may also need to retract the previous output.
In some cases it is possible to use a special message to indicate, “From now on there will be no more messages with a timestamp earlier than t,” which can be used by consumers to trigger windows [65]. However, if several producers on different machines are generating events, each with their own minimum timestamp thresholds, the consumers need to keep track of each producer individually. Adding and removing producers is trickier in this case.
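The bookkeeping a consumer needs for such "no more messages earlier than t" markers can be sketched as follows. This is an illustrative Python sketch, not any framework's API: the class name `WatermarkTracker` and its methods are hypothetical. It keeps the latest marker from each producer and treats the minimum across producers as the point up to which windows may be closed.

```python
class WatermarkTracker:
    """Combines per-producer 'no more events earlier than t' markers."""

    def __init__(self, producers):
        # Until a producer sends a marker, we know nothing about its progress.
        self.marks = {p: float("-inf") for p in producers}

    def on_marker(self, producer, t):
        # This producer promises to send no event with a timestamp earlier than t.
        self.marks[producer] = max(self.marks[producer], t)

    def watermark(self):
        # The slowest producer determines what the consumer can rely on.
        return min(self.marks.values())

    def window_complete(self, window_end):
        # Window [start, end) is complete once no producer can still send an
        # event with a timestamp earlier than end.
        return self.watermark() >= window_end


# Timestamps in seconds since the top of the hour; the 37th-minute window
# covers [37 * 60, 38 * 60).
tracker = WatermarkTracker(["producer-a", "producer-b"])
tracker.on_marker("producer-a", 38 * 60)
print(tracker.window_complete(38 * 60))  # False: producer-b has not reported
tracker.on_marker("producer-b", 38 * 60)
print(tracker.window_complete(38 * 60))  # True: the window can be emitted
```

The sketch also shows why adding or removing producers is awkward: the set of keys in `marks` must be kept exactly in sync with the live producers, or the combined watermark either stalls or advances too early.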
Whose clock are you using, anyway?
Assigning timestamps to events is even more difficult when events can be buffered at several points in the system. For example, consider a mobile app that reports events for usage metrics to a server. The app may be used while the device is offline, in which case it will buffer events locally on the device and send them to a server when an internet connection is next available (which may be hours or even days later). To any consumers of this stream, the events will appear as extremely delayed stragglers.
In this context, the timestamp on the events should really be the time at which the user interaction occurred, according to the mobile device’s local clock. However, the clock on a user-controlled device often cannot be trusted, as it may be accidentally or deliberately set to the wrong time (see “Clock Synchronization and Accuracy”). The time at which the event was received by the server (according to the server’s clock) is more likely to be accurate, since the server is under your control, but less meaningful in terms of describing the user interaction.
To adjust for incorrect device clocks, one approach is to log three timestamps [66]:
- The time at which the event occurred, according to the device clock
- The time at which the event was sent to the server, according to the device clock
- The time at which the event was received by the server, according to the server clock
By subtracting the second timestamp from the third, you can estimate the offset between the device clock and the server clock (assuming the network delay is negligible compared to the required timestamp accuracy). You can then apply that offset to the event timestamp, and thus estimate the true time at which the event actually occurred (assuming the device clock offset did not change between the time the event occurred and the time it was sent to the server).
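The correction is simple arithmetic; a minimal Python sketch follows, with the same two assumptions as the text (negligible network delay, and a device clock offset that did not change between the event and the send). The function name and the example numbers are hypothetical.

```python
def estimate_event_time(device_event_ts, device_send_ts, server_receive_ts):
    """Estimate when an event really happened, expressed in server-clock time.

    All arguments are seconds. Assumes negligible network delay and that the
    device clock's offset did not change between the event and the send.
    """
    # Third timestamp minus second: how far the device clock is behind the server.
    offset = server_receive_ts - device_send_ts
    # Shift the device's event timestamp onto the server's timeline.
    return device_event_ts + offset


# Hypothetical example: the device clock runs 7200 s (two hours) slow. The
# event really happened at server time 10_000 and was sent at server time 50_000.
print(estimate_event_time(device_event_ts=2_800,
                          device_send_ts=42_800,
                          server_receive_ts=50_000))  # 10000
```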
This problem is not unique to stream processing—batch processing suffers from exactly the same issues of reasoning about time. It is just more noticeable in a streaming context, where we are more aware of the passage of time.
Types of windows
Once you know how the timestamp of an event should be determined, the next step is to decide how windows over time periods should be defined. The window can then be used for aggregations, for example to count events, or to calculate the average of values within the window. Several types of windows are in common use [63,67]:
Tumbling window
A tumbling window has a fixed length, and every event belongs to exactly one window. For example, if you have a 1-minute tumbling window, all the events with timestamps between 10:03:00 and 10:03:59 are grouped into one window, events between 10:04:00 and 10:04:59 into the next window, and so on. You could implement a 1-minute tumbling window by taking each event timestamp and rounding it down to the nearest minute to determine the window that it belongs to.
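The rounding-down approach can be sketched in a few lines of Python. Timestamps here are plain seconds since midnight, and the helper names (`hms`, `tumbling_window_start`, `count_per_window`) are hypothetical, chosen only for this illustration.

```python
from collections import Counter

def hms(h, m, s=0):
    """Seconds since midnight, for readable example timestamps."""
    return h * 3600 + m * 60 + s

def tumbling_window_start(ts, size=60):
    """Round a timestamp (in seconds) down to the start of its window."""
    return ts - ts % size

def count_per_window(timestamps, size=60):
    # Each event lands in exactly one window, keyed by that window's start.
    return Counter(tumbling_window_start(ts, size) for ts in timestamps)

counts = count_per_window([hms(10, 3, 0), hms(10, 3, 59), hms(10, 4, 0)])
print(counts[hms(10, 3)], counts[hms(10, 4)])  # 2 1
```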
Hopping window
A hopping window also has a fixed length, but allows windows to overlap in order to provide some smoothing. For example, a 5-minute window with a hop size of 1 minute would contain the events between 10:03:00 and 10:07:59, then the next window would cover events between 10:04:00 and 10:08:59, and so on. You can implement this hopping window by first calculating 1-minute tumbling windows, and then aggregating over several adjacent windows.
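Building hopping windows out of 1-minute tumbling windows, as described above, might look like the following Python sketch. The input format (a dict from minute number to that minute's count) and the function name `hopping_counts` are assumptions for illustration.

```python
def hopping_counts(minute_counts, length=5):
    """Combine 1-minute tumbling-window counts into hopping windows.

    minute_counts maps a minute number to the count of its 1-minute tumbling
    window. The result maps each hopping window's first minute to the count
    for minutes [start, start + length), with a hop size of one minute.
    """
    if not minute_counts:
        return {}
    first = min(minute_counts) - length + 1  # earliest window touching any data
    last = max(minute_counts)
    return {
        start: sum(minute_counts.get(m, 0) for m in range(start, start + length))
        for start in range(first, last + 1)
    }

# Hypothetical minute-of-hour counts produced by a 1-minute tumbling window.
tumbling = {3: 2, 4: 1, 8: 4}
hopping = hopping_counts(tumbling)
print(hopping[3], hopping[4])  # 3 5  (10:03-10:07, then 10:04-10:08)
```

Each minute's count contributes to five overlapping windows, which is what makes the hopping output smoother than the tumbling counts it is built from.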
Sliding window
A sliding window contains all the events that occur within some interval of each other. For example, a 5-minute sliding window would cover events at 10:03:39 and 10:08:12, because they are less than 5 minutes apart (note that tumbling and hopping 5-minute windows would not have put these two events in the same window, as they use fixed boundaries). A sliding window can be implemented by keeping a buffer of events sorted by time and removing old events when they expire from the window.
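The buffer-and-expire approach can be sketched in Python with a `collections.deque`. This sketch assumes events arrive in timestamp order, so appending keeps the buffer sorted; handling out-of-order events would need a sorted insert instead. The class name `SlidingWindowCount` is hypothetical.

```python
from collections import deque

def hms(h, m, s=0):
    """Seconds since midnight, for readable example timestamps."""
    return h * 3600 + m * 60 + s

class SlidingWindowCount:
    """Counts events less than `width` seconds older than the newest event.

    Assumes in-order arrival, so the deque stays sorted by timestamp.
    """

    def __init__(self, width=300):
        self.width = width
        self.buffer = deque()  # event timestamps, oldest first

    def add(self, ts):
        self.buffer.append(ts)
        # Evict events that are now `width` seconds or more older than ts.
        while self.buffer[0] <= ts - self.width:
            self.buffer.popleft()
        return len(self.buffer)

window = SlidingWindowCount(width=5 * 60)
print(window.add(hms(10, 3, 39)))  # 1
print(window.add(hms(10, 8, 12)))  # 2: 4 min 33 s apart, same window
print(window.add(hms(10, 8, 40)))  # 2: 10:03:39 has now expired
```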
Session window
Unlike the other window types, a session window has no fixed duration. Instead, it is defined by grouping together all events for the same user that occur closely together in time, and the window ends when the user has been inactive for some time (for example, if there have been no events for 30 minutes). Sessionization is a common requirement for website analytics.
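As an illustration, here is a Python sketch that splits one user's event timestamps into sessions using a 30-minute inactivity gap. It works over a complete, already-collected list for simplicity; a streaming version would instead keep one open session per user and close it when no event has arrived for the gap duration. The function name `sessionize` is hypothetical.

```python
def sessionize(timestamps, gap=30 * 60):
    """Split one user's event timestamps (in seconds) into session windows.

    A session ends when the user has been inactive for at least `gap` seconds.
    Returns a list of (first_event, last_event, event_count) tuples.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][1] < gap:
            start, _, count = sessions[-1]
            sessions[-1] = (start, ts, count + 1)  # extend the open session
        else:
            sessions.append((ts, ts, 1))  # inactivity gap: start a new session
    return sessions

print(sessionize([0, 600, 1200, 5000, 5100]))
# [(0, 1200, 3), (5000, 5100, 2)]
```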
Stream Joins
In “JOIN and GROUP BY” we discussed how batch jobs can join datasets by key, and how such joins form an important part of data pipelines. Since stream processing generalizes data pipelines to incremental processing of unbounded datasets, there is exactly the same need for joins on streams.
However, the fact that new events can appear anytime on a stream makes joins on streams more challenging than in batch jobs. To understand the situation better, let’s distinguish three different types of joins: stream-stream joins, stream-table joins, and table-table joins. In the following sections we’ll illustrate each by example.
Stream-stream join (window join)
Say you have a search feature on your website, and you want to detect recent trends in searched-for URLs. Every time someone types a search query, you log an event containing the query and the results returned. Every time someone clicks one of the search results, you log another event recording the click. In order to calculate the click-through rate for each URL in the search results, you need to bring together the events for the search action and the click action, which are connected by having the same session ID. Similar analyses are needed in advertising systems [68].
The click may never come if the user abandons their search, and even if it comes, the time between the search and the click may be highly variable: in many cases it might be a few seconds, but it could be as long as days or weeks (if a user runs a search, forgets about that browser tab, and then returns to the tab and clicks a result sometime later). Due to variable network delays, the click event may even arrive before the search event. You can choose a suitable window for the join—for example, you may choose to join a click with a search if they occur at most one hour apart.
Note that embedding the details of the search in the click event is not equivalent to joining the events: doing so would only tell you about the cases where the user clicked a search result, not about the searches where the user did not click any of the results. In order to measure search quality, you need accurate click-through rates, for which you need both the search events and the click events.
To implement this type of join, a stream processor needs to maintain state: for example, all the events that occurred in the last hour, indexed by session ID. Whenever a search event or click event occurs, it is added to the appropriate index, and the stream processor also checks the other index to see if another event for the same session ID has already arrived. If there is a matching event, you emit an event saying which search result was clicked. If the search event expires without you seeing a matching click event, you emit an event saying which search results were not clicked.
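The state-keeping described above can be sketched as a single-process Python class. Everything here is illustrative: the class and method names are hypothetical, processing time is passed in explicitly, and it assumes at most one search per session ID. Clicks that arrive before their search are buffered, matches within the one-hour window are emitted as "clicked", and expired searches emit the results nobody clicked.

```python
class SearchClickJoin:
    """Windowed stream-stream join of search and click events by session ID.

    Simplifying assumption: at most one search per session ID.
    """

    def __init__(self, window=3600):
        self.window = window    # max seconds allowed between search and click
        self.searches = {}      # session_id -> (search_ts, results, clicked_urls)
        self.early_clicks = {}  # session_id -> [(click_ts, url)] seen before the search
        self.output = []        # join results emitted downstream

    def _join(self, session_id, click_ts, url):
        search_ts, results, clicked = self.searches[session_id]
        if abs(click_ts - search_ts) <= self.window and url in results:
            clicked.add(url)
            self.output.append(("clicked", session_id, url))

    def on_search(self, session_id, ts, results):
        self.searches[session_id] = (ts, list(results), set())
        # Network delays mean a click can arrive before its search.
        for click_ts, url in self.early_clicks.pop(session_id, []):
            self._join(session_id, click_ts, url)

    def on_click(self, session_id, ts, url):
        if session_id in self.searches:
            self._join(session_id, ts, url)
        else:
            self.early_clicks.setdefault(session_id, []).append((ts, url))

    def expire(self, now):
        """Close searches whose window has passed and report unclicked results."""
        expired = [s for s, (ts, _, _) in self.searches.items() if now - ts > self.window]
        for session_id in expired:
            _, results, clicked = self.searches.pop(session_id)
            unclicked = [url for url in results if url not in clicked]
            self.output.append(("not_clicked", session_id, unclicked))
        # Drop early clicks whose search never showed up within the window.
        for session_id in list(self.early_clicks):
            kept = [(t, u) for t, u in self.early_clicks[session_id] if now - t <= self.window]
            if kept:
                self.early_clicks[session_id] = kept
            else:
                del self.early_clicks[session_id]


join = SearchClickJoin(window=3600)
join.on_click("s1", 105, "b.example")                  # click overtakes its search
join.on_search("s1", 100, ["a.example", "b.example"])
join.on_search("s2", 200, ["c.example"])               # nobody clicks
join.expire(now=4000)
print(join.output)
```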
Stream-table join (stream enrichment)
In “JOIN and GROUP BY” (Figure 11-2) we saw an example of a batch job joining two datasets: a set of user activity events and a database of user profiles. It is natural to think of the user activity events as a stream, and to perform the same join on a continuous basis in a stream processor: the input is a stream of activity events containing a user ID, and the output is a stream of activity events in which the user ID has been augmented with profile information about the user. This process is sometimes known as enriching the activity events with information from the database.
To perform this join, the stream processor needs to look at one activity event at a time, look up the event’s user ID in the database, and add the profile information to the activity event. The database lookup could be implemented by querying a remote database; however, as discussed in “JOIN and GROUP BY”, such remote queries are likely to be slow and risk overloading the database [57].
Another approach is to load a copy of the database into the stream processor so that it can be queried locally without a network round-trip. This technique is similar to a hash join: the local copy of the database might be an in-memory hash table if it is small enough, or an index on the local disk otherwise.
The difference from batch jobs is that a batch job uses a point-in-time snapshot of the database as input, whereas a stream processor is long-running, and the contents of the database are likely to change over time, so the stream processor’s local copy of the database needs to be kept up to date. This issue can be solved by change data capture: the stream processor can subscribe to a changelog of the user profile database as well as the stream of activity events. When a profile is created or modified, the stream processor updates its local copy. Thus, we obtain a join between two streams: the activity events and the profile updates.
A stream-table join is actually very similar to a stream-stream join; the biggest difference is that for the table changelog stream, the join uses a window that reaches back to the “beginning of time” (a conceptually infinite window), with newer versions of records overwriting older ones. For the stream input, the join might not maintain a window at all.
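The two sides of such a join can be sketched in Python. The table side is a local dict updated from the profile changelog, keeping only the newest version of each record; the stream side keeps no window and joins each activity event as it arrives. The class name, the event shapes, and the convention that `profile=None` in a changelog entry means deletion are all assumptions made for this illustration.

```python
class ProfileEnricher:
    """Stream-table join: enrich activity events using a local copy of the
    user profile table, kept current by consuming its changelog."""

    def __init__(self):
        # Table side: conceptually an unbounded window that retains only the
        # newest version of each record.
        self.profiles = {}

    def on_profile_change(self, user_id, profile):
        """Apply one changelog entry; profile=None stands for a deletion (assumption)."""
        if profile is None:
            self.profiles.pop(user_id, None)
        else:
            self.profiles[user_id] = profile  # newer version overwrites older

    def on_activity(self, event):
        """Stream side: no window; each event is joined and emitted immediately."""
        profile = self.profiles.get(event["user_id"])  # local lookup, no network call
        return {**event, "profile": profile}


enricher = ProfileEnricher()
enricher.on_profile_change(42, {"name": "Ada", "plan": "free"})
print(enricher.on_activity({"user_id": 42, "page": "/home"}))
enricher.on_profile_change(42, {"name": "Ada", "plan": "pro"})
print(enricher.on_activity({"user_id": 42, "page": "/billing"}))
```

Which profile version an event sees depends on whether the profile update was processed before or after that event, a point the section on time-dependence of joins returns to.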
Table-table join (materialized view maintenance)
Consider the social network timeline example that we discussed in “Case Study: Social Network Home Timelines”. We said that when a user wants to view their home timeline, it is too expensive to iterate over all the people the user is following, find their recent posts, and merge them.
Instead, we want a timeline cache: a kind of per-user “inbox” to which posts are written as they are sent, so that reading the timeline is a single lookup. Materializing and maintaining this cache requires the following event processing:
- When user u sends a new post, it is added to the timeline of every user who is following u.
- When a user deletes a post, or deletes their entire account, it is removed from all users’ timelines.
- When user u1 starts following user u2, recent posts by u2 are added to u1’s timeline.
- When user u1 unfollows user u2, posts by u2 are removed from u1’s timeline.
To implement this cache maintenance in a stream processor, you need streams of events for posts (sending and deleting) and for follow relationships (following and unfollowing). The stream processor needs to maintain a database containing the set of followers for each user so that it knows which timelines need to be updated when a new post arrives.
Another way of looking at this stream process is that it maintains a materialized view for a query that joins two tables (posts and follows), something like the following:
The join of the streams corresponds directly to the join of the tables in that query. The timelines are effectively a cache of the result of this query, updated every time the underlying tables change.
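The four maintenance rules listed above can be sketched as a small Python class in which each handler joins a change on one input with the current state of the other input. The class and method names are hypothetical, and the sketch simplifies in stated ways: it copies all of a followee’s posts rather than only recent ones, does not order or trim timelines, and omits account deletion.

```python
from collections import defaultdict

class TimelineMaintainer:
    """Maintains per-user timeline caches from post and follow event streams."""

    def __init__(self):
        self.followers = defaultdict(set)   # author -> set of follower IDs
        self.posts_by = defaultdict(dict)   # author -> {post_id: text}
        self.timelines = defaultdict(dict)  # reader -> {post_id: (author, text)}

    def on_post(self, author, post_id, text):
        # Change to posts, joined with the current followers.
        self.posts_by[author][post_id] = text
        for reader in self.followers[author]:
            self.timelines[reader][post_id] = (author, text)

    def on_delete_post(self, author, post_id):
        self.posts_by[author].pop(post_id, None)
        for reader in self.followers[author]:
            self.timelines[reader].pop(post_id, None)

    def on_follow(self, follower, followee):
        # Change to follows, joined with the current posts.
        self.followers[followee].add(follower)
        for post_id, text in self.posts_by[followee].items():
            self.timelines[follower][post_id] = (followee, text)

    def on_unfollow(self, follower, followee):
        self.followers[followee].discard(follower)
        for post_id in self.posts_by[followee]:
            self.timelines[follower].pop(post_id, None)


tm = TimelineMaintainer()
tm.on_follow("bob", "alice")
tm.on_post("alice", 1, "hello")
tm.on_post("carol", 2, "hi")   # bob does not follow carol yet
tm.on_follow("bob", "carol")   # carol's existing post is backfilled
tm.on_unfollow("bob", "alice")
print(tm.timelines["bob"])     # {2: ('carol', 'hi')}
```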
Note
If you regard a stream as the derivative of a table, as in Figure 12-7, and regard a join as a product of two tables u·v, something interesting happens: the stream of changes to the materialized join follows the product rule (u·v)′ = u′v + uv′. In words, taking u as posts and v as followers: any change of posts is joined with the current followers, and any change of followers is joined with the current posts [37].
Time-dependence of joins
The three types of joins described here (stream-stream, stream-table, and table-table) have a lot in common: they all require the stream processor to maintain some state (search and click events, user profiles, or follower list) based on one join input, and query that state on messages from the other join input.
The order of the events that maintain the state is important (it matters whether you first follow and then unfollow, or the other way round). In a sharded event log like Kafka, the ordering of events within a single shard (partition) is preserved, but there is typically no ordering guarantee across different streams or shards.
This raises a question: if events on different streams happen around a similar time, in which order are they processed? In the stream-table join example, if a user updates their profile, which activity events are joined with the old profile (processed before the profile update), and which are joined with the new profile (processed after the profile update)? Put another way: if state changes over time, and you join with some state, what point in time do you use for the join?
Such time dependence can occur in many places. For example, if you sell things, you need to apply the right tax rate to invoices, which depends on the country or state, the type of product, and the date of sale (since tax rates change from time to time). When joining sales to a table of tax rates, you probably want to join with the tax rate at the time of the sale, which may be different from the current tax rate if you are reprocessing historical data.
If the ordering of events across streams is undetermined, the join becomes nondeterministic [69], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.
In data warehouses, this issue is known as a slowly changing dimension (SCD), and it is often addressed by using a unique identifier for a particular version of the joined record: for example, every time the tax rate changes, it is given a new identifier, and the invoice includes the identifier for the tax rate at the time of sale [70,71]. This change makes the join deterministic, but has the consequence that log compaction is not possible, since all versions of the records in the table need to be retained.
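The versioned-identifier approach can be sketched in Python. The tax-rate table below is entirely hypothetical (made-up region, category, dates, and rates), as are the function names. Each invoice records the version ID of the rate in force at the time of sale, so later joins on that ID are deterministic, at the price of keeping every version of the table.

```python
import bisect
from datetime import date

# Hypothetical versioned tax-rate table. Every rate change gets a new version ID,
# and old versions are kept, which is why this table cannot be log-compacted.
TAX_RATE_VERSIONS = {
    ("region-1", "books"): [  # (valid_from, version_id, rate), sorted by date
        (date(2019, 1, 1), "books-v1", 0.10),
        (date(2021, 1, 1), "books-v2", 0.12),
    ],
}
RATE_BY_VERSION = {
    version: rate
    for versions in TAX_RATE_VERSIONS.values()
    for _, version, rate in versions
}

def version_at(region, category, sale_date):
    """Find the version ID of the rate in force on sale_date."""
    versions = TAX_RATE_VERSIONS[(region, category)]
    i = bisect.bisect_right([v[0] for v in versions], sale_date) - 1
    if i < 0:
        raise LookupError("no tax rate in force on that date")
    return versions[i][1]

def make_invoice(region, category, sale_date, amount):
    # Record the rate's version ID on the invoice at the time of sale.
    return {"amount": amount, "tax_rate_version": version_at(region, category, sale_date)}

def invoice_tax(invoice):
    # Joining on the version ID, rather than on "the current rate", gives the
    # same answer no matter when, or how often, the invoice is reprocessed.
    return invoice["amount"] * RATE_BY_VERSION[invoice["tax_rate_version"]]

invoice = make_invoice("region-1", "books", date(2020, 3, 15), 100)
print(invoice["tax_rate_version"])  # books-v1
```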
Fault Tolerance
In the final section of this chapter, let’s consider how stream processors can tolerate faults. We saw in Chapter 11 that batch processing frameworks can tolerate faults fairly easily: if a task fails, it can simply be started again on another machine, and the output of the failed task is discarded. This transparent retry is possible because input files are immutable, each task writes its output to a separate file, and output is only made visible when a task completes successfully.
In particular, the batch approach to fault tolerance ensures that the output of the batch job is the same as if nothing had gone wrong, even if in fact some tasks did fail. It appears as though every input record was processed exactly once—no records are skipped, and none are processed twice. Although restarting tasks means that records may in fact be processed multiple times, the visible effect in the output is as if they had only been processed once. This principle is known as exactly-once semantics, although effectively-once would be a more descriptive term [72].
The same issue of fault tolerance arises in stream processing, but it is less straightforward to handle: waiting until a task is finished before making its output visible is not an option, because a stream is infinite and so you can never finish processing it.
Microbatching and checkpointing
One solution is to break the stream into small blocks, and treat each block like a miniature batch process. This approach is called microbatching, and it is used in Spark Streaming [73]. The batch size is typically around one second, which is the result of a performance compromise: smaller batches incur greater scheduling and coordination overhead, while larger batches mean a longer delay before results of the stream processor become visible.
Microbatching also implicitly provides a tumbling window equal to the batch size (windowed by processing time, not event timestamps); any jobs that require larger windows need to explicitly carry over state from one microbatch to the next.
A variant approach, used in Apache Flink, is to periodically generate rolling checkpoints of state and write them to durable storage [74,75]. If a stream operator crashes, it can restart from its most recent checkpoint and discard any output generated between the last checkpoint and the crash. The checkpoints are triggered by barriers in the message stream, similar to the boundaries between microbatches, but without forcing a particular window size.
Within the confines of the stream processing framework, the microbatching and checkpointing approaches provide the same exactly-once semantics as batch processing. However, as soon as output leaves the stream processor (for example, by writing to a database, sending messages to an external message broker, or sending emails), the framework is no longer able to discard the output of a failed microbatch. In this case, restarting a failed task causes the external side effect to happen twice, and microbatching or checkpointing alone is not sufficient to prevent this problem.
Atomic commit revisited
In order to give the appearance of exactly-once processing in the presence of faults, we need to ensure that all outputs and side effects of processing an event take effect if and only if the processing is successful. Those effects include any messages sent to downstream operators or external messaging systems (including email or push notifications), any database writes, any changes to operator state, and any acknowledgment of input messages (including moving the consumer offset forward in a log-based message broker).
Those things must happen atomically: either all of them take effect, or none of them does, and they must not go out of sync with each other. If this approach sounds familiar, it is because we discussed it in “Exactly-once message processing” in the context of distributed transactions and two-phase commit.
In Chapter 10 we discussed the problems in the traditional implementations of distributed transactions, such as XA. However, in more restricted environments it is possible to implement such an atomic commit facility efficiently. This approach is used in Google Cloud Dataflow [65,74], VoltDB [76], and Apache Kafka [77,78]. Unlike XA, these implementations do not attempt to provide transactions across heterogeneous technologies, but instead keep the transactions internal by managing both state changes and messaging within the stream processing framework. The overhead of the transaction protocol can be amortized by processing several input messages within a single transaction.
Idempotence
Our goal is to discard the partial output of any failed tasks so that they can be safely retried without taking effect twice. Distributed transactions are one way of achieving that goal, but another way is to rely on idempotence, as we saw in “Durable Execution and Workflows” [79].
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once. For example, setting a key in a key-value store to some fixed value is idempotent (writing the value again simply overwrites the value with an identical value), whereas incrementing a counter is not idempotent (performing the increment again means the value is incremented twice).
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
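Here is a sketch of that technique. A dict stands in for the external database, and a hypothetical `Row` record stores the value next to the offset of the message that last wrote it. The sketch assumes all updates to a given key come from the same partition, so that their offsets are comparable. In a real database, the check and the write would also need to happen atomically, for example as a single conditional update.

```python
from dataclasses import dataclass


@dataclass
class Row:
    value: int
    last_offset: int  # offset of the input message that produced this value


def apply_increment_once(db, key, delta, offset):
    """Apply an increment only if this message's offset is newer than the
    offset stored with the current value; otherwise it is a redelivery of
    a message whose effect is already reflected, so it is skipped."""
    row = db.get(key)
    if row is not None and offset <= row.last_offset:
        return False  # already applied
    current = row.value if row is not None else 0
    db[key] = Row(current + delta, offset)
    return True


db = {}
# (offset, key, delta); the message at offset 1 is delivered twice
messages = [(0, "clicks", 1), (1, "clicks", 1), (1, "clicks", 1), (2, "clicks", 1)]
applied = [apply_increment_once(db, key, delta, off) for off, key, delta in messages]
print(applied)       # [True, True, False, True]
print(db["clicks"])  # Row(value=3, last_offset=2)
```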
The state handling in Storm’s Trident is based on a similar idea. Relying on idempotence implies several assumptions: restarting a failed task must replay the same messages in the same order (a log-based message broker does this), the processing must be deterministic, and no other node may concurrently update the same value [80,81].
When failing over from one processing node to another, fencing may be required (see “Distributed Locks and Leases”) to prevent interference from a node that is thought to be dead but is actually alive. Despite all those caveats, idempotent operations can be an effective way of achieving exactly-once semantics with only a small overhead.
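Fencing can be sketched as a storage service that remembers the highest fencing token it has seen and rejects writes that carry an older token. The token could be, for example, a lease epoch that increases on every failover. `FencedStore` is a hypothetical name used only for illustration, and the token values are arbitrary.

```python
class FencedStore:
    """Hypothetical storage service that rejects writes with stale tokens."""

    def __init__(self):
        self.data = {}
        self.highest_token = -1

    def write(self, token, key, value):
        if token < self.highest_token:
            raise PermissionError(
                f"stale fencing token {token} (current is {self.highest_token})"
            )
        self.highest_token = token
        self.data[key] = value


store = FencedStore()
store.write(33, "count", 10)  # old processing node holds token 33
store.write(34, "count", 11)  # new node after failover gets token 34
try:
    store.write(33, "count", 99)  # the old node was not dead after all
except PermissionError as err:
    print(err)  # stale fencing token 33 (current is 34)
print(store.data)  # {'count': 11}
```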
Rebuilding state after a failure
Any stream process that requires state—for example, any windowed aggregations (such as counters, averages, and histograms) and any tables and indexes used for joins—must ensure that this state can be recovered after a failure.
One option is to keep the state in a remote datastore and replicate it, although having to query a remote database for each individual message can be slow. An alternative is to keep state local to the stream processor, and replicate it periodically. Then, when the stream processor is recovering from a failure, the new task can read the replicated state and resume processing without data loss.
For example, Flink periodically captures snapshots of operator state and writes them to durable storage such as a distributed filesystem [74,75], and Kafka Streams replicates state changes by sending them to a dedicated Kafka topic with log compaction, similar to change data capture [82]. VoltDB replicates state by redundantly processing each input message on several nodes (see “Actual Serial Execution”).
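The following is a minimal sketch of the changelog approach. It is loosely modelled on what the text describes for Kafka Streams but does not use its API. Each update to the local state is also appended to a changelog, here a plain Python list standing in for a log-compacted Kafka topic. A replacement task rebuilds the state by replaying that changelog. `ChangelogBackedState` is a hypothetical name.

```python
class ChangelogBackedState:
    """Local key-value state whose every update is also sent to a changelog."""

    def __init__(self, changelog):
        self.changelog = changelog
        self.local = {}

    def put(self, key, value):
        self.local[key] = value
        self.changelog.append((key, value))  # replicate the change

    @classmethod
    def restore(cls, changelog):
        """Rebuild local state on a new node by replaying the changelog."""
        state = cls(changelog)
        for key, value in changelog:
            state.local[key] = value  # later entries overwrite earlier ones
        return state


changelog = []
task = ChangelogBackedState(changelog)
for word in ["a", "b", "a"]:
    task.put(word, task.local.get(word, 0) + 1)

# The node running `task` fails; a replacement restores from the changelog.
recovered = ChangelogBackedState.restore(changelog)
print(recovered.local)  # {'a': 2, 'b': 1}
```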
In some cases, it may not even be necessary to replicate the state, because it can be rebuilt from the input streams. For example, if the state consists of aggregations over a fairly short window, it may be fast enough to simply replay the input events corresponding to that window. If the state is a local replica of a database, maintained by change data capture, the database can also be rebuilt from the log-compacted change stream.
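For a short window, rebuilding can be as simple as re-reading the retained input events and recomputing the aggregate. The sketch assumes that events are `(timestamp, key)` pairs still retained by the log. A real processor would seek to an offset near the start of the window rather than scan every event.

```python
def rebuild_window_counts(events, window_start, window_end):
    """Recompute per-key counts for the window [window_start, window_end)
    by replaying input events instead of restoring replicated state."""
    counts = {}
    for timestamp, key in events:
        if window_start <= timestamp < window_end:
            counts[key] = counts.get(key, 0) + 1
    return counts


events = [(100, "login"), (130, "click"), (175, "click"), (240, "login")]
print(rebuild_window_counts(events, 120, 180))  # {'click': 2}
```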
However, all of these trade-offs depend on the performance characteristics of the underlying infrastructure: in some systems, network delay may be lower than disk access latency, and network bandwidth may be comparable to disk bandwidth. There is no universally ideal trade-off for all situations, and the merits of local versus remote state may also shift as storage and networking technologies evolve.
Summary
In this chapter we have discussed event streams, what purposes they serve, and how to process them. In some ways, stream processing is very much like the batch processing we discussed in Chapter 11, but done continuously on unbounded (never-ending) streams rather than on a fixed-size input. From this perspective, message brokers and event logs serve as the streaming equivalent of a filesystem.
We spent some time comparing two types of message brokers:
AMQP/JMS-style message broker
The broker assigns individual messages to consumers, and consumers acknowledge individual messages when they have been successfully processed. Messages are deleted from the broker once they have been acknowledged. This approach is appropriate as an asynchronous form of RPC (see also “Event-Driven Architectures”), for example in a task queue, where the exact order of message processing is not important and where there is no need to go back and read old messages again after they have been processed.
Log-based message broker
The broker assigns all messages in a shard to the same consumer node, and always delivers messages in the same order. Parallelism is achieved through sharding, and consumers track their progress by checkpointing the offset of the last message they have processed. The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
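The contrast can be sketched with two toy in-memory brokers. Both classes are hypothetical and ignore networking, durability, sharding, and multiple consumers. They only show where the bookkeeping lives. The queue-style broker tracks and deletes individual messages. The log-based broker keeps every message and leaves it to the consumer to checkpoint an offset.

```python
from collections import deque


class QueueBroker:
    """AMQP/JMS-style: a message goes to one consumer and is deleted once
    acknowledged; a negatively acknowledged message is redelivered."""

    def __init__(self):
        self.ready = deque()
        self.in_flight = {}
        self.next_id = 0

    def publish(self, payload):
        self.ready.append((self.next_id, payload))
        self.next_id += 1

    def receive(self):
        msg_id, payload = self.ready.popleft()
        self.in_flight[msg_id] = payload
        return msg_id, payload

    def ack(self, msg_id):
        del self.in_flight[msg_id]  # the message is gone for good

    def nack(self, msg_id):
        self.ready.append((msg_id, self.in_flight.pop(msg_id)))  # redeliver


class LogBroker:
    """Log-based: messages are retained in order; consumers track offsets."""

    def __init__(self):
        self.log = []

    def publish(self, payload):
        self.log.append(payload)
        return len(self.log) - 1  # offset of the new message

    def read(self, offset, max_messages=10):
        return self.log[offset:offset + max_messages]


queue = QueueBroker()
for p in ["m0", "m1"]:
    queue.publish(p)
msg_id, _ = queue.receive()
queue.ack(msg_id)
print(list(queue.ready), queue.in_flight)  # [(1, 'm1')] {}

log_broker = LogBroker()
for p in ["m0", "m1", "m2"]:
    log_broker.publish(p)
checkpoint = 2                   # the consumer has processed offsets 0 and 1
print(log_broker.read(checkpoint))  # ['m2']
print(log_broker.read(0))           # ['m0', 'm1', 'm2']; old messages can be reread
```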
The log-based approach has similarities to the replication logs found in databases (see Chapter 6) and log-structured storage engines (see Chapter 4). It is also a form of consensus, as we saw in Chapter 10. We saw that this approach is especially appropriate for stream processing systems that consume input streams and generate derived state or derived output streams.
In terms of where streams come from, we discussed several possibilities: user activity events, sensors providing periodic readings, and data feeds (e.g., market data in finance) are naturally represented as streams. We saw that it can also be useful to think of the writes to a database as a stream: we can capture the changelog—i.e., the history of all changes made to a database—either implicitly through change data capture or explicitly through event sourcing. Log compaction allows the stream to retain a full copy of the contents of a database.
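Log compaction can be sketched as keeping only the latest entry for each key, with `None` playing the role of a tombstone that marks a deletion. This simplified version drops tombstones immediately. Kafka, by contrast, retains them for a configurable period so that consumers can observe the deletion. Replaying the compacted log yields the current contents of the table.

```python
def compact(changelog):
    """Keep the most recent (key, value) entry per key, in original log
    order; keys whose latest value is a tombstone (None) are removed."""
    latest = {}
    for offset, (key, value) in enumerate(changelog):
        latest[key] = (offset, value)
    survivors = sorted(
        (off, key, val) for key, (off, val) in latest.items() if val is not None
    )
    return [(key, val) for _, key, val in survivors]


changelog = [
    ("user:1", "Alice"),
    ("user:2", "Bob"),
    ("user:1", "Alicia"),
    ("user:2", None),  # tombstone: user:2 was deleted
    ("user:3", "Carol"),
]
compacted = compact(changelog)
print(compacted)        # [('user:1', 'Alicia'), ('user:3', 'Carol')]
print(dict(compacted))  # the current table contents
```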
Representing databases as streams opens up powerful opportunities for integrating systems. You can keep derived data systems such as search indexes, caches, and analytics systems continually up to date by consuming the log of changes and applying them to the derived system. You can even build fresh views onto existing data by starting from scratch and consuming the log of changes from the beginning all the way to the present.
The facilities for maintaining state as streams and replaying messages are also the basis for the techniques that enable stream joins and fault tolerance in various stream processing frameworks. We discussed several purposes of stream processing, including searching for event patterns (complex event processing), computing windowed aggregations (stream analytics), and keeping derived data systems up to date (materialized views).
We then discussed the difficulties of reasoning about time in a stream processor, including the distinction between processing time and event timestamps, and the problem of dealing with straggler events that arrive after you thought your window was complete.
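One simple way of handling stragglers is sketched below. It assumes each event carries both its event timestamp and the processing time at which it arrived. Events are grouped into tumbling windows by event time. A window is declared complete once processing time passes the window's end plus a fixed grace period. Any later events for that window are collected as stragglers rather than being silently counted or lost. This is a timeout heuristic, not the watermark mechanism of any particular framework, and it assumes that the two clocks are comparable.

```python
def tumbling_window_counts(events, window_size, allowed_lateness):
    """Count events per event-time tumbling window.

    `events` are (event_time, processing_time) pairs in arrival order.
    Window [start, start + window_size) is treated as complete once
    processing time reaches its end plus `allowed_lateness`.
    """
    counts, stragglers = {}, []
    for event_time, processing_time in events:
        start = event_time - event_time % window_size
        if processing_time >= start + window_size + allowed_lateness:
            stragglers.append(event_time)  # window already declared complete
            continue
        counts[start] = counts.get(start, 0) + 1
    return counts, stragglers


# (event time, arrival time) in seconds; the event at t=8 arrives out of
# order but within the grace period, the event at t=3 arrives too late.
events = [(5, 6), (12, 13), (8, 14), (3, 25)]
counts, late = tumbling_window_counts(events, window_size=10, allowed_lateness=5)
print(counts)  # {0: 2, 10: 1}
print(late)    # [3]
```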
We distinguished three types of joins that may appear in stream processes:
Stream-stream joins
Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time. For example, it may match two actions taken by the same user within 30 minutes of each other. The two join inputs may in fact be the same stream (a self-join) if you want to find related events within that one stream.
Stream-table joins
One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.
Table-table joins
Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
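A stream-table join can be sketched as follows. For simplicity, the activity events and changelog entries are interleaved in one list, in the order the join operator happens to see them. In a real system they arrive on separate streams, and their relative order is not deterministic. The function name and record layout are hypothetical. The example shows how the same user's events are enriched with whichever version of the profile is current when each event is processed.

```python
def stream_table_join(records):
    """Records are ("change", user_id, profile) changelog entries, where
    profile None is a deletion, or ("event", user_id, action) activity
    events to be enriched with the current profile."""
    local_table = {}  # local copy of the database, kept current by the changelog
    enriched = []
    for kind, user_id, payload in records:
        if kind == "change":
            if payload is None:
                local_table.pop(user_id, None)  # row deleted
            else:
                local_table[user_id] = payload
        elif kind == "event":
            profile = local_table.get(user_id)  # None if the user is unknown
            enriched.append({"user_id": user_id, "action": payload, "profile": profile})
        else:
            raise ValueError(f"unknown record kind: {kind}")
    return enriched


records = [
    ("change", 1, {"name": "Ana", "country": "PT"}),
    ("event", 1, "view"),
    ("change", 1, {"name": "Ana", "country": "ES"}),  # profile updated
    ("event", 1, "click"),
    ("event", 2, "view"),                               # no profile for user 2
]
enriched = stream_table_join(records)
for row in enriched:
    print(row)
```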
Finally, we discussed techniques for achieving fault tolerance and exactly-once semantics in a stream processor. As with batch processing, we need to discard the partial output of any failed tasks. However, since a stream process is long-running and produces output continuously, we can’t simply discard all output. Instead, a finer-grained recovery mechanism can be used, based on microbatching, checkpointing, transactions, or idempotent writes.
Footnotes
References
[1] Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, and Sam Whittle. The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing. Proceedings of the VLDB Endowment, volume 8, issue 12, pages 1792–1803, August 2015. doi:10.14778/2824032.2824076
[2] Harold Abelson, Gerald Jay Sussman, and Julie Sussman. Structure and Interpretation of Computer Programs, 2nd edition. MIT Press, 1996. ISBN: 978-0-262-51087-5. Archived at archive.org/details/sicp_20211010
[3] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec. The Many Faces of Publish/Subscribe. ACM Computing Surveys, volume 35, issue 2, pages 114–131, June 2003. doi:10.1145/857076.857078
[4] Don Carney, Uğur Çetintemel, Mitch Cherniack, Christian Convey, Sangdon Lee, Greg Seidman, Michael Stonebraker, Nesime Tatbul, and Stan Zdonik. Monitoring Streams – A New Class of Data Management Applications. At 28th International Conference on Very Large Data Bases (VLDB), August 2002. doi:10.1016/B978-155860869-6/50027-5
[5] Matthew Sackman. Pushing Back. wellquite.org, May 2016. Archived at perma.cc/3KCZ-RUFY
[6] Thomas Figg (tef). how (not) to write a pipeline. cohost.org, June 2023. Archived at perma.cc/A3V8-NYCM
[7] Vicent Martí. Brubeck, a statsd-Compatible Metrics Aggregator. github.blog, June 2015. Archived at perma.cc/TP3Q-DJYM
[8] Seth Lowenberger. MoldUDP64 Protocol Specification V 1.00. nasdaqtrader.com, July 2009. Archived at perma.cc/7CRQ-QBD7
[9] Ian Malpass. Measure Anything, Measure Everything. codeascraft.com, February 2011. Archived at archive.org
[10] Dieter Plaetinck. 25 Graphite, Grafana and statsd Gotchas. grafana.com, March 2016. Archived at perma.cc/3NP3-67U7
[11] Jeff Lindsay. Web Hooks to Revolutionize the Web. progrium.com, May 2007. Archived at perma.cc/BF9U-XNX4
[12] Jim N. Gray. Queues Are Databases. Microsoft Research Technical Report MSR-TR-95-56, December 1995. Archived at arxiv.org
[13] Mark Hapner, Rich Burridge, Rahul Sharma, Joseph Fialli, Kate Stout, and Nigel Deakin. JSR-343 Java Message Service (JMS) 2.0 Specification. jms-spec.java.net, March 2013. Archived at perma.cc/E4YG-46TA
[14] Sanjay Aiyagari, Matthew Arrott, Mark Atwell, Jason Brome, Alan Conway, Robert Godfrey, Robert Greig, Pieter Hintjens, John O’Hara, Matthias Radestock, Alexis Richardson, Martin Ritchie, Shahrokh Sadjadi, Rafael Schloming, Steven Shaw, Martin Sustrik, Carl Trieloff, Kim van der Riet, and Steve Vinoski. AMQP: Advanced Message Queuing Protocol Specification. Version 0-9-1, November 2008. Archived at perma.cc/6YJJ-GM9X
[15] Architectural overview of Pub/Sub. cloud.google.com, 2025. Archived at perma.cc/VWF5-ABP4
[16] Aris Tzoumas. Lessons from scaling PostgreSQL queues to 100k events per second. rudderstack.com, July 2025. Archived at perma.cc/QD8C-VA4Y
[17] Robin Moffatt. Kafka Connect Deep Dive – Error Handling and Dead Letter Queues. confluent.io, March 2019. Archived at perma.cc/KQ5A-AB28
[18] Dunith Danushka. Message reprocessing: How to implement the dead letter queue. redpanda.com. Archived at perma.cc/R7UB-WEWF
[19] Damien Gasparina, Loic Greffier, and Sebastien Viale. KIP-1034: Dead letter queue in Kafka Streams. cwiki.apache.org, April 2024. Archived at perma.cc/3VXV-QXAN
[20] Jay Kreps, Neha Narkhede, and Jun Rao. Kafka: A Distributed Messaging System for Log Processing. At 6th International Workshop on Networking Meets Databases (NetDB), June 2011. Archived at perma.cc/CSW7-TCQ5
[21] Jay Kreps. Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines). engineering.linkedin.com, April 2014. Archived at archive.org
[22] Kartik Paramasivam. How We’re Improving and Advancing Kafka at LinkedIn. engineering.linkedin.com, September 2015. Archived at perma.cc/3S3V-JCYJ
[23] Philippe Dobbelaere and Kyumars Sheykh Esmaili. Kafka versus RabbitMQ: A comparative study of two industry reference publish/subscribe implementations. At 11th ACM International Conference on Distributed and Event-based Systems (DEBS), June 2017. doi:10.1145/3093742.3093908
[24] Kate Holterhoff. Why Message Queues Endure: A History. redmonk.com, December 2024. Archived at perma.cc/6DX8-XK4W
[25] Andrew Schofield. KIP-932: Queues for Kafka. cwiki.apache.org, May 2023. Archived at perma.cc/LBE4-BEMK
[26] Jack Vanlightly. The advantages of queues on logs. jack-vanlightly.com, October 2023. Archived at perma.cc/WJ7V-287K
[27] Jay Kreps. The Log: What Every Software Engineer Should Know About Real-Time Data’s Unifying Abstraction. engineering.linkedin.com, December 2013. Archived at perma.cc/2JHR-FR64
[28] Andy Hattemer. Change Data Capture is having a moment. Why? materialize.com, September 2021. Archived at perma.cc/AL37-P53C
[29] Prem Santosh Udaya Shankar. Streaming MySQL Tables in Real-Time to Kafka. engineeringblog.yelp.com, August 2016. Archived at perma.cc/5ZR3-2GVV
[30] Andreas Andreakis and Ioannis Papapanagiotou. DBLog: A Watermark Based Change-Data-Capture Framework. October 2020. Archived at arxiv.org
[31] Jiri Pechanec. Percolator. debezium.io, October 2021. Archived at perma.cc/EQ8E-W6KQ
[32] Debezium maintainers. Debezium Connector for Cassandra. debezium.io. Archived at perma.cc/WR6K-EKMD
[33] Neha Narkhede. Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines. confluent.io, February 2016. Archived at perma.cc/8WXJ-L6GF
[34] Chris Riccomini. Kafka change data capture breaks database encapsulation. cnr.sh, November 2018. Archived at perma.cc/P572-9MKF
[35] Gunnar Morling. “Change Data Capture Breaks Encapsulation”. Does it, though? decodable.co, November 2023. Archived at perma.cc/YX2P-WNWR
[36] Gunnar Morling. Revisiting the Outbox Pattern. decodable.co, October 2024. Archived at perma.cc/M5ZL-RPS9
[37] Ashish Gupta and Inderpal Singh Mumick. Maintenance of Materialized Views: Problems, Techniques, and Applications. IEEE Data Engineering Bulletin, volume 18, issue 2, pages 3–18, June 1995. Archived at archive.org
[38] Mihai Budiu, Tej Chajed, Frank McSherry, Leonid Ryzhyk, and Val Tannen. DBSP: Incremental Computation on Streams and Its Applications to Databases. SIGMOD Record, volume 53, issue 1, pages 87–95, March 2024. doi:10.1145/3665252.3665271
[39] Pat Helland. Immutability Changes Everything. At 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.
[40] Martin Kleppmann. Accounting for Computer Scientists. martin.kleppmann.com, March 2011. Archived at perma.cc/9EGX-P38N
[41] Martin Kleppmann. Making Sense of Stream Processing. Report, O’Reilly Media, May 2016. Archived at perma.cc/RAY4-JDVX
[42] Kartik Paramasivam. Stream Processing Hard Problems – Part 1: Killing Lambda. engineering.linkedin.com, June 2016. Archived at archive.org
[43] Stéphane Derosiaux. CQRS: What? Why? How? sderosiaux.medium.com, September 2019. Archived at perma.cc/FZ3U-HVJ4
[44] Baron Schwartz. Immutability, MVCC, and Garbage Collection. xaprb.com, December 2013. Archived at archive.org
[45] Daniel Eloff, Slava Akhmechet, Jay Kreps, et al. Re: Turning the Database Inside-out with Apache Samza. Hacker News discussion, news.ycombinator.com, March 2015. Archived at perma.cc/ML9E-JC83
[46] Datomic Documentation: Excision. Cognitect, Inc., docs.datomic.com. Archived at perma.cc/J5QQ-SH32
[47] Fossil Documentation: Deleting Content from Fossil. fossil-scm.org, 2025. Archived at perma.cc/DS23-GTNG
[48] Jay Kreps. The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard. x.com, March 2015. Archived at perma.cc/7RRZ-V7B7
[49] Brent Robinson. Crypto shredding: How it can solve modern data retention challenges. medium.com, January 2019. Archived at perma.cc/4LFK-S6XE
[50] Matthew D. Green and Ian Miers. Forward Secure Asynchronous Messaging from Puncturable Encryption. At IEEE Symposium on Security and Privacy, May 2015. doi:10.1109/SP.2015.26
[51] David C. Luckham. What’s the Difference Between ESP and CEP? complexevents.com, June 2019. Archived at perma.cc/E7PZ-FDEF
[52] Arvind Arasu, Shivnath Babu, and Jennifer Widom. The CQL Continuous Query Language: Semantic Foundations and Query Execution. The VLDB Journal, volume 15, issue 2, pages 121–142, June 2006. doi:10.1007/s00778-004-0147-z
[53] Julian Hyde. Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch. ACM Queue, volume 7, issue 11, December 2009. doi:10.1145/1661785.1667562
[54] Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier. HyperLogLog: The Analysis of a Near-Optimal Cardinality Estimation Algorithm. At Conference on Analysis of Algorithms (AofA), June 2007. doi:10.46298/dmtcs.3545
[55] Jay Kreps. Questioning the Lambda Architecture. oreilly.com, July 2014. Archived at perma.cc/2WY5-HC8Y
[56] Ian Reppel. An Overview of Apache Streaming Technologies. ianreppel.org, March 2016. Archived at perma.cc/BB3E-QJLW
[57] Jay Kreps. Why Local State is a Fundamental Primitive in Stream Processing. oreilly.com, July 2014. Archived at perma.cc/P8HU-R5LA
[58] RisingWave Labs. Deep Dive Into the RisingWave Stream Processing Engine - Part 2: Computational Model. risingwave.com, November 2023. Archived at perma.cc/LM74-XDEL
[59] Frank McSherry, Derek G. Murray, Rebecca Isaacs, and Michael Isard. Differential dataflow. At 6th Biennial Conference on Innovative Data Systems Research (CIDR), January 2013.
[60] Andy Hattemer. Incremental Computation in the Database. materialize.com, March 2020. Archived at perma.cc/AL94-YVRN
[61] Shay Banon. Percolator. elastic.co, February 2011. Archived at perma.cc/LS5R-4FQX
[62] Alan Woodward and Martin Kleppmann. Real-Time Full-Text Search with Luwak and Samza. martin.kleppmann.com, April 2015. Archived at perma.cc/2U92-Q7R4
[63] Tyler Akidau. The World Beyond Batch: Streaming 102. oreilly.com, January 2016. Archived at perma.cc/4XF9-8M2K
[64] Stephan Ewen. Streaming Analytics with Apache Flink. At Kafka Summit, April 2016. Archived at perma.cc/QBQ4-F9MR
[65] Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, Slava Chernyak, Josh Haberman, Reuven Lax, Sam McVeety, Daniel Mills, Paul Nordstrom, and Sam Whittle. MillWheel: Fault-Tolerant Stream Processing at Internet Scale. Proceedings of the VLDB Endowment, volume 6, issue 11, pages 1033–1044, August 2013. doi:10.14778/2536222.2536229
[66] Alex Dean. Improving Snowplow’s Understanding of Time. snowplow.io, September 2015. Archived at perma.cc/6CT9-Z3Q2
[67] Azure Stream Analytics: Windowing functions. Microsoft Azure Reference, learn.microsoft.com, July 2025. Archived at archive.org
[68] Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, Ashish Gupta, Haifeng Jiang, Tianhao Qiu, Alexey Reznichenko, Deomid Ryabkov, Manpreet Singh, and Shivakumar Venkataraman. Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams. At ACM International Conference on Management of Data (SIGMOD), June 2013. doi:10.1145/2463676.2465272
[69] Ben Kirwin. Doing the Impossible: Exactly-Once Messaging Patterns in Kafka. ben.kirw.in, November 2014. Archived at perma.cc/A5QL-QRX7
[70] Pat Helland. Data on the Outside Versus Data on the Inside. At 2nd Biennial Conference on Innovative Data Systems Research (CIDR), January 2005.
[71] Ralph Kimball and Margy Ross. The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling, 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1
[72] Viktor Klang. I’m coining the phrase ‘effectively-once’ for message processing with at-least-once + idempotent operations. x.com, October 2016. Archived at perma.cc/7DT9-TDG2
[73] Matei Zaharia, Tathagata Das, Haoyuan Li, Scott Shenker, and Ion Stoica. Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters. At 4th USENIX Conference on Hot Topics in Cloud Computing (HotCloud), June 2012.
[74] Kostas Tzoumas, Stephan Ewen, and Robert Metzger. High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink. ververica.com, August 2015. Archived at archive.org
[75] Paris Carbone, Gyula Fóra, Stephan Ewen, Seif Haridi, and Kostas Tzoumas. Lightweight Asynchronous Snapshots for Distributed Dataflows. arXiv:1506.08603 [cs.DC], June 2015.
[76] Ryan Betts and John Hugg. Fast Data: Smart and at Scale. Report, O’Reilly Media, October 2015. Archived at perma.cc/VQ6S-XQQY
[77] Neha Narkhede and Guozhang Wang. Exactly-Once Semantics Are Possible: Here’s How Kafka Does It. confluent.io, June 2019. Archived at perma.cc/Q2AU-Q2ED
[78] Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang. KIP-98 – Exactly Once Delivery and Transactional Messaging. cwiki.apache.org, November 2016. Archived at perma.cc/95PT-RCTG
[79] Pat Helland. Idempotence Is Not a Medical Condition. Communications of the ACM, volume 55, issue 5, page 56, May 2012. doi:10.1145/2160718.2160734
[80] Jay Kreps. Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind. Email to samza-dev mailing list, September 2014. Archived at perma.cc/7DPD-GJNL
[81] E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson. A Survey of Rollback-Recovery Protocols in Message-Passing Systems. ACM Computing Surveys, volume 34, issue 3, pages 375–408, September 2002. doi:10.1145/568522.568525
[82] Adam Warski. Kafka Streams – How Does It Fit the Stream Processing Landscape? softwaremill.com, June 2016. Archived at perma.cc/WQ5Q-H2J2