Skip to main content

Kinesis

在这篇教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kinesis 发送数据。首先,我们将创建一个索引并配置 Kinesis 源。然后,我们将创建一个 Kinesis 流并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。

caution

在这篇教程中使用 Amazon Kinesis 服务会产生一些费用。

Prerequisites(先决条件)

要完成这篇教程,您需要以下条件:

note

jq 用于重塑事件成为可通过 Amazon Kinesis API 发送的记录。

Create index(创建索引)

首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:

index-config.yaml
#
# Index config file for gh-archive dataset.
#
version: 0.7

index_id: gh-archive

doc_mapping:
field_mappings:
- name: id
type: text
tokenizer: raw
- name: type
type: text
fast: true
tokenizer: raw
- name: public
type: bool
fast: true
- name: payload
type: json
tokenizer: default
- name: org
type: json
tokenizer: default
- name: repo
type: json
tokenizer: default
- name: actor
type: json
tokenizer: default
- name: other
type: json
tokenizer: default
- name: created_at
type: datetime
fast: true
input_formats:
- rfc3339
fast_precision: seconds
timestamp_field: created_at

indexing_settings:
commit_timeout_secs: 10

执行这些 Bash 命令来下载索引配置并创建 gh-archive 索引。

# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml

# Create index.
./quickwit index create --index-config gh-archive.yaml

创建并填充 Kinesis 流

现在,我们创建一个 Kinesis 流并将一些事件加载到其中。

tip

这一步可能相当慢,具体取决于可用带宽。当前命令通过仅取从 GH Archive 下载的每个文件的前 10,000 行来限制要发送的数据量。如果您有足够的带宽,可以移除它来发送整套文件。您也可以通过增加分片的数量和/或 parallel 启动的任务数量 (-j 选项) 来加快速度。

# Create a stream named `gh-archive` with 3 shards.
aws kinesis create-stream --stream-name gh-archive --shard-count 8

# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..12}.json.gz

# Load the events into Kinesis stream
gunzip -c 2022-05-12*.json.gz | \
head -n 10000 | \
parallel --gnu -j8 -N 500 --pipe \
'jq --slurp -c "{\"Records\": [.[] | {\"Data\": (. | tostring), \"PartitionKey\": .id }], \"StreamName\": \"gh-archive\"}" > records-{%}.json && \
aws kinesis put-records --cli-input-json file://records-{%}.json --cli-binary-format raw-in-base64-out >> out.log'

Create Kinesis source(创建 Kinesis 源)

kinesis-source.yaml
#
# Kinesis source config file.
#
version: 0.7
source_id: kinesis-source
source_type: kinesis
params:
stream_name: gh-archive

运行这些命令来下载源配置文件并创建源。

# Download Kinesis source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kinesis-source.yaml

# Create source.
./quickwit source create --index gh-archive --source-config kinesis-source.yaml
note

如果此命令出现以下错误消息而失败:

Command failed: Stream gh-archive under account XXXXXXXXX not found.

Caused by:
0: Stream gh-archive under account XXXXXXXX not found.
1: Stream gh-archive under account XXXXXXXX not found.

这意味着 Kinesis 流在前一步骤中未正确创建。

Launch indexing and search services(启动索引和搜索服务)

最后,执行此命令以服务器模式启动 Quickwit。

# Launch Quickwit services.
./quickwit run

在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kinesis 流,并开始从组成流的分片流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。

您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:

# Display some general information about the index.
./quickwit index describe --index gh-archive

也可以通过 Quickwit 用户界面 获取索引信息。

一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:

curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'

也可以通过 用户界面 访问这些结果。

我们还可以按类型对这些事件进行分组并计数它们:

curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
"query":"org.login:kubernetes AND repo.name:kubernetes",
"max_hits":0,
"aggs":{
"count_by_event_type":{
"terms":{
"field":"type"
}
}
}
}'

Tear down resources (清理资源:可选)

让我们删除为这篇教程创建的文件和资源。

# Delete Kinesis stream.
aws kinesis delete-stream --stream-name gh-archive

# Delete index.
./quickwit index delete --index gh-archive

# Delete source config.
rm kinesis-source.yaml

至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。