Перейти к основному содержанию
Перейти к основному содержанию

Движок таблицы S3Queue

Этот движок обеспечивает интеграцию с экосистемой Amazon S3 и позволяет выполнять потоковый импорт. Он аналогичен движкам Kafka и RabbitMQ, но предоставляет функции, специфичные для S3.

Важно учитывать следующее примечание из исходного PR по реализации S3Queue: когда к таблице с этим движком присоединяется MATERIALIZED VIEW, движок таблицы S3Queue начинает собирать данные в фоновом режиме.

Создать таблицу

CREATE TABLE s3_queue_engine_table (name String, value UInt32)
    ENGINE = S3Queue(path, [NOSIGN, | aws_access_key_id, aws_secret_access_key,] format, [compression], [headers])
    [SETTINGS]
    [mode = '',]
    [after_processing = 'keep',]
    [keeper_path = '',]
    [loading_retries = 0,]
    [processing_threads_num = 16,]
    [parallel_inserts = false,]
    [enable_logging_to_queue_log = true,]
    [last_processed_path = "",]
    [tracked_files_limit = 1000,]
    [tracked_file_ttl_sec = 0,]
    [polling_min_timeout_ms = 1000,]
    [polling_max_timeout_ms = 10000,]
    [polling_backoff_ms = 0,]
    [cleanup_interval_min_ms = 10000,]
    [cleanup_interval_max_ms = 30000,]
    [buckets = 0,]
    [list_objects_batch_size = 1000,]
    [enable_hash_ring_filtering = 0,]
    [max_processed_files_before_commit = 100,]
    [max_processed_rows_before_commit = 0,]
    [max_processed_bytes_before_commit = 0,]
    [max_processing_time_sec_before_commit = 0,]
Примечание

До версии 24.7 требуется использовать префикс s3queue_ для всех настроек, кроме mode, after_processing и keeper_path.

Параметры движка

Параметры S3Queue такие же, как у табличного движка S3. См. раздел «Параметры» здесь.

Пример

CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered';

Использование именованных коллекций:

<clickhouse>
    <named_collections>
        <s3queue_conf>
            <url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
            <access_key_id>test<access_key_id>
            <secret_access_key>test</secret_access_key>
        </s3queue_conf>
    </named_collections>
</clickhouse>
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
    mode = 'ordered';

Settings

Чтобы получить список настроек, заданных для таблицы, используйте таблицу system.s3_queue_settings. Доступно, начиная с версии 24.10.

Имена настроек (24.7+)

Начиная с версии 24.7, настройки S3Queue можно указывать с префиксом s3queue_ или без него:

  • Современный синтаксис (24.7+): processing_threads_num, tracked_file_ttl_sec и т. д.
  • Устаревший синтаксис (все версии): s3queue_processing_threads_num, s3queue_tracked_file_ttl_sec и т. д.

Оба варианта поддерживаются в 24.7+. Примеры на этой странице используют современный синтаксис без префикса.

Mode

Возможные значения:

  • unordered — В режиме unordered множество всех уже обработанных файлов отслеживается с помощью постоянных узлов в ZooKeeper.
  • ordered — В режиме ordered файлы обрабатываются в лексикографическом порядке. Это означает, что если файл с именем BBB был обработан в какой‑то момент, а позже в бакет был добавлен файл с именем AA, он будет проигнорирован. В ZooKeeper сохраняются только максимальное имя (в лексикографическом смысле) успешно обработанного файла и имена файлов, которые будут повторно загружены после неудачной попытки загрузки.

Значение по умолчанию: ordered в версиях до 24.6. Начиная с 24.6 значение по умолчанию отсутствует, настройку требуется указывать вручную. Для таблиц, созданных в более ранних версиях, значение по умолчанию останется ordered для сохранения совместимости.

after_processing

Что делать с файлом после успешной обработки.

Возможные значения:

  • keep.
  • delete.
  • move.
  • tag.

Значение по умолчанию: keep.

Для варианта move требуются дополнительные настройки. В случае перемещения в пределах того же бакета необходимо указать новый префикс пути в параметре after_processing_move_prefix.

Перемещение в другой S3‑бакет требует указания URI целевого бакета в параметре after_processing_move_uri, а также учетных данных доступа к S3 в параметрах after_processing_move_access_key_id и after_processing_move_secret_access_key.

Пример:

CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
    mode = 'unordered',
    after_processing = 'move',
    after_processing_retries = 20,
    after_processing_move_prefix = 'dst_prefix',
    after_processing_move_uri = 'https://clickhouse-public-datasets.s3.amazonaws.com/dst-bucket',
    after_processing_move_access_key_id = 'test',
    after_processing_move_secret_access_key = 'test';

Для перемещения данных из одного контейнера Azure в другой необходимо указать строку подключения Blob Storage в параметре after_processing_move_connection_string и имя контейнера в параметре after_processing_move_container. См. настройки AzureQueue.

Для добавления тегов необходимо указать ключ и значение тега в параметрах after_processing_tag_key и after_processing_tag_value.

after_processing_retries

Количество повторных попыток выполнения запрошенного действия послеобработки, после которых попытки прекращаются.

Возможные значения:

  • Неотрицательное целое число.

Значение по умолчанию: 10.

after_processing_move_access_key_id

ID ключа доступа (Access Key ID) для S3‑бакета, в который нужно переместить успешно обработанные файлы, если целевым местом назначения является другой S3‑бакет.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

after_processing_move_prefix

Префикс пути, в который перемещаются успешно обработанные файлы. Применимо как при перемещении в пределах того же бакета, так и при перемещении в другой бакет.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

after_processing_move_secret_access_key

Secret Access Key для S3‑бакета, в который нужно перемещать успешно обработанные файлы, если целевой ресурс — другой S3‑бакет.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

after_processing_move_uri

URI S3-бакета, в который следует перемещать успешно обработанные файлы, если местом назначения является другой S3-бакет.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

after_processing_tag_key

Ключ тега, который будет использоваться для пометки успешно обработанных файлов, если after_processing='tag'.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

after_processing_tag_value

Значение тега, которое будет присвоено успешно обработанным файлам, если after_processing='tag'.

Возможные значения:

  • Строка.

Значение по умолчанию: пустая строка.

keeper_path

Путь в ZooKeeper может быть задан в параметрах движка таблицы, либо путь по умолчанию может быть сформирован из пути, указанного в глобальной конфигурации, и UUID таблицы. Возможные значения:

  • строка.

Значение по умолчанию: /.

loading_retries

Повторять загрузку файла до указанного количества раз. По умолчанию повторы не выполняются. Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 0.

processing_threads_num

Количество потоков обработки. Применяется только в режиме Unordered.

Значение по умолчанию: число CPU или 16.

parallel_inserts

По умолчанию processing_threads_num будет выполнять один INSERT, поэтому файлы будут только загружаться и парситься в несколько потоков. Но это ограничивает степень параллелизма, поэтому для лучшей пропускной способности используйте parallel_inserts=true — это позволит вставлять данные параллельно (но имейте в виду, что это приведёт к большему количеству создаваемых частей данных для семейства движков MergeTree).

Примечание

INSERT-запросы будут создаваться с учётом настроек max_process*_before_commit.

Значение по умолчанию: false.

enable_logging_to_s3queue_log

Включает логирование в system.s3queue_log.

Значение по умолчанию: 0.

polling_min_timeout_ms

Указывает минимальное время в миллисекундах, которое ClickHouse ожидает перед следующей попыткой опроса.

Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 1000.

polling_max_timeout_ms

Определяет максимальное время в миллисекундах, в течение которого ClickHouse ждёт перед запуском следующей попытки опроса.

Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 10000.

polling_backoff_ms

Определяет дополнительное время ожидания, добавляемое к предыдущему интервалу опроса при отсутствии новых файлов. Следующий опрос выполняется после истечения времени, равного сумме предыдущего интервала и этого значения backoff, либо по наступлении максимального интервала — в зависимости от того, что меньше.

Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 0.

tracked_files_limit

Позволяет ограничить количество узлов ZooKeeper при использовании режима unordered; не влияет на режим ordered. Если лимит достигнут, самые старые обработанные файлы будут удалены из узла ZooKeeper и обработаны повторно.

Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 1000.

tracked_file_ttl_sec

Максимальное время в секундах для хранения обработанных файлов в узле ZooKeeper (по умолчанию хранятся бессрочно) в режиме unordered; не влияет на режим ordered. По истечении указанного времени файл будет повторно импортирован.

Возможные значения:

  • Положительное целое число.

Значение по умолчанию: 0.

cleanup_interval_min_ms

