Skip to content

Defaults

KPOps has a very efficient way of dealing with repeating settings which manifests as defaults.yaml. This file provides the user with the power to set defaults for any and all components, thus omitting the need to repeat the same settings in pipeline.yaml.

See real-world examples for defaults.

Features

Inheritance

An important mechanic of KPOps is that defaults set for a component apply to all components that inherit from it.

It is possible, although not recommended, to add settings that are specific to a component's subclass. An example would be configuring offset_topic under kafka-connector instead of kafka-source-connector.

Configuration

KPOps allows using multiple default values. The defaults.yaml (or defaults_<env>.yaml) files can be distributed across multiple files. These will be picked up by KPOps and get merged into a single pipeline.yaml file. KPOps starts from reading the default files from where the pipeline path is defined and picks up every defaults file on its way to where the pipeline_base_dir is defined.

The deepest defaults.yaml file in the folder hierarchy (i.e., the closest one to the pipeline.yaml) overwrites the higher-level defaults' values.

It is important to note that defaults_{environment}.yaml overrides only the settings that are explicitly set to be different from the ones in the base defaults file.

defaults merge priority

Imagine the following folder structure, where the pipeline_base_dir is configured to pipelines:

1
2
3
4
5
6
7
└─ pipelines
   └── distributed-defaults
       ├── defaults.yaml
       ├── defaults_dev.yaml
       └── pipeline-deep
           ├── defaults.yaml
           └── pipeline.yaml

KPOps picks up the defaults in the following order (high to low priority):

  • ./pipelines/distributed-defaults/pipeline-deep/defaults.yaml
  • ./pipelines/distributed-defaults/defaults_dev.yaml
  • ./pipelines/distributed-defaults/defaults.yaml

Components

The defaults codeblocks in this section contain the full set of settings that are specific to the component. If a setting already exists in a parent config, it will not be included in the child's.

KubernetesApp

defaults.yaml
# Base Kubernetes App
#
# Parent of: HelmApp
# Child of: PipelineComponent
kubernetes-app:
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: input # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  values: # required
    image: exampleImage # Example
    debug: false # Example
    commandLine: {} # Example

StreamsApp

defaults.yaml
# StreamsApp component that configures a streams bootstrap app.
#
# Child of: KafkaApp
# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
streams-app:
  # No arbitrary keys are allowed under `app`here
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
  values: # required
    # Streams Bootstrap streams section
    streams: # required, streams-app-specific
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
      inputTopics:
        - topic1
        - topic2
      outputTopic: output-topic
      inputPattern: input-pattern
      extraInputTopics:
        input_role1:
          - input_topic1
          - input_topic2
        input_role2:
          - input_topic3
          - input_topic4
      extraInputPatterns:
        pattern_role1: input_pattern1
      extraOutputTopics:
        output_role1: output_topic1
        output_role2: output_topic2
      errorTopic: error-topic
      config:
        my.streams.config: my.value
    nameOverride: override-with-this-name # streams-app-specific
    autoscaling: # streams-app-specific
      consumerGroup: consumer-group # required
      lagThreshold: 0 # Average target value to trigger scaling actions.
      enabled: false # Whether to enable auto-scaling using KEDA.
      # This is the interval to check each trigger on.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
      pollingInterval: 30
      # The period to wait after the last trigger reported active before scaling
      #  the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
      cooldownPeriod: 300
      # The offset reset policy for the consumer if the the consumer group is
      # not yet subscribed to a partition.
      offsetResetPolicy: earliest
      # This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resouce.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
      maxReplicas: 1
      # Minimum number of replicas KEDA will scale the resource down to.
      # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
      minReplicas: 0
      # If this property is set, KEDA will scale the resource down to this
      # number of replicas.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
      idleReplicas: 0
      topics: # List of auto-generated Kafka Streams topics used by the streams app.
        - topic1
        - topic2

ProducerApp

defaults.yaml

KafkaConnector

defaults.yaml
# Kafka connector
#
# Parent of: KafkaSinkConnector, KafkaSourceConnector
# Child of: PipelineComponent
kafka-connector:
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: input # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
  config: # required
    tasks.max: 1
  # Overriding Kafka Connect Resetter Helm values. E.g. to override the
  # Image Tag etc.
  resetter_values:
    imageTag: "1.2.3"

KafkaSourceConnector

defaults.yaml
1
2
3
4
5
6
7
8
9
# Kafka source connector
#
# Child of: KafkaConnector
kafka-source-connector:
  # The source connector has no `from` section
  # from:
  # offset.storage.topic
  # https://kafka.apache.org/documentation/#connect_running
  offset_topic: offset_topic

KafkaSinkConnector

defaults.yaml
1
2
3
4
5
# Kafka sink connector
#
# Child of: KafkaConnector
kafka-sink-connector:
  # No settings differ from `kafka-connector`