Skip to main content

源配置

Quickwit 可以从一个或多个来源将数据插入到索引中。 可以在创建索引后使用 CLI 命令 quickwit source create 添加来源。 也可以使用 quickwit source enable/disable 子命令启用或禁用来源。

来源是通过一个称为来源配置的对象声明的,该对象定义了来源的设置。它由多个参数组成:

  • 来源 ID
  • 来源类型
  • 来源参数
  • 输入格式
  • 每个索引器的最大管道数(可选)
  • 期望的管道数(可选)
  • 转换参数(可选)

Source ID

来源 ID 是一个字符串,用于在索引内唯一标识来源。它只能包含大写或小写的 ASCII 字母、数字、连字符 (-) 和下划线 (_)。最后,它必须以字母开头,并且至少包含 3 个字符,但不超过 255 个。

Source type

来源类型指定了正在配置的来源种类。截至版本 0.5,可用的来源类型有 ingest-apikafkakinesispulsarfile 类型也受支持,但仅用于从 CLI 进行本地摄入。

Source parameters

来源参数指示如何连接到数据存储,并且特定于来源类型。

File source

文件来源从包含由新行分隔的 JSON 对象(NDJSON)的文件中读取数据。如果文件名以 .gz 后缀结尾,则支持 Gzip 压缩。

Ingest a single file (CLI only)

要摄入特定文件,请直接在临时 CLI 进程中运行索引:

./quickwit tool local-ingest --index <index> --input-path <input-path>

本地文件和对象文件都受支持,前提是环境已使用适当的权限进行配置。有一个教程可供参考 这里

基于通知的文件摄入 (Beta)

Quickwit 可以自动摄入所有上传到 S3 存储桶的新文件。这需要创建并配置一个 SQS 通知队列。一个完整的示例可以在 这个教程 中找到。

notifications 参数接受一个通知设置数组。目前每个来源可以配置一个通知器,并且仅支持 SQS 通知 type

