Streams applications
Streams apps process data in real time as it flows through Kafka topics. They can filter, transform, aggregate, or enrich data streams, and they can produce new messages to other topics based on the processed data.
Application lifecycle
Running an application
Kafka Streams applications are started via the KafkaStreamsApplication entry point.
Resetting an application
Streams applications support a dedicated reset operation that clears processing state while preserving the
application definition and configuration. This is useful for reprocessing input data from the beginning.
When a reset is triggered, the following resources are affected:
| Resource | Action |
|---|---|
| State stores | Cleared locally, changelog topics deleted |
| Internal topics | Deleted (e.g. repartition topics) |
| Consumer offsets | Reset to earliest for input topics |
| Output topics | Preserved |
Triggering a reset via CLI:
java -jar my-streams-app.jar reset
Triggering a reset programmatically:
try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
    cleanUpRunner.reset();
}
After a reset, the application can be started again and will reprocess all input data.
Cleaning an application
The clean command performs everything that reset does and additionally removes the Kafka consumer groups and output
topics created by the application.
java -jar my-streams-app.jar clean
Configuration
Topics
Streams applications support flexible topic configuration:
- --input-topics: Comma-separated list of input topics
- --input-pattern: Regex pattern for input topics
- --output-topic: Default output topic
- --error-topic: Topic for error records
- --labeled-input-topics: Named input topics with different message types
- --labeled-input-patterns: Additional labeled input topic patterns
- --labeled-output-topics: Named output topics with different message types
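As a rough illustration of how a comma-separated topic list and a regex pattern select topics, the following stand-alone sketch uses only the JDK. The class `TopicSelection` and its methods are hypothetical and not part of streams-bootstrap. Trimming whitespace around topic names is an assumption of this sketch, as is the requirement that the pattern match the whole topic name (which is how Kafka's pattern subscription is generally understood to behave).

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not the streams-bootstrap CLI parser.
class TopicSelection {

    // Splits a comma-separated option value such as "orders,payments" into topic names.
    // Assumption: surrounding whitespace is trimmed and empty entries are dropped.
    static List<String> parseTopics(final String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toList());
    }

    // Returns the topics whose full name matches the regex.
    static List<String> matchTopics(final String regex, final List<String> existingTopics) {
        final Pattern pattern = Pattern.compile(regex);
        return existingTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        System.out.println(parseTopics("orders, payments"));
        System.out.println(matchTopics("orders-.*", List.of("orders-eu", "orders-us", "payments")));
    }
}
```

With full-match semantics, a pattern such as `orders` would not select `orders-eu`; a trailing `.*` is needed to cover topics that share a prefix.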
Application ID
--application-id: Unique Kafka Streams application ID
Kafka properties
Base configuration
The following Kafka properties are configured by default for Streams applications in streams-bootstrap:
- processing.guarantee = exactly_once_v2
- producer.max.in.flight.requests.per.connection = 1
- producer.acks = all
- producer.compression.type = gzip
Custom Kafka properties
Kafka configuration can be customized by overriding createKafkaProperties():
@Override
public Map<String, Object> createKafkaProperties() {
    return Map.of(
            StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4,
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class.getName()
    );
}
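To make the relationship between the base configuration and custom properties concrete, here is a minimal JDK-only sketch of layering one map over another. The class `LayeredKafkaProperties` is hypothetical. The sketch assumes that custom properties override base defaults with the same key; the actual precedence applied by streams-bootstrap is not stated in this section and should be checked against the library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of layered configuration; not the streams-bootstrap implementation.
class LayeredKafkaProperties {

    // The defaults listed under "Base configuration".
    static Map<String, Object> baseProperties() {
        final Map<String, Object> base = new LinkedHashMap<>();
        base.put("processing.guarantee", "exactly_once_v2");
        base.put("producer.max.in.flight.requests.per.connection", 1);
        base.put("producer.acks", "all");
        base.put("producer.compression.type", "gzip");
        return base;
    }

    // Assumption: custom entries are applied on top of the defaults, so they win on conflicts.
    static Map<String, Object> merge(final Map<String, Object> base, final Map<String, Object> custom) {
        final Map<String, Object> merged = new LinkedHashMap<>(base);
        merged.putAll(custom);
        return merged;
    }

    public static void main(final String[] args) {
        final Map<String, Object> custom = Map.of(
                "num.stream.threads", 4,
                "producer.compression.type", "zstd");
        merge(baseProperties(), custom)
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Under that assumption, the merged result keeps `processing.guarantee` and `producer.acks` from the defaults, adds `num.stream.threads`, and replaces the compression type.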
Lifecycle hooks
Streams applications support the following hook types:
- Cleanup hooks – for general cleanup logic not tied to Kafka topics
- Topic hooks – for reacting to topic lifecycle events (e.g. deletion)
- Reset hooks – for logic that should run only during an application reset
Clean up
Use cleanup hooks for logic that is not tied to Kafka topics, such as closing external resources or cleaning up temporary state.
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {
    return StreamsApp.super.setupCleanUp(configuration)
            .registerCleanHook(() -> {
                // Custom cleanup logic
            });
}
Topic hooks
Topic hooks allow Kafka Streams applications to react to Kafka topic lifecycle events, such as topic
deletion during clean or reset operations.
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {
    return StreamsApp.super.setupCleanUp(configuration)
            .registerTopicHook(new TopicHook() {
                @Override
                public void deleted(final String topic) {
                    // Called when a managed topic is deleted
                    System.out.println("Deleted topic: " + topic);
                }

                @Override
                public void close() {
                    // Optional closing of connections/resources
                }
            });
}
Reset hooks
Reset hooks allow Kafka Streams applications to execute custom logic only during a reset operation. They are not invoked during a regular clean.
@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {
    return StreamsApp.super.setupCleanUp(configuration)
            .registerResetHook(() -> {
                // Custom logic executed only during reset
            });
}
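The three hook types can be pictured with a small stand-alone model. This is a toy sketch, not the streams-bootstrap implementation: the class `HookRegistry` is hypothetical, topic hooks are reduced to a `Consumer<String>`, and the order in which hooks fire relative to topic deletion is an assumption. It only mirrors what is described here: topic hooks react to deleted topics, and reset hooks run during a reset but not during a clean.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a cleanup configuration; not the streams-bootstrap implementation.
class HookRegistry {
    private final List<Runnable> cleanHooks = new ArrayList<>();
    private final List<Runnable> resetHooks = new ArrayList<>();
    private final List<Consumer<String>> topicHooks = new ArrayList<>();

    // Each register method returns this registry so that calls can be chained.
    HookRegistry registerCleanHook(final Runnable hook) {
        this.cleanHooks.add(hook);
        return this;
    }

    HookRegistry registerResetHook(final Runnable hook) {
        this.resetHooks.add(hook);
        return this;
    }

    HookRegistry registerTopicHook(final Consumer<String> hook) {
        this.topicHooks.add(hook);
        return this;
    }

    // Models a reset: topic hooks see each deleted topic, then reset hooks run.
    void reset(final List<String> deletedTopics) {
        this.notifyDeleted(deletedTopics);
        this.resetHooks.forEach(Runnable::run);
    }

    // Models a clean: topic hooks see each deleted topic, then clean hooks run.
    // Reset hooks are deliberately skipped here.
    void clean(final List<String> deletedTopics) {
        this.notifyDeleted(deletedTopics);
        this.cleanHooks.forEach(Runnable::run);
    }

    private void notifyDeleted(final List<String> topics) {
        for (final String topic : topics) {
            this.topicHooks.forEach(hook -> hook.accept(topic));
        }
    }

    public static void main(final String[] args) {
        final HookRegistry registry = new HookRegistry()
                .registerCleanHook(() -> System.out.println("clean hook"))
                .registerResetHook(() -> System.out.println("reset hook"))
                .registerTopicHook(topic -> System.out.println("deleted " + topic));
        registry.reset(List.of("my-app-repartition"));
        registry.clean(List.of("my-app-repartition", "my-output"));
    }
}
```

The topic names in `main` are placeholders. In a real application the registration calls are made on the configuration returned by `setupCleanUp`, as in the snippets above.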
Execution options
On start
The onStreamsStart method is a lifecycle hook that gets called after Kafka Streams has successfully started. This hook
receives a RunningStreams parameter that provides access to the running Kafka Streams instance and its configuration.
@Override
protected void onStreamsStart(final RunningStreams runningStreams) {
    // Custom startup logic
}
Application server
A common use case for the onStreamsStart hook is to start an embedded application server (e.g., for REST APIs,
GraphQL, gRPC).
@Override
protected void onStreamsStart(final RunningStreams runningStreams) {
    // Access the application server configuration
    final Optional<HostInfo> applicationServer = runningStreams.getConfig().getApplicationServer();
    applicationServer.ifPresent(hostInfo -> {
        final String host = hostInfo.host();
        final int port = hostInfo.port();
        // Start your application server
        log.info("Starting application server on {}:{}", host, port);
        // startRestServer(host, port);
        // startGrpcServer(host, port);
    });
}
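The `startRestServer(host, port)` call in the snippet above is a placeholder. One possible way to fill it in, sketched here with the HTTP server that ships with the JDK, is shown below. The class `EmbeddedRestServer` and the `/health` endpoint are hypothetical choices for illustration; a production service would more likely use a dedicated web framework and expose state-store queries rather than a fixed response.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of streams-bootstrap.
class EmbeddedRestServer {

    // Binds a JDK HttpServer to the given host and port and answers requests to /health with "OK".
    static HttpServer startRestServer(final String host, final int port) {
        try {
            final HttpServer server = HttpServer.create(new InetSocketAddress(host, port), 0);
            server.createContext("/health", exchange -> {
                final byte[] body = "OK".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            return server;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        // The server keeps running until the process is stopped or server.stop(...) is called.
        final HttpServer server = startRestServer("localhost", 8080);
        System.out.println("Listening on " + server.getAddress());
    }
}
```

Returning the `HttpServer` lets the caller stop it again, for example with `server.stop(0)` when the application shuts down.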
State listener
TODO
Uncaught exception handler
TODO
Closing options
TODO
Command line interface
Streams applications inherit standard CLI options from KafkaStreamsApplication. The following CLI options are
streams-app-specific:
| Option | Description | Default |
|---|---|---|
| --application-id | Kafka Streams application ID | Auto-generated |
| --volatile-group-instance-id | Use volatile group instance ID. This changes shutdown behavior of the Kafka Streams instance. | false |
Deployment
TODO
Kafka Streams extensions
Several extensions are provided that simplify working with Kafka Streams.
Simple topic access
TODO
Error handling
TODO
Serde auto configuration
TODO