阿里云态势感知全量日志分析

阿里云全量日志分析功能模块定义

围绕企业用户日志集中管理和搜索分析核心需求,阿里云态势感知团队推出了全量日志分析功能,对现有的基础日志、告警日志、安全日志进行管理。同时支持用户自定义威胁检索,另存为威胁告警,日志导出,自定义报表分析,日志状态管理等功能。

功能说明

查询方式分为:检索模式,实时模式。

检索模式:检索模式实际上是周期性查询底层数据,所以需要指定查询周期。如果调度时间为1min,表示一分钟查询一次,目前不支持定时查询。实时模式:采用实时引擎,对任务进行实时处理。实时模式目前只支持处理时间(proctime),不支持事件时间(rowtime)。

简单查询模式:

1、简单过滤查询:主要用于ETL

@1、先选择日志类型。@2、添加过滤条件。

详细的中英文对照参考:字段说明

2、关联查询:主要用于原始日志的实时关联

@1、先选择日志类型。@2、添加关联条件。@3、添加过滤条件。

详细的中英文对照参考:字段说明

复杂查询模式:

1、查询语句标准sql92

2、对下面查询中出现的替换符进行说明:

变量名称 含义
table 表名
alias 别名
field 字段
relation 关联条件 比如:exDdosAlert.serverId=sufferBruteForce.serverId
condition 过滤条件 比如:serverId='i-kjewspceko'
avgFunction 聚合函数 比如count(*) sum(age) second是统计的窗口秒数,
比如300,表示5分钟,length是窗口秒数的长度,比如统计窗口为300秒,length=3

$avgFunction:

名称 聚合算法 聚合方式 约定
计数 count(A) A为表字段或者 A为任意类型
求和 sum(A) A,表中字段 A为数值类型
求最大值 MAX(A) A,表中字段 A为数值类型
求最小值 MIN(A) A,表中字段 A为数值类型
求平均值 AVG(A) A,表中字段 A为数值类型
去重 count(DISTINCT A) A,表中字段 A为任意类型

3、简单过滤查询:主要用于ETL

使用方式

selectfrom$table[where$condition[(AND|OR)$condition]]

示例

selectfromexDdosAlertwhereserverId='i-kjewspceko'

4、统计查询:用于聚合统计以及聚合后的过滤

使用方式

SELECT$avgFunctionFROM$table[WHERE$condition[(AND|OR)$condition]]GROUPBYTUMBLE(proctime,INTERVAL'$second'SECOND($length))[,$field]

TUMBLE表示滚动窗口,proctime是内置的时间,固定写法

示例

selectfrom(SELECTTUMBLE_END(proctime,INTERVAL'300'SECOND(3))asprocesstime,count()aspvcountFROMccAttackGROUPBYTUMBLE(proctime,INTERVAL'300'SECOND(3)),serverId)wherepvcount5

上面的查询意思为cc攻击日志按照serverId进行聚合,五分钟的时间窗口,访问次数5的日志TUMBLE_END表示该窗口的结束时间

5、关联查询:主要用于原始日志的实时关联

使用方式一

SELECTFROM$table1[AS$alias1],$table2[AS$alias]WHERE$relationAND$table1.proctimeBETWEEN$table2.proctime-INTERVAL'$second'ANDSECOND($length)AND$table2.proctime+INTERVAL'$second'SECOND($length)[(AND|OR)$relation][(AND|OR)$condition]

示例

SELECTFROMexDdosAlert,ccAttackwhereexDdosAlert.serverId=ccAttack.serverIdANDexDdosAlert.proctimeBETWEENccAttack.proctime-INTERVAL'100'SECOND(3)ANDccAttack.proctime+INTERVAL'100'SECOND(3)ANDexDdosAlert.serverName='i-kjewspceko'

使用方式二

SELECTFROM$table1[AS$alias][join$table2[AS$alias]on$relation[AND$relation]AND$table1.proctimeBETWEEN$table2.proctime-INTERVAL'$second'SECOND($length)AND$table2.proctime+INTERVAL'$second'SECOND($length)[(AND|OR)$condition]]+[WHEREcondition[(AND|OR)condition]]

示例

  • 双表SELECTt1.,t2.FROMflowSessionASt1JOINexDdosAlertASt2ONt1.srcIp=t2.serverIdANDt1.proctimeBETWEENt2.proctime-INTERVAL'100'SECOND(3)ANDt2.proctime+INTERVAL'100'SECOND(3)
  • 多表SELECTt1.,t2.FROMflowSessionASt1JOINexDdosAlertASt2ONt1.srcIp=t2.serverIdANDt1.proctimeBETWEENt2.proctime-INTERVAL'100'SECOND(3)ANDt2.proctime+INTERVAL'100'SECOND(3)JOINccAttackast3ONt1.srcIP=t3.serverIdandt1.proctimeBETWEENt3.proctime-INTERVAL'100'SECOND(3)ANDt3.proctime+INTERVAL'100'SECOND(3)

6、维度表查询:针对于安全日志的关联,关联日志属于离线数据,如果按照实时关联,实际上查不出结果,对于这种场景,正常的操作应该是通过实时数据区关联维表的数据,目前只支持两张表关联,且第二张表为维度表

使用方式

selectfrom$table1ast1JOIN$table2FORSYSTEM_TIMEASOFt1.proctimeast2ON$relation[(AND|OR)$relation][WHERE$condition[(AND|OR)condition]]

$table2必须为维表,此处指的是安全日志,比如基线数据、漏洞数据

示例

selectfromsufferBruteForceast1joinhostBaselineFORSYSTEM_TIMEASOFt1.proctimeast2ont2.serverId=t1.serverIdwheret1.serverId='i-kjewspceko'

7、时序查询:时序模式用于攻击链分析的场景,数据的输入为所有的告警数据,而不是其中的一个,在做这种分析的时候,通过alertlog代替具体的告警进行分析

使用方式

SELECTFROMalertlogMATCH_RECOGNIZE'('[PARTITIONBYexpression[,expression]]//指定分区的列,可选项[ORDERBYorderItem[,orderItem]]//指定排序的字段,目前只支持proctime[MEASURESmeasureColumn[,measureColumn]],相当于select,后面为需要查询的字段[ONEROWPERMATCH]//对于每一次成功的匹配,只会产生一个输出事件;[AFTERMATCH(SKIPTONEXTROW//匹配成功之后,从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配|SKIPPASTLASTROW//匹配成功之后,从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配|SKIPTOFIRSTvariable//匹配成功之后,从匹配成功的事件序列中第一个对应于variable的事件开始下一次匹配|SKIPTOLASTvariable//匹配成功之后,从匹配成功的事件序列中最后一个对应于variable的事件开始下一次匹配|SKIPTOvariable)]PATTERN'('pattern')'//定义待识别的事件序列需要满足的规则,需要定义在()中,由一系列自定义的patternVariable构成:patternVariable之间若以空格间隔,表示符合这两种patternVariable的事件紧挨着,中间不存在其他事件;patternVariable之间若以-间隔,表示符合这两种patternVariable的事件之间可以存在其它事件。[WITHINintervalLiteral]//时间范围,意为此攻击链的分析需要在多长的时间内完成DEFINEvariableAScondition[,variableAScondition]//相当于where,后面为过滤条件')'

示例

selectfromalertlogMATCH_RECOGNIZE(PARTITIONBYserverIdORDERBYproctimeMEASURESA.fixedIpASA_FixedIpONEROWPERMATCHAFTERMATCHSKIPPASTLASTROWPATTERN(AB)DEFINEAasA.logType='exDdosAlert',BasB.logType='sufferBruteForce'