StreamsApp
Subclass of KafkaApp and StreamsBootstrap.
Usage
Configures a streams-bootstrap Kafka Streams app.
Configuration
pipeline.yaml
```yaml
# StreamsApp component that configures a streams-bootstrap app.
# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
- type: streams-app # required
name: streams-app # required
# Pipeline prefix that will prefix every component name. If you wish to not
# have any prefix you can specify an empty string.
prefix: ${pipeline.name}-
from: # Must not be null
topics: # read from topic
${pipeline.name}-input-topic:
type: input # Implied when role is NOT specified
${pipeline.name}-extra-topic:
role: topic-role # Implies `type` to be extra
${pipeline.name}-input-pattern-topic:
type: pattern # Implied to be an input pattern if `role` is undefined
${pipeline.name}-extra-pattern-topic:
type: pattern # Implied to be an extra pattern if `role` is defined
role: some-role
components: # read from specific component
account-producer:
type: input # Implied when role is NOT specified
other-producer:
role: some-role # Implies `type` to be extra
component-as-input-pattern:
type: pattern # Implied to be an input pattern if `role` is undefined
component-as-extra-pattern:
type: pattern # Implied to be an extra pattern if `role` is defined
role: some-role
# Topic(s) into which the component will write output
to:
topics:
${pipeline.name}-output-topic:
type: output # Implied when role is NOT specified
${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; will throw an error if `type` is defined
${pipeline.name}-error-topic:
type: error
# Currently KPOps supports Avro and JSON schemas.
key_schema: key-schema # must implement SchemaProvider to use
value_schema: value-schema
partitions_count: 1
replication_factor: 1
configs: # https://kafka.apache.org/documentation/#topicconfigs
cleanup.policy: compact
    models: # SchemaProvider is initialized with the values given here
model: model
namespace: namespace # required
  # No arbitrary keys are allowed under `values` here
# Allowed configs:
# https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
values: # required
# Streams Bootstrap streams section
streams: # required, streams-app-specific
brokers: ${config.kafka_brokers} # required
schemaRegistryUrl: ${config.schema_registry.url}
inputTopics:
- topic1
- topic2
outputTopic: output-topic
inputPattern: input-pattern
extraInputTopics:
input_role1:
- input_topic1
- input_topic2
input_role2:
- input_topic3
- input_topic4
extraInputPatterns:
pattern_role1: input_pattern1
extraOutputTopics:
output_role1: output_topic1
output_role2: output_topic2
errorTopic: error-topic
config:
my.streams.config: my.value
nameOverride: override-with-this-name # streams-app-specific
autoscaling: # streams-app-specific
consumerGroup: consumer-group # required
lagThreshold: 0 # Average target value to trigger scaling actions.
enabled: false # Whether to enable auto-scaling using KEDA.
# This is the interval to check each trigger on.
# https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
pollingInterval: 30
# The period to wait after the last trigger reported active before scaling
# the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
cooldownPeriod: 300
      # The offset reset policy for the consumer if the consumer group is
      # not yet subscribed to a partition.
offsetResetPolicy: earliest
# This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resource.
# https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
maxReplicas: 1
# Minimum number of replicas KEDA will scale the resource down to.
# https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
minReplicas: 0
# If this property is set, KEDA will scale the resource down to this
# number of replicas.
# https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
idleReplicas: 0
topics: # List of auto-generated Kafka Streams topics used by the streams app.
- topic1
- topic2
# Helm repository configuration (optional)
  # If not set, `helm repo add` will not be called. Useful when using local Helm charts.
repo_config:
repository_name: bakdata-streams-bootstrap # required
url: https://bakdata.github.io/streams-bootstrap/ # required
repo_auth_flags:
username: user
password: pass
ca_file: /home/user/path/to/ca-file
insecure_skip_tls_verify: false
version: "2.12.0" # Helm chart version
```
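For orientation, here is a minimal sketch that sets only the fields marked required above, plus the topics the app reads from and writes to. Every concrete name in it (the app name, namespace, and topic names) is a placeholder:

```yaml
# Minimal hypothetical streams-app: only `type`, `name`, `namespace`, `values`,
# `values.streams`, and `values.streams.brokers` are marked required above.
- type: streams-app
  name: my-streams-app # placeholder
  namespace: my-namespace # placeholder
  values:
    streams:
      brokers: ${config.kafka_brokers}
      inputTopics:
        - my-input-topic # placeholder
      outputTopic: my-output-topic # placeholder
```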
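Likewise, the `autoscaling` block from the reference above can be sketched in isolation. The consumer group, lag threshold, and replica bounds below are illustrative values, not recommendations:

```yaml
# Illustrative `values.autoscaling` fragment: with KEDA enabled, the app scales
# between 0 and 3 replicas depending on the lag of the given consumer group.
autoscaling:
  enabled: true
  consumerGroup: my-streams-app-group # placeholder; required
  lagThreshold: 1000 # average lag that triggers scaling actions
  minReplicas: 0 # allows scale-to-zero while no lag is reported
  maxReplicas: 3
```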
Operations
deploy
In addition to KubernetesApp's deploy:
- Create topics if provided (optional)
- Submit Avro schemas to the registry if provided (optional)
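For example, a `to.topics` entry like the following (all names hypothetical) would have `deploy` create the topic with the given settings and submit its value schema, assuming a SchemaProvider can resolve it:

```yaml
# Hypothetical excerpt: `deploy` creates this topic and registers its schema.
to:
  topics:
    my-output-topic:
      type: output
      value_schema: com.example.MyValue # resolved through a SchemaProvider
      partitions_count: 3
      replication_factor: 2
```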
destroy
Uninstall Helm release.
reset
- Delete the consumer group offsets
- Delete Kafka Streams state
clean
Similar to reset, with two additional steps:
- Delete the app's output topics
- Delete all associated schemas in the Schema Registry
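To make the difference concrete: with a hypothetical `to` section like the one below, `reset` leaves both output topics in place, while `clean` also deletes them and their registered schemas:

```yaml
# Hypothetical `to` section: deleted by `clean`, untouched by `reset`.
to:
  topics:
    my-output-topic:
      type: output
    my-extra-output-topic:
      role: my-role # extra output topic
```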