Skip to content

pipeline.yaml

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# Kubernetes app managed through Helm with an associated Helm chart
- type: helm-app
  name: helm-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example.
  app: # required
    image: exampleImage # Example
    debug: false # Example
    commandLine: {} # Example
  # Helm repository configuration (optional)
  # If not set the helm repo add will not be called. Useful when using local Helm charts
  repo_config:
    repository_name: bakdata-streams-bootstrap # required
    url: https://bakdata.github.io/streams-bootstrap/ # required
    repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "1.0.0" # Helm chart version
# Base component for Kafka-based components.
# Producer or streaming apps should inherit from this class.
- type: kafka-app # required
  name: kafka-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # `app` can contain application-specific settings, hence  the user is free to
  # add the key-value pairs they need.
  app: # required
    streams: # required
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
    nameOverride: override-with-this-name # kafka-app-specific
    imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app
# Kafka sink connector
- type: kafka-sink-connector
  name: kafka-sink-connector # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example. Extensive documentation on
  # connectors: https://kafka.apache.org/documentation/#connectconfigs
  app: # required
    tasks.max: 1
  # Overriding Kafka Connect Resetter Helm values. E.g. to override the
  # Image Tag etc.
  resetter_values:
    imageTag: "1.2.3"
# Kafka source connector
- type: kafka-source-connector # required
  name: kafka-source-connector # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  # The source connector has no `from` section
  # from:
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example. Extensive documentation on
  # connectors: https://kafka.apache.org/documentation/#connectconfigs
  app: # required
    tasks.max: 1
  # Overriding Kafka Connect Resetter Helm values. E.g. to override the
  # Image Tag etc.
  resetter_values:
    imageTag: "1.2.3"
  # offset.storage.topic
  # https://kafka.apache.org/documentation/#connect_running
  offset_topic: offset_topic
# Base Kubernetes App
- type: kubernetes-app
  name: kubernetes-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example.
  app: # required
    image: exampleImage # Example
    debug: false # Example
    commandLine: {} # Example
# Holds configuration to use as values for the streams bootstrap producer-app Helm
# chart.
# More documentation on ProducerApp:
# https://github.com/bakdata/streams-bootstrap
- type: producer-app
  name: producer-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  # from: # While the producer-app does inherit from kafka-app, it does not need a
  # `from` section, hence it does not support it.
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
  app: # required
    streams: # required, producer-app-specific
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
      outputTopic: output_topic
      extraOutputTopics:
        output_role1: output_topic1
        output_role2: output_topic2
    nameOverride: override-with-this-name # kafka-app-specific
  # Helm repository configuration (optional)
  # If not set the helm repo add will not be called. Useful when using local Helm charts
  repo_config:
    repository_name: bakdata-streams-bootstrap # required
    url: https://bakdata.github.io/streams-bootstrap/ # required
    repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "2.12.0" # Helm chart version
# StreamsApp component that configures a streams bootstrap app.
# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
- type: streams-app # required
  name: streams-app # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  namespace: namespace # required
  # No arbitrary keys are allowed under `app`here
  # Allowed configs:
  # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
  app: # required
    # Streams Bootstrap streams section
    streams: # required, streams-app-specific
      brokers: ${config.kafka_brokers} # required
      schemaRegistryUrl: ${config.schema_registry.url}
      inputTopics:
        - topic1
        - topic2
      outputTopic: output-topic
      inputPattern: input-pattern
      extraInputTopics:
        input_role1:
          - input_topic1
          - input_topic2
        input_role2:
          - input_topic3
          - input_topic4
      extraInputPatterns:
        pattern_role1: input_pattern1
      extraOutputTopics:
        output_role1: output_topic1
        output_role2: output_topic2
      errorTopic: error-topic
      config:
        my.streams.config: my.value
    nameOverride: override-with-this-name # streams-app-specific
    autoscaling: # streams-app-specific
      consumerGroup: consumer-group # required
      lagThreshold: 0 # Average target value to trigger scaling actions.
      enabled: false # Whether to enable auto-scaling using KEDA.
      # This is the interval to check each trigger on.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
      pollingInterval: 30
      # The period to wait after the last trigger reported active before scaling
      #  the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
      cooldownPeriod: 300
      # The offset reset policy for the consumer if the the consumer group is
      # not yet subscribed to a partition.
      offsetResetPolicy: earliest
      # This setting is passed to the HPA definition that KEDA will create for a
      # given resource and holds the maximum number of replicas of the target resouce.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
      maxReplicas: 1
      # Minimum number of replicas KEDA will scale the resource down to.
      # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
      minReplicas: 0
      # If this property is set, KEDA will scale the resource down to this
      # number of replicas.
      # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
      idleReplicas: 0
      topics: # List of auto-generated Kafka Streams topics used by the streams app.
        - topic1
        - topic2
  # Helm repository configuration (optional)
  # If not set the helm repo add will not be called. Useful when using local Helm charts
  repo_config:
    repository_name: bakdata-streams-bootstrap # required
    url: https://bakdata.github.io/streams-bootstrap/ # required
    repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "2.12.0" # Helm chart version