Kafka 表引擎简介

  • 对接 Kafka 系统,订阅 Kafka 主题,接收消息

创建 Kafka 引擎表

  • 声明

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port, ...',
        kafka_topic_list = 'topic1, topic2, ...',
        kafka_group_name = 'consumer_group_name',
        kafka_format = 'data_format',
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_schema = '',]
        [kafka_num_consumers = N,]
        [kafka_skip_broken_messages = N,]
        [kafka_commit_every_batch =N];
    
  • kafka_broker_list: kafka 节点地址列表,用逗号分隔

  • kafka_topic_list: 订阅的主题列表,用逗号分隔

  • kafka_group_name: 消费组名称,引擎会依据此名称创建消费组

  • kafka_format: 消息格式,如 TSV、JSONEachRow、CSV 等

  • kafka_row_delimiter: 一行数据的结束符,默认 ‘\0’

  • kafka_schema: kafka schema 参数

  • kafka_num_consumers: 消费者数量,默认 1

  • kafka_skip_broken_messages: 允许跳过的错误消息数量,默认0

  • kafka_commit_every_batch: kafka commit 频率,默认 0,即整个 Block 完全写入后才 commit

Kafka 表引擎其他参数

  • stream_poll_timeout_ms: 默认每 500ms 消费一次数据,写入缓存

  • 刷新缓存触发条件:

    • 一个数据块(kafka_max_block_size,默认 65536)写入完成
    • 等待 7500 毫秒(stream_flush_interval_ms)
  • config.xml 中的 librdkafka 配置,参考 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

    1
    2
    3
    
    <kafka>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    

Kafka 引擎表一般用法

  • 创建 Kafka 引擎表,充当数据管道

  • 创建 MergeTree 引擎表,用于查询

  • 创建物化视图,同步 kafka 数据到 MergeTree 引擎表

    1
    2
    
    CREATE MATERIALIZED VIEW kafka_view TO mergetree_table
        AS SELECT col1, col2, ... FROM kafka_table;
    
  • 要停止数据同步,可以删除视图,也可以卸载视图

    1
    2
    3
    4
    
    -- 删除
    DROP TABLE kafka_view;
    -- 卸载
    DETACH TABLE kafka_view;
    
  • 恢复数据同步,装载视图

    1
    2
    
    ATTACH MATERIALIZED VIEW kafka_view TO mergetree_table
        AS SELECT col1, col2, ... FROM kafka_table;