You use the Kafka source connector to stream events from Kafka into your service. The connector connects to your Confluent Cloud Kafka cluster and Schema Registry using SASL/SCRAM authentication and service account–based API keys. Only the Avro format is currently supported, with some limitations. This page explains how to connect to your Confluent Cloud Kafka cluster.

Early access: the Kafka source connector is not yet supported for production use.

Prerequisites

To follow the steps on this page:
  • Create a target service with the Real-time analytics capability enabled. You need your connection details.
  • Sign up for Confluent Cloud.
  • Create a Kafka cluster in Confluent Cloud.
This feature is currently not supported for services hosted on Microsoft Azure.

Access your Kafka cluster in Confluent Cloud

Take the following steps to prepare your Kafka cluster for connection:
  1. Create a service account. If you already have a service account, you can reuse it. To create a new one:
    1. Log in to Confluent Cloud.
    2. Click the burger menu at the top-right of the pane, then click Access control > Service accounts > Add service account.
    3. Enter the following details:
      • Name: tigerdata-access
      • Description: Service account for the Tiger Cloud source connector
    4. Add the service account owner role, then click Next.
    5. Select a role assignment, then click Add.
    6. Click Next, then click Create service account.
  2. Create API keys
    1. In Confluent Cloud, click Home > Environments > Select your environment > Select your cluster.
    2. Under Cluster overview in the left sidebar, select API Keys.
    3. Click Add key, choose Service Account and click Next.
    4. Select tigerdata-access, then click Next.
    5. For your cluster, choose the Operation and select the following Permissions, then click Next:
      • Resource type: Cluster
      • Operation: DESCRIBE
      • Permission: ALLOW
    6. Click Download and continue, then securely store the API key and secret.
    7. Use the same procedure to add the following ACLs:
      • ACL 2: Topic access
        • Resource type: Topic
        • Topic name: Select the topics that Tiger Cloud should read
        • Pattern type: LITERAL
        • Operation: READ
        • Permission: ALLOW
      • ACL 3: Consumer group access
        • Resource type: Consumer group
        • Consumer group ID: tigerdata-kafka/<tiger_cloud_project_id>. See Find your connection details for where to find your project ID.
        • Pattern type: PREFIXED
        • Operation: READ
        • Permission: ALLOW
You need these API keys and ACLs to configure your Kafka source connector.

Configure Confluent Cloud Schema Registry

The connector requires access to the Schema Registry to fetch schemas for Kafka topics. To configure the Schema Registry:
  1. Navigate to Schema Registry. In Confluent Cloud, click Environments and select your environment, then click Stream Governance.
  2. Create a Schema Registry API key
    1. Click API Keys, then click Add API Key.
    2. Choose Service Account, select tigerdata-access, then click Next.
    3. Under Resource scope, choose Schema Registry, select the default environment, then click Next.
    4. In Create API Key, add the following, then click Create API Key:
      • Name: tigerdata-schema-registry-access
      • Description: API key for Tiger Cloud schema registry access
    5. Click Download API Key and securely store the API key and secret, then click Complete.
  3. Assign roles for Schema Registry
    1. Click the burger menu at the top-right of the pane, then click Access control > Accounts & access > Service accounts.
    2. Select the tigerdata-access service account.
    3. In the Access tab, add the following role assignments:
      • ResourceOwner on the service account.
      • DeveloperRead on schema subjects. Choose All schema subjects or restrict to specific subjects as required.
    4. Save the role assignments.
Your Confluent Cloud Schema Registry is now accessible using the API key and secret.

Add Kafka source connector

Take the following steps to create a Kafka source connector for your service:
  1. In the console, select your service.
  2. Go to Connectors > Source connectors. Click New Connector, then select Kafka.
  3. Click the pencil icon, then set the connector name.
  4. Set up Kafka authentication. Enter the name of your cluster in Confluent Cloud and the information from the first api-key-*.txt file that you downloaded, then click Authenticate.
  5. Set up the Schema Registry. Enter the service account ID and the information from the second api-key-*.txt file that you downloaded, then click Authenticate.
  6. Select topics to sync. Add the schema and table, map the columns in the table, and click Create connector.
Your Kafka connector is configured and ready to stream events.

Known limitations and unsupported types

The following Avro schema types are not supported:

Union types

Unions with more than one non-null type, and unions used as the root schema, are not supported. Examples:
  • Multiple type union:
    {
      "type": "record",
      "name": "Message",
      "fields": [
        {"name": "content", "type": ["string", "bytes", "null"]}
      ]
    }
    
  • Union as root schema:
    ["null", "string"]
    

Reference types (named type references)

Referencing a previously defined named type by name, rather than defining it inline, is not supported. Examples:
  • Named type definition:
    {
      "type": "record",
      "name": "Address",
      "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"}
      ]
    }
    
  • Failing reference:
    {
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "address", "type": "Address"}
      ]
    }
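
Per the restriction above, the named type should instead be defined inline where it is used. A sketch that embeds the Address record from the example above directly in the Person record:

{
  "type": "record",
  "name": "Person",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "address", "type": {
      "type": "record",
      "name": "Address",
      "fields": [
        {"name": "street", "type": "string"},
        {"name": "city", "type": "string"}
      ]
    }}
  ]
}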
    

Unsupported logical types

Only the following logical types are supported:
  • decimal, date, time-millis, time-micros
  • timestamp-millis, timestamp-micros, timestamp-nanos
  • local-timestamp-millis, local-timestamp-micros, local-timestamp-nanos
  • uuid, duration
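
For example, a millisecond-precision timestamp and a decimal value, both on the supported list, are declared like this (the precision and scale values are illustrative):

{
  "type": "long",
  "logicalType": "timestamp-millis"
}

{
  "type": "bytes",
  "logicalType": "decimal",
  "precision": 10,
  "scale": 2
}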
Unsupported examples:
{
  "type": "int",
  "logicalType": "date-time"
}

{
  "type": "string",
  "logicalType": "json"
}

{
  "type": "bytes",
  "logicalType": "custom-type"
}