Logstash log-parsing case study: using grok, kv, and ruby

Recently I've been spending most of my time on the ELK stack, i.e. Elasticsearch, Logstash, and Kibana.

Here's a record of a log-filtering case study.

Parts of the actual data have been redacted.

The original log:

<186>1 2021-07-05T08:51:57.198+08:00 iMaster NCE ALARM - AC-004 [alarm resource="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" probableCause="与远端站点建立的BGP协议中断。" perceivedSeverity=MAJOR" eventType="OTHER" resourceURI="/devices/e7257a97fc1a"] alarmId="66666" alarmName="" alarmType="0" occurTime="2021-07-05 08:52:00" locationInfo="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" additionInfo="The connection has entered the down state. (ConnectionID = 111; SrcTNPID = 111; DestSiteID = 111; DestTNPID = 111)" type="Alarm"

First, use grok to do the initial split of the log:

grok {
    patterns_dir => ["/usr/share/logstash/conf.d/patterns"]
    match => {
        "message" => "%{DATA:head}%{TIMESTAMP_ISO8601:timestamp}%{TEMP1:temp1}%{ALARM:alarm}%{TEMP2:temp2}%{TOEND:toend}"
    }
}

The custom patterns are simple regular expressions:

TEMP1 \s.*?\[alarm\s
ALARM [\S\s]*
TEMP2 \]\s
TOEND [\s\S]*

Matched data is placed into the corresponding fields in order. The Logstash config file is also executed top to bottom, so when using multiple plugins you just arrange their blocks in the order you want them to run.
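Applied to the sample log above, the pattern splits it roughly like this (illustrative and abbreviated; temp1 and temp2 exist only to anchor the match and are deleted later):

head      => "<186>1 "
timestamp => "2021-07-05T08:51:57.198+08:00"
temp1     => " iMaster NCE ALARM - AC-004 [alarm "
alarm     => "resource=\"OID=1.1.1.1.1.1.1;...\" ... resourceURI=\"/devices/e7257a97fc1a\""
temp2     => "] "
toend     => "alarmId=\"66666\" alarmName=\"\" ... type=\"Alarm\""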

Custom grok patterns can be tested quickly in Kibana's Dev Tools; otherwise you'd have to rerun Logstash after every change, which is very tedious.
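For example, the ingest pipeline _simulate API in the Dev Tools console can exercise the same pattern (a sketch, with the sample message abbreviated; note that in the ingest grok processor the custom patterns go into pattern_definitions instead of a patterns_dir):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [{
      "grok": {
        "field": "message",
        "patterns": ["%{DATA:head}%{TIMESTAMP_ISO8601:timestamp}%{TEMP1:temp1}%{ALARM:alarm}%{TEMP2:temp2}%{TOEND:toend}"],
        "pattern_definitions": {
          "TEMP1": "\\s.*?\\[alarm\\s",
          "ALARM": "[\\S\\s]*",
          "TEMP2": "\\]\\s",
          "TOEND": "[\\s\\S]*"
        }
      }
    }]
  },
  "docs": [
    { "_source": { "message": "<186>1 2021-07-05T08:51:57.198+08:00 iMaster NCE ALARM - AC-004 [alarm resource=\"...\" probableCause=\"...\"] alarmId=\"66666\" type=\"Alarm\"" } }
  ]
}

Kibana also ships a dedicated Grok Debugger under Dev Tools that accepts the same custom pattern definitions.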

For example, the value matched into the alarm field is:

resource="OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx" probableCause="与远端站点建立的BGP协议中断。" perceivedSeverity=MAJOR" eventType="OTHER" resourceURI="/devices/e7257a97fc1a"

Because the structure is so regular, the kv plugin can parse it directly.

Using the kv plugin

The configuration:

kv {
    source => "alarm"
    trim_value => "\""
}

This just takes the alarm field and processes it automatically, yielding individual key-value pairs.
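For the alarm value shown above, that produces fields roughly like this (illustrative; trim_value strips the surrounding double quotes from each value):

resource          => OID=1.1.1.1.1.1.1;index=xxxxxxxxxxxxxxxxxxx;ESN=xxxxxxxxxxxxxx
probableCause     => 与远端站点建立的BGP协议中断。
perceivedSeverity => MAJOR
eventType         => OTHER
resourceURI       => /devices/e7257a97fc1a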

The toend field is handled the same way as alarm; see the kv block below, taken from the complete config.
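kv {
    source => "toend"
    default_keys => [ "alarmName", "" ]
}

The default_keys entry guarantees an alarmName field (empty by default) even on lines that don't carry one.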

Using the ruby plugin

Because part of the content has no fixed format, the ruby plugin is used to handle it. The ruby plugin essentially lets you write a data (string) processing script in the Ruby language.

…this was also my first time touching Ruby.

The conf configuration is as follows:

ruby {
    path => "/usr/share/logstash/conf.d/additionInfo.rb"
}

The Ruby script is as follows:

def filter(event)
    _additionInfo = event.get('additionInfo')
    event.set("device_id", event.get('resourceURI').split("/").last)
    # ...... much of the middle is omitted; the key point is that the script
    # must define a filter method, can use event to manipulate Logstash fields,
    # and must return an event object at the end (_type is computed in the
    # omitted section)
    event.set("type_zh", _type)
    return [event]
end
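For reference, here is a minimal, hypothetical sketch of the kind of thing the omitted middle might do, assuming it pulls the parenthesized pairs out of additionInfo (e.g. "(ConnectionID = 111; SrcTNPID = 111; ...)" in the sample log); this is my reconstruction, not the original script:

# sketch: promote "k = v; k = v" pairs inside the parentheses of
# additionInfo to top-level event fields
def filter(event)
    info = event.get('additionInfo').to_s
    if (m = info.match(/\((.*)\)/))
        m[1].split(';').each do |pair|
            k, v = pair.split('=', 2)
            # strip surrounding whitespace before setting the field
            event.set(k.strip, v.strip) if k && v
        end
    end
    return [event]
end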

Using mutate to remove the leftover fields

Finally, just remove the leftover fields:

mutate {
    remove_field => ["head","temp1","temp2","alarm"]
}

The complete config is as follows:

input {
    # beats {
    #     port => 5044
    #     codec => "json"
    # }
    elasticsearch {
        schedule => "* * * * *"
        hosts => "0.0.0.0:9200"
        index => "huawei-sdwan"
        user => "bothwin"
        password => "both-win"
        docinfo => true
        docinfo_fields => ["_id","@timestamp"]
        query => '{"query": {
            "bool": {
                "must": [
                    {
                        "range": {
                            "@timestamp": {
                                "gte": "now/d",
                                "lt": "now+1d/d"
                            }
                        }
                    },{
                        "match": {
                            "message": "probableCause"
                        }
                    }
                ],
                "should": []
            }
        }}'
    }
}
filter {
    fingerprint {
        source => ["message"]
        target => "[@metadata][generated_id]"
        method => "SHA256"
        key => "fuhb"
        concatenate_sources => true
    }
    grok {
        patterns_dir => ["/usr/share/logstash/conf.d/patterns"]
        match => {
            "message" => "%{DATA:head}%{TIMESTAMP_ISO8601:timestamp}%{TEMP1:temp1}%{ALARM:alarm}%{TEMP2:temp2}%{TOEND:toend}"
        }
    }
    kv {
        source => "alarm"
        trim_value => "\""
    }
    kv {
        source => "toend"
        default_keys => [ "alarmName","" ]
    }
    ruby {
        path => "/usr/share/logstash/conf.d/additionInfo.rb"
    }
    mutate {
        remove_field => ["head","temp1","temp2","alarm"]
        # remove the message field when testing
        # remove_field => ["toend"]
        # remove_field => ["timestamp","resource","perceivedSeverity","message"]
    }
}
output {
    elasticsearch {
        hosts => ["elasticsearch:9200"]
        index => "huawei-sdwan"
        document_id => "%{[@metadata][generated_id]}"
    }
    stdout { codec => rubydebug }
}
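To try it out, point Logstash at the config file (the filename here is just an example):

bin/logstash -f /usr/share/logstash/conf.d/huawei-sdwan.conf

The stdout { codec => rubydebug } output makes it easy to check the parsed fields before trusting what lands in the Elasticsearch index.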

