Skip to main content

带 SQS 通知的 S3

在这篇教程中,我们介绍如何设置 Quickwit 以从 S3 发送数据,其中桶通知事件通过 SQS 流式传输。我们首先使用 Terraform 创建 AWS 资源(S3 桶、SQS 队列、通知)。然后配置 Quickwit 索引和文件源。最后,我们将一些数据发送到源桶并验证其是否被正确索引。

AWS resources(AWS 资源)

完整的 Terraform 脚本可以从 这里 下载。

首先,创建接收源数据文件(NDJSON 格式)的桶:

resource "aws_s3_bucket" "file_source" {
bucket_prefix = "qw-tuto-source-bucket"
}

然后设置 SQS 队列,当文件添加到桶时,队列将承载通知。队列配置了一个策略,允许源桶向其写入 S3 通知消息。同时创建一个死信队列 (DLQ),用于接收文件源无法处理的消息(例如损坏的文件)。消息在经过 5 次索引尝试后会被移动到 DLQ。

locals {
sqs_notification_queue_name = "qw-tuto-s3-event-notifications"
}

data "aws_iam_policy_document" "sqs_notification" {
statement {
effect = "Allow"

principals {
type = "*"
identifiers = ["*"]
}

actions = ["sqs:SendMessage"]
resources = ["arn:aws:sqs:*:*:${local.sqs_notification_queue_name}"]

condition {
test = "ArnEquals"
variable = "aws:SourceArn"
values = [aws_s3_bucket.file_source.arn]
}
}
}

resource "aws_sqs_queue" "s3_events_deadletter" {
name = "${locals.sqs_notification_queue_name}-deadletter"
}

resource "aws_sqs_queue" "s3_events" {
name = local.sqs_notification_queue_name
policy = data.aws_iam_policy_document.sqs_notification.json

redrive_policy = jsonencode({
deadLetterTargetArn = aws_sqs_queue.s3_events_deadletter.arn
maxReceiveCount = 5
})
}

resource "aws_sqs_queue_redrive_allow_policy" "s3_events_deadletter" {
queue_url = aws_sqs_queue.s3_events_deadletter.id

redrive_allow_policy = jsonencode({
redrivePermission = "byQueue",
sourceQueueArns = [aws_sqs_queue.s3_events.arn]
})
}

配置桶通知,每当源桶中创建新文件时,都会向 SQS 写入消息:

resource "aws_s3_bucket_notification" "bucket_notification" {
bucket = aws_s3_bucket.file_source.id

queue {
queue_arn = aws_sqs_queue.s3_events.arn
events = ["s3:ObjectCreated:*"]
}
}
note

只支持 s3:ObjectCreated:* 类型的事件。 其他类型(例如 ObjectRemoved)会被确认,并且会记录警告日志。

源需要能够访问通知队列和源桶。以下策略文档包含了源所需的最小权限:

data "aws_iam_policy_document" "quickwit_node" {
statement {
effect = "Allow"
actions = [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueAttributes",
]
resources = [aws_sqs_queue.s3_events.arn]
}
statement {
effect = "Allow"
actions = ["s3:GetObject"]
resources = ["${aws_s3_bucket.file_source.arn}/*"]
}
}

创建 IAM 用户和凭证,以便将其与本地 Quickwit 实例关联:

resource "aws_iam_user" "quickwit_node" {
name = "quickwit-filesource-tutorial"
path = "/system/"
}

resource "aws_iam_user_policy" "quickwit_node" {
name = "quickwit-filesource-tutorial"
user = aws_iam_user.quickwit_node.name
policy = data.aws_iam_policy_document.quickwit_node.json
}

resource "aws_iam_access_key" "quickwit_node" {
user = aws_iam_user.quickwit_node.name
}
warning

我们不建议在生产环境中使用 IAM 用户凭证运行 Quickwit 节点。 这只是为了简化教程设置。在 EC2/ECS 上运行时,应将策略文档附加到 IAM 角色上。

下载 完整的 Terraform 脚本,并使用 terraform initterraform apply 部署它。成功执行后,将列出配置 Quickwit 所需的输出。您可以使用以下命令显示敏感输出(密钥 ID 和密钥)的值:

terraform output quickwit_node_access_key_id
terraform output quickwit_node_secret_access_key

Run Quickwit(运行 Quickwit)

本地安装 Quickwit,然后在安装目录中,使用必要的访问权限运行 Quickwit,将 <quickwit_node_access_key_id><quickwit_node_secret_access_key> 替换为匹配的 Terraform 输出值:

AWS_ACCESS_KEY_ID=<quickwit_node_access_key_id> \
AWS_SECRET_ACCESS_KEY=<quickwit_node_secret_access_key> \
AWS_REGION=us-east-1 \
./quickwit run

Configure the index and the source(配置索引和源)

在另一个终端中,在 Quickwit 安装目录中,创建一个索引:

cat << EOF > tutorial-sqs-file-index.yaml
version: 0.7
index_id: tutorial-sqs-file
doc_mapping:
mode: dynamic
indexing_settings:
commit_timeout_secs: 30
EOF

./quickwit index create --index-config tutorial-sqs-file-index.yaml

<notification_queue_url> 替换为相应的 Terraform 输出值,为该索引创建一个文件源:

cat << EOF > tutorial-sqs-file-source.yaml
version: 0.8
source_id: sqs-filesource
source_type: file
num_pipelines: 2
params:
notifications:
- type: sqs
queue_url: <notification_queue_url>
message_type: s3_notification
EOF

./quickwit source create --index tutorial-sqs-file --source-config tutorial-sqs-file-source.yaml
tip

num_pipeline 配置控制了多少个消费者将并行地从队列中轮询。根据您想要为此源分配的索引器计算资源选择数字。一般而言,每 2 个核心配置 1 个管道。

Ingest data(摄入数据)

我们现在可以通过将文件上传到 S3 来向 Quickwit 发送数据。如果您已安装 AWS CLI,运行以下命令,将 <source_bucket_name> 替换为关联的 Terraform 输出:

curl https://quickwit-datasets-public.s3.amazonaws.com/hdfs-logs-multitenants-10000.json | \
aws s3 cp - s3://<source_bucket_name>/hdfs-logs-multitenants-10000.json

如果您不想使用 AWS CLI,您也可以下载文件并通过 AWS 控制台手动将其上传到源桶。

等待大约 1 分钟,数据应该会出现在索引中:

./quickwit index describe --index tutorial-sqs-file

Tear down the resources(清理资源)

这篇教程中实例化的 AWS 资源不会产生固定成本,但我们仍然建议您完成后删除它们。在包含 Terraform 脚本的目录中,运行 terraform destroy