阿里云实时计算新增MySQL实时采集任务步骤

本节介绍如何在实时计算中配置MySQL实时采集任务。目前实时采集任务,阿里云上MySQL数据库、阿里云上MySQL-PXC数据库暂不支持以Binlog方式进行数据采集。
用户自建数据库支持Binlog方式进行数据采集,前提条件需要运行以下SQL语句获取权限,
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT
ON *.* TO用户名’@‘%’ IDENTIFIED BY密码’;
后期阿里云实时计算将计划支持Binlog数据采集方式。
注:实时数据采集任务,当源端与目标端分别为MySQL、Kafka时,MySQL的表不能为空,用户需要确保MySQL的表中至少存在一条数据,否则任务会报错。
说明:实时采集的优点是读取压力小, 数据准确,缺点是需要管理员手动开启日志服务才可使用。
间隔轮训是通过JDBC发起查询请求,通过短间隔的查询数据的方式来达到一个近似实时采集的功能。
间隔轮训优点是不需要手动开启日志服务,缺点是间隔轮训间隔过短且轮训数据量大时,容易给服务器带来较高的负担。

 新建MySQL实时采集任务-向导模式
参数配置说明
 
任务类型
 
MySQL实时采集支持两种类型:通过Binlog实时采集、通过间隔轮询进行采集
 
数据源
 
选择对应数据类型的已有数据源
 
Schema
 
选择数据源下指定Schema
 

 
选择数据源表,支持多表合并写入一张表;
 
多表合并写入时,无法保证完全根据业务顺序写入;
 
增量标识字段
 
用户需选择增量标示字段,每次同步时,系统自动记录增量标识的最大值,下次运行时,会从上一次的最大值继续同步数据,实现增量同步;支持将数值类型、Timestamp类型作为增量标识字段
 
采集起点
 
用户根据选择的增量标示字段设定相应的采集起点,若不填则默认从头开始拉取数据,输入格式请在"数据预览"中参考所选增量标示字段内容。采集时不包含采集起点,例如采集起点为40 则采集开始时不会包含id=40这一条数据。
 
轮询时间间隔
 
手动设定轮询时间间隔,单位为秒
 
高级配置
 
以JSON格式添加高级参数,例如对关系型数据库可配置fetchSize,每类数据源支持不同的参数
新建MySQL实时采集任务-脚本模式
创建间隔轮询方式的脚本模式的流程
 
1、创建一个脚本模式的任务:
 
(1)右击新建任务
 
(2)选择相应的任务类型

2、快速创建模板
 
(1)、选择创建模板

(2)选择好相对应的数据源

3、替换默认生成的脚本
 
(1)默认生成的脚本内容

(2)需要替换的内容如下
 
{
 
  "job" : {
 
    "content" : [ {
 
      "reader" : {
 
        "parameter" : {
 
          "password " : "******",
 
          "increColumn"  : "id",
 
          "column" :  [ {
 
            "name" : "id",
 
            "type" : "INT",
 
            "key" : "id"
 
          } ],
 
          "pollingInterval" : 5000,
 
          "connection" : [ {
 
            "jdbcUrl"  : [ "jdbc:mysql://host:port/dbname" ],
 
            "table"  : [ "dbname.table" ]
 
          } ],
 
          "polling" : true,
 
          "username"  : "testuser"
 
        },
 
        "name" : "mysqlreader"
 
      },
 
      "writer" : {
 
        "parameter" : {
 
          "tableFields"  : [ "id" ],
 
          "producerSettings" : {
 
            "zookeeper.connect " : "ip:2181",
 
            "bootstrap.servers"  : "ip:9092"
 
          },
 
          "topic"  : "topic"
 
        },
 
        "name" : "kafka10writer",
 
        "type" : 17
 
      }
 
    } ],
 
    "setting" : {
 
      "restore" : {
 
        "isRestore" : true,
 
        "isStream" : true,
 
        "restoreColumnName" : "id",
 
        "restoreColumnIndex" : 0
 
      },
 
      "errorLimit" : {
 
      },
 
      "speed" : {
 
        "bytes" : -1048576,
 
        "channel" : 1
 
      }
 
    }
 
  }
 
}