ConsumerApp
Subclass of StreamsBootstrap.
Usage
Configures a streams-bootstrap Kafka consumer app.
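A minimal declaration could look like the sketch below. The component name, namespace, group id, and topic are illustrative; the full set of supported fields is listed under Configuration below.

```yaml
# Minimal consumer-app declaration (illustrative names).
- type: consumer-app
  name: my-consumer # required
  namespace: my-namespace # required
  values: # required
    kafka: # required
      bootstrapServers: ${config.kafka_brokers}
      groupId: my-consumer-group
      inputTopics:
        - my-input-topic
```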
Configuration
pipeline.yaml
| # Holds configuration to use as values for the streams-bootstrap consumer-app Helm
  # chart.
  # More documentation on ConsumerApp:
  # https://github.com/bakdata/streams-bootstrap
  - type: consumer-app
    name: consumer-app # required
    # Pipeline prefix that will prefix every component name. If you wish to not
    # have any prefix you can specify an empty string.
    prefix: ${pipeline.name}-
    from: # Must not be null
      topics: # read from topic
        ${pipeline.name}-input-topic:
          type: input # Implied when role is NOT specified
        ${pipeline.name}-extra-topic:
          role: topic-role # Implies `type` to be extra
        ${pipeline.name}-input-pattern-topic:
          type: pattern # Implied to be an input pattern if `role` is undefined
        ${pipeline.name}-extra-pattern-topic:
          type: pattern # Implied to be an extra pattern if `role` is defined
          role: some-role
      components: # read from specific component
        account-producer:
          type: input # Implied when role is NOT specified
        other-producer:
          role: some-role # Implies `type` to be extra
        component-as-input-pattern:
          type: pattern # Implied to be an input pattern if `role` is undefined
        component-as-extra-pattern:
          type: pattern # Implied to be an extra pattern if `role` is defined
          role: some-role
    # to: # While the consumer-app does inherit from kafka-app, it does not need a
    # `to` section, hence it does not support it.
    namespace: namespace # required
    # Allowed configs:
    # https://github.com/bakdata/streams-bootstrap/tree/master/charts/consumer-app
    values: # required
      kafka: # required, consumer-app-specific
        bootstrapServers: ${config.kafka_brokers}
        schemaRegistryUrl: ${config.schema_registry.url}
        groupId: consumer-group-id
        inputTopics:
          - topic1
          - topic2
        inputPattern: input-pattern
        labeledInputTopics:
          input_role1:
            - input_topic1
            - input_topic2
          input_role2:
            - input_topic3
            - input_topic4
        labeledInputPatterns:
          pattern_role1: input_pattern1
        config:
          my.consumer.config: my.value
      nameOverride: override-with-this-name # kafka-app-specific
      fullnameOverride: override-with-this-name # kafka-app-specific
      autoscaling: # consumer-app-specific
        enabled: false # Whether to enable auto-scaling using KEDA.
        lagThreshold: 0 # Average target value to trigger scaling actions.
        # This is the interval to check each trigger on.
        # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
        pollingInterval: 30
        # The period to wait after the last trigger reported active before scaling
        # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
        cooldownPeriod: 300
        # The offset reset policy for the consumer if the consumer group is
        # not yet subscribed to a partition.
        offsetResetPolicy: earliest
        # This setting is passed to the HPA definition that KEDA will create for a
        # given resource and holds the maximum number of replicas of the target resource.
        # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
        maxReplicas: 1
        # Minimum number of replicas KEDA will scale the resource down to.
        # https://keda.sh/docs/2.9/concepts/scaling-deployments/#minreplicacount
        minReplicas: 0
        # If this property is set, KEDA will scale the resource down to this
        # number of replicas.
        # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
        idleReplicas: 0
        topics: # List of topics used by the consumer app.
          - topic1
          - topic2
        additionalTriggers: [] # List of additional KEDA triggers.
    # Helm repository configuration (optional)
    # If not set, `helm repo add` will not be called. Useful when using local Helm charts.
    repo_config:
      repository_name: bakdata-streams-bootstrap # required
      url: https://bakdata.github.io/streams-bootstrap/ # required
      repo_auth_flags:
        username: user
        password: pass
        ca_file: /home/user/path/to/ca-file
        insecure_skip_tls_verify: false
    version: "2.12.0" # Helm chart version
|
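As an illustration, the autoscaling block shown above could be enabled so that KEDA scales the app on consumer lag. The threshold and replica counts here are illustrative values, not recommendations:

```yaml
values:
  autoscaling:
    enabled: true
    lagThreshold: 1000 # scale up once average lag per replica exceeds this
    minReplicas: 0 # allow KEDA to scale the app down to zero
    maxReplicas: 5
    topics: # topics whose lag is monitored
      - topic1
      - topic2
```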
Operations
deploy
Identical to StreamsBootstrap's `deploy`. Since the consumer app has no `to` section, no topics are created and no schemas are submitted.
destroy
Uninstall Helm release.
reset
- Delete the consumer group offsets
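Conceptually, resetting amounts to discarding the group's committed offsets. A hedged sketch with the standard Kafka CLI (the broker address is illustrative, and the group id matches `values.kafka.groupId` from the reference above; the component presumably performs this via its own clean-up mechanism rather than this exact command):

```shell
GROUP="consumer-group-id" # matches values.kafka.groupId above
BROKERS="localhost:9092"  # illustrative broker address
# Deleting the (inactive) consumer group discards its committed offsets,
# so a redeployed app starts reading from its offsetResetPolicy position.
CMD="kafka-consumer-groups.sh --bootstrap-server $BROKERS --delete --group $GROUP"
echo "$CMD"
```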
clean
Similar to reset with an additional step:
- Delete persistent volume claims if `statefulSet` and persistence are enabled
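The extra step can be pictured as deleting the release's persistent volume claims. A rough `kubectl` equivalent follows; the label selector is an assumption about how the chart labels its StatefulSet volumes, and the release name and namespace are taken from the example above:

```shell
RELEASE="consumer-app" # component name from the example above
NAMESPACE="namespace"
# Assumed selector; the chart's actual labels may differ.
CMD="kubectl delete pvc --namespace $NAMESPACE --selector app.kubernetes.io/instance=$RELEASE"
echo "$CMD"
```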