阿里云流式计算创建开发作业的流程

创建作业

  1. 选择“阿里云流式计算>作业管理>作业开发”,进入到作业开发页面。

    • 阿里云目前开放多区域支持,如果您需要在多区域新增作业,请参考区域选择说明。
    • 在不同区域创建的作业相互独立。
  2. 点击“新增作业”,显示“新增作业”弹出框,在弹出框中填写作业的基本信息。

    说明:作业类型:目前仅支持“SQL作业”类型,如需了解SQL语法,请参考SQL使用手册。

  3. 点击“确定”进入到编辑作业页面。

编辑作业

为了方便用户快速了解流式作业包括哪些内容,为用户提供示例作业,示例作业的完整SQL语句如下(用户可以直接粘贴到编辑器中,进行作业调试):

CREATE table source_kafka(
    username STRING,
    cost_type STRING,
    cost Float
) with(
    type = 'BKAFKA',
    topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source',
    kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
    sslFilePath = 'bsc_test_ce.zip',
    encode = 'CSV'
);
CREATE TABLE source_mysql(
  username1 STRING,
  cost_type1 STRING  
)WITH(
    type = 'RDS',
    user = 'zhangsan',
    password = 'xxxx',
    accountId = 'xxxx',
    url = 'xxxx',
    dbTable = 'xxx'
);
create table sink_kafka(
    username String,
    cost_type String,
    cost Float
) with(
    type = 'BKAFKA',
    topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink',
    kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
    sslFilePath = 'bsc_test_ce.zip',
    encode = 'CSV'
);
insert into
    sink_kafka
select
    username,
    cost_type,
    cost
from
    source_mysql right join source_kafka on source_mysql.username1 = source_kafka.username
WHERE
      cost > 1500;

接下来对示例作业的各个模块进行详细介绍。

添加kafka作为输入端

  1. 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考创建主题。
  2. 编写kafka作为输入端的DDL语句。

    说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

    CREATE table source_kafka(username STRING,cost_type STRING,cost Float) with(type = 'BKAFKA',topic = '8a04sge0cc6f6458b8aaf0fcd26608b03__bsc_test_source',kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',sslFilePath = 'bsc_test_ce.zip',encode = 'CSV');

With参数的含义:

参数名称 必填 描述
type 输入端接入的数据资源的类型,BKAFKA代表阿里云消息服务
topic BKAFKA中用户自己创建的Topic
kafka.bootstrap.servers BKAFKA服务器地址和端口,地址和端口之间用“:”分隔。

北京区域填写:kafka.bj.alice.com:9091

广州区域填写:kafka.gz.alice.com:9092
sslFilePath 该topic的证书文件,格式为zip,并且需要将通过“高级设置”
将证书文件上传到服务中
encode Kafka中数据的编码格式,支持json和csv格式的消息

添加RDS作为输入端

  1. 如果没有可用的RDS数据表,需要先使用RDS添加数据表。创建数据表的步骤请参考使用流程。
  2. 编写RDS作为输入端的DDL语句。

    说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

    CREATE TABLE source_mysql(username1 STRING,cost_type1 STRING
    )WITH(type = 'RDS',user = 'zhangsan',password = 'xxxx',accountId = 'xxxx',url = 'xxxx',dbTable = 'xxx');

参数名称 必填 描述
type 输入端接入的数据资源的类型,此处填写“RDS”
user RDS的用户名
password RDS用户名对应的密码
accountId RDS用户的用户ID
url 通过jdbc访问rds的url,例如

jdbc:mysql://mysql56.rdsmiusj9oseag6.rds.bj.baidubce.com:
3306/bsc_test?useUnicode=true&characterEncoding=UTF8
dbTable 数据表名称

添加Kafka作为输出端

  1. 如果没有可用的Topic,需要先创建Topic。创建Topic的步骤请参考 创建主题。
  2. 编写kafka作为输出端的DDL语句。

    说明:在调试作业阶段,没有验证参数信息的连通性,如果只是进行测试本产品,不发布作业,无需修改下面的SQL语句,直接粘贴到编辑器中进行调试。

    create table sink_kafka(username String,cost_type String,cost Float) with(type = 'BKAFKA',topic = '8a9003sdc6f6458b8aaf0fcd26608b03__bsc_test_sink',kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',sslFilePath = 'bsc_test_ce.zip',encode = 'CSV');

编写业务处理逻辑的SQL语句

insert into sink_kafkaselect username, cost_type, costfrom source_mysql right join source_kafka on source_mysql.username1 = source_kafka.usernameWHERE cost > 1500;
调试作业

  1. 点击右上角的“调试作业”,弹出上传输入端测试数据的页面。

    • 上传csv文件:csv文件中的字段需要跟输入端数据表中的字段(包括字段顺序)保持一致。
    • 手动录入:用户根据输入框中提示的字段类型输入相应类型的测试数据,测试数据需要去掉头部的字段名称
  2. 选择“source_mysql”,选择“手动录入”,将下面的测试数据粘贴到输入框。室友 1,租金室友 2,租金室友 3,租金室友 4,租金室友 1,有线电视室友 2,有线电视室友 3,有线电视室友 4,有线电视
  3. 选择“source_kafka”,选择“手动录入”,将下面的测试数据粘贴到输入框。

    室友 1,租金,3600.00 室友 2,租金,3500.00 室友 3,租金,2000.00 室友 4,租金,2000.00 室友 1,有线电视,250.00 室友 2,有线电视,250.00 室友 3,有线电视,250.00 室友 4,有线电视,250.00 室友 3,日用品,1500.00

  4. 点击“开始调试”,开始运行作业。
  5. 查看作业运行过程中产生的调试日志和调试结果。

发布作业

点击“发布”,设置CU数据,然后发布作业。其中,CU是作业运行所需资源的基本单位,1CU包括1核CPU和4G内存。

运维作业

启动作业

  1. 选择“阿里流式计算>作业管理>作业运维”,进入到作业运维页面。
  2. 点击列表中操作这一列的“启动”,启动当前作业开始运行。

如需了解作业运行过程中的监控数据和运行日志相关内容,请参照作业运维。