KafkaSinkConnector

Subclass of KafkaConnector.

Usage

Lets other systems pull data from Apache Kafka.
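
A sink connector typically sits at the end of a pipeline: an upstream component writes to Kafka, and the connector hands that data over to an external system. As a rough sketch (the component names here are hypothetical, and it assumes the usual KPOps behavior of wiring consecutive components together unless `from`/`to` state otherwise):

pipeline.yaml
# An upstream app produces to Kafka; the sink connector drains its output.
- type: streams-app # hypothetical upstream component
  name: my-streams-app
- type: kafka-sink-connector # consumes the upstream component's output
  name: my-sink-connector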

Configuration

pipeline.yaml
# Kafka sink connector
- type: kafka-sink-connector
  name: kafka-sink-connector # required
  # Pipeline prefix that is prepended to the component name. Specify an empty
  # string if you do not want any prefix.
  prefix: ${pipeline.name}-
  from: # Must not be null
    topics: # read from topic
      ${pipeline.name}-input-topic:
        type: input # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra
      ${pipeline.name}-input-pattern-topic:
        type: pattern # Implied to be an input pattern if `role` is undefined
      ${pipeline.name}-extra-pattern-topic:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
    components: # read from specific component
      account-producer:
        type: output # Implied when role is NOT specified
      other-producer:
        role: some-role # Implies `type` to be extra
      component-as-input-pattern:
        type: pattern # Implied to be an input pattern if `role` is undefined
      component-as-extra-pattern:
        type: pattern # Implied to be an extra pattern if `role` is defined
        role: some-role
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; an error is raised if `type` is also defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # a SchemaProvider implementation is required to use schemas
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initialized with the values given here
      model: model
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example. Extensive documentation on
  # connectors: https://kafka.apache.org/documentation/#connectconfigs
  app: # required
    tasks.max: 1
  # Override the Kafka Connect Resetter Helm values, e.g. to pin the image tag.
  resetter_values:
    imageTag: "1.2.3"
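
As a concrete illustration, the skeleton above could be filled in as follows for a database sink. This is a minimal sketch rather than a canonical example: it assumes the Confluent JDBC sink connector plugin is installed on the Kafka Connect cluster, and every name and connection setting below is a placeholder.

pipeline.yaml
# Minimal sink connector that writes a topic into a PostgreSQL table.
- type: kafka-sink-connector
  name: jdbc-sink-connector
  prefix: ${pipeline.name}-
  from:
    topics:
      ${pipeline.name}-output-topic:
        type: input
  app:
    # Standard Kafka Connect sink settings; the plugin class must exist on the cluster.
    connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
    topics: ${pipeline.name}-output-topic
    tasks.max: 1
    connection.url: jdbc:postgresql://postgres:5432/example # placeholder
    auto.create: "true" # let the connector create the target table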

Operations

deploy

  • Add the sink connector to the Kafka Connect cluster
  • Create the output topics if provided (optional)
  • Register schemas in the Schema Registry if provided (optional)

destroy

The associated sink connector is removed from the Kafka Connect cluster.

reset

Reset the consumer group offsets using bakdata's sink resetter. The resetter is deployed via Helm; its values can be overridden through the component's `resetter_values` (see the configuration above).

clean

  • Delete associated consumer group
  • Delete configured error topics