Pafka 0.3.0 Release: Low-cost Solution to Peak Traffic Scenario in Kafka
1. Pafka 0.3.0 Introduction
Pafka GitHub: https://github.com/4paradigm/pafka
Pafka applies hierarchical (tiered) storage to Apache Kafka, the popular open-source message queue system, to take advantage of modern storage architectures. It stores hot data on a first-level high-speed storage device (e.g. persistent memory or SSD) and, combined with an automatic migration strategy between storage levels, tries to serve Kafka’s reads and writes from that first-level device to improve overall throughput. The latest version, 0.3.0, makes the first storage level configurable. You can keep using persistent memory (PMem), supported since version 0.2.0, for the highest performance, or use larger-capacity SSDs to strike a balance between performance and capacity. Compared with PMem, SSDs deliver lower throughput, but their larger capacity and lower cost let them sustain peak traffic for longer.
For the technical principles of hierarchical storage and automatic migration used by Pafka, and for how to use PMem for extreme performance in version 0.2.0, please refer to this blog. This article focuses on the SSD-based scenarios introduced in Pafka 0.3.0.
2. Support Low-Cost Peak Traffic
Supporting high peak traffic is a typical scenario where Pafka has an advantage. The basic idea is as follows: in the off-peak period, Pafka automatically migrates data from the first-level fast device to the second-level slow device in the background, freeing up space on the first-level device. When the peak arrives, Pafka serves the message queue’s reads and writes from the first-level fast device as far as possible, which improves the overall throughput of the node. Peak traffic can therefore be sustained until the first-level device becomes full. An off-peak period is then needed to complete the data migration, and it lasts roughly as long as it takes to move the data on the first-level device to the second-level device. For the specific calculation logic of peak and off-peak time, please refer to the calculator we prepared.
Pafka 0.2.0 supports only PMem as the first-level storage. Although PMem has excellent performance, its capacity (typical configuration: 1.5 TB or 3 TB) is much smaller than that of traditional hard disks. In scenarios where Kafka needs to store a large amount of data, the PMem on a single node fills up quickly and can support peak throughput for only a few minutes. SSDs, by contrast, offer relatively large capacity. Although an SSD lowers the maximum throughput of a single node, it better suits scenarios that need to balance performance, capacity, and cost.
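The relationship between capacity, peak time and off-peak time can be sketched as below. This is a simplified model, not the logic of the calculator mentioned above: it assumes that migration to the second-level device keeps running during the peak at a fixed bandwidth, and that the off-peak write traffic competes with migration for that same bandwidth. The function name, its parameters and the 0.5 GB/sec migration bandwidth are assumptions made for illustration; the other numbers are taken from the food delivery example later in this section.

```python
import math


def tier_durations(first_tier_bytes, peak_write_bps, migrate_bps, offpeak_write_bps):
    """Return (peak_seconds, offpeak_seconds) under a simple fill/drain model.

    Assumed model (hypothetical, not taken from the Pafka calculator):
      - during the peak the first tier fills at peak_write_bps - migrate_bps
      - during off-peak it drains at migrate_bps - offpeak_write_bps
    """
    fill_rate = peak_write_bps - migrate_bps
    drain_rate = migrate_bps - offpeak_write_bps
    # If the tier never fills (or never drains), the duration is unbounded.
    peak_seconds = first_tier_bytes / fill_rate if fill_rate > 0 else math.inf
    offpeak_seconds = first_tier_bytes / drain_rate if drain_rate > 0 else math.inf
    return peak_seconds, offpeak_seconds


# 6 TB SSD, 1.25 GB/sec peak per node, 0.1 GB/sec off-peak per node
# (1 GB/sec spread over 10 nodes), and an ASSUMED 0.5 GB/sec migration bandwidth.
peak_s, offpeak_s = tier_durations(6e12, 1.25e9, 0.5e9, 0.1e9)
print(f"peak can last about {peak_s / 3600:.1f} h")
print(f"migration needs about {offpeak_s / 3600:.1f} h")
```

Under these assumptions the peak lasts a little over 2 hours and the migration takes a little over 4 hours. A different migration bandwidth changes both figures, so treat the output as an estimate only.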
We use a food delivery platform as a typical scenario. Such a platform generally sees two peaks a day, at noon and in the evening, each lasting about two hours, during which traffic is much higher than in the off-peak period. We assume that the system needs to store a large amount of data, so Kafka is built on large-capacity HDDs (if you don’t need to store much data, you can refer to the PMem-based scheme in Pafka 0.2.0). Suppose that the two peak periods of the platform are 12:00–14:00 and 18:00–20:00. If the peak periods require an overall throughput of 12.5 GB/sec for simultaneous reading and writing, and the total throughput demand in the off-peak period is 1 GB/sec, we have two technical solutions:
• Solution 1: traditional Kafka. Suppose each server with an HDD RAID provides a throughput of about 500 MB/sec. To meet the peak requirement of 12.5 GB/sec in total, 25 servers are required, at a total cost of about USD $250,000. Most of these machine resources sit idle in the off-peak period.
• Solution 2: Pafka based on hierarchical storage optimization. Assume that each server is equipped with a 6 TB SSD and that the peak throughput of each machine is planned to be 1.25 GB/sec (note that this value needs to be within the performance range of the SSD). With this configuration, we estimate that the supported peak period is 2 hours and that migrating the data from the first-level SSD to the HDD takes about 4 hours of off-peak time. These peak and off-peak durations meet the business needs of the food delivery platform. In terms of cost, we need only 10 servers, each equipped with a 6 TB SSD, and the total cost (including the SSDs) is about USD $110,000. For the specific calculation logic of this solution, please refer to this calculator.
This comparison shows that for scenarios with pronounced peak periods, Pafka provides a low-cost SSD-based solution that meets the high throughput requirements of the peaks. In the example above, the Pafka solution cuts the hardware cost by more than half (about USD $110,000 versus USD $250,000). Of course, in practice the performance requirements of different peak periods, the expected length of the peaks and other factors will affect the choice of hardware configuration.
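The arithmetic behind this comparison can be reproduced with a short sketch. The throughput and cost figures are the ones quoted in the two solutions; the helper name `servers_needed` is introduced here for illustration only.

```python
import math


def servers_needed(total_gb_per_sec, per_server_gb_per_sec):
    """Smallest number of servers whose combined throughput meets the target."""
    return math.ceil(total_gb_per_sec / per_server_gb_per_sec)


PEAK_GB_PER_SEC = 12.5

kafka_servers = servers_needed(PEAK_GB_PER_SEC, 0.5)   # HDD RAID, about 500 MB/sec each
pafka_servers = servers_needed(PEAK_GB_PER_SEC, 1.25)  # SSD first tier, 1.25 GB/sec each

KAFKA_COST_USD = 250_000  # total quoted for Solution 1
PAFKA_COST_USD = 110_000  # total quoted for Solution 2, SSDs included

saving = 1 - PAFKA_COST_USD / KAFKA_COST_USD

print(f"traditional Kafka: {kafka_servers} servers, USD {KAFKA_COST_USD:,}")
print(f"Pafka:             {pafka_servers} servers, USD {PAFKA_COST_USD:,}")
print(f"hardware cost reduced by {saving:.0%}")
```

The per-server prices are implied by the quoted totals rather than stated in the article, so the sketch works with the totals directly.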
If you want to try Pafka quickly, you can use the Docker image we provide (https://hub.docker.com/r/4pdopensource/pafka-dev). It contains the environment and configuration that Pafka needs to run.
You can start the Pafka server with the following steps and run basic tests with the built-in test scripts.
# Download and start docker
docker run -it 4pdopensource/pafka-dev bash

# start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties > zk.log 2>&1 &

# start pafka server
bin/kafka-server-start.sh config/server.properties > pafka.log 2>&1 &

# test producer performance
bin/kafka-producer-perf-test.sh --topic test --throughput 1000000 --num-records 1000000 --record-size 1024 --producer.config config/producer.properties --producer-props bootstrap.servers=localhost:9092

# test consumer performance
bin/kafka-consumer-perf-test.sh --topic test --consumer.config config/consumer.properties --bootstrap-server localhost:9092 --messages 1000000 --show-detailed-stats --reporting-interval 1000 --timeout 100000
4. Performance Measurement
Broker Machine Configuration:
Client Machine Configuration
Some configurations that are different from the standard Kafka configuration (config/server.properties) are listed below:
# 100 Gbps network configuration
listeners=PLAINTEXT://172.29.100.24:9092

# Thread related
num.io.threads=16

# Tiered storage related configuration
# log file channel type; Options: "file", "pmem", "tiered".
# if "file": use normal file as vanilla Kafka does. Following configs are not applicable.
# the storage types for each layer (separated by ,)
# first-layer storage paths (separated by ,)
# first-layer storage capacities in bytes (separated by ,); -1 means use all the space
# second-layer storage paths (separated by ,)
# threshold to control when to start the migration; -1 means no migration.
# migration threads
• Based on the capacity of our NVMe SSD, storage.tiers.first.sizes is configured as 700,000,000,000 bytes
• In order to quickly migrate first-level storage to second-level storage, storage.migrate.threads can be configured to a larger value, such as 8, to speed up migration
• In order to support longer peak traffic, storage.migrate.threshold can be configured smaller. Here, it is configured as 0.1, that is, when the usage of first-level storage reaches 10%, the background migration starts
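To make the effect of these settings concrete, the sketch below works out at how many bytes of first-level usage the background migration would start. The two values come from the notes above; the helper function is hypothetical and is not part of Pafka. It assumes the threshold is a plain fraction of the configured first-level size.

```python
FIRST_TIER_BYTES = 700_000_000_000  # storage.tiers.first.sizes
MIGRATE_THRESHOLD = 0.1             # storage.migrate.threshold


def migration_trigger_bytes(capacity_bytes, threshold):
    """Bytes of first-level usage at which background migration starts.

    Returns None for a negative threshold, since -1 means no migration.
    """
    if threshold < 0:
        return None
    return round(capacity_bytes * threshold)


trigger = migration_trigger_bytes(FIRST_TIER_BYTES, MIGRATE_THRESHOLD)
print(f"migration starts at {trigger / 1e9:.0f} GB used")
print(f"headroom left at that point: {(FIRST_TIER_BYTES - trigger) / 1e9:.0f} GB")
```

A smaller threshold starts migration earlier, which leaves more free space on the first level when the next peak arrives.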
You can use the test script we provide to reproduce the results:
# Start the producer test process and write the log to producer.log
python3 bin/bench.py --brokers 172.29.100.24:9092 --threads 16 --hosts "$TEST_NODE" --num_records 2000000000 --record_size 1024 --type producer --use_dynamic --dynamic 100000:500000:2000000 --sleept 360 --only_min_max --wait_for_all > producer.log 2>&1 &

# Start the consumer test process and write the log to consumer.log
python3 bin/bench.py --brokers 172.29.100.24:9092 --threads 16 --hosts "$TEST_NODE" --num_records 2000000000 --type consumer --wait_for_all > consumer.log 2>&1 &
The above script launches 16 producer and 16 consumer processes on the $TEST_NODE machine and runs a stress test against the broker. A total of about 2 TB of data ($num_records * $record_size) is inserted. The traffic varies from 100,000 records/s to 2,000,000 records/s, with an average of 500,000 records/s (--dynamic 100000:500000:2000000). With the --only_min_max option, the traffic is either 100,000 or 2,000,000 records/s and takes no value in between. The duration of the peak period is set to 360s, and the corresponding duration of the off-peak period is calculated automatically from the average throughput.
The results are shown in the figure above. With our experimental configuration, Pafka sustains 2 GB/sec of peak read and write traffic for about 6 minutes, with an off-peak period of about half an hour. The measured results are broadly in line with the estimates from the calculator we provide, which indicates that the calculation model is reasonably accurate. For a real deployment, you can choose SSD capacity and node bandwidth to achieve the peak duration you need, using the calculator for the estimate.
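The relationship between the three rates and the two durations can be worked out by hand. The sketch below assumes that the off-peak duration is chosen so that the time-weighted mean of the minimum and maximum rates equals the configured average. That is an assumption about what bench.py does, not a description of its code, and the function name is hypothetical.

```python
def off_peak_seconds(peak_seconds, min_rate, avg_rate, max_rate):
    """Solve (max_rate*peak + min_rate*off) / (peak + off) = avg_rate for off."""
    if not (min_rate < avg_rate < max_rate):
        raise ValueError("expected min_rate < avg_rate < max_rate")
    return peak_seconds * (max_rate - avg_rate) / (avg_rate - min_rate)


RECORD_SIZE = 1024           # bytes, --record_size
NUM_RECORDS = 2_000_000_000  # --num_records

# Rates from --dynamic 100000:500000:2000000 and the 360 s peak from --sleept.
off_peak = off_peak_seconds(360, 100_000, 500_000, 2_000_000)
total_bytes = NUM_RECORDS * RECORD_SIZE
peak_bytes_per_sec = 2_000_000 * RECORD_SIZE

print(f"off-peak duration: {off_peak:.0f} s ({off_peak / 60:.1f} min)")
print(f"total data: {total_bytes / 1e12:.2f} TB")
print(f"peak write rate: {peak_bytes_per_sec / 1e9:.2f} GB/sec")
```

Under this assumption, a 360 s peak corresponds to an off-peak period of 1350 s (22.5 minutes), and the peak write rate is about 2 GB/sec.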
5. Learn more
· Pafka GitHub: https://github.com/4paradigm/pafka