Skip to content

ConsumerApp

Subclass of StreamsBootstrap.

Usage

Configures a streams-bootstrap Kafka consumer app

Configuration

pipeline.yaml
# Holds configuration to use as values for the streams bootstrap consumer-app Helm
# chart.
# More documentation on ConsumerApp:
# https://github.com/bakdata/streams-bootstrap
- type: consumer-app
  name: consumer-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: input # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # to: # While the consumer-app does inherit from kafka-app, it does not need a
  # `to` section, hence it does not support it.
  namespace: namespace # required
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app
  values: # required
    kafka: # required, consumer-app-specific
      bootstrapServers: ${config.kafka_brokers}
      schemaRegistryUrl: ${config.schema_registry.url}
      groupId: consumer-group-id
      inputTopics:
        - topic1
        - topic2
      inputPattern: input-pattern
      labeledInputTopics:
        input_role1:
          - input_topic1
          - input_topic2
        input_role2:
          - input_topic3
          - input_topic4
      labeledInputPatterns:
        pattern_role1: input_pattern1
      config:
        my.consumer.config: my.value
    nameOverride: override-with-this-name # kafka-app-specific
    fullnameOverride: override-with-this-name # kafka-app-specific
    autoscaling: # consumer-app-specific
      enabled: false # Whether to enable auto-scaling using KEDA.
      lagThreshold: 0 # Average target value to trigger scaling actions.
      # This is the interval to check each trigger on.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
      pollingInterval: 30
      # The period to wait after the last trigger reported active before scaling
      #  the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
      cooldownPeriod: 300
      # The offset reset policy for the consumer if the the consumer group is
      # not yet subscribed to a partition.
      offsetResetPolicy: earliest
      # This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resouce.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
      maxReplicas: 1
      # Minimum number of replicas KEDA will scale the resource down to.
      # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
      minReplicas: 0
      # If this property is set, KEDA will scale the resource down to this
      # number of replicas.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
      idleReplicas: 0
      topics: # List of topics used by the consumer app.
        - topic1
        - topic2
      additionalTriggers: [] # List of additional KEDA triggers.
  # Helm repository configuration (optional)
  # If not set the helm repo add will not be called. Useful when using local Helm charts
  repo_config:
    repository_name: bakdata-streams-bootstrap # required
    url: https://bakdata.github.io/streams-bootstrap/ # required
    repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "2.12.0" # Helm chart version

Operations

deploy

Identical to StreamsBootstrap's deploy. The consumer app has no to section, so no topics are created and no schemas are submitted.

destroy

Uninstall Helm release.

reset

  • Delete the consumer group offsets

clean

Similar to reset with an additional step:

  • Delete persistent volume claims if statefulSet is enabled and persistence is enabled