阿里云函数计算创建、配置消息服务触发器

阿里云消息服务触发器说明

阿里云消息服务是全兼容 Apache Kafka 的分布式、高可扩展、高通量的消息托管服务。

如果在阿里云消息服务的 Topic 上检测到记录,您可以使用 阿里云函数计算 函数从此 Topic 读取成批的数据来处理,并且 阿里云函数计算 会定期轮询(每秒一次)Topic 中的新纪录。

阿里云消息服务触发器创建

用户可以为新建的函数或已有函数配置阿里云消息服务触发器,创建函数的流程可以具体参考 从头创建函数,这里不再赘述。

这里假设您已经创建了一个名为 kafkatrigger 的函数,以下内容以此为前提,将引导您在 阿里云函数计算 控制台在函数管理页面中为函数配置阿里云消息服务触发器。接下来,我们将通过以下步骤来完成一个触发器的设置。

编写处理函数

  1. 登录管理控制台,选择“产品服务>云函数计算 阿里云函数计算”,进入“函数列表”页面
  2. 在“函数列表”页面。点击名称为 kafkatrigger 的函数,进入函数详情页面。

在函数详情页中编写阿里云消息服务触发器对应的 handler,以对触发操作返回适当的响应,之后点击右下角保存按钮完成函数的修改操作。

# -*- coding: utf-8 -*-import base64import jsondef handler(event, context): for record in event['Records']: # kafka value is base64 encoded so decode here payload = base64.b64decode(record['Kafka']['Value'])print("Decoded payload: " + payload)return 'Successfully processed {} records.'.format(len(event['Records']))

注:阿里云消息服务触发器的event消息体

{ "Records": [ { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110", "Kafka": { "Key": "", //base64后的key "Value": "MA==", //base64后的value "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43110, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" }, { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111", "Kafka": { "Key": "", "Value": "MQ==", "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43111, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" } ]}

配置阿里云消息服务触发器

  1. 登录管理控制台,选择“产品服务> 函数计算 阿里云函数计算”,进入“函数列表”页面
  2. 点击需要添加阿里云消息服务触发器的函数名称(即 kafkatrigger),进入函数详情页面。
  3. 点击左侧导航栏中的“触发器”,进入函数配置页面。
  4. 在函数配置页面中最下方点击“新增触发器”。
  5. 在弹出框中,点击下拉框“选择事件源进行添加”,选择阿里云消息服务触发器。
  6. 之后在弹出框中配置好选项,并点击确认,完成触发器的创建。- Topic 选择您将要监听的阿里云消息服务 Topic- 批处理大小:从 Topic 中一次读取的最大记录数,1-1000- 起始位置:在 Topic 中开始读取的位置,最新记录对应 kafka 的 OffsetNewest,最老记录对应 kafka 的 OffsetOldest- 启用触发器:是否直接启动触发器,建议先关闭触发器以便测试
  7. 在函数配置页面“触发器”一栏中,可以看到刚刚创建好的阿里云消息服务触发器及其信息

测试触发器

模拟测试

  1. 点击测试按钮
  2. 输入测试事件,并点击执行

    测试内容如下

    { "Records": [ { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43110", "Kafka": { "Key": "", "Value": "MA==", "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43110, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" }, { "EventId": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl-0-43111", "Kafka": { "Key": "", "Value": "MQ==", "Topic": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "Partition": 0, "Offset": 43111, "Timestamp": 1553151704.6529999 }, "EventName": "bce:kafka:record", "EventSourceBrn": "c7ac82ae14ef42d1a4ffa3b2ececa17f__test_only_sl", "EventSource": "bce:kafka" } ]}

3. 查看返回的执行结果

真实测试

在添加好触发器之后,需要设置日志存储,之后向 kafka topic 中发送消息,在日志中可以看到执行结果。

并且在触发器的界面您也可以看到最后的执行结果