Package com.bakdata.kafka.streams
Class ConfiguredStreamsApp<T extends StreamsApp>
java.lang.Object
com.bakdata.kafka.streams.ConfiguredStreamsApp<T>
- Type Parameters:
T
- type ofStreamsApp
- All Implemented Interfaces:
ConfiguredApp<ExecutableStreamsApp<T>>
,AutoCloseable
public class ConfiguredStreamsApp<T extends StreamsApp>
extends Object
implements ConfiguredApp<ExecutableStreamsApp<T>>
A
StreamsApp
with a corresponding StreamsAppConfiguration
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionvoid
close()
org.apache.kafka.streams.Topology
createTopology
(Map<String, Object> kafkaProperties) Create the topology of the Kafka Streams appgetApp()
getKafkaProperties
(RuntimeConfiguration runtimeConfiguration) This method creates the configuration to run aStreamsApp
.Get topic configurationGet unique application identifier ofStreamsApp
withRuntimeConfiguration
(RuntimeConfiguration runtimeConfiguration) Create anExecutableStreamsApp
using the providedRuntimeConfiguration
-
Constructor Details
-
ConfiguredStreamsApp
public ConfiguredStreamsApp(@NonNull T app, @NonNull @NonNull StreamsAppConfiguration configuration)
-
-
Method Details
-
getKafkaProperties
This method creates the configuration to run a
Configuration is created in the following orderStreamsApp
.-
Exactly-once, in-order, and compression are configured:
processing.guarantee=exactly_once_v2 producer.max.in.flight.requests.per.connection=1 producer.acks=all producer.compression.type=gzip
-
Configs provided by
App.createKafkaProperties()
-
Configs provided via environment variables (see
EnvironmentKafkaConfigParser.parseVariables(Map)
) -
Configs provided by
RuntimeConfiguration.createKafkaProperties()
-
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG
andStreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
is configured usingStreamsApp.defaultSerializationConfig()
-
StreamsConfig.APPLICATION_ID_CONFIG
is configured usingStreamsApp.getUniqueAppId(StreamsAppConfiguration)
- Parameters:
runtimeConfiguration
- configuration to run app with- Returns:
- Kafka configuration
-
Exactly-once, in-order, and compression are configured:
-
getUniqueAppId
Get unique application identifier ofStreamsApp
- Returns:
- unique application identifier
- Throws:
IllegalArgumentException
- if unique application identifier ofStreamsApp
is different from provided application identifier inStreamsAppConfiguration
- See Also:
-
getTopics
Get topic configuration- Returns:
- topic configuration
-
withRuntimeConfiguration
Create anExecutableStreamsApp
using the providedRuntimeConfiguration
- Specified by:
withRuntimeConfiguration
in interfaceConfiguredApp<T extends StreamsApp>
- Parameters:
runtimeConfiguration
- configuration to run app with- Returns:
ExecutableStreamsApp
-
createTopology
Create the topology of the Kafka Streams app- Parameters:
kafkaProperties
- configuration that should be used by clients to configure Kafka utilities- Returns:
- topology of the Kafka Streams app
-
close
public void close()- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceConfiguredApp<T extends StreamsApp>
-
getApp
-