Over the last couple of weeks I’ve been working on a project that involves transforming a batch-based data pipeline into an event-driven one.
I work in the real-time advertising industry, where most of the reports we generate require processing huge amounts of data. The reports essentially count events for traffic estimation or classification. However, counting a thousand events per second in a distributed environment is not as trivial as it seems.
The old process ingested hourly logs captured from a few dozen nodes. Each node uploaded a compressed batch of log files to AWS S3, and Hadoop jobs triggered every hour loaded them back into HDFS. This IO-bound ETL process took a considerable amount of time, and the growing volume of data was delaying reporting metrics by several hours.
In real-time bidding, reporting data from a few hours ago can be completely useless for current bidding decisions, as the market and the inventory opportunities vary a lot depending on the countries you serve ads in and on the time of day.
This latency, together with the time-based utility of reporting data, motivated the move to an event-driven pipeline.
Requirements
To keep the business running, the new system had to be implemented in parallel with the batch pipeline. The first change was to send data directly from each node to a message broker and have a set of consumers process it.
In the first phase, messages could simply be gathered with a tail -f on the log files, piping the output into a message publisher application that reads them from its input stream.
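As an illustration of that first phase, here is a minimal sketch of such a publisher using the current Kafka Java client. The topic name, broker addresses, and log file path are assumptions made for the example, not details of the original setup.

```java
// Hypothetical usage: tail -f /var/log/bidder.log | java LogPublisher
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LogPublisher {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // assumed broker addresses
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader in = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Each log line read from stdin becomes one message on the topic
                // ("bid-events" is an illustrative name).
                producer.send(new ProducerRecord<>("bid-events", line));
            }
        }
    }
}
```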
The message queue system had to favour high throughput over low latency and had to persist messages to disk, as it was meant to replace the file-based logging pipeline. Sending a message over the network inside a datacenter can even be faster than writing it to local disk (see Latency numbers every programmer should know, by Peter Norvig and Jeff Dean).
Global message ordering wasn’t really a requirement for the volume-estimation processing, as each opportunity event carries a timestamp we can group counts by.
For volume estimation, losing up to a few seconds of data is also acceptable in a failure scenario: even if we miscount some events, most of them will still be counted.
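To make the timestamp-based grouping concrete, here is a small sketch of how counts could be bucketed by the event’s own timestamp. The one-minute granularity and the class shape are assumptions for illustration, not the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class VolumeCounter {
    private static final long BUCKET_MILLIS = 60_000L; // one-minute buckets (assumed granularity)
    private final Map<Long, Long> counts = new HashMap<>();

    // Events can arrive in any order: each one carries its own timestamp,
    // so it lands in the right bucket regardless of delivery order.
    public void record(long eventTimestampMillis) {
        long bucket = eventTimestampMillis - (eventTimestampMillis % BUCKET_MILLIS);
        counts.merge(bucket, 1L, Long::sum);
    }

    // Losing or double-counting a handful of events only nudges one bucket slightly,
    // which matches the tolerance described above.
    public Map<Long, Long> snapshot() {
        return new HashMap<>(counts);
    }
}
```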
Apache Kafka
After analysing the most popular open-source queueing solutions available, I concluded that most of them focus on low-latency message delivery and only a few support persistence. Apache Kafka was the one that seemed to match our requirements.
Kafka started as an in-house project at LinkedIn, developed when the company was going through a similar transition, as pure batch processing started to hurt the feedback loop and become a pain for the business.
Kafka is a distributed publish/subscribe messaging system composed of one or more brokers, each holding zero or more partitions for every existing topic. Kafka persists messages to disk periodically, so in case of failure the most recent ones might be lost. This speeds up publishing, as publishers don’t need to wait until the data is written to disk.
When a publisher connects to a Kafka cluster, it queries which partitions exist for the topic and which nodes are responsible for each partition.
Each publisher then acts like a card dealer in a poker game, handing messages to partitions as if they were cards. It assigns each message to a partition using a hashing algorithm and delivers it to the broker responsible for that partition.
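The dealing itself boils down to a hash modulo the number of partitions. The snippet below is only a sketch of the idea with a made-up key and partition count; the actual Kafka client hashes the serialized message key internally (murmur2 in current Java clients) rather than using Java’s hashCode.

```java
public class PartitionDealer {
    // Sketch of the "card dealing": hash the message key and map it onto one of the
    // topic's partitions. The real client does this when a keyed record is sent.
    static int choosePartition(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8; // assumed partition count for the topic
        for (String key : new String[]{"PT", "US", "DE"}) {
            System.out.printf("key %s -> partition %d%n", key, choosePartition(key, numPartitions));
        }
    }
}
```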
For each partition, the broker stores incoming messages with monotonically increasing identifiers and persists the “deck” to disk using a data structure with O(1) access.
A subscriber is a set of co-operating processes that belong to a consumer group. Each consumer in the group gets assigned a set of partitions to consume from. One key difference from other message queue systems is that each partition is always consumed by the same consumer, which allows each consumer to track its own consumption progress and update it asynchronously.
This simplifies the subscription process: the consumer doesn’t need to acknowledge each “card” it receives from the partition “deck”, and the broker doesn’t need to store, for each message, whether it was processed or not. Consumers keep track of what they have consumed and store that position asynchronously in Zookeeper. This is the key point that enables high throughput. In case of consumer failure, a new process can start from the last saved position, possibly processing the last few messages twice.
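A minimal consumer-group sketch using the current Kafka Java client is shown below. Recent Kafka versions store committed offsets in Kafka itself rather than in Zookeeper, but the model described here is unchanged: offsets are committed periodically in the background, and a restarted consumer resumes from the last committed position. The topic, group id, and broker address are illustrative.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class VolumeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // assumed broker address
        props.put("group.id", "volume-estimation");       // consumers sharing this id split the partitions
        props.put("enable.auto.commit", "true");          // offsets committed periodically in the background
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("bid-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // No per-message acknowledgement: after a crash, a replacement consumer
                    // resumes from the last committed offset, possibly reprocessing a few messages.
                    process(record.value());
                }
            }
        }
    }

    private static void process(String logLine) {
        // counting logic goes here (covered in part II)
    }
}
```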
The broker’s subscription API takes the identifier of the last message a consumer read from a given partition and starts streaming from that point on. The constant-access-time data structures on disk play an important role here in reducing disk seeks.
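As a sketch of that offset-based API with the current Java client (using manual partition assignment rather than the group subscription above, purely to show the mechanism), resuming from a remembered position looks roughly like this; the topic, partition number, and offset handling are assumptions for the example.

```java
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ResumeFromOffset {
    // Resume consumption of one partition right after the last message we know we processed.
    static void resume(KafkaConsumer<String, String> consumer, long lastProcessedOffset) {
        TopicPartition partition = new TopicPartition("bid-events", 0); // partition 0, for illustration
        consumer.assign(Collections.singletonList(partition));
        consumer.seek(partition, lastProcessedOffset + 1); // start right after the saved position
        consumer.poll(Duration.ofMillis(500));             // the broker streams messages from that offset on
    }
}
```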
Both consumer groups and brokers are dynamic, so if the volume of incoming messages increases, you can simply add new broker nodes, and each of them will host a defined number of partitions for each topic. Depending on the number of partitions you have, you can also spawn more subscriber processes if the existing ones can’t keep up with the partitions’ messages in a reasonable time.
This flexibility by design reduces random IO on the broker machines and makes the whole Kafka system very stable in heavy-load production environments.
In part II of this series, I will describe how the consumers perform the distributed counting of events.