Skip to content

KafkaSourceConnector

Subclass of KafkaConnector.

Usage

Manages source connectors in your Kafka Connect cluster.

Configuration

pipeline.yaml
# Kafka source connector
- type: kafka-source-connector # required
  name: kafka-source-connector # required
  # Pipeline prefix that will prefix every component name. If you wish to not
  # have any prefix you can specify an empty string.
  prefix: ${pipeline.name}-
  # The source connector has no `from` section
  # from:
  # Topic(s) into which the component will write output
  to:
    topics:
      ${pipeline.name}-output-topic:
        type: output # Implied when role is NOT specified
      ${pipeline.name}-extra-topic:
        role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined
      ${pipeline.name}-error-topic:
        type: error
        # Currently KPOps supports Avro and JSON schemas.
        key_schema: key-schema # must implement SchemaProvider to use
        value_schema: value-schema
        partitions_count: 1
        replication_factor: 1
        configs: # https://kafka.apache.org/documentation/#topicconfigs
          cleanup.policy: compact
    models: # SchemaProvider is initiated with the values given here
      model: model
  # `app` contains application-specific settings, hence it does not have a rigid
  # structure. The fields below are just an example. Extensive documentation on
  # connectors: https://kafka.apache.org/documentation/#connectconfigs
  app: # required
    tasks.max: 1
  # Overriding Kafka Connect Resetter Helm values. E.g. to override the
  # Image Tag etc.
  resetter_values:
    imageTag: "1.2.3"
  # offset.storage.topic
  # https://kafka.apache.org/documentation/#connect_running
  offset_topic: offset_topic

Operations

deploy

  • Add the source connector to the Kafka Connect cluster
  • Create the output topics if provided (optional)
  • Register schemas in the Schema registry if provided (optional)

destroy

Remove the source connector from the Kafka Connect cluster.

reset

Delete state associated with the connector using bakdata's sink resetter.

clean

  • Delete all associated output topics
  • Delete all associated schemas in the Schema Registry
  • Delete state associated with the connector