Skip to content

StreamsApp

Subclass of KafkaApp and StreamsBootstrap.

Usage

Configures a streams-bootstrap Kafka Streams app

Configuration

pipeline.yaml
# StreamsApp component that configures a streams bootstrap app.
# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
- type: streams-app # required
  name: streams-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  # No arbitrary keys are allowed under `app`here
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
  app: # required
    # Streams Bootstrap streams section
    streams: # required, streams-app-specific
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
      inputTopics:
        - topic1
        - topic2
      outputTopic: output-topic
      inputPattern: input-pattern
      extraInputTopics:
        input_role1:
          - input_topic1
          - input_topic2
        input_role2:
          - input_topic3
          - input_topic4
      extraInputPatterns:
        pattern_role1: input_pattern1
      extraOutputTopics:
        output_role1: output_topic1
        output_role2: output_topic2
      errorTopic: error-topic
      config:
        my.streams.config: my.value
    nameOverride: override-with-this-name # streams-app-specific
    autoscaling: # streams-app-specific
      consumerGroup: consumer-group # required
      lagThreshold: 0 # Average target value to trigger scaling actions.
      enabled: false # Whether to enable auto-scaling using KEDA.
      # This is the interval to check each trigger on.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
      pollingInterval: 30
      # The period to wait after the last trigger reported active before scaling
      #  the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
      cooldownPeriod: 300
      # The offset reset policy for the consumer if the the consumer group is
      # not yet subscribed to a partition.
      offsetResetPolicy: earliest
      # This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resouce.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
      maxReplicas: 1
      # Minimum number of replicas KEDA will scale the resource down to.
      # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
      minReplicas: 0
      # If this property is set, KEDA will scale the resource down to this
      # number of replicas.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
      idleReplicas: 0
      topics: # List of auto-generated Kafka Streams topics used by the streams app.
        - topic1
        - topic2
  # Helm repository configuration (optional)
  # If not set the helm repo add will not be called. Useful when using local Helm charts
  repo_config:
    repository_name: bakdata-streams-bootstrap # required
    url: https://bakdata.github.io/streams-bootstrap/ # required
    repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "2.12.0" # Helm chart version

Operations

deploy

In addition to KubernetesApp's deploy:

  • Create topics if provided (optional)
  • Submit Avro schemas to the registry if provided (optional)

destroy

Uninstall Helm release.

reset

  • Delete the consumer group offsets
  • Delete Kafka Streams state

clean

Similar to reset with to additional steps:

  • Delete the app's output topics
  • Delete all associated schemas in the Schema Registry