Pafka 0.2.0: Optimized Kafka Based on Tiered Storage Architecture

MemArk
Sep 24, 2021

1. Release Notes

Pafka 0.2.0 has been released (https://github.com/4paradigm/pafka). Pafka is an optimized version of the popular open-source stream-processing platform Apache Kafka. It enhances Kafka by making it aware of tiered storage architectures. Compared with the previous version, Pafka 0.1.1, the major new features are as follows:

  • It introduces tiered-storage-aware optimization, which supports high-performance persistent memory (PMem) as the first-level storage and lower-performance HDD or SSD as the second-level storage.
  • It integrates a data migration strategy that balances capacity and performance across the storage tiers.
  • It rewrites most of the PMem-related code on top of the libpmem interface of llpl (https://github.com/pmem/llpl), which further improves the read-write performance and stability of the system.

2. Tiered Storage Architecture and Migration Strategy

The most important feature of Pafka 0.2.0 is the introduction of tiered storage awareness and migration strategies, which improve Kafka’s overall performance at a lower cost in specific scenarios. As shown in the figure below, the introduction of persistent memory (PMem) has added another level to the storage hierarchy. Data centres generally no longer use HDDs as their only storage devices; SSDs and high-speed PMem are available as well. The original Kafka is not aware of this storage hierarchy, so Pafka is designed to take advantage of it for performance optimization.

Pafka 0.2.0 introduces an important new parameter to control data migration: the migration threshold (storage.migrate.threshold). This parameter can be understood as the share of PMem capacity that the system expects to be occupied by data in a balanced state (for example, 0.5 corresponds to half of the PMem space). Pafka tries to keep fresh data on PMem and migrates older data to secondary storage. The basic idea behind this behaviour is that newer data is more likely to be consumed in the near future. The effect of this value depends on the scenario, as the benchmark section demonstrates.

The figure below shows the system architecture and migration strategy for a two-level (PMem and HDD) storage architecture. Data is first written to the fast PMem whenever space is available there, and otherwise to the slow HDD. In addition, a migration task runs in the background. When the used space of PMem exceeds the threshold, older data in PMem is migrated to HDD; conversely, when the used space of PMem falls below the threshold, newer data on HDD is migrated to PMem. Data migration may introduce significant overhead in certain scenarios, so it can be disabled.
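
The write path and the migration rule described above can be sketched in a few lines of Python. This is only an illustrative model, not Pafka's implementation: the names TierState, write_target and migration_action are hypothetical, the threshold is assumed to be a fraction of the PMem capacity, and a negative threshold is assumed to disable migration (as in the storage.migrate.threshold = -1 setting used later in this post).

```python
from dataclasses import dataclass


@dataclass
class TierState:
    """Hypothetical snapshot of a two-level (PMem + HDD) broker."""
    pmem_capacity: int  # bytes usable on PMem
    pmem_used: int      # bytes currently stored on PMem
    hdd_used: int       # bytes currently stored on HDD
    threshold: float    # assumed fraction of PMem capacity; < 0 disables migration


def write_target(state: TierState, nbytes: int) -> str:
    """New data goes to PMem while it has room, otherwise to HDD."""
    if state.pmem_used + nbytes <= state.pmem_capacity:
        return "pmem"
    return "hdd"


def migration_action(state: TierState) -> str:
    """Direction in which the background task would move data."""
    if state.threshold < 0:
        return "disabled"
    target = state.pmem_capacity * state.threshold
    if state.pmem_used > target:
        return "pmem_to_hdd"  # move the oldest PMem data down
    if state.pmem_used < target and state.hdd_used > 0:
        return "hdd_to_pmem"  # move the newest HDD data up
    return "none"
```

In this model the threshold acts as a set point: the background task moves data in whichever direction brings PMem usage back towards it.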

3. Benchmark Script

We provide an improved Pafka benchmark script that makes it easy to test the performance of multiple producers and consumers in a distributed environment. The script is located at “bin/bench.py”, and you can list its parameters with:

./bin/bench.py --help

Environment Requirement

  • The servers used for testing should be configured for passwordless SSH login.
  • The Pafka codebase should be located at the same path on all servers (an NFS mount works).

Producer Testing Script

bin/bench.py --threads 16 --hosts "node-1 node-2" --num_records 100000000 --type producer

The above command starts the test on node-1 and node-2. Each node starts 16 producer processes, each with its own topic, and 100,000,000 records are generated in total.

Consumer Testing Script

bin/bench.py --threads 16 --hosts "node-1 node-2" --num_records 100000000 --type consumer

The consumer command takes the same parameters as the producer command; only the value of --type differs.
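
If you run both tests repeatedly, a small wrapper can build the two command lines shown above from one place. The sketch below is a convenience under stated assumptions: bench_command is a hypothetical helper, it uses only the flags that appear in this post (--threads, --hosts, --num_records, --type), and it assumes that producer and consumer are the only values you want to pass to --type.

```python
import shlex


def bench_command(bench_type, hosts, threads=16, num_records=100_000_000):
    """Build the argv list for bin/bench.py using only the flags shown above."""
    if bench_type not in ("producer", "consumer"):
        # Assumption: only the two types demonstrated in this post are handled.
        raise ValueError(f"unsupported type: {bench_type}")
    return [
        "bin/bench.py",
        "--threads", str(threads),
        "--hosts", " ".join(hosts),  # bench.py takes the hosts as one quoted string
        "--num_records", str(num_records),
        "--type", bench_type,
    ]


if __name__ == "__main__":
    for kind in ("producer", "consumer"):
        # shlex.join quotes the host list so the line can be pasted into a shell.
        print(shlex.join(bench_command(kind, ["node-1", "node-2"])))
```

The returned list can also be passed directly to subprocess.run from the root of the Pafka checkout.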

4. Benchmark for Specific Scenarios

Hardware Configuration

1x Broker server:
- CPU: Intel(R) Xeon(R) Gold 6252 CPU (24 cores)
- PMem: 128 GB x 6 = 768 GB
- Network: 100 Gbps
- HDD: RAID-5, delivering around 500 MB/sec in total

2x Client servers:
- CPU: Intel(R) Xeon(R) Gold 6132 CPU (14 cores)
- Network: 100 Gbps

Note that most servers have two CPU sockets, which together can be equipped with 12 PMem DIMMs (e.g. 128 GB x 12 = 1.5 TB). Only one socket is used for this test. To use the PMem on both sockets, you may either (1) launch two brokers on one server, or (2) use the Linux Logical Volume Manager to merge the PMem from both sockets into a single virtual volume. A future version of Pafka will support PMem on both sockets directly.

Pafka Configuration

We list the configurations that might differ from the Kafka defaults (config/server.properties):

# 100 Gbps IP
listeners=PLAINTEXT://172.29.100.24:9092
# threads related
num.network.threads=32
num.io.threads=16
# PMem related
storage.pmem.path=/mnt/mem/pool/
storage.pmem.size=670000000000
storage.hdd.path=/mnt/hdd0/kafka/
storage.migrate.threshold=0.5
storage.migrate.threads=4
log.channel.type=mix
log.pmem.pool.ratio=0.99
log.preallocate=true
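
To see what these settings imply for capacity, the following sketch parses the two relevant keys and derives the steady-state PMem usage and the remaining headroom. It is a back-of-the-envelope aid, not part of Pafka: parse_properties is a hypothetical helper, storage.pmem.size is assumed to be in bytes, and storage.migrate.threshold is assumed to be a fraction of that size.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


CONFIG = """
# PMem related
storage.pmem.size=670000000000
storage.migrate.threshold=0.5
"""

props = parse_properties(CONFIG)
pmem_size = int(props["storage.pmem.size"])             # assumed to be bytes
threshold = float(props["storage.migrate.threshold"])   # assumed to be a fraction

steady_state_bytes = pmem_size * threshold        # data kept on PMem when balanced
headroom_bytes = pmem_size - steady_state_bytes   # space left to absorb bursts

print(f"steady state: {steady_state_bytes / 1e9:.0f} GB")
print(f"headroom:     {headroom_bytes / 1e9:.0f} GB")
```

Under these assumptions, the configuration above keeps about 335 GB of data on PMem in the balanced state and leaves about 335 GB free for bursts.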

Producer Configuration

batch.size=163840

Scenario Description

Pafka 0.2.0 provides a low-cost, elastic solution for Kafka’s performance requirements in some specific scenarios. Users no longer need to allocate additional high-cost physical servers to expand the Kafka cluster; they only need to configure persistent memory to meet the corresponding requirements. Such a solution has significant advantages in total cost, physical space, power consumption and so on. Here we mainly test two scenarios in which Pafka has an advantage:

Scenario 1: Sudden high traffic scenario. For example, an online shopping platform may release special offers or coupons every hour, so sudden high traffic is expected for a short period (possibly a few minutes). On a traditional Kafka platform, meeting this occasional high-throughput demand requires additional physical machines, which sit idle most of the time because the demand occurs only occasionally. In this case, the tiered storage strategy can use high-performance PMem to absorb the momentary high-throughput demand.

Scenario 2: Stable high throughput demand scenario. For example, food delivery platforms or online music and video platforms may need to sustain a steady data load for several hours. In this scenario, if the user does not need a particularly large log (the log size is configurable through the relevant parameters under log.retention), the user can cap the log at a certain size (e.g., twice the PMem capacity) and take advantage of PMem to improve the overall throughput.

Result of Scenario 1: Sudden high traffic scenario

In this scenario, the producers generate data at a relatively low rate most of the time and periodically switch to a much higher rate. We deliberately tuned the interval between these bursts so that PMem is just able to absorb the high traffic. Meanwhile, consumers continuously consume data. The key parameter settings are as follows:

  • storage.migrate.threshold = 0.5: this setting reserves half of the PMem space for absorbing high-throughput bursts. Lowering it allows higher and longer throughput peaks, but a value that is too small hurts consumer performance. Therefore, in most cases we recommend the default value of 0.5.
  • log.retention related parameters: no specific restrictions are set, so Kafka log files are not cleaned up automatically. If PMem becomes full, secondary storage is used automatically.

As shown in the figure above, we investigate two cases that differ in the peak rate of high-speed data generation. The left figure shows that with a peak throughput of up to 8 GB/sec, two peaks can be accommodated within the test time (about half an hour), each lasting about 1 minute. In the right figure, with the peak throughput reduced to 2.5 GB/sec, three peaks can be supported in half an hour, each lasting about 2 minutes.

In general, the peak duration and the interval between bursts that Pafka can support can be estimated as follows.

  • The duration of high throughput is the time it takes the burst traffic to fill the reserved PMem capacity (ratio: 1 - storage.migrate.threshold). For example, if 500 GB of PMem is reserved and the burst rate is 5 GB/sec (the rate received by each Kafka broker node), a burst can be sustained for about 100 sec.
  • Adjacent bursts must be separated by a minimum interval, which is essentially the time needed to migrate the reserved PMem capacity to secondary storage in the background. For example, if 500 GB is reserved on PMem and the write bandwidth of the secondary storage is 500 MB/sec, the interval needs to be more than 1,000 sec.
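
The two estimates above are simple divisions, and a short script makes it easy to try other capacities and rates. The function names below are hypothetical, decimal units (1 GB = 10^9 bytes) are assumed, and the model deliberately ignores any migration that happens during a burst as well as consumer traffic.

```python
GB = 10**9
MB = 10**6


def reserved_bytes(pmem_size, migrate_threshold):
    """PMem capacity kept free for bursts: size * (1 - threshold)."""
    return pmem_size * (1 - migrate_threshold)


def burst_duration_sec(reserved, burst_rate):
    """Time for burst traffic (bytes/sec per broker) to fill the reserved PMem."""
    return reserved / burst_rate


def min_burst_interval_sec(reserved, secondary_write_rate):
    """Time to drain the reserved PMem to secondary storage (bytes/sec)."""
    return reserved / secondary_write_rate


# The worked example from the text: 500 GB reserved, 5 GB/sec burst,
# 500 MB/sec secondary storage write bandwidth.
reserved = 500 * GB
print(burst_duration_sec(reserved, 5 * GB))        # 100.0
print(min_burst_interval_sec(reserved, 500 * MB))  # 1000.0
```

The ratio between the two results shows that, in this model, the recovery time after a burst is governed entirely by how much slower the secondary storage is than the incoming burst.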

Of course, the real-world scenario is more complex, and the specific selection of hardware and parameter configuration needs to be made according to the business requirements.

Result of Scenario 2: Stable High Throughput Demand Scenario

When the traffic is mostly stable, the tiered-storage-based optimization can also help the overall performance. Note, however, that because PMem is limited in size, the overall log size significantly affects the performance improvement. In this experiment, we use the parameter log.retention.bytes to vary the log size. Consumers again continuously consume data, so that we can observe the performance of producers and consumers at the same time. The key parameters are explained below:

  • storage.migrate.threshold = -1: data migration is disabled in this experiment to avoid its high overhead.
  • log.retention.bytes: we vary this parameter to investigate the performance impact of different log sizes, such as 2x PMem’s capacity. Users may also use the time-related parameters under log.retention to configure the log size.

The above figure shows the throughput in a steady state, that is, data is continuously produced and the log size already exceeds the retention limit, so log cleanup is triggered in the background. When retention_size = pmem_size, i.e. the entire log is stored in PMem, the best performance is achieved, with around 6 GB/sec throughput. When the log is kept larger than PMem’s capacity, some data may be stored on HDD. Once the log exceeds the retention limit, old data is removed, so PMem space may become available again for new data, which improves the overall performance. The results show that when the log size is 1.5x or 2x PMem’s capacity, the system can still improve the overall throughput by 1–2x compared with the HDD-only baseline. Equipping the broker server with more PMem can therefore further improve the overall performance.
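
One way to reason about why a larger retention size reduces the benefit is to look at the largest share of the retained log that can reside on PMem at any moment. The sketch below computes only that upper bound; it is not a throughput model, max_pmem_fraction is a hypothetical helper, and it assumes that all of PMem is available for log data.

```python
def max_pmem_fraction(pmem_size, retention_size):
    """Upper bound on the share of the retained log that fits on PMem."""
    if retention_size <= 0:
        raise ValueError("retention_size must be positive")
    return min(1.0, pmem_size / retention_size)


pmem = 100  # arbitrary units; only the ratio matters
for factor in (1.0, 1.5, 2.0):
    share = max_pmem_fraction(pmem, pmem * factor)
    print(f"retention = {factor}x PMem -> at most {share:.0%} of the log on PMem")
```

With retention at 1.5x or 2x PMem’s capacity, at most about two thirds or one half of the log can be on PMem, so the rest has to be written to and read from HDD.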

5. Roadmap

Pafka will continue to carry out technical iteration, innovation, and optimization. The following is the development plan of expected major features. The specific updates are subject to the release note of each version.

Pafka 0.3.0
- Release date: Q4 2021
- Major features: The first-level storage is extended from PMem to the ordinary file interface, so that Pafka can also use larger-capacity high-speed SSDs as the first-level storage device.

Pafka 0.4.0
- Release date: Q1 2022
- Major features: There will be more migration strategies to support different scenarios.

Pafka 1.0.0
- Release date: Q2 2022
- Major features: A multi-level storage hierarchy is supported (not limited to two levels), such as PMem / SSD / HDD, so that the configuration can be matched more flexibly to the needs of different scenarios. Pafka can be deployed as a Kafka plugin rather than as a standalone system.

6. Community

GitHub: https://github.com/4paradigm/pafka
Support: Slack Workspace
Email: contact@memark.io
