logstash切割日志案例-grok、kv、ruby使用
近期时间主要都在研究elk三件套,既elasticsearch,logstash和kibana
记录一个日志过滤的案例
实际数据有部分删减,
原日志:
1 | <186>1 2021-07-05T08:51:57.198+08:00 iMaster NCE ALARM - AC-004 [alarm resource="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" probableCause="与远端站点建立的BGP协议中断。" perceivedSeverity=MAJOR" eventType="OTHER" resourceURI="/devices/e7257a97fc1a"] alarmId="66666" alarmName="" alarmType="0" occurTime="2021-07-05 08:52:00" locationInfo="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" additionInfo="The connection has entered the down state. (ConnectionID = 111; SrcTNPID = 111; DestSiteID = 111; DestTNPID = 111)" type="Alarm" |
首先使用grok对日志进行初步的切割
grok {
patterns_dir => ["/usr/share/logstash/conf.d/patterns"]
match => {
"message" => "%{DATA:head}%{TIMESTAMP_ISO8601:timestamp}%{TEMP1:temp1}%{ALARM:alarm}%{TEMP2:temp2}%{TOEND:toend}"
}
}
其中patterns为简单的正则
1 | TEMP1 \s.*?\[alarm\s |
匹配到数据后会按顺序放在对应的字段中,logstash的配置文件也是顺序执行的,多个插件使用时按顺序放配置就好。
grok的自定义规则可以使用kibana中的开发者工具快速的进行测试,不然每次写好都运行一次logstash-非常麻烦
例如其中匹配到的alarm字段的值为
1 | resource="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" probableCause="与远端站点建立的BGP协议中断。" perceivedSeverity=MAJOR" eventType="OTHER" resourceURI="/devices/e7257a97fc1a" |
因为特征非常明显,所以可以直接使用kv插件进行匹配
使用kv插件
配置文件
kv {
source=> "alarm"
trim_value => "\""
}
此处就是取出alarm字段自动处理,就可以得到一个个键值对信息。
toend字段的处理与alarm一致
使用ruby插件
因为有一部分内容的格式不固定,所以使用ruby插件来处理,ruby插件其实就是使用ruby语言编写字符串(数据)处理脚本。
…这也是我也是第一次接触ruby语言
conf配置如下
ruby {
path => "/usr/share/logstash/conf.d/additionInfo.rb"
}
ruby脚本如下
1 | def filter(event) |
使用mutate删除多余的字段
最后删除多余的字段就可以了
mutate {
remove_field => ["head","temp1","temp2","alarm"]
}
完整的脚本如下
1 | input { |
logstash切割日志案例-grok、kv、ruby使用