
Streams applications

Streams apps are applications that process data in real time as it flows through Kafka topics. They can be used to filter, transform, aggregate, or enrich data streams. Streams apps can also produce new messages to other topics based on the processed data.


Application lifecycle

Running an application

Kafka Streams applications are started via the KafkaStreamsApplication entry point.


Resetting an application

Streams applications support a dedicated reset operation that clears processing state while preserving the application definition and configuration. This is useful for reprocessing input data from the beginning.

When a reset is triggered, the following resources are affected:

Resource          Action
State stores      Cleared locally; changelog topics deleted
Internal topics   Deleted (e.g. repartition topics)
Consumer offsets  Reset to earliest for input topics
Output topics     Preserved

Triggering a reset via CLI:

java -jar my-streams-app.jar reset

Triggering a reset programmatically:

try (StreamsCleanUpRunner cleanUpRunner = streamsApp.createCleanUpRunner()) {
    // Deletes internal topics, clears local state and resets input offsets; output topics are kept
    cleanUpRunner.reset();
}

After a reset, the application can be started again and will reprocess all input data.


Cleaning an application

The clean command performs everything that reset does and additionally removes the application's Kafka consumer group and the output topics created by the application.

java -jar my-streams-app.jar clean
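The difference between the two operations can be summarized in code. The following self-contained Java sketch encodes only the behavior described in this section: reset affects local state, internal topics, and input offsets, while clean additionally removes the consumer group and the output topics. The Resource and Operation types are hypothetical illustrations, not part of the streams-bootstrap API.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the documented reset/clean semantics (not a streams-bootstrap API).
enum Resource {
    LOCAL_STATE,      // state stores cleared locally
    INTERNAL_TOPICS,  // changelog and repartition topics deleted
    INPUT_OFFSETS,    // consumer offsets reset to earliest for input topics
    CONSUMER_GROUP,   // consumer group removed (clean only)
    OUTPUT_TOPICS     // output topics deleted (clean only)
}

enum Operation {
    RESET, CLEAN;

    // Returns the resources this operation affects, as described in this section.
    Set<Resource> affectedResources() {
        final EnumSet<Resource> reset =
                EnumSet.of(Resource.LOCAL_STATE, Resource.INTERNAL_TOPICS, Resource.INPUT_OFFSETS);
        if (this == RESET) {
            return Collections.unmodifiableSet(reset);
        }
        // clean = everything reset does + consumer group + output topics
        final EnumSet<Resource> clean = EnumSet.copyOf(reset);
        clean.add(Resource.CONSUMER_GROUP);
        clean.add(Resource.OUTPUT_TOPICS);
        return Collections.unmodifiableSet(clean);
    }
}
```

For example, Operation.RESET.affectedResources() does not contain OUTPUT_TOPICS, while Operation.CLEAN.affectedResources() is a superset of the reset set.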

Configuration

Topics

Streams applications support flexible topic configuration:

  • --input-topics: Comma-separated list of input topics
  • --input-pattern: Regex pattern for input topics
  • --output-topic: Default output topic
  • --error-topic: Topic for error records
  • --labeled-input-topics: Named input topics with different message types
  • --labeled-input-patterns: Additional labeled input topic patterns
  • --labeled-output-topics: Named output topics with different message types
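To illustrate how the two basic ways of selecting input topics differ, here is a minimal sketch. It assumes that --input-topics is a plain comma-separated list and that --input-pattern is a Java regular expression that must match the whole topic name, which is how Kafka's own pattern subscriptions behave. The InputTopicSelection class and its methods are hypothetical, and trimming whitespace around list entries is a convenience of this sketch; streams-bootstrap resolves these options internally.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper illustrating how --input-topics and --input-pattern select topics.
// It is not the streams-bootstrap implementation.
final class InputTopicSelection {
    private InputTopicSelection() {
    }

    // Splits a comma-separated value such as "orders,payments" into topic names.
    static Set<String> parseTopicList(final String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(topic -> !topic.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Returns the existing topics whose full name matches the regex,
    // mirroring Kafka's pattern subscription, which uses Matcher.matches().
    static Set<String> matchPattern(final String regex, final Collection<String> existingTopics) {
        final Pattern pattern = Pattern.compile(regex);
        return existingTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```

With existing topics orders-eu, orders-us and my-orders-eu, the pattern orders-.* selects only the first two, because the whole name my-orders-eu does not match.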

Application ID

  • --application-id: Unique Kafka Streams application ID

Kafka properties

Base configuration

The following Kafka properties are configured by default for Streams applications in streams-bootstrap:

  • processing.guarantee = exactly_once_v2
  • producer.max.in.flight.requests.per.connection = 1
  • producer.acks = all
  • producer.compression.type = gzip

Custom Kafka properties

Kafka configuration can be customized by overriding createKafkaProperties():


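import java.util.Map;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

@Override
public Map<String, Object> createKafkaProperties() {
    return Map.of(
            // Run four stream threads per application instance
            StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4,
            // Log and skip records that cannot be deserialized instead of failing
            StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            LogAndContinueExceptionHandler.class.getName()
    );
}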
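The defaults and the properties returned by createKafkaProperties() end up in a single configuration map. The sketch below shows one plausible layering, under the assumption that values from createKafkaProperties() take precedence over the defaults. KafkaPropertyLayering is a hypothetical helper; the keys are the plain strings behind the documented settings (StreamsConfig.NUM_STREAM_THREADS_CONFIG, for instance, is num.stream.threads).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of layering application overrides over the documented defaults.
// The precedence (overrides win) is an assumption, not taken from the streams-bootstrap sources.
final class KafkaPropertyLayering {
    private KafkaPropertyLayering() {
    }

    // Defaults documented for Streams applications in streams-bootstrap.
    static Map<String, Object> defaults() {
        final Map<String, Object> defaults = new HashMap<>();
        defaults.put("processing.guarantee", "exactly_once_v2");
        defaults.put("producer.max.in.flight.requests.per.connection", 1);
        defaults.put("producer.acks", "all");
        defaults.put("producer.compression.type", "gzip");
        return defaults;
    }

    // putAll replaces existing keys, so values returned by createKafkaProperties() win.
    static Map<String, Object> effective(final Map<String, Object> overrides) {
        final Map<String, Object> merged = new HashMap<>(defaults());
        merged.putAll(overrides);
        return merged;
    }
}
```

Under this assumption, returning producer.compression.type = zstd from createKafkaProperties() would replace gzip while leaving the other defaults untouched.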

Lifecycle hooks

Streams applications support the following hook types:

  • Cleanup hooks – for general cleanup logic not tied to Kafka topics
  • Topic hooks – for reacting to topic lifecycle events (e.g. deletion)
  • Reset hooks – for logic that should run only during an application reset

Cleanup hooks

Use cleanup hooks for logic that is not tied to Kafka topics, such as closing external resources or cleaning up temporary state.


@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerCleanHook(() -> {
                // Custom cleanup logic
            });
}
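As an example of cleaning up temporary state, the body of a cleanup hook could delete a local working directory. In the sketch below, TempDirCleanHook and the directory it receives are hypothetical; streams-bootstrap does not create such a directory. It is written as a plain Runnable so that it can be tested on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical cleanup hook body: recursively deletes a temporary working directory.
// The directory is an illustration; streams-bootstrap does not create it.
final class TempDirCleanHook implements Runnable {
    private final Path directory;

    TempDirCleanHook(final Path directory) {
        this.directory = directory;
    }

    @Override
    public void run() {
        if (!Files.exists(this.directory)) {
            return; // nothing to clean, e.g. on a repeated clean run
        }
        // Delete children before their parents by visiting paths in reverse order.
        try (Stream<Path> paths = Files.walk(this.directory)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Such a hook could be registered with .registerCleanHook(hook::run); the method reference adapts to the no-argument hook type that the lambda above targets. Running it twice is harmless because it returns early once the directory is gone.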

Topic hooks

Topic hooks allow Kafka Streams applications to react to Kafka topic lifecycle events, such as topic deletion during clean or reset operations.


@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerTopicHook(new TopicHook() {
                @Override
                public void deleted(final String topic) {
                    // Called when a managed topic is deleted
                    System.out.println("Deleted topic: " + topic);
                }

                @Override
                public void close() {
                    // Optional closing of connections/resources
                }
            });
}
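One typical reaction to a topic deletion is removing the Schema Registry subjects that belong to that topic. Under Confluent's default TopicNameStrategy, the subjects for a topic T are named T-key and T-value. The sketch below computes which registered subjects would have to be removed; the TopicSubjects helper is hypothetical, and the actual delete call to a Schema Registry client is left out.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical logic for a TopicHook's deleted(topic) callback: find Schema Registry
// subjects that belong to the deleted topic under the default TopicNameStrategy,
// which names subjects "<topic>-key" and "<topic>-value".
final class TopicSubjects {
    private TopicSubjects() {
    }

    static List<String> subjectsOf(final String topic, final Collection<String> registeredSubjects) {
        final List<String> candidates = List.of(topic + "-key", topic + "-value");
        return registeredSubjects.stream()
                .filter(candidates::contains)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

A TopicHook implementation would call TopicSubjects.subjectsOf(topic, registeredSubjects) inside deleted(topic) and then delete each returned subject; subjects of other topics, such as payments-value when orders is deleted, are left alone.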

Reset hooks

Reset hooks allow Kafka Streams applications to execute custom logic only during a reset operation. They are not invoked during a regular clean.


@Override
public StreamsCleanUpConfiguration setupCleanUp(
        final AppConfiguration<StreamsTopicConfig> configuration) {

    return StreamsApp.super.setupCleanUp(configuration)
            .registerResetHook(() -> {
                // Custom logic executed only during reset
            });
}

Execution options

On start

The onStreamsStart method is a lifecycle hook that is called after Kafka Streams has successfully started. It receives a RunningStreams parameter that provides access to the running Kafka Streams instance and its configuration.


@Override
public void onStreamsStart(final RunningStreams runningStreams) {
    // Custom startup logic
}

Application server

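A common use case for the onStreamsStart hook is to start an embedded application server (e.g., for REST APIs, GraphQL, or gRPC). The host and port come from the Kafka Streams application.server property, which Kafka Streams uses to advertise this instance's endpoint for interactive queries.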


import java.util.Optional;
import org.apache.kafka.streams.state.HostInfo;

@Override
public void onStreamsStart(final RunningStreams runningStreams) {
    // Access the application server configuration (Kafka Streams' application.server)
    final Optional<HostInfo> applicationServer = runningStreams.getConfig().getApplicationServer();

    applicationServer.ifPresent(hostInfo -> {
        final String host = hostInfo.host();
        final int port = hostInfo.port();

        // Start your application server
        // (log is assumed to be an SLF4J logger, e.g. provided by Lombok's @Slf4j)
        log.info("Starting application server on {}:{}", host, port);
        // startRestServer(host, port);
        // startGrpcServer(host, port);
    });
}
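The commented-out startRestServer(host, port) call above is only a placeholder. As one possible way to fill it, the following sketch uses the JDK's built-in com.sun.net.httpserver.HttpServer and exposes a single health endpoint. The method name is taken from the placeholder; the RestServer class and the /health path are hypothetical, and a production setup would more likely use a dedicated web framework.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical implementation of the startRestServer(host, port) placeholder,
// using the JDK's built-in HTTP server (module jdk.httpserver).
final class RestServer {
    private RestServer() {
    }

    static HttpServer startRestServer(final String host, final int port) throws IOException {
        final HttpServer server = HttpServer.create(new InetSocketAddress(host, port), 0);
        // Minimal health endpoint; a real server could expose interactive queries here.
        server.createContext("/health", exchange -> {
            final byte[] body = "OK".getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server; // the caller should stop(...) it when Kafka Streams shuts down
    }
}
```

Inside the ifPresent lambda, the returned HttpServer should be kept so it can be stopped together with the application. Because startRestServer throws IOException, the lambda has to catch it or wrap it, for example in an UncheckedIOException.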

State listener

TODO

Uncaught exception handler

TODO

Closing options

TODO


Command line interface

Streams applications started via KafkaStreamsApplication support the standard streams-bootstrap CLI options. The following options are specific to Streams applications:

  • --application-id: Kafka Streams application ID (default: auto-generated)
  • --volatile-group-instance-id: Use a volatile group instance ID. This changes the shutdown behavior of the Kafka Streams instance. (default: false)

Deployment

TODO


Kafka Streams extensions

Several extensions are provided that simplify working with Kafka Streams.

Simple topic access

TODO

Error handling

TODO

Serde auto configuration

TODO