Для режима Ordered. Определяет минимальное значение интервала перепланирования фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и ограничения на их максимальное количество.

Значение по умолчанию: 10000.

cleanup_interval_max_ms

Для режима Ordered. Определяет верхнюю границу интервала перепланирования фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и максимального числа отслеживаемых файлов.

Значение по умолчанию: 30000.

buckets

Для режима Ordered. Доступно начиная с версии 24.6. Если существует несколько реплик таблицы S3Queue, каждая из которых работает с одним и тем же каталогом метаданных в keeper, значение buckets должно быть не меньше количества реплик. Если также используется настройка processing_threads, имеет смысл дополнительно увеличить значение buckets, так как она определяет фактический уровень параллелизма обработки в S3Queue.

use_persistent_processing_nodes

По умолчанию таблица S3Queue всегда использовала эфемерные узлы обработки, что могло приводить к дублированию данных в случае, если сессия ZooKeeper истекает до того, как S3Queue зафиксирует обработанные файлы в ZooKeeper, но после того, как обработка уже началась. Эта настройка гарантирует исключение возможности появления дубликатов при истечении сессии Keeper.

persistent_processing_nodes_ttl_seconds

В случае некорректного завершения работы сервера, если включён use_persistent_processing_nodes, могут остаться неудалённые узлы обработки. Этот параметр определяет период времени, по истечении которого эти узлы обработки могут быть безопасно удалены.

Значение по умолчанию: 3600 (1 час).

Настройки S3

Движок поддерживает все настройки S3. Дополнительную информацию о настройках S3 см. здесь.

Ролевой доступ к S3

Scale plan feature

S3 Role-Based Access is available in the Scale and Enterprise plans. To upgrade, visit the plans page in the cloud console.

Движок таблицы s3Queue поддерживает ролевую модель доступа. См. документацию здесь с описанием шагов по настройке роли для доступа к вашему бакету.

После настройки роли значение roleARN можно передать через параметр extra_credentials, как показано ниже:

CREATE TABLE s3_table
(
    ts DateTime,
    value UInt64
)
ENGINE = S3Queue(
                'https://<your_bucket>/*.csv',
                extra_credentials(role_arn = 'arn:aws:iam::111111111111:role/<your_role>')
                ,'CSV')
SETTINGS
    ...

Режим S3Queue ordered

Режим обработки S3Queue позволяет хранить меньше метаданных в ZooKeeper, но имеет ограничение: файлы, добавленные позже по времени, должны иметь имена, которые лексикографически (по алфавитно-цифровому порядку) больше предыдущих.

Режим ordered в S3Queue, так же как и unordered, поддерживает настройку (s3queue_)processing_threads_num (префикс s3queue_ является необязательным), которая позволяет управлять количеством потоков, обрабатывающих файлы S3 локально на сервере. Кроме того, режим ordered вводит ещё одну настройку под названием (s3queue_)buckets, которая обозначает «логические потоки». В распределённом сценарии, когда есть несколько серверов с репликами таблицы S3Queue, эта настройка определяет количество единиц обработки. Например, каждый поток обработки на каждой реплике S3Queue будет пытаться захватить определённый bucket для обработки, при этом каждый bucket соответствует определённым файлам по хэшу имени файла. Поэтому в распределённом сценарии настоятельно рекомендуется, чтобы значение настройки (s3queue_)buckets было как минимум равно количеству реплик или больше. Допустимо, чтобы число buckets было больше числа реплик. Оптимальный сценарий — когда значение настройки (s3queue_)buckets равно произведению number_of_replicas и (s3queue_)processing_threads_num. Настройку (s3queue_)processing_threads_num не рекомендуется использовать до версии 24.6. Настройка (s3queue_)buckets доступна, начиная с версии 24.6.

SELECT из движка таблиц S3Queue

Запросы SELECT по умолчанию запрещены для таблиц S3Queue. Это соответствует общему шаблону работы очереди, когда данные читаются один раз, а затем удаляются из очереди. SELECT запрещён, чтобы предотвратить случайную потерю данных. Однако иногда это может быть полезно. Для этого необходимо установить настройку stream_like_engine_allow_direct_select в True. Движок S3Queue имеет специальную настройку для запросов SELECT: commit_on_select. Установите её в False, чтобы сохранять данные в очереди после чтения, или в True, чтобы удалять их.

Описание

