阿里云消息队列使用流程

创建主题

  1. 注册并登录阿里云平台,具体操作请参考注册和登录。
  2. 登录成功后,选择“产品服务 -> 阿里消息队列”,点击“主题”后进入主题列表页。
  3. 点击“创建主题”,设置主题名称和分区个数:

    • 若输入主题名称是“demo”,则系统会创建一个主题:<accountID>__demo。
    • 分区数目决定了本主题可以处理的消息通量(throughput),请根据业务需求选择。
  4. 点击“确定”即创建了阿里消息队列主题,可直接使用。

  5. 可点击“查看证书”以查看/编辑该主题的证书权限。

  6. 证书权限中显示“特权证书”、“普通证书”和“他人的证书”对该主题的权限。

特权证书:权限适用于所有主题的自有证书;

普通证书:权限适用于指定主题的自有证书;

他人的证书:用户可通过添加他人的证书,将自己主题的读/写权限授权给他人帐号下,授权后他人将有权限读/写用户的主题。

注意,当前阿里消息队列的数据存储时间默认为24小时,过期后自动删除,若有特殊需求,请提工单联系我们。

创建Consumer Group

Consumer Group是Kafka提供的可扩展且具有容错性的消费者机制。通过console创建的Consumer Group享有完善的安全保护机制,强烈推荐用户采用这种方式使用消费者组功能。

  1. 注册并登录阿里云平台,具体操作请参考注册和登录。
  2. 登录成功后,选择“产品服务>阿里消息队列”,点击“Consumer Group管理”后进入列表页。
  3. (可选)选择区域,请根据实际需求切换选择。
  4. 点击“创建Consumer Group”,输入Consumer Group名称。

    注:通过console创建的Consumer Group会在名称前自动加上Account ID前缀,用户使用时需输入完整的Consumer Group名称。

  5. Consumer Group的使用权限与证书绑定。可点击“查看证书”以查看/编辑该Consumer Group的证书权限。

     

  6. 证书权限中显示“特权证书”、“普通证书”和“他人的证书”对该Consumer Group的权限。用户使用Consumer Group时,会对所使用的证书进行鉴权,验证是否有该Consumer Group的使用权限。

    注:对于未通过console创建的Consumer Group不进行权限控制,但是可能存在数据安全问题,强烈建议用户通过console创建并进行使用。

下载证书

Kafka客户端在连接阿里消息队列之前需要提供相应的证书来用以鉴权授权。请至阿里云平台的“产品服务>阿里消息队列-证书”页下载用于认证客户和云服务的身份的证书:kafka-key.zip。

说明:

  • 请妥善保存证书,该证书用于建立SSL连接,同时用于认证客户端的AccountId。一旦丢失或泄漏,请至阿里云平台重新生成。
  • 若使用香港的服务,请先提工单申请开通香港区域,通过后至香港区域下载对应的证书。
  • 各个区域的证书相互独立,不能够跨区域使用。
  • 广州和北京的证书,因历史原因可跨区域使用。

证书管理

证书列表中可查看证书的相关信息:名称、序列号、类型、密钥、创建时间;可选择“重新生成”更新证书,选择“删除证书”删除该证书;选择“查看主题”查看/编辑该证书对主题的权限,选择“查看Consumer Group”查看/编辑该证书对Consumer Group的权限。

连接服务

您可直接使用阿里云提供的样例代码连接阿里消息队列,也可从Kafka官网下载Kafka二进制包并逐步配置相关信息后连接阿里消息队列。

通过样例代码连接服务

  1. 下载代码样例。请登录控制台,打开“产品服务>阿里消息队列-证书”页,点击页面右上角“代码样例”,下载至本地,下载后解压。
  2. 阿里消息队列提供的代码样例是Maven项目包。
  3. 运行样例代码。cd至代码目录“sample-with-kafka-key”下,执行命令run.bat <accountID>_demo,执行完毕则建立了与阿里消息队列的连接。

此外,我们提供Kafka服务的Python和Golang两种语言代码样例,已上传至GitHub中的Python样例和Golang样例,您可通过GitHub克隆代码至本地设计自己的程序。

逐步配置信息连接服务

  1. 从Kafka官网下载Kafka二进制包:http://kafka.apache.org/downloads.html,目前已支持了0.10版本。
  2. 通过Java访问Kafka服务的用户需使用Java客户端的pom依赖:kafka-client/0.10;我们推荐用户使用JDK 7 或JDK 8 版本,最低配置须为Java 1.7.0-b147。
  3. 配置client.properties。请至阿里云平台的阿里“产品服务>阿里消息队列-证书”页下载用于认证客户和云服务的身份的证书:kafka-key.zip,请将zip包解压到kafka二进制包的根目录下,client.properties,client.keystore.jks,client.truststore.jks都需在同级目录中。解压后打开client.properties,该文件是UTF8编码的文本文件,显示如下内容:

    security.protocol=SSL ssl.truststore.location=client.truststore.jks ssl.truststore.password=<your certificate password> ssl.keystore.location=client.keystore.jks ssl.keystore.password=<your certificate password>

  4. 使用客户端代码中的脚本与阿里消息队列通讯,连接通过SSL加密,保证数据传输无法被监听与篡改。以使用北京区域的阿里消息队列为例,具体操作如下(操作前请确保client.properties,client.keystore.jks,client.truststore.jks都在kafka二进制包的根目录下):

    1. 启动消费者。打开命令行工具并输入以下命令:

      sh bin/kafka-console-consumer.sh --topic <accountID>__demo --bootstrap-server kafka.bj.baidubce.com:9091 --consumer.config client.properties --from-beginning --new-consumer`

      • 针对windows,启动命令如下:

        binwindowskafka-console-consumer.bat --topic <accountID>__demo --bootstrap-server kafka.bj.baidubce.com:9091 --consumer.config client.properties --from-beginning --new-consumer`

    2. 发布消息。打开一个新的命令行工具并输入以下命令:

      sh bin/kafka-console-producer.sh --producer.config client.properties --topic <accountID>__demo --sync --broker-list kafka.bj.baidubce.com:9091

    • 针对windows,发布命令如下:

      binwindowskafka-console-producer.bat --producer.config client.properties --topic <accountID>__demo --broker-list kafka.bj.baidubce.com:9091

    1. 消息发送以后,消费者命令行中返回消息已经正确接收到信息。