Package com.bakdata.kafka.streams
Class KafkaStreamsApplication<T extends StreamsApp>
java.lang.Object
com.bakdata.kafka.KafkaApplication<StreamsRunner,StreamsCleanUpRunner,StreamsExecutionOptions,ExecutableStreamsApp<T>,ConfiguredStreamsApp<T>,StreamsTopicConfig,T,StreamsAppConfiguration>
com.bakdata.kafka.streams.KafkaStreamsApplication<T>
- Type Parameters:
T - type of StreamsApp created by this application
- All Implemented Interfaces:
AutoCloseable, Runnable
- Direct Known Subclasses:
SimpleKafkaStreamsApplication
public abstract class KafkaStreamsApplication<T extends StreamsApp>
extends KafkaApplication<StreamsRunner,StreamsCleanUpRunner,StreamsExecutionOptions,ExecutableStreamsApp<T>,ConfiguredStreamsApp<T>,StreamsTopicConfig,T,StreamsAppConfiguration>
The base class for creating Kafka Streams applications.
This class provides the following configuration options in addition to those provided by KafkaApplication:
- inputTopics
- inputPattern
- errorTopic
- labeledInputTopics
- labeledInputPatterns
- volatileGroupInstanceId
To run the application, call KafkaApplication.startApplication(String[]) from your main.
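The input-related options above take plain JDK types, as the setter signatures in the Method Summary show (List<String>, Pattern, Map<String, List<String>>, Map<String, Pattern>). The following is a minimal sketch of how such values might be built before they are passed to the corresponding setters. All topic names and labels are hypothetical, and the class itself is only an illustration, not part of the library.

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Builds values of the types accepted by the input-related setters.
// Every topic name and label below is a hypothetical placeholder.
final class InputOptionsExample {

    // Value of the type accepted by setInputTopics(List<String>)
    static List<String> inputTopics() {
        return List.of("orders", "payments");
    }

    // Value of the type accepted by setInputPattern(Pattern)
    static Pattern inputPattern() {
        return Pattern.compile("orders-.*");
    }

    // Value of the type accepted by setLabeledInputTopics(Map<String, List<String>>)
    static Map<String, List<String>> labeledInputTopics() {
        return Map.of("customers", List.of("customers-eu", "customers-us"));
    }

    // Value of the type accepted by setLabeledInputPatterns(Map<String, Pattern>)
    static Map<String, Pattern> labeledInputPatterns() {
        return Map.of("audit", Pattern.compile("audit-\\d+"));
    }

    public static void main(String[] args) {
        // Check locally which topic names a pattern would select
        System.out.println(inputPattern().matcher("orders-eu").matches());
        System.out.println(labeledInputPatterns().get("audit").matcher("audit-x").matches());
    }
}
```

Checking a pattern against sample topic names locally, as main does here, is a cheap way to catch an overly broad or overly narrow expression before the application subscribes with it.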
Nested Class Summary
Nested classes/interfaces inherited from class com.bakdata.kafka.KafkaApplication
KafkaApplication.CleanableApp<CR extends CleanUpRunner>, KafkaApplication.RunnableApp<R extends Runner>
-
Constructor Summary
Constructors
KafkaStreamsApplication()
-
Method Summary
Modifier and Type, Method, Description
void clean() - Reset the Kafka Streams application.
createConfiguration - Create configuration to configure app
final ConfiguredStreamsApp<T> createConfiguredApp(T app, StreamsAppConfiguration configuration) - Create a new ConfiguredApp that will be executed according to the given config.
final Optional<StreamsExecutionOptions> createExecutionOptions - Create options for running the app
protected org.apache.kafka.streams.KafkaStreams.StateListener createStateListener() - Create a KafkaStreams.StateListener to use for Kafka Streams.
final StreamsTopicConfig createTopicConfig - Topics used by app
protected org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler createUncaughtExceptionHandler() - Create a StreamsUncaughtExceptionHandler to use for Kafka Streams.
boolean isVolatileGroupInstanceId()
protected void onStreamsStart(RunningStreams runningStreams) - Called after starting Kafka Streams
void prepareClean()
void reset() - Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams application.
void setApplicationId(String applicationId)
void setErrorTopic(String errorTopic)
void setInputPattern(Pattern inputPattern)
void setInputTopics(List<String> inputTopics)
void setLabeledInputPatterns(Map<String, Pattern> labeledInputPatterns)
void setLabeledInputTopics(Map<String, List<String>> labeledInputTopics)
void setVolatileGroupInstanceId(boolean volatileGroupInstanceId)
toString()
Methods inherited from class com.bakdata.kafka.KafkaApplication
close, createApp, createCleanableApp, createConfiguredApp, createExecutableApp, createRunnableApp, getBootstrapServers, getKafkaConfig, getLabeledOutputTopics, getOutputTopic, getRuntimeConfiguration, getSchemaRegistryUrl, onApplicationStart, prepareRun, run, setBootstrapServers, setKafkaConfig, setLabeledOutputTopics, setOutputTopic, setSchemaRegistryUrl, startApplication, startApplicationWithoutExit, stop
-
Constructor Details
-
KafkaStreamsApplication
public KafkaStreamsApplication()
-
-
Method Details
-
clean
public void clean()
Reset the Kafka Streams application. Additionally, delete the consumer group and all output and intermediate topics associated with the Kafka Streams application.
- Overrides:
clean in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
reset
public void reset()
Clear all state stores, consumer group offsets, and internal topics associated with the Kafka Streams application.
-
createExecutionOptions
Description copied from class: KafkaApplication
Create options for running the app
- Specified by:
createExecutionOptions in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Returns:
run options if available
- See Also:
-
createTopicConfig
Description copied from class: KafkaApplication
Topics used by app
- Specified by:
createTopicConfig in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Returns:
topic configuration
-
createConfiguredApp
public final ConfiguredStreamsApp<T> createConfiguredApp(T app, StreamsAppConfiguration configuration)
Description copied from class: KafkaApplication
Create a new ConfiguredApp that will be executed according to the given config.
- Specified by:
createConfiguredApp in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Parameters:
app - app to configure
configuration - configuration for app
- Returns:
ConfiguredApp
-
createConfiguration
Description copied from class: KafkaApplication
Create configuration to configure app
- Specified by:
createConfiguration in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
- Parameters:
topics - topic configuration
- Returns:
configuration
-
prepareClean
public void prepareClean()
- Overrides:
prepareClean in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
createStateListener
protected org.apache.kafka.streams.KafkaStreams.StateListener createStateListener()
Create a KafkaStreams.StateListener to use for Kafka Streams.
- Returns:
KafkaStreams.StateListener.NoOpStateListener by default
- See Also:
KafkaStreams.setStateListener(StateListener)
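As an illustration of the kind of listener an override of createStateListener() might return, the sketch below records every state transition it is notified of. So that it compiles without Kafka on the classpath, the State enum and StateListener interface are local stand-ins that mirror the shape of Kafka's KafkaStreams.State and KafkaStreams.StateListener (a single onChange(newState, oldState) callback). They are assumptions made for this example, not the real types, and RecordingListener is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class StateListenerSketch {

    // Stand-in for a subset of org.apache.kafka.streams.KafkaStreams.State
    enum State { CREATED, REBALANCING, RUNNING, ERROR }

    // Stand-in mirroring the onChange(newState, oldState) callback
    // of KafkaStreams.StateListener
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    // Hypothetical listener that records each transition, for example
    // to feed logging or a health check
    static final class RecordingListener implements StateListener {
        private final List<String> transitions = new ArrayList<>();

        @Override
        public void onChange(State newState, State oldState) {
            transitions.add(oldState + " -> " + newState);
        }

        // Immutable snapshot of the transitions seen so far
        List<String> transitions() {
            return List.copyOf(transitions);
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onChange(State.REBALANCING, State.CREATED);
        listener.onChange(State.RUNNING, State.REBALANCING);
        System.out.println(listener.transitions());
    }
}
```

In a real subclass the same logic would implement the Kafka interface directly and be returned from the overridden createStateListener().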
-
createUncaughtExceptionHandler
protected org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler createUncaughtExceptionHandler()
Create a StreamsUncaughtExceptionHandler to use for Kafka Streams.
- Returns:
StreamsUncaughtExceptionHandler.DefaultStreamsUncaughtExceptionHandler by default
- See Also:
KafkaStreams.setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
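One possible policy for a custom handler is sketched below: replace the failed stream thread when the failure was caused by a timeout, and shut the client down otherwise. The Response enum and UncaughtExceptionHandler interface are local stand-ins mirroring the shape of Kafka's StreamsUncaughtExceptionHandler and its StreamThreadExceptionResponse values, so that the block compiles on its own. The policy and all names in it are assumptions made for illustration, not the library's default behavior.

```java
import java.util.concurrent.TimeoutException;

final class ExceptionHandlerSketch {

    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Stand-in mirroring the handle(Throwable) callback
    // of StreamsUncaughtExceptionHandler
    interface UncaughtExceptionHandler {
        Response handle(Throwable exception);
    }

    // Hypothetical policy: retry on timeouts, otherwise stop this client
    static UncaughtExceptionHandler replaceOnTimeout() {
        return exception -> isCausedByTimeout(exception)
                ? Response.REPLACE_THREAD
                : Response.SHUTDOWN_CLIENT;
    }

    // Walks the cause chain looking for a TimeoutException
    static boolean isCausedByTimeout(Throwable throwable) {
        for (Throwable current = throwable; current != null; current = current.getCause()) {
            if (current instanceof TimeoutException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        UncaughtExceptionHandler handler = replaceOnTimeout();
        System.out.println(handler.handle(new RuntimeException(new TimeoutException("broker slow"))));
        System.out.println(handler.handle(new IllegalStateException("corrupt record")));
    }
}
```

Walking the cause chain matters because a stream thread's failure is often wrapped in another exception before it reaches the handler.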
-
onStreamsStart
Called after starting Kafka Streams
- Parameters:
runningStreams - running KafkaStreams instance along with its StreamsConfig and Topology
-
toString
- Overrides:
toString in class KafkaApplication<StreamsRunner, StreamsCleanUpRunner, StreamsExecutionOptions, ExecutableStreamsApp<T extends StreamsApp>, ConfiguredStreamsApp<T extends StreamsApp>, StreamsTopicConfig, T extends StreamsApp, StreamsAppConfiguration>
-
getInputTopics
-
getInputPattern
-
getErrorTopic
-
getLabeledInputTopics
-
getLabeledInputPatterns
-
isVolatileGroupInstanceId
public boolean isVolatileGroupInstanceId()
-
getApplicationId
-
setInputTopics
-
setInputPattern
-
setErrorTopic
-
setLabeledInputTopics
-
setLabeledInputPatterns
-
setVolatileGroupInstanceId
public void setVolatileGroupInstanceId(boolean volatileGroupInstanceId)
-
setApplicationId
-