将 Lambda 函数与 Amazon Kinesis 结合使用
AWS Kinesis 服务用于捕获/存储来自网站点击、日志、社交媒体源的实时跟踪数据。我们可以触发 AWS Lambda 对这些日志执行额外的处理。
必备条件
开始使用 Kinesis 和 AWS Lambda 的基本要求如下所示 −
- 创建具有所需权限的角色
- 在 Kinesis 中创建数据流
- 创建 AWS Lambda 函数。
- 向 AWS Lambda 添加代码
- 向 Kinesis 数据流添加数据
示例
让我们来研究一个示例,其中我们将触发 AWS Lambda 来处理来自 Kinesis 的数据流并使用收到的数据发送邮件。
下面显示了一个用于解释该过程的简单框图 −

创建具有所需权限的角色
转到 AWS 控制台并创建角色。

在 Kinesis 中创建数据流
转到 AWS 控制台并在 kinesis 中创建数据流。

如图所示,有 4 个选项。我们将在此示例中创建数据流。

单击创建数据流。在下面给出的 Kinesis 流名称中输入名称。

输入数据流的分片数量。

分片的详细信息如下所示 −

输入名称并单击底部的创建 Kinesis 流按钮。

请注意,流需要一定的时间才能完成处于活动状态。
创建 AWS Lambda 函数
转到 AWS 控制台并单击 Lambda。创建 AWS Lambda 函数,如下所示 −

单击屏幕末尾的创建函数按钮。将 Kinesis 作为触发器添加到 AWS Lambda。

将配置详细信息添加到 Kinesis 触发器 −

添加触发器,现在将代码添加到 AWS Lambda。
将代码添加到 AWS Lambda
为此,我们将使用 nodejs 作为运行时。一旦使用 kinesis 数据流触发 AWS Lambda,我们就会发送邮件。
const aws = require("aws-sdk"); var ses = new aws.SES({ region: 'us-east-1' }); exports.handler = function(event, context, callback) { let payload = ""; event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here payload = new Buffer(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); var eParams = { Destination: { ToAddresses: ["xxxxxxx@gmail.com"] }, Message: { Body: { Text: { Data:payload } }, Subject: { Data: "Kinesis data stream" } }, Source: "cxxxxxxxxx@gmail.com" }; var email = ses.sendEmail(eParams, function(err, data) { if (err) console.log(err); else { console.log("===EMAIL SENT==="); console.log("EMAIL CODE END"); console.log('EMAIL: ', email); context.succeed(event); callback(null, "email is send"); } }); };
事件参数包含 Kinesis 数据流中输入的数据。一旦数据输入 Kinesis 数据流,上述 aws lambda 代码就会被激活。
将数据添加到 Kinesis 数据流
在这里我们将使用 AWS CLI 添加数据 Kinesis 数据流,如下所示。为此,我们可以使用以下命令 −
aws kinesis put-record --stream-name kinesisdemo --data "hello world" -- partition-key "789675"

然后,AWS Lambda 被激活并发送邮件。


