
Apache Kafka CLI for AI Agents

Use the Apache Kafka CLI from KosmoKrator to call Apache Kafka tools headlessly, return JSON, inspect schemas, and automate workflows from coding agents, scripts, and CI.

7 functions (6 read, 1 write), API token auth

Apache Kafka CLI Setup

Apache Kafka can be configured headlessly with `kosmokrator integrations:configure kafka`.

# Install KosmoKrator first if it is not available on PATH.
curl -fsSL https://raw.githubusercontent.com/OpenCompanyApp/kosmokrator/main/install.sh | bash

# Configure and verify this integration.
kosmokrator integrations:configure kafka --set api_token="$KAFKA_API_TOKEN" --enable --read allow --write ask --json
kosmokrator integrations:doctor kafka --json
kosmokrator integrations:status --json
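In CI it can help to fail fast when the token is missing, before the configure step runs. The following is a minimal sketch, assuming a POSIX shell; the helper name `require_env` is hypothetical and not part of KosmoKrator, and only the commands shown above are invoked.

```shell
#!/bin/sh
# Hypothetical helper: return non-zero if the named environment variable is unset or empty.
require_env() {
  eval "value=\${$1:-}"
  if [ -z "$value" ]; then
    echo "missing required environment variable: $1" >&2
    return 1
  fi
}

# Configure and verify only when the token is present and kosmokrator is on PATH.
if require_env KAFKA_API_TOKEN && command -v kosmokrator >/dev/null 2>&1; then
  kosmokrator integrations:configure kafka --set api_token="$KAFKA_API_TOKEN" --enable --read allow --write ask --json
  kosmokrator integrations:doctor kafka --json
fi
```

The guard keeps an empty token from being stored in the profile, which would otherwise only surface later as an authentication failure.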

Credentials

Authentication type: API token (`api_token`). Configure credentials once, then use the same stored profile from scripts, coding CLIs, Lua code mode, and the MCP gateway.

| Key | Env var | Type | Required | Label |
| --- | --- | --- | --- | --- |
| api_token | KAFKA_API_TOKEN | secret | yes | API Token |
| cluster_id | KAFKA_CLUSTER_ID | text | no | Cluster ID |

Call Apache Kafka Headlessly

Use the generic call form when another coding CLI or script needs a stable universal interface.

kosmo integrations:call kafka.kafka_list_topics '{
  "cluster_id": "example_cluster_id"
}' --json

Use the provider shortcut form for shorter human-facing commands.

kosmo integrations:kafka kafka_list_topics '{
  "cluster_id": "example_cluster_id"
}' --json
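Because `cluster_id` is optional for this function, a script may want to send it only when an override is actually set. The sketch below shows one way to build the payload; the helper name `list_topics_payload` is hypothetical, and it assumes the cluster ID contains no characters that need JSON escaping.

```shell
#!/bin/sh
# Hypothetical helper: build the JSON payload for kafka_list_topics.
# With an argument it emits {"cluster_id":"..."}; without one it emits {}
# so that the default cluster is used.
list_topics_payload() {
  if [ -n "${1:-}" ]; then
    printf '{"cluster_id":"%s"}' "$1"
  else
    printf '{}'
  fi
}

# Run the generic call form only when kosmo is available on PATH.
if command -v kosmo >/dev/null 2>&1; then
  kosmo integrations:call kafka.kafka_list_topics "$(list_topics_payload "${KAFKA_CLUSTER_ID:-}")" --json
fi
```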

Agent Discovery Commands

These commands return structured output for coding agents that need to inspect capabilities before choosing a function.

kosmo integrations:docs kafka --json
kosmo integrations:docs kafka.kafka_list_topics --json
kosmo integrations:schema kafka.kafka_list_topics --json
kosmo integrations:search "Apache Kafka" --json
kosmo integrations:list --json
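An agent that wants every schema up front could generate the schema command for each function on this page. This is a sketch only: the variable `KAFKA_FUNCTIONS` and the helper `schema_commands` are hypothetical names, and the script prints the commands rather than running them.

```shell
#!/bin/sh
# The seven function names documented on this page.
KAFKA_FUNCTIONS="kafka_list_topics kafka_get_topic kafka_create_topic kafka_list_clusters kafka_get_cluster kafka_list_producers kafka_get_current_user"

# Hypothetical helper: print one schema command per function.
schema_commands() {
  for fn in $KAFKA_FUNCTIONS; do
    printf 'kosmo integrations:schema kafka.%s --json\n' "$fn"
  done
}

schema_commands
```

Piping the output to `sh` would run each command in turn.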

All CLI Functions

Every function below can be called headlessly. The generic form is stable across all integrations; the provider shortcut is shorter but specific to Apache Kafka.

kafka.kafka_list_topics

Operation: read

List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.

Parameters
cluster_id

Generic CLI call

kosmo integrations:call kafka.kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json

Provider shortcut

kosmo integrations:kafka kafka_list_topics '{"cluster_id":"example_cluster_id"}' --json

kafka.kafka_get_topic

Operation: read

Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.

Parameters
topic_name, cluster_id

Generic CLI call

kosmo integrations:call kafka.kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

Provider shortcut

kosmo integrations:kafka kafka_get_topic '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

kafka.kafka_create_topic

Operation: write

Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.

Parameters
topic_name, partitions_count, replication_factor, configs, cluster_id

Generic CLI call

kosmo integrations:call kafka.kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":{"cleanup.policy":"delete"},"cluster_id":"example_cluster_id"}' --json

Provider shortcut

kosmo integrations:kafka kafka_create_topic '{"topic_name":"example_topic_name","partitions_count":1,"replication_factor":1,"configs":{"cleanup.policy":"delete"},"cluster_id":"example_cluster_id"}' --json

kafka.kafka_list_clusters

Operation: read

List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.

Parameters
none

Generic CLI call

kosmo integrations:call kafka.kafka_list_clusters '{}' --json

Provider shortcut

kosmo integrations:kafka kafka_list_clusters '{}' --json

kafka.kafka_get_cluster

Operation: read

Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.

Parameters
cluster_id

Generic CLI call

kosmo integrations:call kafka.kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json

Provider shortcut

kosmo integrations:kafka kafka_get_cluster '{"cluster_id":"example_cluster_id"}' --json

kafka.kafka_list_producers

Operation: read

List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.

Parameters
topic_name, cluster_id

Generic CLI call

kosmo integrations:call kafka.kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

Provider shortcut

kosmo integrations:kafka kafka_list_producers '{"topic_name":"example_topic_name","cluster_id":"example_cluster_id"}' --json

kafka.kafka_get_current_user

Operation: read

Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.

Parameters
none

Generic CLI call

kosmo integrations:call kafka.kafka_get_current_user '{}' --json

Provider shortcut

kosmo integrations:kafka kafka_get_current_user '{}' --json

Function Schemas

Use these parameter tables when building CLI payloads without calling `integrations:schema` first.

kafka.kafka_list_topics

List Kafka topics in a cluster. Returns topic names, partition counts, replication factors, and status.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_list_topics --json

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_get_topic

Get full details of a specific Kafka topic by name. Returns partition count, replication factor, and topic configuration.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_get_topic --json

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The name of the topic to retrieve. |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_create_topic

Create a new Kafka topic in a cluster. Specify the topic name, partition count, and optional replication factor and configs.

Operation: write

Schema command

kosmo integrations:schema kafka.kafka_create_topic --json

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The name for the new topic. |
| partitions_count | integer | yes | Number of partitions for the topic (e.g., 6). |
| replication_factor | integer | no | Replication factor (e.g., 3 for production). Defaults to the cluster default. |
| configs | object | no | JSON-encoded topic configs: retention.ms, cleanup.policy, etc. |
| cluster_id | string | no | Override the default Kafka cluster ID. |
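Since only `topic_name` and `partitions_count` are required, a script can build a minimal payload and leave the optional parameters to their defaults. The sketch below validates the partition count before emitting JSON; the helper name `create_topic_payload` is hypothetical, and it assumes the topic name needs no JSON escaping.

```shell
#!/bin/sh
# Hypothetical helper: build a kafka_create_topic payload from the two
# required parameters. Optional parameters are omitted.
create_topic_payload() {
  topic_name=${1:-}
  partitions_count=${2:-}
  # Reject empty values, non-digits, zero, and leading zeros so the
  # emitted number is a valid positive JSON integer.
  case $partitions_count in
    ''|*[!0-9]*|0*)
      echo "partitions_count must be a positive integer" >&2
      return 1
      ;;
  esac
  printf '{"topic_name":"%s","partitions_count":%s}' "$topic_name" "$partitions_count"
}

# Example use (a write operation, so it is left commented out here):
# kosmo integrations:call kafka.kafka_create_topic "$(create_topic_payload example_topic_name 6)" --json
```

Validating locally keeps a malformed payload from reaching a write operation that may prompt for approval under the `--write ask` policy.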

kafka.kafka_list_clusters

List Kafka clusters in your Confluent Cloud environment. Returns cluster IDs, names, types, and status.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_list_clusters --json

Parameters: none.

kafka.kafka_get_cluster

Get details of a specific Kafka cluster. Returns broker count, controller info, and cluster configuration.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_get_cluster --json

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| cluster_id | string | no | The cluster ID to retrieve. Uses the default cluster if not specified. |

kafka.kafka_list_producers

List producers for a specific Kafka topic. Returns producer IDs, client IDs, and connection details.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_list_producers --json

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic_name | string | yes | The topic name to list producers for. |
| cluster_id | string | no | Override the default Kafka cluster ID. |

kafka.kafka_get_current_user

Get the currently authenticated Confluent Cloud user. Useful for verifying credentials and identifying the connected account.

Operation: read

Schema command

kosmo integrations:schema kafka.kafka_get_current_user --json

Parameters: none.

Permissions

Headless calls still follow the integration read/write permission policy. Configure read/write defaults with `integrations:configure`. Add `--force` only for trusted automation that should bypass that policy.