Defaults
KPOps has an efficient way of dealing with repeated settings: the `defaults.yaml` file. It lets you set defaults for any and all components, so the same settings do not have to be repeated in `pipeline.yaml`.

See real-world examples for defaults.
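As a minimal sketch of the idea (the component and topic names below are made up, and the `pipeline.yaml` snippet assumes the usual list-of-components layout), a setting declared once in `defaults.yaml` no longer needs to be repeated on every component:

```yaml
# defaults.yaml (hypothetical): shared settings for every streams-app
streams-app:
  namespace: example-namespace
  values:
    streams:
      brokers: ${config.kafka_brokers}
      schemaRegistryUrl: ${config.schema_registry.url}
```

```yaml
# pipeline.yaml (hypothetical): only the component-specific parts remain
- type: streams-app
  name: word-counter
  values:
    streams:
      inputTopics:
        - ${pipeline.name}-input-topic
      outputTopic: ${pipeline.name}-output-topic
# namespace, brokers and schemaRegistryUrl come from defaults.yaml
```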
Features
Inheritance
An important mechanic of KPOps is that `defaults` set for a component apply to all components that inherit from it.

It is possible, although not recommended, to add settings that are specific to a component's subclass. An example would be configuring `offset_topic` under `kafka-connector` instead of `kafka-source-connector`.
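For example (a sketch; the concrete values are made up), a default set on the `kubernetes-app` parent is inherited by every component that extends it, whereas the discouraged pattern puts a subclass-only key on the shared parent:

```yaml
# defaults.yaml (hypothetical)
kubernetes-app:
  namespace: production # inherited by all components that extend kubernetes-app

# Not recommended: offset_topic only makes sense for kafka-source-connector,
# but declaring it on the shared kafka-connector parent exposes it to
# kafka-sink-connector as well
kafka-connector:
  offset_topic: offset_topic
```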
Configuration
KPOps allows using multiple defaults files: the `defaults.yaml` (or `defaults_<env>.yaml`) settings can be distributed across multiple files. These are picked up by KPOps and merged into a single `pipeline.yaml` file.

KPOps starts reading defaults files in the directory where the pipeline is defined and picks up every defaults file on its way up to the `pipeline_base_dir`. The deepest `defaults.yaml` file in the folder hierarchy (i.e., the closest one to `pipeline.yaml`) overwrites the values of higher-level defaults.

It is important to note that `defaults_{environment}.yaml` overrides only the settings that are explicitly set to be different from the ones in the base `defaults` file.
Defaults merge priority
Imagine the following folder structure, where the `pipeline_base_dir` is configured to `pipelines`:
```
└─ pipelines
   └── distributed-defaults
       ├── defaults.yaml
       ├── defaults_dev.yaml
       └── pipeline-deep
           ├── defaults.yaml
           └── pipeline.yaml
```
KPOps picks up the defaults in the following order (high to low priority):

- `./pipelines/distributed-defaults/pipeline-deep/defaults.yaml`
- `./pipelines/distributed-defaults/defaults_dev.yaml`
- `./pipelines/distributed-defaults/defaults.yaml`
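To make the merge order concrete, here is a sketch of how settings could flow through the three files above; the keys and values are made up for illustration:

```yaml
# ./pipelines/distributed-defaults/defaults.yaml (lowest priority)
streams-app:
  namespace: base-namespace
  values:
    streams:
      brokers: ${config.kafka_brokers}
```

```yaml
# ./pipelines/distributed-defaults/defaults_dev.yaml
# Only overrides what differs from the base defaults in the `dev` environment
streams-app:
  namespace: dev-namespace
```

```yaml
# ./pipelines/distributed-defaults/pipeline-deep/defaults.yaml (highest priority)
streams-app:
  values:
    streams:
      config:
        my.streams.config: my.value
```

In the `dev` environment, the resulting defaults for `streams-app` would combine all three files: `namespace: dev-namespace` from the environment file, `brokers` from the base file, and the extra Streams config from the deepest file.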
Components
The `defaults` code blocks in this section contain the full set of settings that are specific to each component. If a setting already exists in a parent component, it is not repeated in the child's block.
defaults.yaml
```yaml
# Base Kubernetes App
#
# Parent of: HelmApp
# Child of: PipelineComponent
kubernetes-app:
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: input # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  values: # required
    image: exampleImage # Example
    debug: false # Example
    commandLine: {} # Example
```
defaults.yaml
```yaml
# StreamsApp component that configures a streams-bootstrap app.
#
# Child of: KafkaApp
# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
streams-app:
  # No arbitrary keys are allowed under `values` here
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
  values: # required
    # Streams Bootstrap streams section
    streams: # required, streams-app-specific
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
      inputTopics:
        - topic1
        - topic2
      outputTopic: output-topic
      inputPattern: input-pattern
      extraInputTopics:
        input_role1:
          - input_topic1
          - input_topic2
        input_role2:
          - input_topic3
          - input_topic4
      extraInputPatterns:
        pattern_role1: input_pattern1
      extraOutputTopics:
        output_role1: output_topic1
        output_role2: output_topic2
      errorTopic: error-topic
      config:
        my.streams.config: my.value
    nameOverride: override-with-this-name # streams-app-specific
    autoscaling: # streams-app-specific
      consumerGroup: consumer-group # required
      lagThreshold: 0 # Average target value to trigger scaling actions.
      enabled: false # Whether to enable auto-scaling using KEDA.
      # This is the interval to check each trigger on.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
      pollingInterval: 30
      # The period to wait after the last trigger reported active before scaling
      # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
      cooldownPeriod: 300
      # The offset reset policy for the consumer if the consumer group is
      # not yet subscribed to a partition.
      offsetResetPolicy: earliest
      # This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resource.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
      maxReplicas: 1
      # Minimum number of replicas KEDA will scale the resource down to.
      # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
      minReplicas: 0
      # If this property is set, KEDA will scale the resource down to this
      # number of replicas.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
      idleReplicas: 0
      topics: # List of auto-generated Kafka Streams topics used by the streams app.
        - topic1
        - topic2
```
defaults.yaml
```yaml
# Kafka connector
#
# Parent of: KafkaSinkConnector, KafkaSourceConnector
# Child of: PipelineComponent
kafka-connector:
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: input # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # Full documentation on connectors: https://kafka.apache.org/documentation/#connectconfigs
  config: # required
    tasks.max: 1
  # Overriding Kafka Connect Resetter Helm values. E.g. to override the
  # Image Tag etc.
  resetter_values:
    imageTag: "1.2.3"
```
defaults.yaml
```yaml
# Kafka source connector
#
# Child of: KafkaConnector
kafka-source-connector:
  # The source connector has no `from` section
  # from:
  # offset.storage.topic
  # https://kafka.apache.org/documentation/#connect_running
  offset_topic: offset_topic
```
defaults.yaml
```yaml
# Kafka sink connector
#
# Child of: KafkaConnector
kafka-sink-connector:
  # No settings differ from `kafka-connector`
```
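As a closing usage sketch (connector name, class and topic are made up, and the `pipeline.yaml` snippet assumes the usual list-of-components layout), a sink connector defined in `pipeline.yaml` picks up everything declared under `kafka-connector` above and only needs its own connector-specific configuration:

```yaml
# pipeline.yaml (hypothetical)
- type: kafka-sink-connector
  name: example-sink
  config:
    connector.class: com.example.SinkConnector # hypothetical connector class
    topics: ${pipeline.name}-output-topic
# prefix, tasks.max and resetter_values are inherited from the kafka-connector defaults
```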