Class KafkaApplication<R extends Runner,CR extends CleanUpRunner,O,E extends ExecutableApp<R,CR,O>,CA extends ConfiguredApp<E>,T,A,AC>

java.lang.Object
com.bakdata.kafka.KafkaApplication<R,CR,O,E,CA,T,A,AC>
Type Parameters:
R - type of Runner used by this app
CR - type of CleanUpRunner used by this app
O - type of execution options to create runner
E - type of ExecutableApp used by this app
CA - type of ConfiguredApp used by this app
T - type of topic config used by this app
A - type of app
AC - type of configuration used by this app
All Implemented Interfaces:
AutoCloseable, Runnable
Direct Known Subclasses:
KafkaProducerApplication, KafkaStreamsApplication

public abstract class KafkaApplication<R extends Runner,CR extends CleanUpRunner,O,E extends ExecutableApp<R,CR,O>,CA extends ConfiguredApp<E>,T,A,AC> extends Object implements Runnable, AutoCloseable

The base class for creating Kafka applications.

This class provides the following configuration options:
  • bootstrapServers
  • outputTopic
  • labeledOutputTopics
  • schemaRegistryUrl
  • kafkaConfig
To implement your Kafka application inherit from this class and add your custom options. Run it by creating an instance of your class and calling startApplication(String[]) from your main.
  • Constructor Details

    • KafkaApplication

      public KafkaApplication()
  • Method Details

    • startApplication

      public void startApplication(String[] args)

      This method should be called in the main method of your application

      This method calls System exit

      Parameters:
      args - Arguments passed in by the custom application class.
      See Also:
    • startApplicationWithoutExit

      public int startApplicationWithoutExit(String[] args)

      This method should be called in the main method of your application

      Parameters:
      args - Arguments passed in by the custom application class.
      Returns:
      Exit code of application
    • createExecutionOptions

      public abstract Optional<O> createExecutionOptions()
      Create options for running the app
      Returns:
      run options if available
      See Also:
    • createTopicConfig

      public abstract T createTopicConfig()
      Topics used by app
      Returns:
      topic configuration
    • createApp

      public abstract A createApp()
      Create a new app that will be configured and executed according to this application.
      Returns:
      app
    • clean

      public void clean()
      Clean all resources associated with this application
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      See Also:
    • stop

      public final void stop()
      Stop all applications that have been started asynchronously, e.g., by using run() or clean().
    • run

      public void run()
      Run the application.
      Specified by:
      run in interface Runnable
    • getRuntimeConfiguration

      public RuntimeConfiguration getRuntimeConfiguration()
    • createExecutableApp

      public final E createExecutableApp()
      Create a new ExecutableApp that will be executed according to the requested command.
      Returns:
      ExecutableApp
    • createConfiguredApp

      public final CA createConfiguredApp()
      Create a new ConfiguredApp that will be executed according to this application.
      Returns:
      ConfiguredApp
    • createConfiguration

      public abstract AC createConfiguration(T topics)
      Create configuration to configure app
      Parameters:
      topics - topic configuration
      Returns:
      configuration
    • createRunnableApp

      public final KafkaApplication.RunnableApp<R> createRunnableApp()
      Returns:
      KafkaApplication.RunnableApp
    • createCleanableApp

      public final KafkaApplication.CleanableApp<CR> createCleanableApp()
      Returns:
      KafkaApplication.CleanableApp
    • onApplicationStart

      public void onApplicationStart()
      Called before starting the application, e.g., invoking run()
    • prepareRun

      public void prepareRun()
      Called before running the application, i.e., invoking run()
    • prepareClean

      public void prepareClean()
      Called before cleaning the application, i.e., invoking clean()
    • createConfiguredApp

      protected abstract CA createConfiguredApp(A app, AC configuration)
      Create a new ConfiguredApp that will be executed according to the given config.
      Parameters:
      app - app to configure.
      configuration - configuration for app
      Returns:
      ConfiguredApp
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • getOutputTopic

      public String getOutputTopic()
    • getLabeledOutputTopics

      public Map<String,String> getLabeledOutputTopics()
    • getBootstrapServers

      public String getBootstrapServers()
    • getSchemaRegistryUrl

      public String getSchemaRegistryUrl()
    • getKafkaConfig

      public Map<String,String> getKafkaConfig()
    • setOutputTopic

      public void setOutputTopic(String outputTopic)
    • setLabeledOutputTopics

      public void setLabeledOutputTopics(Map<String,String> labeledOutputTopics)
    • setBootstrapServers

      public void setBootstrapServers(String bootstrapServers)
    • setSchemaRegistryUrl

      public void setSchemaRegistryUrl(String schemaRegistryUrl)
    • setKafkaConfig

      public void setKafkaConfig(Map<String,String> kafkaConfig)