SQS notifications 参数项所需的字段:

  • type: sqs
  • queue_url: SQS 队列的完整 URL(例如 https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
  • message_type: 消息负载的格式,可以是
    • s3_notification: 一个 S3 事件通知
    • raw_uri: 包含文件对象 URI 的消息(例如 s3://mybucket/mykey

使用 CLI 向索引添加带有 SQS 通知的文件来源

cat << EOF > source-config.yaml
version: 0.8
source_id: my-sqs-file-source
source_type: file
num_pipelines: 2
params:
notifications:
- type: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_type: s3_notification
EOF
./quickwit source create --index my-index --source-config source-config.yaml
note
  • Quickwit 在成功摄入后不会自动删除来源文件。您可以使用 S3 对象过期 来配置它们应在存储桶中保留多久。
  • 配置通知仅转发类型为 s3:ObjectCreated:* 的事件。其他事件会被来源确认,但不再进一步处理,并记录警告。
  • 我们强烈建议使用 死信队列 来接收所有无法被文件来源处理的消息。maxReceiveCount 的值为 5 是一个不错的默认值。以下是一些常见情况,其中通知消息最终会进入死信队列:
    • 通知消息无法解析(例如,它不是一个有效的 S3 通知)
    • 文件未找到
    • 文件损坏(例如,意外压缩)

Ingest API source

Ingest API 来源从 Ingest API 读取数据。此来源会在创建索引时自动生成,不能删除或禁用。

Kafka source

Kafka 来源从 Kafka 流中读取数据。流中的每条消息必须包含一个 JSON 对象。

有一个教程可供参考 这里

Kafka source parameters

Kafka 来源使用客户端库 librdkafka 消费 topic,并将 client_params 参数携带的键值对转发给底层的 librdkafka 消费者。常见的 client_params 选项包括引导服务器 (bootstrap.servers) 或安全协议 (security.protocol)。请参阅 Kafkalibrdkafka 文档页面获取更高级的选项。

属性描述默认值
topic要消费的主题名称。必需
client_log_levellibrdkafka 客户端日志级别。可能的值是:debug、info、warn、error。info
client_paramslibrdkafka 客户端配置参数。{}
enable_backfill_mode回填模式在到达主题末尾后停止来源。false

Kafka 客户端参数

  • bootstrap.servers 逗号分隔的主机和端口对列表,这些是 Kafka 集群中 Kafka 经纪人的地址子集。

  • auto.offset.reset 定义了来源在消费一个分区时的行为,该分区在检查点中没有保存初始偏移量。earliest 从分区开始消费,而 latest(默认)从分区末尾消费。

  • enable.auto.commit 此设置被忽略,因为 Kafka 来源使用 检查点 API 内部管理提交偏移量,并强制禁用自动提交。

  • group.id 基于 Kafka 的分布式索引依赖于消费者组。除非在客户端参数中覆盖,默认分配给来源管理的每个消费者的组 ID 是 quickwit-{index_uid}-{source_id}

  • max.poll.interval.ms 短的最大轮询间隔持续时间可能会导致来源在索引器出现反压时崩溃。因此,Quickwit 建议使用默认值 300000(5 分钟)。

使用 CLI 向索引添加 Kafka 来源

cat << EOF > source-config.yaml
version: 0.8
source_id: my-kafka-source
source_type: kafka
num_pipelines: 2
params:
topic: my-topic
client_params:
bootstrap.servers: localhost:9092
security.protocol: SSL
EOF
./quickwit source create --index my-index --source-config source-config.yaml

Kinesis source

Kinesis 来源从 Amazon Kinesis 流中读取数据。流中的每条消息必须包含一个 JSON 对象。

有一个教程可供参考 这里

Kinesis source parameters

Kinesis 来源通过 stream_nameregion 消费一个流。

属性描述默认值
stream_name要消费的流名称。必需
region流所在的 AWS 区域。与 endpoint 互斥。us-east-1
endpoint用于与 AWS 兼容的 Kinesis 服务一起使用的自定义端点。与 region 互斥。可选

如果没有指定区域,Quickwit 将尝试在多个其他位置查找区域,并按照以下优先顺序:

  1. 环境变量 (AWS_REGION 然后是 AWS_DEFAULT_REGION)

  2. 配置文件,通常位于 ~/.aws/config 或者如果设置了 AWS_CONFIG_FILE 环境变量且不为空,则按其指定的位置。

  3. Amazon EC2 实例元数据服务确定当前运行的 Amazon EC2 实例所在的区域。

  4. 默认值:us-east-1

使用 CLI 向索引添加 Kinesis 来源

cat << EOF > source-config.yaml
version: 0.7
source_id: my-kinesis-source
source_type: kinesis
params:
stream_name: my-stream
EOF
quickwit source create --index my-index --source-config source-config.yaml

Pulsar source

Pulsar 来源从一个或多个 Pulsar 主题读取数据。每个主题中的消息必须包含一个 JSON 对象。

有一个教程可供参考 这里

Pulsar source parameters

Pulsar 来源使用客户端库 pulsar-rs 消费 topics

属性描述默认值
topics要消费的主题列表。必需
addressPulsar URL(pulsar:// 和 pulsar+ssl://)。必需
consumer_name要与 Pulsar 来源注册的消费者名称。quickwit

使用 CLI 向索引添加 Pulsar 来源

cat << EOF > source-config.yaml
version: 0.7
source_id: my-pulsar-source
source_type: pulsar
params:
topics:
- my-topic
address: pulsar://localhost:6650
EOF
./quickwit source create --index my-index --source-config source-config.yaml

Number of pipelines

num_pipelines 参数仅适用于像 Kafka、GCP PubSub 和 Pulsar 这样的分布式来源。

它定义了要在集群上为来源运行的管道数量。这些管道在不同索引器上的实际放置将由控制平面决定。

info

请注意,对于像 Kafka 这样的分区来源,通过将不同的分区分配给不同的管道来分布索引负载。因此,重要的是确保分区的数量是 num_pipelines 的倍数。

此外,假设您只在 Quickwit 集群中索引单个 Kafka 来源,您应该将管道数量设置为索引器数量的倍数。最后,如果您的索引吞吐量很高,您应该为每个管道配置 2 到 4 个 vCPU。

例如,假设您想要索引一个 60 个分区的主题,每个分区接收 10 MB/s 的吞吐量。如果您测量到 Quickwit 可以以每管道 40 MB/s 的速度索引您的数据,那么可能的设置可以是:

  • 5 个索引器,每个有 8 个 vCPU
  • 15 个管道

这样,每个索引器将负责 3 个管道,每个管道将覆盖 4 个分区。

Transform parameters

除了 ingest-api 类型之外的所有来源类型,在索引之前可以使用 Vector Remap Language (VRL) 脚本转换摄取的文档。

属性描述默认值
script执行以转换文档的 VRL 程序的源代码。必需
timezoneVRL 程序中用于日期和时间操作的时区。它必须是 TZ 数据库 中的有效名称。UTC
# Your source config here
# ...
transform:
script: |
.message = downcase(string!(.message))
.timestamp = now()
del(.username)
timezone: local

Input format

input_format 参数指定了来源预期的数据格式。目前支持两种格式:

  • json: JSON,这是默认格式
  • plain_text: 非结构化文本文档

内部而言,Quickwit 只能索引 JSON 数据。为了允许摄取纯文本文档,Quickwit 会将它们实时转换成如下形式的 JSON 对象:{"plain_text": "<original plain text document>"}。然后,可以使用 VRL 脚本将它们可选地转换为更复杂的文档。(参见 transform 特性)。

下面是一个如何解析并转换包含用户列表的 CSV 数据集的例子,其中用户由 3 个属性描述:名字、姓氏和年龄。

# Your source config here
# ...
input_format: plain_text
transform:
script: |
user = parse_csv!(.plain_text)
.first_name = user[0]
.last_name = user[1]
.age = to_int!(user[2])
del(.plain_text)

启用/禁用索引中的来源

可以通过 CLI 命令 quickwit source enablequickwit source disable 启用或禁用索引中的来源:

quickwit source disable --index my-index --source my-source

来源默认是启用的。当禁用一个来源时,相关的索引管道将在每个相关索引器上关闭,对该来源的索引也会暂停。

从索引中删除来源

可以通过 CLI 命令 quickwit source delete 从索引中移除来源:

quickwit source delete --index my-index --source my-source

删除来源时,与来源关联的检查点也会被移除。