Package com.bakdata.kafka.streams
Class KafkaStreamsApplication<T extends StreamsApp>
java.lang.Object
com.bakdata.kafka.KafkaApplication<StreamsRunner,StreamsCleanUpRunner,StreamsExecutionOptions,ExecutableStreamsApp<T>,ConfiguredStreamsApp<T>,StreamsTopicConfig,T,StreamsAppConfiguration>
com.bakdata.kafka.streams.KafkaStreamsApplication<T>
- Type Parameters:
T - type of StreamsApp created by this application
- All Implemented Interfaces:
AutoCloseable, Runnable
- Direct Known Subclasses:
SimpleKafkaStreamsApplication
public abstract class KafkaStreamsApplication<T extends StreamsApp>
extends KafkaApplication<StreamsRunner,StreamsCleanUpRunner,StreamsExecutionOptions,ExecutableStreamsApp<T>,ConfiguredStreamsApp<T>,StreamsTopicConfig,T,StreamsAppConfiguration>
The base class for creating Kafka Streams applications.
This class provides the following configuration options in addition to those provided by KafkaApplication:
inputTopics
inputPattern
errorTopic
labeledInputTopics
labeledInputPatterns
volatileGroupInstanceId
To run the application, call KafkaApplication.startApplication(String[]) from your main.
-
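The input options listed above take plain JDK types, as the setter signatures in the Method Summary show (List<String>, Pattern, Map<String, List<String>>, Map<String, Pattern>). The following is a minimal sketch of building such values. The class name InputOptionValues and all topic names and labels are hypothetical, and the code uses only the JDK rather than calling into KafkaStreamsApplication.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

final class InputOptionValues {
    // All topic names and labels below are hypothetical, for illustration only.

    // Shape of a value for inputTopics: an explicit list of topic names.
    static List<String> inputTopics() {
        return List.of("orders", "payments");
    }

    // Shape of a value for inputPattern: a regular expression over topic names.
    static Pattern inputPattern() {
        return Pattern.compile("orders-.*");
    }

    // Shape of a value for labeledInputTopics: a label mapped to a list of topics.
    static Map<String, List<String>> labeledInputTopics() {
        return Map.of("customers", List.of("customers-eu", "customers-us"));
    }

    // Shape of a value for labeledInputPatterns: a label mapped to a pattern.
    static Map<String, Pattern> labeledInputPatterns() {
        return Map.of("audit", Pattern.compile("audit\\..*"));
    }

    public static void main(String[] args) {
        // Check which topic names the pattern would select.
        System.out.println(inputPattern().matcher("orders-2024").matches());
        System.out.println(labeledInputTopics().get("customers"));
    }
}
```

Values of these shapes would be passed to setInputTopics, setInputPattern, setLabeledInputTopics, and setLabeledInputPatterns respectively.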
Nested Class Summary
Nested classes/interfaces inherited from class com.bakdata.kafka.KafkaApplication
KafkaApplication.CleanableApp<CR extends CleanUpRunner>, KafkaApplication.RunnableApp<R extends Runner>
-
Constructor Summary
Constructors
KafkaStreamsApplication()
-
Method Summary
Modifier and Type | Method | Description
void | clean() | Reset the Kafka Streams application.
 | createConfiguration | Create configuration to configure app
final ConfiguredStreamsApp<T> | createConfiguredApp(T app, StreamsAppConfiguration configuration) | Create a new ConfiguredApp that will be executed according to the given config.
final Optional<StreamsExecutionOptions> | createExecutionOptions | Create options for running the app
protected org.apache.kafka.streams.KafkaStreams.StateListener | createStateListener() | Create a KafkaStreams.StateListener to use for Kafka Streams.
final StreamsTopicConfig | createTopicConfig | Topics used by app
protected org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler | createUncaughtExceptionHandler() | Create a StreamsUncaughtExceptionHandler to use for Kafka Streams.
boolean | isVolatileGroupInstanceId() |
protected void | onStreamsStart(RunningStreams runningStreams) | Called after starting Kafka Streams
void | prepareClean() |
void | reset() | Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams application.
void | setApplicationId(String applicationId) |
void | setErrorTopic(String errorTopic) |
void | setInputPattern(Pattern inputPattern) |
void | setInputTopics(List<String> inputTopics) |
void | setLabeledInputPatterns(Map<String, Pattern> labeledInputPatterns) |
void | setLabeledInputTopics(Map<String, List<String>> labeledInputTopics) |
void | setVolatileGroupInstanceId(boolean volatileGroupInstanceId) |
String | toString() |
Methods inherited from class com.bakdata.kafka.KafkaApplication
close, createApp, createCleanableApp, createConfiguredApp, createExecutableApp, createRunnableApp, getBootstrapServers, getKafkaConfig, getLabeledOutputTopics, getOutputTopic, getRuntimeConfiguration, getSchemaRegistryUrl, onApplicationStart, prepareRun, run, setBootstrapServers, setKafkaConfig, setLabeledOutputTopics, setOutputTopic, setSchemaRegistryUrl, startApplication, startApplicationWithoutExit, stop
-
Constructor Details
-
KafkaStreamsApplication
public KafkaStreamsApplication()
-
-
Method Details
-
clean
public void clean()
Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate topics associated with the Kafka Streams application.
- Overrides:
clean in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
reset
public void reset()
Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams application.
-
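The difference between reset() and clean() can be summarized as two sets of affected resources. The following toy model is built only from the two method descriptions. The CleanUpScope class and Resource enum are hypothetical names, and the code does not reflect how the library actually performs the cleanup.

```java
import java.util.EnumSet;
import java.util.Set;

final class CleanUpScope {
    // Hypothetical labels for the resources named in the reset() and clean() descriptions.
    enum Resource {
        STATE_STORES, CONSUMER_GROUP_OFFSETS, INTERNAL_TOPICS,
        CONSUMER_GROUP, OUTPUT_TOPICS, INTERMEDIATE_TOPICS
    }

    // reset(): state stores, consumer group offsets, and internal topics.
    static Set<Resource> resetScope() {
        return EnumSet.of(
                Resource.STATE_STORES,
                Resource.CONSUMER_GROUP_OFFSETS,
                Resource.INTERNAL_TOPICS);
    }

    // clean(): everything reset() covers, plus the consumer group
    // and all output and intermediate topics.
    static Set<Resource> cleanScope() {
        Set<Resource> scope = EnumSet.copyOf(resetScope());
        scope.addAll(EnumSet.of(
                Resource.CONSUMER_GROUP,
                Resource.OUTPUT_TOPICS,
                Resource.INTERMEDIATE_TOPICS));
        return scope;
    }

    public static void main(String[] args) {
        System.out.println("reset: " + resetScope());
        System.out.println("clean: " + cleanScope());
    }
}
```

In this model cleanScope() is a strict superset of resetScope(), which matches the wording "Additionally, delete ..." in the description of clean().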
createExecutionOptions
Description copied from class: KafkaApplication
Create options for running the app
- Specified by:
createExecutionOptions in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Returns:
run options if available
-
createTopicConfig
Description copied from class: KafkaApplication
Topics used by app
- Specified by:
createTopicConfig in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Returns:
topic configuration
-
createConfiguredApp
public final ConfiguredStreamsApp<T> createConfiguredApp(T app, StreamsAppConfiguration configuration)
Description copied from class: KafkaApplication
Create a new ConfiguredApp that will be executed according to the given config.
- Specified by:
createConfiguredApp in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Parameters:
app - app to configure.
configuration - configuration for app
- Returns:
ConfiguredApp
-
createConfiguration
Description copied from class: KafkaApplication
Create configuration to configure app
- Specified by:
createConfiguration in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Parameters:
topics - topic configuration
- Returns:
configuration
-
prepareClean
public void prepareClean()
- Overrides:
prepareClean in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
createStateListener
protected org.apache.kafka.streams.KafkaStreams.StateListener createStateListener()
Create a KafkaStreams.StateListener to use for Kafka Streams.
- Returns:
KafkaStreams.StateListener. NoOpStateListener by default
- See Also:
KafkaStreams.setStateListener(StateListener)
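A subclass can override createStateListener() to observe state transitions. The sketch below shows the shape of such a listener without depending on Kafka. The nested State enum and StateListener interface are local stand-ins modeled on KafkaStreams.State and KafkaStreams.StateListener, whose callback is onChange(newState, oldState). The State enum lists only a subset of the real states, and RecordingStateListener is a hypothetical name. In a real application the returned object would implement the Kafka interface instead.

```java
import java.util.ArrayList;
import java.util.List;

final class StateListenerSketch {
    // Local stand-in (assumption): a subset of org.apache.kafka.streams.KafkaStreams.State.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    // Local stand-in (assumption) mirroring KafkaStreams.StateListener#onChange.
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    // Hypothetical listener that records every transition it is notified about.
    static final class RecordingStateListener implements StateListener {
        private final List<String> transitions = new ArrayList<>();

        @Override
        public void onChange(State newState, State oldState) {
            transitions.add(oldState + " -> " + newState);
        }

        // Immutable snapshot of the transitions seen so far.
        List<String> transitions() {
            return List.copyOf(transitions);
        }
    }

    public static void main(String[] args) {
        RecordingStateListener listener = new RecordingStateListener();
        // Simulate the notifications a starting application might produce.
        listener.onChange(State.REBALANCING, State.CREATED);
        listener.onChange(State.RUNNING, State.REBALANCING);
        System.out.println(listener.transitions());
    }
}
```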
-
createUncaughtExceptionHandler
protected org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler createUncaughtExceptionHandler()
Create a StreamsUncaughtExceptionHandler to use for Kafka Streams.
- Returns:
StreamsUncaughtExceptionHandler. DefaultStreamsUncaughtExceptionHandler by default
- See Also:
KafkaStreams.setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
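A subclass can override createUncaughtExceptionHandler() to decide how Kafka Streams reacts to an uncaught exception. The sketch below illustrates one possible policy without depending on Kafka. The Response enum and Handler interface are local stand-ins modeled on Kafka's StreamThreadExceptionResponse and StreamsUncaughtExceptionHandler.handle(Throwable). The policy itself (replace the thread on a timeout, otherwise shut down the client) is an assumption chosen for illustration and is not the behavior of DefaultStreamsUncaughtExceptionHandler.

```java
import java.util.concurrent.TimeoutException;

final class UncaughtExceptionHandlerSketch {
    // Local stand-in (assumption) for StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Local stand-in (assumption) mirroring StreamsUncaughtExceptionHandler#handle.
    interface Handler {
        Response handle(Throwable exception);
    }

    // Hypothetical policy: treat timeouts as transient and replace the thread;
    // shut down this client for anything else.
    static Handler replaceOnTimeout() {
        return exception -> exception instanceof TimeoutException
                ? Response.REPLACE_THREAD
                : Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        Handler handler = replaceOnTimeout();
        System.out.println(handler.handle(new TimeoutException("broker slow")));
        System.out.println(handler.handle(new IllegalStateException("bad record")));
    }
}
```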
-
onStreamsStart
Called after starting Kafka Streams
- Parameters:
runningStreams - running KafkaStreams instance along with its StreamsConfig and Topology
-
toString
- Overrides:
toString in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
getInputTopics
-
getInputPattern
-
getErrorTopic
-
getLabeledInputTopics
-
getLabeledInputPatterns
-
isVolatileGroupInstanceId
public boolean isVolatileGroupInstanceId()
-
getApplicationId
-
setInputTopics
-
setInputPattern
-
setErrorTopic
-
setLabeledInputTopics
-
setLabeledInputPatterns
-
setVolatileGroupInstanceId
public void setVolatileGroupInstanceId(boolean volatileGroupInstanceId)
-
setApplicationId
-