POST /topics
參數
名稱 | 說明 | 預設值 |
---|---|---|
name | (必填) topic 名稱 | 無 |
partitions | (選填) partition 數量 | 1 |
replicas | (選填) replication 數量 | 1 |
probability | (選填) 分佈機率 | 無,預設依照 kafka 的分佈方式 |
- replicas 數量須 <= brokers 數量
cURL 範例
建立名為 test1 且 partitions 為 1,replicas 為 1 的 topic
curl -X POST http://localhost:8001/topics \
-H "Content-Type: application/json" \
-d '{"topics":[{
"name": "test1",
"partitions": 1,
"replicas": 1
}]}'
所有在 JSON Response configs
裡頭的參數也可以透過此 api 來設定。範例如下
curl -X POST http://localhost:8001/topics \
-H "Content-Type: application/json" \
-d '{"topics":[{
"name": "test1",
"configs": {
"max.message.bytes": 1000
}}]}'
JSON Response 範例
name
: topic 名稱partitions
: 該 topic 下所有 partitionsid
: partition id,從 0 開始遞增earliest
: partition 現存的資料最早的紀錄位置latest
: partition 最新紀錄位置replicas
: partition 副本資料broker
: 儲存此副本的 broker 在叢集中的識別碼lag
: 副本同步狀況,計算公式為副本 LEO (Log End offset) - HW (High Watermark),用以判斷副本最新資料與所有 ISR 資料水位比較size
: 副本資料大小,單位為 bytesleader
: 此副本所在的節點是否為 leaderinSync
: 此副本所在的節點是否位於 ISR (In-Sync Replicas) 列表裡頭。每個 partition 都會有一個 ISR,由能夠和 leader 保持同步的 follower 與 leader 組成的集合isFuture
: 此副本是否正準備由另一份新的副本取代,可能的觸發時機點為 reassign partitions,副本準備要搬移到不同節點,或者副本準備移到同節點的不同路徑下path
: 副本於節點中的路徑
configs
: 此 topic 正在使用的參數,這些參數都可以在建立 topic 時設定
{
"name": "test1",
"partitions": [
{
"id": 0,
"earliest": 0,
"latest": 0,
"replicas": [
{
"broker": 1001,
"lag": 0,
"size": 0,
"leader": true,
"inSync": true,
"isFuture": false,
"path": "/tmp/log-folder-0"
}
]
}
],
"configs": {
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"cleanup.policy": "delete",
"flush.ms": "9223372036854775807",
"follower.replication.throttled.replicas": "",
"segment.bytes": "1073741824",
"retention.ms": "604800000",
"flush.messages": "9223372036854775807",
"message.format.version": "3.0-IV1",
"max.compaction.lag.ms": "9223372036854775807",
"file.delete.delay.ms": "60000",
"max.message.bytes": "1048588",
"min.compaction.lag.ms": "0",
"message.timestamp.type": "CreateTime",
"preallocate": "false",
"min.cleanable.dirty.ratio": "0.5",
"index.interval.bytes": "4096",
"unclean.leader.election.enable": "false",
"retention.bytes": "-1",
"delete.retention.ms": "86400000",
"segment.ms": "604800000",
"message.timestamp.difference.max.ms": "9223372036854775807",
"segment.index.bytes": "10485760"
}
}
Kafka Topic 的建立需要時間,Web Service 和 Kafka 的溝通中間存在 Race Condition,遇到這種情況時
POST /topics
回傳結果會不包含 topic 的詳細訊息,如果有穩定取得 topic 詳細內容的需求, 建議在POST /topics
後,等待一段時間再透過GET /topics
來取得詳細的 topic 資訊
GET /topics
參數
名稱 | 說明 | 預設值 |
---|---|---|
partition | (選填) 指定要查看哪一個 partition | 無,代表全部 partitions |
listInternal | (選填) 為 boolean 值。若填寫 true,則會列出 kafka 內部所使用的 topics,如 __commit_offsets。若設為 false,則不會列出此種 topics | true,代表列出所有 topics |
poll_timeout | (選填) 拉取最新資料時間戳記的等待時間,當沒有指定時回傳內容將不包含最新資料時間戳記 | 無,代表不拉取最新資料的時間戳記 |
cURL 範例
curl -X GET http://localhost:8001/topics
JSON Response 範例
{
"topics": [
{
"name": "test1",
"activeGroupIds": [
"groupId-1666757298494"
],
"partitions": [
{
"id": 0,
"earliest": 0,
"latest": 0,
"replicas": [
{
"broker": 1001,
"lag": 0,
"size": 0,
"leader": true,
"inSync": true,
"isFuture": false,
"path": "/tmp/log-folder-0"
}
],
"maxTimestamp": 1666757305832,
"timestampOfLatestRecord": 1666757554723
}
],
"configs": {
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"cleanup.policy": "delete",
"flush.ms": "9223372036854775807",
"follower.replication.throttled.replicas": "",
"segment.bytes": "1073741824",
"retention.ms": "604800000",
"flush.messages": "9223372036854775807",
"message.format.version": "3.0-IV1",
"max.compaction.lag.ms": "9223372036854775807",
"file.delete.delay.ms": "60000",
"max.message.bytes": "1048588",
"min.compaction.lag.ms": "0",
"message.timestamp.type": "CreateTime",
"preallocate": "false",
"min.cleanable.dirty.ratio": "0.5",
"index.interval.bytes": "4096",
"unclean.leader.election.enable": "false",
"retention.bytes": "-1",
"delete.retention.ms": "86400000",
"segment.ms": "604800000",
"message.timestamp.difference.max.ms": "9223372036854775807",
"segment.index.bytes": "10485760"
}
}
]
}
GET /topics/{topicName}
參數
名稱 | 說明 | 預設值 |
---|---|---|
partition | (選填) 指定要查看哪一個 partition | 無,代表全部 partitions |
cURL 範例
查詢名為 test1 的 topic 資訊
curl -X GET http://localhost:8001/topics/test1
JSON Response 範例
{
"name": "test1",
"activeGroupIds": [
"groupId-1666757298494"
],
"partitions": [
{
"id": 0,
"earliest": 0,
"latest": 0,
"replicas": [
{
"broker": 1001,
"lag": 0,
"size": 0,
"leader": true,
"inSync": true,
"isFuture": false,
"path": "/tmp/log-folder-0"
}
],
"maxTimestamp": 1666757305832
}
],
"configs": {
"compression.type": "producer",
"leader.replication.throttled.replicas": "",
"message.downconversion.enable": "true",
"min.insync.replicas": "1",
"segment.jitter.ms": "0",
"cleanup.policy": "delete",
"flush.ms": "9223372036854775807",
"follower.replication.throttled.replicas": "",
"segment.bytes": "1073741824",
"retention.ms": "604800000",
"flush.messages": "9223372036854775807",
"message.format.version": "3.0-IV1",
"max.compaction.lag.ms": "9223372036854775807",
"file.delete.delay.ms": "60000",
"max.message.bytes": "1048588",
"min.compaction.lag.ms": "0",
"message.timestamp.type": "CreateTime",
"preallocate": "false",
"min.cleanable.dirty.ratio": "0.5",
"index.interval.bytes": "4096",
"unclean.leader.election.enable": "false",
"retention.bytes": "-1",
"delete.retention.ms": "86400000",
"segment.ms": "604800000",
"message.timestamp.difference.max.ms": "9223372036854775807",
"segment.index.bytes": "10485760"
}
}
DELETE /topics/{topicName}
cURL 範例
curl -X DELETE "http://localhost:8001/topics/mytopic"