02 March 2015
I was evaluating Flume for a Hadoop integration project recently, and as part of my investigation I needed to see how many messages per second it could handle.
The Flume manual points out that Flume performance will vary greatly depending on your hardware, message size, disk speed and configuration, so it is important to evaluate performance based on your own application.
The manual also points out that a bigger batch size when passing messages into Flume should give higher performance.
To run some benchmarks, I created a simple Flume injector that let me send a given number of messages to Flume while controlling the length of each message and the batch size.
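The injector itself is not shown in this post, so the following is only a minimal sketch of the batching logic it describes, written in Python so it runs without a Flume agent. The names `make_message`, `batches`, `run_injection` and `send_batch` are all hypothetical; in a real injector `send_batch` would wrap whatever client call delivers one batch to the Avro source.

```python
import time
from typing import Callable, Iterator, List


def make_message(length: int) -> bytes:
    """Build a payload of exactly `length` bytes (the filler content is arbitrary)."""
    return b"x" * length


def batches(total: int, length: int, batch_size: int) -> Iterator[List[bytes]]:
    """Yield `total` messages grouped into lists of at most `batch_size`."""
    sent = 0
    while sent < total:
        n = min(batch_size, total - sent)
        yield [make_message(length) for _ in range(n)]
        sent += n


def run_injection(total: int, length: int, batch_size: int,
                  send_batch: Callable[[List[bytes]], None]) -> float:
    """Send every batch through `send_batch` and return messages per second."""
    start = time.perf_counter()
    for batch in batches(total, length, batch_size):
        send_batch(batch)  # hypothetical: one round trip to Flume per batch
    elapsed = time.perf_counter() - start
    return total / elapsed if elapsed > 0 else float("inf")


# Stand-in sender that only collects the events, so the sketch runs without Flume.
received: List[bytes] = []
rate = run_injection(1000, 500, 80, received.extend)
print(f"sent {len(received)} messages at {rate:.0f} msg/s (no network involved)")
```

With a stand-in sender the reported rate says nothing about Flume; the point is only that the batch size determines how many round trips the injector makes for a fixed number of messages.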
For the following tests, I am only concerned about the message input rate - therefore I am using a null sink to remove the messages from the channel.
I am also using a single injector with a single connection to Flume - maybe I could get better performance out of many injectors each connecting to Flume separately, but I am not concerned with going into that level of detail.
The Flume test box is a 2 core, 4GB RAM VM with no internal disks, so it is fairly basic hardware. The injector runs on a similar VM and sends messages to Flume over the network.
For these tests I inject 500K messages, varying the message size or the batch size. The Flume configuration uses an Avro source, a memory channel and a null sink:
agent.sources = avro
agent.sinks = nullsink
agent.channels = memchannel
agent.sources.avro.type = avro
agent.sources.avro.bind = 0.0.0.0
agent.sources.avro.port = 41414
agent.channels.memchannel.type = memory
agent.channels.memchannel.capacity = 10000
agent.channels.memchannel.transactionCapacity = 1000
agent.channels.memchannel.byteCapacity = 100000000
agent.sinks.nullsink.type = null
agent.sources.avro.channels = memchannel
agent.sinks.nullsink.channel = memchannel
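Two things in this configuration are easy to get wrong: every channel a source or sink refers to must be declared, and, as I understand the channel's `transactionCapacity` setting, a batch has to fit inside one transaction, so 1000 is the largest batch size this channel can accept. The sketch below checks both against a copy of the configuration above. The helper names (`parse_properties`, `check_wiring`, `max_batch_size`) are made up for illustration and are not part of Flume.

```python
CONFIG = """\
agent.sources = avro
agent.sinks = nullsink
agent.channels = memchannel
agent.sources.avro.type = avro
agent.sources.avro.bind = 0.0.0.0
agent.sources.avro.port = 41414
agent.channels.memchannel.type = memory
agent.channels.memchannel.capacity = 10000
agent.channels.memchannel.transactionCapacity = 1000
agent.channels.memchannel.byteCapacity = 100000000
agent.sinks.nullsink.type = null
agent.sources.avro.channels = memchannel
agent.sinks.nullsink.channel = memchannel
"""


def parse_properties(text):
    """Parse simple `key = value` lines into a dict, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent="agent"):
    """Return a list of references to channels that were never declared."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    for source in props.get(f"{agent}.sources", "").split():
        for channel in props.get(f"{agent}.sources.{source}.channels", "").split():
            if channel not in declared:
                problems.append(f"source {source} -> undeclared channel {channel}")
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel not in declared:
            problems.append(f"sink {sink} -> undeclared channel {channel}")
    return problems


def max_batch_size(props, channel, agent="agent"):
    """Largest batch that fits in one transaction on the given channel."""
    return int(props[f"{agent}.channels.{channel}.transactionCapacity"])


PROPS = parse_properties(CONFIG)
print("wiring problems:", check_wiring(PROPS))
print("largest usable batch size:", max_batch_size(PROPS, "memchannel"))
```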
For this test I inject 500K messages of approximately 500 bytes each, varying the batch size:
Batch Size | Runtime (seconds) | TPS |
---|---|---|
1 | 259 | 1930 |
10 | 43 | 11627 |
20 | 24 | 20833 |
40 | 16 | 31250 |
80 | 12.5 | 40000 |
160 | 11.8 | 42372 |
320 | 11.5 | 43478 |
640 | 11.2 | 44642 |
1000 | 11 | 45454 |
Increasing the batch size has a notable impact on performance up to a batch size of between 80 and 160 messages where it seems to flatten out.
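To put a number on where the curve flattens, the sketch below recomputes TPS from the runtimes in the table and prints the gain from each step in batch size. It uses only the measured runtimes; the function names are my own. On these figures the step from 80 to 160 adds about 6%, and every later step adds less than 3%.

```python
# (batch size, runtime in seconds) for 500K messages, taken from the table above
MEMORY_RUNS = [(1, 259), (10, 43), (20, 24), (40, 16), (80, 12.5),
               (160, 11.8), (320, 11.5), (640, 11.2), (1000, 11)]
TOTAL_MESSAGES = 500_000


def tps(total, runtime_s):
    """Messages per second for `total` messages loaded in `runtime_s` seconds."""
    return total / runtime_s


def gains(runs, total):
    """Return (from_batch, to_batch, percent TPS gain) for each consecutive pair."""
    out = []
    for (b0, t0), (b1, t1) in zip(runs, runs[1:]):
        gain = (tps(total, t1) / tps(total, t0) - 1) * 100
        out.append((b0, b1, gain))
    return out


for b0, b1, gain in gains(MEMORY_RUNS, TOTAL_MESSAGES):
    print(f"{b0:>4} -> {b1:<4} {gain:6.1f}% more TPS")
```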
For this test, I used the same Flume config as above and set the batch size to 80, varying the message length:
Message Length | Runtime (seconds) | TPS |
---|---|---|
100 | 10.5 | 47619 |
200 | 10.6 | 47169 |
500 | 12.3 | 40650 |
800 | 14.5 | 34482 |
1600 | 17.5 | 28571 |
3200 | 24.2 | 20661 |
6400 | 38 | 13157 |
12800 | 68 | 7352 |
As the message length increases, the TPS drops, which is to be expected. For small messages the effect is modest: going from 100 to 500 bytes costs only about 15% of the TPS. For the longest messages, doubling the length comes close to halving the TPS.
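Another way to read the same table is in bytes rather than messages. The sketch below converts each row to payload megabytes per second, counting only the message bodies and ignoring Avro and network overhead, and treating the message lengths as exact. On these figures the payload rate climbs from roughly 5 MB/s at 100 bytes to roughly 94 MB/s at 12800 bytes, which suggests the cost shifts from per-message overhead to moving bytes as messages grow. Which resource actually limits the large-message case was not measured.

```python
# (message length in bytes, runtime in seconds) for 500K messages at batch size 80
LENGTH_RUNS = [(100, 10.5), (200, 10.6), (500, 12.3), (800, 14.5),
               (1600, 17.5), (3200, 24.2), (6400, 38), (12800, 68)]
TOTAL_MESSAGES = 500_000


def payload_mb_per_s(length, runtime_s, total=TOTAL_MESSAGES):
    """Payload megabytes (10^6 bytes) per second, ignoring protocol overhead."""
    return total * length / runtime_s / 1_000_000


for length, runtime in LENGTH_RUNS:
    rate = payload_mb_per_s(length, runtime)
    print(f"{length:>6} bytes: {TOTAL_MESSAGES / runtime:8.0f} TPS, {rate:6.1f} MB/s")
```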
For these tests, I changed the Flume configuration to use a file channel instead of a memory channel:
agent.sources = avro
agent.sinks = nullsink
agent.channels = filech
agent.sources.avro.type = avro
agent.sources.avro.bind = 0.0.0.0
agent.sources.avro.port = 41414
agent.channels.filech.type = file
agent.channels.filech.checkpointDir = /var/flume/filech/checkpoint
agent.channels.filech.dataDirs = /var/flume/filech/data
agent.channels.filech.capacity = 1000000
agent.channels.filech.transactionCapacity = 1000
agent.sinks.nullsink.type = null
agent.sources.avro.channels = filech
agent.sinks.nullsink.channel = filech
Note that because the file channel is much slower than the memory channel, I have changed the tests to load 100K messages instead of 500K.
Load 100K messages of length 500 bytes, varying the batch size:
Batch Size | Time (seconds) | TPS |
---|---|---|
1 | 140 | 714 |
10 | 23.3 | 4291 |
20 | 15.5 | 6451 |
40 | 11.5 | 8695 |
80 | 9.3 | 10752 |
160 | 8.6 | 11627 |
320 | 9.4 | 10638 |
640 | 8.7 | 11494 |
1000 | 7.7 | 12987 |
Notice that the file channel test exhibits a similar performance profile to the memory channel as the batch size increases, but at a much lower TPS.
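Because the two sets of runs loaded different numbers of messages, the runtimes cannot be compared directly. The sketch below normalises both to TPS and prints the memory-to-file ratio at each batch size, using only the runtimes from the two tables. On these figures the memory channel is between about 2.7 and 4.1 times faster, with the gap widening as the batch size grows.

```python
# Runtime in seconds by batch size, copied from the two batch-size tables.
MEMORY_S = {1: 259, 10: 43, 20: 24, 40: 16, 80: 12.5,
            160: 11.8, 320: 11.5, 640: 11.2, 1000: 11}   # 500K messages
FILE_S = {1: 140, 10: 23.3, 20: 15.5, 40: 11.5, 80: 9.3,
          160: 8.6, 320: 9.4, 640: 8.7, 1000: 7.7}        # 100K messages
MEMORY_MESSAGES = 500_000
FILE_MESSAGES = 100_000


def ratio(batch_size):
    """How many times higher the memory channel TPS is than the file channel TPS."""
    memory_tps = MEMORY_MESSAGES / MEMORY_S[batch_size]
    file_tps = FILE_MESSAGES / FILE_S[batch_size]
    return memory_tps / file_tps


for batch_size in MEMORY_S:
    print(f"batch {batch_size:>4}: memory channel is {ratio(batch_size):.2f}x faster")
```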
Load 100K messages of varying size into a file channel using a batch size of 80.
Message Size | Time (seconds) | TPS |
---|---|---|
100 | 7.5 | 13333 |
200 | 7.8 | 12820 |
400 | 8.8 | 11363 |
500 | 8.7 | 11494 |
800 | 9.8 | 10204 |
1600 | 12.6 | 7936 |
3200 | 17.5 | 5714 |
6400 | 25.5 | 3921 |
12800 | 40 | 2500 |
Again, the performance profile looks similar to the memory channel test, but at lower TPS.
The final test I ran against file channels examined the effect of replicating each message to several channels. I loaded 100K messages using a batch size of 80 and a message length of 500. The Flume config is:
agent.sources = avro
agent.sinks = nullsink nullsink2 nullsink3
agent.channels = filech filech2 filech3
agent.sources.avro.type = avro
agent.sources.avro.bind = 0.0.0.0
agent.sources.avro.port = 41414
agent.channels.filech.type = file
agent.channels.filech.checkpointDir = /var/flume/filech/checkpoint
agent.channels.filech.dataDirs = /var/flume/filech/data
agent.channels.filech.capacity = 1000000
agent.channels.filech.transactionCapacity = 1000
agent.channels.filech2.type = file
agent.channels.filech2.checkpointDir = /var/flume/filech2/checkpoint
agent.channels.filech2.dataDirs = /var/flume/filech2/data
agent.channels.filech2.capacity = 1000000
agent.channels.filech2.transactionCapacity = 1000
agent.channels.filech3.type = file
agent.channels.filech3.checkpointDir = /var/flume/filech3/checkpoint
agent.channels.filech3.dataDirs = /var/flume/filech3/data
agent.channels.filech3.capacity = 1000000
agent.channels.filech3.transactionCapacity = 1000
agent.sinks.nullsink.type = null
agent.sinks.nullsink2.type = null
agent.sinks.nullsink3.type = null
agent.sources.avro.selector.type = replicating
agent.sources.avro.channels = filech filech2 filech3
agent.sinks.nullsink.channel = filech
agent.sinks.nullsink2.channel = filech2
agent.sinks.nullsink3.channel = filech3
The time taken, in seconds, to load 100K messages into 1, 2 and 3 replicated channels is given below:
Single Channel | 2 Replicated Channels | 3 Replicated Channels |
---|---|---|
9.3 | 13.4 | 21 |
It looks like each replicated channel hurts performance significantly. I suspect I am hitting contention on disk writes with the replicated channels - the machine I am testing on is a VM with disk stored on SAN, so the disk performance is not going to be great. If I get time in the future I may try running this test again with SSD disks or on a machine with several internal disks to see the effect.
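One rough way to look at the contention theory is to count channel writes rather than source messages, since with replication every message is written once per channel. The sketch below does that arithmetic from the three timings above; the function names are my own. On these figures, total channel writes per second rise from about 10.8K with one channel to about 14.9K with two, then stay around 14.3K with three. That is consistent with a shared bottleneck such as the disk, though it does not prove it.

```python
# Number of replicated channels -> seconds to load 100K messages (table above)
REPLICATION_S = {1: 9.3, 2: 13.4, 3: 21}
MESSAGES = 100_000


def source_tps(channels):
    """Messages per second accepted by the source."""
    return MESSAGES / REPLICATION_S[channels]


def channel_writes_per_s(channels):
    """Total events written per second across all replicated channels."""
    return channels * MESSAGES / REPLICATION_S[channels]


for channels in REPLICATION_S:
    print(f"{channels} channel(s): {source_tps(channels):7.0f} msg/s at the source, "
          f"{channel_writes_per_s(channels):7.0f} channel writes/s in total")
```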
The TPS Flume is capable of handling varies significantly depending on the batch size and message size. Messages under 500 bytes seem pretty efficient, and a batch size of around 100 seems to be optimal in these tests.
It's also worth noting the performance impact of a persistent file channel: at a batch size of 80 it cut throughput to roughly a quarter of the memory channel figure.
I should point out that the hardware these tests were run on is nothing fantastic. I suspect file channel performance would be much better on SSD machines, with a separate disk for each channel.
I also didn't make any effort to tune any Flume settings. I did turn on Java GC logging to ensure Flume was not suffering from excessive full GC runs, which it was not.