
Migrate from V1 to V2

Derive component type automatically from class name

KPOps automatically infers the component type from the class name. Therefore, the type and schema_type attributes can be removed from your custom components. By convention, the type is the lowercase, kebab-cased name of the class.

class MyCoolStreamApp(StreamsApp):
-    type = "my-cool-stream-app"
+    ...
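
To illustrate the convention, the derivation from class name to component type can be sketched as below. This is a minimal, hypothetical helper for clarity only, not the exact function KPOps uses internally:

import re

def class_name_to_type(name: str) -> str:
    """Hypothetical sketch: convert a CamelCase class name to a kebab-cased type."""
    # Insert a dash before every uppercase letter that follows a lowercase
    # letter or digit, then lowercase the whole string.
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"-\1", name).lower()

assert class_name_to_type("MyCoolStreamApp") == "my-cool-stream-app"
assert class_name_to_type("ProducerApp") == "producer-app"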

Because of this new convention, producer has been renamed to producer-app. This change must be reflected in your pipeline.yaml and defaults.yaml.

- producer:
+ producer-app:
    app:
        streams:
            outputTopic: output_topic
            extraOutputTopics:
                output_role1: output_topic1
                output_role2: output_topic2

Refactor input/output types

To section

In the to section, the following have changed:

  • The default type is output
  • If role is set, type is inferred to be extra
  • The type error needs to be defined explicitly

  to:
    topics:
      ${pipeline_name}-topic-1:
-       type: extra
        role: "role-1"
        ...
      ${pipeline_name}-topic-2:
-       type: output
        ...
      ${pipeline_name}-topic-3:
        type: error
        ...

From section

In the from section, the following have changed:

  • The default type is input
  • input-pattern type is replaced by pattern
  • If role is set, type is inferred to be extra
  • If role is set and type is explicitly set to pattern, this corresponds to the former extra-pattern type

  from:
    topics:
      ${pipeline_name}-input-topic:
-       type: input
        ...
      ${pipeline_name}-extra-topic:
-       type: extra
        role: topic-role
        ...
      ${pipeline_name}-input-pattern-topic:
-       type: input-pattern
+       type: pattern
        ...
      ${pipeline_name}-extra-pattern-topic:
-       type: extra-pattern
+       type: pattern
        role: some-role
        ...

Remove camel case conversion of internal models

All internal KPOps models now use snake_case; only Helm/Kubernetes values require camel casing. An example pipeline.yaml is shown below. Notice that the app section remains untouched.

...
  type: streams-app
  name: streams-app
  namespace: namespace
  app:
    streams:
      brokers: ${brokers}
      schemaRegistryUrl: ${schema_registry_url}
    autoscaling:
      consumerGroup: consumer-group
      lagThreshold: 0
      enabled: false
      pollingInterval: 30

  to:
    topics:
      ${pipeline_name}-output-topic:
        type: error
-       keySchema: key-schema
+       key_schema: key-schema
-       valueSchema: value-schema
+       value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs:
          cleanup.policy: compact
    models:
      model: model
  prefix: ${pipeline_name}-
- repoConfig:
+ repo_config:
-   repositoryName: bakdata-streams-bootstrap
+   repository_name: bakdata-streams-bootstrap
    url: https://bakdata.github.io/streams-bootstrap/
-   repoAuthFlags:
+   repo_auth_flags:
      username: user
      password: pass
      ca_file: /home/user/path/to/ca-file
      insecure_skip_tls_verify: false
  version: "1.0.4"
...
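
If you have many pipeline.yaml or defaults.yaml files to migrate, you could convert the KPOps-level keys mechanically. The following is a hypothetical helper sketch, assuming you skip the app section (Helm values stay camelCase) and review the result by hand:

import re

def camel_to_snake(key: str) -> str:
    """Hypothetical sketch: repoAuthFlags -> repo_auth_flags."""
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", key).lower()

def convert_keys(node, skip=("app",)):
    """Recursively snake_case mapping keys, leaving skipped sections untouched."""
    if isinstance(node, dict):
        return {
            camel_to_snake(k): (v if k in skip else convert_keys(v, skip))
            for k, v in node.items()
        }
    if isinstance(node, list):
        return [convert_keys(item, skip) for item in node]
    return node

assert convert_keys({"repoConfig": {"repositoryName": "x"}}) == {
    "repo_config": {"repository_name": "x"}
}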

Refactor handling of Helm flags

If you are using the KubernetesApp class to define your own Kubernetes resource to deploy, note that the abstract function get_helm_chart, which returns the chart for deploying the app using Helm, is now a Python property and has been renamed to helm_chart.

class MyCoolApp(KubernetesApp):

+   @property
    @override
-   def get_helm_chart(self) -> str:
+   def helm_chart(self) -> str:
        return "./charts/charts-folder"

Plural broker field in pipeline config

Since you can pass a comma-separated string of broker addresses, the broker field in KPOps is now plural. The pluralization affects multiple areas:

config.yaml

  environment: development
- broker: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
+ brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
  kafka_connect_host: "http://localhost:8083"
  kafka_rest_host: "http://localhost:8082"
  schema_registry_url: "http://localhost:8081"

pipeline.yaml and defaults.yaml

The variable is now called brokers.

...
  app:
    streams:
-     brokers: ${broker}
+     brokers: ${brokers}
      schemaRegistryUrl: ${schema_registry_url}
    nameOverride: override-with-this-name
    imageTag: "1.0.0"
...

Environment variable

If you previously set the environment variable KPOPS_KAFKA_BROKER, you now need to replace it with KPOPS_KAFKA_BROKERS.
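
If your own deployment scripts read or set this variable, a small shim can bridge the rename during the transition. The snippet below is a hypothetical example, not part of KPOps:

import os

# Hypothetical migration shim: prefer the new variable and fall back to the old
# one, so existing environments keep working while you update them.
brokers = os.environ.get("KPOPS_KAFKA_BROKERS") or os.environ.get("KPOPS_KAFKA_BROKER")
if brokers is not None:
    os.environ["KPOPS_KAFKA_BROKERS"] = brokers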