Skip to content

Testing

The streams-bootstrap testing tools provide utilities for testing Kafka Streams, Consumer, Producer and Consumer-Producer applications, covering both unit-level and integration-style scenarios.

They abstract common test concerns such as Kafka infrastructure setup, Schema Registry integration, application lifecycle handling, and consumer group verification, and are designed to work with real Kafka clusters as well as schema-aware test environments.

TestApplicationRunner

TestApplicationRunner is a test utility for running, configuring, and verifying Kafka applications in integration and system tests.

It abstracts away repetitive setup such as: - bootstrap servers - Schema Registry - Kafka client configuration - CLI argument wiring - lifecycle commands (run, clean, reset)

Typical use cases: - end-to-end tests - containerized test environments - embedded Kafka setups - CI pipelines

Typical Usage

TestApplicationRunner runner =
        TestApplicationRunner.create("localhost:9092")
                .withSchemaRegistry()
                .withStateDir(tempDir)
                .withNoStateStoreCaching();

All applications executed via this runner automatically inherit this configuration.

Bootstrap Servers - Passed via --bootstrap-servers - Also set directly on the application instance

Kafka Configuration - All provided Kafka properties are injected - Passed via --kafka-config key=value - Also merged into app.setKafkaConfig(...)

Schema Registry (optional) - Passed via --schema-registry-url - Only configured when explicitly enabled


Configuring Kafka for Tests

runner = runner.withKafkaConfig(Map.of("auto.offset.reset", "earliest"));

Behavior: - merged with existing configuration - immutable after creation - overrides application defaults


Kafka Streams–Specific Helpers

Configure State Directory
runner = runner.withStateDir(tempDir);

Sets:

state.dir = <tempDir>

Use this to: - isolate test runs - avoid state leakage between tests


Disable State Store Caching
runner = runner.withNoStateStoreCaching();

Sets:

statestore.cache.max.bytes = 0

Useful when: - asserting exact record counts - debugging processor behavior - avoiding cache-related timing issues


Consumer-Specific Helpers

Configure Session Timeout
runner = runner.withSessionTimeout(Duration.ofSeconds(5));

Sets:

session.timeout.ms = 5000

Useful for: - fast consumer group rebalancing - deterministic failure testing


Schema Registry Support

Enable a Test Schema Registry

runner = runner.withSchemaRegistry();

Creates: - isolated in-memory Schema Registry - random scope to avoid collisions - transparent integration for the application


Use a Custom TestSchemaRegistry

TestSchemaRegistry registry = new TestSchemaRegistry();
runner = runner.withSchemaRegistry(registry);

Use this when: - sharing schemas across applications - inspecting registered schemas during tests


Running Applications

CLI

CompletableFuture<Integer> exitCode =
        runner.run(app, "--some-flag");
  • invokes startApplicationWithoutExit
  • returns application exit code

Runnable

CompletableFuture<Void> execution = runner.run(app);
  • calls onApplicationStart()
  • runs application directly
  • suitable for long-running tests

Cleaning and Resetting Applications

Clean

runner.clean(app);

or

runner.clean(app, "--custom-arg");

Used to: - delete Kafka topics - clean local state - execute cleanup hooks


Reset

Supported for: - Streams applications - Consumer applications - Consumer–Producer applications

runner.reset(streamsApp);

Consumer Group Verification

ConsumerGroupVerifier verifier = runner.verify(streamsApp);

Allows you to: - assert consumer group existence - check stability - inspect committed offsets


Creating Test Clients

KafkaTestClient client = runner.newTestClient();

Provides: - AdminClient access - Producer/Consumer helpers - runtime-aware configuration


TestApplicationTopologyFactory

TestApplicationTopologyFactory is a test helper for Kafka Streams applications that integrates with Fluent Kafka Streams Tests.

It allows you to: - derive a TestTopology from a real application - reuse production topology and configuration - inject test-specific runtime settings


Typical Usage

TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry();

or without Schema Registry:

TestApplicationTopologyFactory factory = new TestApplicationTopologyFactory();

Schema Registry Support

Automatic Schema Registry

TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry();
  • random isolated scope
  • no cross-test collisions
  • safe for parallel execution

Custom Schema Registry

TestSchemaRegistry registry = new TestSchemaRegistry();
TestApplicationTopologyFactory factory =
        TestApplicationTopologyFactory.withSchemaRegistry(registry);

Modifying Kafka Configuration

factory = factory.with(Map.of("commit.interval.ms", 100));
  • merged into runtime configuration
  • applies only to tests
  • does not mutate application

Creating a TestTopology

TestTopology<String, MyValue> topology = factory.createTopology(app);

Execution flow: 1. application prepared 2. runtime configuration injected 3. topology extracted 4. TestTopology created


JUnit 5 Integration

TestTopologyExtension<String, MyValue> extension = factory.createTopologyExtension(app);

Accessing Kafka Properties

Map<String, Object> props = factory.getKafkaProperties(app);