Skip to main content

Kafka

在这篇教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kafka 发送数据。首先,我们将创建一个索引并配置 Kafka 源。然后,我们将创建一个 Kafka 主题并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

Prerequisites(先决条件)

要完成这篇教程,您需要以下条件:

  • 正在运行的 Kafka 集群(参见 Kafka 快速入门
  • 本地安装的 Quickwit 指南

Create index(创建索引)

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

index-config.yaml
#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
field_mappings:
- name: id
type: text
tokenizer: raw
- name: type
type: text
fast: true
tokenizer: raw
- name: public
type: bool
fast: true
- name: payload
type: json
tokenizer: default
- name: org
type: json
tokenizer: default
- name: repo
type: json
tokenizer: default
- name: actor
type: json
tokenizer: default
- name: other
type: json
tokenizer: default
- name: created_at
type: datetime
fast: true
input_formats:
- rfc3339
fast_precision: seconds
timestamp_field: created_at

indexing_settings:
commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引:

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

Create and populate Kafka topic(创建并填充 Kafka 主题)

现在,我们创建一个 Kafka 主题并将一些事件加载到其中。

# Create a topic named `gh-archive` with 3 partitions.
bin/kafka-topics.sh --create --topic gh-archive --partitions 3 --bootstrap-server localhost:9092

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..15}.json.gz

# Load the events into Kafka topic.
gunzip -c 2022-05-12*.json.gz | \
bin/kafka-console-producer.sh --topic gh-archive --bootstrap-server localhost:9092

Create Kafka source(创建 Kafka 源)

note

这篇教程假设 Kafka 集群在默认端口(9092)上本地可用。 如果情况并非如此,请相应地更新 bootstrap.servers 参数。

kafka-source.yaml
#
# Kafka source config file.
#
version: 0.8
source_id: kafka-source
source_type: kafka
num_pipelines: 2
params:
topic: gh-archive
client_params:
bootstrap.servers: localhost:9092

运行这些命令来下载源配置文件并创建源。

# Download Kafka source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kafka-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kafka-source.yaml
note

如果您遇到以下错误:

Command failed: Topic `gh-archive` has no partitions.

这意味着 Kafka 主题 gh-archive 在前一步骤中未正确创建。

Launch indexing and search services(启动索引和搜索服务)

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kafka 主题,并开始从组成主题的分区流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 Quickwit 用户界面 访问这些结果。

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
"query":"org.login:kubernetes AND repo.name:kubernetes",
"max_hits":0,
"aggs":{
"count_by_event_type":{
"terms":{
"field":"type"
}
}
}
}'

Secured Kafka connection(安全的 Kafka 连接:可选)

Quickwit 的 Kafka 源支持 SSL 和 SASL 身份验证。这对于从外部 Kafka 服务消费数据特别有用。

tip

证书和密钥文件必须存在于所有 Quickwit 节点上,以便创建 Kafka 源并成功运行索引管道。

SSL 配置

version: 0.8
source_id: kafka-source-ssl
source_type: kafka
num_pipelines: 2
params:
topic: gh-archive
client_params:
bootstrap.servers: your-kafka-broker.com
security.protocol: SSL
ssl.ca.location: /path/to/ca.pem
ssl.certificate.location: /path/to/service.cert
ssl.key.location: /path/to/service.key

SASL 配置

version: 0.8
source_id: kafka-source-sasl
source_type: kafka
num_pipelines: 2
params:
topic: gh-archive
client_params:
bootstrap.servers: your-kafka-broker.com
ssl.ca.location: /path/to/ca.pem
security.protocol: SASL_SSL
sasl.mechanisms: SCRAM-SHA-256
sasl.username: your_sasl_username
sasl.password: your_sasl_password
note

如果您遇到以下错误:

Client creation error: ssl.ca.location failed: error:05880002:x509 certificate routines::system lib

通常意味着 CA 证书的路径不正确。请相应地更新 ssl.ca.location 参数。

Tear down resources(清理资源:可选)

让我们删除为这篇教程创建的文件和资源。

# Delete Kafka topic.
bin/kafka-topics.sh --delete --topic gh-archive --bootstrap-server localhost:9092

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kafka-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。