Large messages
Overview
The Large Messages extension adds support for handling messages that exceed Kafka's size limitations by using external storage mechanisms with automatic cleanup. It integrates with streams-bootstrap to transparently manage:
- large message serialization
- large message deserialization
- cleanup of files in blob storage
For more details, see the large messages module: streams-bootstrap-large-messages GitHub repository
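The three responsibilities listed above follow the general claim-check pattern, which can be sketched without any Kafka dependency. The example below is only an illustration under stated assumptions: the class `ClaimCheckSketch`, its in-memory map standing in for blob storage, the one-byte marker format, and the `topic/uuid` key layout are all hypothetical and are not the API or wire format of streams-bootstrap or its large messages module.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Illustrative claim-check sketch. Hypothetical; NOT the streams-bootstrap API. */
final class ClaimCheckSketch {
    // Hypothetical marker bytes that tell inline payloads apart from blob references
    private static final byte INLINE = 0;
    private static final byte REFERENCE = 1;

    // In-memory stand-in for an external blob store (assumption for this sketch)
    private final Map<String, byte[]> blobStore = new HashMap<>();
    private final int maxInlineBytes;

    ClaimCheckSketch(final int maxInlineBytes) {
        this.maxInlineBytes = maxInlineBytes;
    }

    /** Serialization: small payloads stay inline, large ones are offloaded. */
    byte[] serialize(final String topic, final byte[] payload) {
        if (payload.length <= this.maxInlineBytes) {
            return prepend(INLINE, payload);
        }
        // Store the payload externally and send only a reference through the topic
        final String blobKey = topic + "/" + UUID.randomUUID();
        this.blobStore.put(blobKey, payload.clone());
        return prepend(REFERENCE, blobKey.getBytes(StandardCharsets.UTF_8));
    }

    /** Deserialization: resolve a reference back to the original payload. */
    byte[] deserialize(final byte[] message) {
        final byte[] body = Arrays.copyOfRange(message, 1, message.length);
        if (message[0] == INLINE) {
            return body;
        }
        final String blobKey = new String(body, StandardCharsets.UTF_8);
        final byte[] blob = this.blobStore.get(blobKey);
        if (blob == null) {
            throw new IllegalStateException("Blob not found: " + blobKey);
        }
        return blob;
    }

    /** Cleanup: delete every blob that was written for the given topic. */
    int cleanUpTopic(final String topic) {
        final int before = this.blobStore.size();
        this.blobStore.keySet().removeIf(key -> key.startsWith(topic + "/"));
        return before - this.blobStore.size();
    }

    int storedBlobCount() {
        return this.blobStore.size();
    }

    private static byte[] prepend(final byte marker, final byte[] body) {
        final byte[] result = new byte[body.length + 1];
        result[0] = marker;
        System.arraycopy(body, 0, result, 1, body.length);
        return result;
    }
}
```

In this sketch, deleting a topic without calling `cleanUpTopic` would leave orphaned blobs behind, which is the problem the cleanup integration described on this page is meant to address.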
There are two supported ways to enable cleanup for large messages:
- Implement LargeMessageStreamsApp
- Register a topic cleanup hook manually
Option 1: Implement LargeMessageStreamsApp
Use this option for Kafka Streams applications where large message cleanup should always run together with topic cleanup.
    public final class MyStreamsApp implements LargeMessageStreamsApp {

        @Override
        public void buildTopology(final StreamsBuilderX builder) {
            // build topology here
        }
    }
Option 2: Register a cleanup hook manually
If cleanup should only happen conditionally or requires custom behavior, a topic hook can be registered explicitly.
    // Controls whether blob storage cleanup is registered for this app
    private final boolean largeMessageCleanupEnabled;

    @Override
    public StreamsCleanUpConfiguration setupCleanUp(
            final AppConfiguration<StreamsTopicConfig> configuration) {
        // Start from the default cleanup configuration
        final StreamsCleanUpConfiguration cleanUp =
                StreamsApp.super.setupCleanUp(configuration);
        if (this.largeMessageCleanupEnabled) {
            // Additionally register the large message topic hook
            return LargeMessageAppUtils.registerTopicHook(cleanUp, configuration);
        }
        return cleanUp;
    }