Kinesis
在这篇教程中,我们将介绍如何在几分钟内设置 Quickwit 以从 Kinesis 发送数据。首先,我们将创建一个索引并配置 Kinesis 源。然后,我们将创建一个 Kinesis 流并将一些事件从 GH Archive 加载到其中。最后,我们将执行一些搜索和聚合查询来探索新发送的数据。
在这篇教程中使用 Amazon Kinesis 服务会产生一些费用。
Prerequisites(先决条件)
要完成这篇教程,您需要以下条件:
- AWS CLI 版本 2(参见 开始使用 AWS CLI 了解先决条件和安装)
- 本地安装的 Quickwit 指南
- jq
- GNU parallel
jq
用于重塑事件成为可通过 Amazon Kinesis API 发送的记录。
Create index(创建索引)
首先,我们创建一个新的索引。以下是与 GH Archive 事件模式对应的索引配置和文档映射:
#
# Index config file for gh-archive dataset.
#
version: 0.7
index_id: gh-archive
doc_mapping:
field_mappings:
- name: id
type: text
tokenizer: raw
- name: type
type: text
fast: true
tokenizer: raw
- name: public
type: bool
fast: true
- name: payload
type: json
tokenizer: default
- name: org
type: json
tokenizer: default
- name: repo
type: json
tokenizer: default
- name: actor
type: json
tokenizer: default
- name: other
type: json
tokenizer: default
- name: created_at
type: datetime
fast: true
input_formats:
- rfc3339
fast_precision: seconds
timestamp_field: created_at
indexing_settings:
commit_timeout_secs: 10
执行这些 Bash 命令来下载索引配置并创建 gh-archive
索引。
# Download GH Archive index config.
wget -O gh-archive.yaml https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/index-config.yaml
# Create index.
./quickwit index create --index-config gh-archive.yaml
创建并填充 Kinesis 流
现在,我们创建一个 Kinesis 流并将一些事件加载到其中。
这一步可能相当慢,具体取决于可用带宽。当前命令通过仅取从 GH Archive 下载的每个文件的前 10,000 行来限制要发送的数据量。如果您有足够的带宽,可以移除它来发送整套文件。您也可以通过增加分片的数量和/或 parallel
启动的任务数量 (-j
选项) 来加快速度。
# Create a stream named `gh-archive` with 3 shards.
aws kinesis create-stream --stream-name gh-archive --shard-count 8
# Download a few GH Archive files.
wget https://data.gharchive.org/2022-05-12-{10..12}.json.gz
# Load the events into Kinesis stream
gunzip -c 2022-05-12*.json.gz | \
head -n 10000 | \
parallel --gnu -j8 -N 500 --pipe \
'jq --slurp -c "{\"Records\": [.[] | {\"Data\": (. | tostring), \"PartitionKey\": .id }], \"StreamName\": \"gh-archive\"}" > records-{%}.json && \
aws kinesis put-records --cli-input-json file://records-{%}.json --cli-binary-format raw-in-base64-out >> out.log'
Create Kinesis source(创建 Kinesis 源)
#
# Kinesis source config file.
#
version: 0.7
source_id: kinesis-source
source_type: kinesis
params:
stream_name: gh-archive
运行这些命令来下载源配置文件并创建源。
# Download Kinesis source config.
wget https://raw.githubusercontent.com/quickwit-oss/quickwit/main/config/tutorials/gh-archive/kinesis-source.yaml
# Create source.
./quickwit source create --index gh-archive --source-config kinesis-source.yaml
如果此命令出现以下错误消息而失败:
Command failed: Stream gh-archive under account XXXXXXXXX not found.
Caused by:
0: Stream gh-archive under account XXXXXXXX not found.
1: Stream gh-archive under account XXXXXXXX not found.
这意味着 Kinesis 流在前一步骤中未正确创建。
Launch indexing and search services(启动索引和搜索服务)
最后,执行此命令以服务器模式启动 Quickwit。
# Launch Quickwit services.
./quickwit run
在幕后,这个命令会启动一个索引器和一个搜索器。启动时,索引器将连接到由源指定的 Kinesis 流,并开始从组成流的分片流式处理和索引事件。使用默认的提交超时值(参见 索引设置),索引器应在大约 60 秒后发布第一个分片。
您可以在另一个 shell 中运行此命令来检查索引的属性并查看当前发布的分片数量:
# Display some general information about the index.
./quickwit index describe --index gh-archive
也可以通过 Quickwit 用户界面 获取索引信息。
一旦发布了第一个分片,您就可以开始运行搜索查询。例如,我们可以找到所有关于 Kubernetes 仓库的事件:
curl 'http://localhost:7280/api/v1/gh-archive/search?query=org.login:kubernetes%20AND%20repo.name:kubernetes'
也可以通过 用户界面 访问这些结果。
我们还可以按类型对这些事件进行分组并计数它们:
curl -XPOST -H 'Content-Type: application/json' 'http://localhost:7280/api/v1/gh-archive/search' -d '
{
"query":"org.login:kubernetes AND repo.name:kubernetes",
"max_hits":0,
"aggs":{
"count_by_event_type":{
"terms":{
"field":"type"
}
}
}
}'
Tear down resources (清理资源:可选)
让我们删除为这篇教程创建的文件和资源。
# Delete Kinesis stream.
aws kinesis delete-stream --stream-name gh-archive
# Delete index.
./quickwit index delete --index gh-archive
# Delete source config.
rm kinesis-source.yaml
至此完成了教程。如果您有关于 Quickwit 的任何问题或遇到任何问题,请不要犹豫,在 GitHub 上提出 问题 或打开 问题报告,或者直接在 Discord 上联系我们。