SELECT мало полезен для потокового импорта (кроме отладки), потому что каждый файл можно импортировать только один раз. Более практично создавать потоки в реальном времени с помощью materialized views. Для этого:

  1. Используйте этот движок для создания таблицы, которая будет читать данные из указанного пути в S3 и рассматривать её как поток данных.
  2. Создайте таблицу с требуемой структурой.
  3. Создайте materialized view, которое преобразует данные из этого движка и помещает их в ранее созданную таблицу.

Когда MATERIALIZED VIEW подключено к этому движку, оно начинает собирать данные в фоновом режиме.

Пример:

  CREATE TABLE s3queue_engine_table (name String, value UInt32)
    ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
    SETTINGS
        mode = 'unordered';

  CREATE TABLE stats (name String, value UInt32)
    ENGINE = MergeTree() ORDER BY name;

  CREATE MATERIALIZED VIEW consumer TO stats
    AS SELECT name, value FROM s3queue_engine_table;

  SELECT * FROM stats ORDER BY name;

Виртуальные столбцы

  • _path — путь к файлу.
  • _file — имя файла.
  • _size — размер файла.
  • _time — время создания файла.

Дополнительную информацию о виртуальных столбцах см. здесь.

Подстановочные символы в path

Аргумент path может задавать несколько файлов, используя подстановочные шаблоны в стиле bash. Чтобы файл был обработан, он должен существовать и полностью соответствовать шаблону пути. Перечень файлов определяется во время выполнения SELECT (а не в момент CREATE).

  • * — Заменяет любое количество любых символов, кроме /, включая пустую строку.
  • ** — Заменяет любое количество любых символов, включая /, включая пустую строку.
  • ? — Заменяет ровно один произвольный символ.
  • {some_string,another_string,yet_another_one} — Заменяет любую из строк 'some_string', 'another_string', 'yet_another_one'.
  • {N..M} — Заменяет любое число в диапазоне от N до M включительно. N и M могут содержать ведущие нули, например 000..078.

Конструкции с {} аналогичны табличной функции remote.

Ограничения

  1. Дубликаты строк могут возникать в результате:
  • во время парсинга происходит исключение в середине обработки файла, и включены повторные попытки через s3queue_loading_retries;

  • S3Queue настроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и сессия Keeper завершается до того, как один из серверов успел зафиксировать обработанный файл, что может привести к тому, что другой сервер возьмет в обработку файл, который уже мог быть частично или полностью обработан первым сервером; однако это не актуально, начиная с версии 25.8, если use_persistent_processing_nodes = 1.

  • аварийного завершения работы сервера.

  1. Если S3Queue настроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и используется режим Ordered, то s3queue_loading_retries не будет работать. Это будет скоро исправлено.

Интроспекция

Для интроспекции используйте неперсистентную таблицу system.s3queue и персистентную таблицу system.s3queue_log.

  1. system.s3queue. Эта таблица неперсистентная и отображает состояние S3Queue в памяти: какие файлы в данный момент обрабатываются, какие файлы уже обработаны или завершились с ошибкой.
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue
(
    `database` String,
    `table` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` String,
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64)
    `exception` String
)
ENGINE = SystemS3Queue
COMMENT 'Contains in-memory state of S3Queue metadata and currently processed rows per file.' │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Пример:


SELECT *
FROM system.s3queue

Row 1:
──────
zookeeper_path:        /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name:             wikistat/original/pageviews-20150501-030000.gz
rows_processed:        5068534
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:31
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
  1. system.s3queue_log. Персистентная таблица. Содержит ту же информацию, что и system.s3queue, но для файлов со статусами processed и failed.

Таблица имеет следующую структуру:

SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_log
(
    `event_date` Date,
    `event_time` DateTime,
    `table_uuid` String,
    `file_name` String,
    `rows_processed` UInt64,
    `status` Enum8('Processed' = 0, 'Failed' = 1),
    `processing_start_time` Nullable(DateTime),
    `processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
    `exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Чтобы использовать system.s3queue_log, укажите его конфигурацию в конфигурационном файле сервера:

    <s3queue_log>
        <database>system</database>
        <table>s3queue_log</table>
    </s3queue_log>

Пример:

SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date:            2023-10-13
event_time:            2023-10-13 13:10:12
table_uuid:
file_name:             wikistat/original/pageviews-20150501-020000.gz
rows_processed:        5112621
status:                Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time:   2023-10-13 13:10:12
ProfileEvents:         {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception: