將nginx日志通過filebeat收集后傳入logstash,經(jīng)過logstash處理后寫入elasticsearch。filebeat只負責收集工作;logstash完成日志的格式化、數(shù)據(jù)的替換、拆分,以及寫入elasticsearch前索引名稱的生成。
1、配置nginx日志格式
# Custom access-log format "main": client address, X-Forwarded-For chain,
# local time, matched server name, raw request line, status, body bytes sent,
# referer, user agent, connection serial number, request cookies, total
# request time and upstream response time. The logstash grok pattern below
# is written against exactly this field order — keep the two in sync.
log_format main '$remote_addr $http_x_forwarded_for [$time_local] $server_name $request '
'$status $body_bytes_sent $http_referer '
'"$http_user_agent" '
'"$connection" '
'"$http_cookie" '
'$request_time '
'$upstream_response_time';
2、安裝配置filebeat,啟用nginx module
# Unpack filebeat into /usr/local and create a version-independent symlink.
tar -xzvf filebeat-6.2.4-linux-x86_64.tar.gz -C /usr/local
cd /usr/local
ln -s filebeat-6.2.4-linux-x86_64 filebeat
cd /usr/local/filebeat
# Enable the bundled nginx module (access/error log collection).
./filebeat modules enable nginx
查看已啟用的模塊:./filebeat modules list
創(chuàng)建配置文件
vim /usr/local/filebeat/blog_module_logstash.yml
# Filebeat config: collect the nginx access log via the nginx module and
# forward raw events to logstash (no local parsing — logstash does the grok).
# BUGFIX: the original snippet had lost all YAML indentation, which makes it
# invalid — `access:` must be nested under the module entry, and `enabled:` /
# `var.paths:` nested under `access:`.
filebeat.modules:
- module: nginx
  access:
    enabled: true
    var.paths: ["/home/weblog/blog.cnfol.com_access.log"]
  # Uncomment to also ship the error log:
  #error:
  #  enabled: true
  #  var.paths: ["/home/weblogerr/blog.cnfol.com_error.log"]
output.logstash:
  hosts: ["192.168.15.91:5044"]
