将Kafka中的数据导入Es

创建集群

在将数据导入Es之前需要再阿里云上创建Es集群,假定创建的集群信息如下:

这里主要记录以下信息:

  • 集群ID: 296245916518715392
  • 创建集群时的密码: bbs_2016

创建Kafka Topic

登录阿里云管理控制台,进入kafka产品界面,创建topic,并向topic中灌入数据。示例中我们灌入的数据如下,包括两个json字段,样例数据如下:

示例中创建topic:a15fdd9dd5154845b32f7c74ae155ae3__demo_test 并且确保该topic下有对应的证书,将证书下载到本地。

编辑BSC 作业

创建Kafka Source

进入BSC编辑作业界面,创建kafka source table, sql代码如下

CREATE table source_table_kafka(stringtype STRING,longtype LONG) with( type = 'BKAFKA', topic = 'a15fdd9dd5154845b32f7c74ae155ae3__demo_test', kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091', sslFilePath = 'kafka_key.zip', encode = 'json');

其中sslFilePath = 'kafka-key.zip',为上一步下载到本地的kafka证书。

上传Kafka证书

点击高级设置,上传kafka证书

上传之后如下图

创建Es Sink Table

sql代码如下

create table sink_table_es( stringtype String, longtype Long)with( type = 'ES', es.net.http.auth.user = 'superuser', es.net.http.auth.pass = 'bbs_2016', es.resource = 'bsc_test/doc_type', es.clusterId = '296245916518715392', es.region = 'bd', es.port = '8200', es.version = '6.5.3');

其中:

  • es.resource对应es的索引与类型,es会在bsc写入数据时自动创建指定索引
  • es.clusterId对应es的集群ID
  • es.region 表示 Es服务所在的地区的代码,可以参考 Es服务区域代码 中查询区域与代码的对应关系。

编写导入语句

sql语句如下:

insert into sink_table_es(stringtype, longtype) outputmode appendselect stringtype, longtypefrom source_table_kafka;

保存作业并发布运行作业