I started working with Apache BookKeeper seven years ago, as part of Yahoo’s Push Notification team in Beijing. BookKeeper originally came out of the Yahoo Scalable Systems group in Barcelona, in particular Flavio Junqueira and Ivan Kelly, and through collaborating with them, I started talk to other users of BookKeeper within the company, in particular Matteo Merli who was, at that time, working on what would eventually become Apache Pulsar (incubating).
In 2012, I joined Twitter in San Francisco. This was a particularly interesting time at Twitter. My team was trying to use BookKeeper to address consistency problems for distributed databases. This work led to the creation of Apache DistributedLog. Apache BookKeeper, and Apache Pulsar (incubating), which have now all been open sourced under the Apache Software Foundation.
People often ask about the differences between Pulsar and DistributedLog and how they compare with another common log abstraction system: Apache Kafka. To some extent, these projects are built around the concept of a “log,” a “stream,” or “pub-sub.” Although they share some similarities, they have completely different technical focuses. In this blog post, I would like to walk you through the real-time journey of Pulsar and DistributedLog using BookKeeper, and share my thoughts around this.
Apache Pulsar (incubating) is a distributed pub-sub messaging system developed at Yahoo, offering features like flexible messaging, multi-tenancy, and geo-replication. Pulsar was created to address several shortcomings of existing open source messaging systems available at the time. It began with the idea of using BookKeeper as the durable message store for ActiveMQ. It then evolved beyond ActiveMQ to provide a unified flexible messaging model with many useful features and has since been successfully running in production to support mission-critical services. You can read this blog post from Matteo Merli and Joe Francis about why they developed Pulsar.
I was impressed by how the creators of Pulsar put a lot of thought into designing a distributed messaging system even though there were already a lot of messaging products available on the market. I will highlight one core feature that I think makes Pulsar a unique messaging system: its unified flexible messaging model. For other Pulsar features, such as durability, I/O isolation, and low latency, these features also come with durable storage, I will cover them when I talk about DistributedLog and BookKeeper.
Unified Flexible Messaging Model
There are two traditional messaging models:
Queuing is a point-to-point communication model whereby a pool of consumers may read from a server and each message is delivered to one of them. This allows you to divide up the processing of data across multiple consumer instances and scale your processing. Publish-subscribe is a broadcast communication model whereby a message is broadcast to all consumers. Most existing messaging systems present these two messaging concepts as separate domains and entities: one is the queue, while the other is the topic. There were systems that attempted to combine queue and topic concepts together with limited success. For example, Twitter’s Kestrel, a distributed queuing system, attempted to support the pub-sub model by introducing a concept of a fanout queue, which duplicates the message published to the queue. It wasn’t a performant or cost-efficient solution.
Pulsar generalizes the queue and topic messaging concepts through one unified messaging model. I call it “Producer-Topic-Subscription-Consumer,” or PTSC. A topic is a named channel for sending messages. Each message is only stored once on a topic (and replicated to multiple machines by BookKeeper) and can be consumed as many times as necessary by consumers. The topic is the source of truth for consumption. Although messages are only stored once on the topic, there are different ways of consuming those messages. The group of consumers is organized by subscriptions on a topic. Each group of consumers can decide its own way of consuming the messages (exclusively, shared and failover), which can be different from other groups. This combination of two traditional messaging models was designed and implemented with the goal of not impacting performance and introducing cost overhead, while also providing a lot of flexibility to users to consume messages in a way that’s best for the use case at hand.
Figure 1. Pulsar subscription models: exclusive, shared, and failover.
So why is this unification, and flexibility around how messages are consumed, so important for the users of a messaging system?
I would say because some partition-based pub-sub systems have confused people about queuing and streaming, where they force the users of messaging systems to consume messages in only one way. That means for queuing use cases, in order to increase consumption parallelism, users have to increase the number of partitions to match consume parallelism. This is usually not a cost-effective solution, and it’s always problematic because it slows down processing, especially for use cases like task dispatching.
Messaging use cases can be divided into two categories. I call these queuing and streaming.
- Queueing — Unordered messaging, or point-to-point or shared messaging. These types of use cases are usually found in conjunction with stateless applications. Stateless applications don’t care about ordering, but they do require the ability to acknowledge/remove individual messages as well as the ability to scale the consumption parallelism as much as possible. I’d say that roughly 70% of messaging use cases falls into this category.
- Streaming — Strictly ordered messaging, or exclusive messaging. Exclusive messaging means there is only one consumer per partition/topic. These types of use cases are usually associated with stateful applications. Stateful applications care about ordering and state because ordering and state will impact the correctness of whatever processing logic the application needs to apply. They also need the ability to re-process messages. In stateful processing, for example, the compute engine usually rewinds and re-processes messages when errors occur.
Pulsar covers both queuing and streaming use cases very well, and is optimized for the more common queuing use case.
First, Pulsar achieves queuing by deploying a shared subscription with round-robin delivery, allowing applications to divide up processing across the consumers in the same subscription. This means that you can scale the number of active consumers beyond the number of partitions within a topic. Besides scaling the number of active consumers, Pulsar can do even better. In other systems, increasing the parallelism of consumption requires also increasing the number of partitions. Pulsar is instead designed scalably, separating the scaling of publishing parallelism from consumption parallelism, allowing for independent scaling of active publishers and consumers.
Second, Pulsar implements streaming by deploying an exclusive/failover subscription with ordered, sequential delivery. This allows applications to process messages in order and also to rewind and re-process messages.
Pulsar is able to achieve highly performant queuing and streaming because it is built over a highly scalable log store, Apache BookKeeper. Each topic is essentially a distributed log powered by BookKeeper, which becomes the source of truth for message consumption. Because of BookKeeper, Pulsar can efficiently implement a feature which some have claimed is a weakness in other traditional messaging systems: message removal.
Pulsar accomplishes efficient message removal via a cursor system. A cursor is essentially a state that Pulsar uses to record the message consumption for each subscription. For an exclusive/failover subscription, the cursor is simply an offset value, which represents the point to which a consumer has thus far consumed. For a shared subscription, the cursor is more than a simple offset value in that it also tracks the acknowledgements of individual messages. Pulsar implements cursors in an efficient way by recording cursor updates as changes in BookKeeper ledgers, which makes it possible for Pulsar to support high throughput of cursor updates and reduces the number of message redeliveries when a consumer crashes. The existence of cursors separates tracking message consumption from message deletion, which overcomes problems associated with message removal in traditional messaging systems while still maintaining the power of traditional messaging.
At Streamlio, we’ve used the cursor system to address effectively-once (aka idempotent) publishing in Pulsar in only a few hundred lines of changes. We will share more details about the power of the cursor system and how we use it to implement effectively-once publishing in future posts and tutorials.
Until now, I’ve only talked about the unified flexible messaging and cursor system in Pulsar. We will discuss other unique features in Pulsar, such as multi-tenancy and geo-replication beyond simply two data centers, in future blog posts. I think that the unified flexible messaging and cursor management, however, is already enough to separate Pulsar from other existing messaging systems.
I liked how Matteo Merli explained in his Introduction to Apache Pulsar blog post that Pulsar is a distributed pub-sub system providing flexible traditional messaging, powered by a highly scalable distributed log store. The core concept behind distributed log storage is, of course, the log, which I’ll talk about in the next section.
I started my journey at Twitter at the end of 2012. This was a particularly interesting time at Twitter because real-time messaging infrastructure was highly fragmented at the time. Kestrel was the main queuing system for serving critical traffic for online services, Kafka was used for log collection and analytics in offline workloads, and we were about to use BookKeeper for database replication. DistributedLog was designed to unify this fragmented infrastructure and has become a building block for many other services, including key-value databases, pub-sub messaging, and cross-datacenter replication.
My colleague at the time, Leigh Stewart, called the infrastructure built around a log storage system, such as DistributedLog, “Shared Log Infrastructure.” In this type of infrastructure, DistributedLog is the distributed transaction log, tracking changes happening within it, and these various distributed systems in your infrastructure, such as distributed key-value databases, pub-sub messaging, and real-time indexes, become the materialized views of this giant distributed log. And yes, we named it DistributedLog simply because it’s not just a log but a log that needs to be stored across many machines.
Figure 2. Use cases for Apache DistributedLog and BookKeeper.
At the heart of “Shared Log Infrastructure,” a log stream is the fundamental storage abstraction for real-time applications. A log stream is synonymous with a file in a distributed file system. Data is segmented, distributed, and replicated across machines and durably stored on disk, and it supports high-throughput and low-latency writes and reads. In addition, a log stream is also similar to a queue in messaging systems in that a log stream allows for every efficient fan-in writes and fan-out reads, and also provides a fast tailing/streaming facility to propagate future data to many concurrent readers.
I call a system that stores and serves log streams a Stream Store, i.e. a storage system that has a fast tailing/streaming facility. A Stream Store should be a fundamental building block for real-time infrastructure, just as a POSIX filesystem is essential to an operating system. A Stream Store is the key to connecting historical data to real-time and future data.
One way to think about this Stream Store is to imagine the relationship between a messaging system and a traditional storage system (e.g. filesystem). Messaging systems focus on delivering future messages. When you connect to a messaging system, you are waiting for new messages to arrive to consume. In traditional storage systems like POSIX file systems, blob stores only store historical data. In such systems, you process data received in the past and query the results generated based on their histories. A Stream Store combines the functionality of messaging (precisely because it is streaming, as I discussed above in Pulsar) and storage, and connects historical data with future data.
DistributedLog was developed with the aim of seamlessly connecting both historical and future data. DistributedLog is based on BookKeeper. It leverages a number of fantastic features from BookKeeper, such as low-latency durable storage, parallel replication, simple repeatable read consistency, fast many-to-many replica repair, I/O isolation, and simple operationality.
It extends the numbered, lower-level ledgers in BookKeeper into named, high-level streams, providing a powerful streaming facility over BookKeeper and creating a Stream Store.
DistributedLog has geo-replication built in as a first-class citizen. This means that you can run an instance of DistributedLog/BookKeeper within a single datacenter, or you can run multiple instances of DistributedLog/BookKeeper across many datacenters, with multiple clusters acting as a global instance. From the standpoint of both developers and administrators, there’s little difference between running in these two modes.
Additionally, DistributedLog provides a proxy layer, which users can decide to use or not based on their use case, to achieve large scale fan-in writes and fan-out reads, without impacting the storage system. It also achieves independent scalability between cpu/memory and disks, which also the key to the success of multi-tenancy.
For the details on why we built DistributedLog, please check out Leigh Stewart’s blog post Building DistributedLog: A high performance replicated log service. For why we built DistributedLog on BookKeeper, you can read my previous blog post on Why BookKeeper? Consistency, Durability and Availablity.
Because of the streaming facility, DistributedLog was usually misinterpreted as “yet another messaging system” or “a Kafka clone.” As a Stream Store, DistributedLog/BookKeeper certainly does provide streaming facilities for propagating future data to many concurrent readers. This covers some aspects on messaging - “streaming” as I mentioned above when talking about Pulsar. However, the main focuses in DistributedLog/BookKeeper has alway been more about storage and fast streaming than traditional messaging in Pulsar.
Kafka started as a log collection system and became a messaging system, whereas DistributedLog started from addressing database consistency issues and became a stream storage system. This means both systems have very different designs and technical considerations. For the interests of a technical comparison, you can read a blog post of mine from last year, A Technical Review of Kafka and DistributedLog.
As there is a close dependency between DistributedLog and BookKeeper, DistributedLog has graduated from Apache Incubator and has been merged as a subproject of BookKeeper, for the sake of delivering the best stream storage for real-time workloads.
Messaging, Storage, or Both?
Now, I hope you have a better idea about the real-time journey of Pulsar and DistributedLog. So, are they Messaging, Storage, or both? Here are my thoughts.
Pulsar is a distributed pub-sub system that supports flexible traditional messaging (queuing and pub-sub), backed by a highly scalable, durable stream storage – BookKeeper. It focuses on message dispatching and consumption, and allows the removal of data as soon as possible if they are not needed. However since it is backed by a scalable stream storage, it carries all the fantastic features offered by the stream storage - strong durability guarantees, allowing rewind for bootstrapping system, backfilling data and reprocessing in stream computing.
BookKeeper, together with DistributedLog, forms the highly scalable log stream storage that offers log/stream as a real-time storage storage. It combines the abilities of storing past data and propagating future data in one storage system. Because of the tailing facility, it can also be used as a “messaging system” for streaming data for the applications that requires order guarantees, such as database replication, change propagation, stateful processing. And it can used for storing messages for any other message brokers, for example Pulsar.
While there certainly is an overlap between Pulsar and DistributedLog around messaging, they live in different parts of an infrastructure, largely due to differences in their focus and thus we see them as complementary systems in a unified real-time stack. If you are looking for a real-time storage for streaming data, BookKeeper (together with DistributedLog) definitely fulfills your requirements - high-throughput, low-latency, durable replicated storage, with fast tailing/streaming facility. If you are looking for a fast, flexible messaging system, Pulsar provides flexible traditional messaging, supports both queuing and pub-sub, and also allow rewind for reprocessing message in various use cases - such as bootstrap systems, backfill data and stream computing.
I don’t see the answer is just simple as choosing between messaging or storage. Really, I think the answer should be both messaging and storage.
The combination of Pulsar and DistributedLog/BookKeeper can provide you fast, durable, flexible traditional messaging and highly scalable stream storage. It fulfills the messaging and storage requirements of a unified real-time solution:
- The ability to propagate, stream and process future data.
- The ability to store, replicate and process past data in a durable, replicated way.
- The ability to store intermediate results during processing for future interactive queries.
As an example, let me walk you through how Pulsar and BookKeeper/DistributedLog fit in a unified real-time solution. It is illustrated in the figure below.
- Data is collected from different sources and published to Pulsar.
- This data is durably stored in BookKeeper.
- Real-Time workflows, processes and analytics can be performed with a stream computing engine like Heron, by reading multiple log streams from BookKeeper.
- Those computing jobs can leverage BookKeeper to store the computation state while it is executing the streaming job, enabling stateful computation.
- Those computing jobs can: write the results to BookKeeper; publish results to Pulsar for consumption by downstream applications; or publish results to other serving systems.
Figure 3. The complete picture: durable messaging, stream storage, and stream processing (compute).
In a real-time world, it doesn’t make any sense to talk about messaging without storage or storage without messaging. Without durable, low-latency storage, a messaging system can lose data and doesn’t provide the ability to re-process data for computing engines; without a messaging system providing flexible traditional messaging, a log stream store is insufficient to support various requirements for real-time workloads, microservices, and event-driven architectures. Messaging and storage have different technical focuses: messaging focuses more on message dispatching and consumption, while stream storage focuses on how to store and replicate data in a consistent, durable, and low-latency fashion, as well as how to provide tailing/streaming facilities to propagate future data fast to different applications. Pulsar and DistributedLog have gone through the journey with BookKeeper separately in the past few years. They have overlap but also have different technical focuses. The communities have begun combining efforts on providing a better real-time ecosystem around Pulsar, Bookkeeper/DistributedLog. We believe the combination of these projects can provide a better experience for building real-time applications.
If you are interested in this topic and want to learn more about Pulsar, BookKeeper/DistributedLog, Matteo Merli and I are presenting a talk titled Messaging, storage or both: the real-time story of Pulsar and DistributedLog at Strata New York on Thursday, September 28. Please stop by and chat with us.
If you’re interested in Pulsar, including its flexible messaging (queuing and pub-sub) model, multi-tenancy support, and geo-replication, please visit the official website at https://pulsar.apache.org/. You can also participate in the Pulsar community via:
- The Pulsar Slack channel. You can self-register at https://apache-pulsar.herokuapp.com/.
- The Pulsar email list
If you are interested in BookKeeper/DistributedLog, you may want to checkout the official website at http://bookkeeper.apache.org/ and http://bookkeeper.apache.org/distributedlog/. You can also participate in the BookKeeper/DistributedLog community via:
- The BookKeeper/DistributedLog Slack channel. You can self-register at https://apachebookkeeper.herokuapp.com.
- The BookKeeper email list.
- The DistributedLog email list.