啟動filebeat
# Start filebeat in the foreground with the custom config;
# -e sends filebeat's own logs to stderr instead of its log file.
./filebeat -c blog_module_logstash.yml -e
3、配置logstash
# Unpack logstash into /usr/local and create a version-independent symlink.
# BUGFIX: the original command was `tar -zxvf logstash-6.2.4.tar.gz /usr/local`
# — without -C, tar treats /usr/local as a member name to extract and fails;
# -C switches the extraction directory.
tar -zxvf logstash-6.2.4.tar.gz -C /usr/local
cd /usr/local;ln -s logstash-6.2.4 logstash
創(chuàng)建一個nginx日志的pipeline文件
# Work from the logstash install directory created above.
cd /usr/local/logstash
logstash內(nèi)置的模板目錄
vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns
編輯 grok-patterns 添加一個支持多ip的正則
# One or more comma-separated IPv4 addresses (the shape of an
# X-Forwarded-For chain behind a CDN), or a single bare word such as "-"
# when the header is absent.
# NOTE(review): the name is spelled FORWORD (sic); the pipeline references
# %{FORWORD}, so do not rename it in isolation.
FORWORD (?:%{IPV4}[,]?[ ]?)+|%{WORD}
官方grok
http://grokdebug.herokuapp.com/patterns#
創(chuàng)建logstash pipeline配置文件(本例保存為 test_pipline2.conf,與下文啟動命令對應)
# Kept for quick manual testing: feed events from stdin instead of beats.
#input {
#  stdin {}
#}
# Receive events from filebeat.
input {
  beats {
    host => "0.0.0.0"
    port => 5044
  }
}
filter {
  # Debug switch: when [@metadata][debug] is present the output section
  # pretty-prints events to stdout instead of shipping to elasticsearch.
  mutate{add_field => {"[@metadata][debug]"=>true}}
  grok {
    # Parse the nginx access log (must mirror the custom "main" log_format).
    # Earlier attempts kept for reference:
    #match => { "message" => "%{NGINXACCESS_TEST2}" }
    #match => { "message" => '%{IPORHOST:clientip} # (?<http_x_forwarded_for>[^\#]*) # \[%{HTTPDATE:[@metadata][webtime]}\] # %{NOTSPACE:hostname} # %{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} # %{NUMBER:response} # (?:%{NUMBER:bytes}|-) # (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) # (?:"(?<http_user_agent>[^#]*)") # (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) # (?:"(?<cookies>[^#]*)") # %{NUMBER:request_time:float} # (?:%{NUMBER:upstream_response_time:float}|-)' }
    #match => { "message" => '(?:%{IPORHOST:clientip}|-) (?:%{TWO_IP:http_x_forwarded_for}|%{IPV4:http_x_forwarded_for}|-) \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) (?:"(?<cookies>[^#]*)") %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
    match => { "message" => '(?:%{IPORHOST:clientip}|-) %{FORWORD:http_x_forwarded_for} \[%{HTTPDATE:[@metadata][webtime]}\] (?:%{HOSTNAME:hostname}|-) %{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion} %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{NOTSPACE:referrer}|-)"|%{NOTSPACE:referrer}|-) %{QS:agent} (?:"(?:%{NUMBER:connection}|-)"|%{NUMBER:connection}|-) %{QS:cookie} %{NUMBER:request_time:float} (?:%{NUMBER:upstream_response_time:float}|-)' }
  }
  # Copy @timestamp (the time beats collected the event) into a new field
  # @read_timestamp, shifted to UTC+8 (local time zone).
  ruby {
    #code => "event.set('@read_timestamp',event.get('@timestamp'))"
    code => "event.set('@read_timestamp',event.get('@timestamp').time.localtime + 8*60*60)"
  }
  # Parse the nginx request time (e.g. 20/May/2015:21:05:56 +0000) captured
  # by grok and use it as the event's @timestamp.
  date {
    locale => "en"
    match => ["[@metadata][webtime]","dd/MMM/yyyy:HH:mm:ss Z"]
  }
  # grok captures bytes as a string; convert it to an integer for ES.
  mutate {
    convert => {"bytes" => "integer"}
  }
  # (abandoned attempt to turn the cookie string into json, kept for reference)
  #mutate {
  #  gsub => ["cookies",'\;',',']
  #}
  # Behind a CDN, http_x_forwarded_for carries several IPs; the first one is
  # the real client IP.
  # BUGFIX: the condition was `=~ ", "` — a string pattern that only matched
  # a comma followed by a space; a regexp literal also catches "ip1,ip2".
  if [http_x_forwarded_for] =~ /,/ {
    ruby {
      code => 'event.set("http_x_forwarded_for", event.get("http_x_forwarded_for").split(",")[0])'
    }
  }
  # Resolve the IP to a geo location.
  # NOTE(review): the field may hold "-" when no X-Forwarded-For header was
  # sent; geoip will then tag the event with a lookup failure.
  geoip {
    source => "http_x_forwarded_for"
    # Only keep coordinates, country, city and region.
    fields => ["location","country_name","city_name","region_name"]
  }
  # Parse the agent field into browser / OS details.
  useragent {
    source => "agent"
    target => "useragent"
  }
  # Fields to drop (message removal is handled below, after the grok check).
  #mutate{remove_field=>["message"]}
  # Index-name prefix taken from the source log file name.
  # BUGFIX: was event.set("@[metadata][index_pre]", ...) — invalid field
  # reference syntax; the correct form is "[@metadata][index_pre]". With the
  # old spelling the sprintf below never resolved and the literal
  # "%{@[metadata][index_pre]}" ended up in the index name.
  ruby {
    code => 'event.set("[@metadata][index_pre]",event.get("source").split("/")[-1])'
  }
  # Format @timestamp as e.g. 2019.04.23 for the daily index suffix.
  ruby {
    code => 'event.set("[@metadata][index_day]",event.get("@timestamp").time.localtime.strftime("%Y.%m.%d"))'
  }
  # Default output index name: <logfile>_<yyyy.MM.dd>.
  mutate {
    add_field => {
      #"[@metadata][index]" => "%{[@metadata][index_pre]}_%{+YYYY.MM.dd}"
      "[@metadata][index]" => "%{[@metadata][index_pre]}_%{[@metadata][index_day]}"
    }
  }
  # (abandoned attempts to parse the cookies field into json, kept for reference)
  # mutate {
  #   gsub => [
  #     "cookies", ";", ",",
  #     "cookies", "=", ":"
  #   ]
  #   #split => {"cookies" => ","}
  # }
  # json_encode {
  #   source => "cookies"
  #   target => "cookies_json"
  # }
  # mutate {
  #   gsub => [
  #     "cookies_json", ',', '","',
  #     "cookies_json", ':', '":"'
  #   ]
  # }
  # json {
  #   source => "cookies_json"
  #   target => "cookies2"
  # }
  # If grok failed, route the event to a separate *_failure_* index so the
  # raw message is preserved for inspection.
  if "_grokparsefailure" in [tags] {
  #if "_dateparsefailure" in [tags] {
    mutate {
      replace => {
        #"[@metadata][index]" => "%{[@metadata][index_pre]}_failure_%{+YYYY.MM.dd}"
        "[@metadata][index]" => "%{[@metadata][index_pre]}_failure_%{[@metadata][index_day]}"
      }
    }
  # Otherwise the event parsed cleanly — drop the raw message.
  }else{
    mutate{remove_field=>["message"]}
  }
}
output {
  # Debug mode (see the mutate at the top of the filter section):
  # pretty-print every event, including @metadata, to stdout.
  if [@metadata][debug] {
    stdout{codec => rubydebug{metadata => true}}
  } else {
    # Print one dot per event as a lightweight progress indicator.
    stdout{codec => dots}
    # Ship events to elasticsearch; the index name was computed in the filter.
    elasticsearch {
      hosts => ["192.168.15.160:9200"]
      index => "%{[@metadata][index]}"
      document_type => "doc"
    }
  }
}
啟動logstash
# Run logstash in the background with the pipeline file created above.
nohup bin/logstash -f test_pipline2.conf &
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。