ClickHouse 表引擎之 Kafka
文章目录
Kafka 表引擎简介
- 对接 Kafka 系统,订阅 Kafka 主题,接收消息
创建 Kafka 引擎表
-
声明
1 2 3 4 5 6 7 8 9 10 11
ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port, ...', kafka_topic_list = 'topic1, topic2, ...', kafka_group_name = 'consumer_group_name', kafka_format = 'data_format', [kafka_row_delimiter = 'delimiter_symbol',] [kafka_schema = '',] [kafka_num_consumers = N,] [kafka_skip_broken_messages = N,] [kafka_commit_every_batch =N];
-
kafka_broker_list: kafka 节点地址列表,用逗号分隔
-
kafka_topic_list: 订阅的主题列表,用逗号分隔
-
kafka_group_name: 消费组名称,引擎会依据此名称创建消费组
-
kafka_format: 消息格式,如 TSV、JSONEachRow、CSV 等
-
kafka_row_delimiter: 一行数据的结束符,默认 ‘\0’
-
kafka_schema: kafka schema 参数
-
kafka_num_consumers: 消费者数量,默认 1
-
kafka_skip_broken_messages: 允许跳过的错误消息数量,默认0
-
kafka_commit_every_batch: kafka commit 频率,默认 0,即整个 Block 完全写入后才 commit
Kafka 表引擎其他参数
-
stream_poll_timeout_ms: 默认每 500ms 消费一次数据,写入缓存
-
刷新缓存触发条件:
- 一个数据块(kafka_max_block_size,默认 65536)写入完成
- 等待 7500 毫秒(stream_flush_interval_ms)
-
config.xml 中的 librdkafka 配置,参考 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
1 2 3
<kafka> <auto_offset_reset>smallest</auto_offset_reset> </kafka>
Kafka 引擎表一般用法
-
创建 Kafka 引擎表,充当数据管道
-
创建 MergeTree 引擎表,用于查询
-
创建物化视图,同步 kafka 数据到 MergeTree 引擎表
1 2
CREATE MATERIALIZED VIEW kafka_view TO mergetree_table AS SELECT col1, col2, ... FROM kafka_table;
-
要停止数据同步,可以删除视图,也可以卸载视图
1 2 3 4
-- 删除 DROP TABLE kafka_view; -- 卸载 DETACH TABLE kafka_view;
-
恢复数据同步,装载视图
1 2
ATTACH MATERIALIZED VIEW kafka_view TO mergetree_table AS SELECT col1, col2, ... FROM kafka_table;
文章作者 Colben
上次更新 2020-10-08