Package com.bakdata.kafka.streams
Class ConfiguredStreamsApp<T extends StreamsApp>
java.lang.Object
com.bakdata.kafka.streams.ConfiguredStreamsApp<T>
- Type Parameters:
T - type of StreamsApp
- All Implemented Interfaces:
ConfiguredApp<ExecutableStreamsApp<T>>, AutoCloseable
public class ConfiguredStreamsApp<T extends StreamsApp>
extends Object
implements ConfiguredApp<ExecutableStreamsApp<T>>
A StreamsApp with a corresponding StreamsAppConfiguration.
Constructor Summary
Constructors -
Method Summary
Modifier and Type, Method, Description
- void close()
- org.apache.kafka.streams.Topology createTopology(Map<String, Object> kafkaProperties): Create the topology of the Kafka Streams app
- getApp()
- getKafkaProperties(RuntimeConfiguration runtimeConfiguration): This method creates the configuration to run a StreamsApp.
- getTopics(): Get topic configuration
- getUniqueAppId(): Get unique application identifier of StreamsApp
- withRuntimeConfiguration(RuntimeConfiguration runtimeConfiguration): Create an ExecutableStreamsApp using the provided RuntimeConfiguration
-
Constructor Details
-
ConfiguredStreamsApp
public ConfiguredStreamsApp(@NonNull T app, @NonNull StreamsAppConfiguration configuration)
-
-
Method Details
-
getKafkaProperties
This method creates the configuration to run a StreamsApp. Configuration is created in the following order:
- Exactly-once, in-order, and compression are configured:
  processing.guarantee=exactly_once_v2
  producer.max.in.flight.requests.per.connection=1
  producer.acks=all
  producer.compression.type=gzip
- Configs provided by App.createKafkaProperties()
- Configs provided via environment variables (see EnvironmentKafkaConfigParser.parseVariables(Map))
- Configs provided by RuntimeConfiguration.createKafkaProperties()
- StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG are configured using StreamsApp.defaultSerializationConfig()
- StreamsConfig.APPLICATION_ID_CONFIG is configured using StreamsApp.getUniqueAppId(StreamsAppConfiguration)
- Parameters:
runtimeConfiguration - configuration to run app with
- Returns:
Kafka configuration
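The ordering described for getKafkaProperties can be pictured as successive layers merged into one map. The following is only a minimal sketch, not the library's implementation: LayeredKafkaProperties, baseConfig and merge are hypothetical names, the lz4 and bootstrap.servers values are made up, and it assumes that a later layer overrides an earlier one when both set the same key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not a class of com.bakdata.kafka.streams.
class LayeredKafkaProperties {

    // First layer: the exactly-once, in-order, and compression defaults listed in the documentation.
    static Map<String, Object> baseConfig() {
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("processing.guarantee", "exactly_once_v2");
        base.put("producer.max.in.flight.requests.per.connection", 1);
        base.put("producer.acks", "all");
        base.put("producer.compression.type", "gzip");
        return base;
    }

    // Apply the layers in order.
    // Assumption: a later layer overrides an earlier one on the same key.
    @SafeVarargs
    static Map<String, Object> merge(Map<String, Object>... layers) {
        Map<String, Object> merged = new LinkedHashMap<>();
        for (Map<String, Object> layer : layers) {
            merged.putAll(layer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Made-up values standing in for app-level and runtime-level configs.
        Map<String, Object> fromApp = Map.of("producer.compression.type", "lz4");
        Map<String, Object> fromRuntime = Map.of("bootstrap.servers", "localhost:9092");

        Map<String, Object> merged = merge(baseConfig(), fromApp, fromRuntime);
        merged.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Under that override assumption, a key set in one of the later steps (for example by RuntimeConfiguration) would win over the defaults from the first step, while keys nobody overrides keep their default values.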
-
getUniqueAppId
Get unique application identifier of StreamsApp
- Returns:
unique application identifier
- Throws:
IllegalArgumentException - if the unique application identifier of StreamsApp is different from the application identifier provided in StreamsAppConfiguration
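The documented failure mode of getUniqueAppId can be illustrated with a small consistency check. This is a hedged sketch only: UniqueAppIdCheck and resolveUniqueAppId are hypothetical names, and treating a null configured identifier as "not provided" is an assumption that the documentation does not confirm.

```java
// Hypothetical stand-in for illustration; not a class of com.bakdata.kafka.streams.
class UniqueAppIdCheck {

    // Return the app's own identifier, rejecting a conflicting configured one.
    // Assumption: a null configured identifier means none was provided.
    static String resolveUniqueAppId(String idFromApp, String idFromConfiguration) {
        if (idFromConfiguration != null && !idFromConfiguration.equals(idFromApp)) {
            throw new IllegalArgumentException(
                    "Configured application ID '" + idFromConfiguration
                            + "' differs from the app's unique ID '" + idFromApp + "'");
        }
        return idFromApp;
    }

    public static void main(String[] args) {
        System.out.println(resolveUniqueAppId("my-app", null));
        System.out.println(resolveUniqueAppId("my-app", "my-app"));
        try {
            resolveUniqueAppId("my-app", "other-app");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```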
-
getTopics
Get topic configuration
- Returns:
topic configuration
-
withRuntimeConfiguration
Create an ExecutableStreamsApp using the provided RuntimeConfiguration
- Specified by:
withRuntimeConfiguration in interface ConfiguredApp<T extends StreamsApp>
- Parameters:
runtimeConfiguration - configuration to run app with
- Returns:
ExecutableStreamsApp
-
createTopology
Create the topology of the Kafka Streams app
- Parameters:
kafkaProperties - configuration that should be used by clients to configure Kafka utilities
- Returns:
topology of the Kafka Streams app
-
close
public void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface ConfiguredApp<T extends StreamsApp>
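Because the class implements AutoCloseable, an instance can be managed with try-with-resources. The sketch below uses hypothetical stand-ins (CloseSketch, FakeApp, FakeConfiguredApp) instead of the real classes, and it assumes that closing the configured wrapper also closes the wrapped app, which this page does not state.

```java
// Hypothetical stand-ins for illustration; not classes of com.bakdata.kafka.streams.
class CloseSketch {

    // Stand-in for an app that holds resources.
    static class FakeApp implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    // Stand-in for a configured wrapper around the app.
    static class FakeConfiguredApp implements AutoCloseable {
        private final FakeApp app;

        FakeConfiguredApp(FakeApp app) {
            this.app = app;
        }

        FakeApp getApp() {
            return app;
        }

        // Assumption: closing the wrapper delegates to the wrapped app.
        @Override
        public void close() {
            app.close();
        }
    }

    // Use the wrapper inside try-with-resources and report whether the app was closed afterwards.
    static boolean runAndClose() {
        FakeApp app = new FakeApp();
        try (FakeConfiguredApp configured = new FakeConfiguredApp(app)) {
            // Work with the configured app here; close() runs automatically on exit.
            configured.getApp();
        }
        return app.closed;
    }

    public static void main(String[] args) {
        System.out.println("App closed after try block: " + runAndClose());
    }
}
```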
-
getApp
-