前提#
好久沒有給博客除草了,正好最近折騰了下 logstash,記錄一下。
為啥要用 logstash 呢,其實是因為在測試環境上面騰訊雲 tke 的日誌沒有開啟日誌收集,所以在排查問題的時候會十分的痛苦,正好有空了就想着將日誌抽出來放進 es 裡面,方便以後排查問題,正好看到騰訊雲的日誌規則是允許將 pod 的 stdout 日誌進行採集之後投遞到 kafka 的,就小試了一下。
部署 logstash#
logstash 我選擇使用 docker-compose 來進行快速的部署。
以下是部署流程,參考自 deviantony/docker-elk 項目
創建目錄#
mkdir logstash/config logstash/pipeline -p
創建環境變量#
路徑 .env
ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'
創建 Dockerfile#
路徑 logstasg/Dockerfile
ARG ELASTIC_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}
配置文件#
路徑 logstash/config/logstash.yml
---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0
node.name: logstash
路徑 logstash/pipeline/logstash.conf
input {
beats {
port => 5044
}
tcp {
port => 50000
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
啟動服務#
version: '3.7'
services:
logstash:
build:
context: logstash/
args:
ELASTIC_VERSION: ${ELASTIC_VERSION}
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
ports:
- 5044:5044
- 50000:50000/tcp
- 50000:50000/udp
- 9600:9600
environment:
LS_JAVA_OPTS: -Xms256m -Xmx256m
LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
depends_on:
- elasticsearch
restart: unless-stopped
配置 logstash pipeline#
配置 input#
因為日誌要從 kafka 讀取,所以要在 input 塊內聲明一個新的數據源
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka {
bootstrap_servers => "kafka 地址"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
配置過濾器#
由於騰訊雲投遞到 kafka 的數據不是可以直接用的,數據示例如下
{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}
這時候就要對數據進行預處理,因為有很多數據是不需要添加進 es 裡面的。使用了 logstags 的 filter 進行處理。
保存原始數據#
這個過濾器添加了一個叫做 source 的字段,並且將 kafka 消息全部存儲在裡面
mutate {
add_field => { "source" => "%{[message]}" }
}
解析 json#
使用 json 插件,對 message 字段的 json 字符串進行序列化,並且將序列化後的數據存儲在 kafkajson 字段內,同時刪除 message 字段
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
提取日誌元數據#
將日誌中的一些定位數據提取到外層
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
使用正則表達式處理數據#
騰訊雲給出的數據會在服務自己打印的日誌前方添加時間以及 ”stdout F“,所以要使用正則表達式將我們不需要的東西做下替換
2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
提取服務打印的日誌#
這回終於可以將日誌提取出來了
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
提取日誌等級#
最後一步,將日誌的等級字段提取到外層,方便使用
mutate {
rename => { "[data][level]" => "level" }
}
完整 pipeline#
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka{
bootstrap_servers => "kafka 地址"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
filter {
mutate {
add_field => { "source" => "%{[message]}" }
}
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
mutate {
rename => { "[data][level]" => "level" }
}
}
## Add your filters / logstash plugins configuration here
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
特別鳴謝#
在本次配置 logstash 的過程中,我自己搭建的、使用 openAPI API 的、基於 GPT3.5 語言模型的 telegram bot 給予了我很大的幫助。
以下是我與它的對話 (對話內容經過處理)
johnpoint, [5/15/23 2:18 PM]
我想從 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} 這些數據裡面使用logstash 提取出json數據,logstash 的pipeline該怎麼寫
openAI, [5/15/23 2:18 PM]
假設你的日誌文件是以JSON格式輸出的,你需要配置一個JSON解析器來解析日誌中的JSON數據,並將其轉換為可用的事件。以下是一個簡單的Logstash配置文件:
```
input {
file {
path => "/path/to/your/logfile"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
```
在logstash的input部分,我們指定讀取的文件的路徑,以及從文件的開始位置開始讀入數據。
在filter部分,我們使用`json`插件從日誌中提取json對象。
最後,我們在output部分使用`rubydebug`進行輸出。
運行這個Logstash的配置命令:
```
bin/logstash -f /path/to/your/config/file
```
這樣,logstash就會從你指定的文件中讀取數據,並把提取的JSON對象輸出到控制台上。
johnpoint, [5/15/23 2:19 PM]
這個數據有json格式的數據,但是在json格式的數據前面還有一些髒數據,是否可以去除?
openAI, [5/15/23 2:19 PM]
可以,在使用 logstash 分析數據的時候,可以使用 grok 這個插件,通過正則表達式匹配出需要的數據。
以下是一個可能符合你提供的數據的 grok 表達式:
```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```
這個正則表達式的含義是匹配以 ISO8601 格式表示的時間戳(如 "2023-05-15T13:51:00.004687384+08:00")和以字符串 "stdout F " 結尾的內容,然後跟著一個 Json 格式的字符串。"%{GREEDYDATA:json_data}" 表示將這個字符串保存到一個名為 "json_data" 的字段中。
上述 grok 表達式可以在 Logstash 的配置文件中使用,例如:
```
input {
# 配置 logstash 接收數據的源
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
}
}
# 其他過濾器
}
output {
# 配置 logstash 輸出數據的目標
}
```
這個配置文件中,通過 filter 下的 grok 插件匹配出了 timestamp 和 json_data 兩個字段。從而可以使用 logstash 利用這些數據進行更進一步的操作,如輸出到 Elasticsearch 或者 stdout。達到你想要的結果。
其中 `%{TIMESTAMP_ISO8601:timestamp}` 表示匹配 ISO8601 格式的時間戳,並將其保存到一個名為 "timestamp" 的字段中。`%{GREEDYDATA:json_data}` 表示匹配任意字符,可以匹配行末附加的不符合 Json 格式的內容。因為 grok 會一直匹配到行尾,所以匹配過程中會包括最後一個字段(即 json_data)中包含的所有內容。
johnpoint, [5/15/23 2:36 PM]
返回 Can't get text on a START_OBJECT at 1:8 是什麼錯誤
openAI, [5/15/23 2:36 PM]
這個錯誤常見於嘗試將 JSON 對象作為字符串進行解析的情況下,但是實際上得到的是 JSON 對象,而不是字符串。這意味著該對象的開頭是一個“{”,而不是一個“"”。可以檢查代碼調用的函數或方法,以確保解析的字符串確實是一個字符串。另外,可以檢查 JSON 對象的源代碼,以確保它是按照預期格式進行編寫的。
johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
這行日誌表示 logstash 有什麼問題呢
openAI, [5/15/23 2:53 PM]
這行日誌表示 Logstash 在解析輸入數據 json_data 時遇到了編碼問題,其中輸入數據包含了 `\'` 等不符合 JSON 規範的字符,導致 JSON 解析器無法正確解析。具體來說,可能是輸入數據中使用了反斜線對引號進行轉義,而又沒有正確地對反斜線本身進行轉義。可以嘗試對輸入數據進行預處理,將不符合 JSON 規範的字符進行轉義或刪除,以便讓 Logstash 能夠正確解析數據。
johnpoint, [5/15/23 2:53 PM]
可以給出相對應 的 pipeline 配置嗎
openAI, [5/15/23 2:53 PM]
好的,以下是一個簡單的 Logstash Pipeline 配置,用於從輸入中提取 `json_data` 字段,並過濾掉任何無效數據:
```conf
input {
# 在這裡配置輸入插件
stdin {}
}
filter {
# 從 json_data 字段提取 JSON 數據
json {
source => "json_data"
# 用於忽略不完整的 JSON 數據字符串
skip_on_invalid_json => true
}
#過濾掉沒有有效 JSON 數據的事件
if "_jsonparsefailure" in [tags] {
drop {}
}
}
output {
# 在這裡配置輸出插件
stdout {}
}
```
您需要根據自己的需求進行相應的修改和調整,比如將 `json_data` 改成實際的字段名,或者使用適合您的輸入和輸出插件。
johnpoint, [5/15/23 2:54 PM]
使用grok提取的json數據會多出不必要的轉義反斜杠
openAI, [5/15/23 2:54 PM]
是的,這是因為 Grok 模式匹配時會將反斜杠視為轉義字符,因此如果日誌中原本就有反斜杠,會被額外轉義。為了解決這個問題,可以在配置文件中使用 gsub 過濾器去除多餘的反斜杠。示例配置如下:
```
input {
stdin { }
}
filter {
grok {
match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
}
mutate {
gsub => ["json_data", "\\\\", "\\"]
}
json {
source => "json_data"
}
}
output {
stdout { codec => rubydebug }
}
```
這個配置中首先使用 Grok 解析出日誌中的 JSON 數據,並將其存儲到 `json_data` 字段中。然後使用 `mutate` 過濾器中的 `gsub` 將多餘的反斜杠替換掉,最後使用 `json` 過濾器將其轉換成 JSON 格式的事件。這樣就可以避免多出不必要的反斜杠了。
johnpoint, [5/15/23 3:00 PM]
報錯了 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]
openAI, [5/15/23 3:00 PM]
這個錯誤是在 Logstash 配置文件中,第36行,第34列(672個字節)處,期望看到 `[ \t\r\n]", "#", "{", ",", "]"` 中的一个字符,但是實際遇到的字符無法按預期解析。這可能是因為語法錯誤或表達式不正確導致的。請檢查您的配置文件並確保語法正確,並解決任何表達式錯誤。
另外,由於錯誤信息無法提供足夠的上下文,您還可能需要檢查其他部分的配置文件以確定是否存在其他錯誤。
johnpoint, [5/15/23 3:15 PM]
logstash 可以將一個字段的數據移動到另一個字段裡面去嗎
openAI, [5/15/23 3:15 PM]
可以的,可以使用logstash的filter插件中的mutate插件的rename選項來實現.
例如,如果數據來源的字段名為`source_field`,想要將其複製到目標字段`target_field`,可以使用以下配置:
```
filter {
mutate {
rename => { "source_field" => "target_field" }
}
}
```
這將把`source_field`的值複製到新字段`target_field`中,並刪除`source_field`字段本身。
johnpoint, [5/15/23 3:19 PM]
如果我要保留這個字段呢
openAI, [5/15/23 3:19 PM]
可以使用 mutate 插件的 add_field 選項,用於在事件中創建新字段。
比如,如果您想要在 Logstash 事件中創建一個新字段 "new_field" 並從原始字段 "old_field" 中移動其值:
```
filter {
mutate {
add_field => { "new_field" => "%{[old_field]}" }
remove_field => [ "old_field" ]
}
}
```
這將將 "old_field" 中的值賦給新的 "new_field" 字段,並將刪除 "old_field" 字段。如果您想在保留源字段的同時移動值,可以將 "remove_field" 選項刪除。
johnpoint, [5/15/23 3:21 PM]
logstash 刪除自動創建 keyword 字段
openAI, [5/15/23 3:21 PM]
Logstash 中默認會為每個字符串類型的字段創建一個 `keyword` 類型的子字段,作為原始字段的精確值。如果你不需要這個子字段,可以在索引模板(template)中將它移除:在 `mappings` 中設置 `"include_in_all": false` 即可。但需要注意的是,使用該方法刪除 `keyword` 字段會導致失去部分精確查詢的能力。
如果你想要在日誌流中保留 `keyword` 字段,可以使用 `remove_field` 過濾器來刪除原始字段,但保留 `keyword` 字段:
```ruby
filter {
mutate {
remove_field => ["my_field"]
rename => { "my_field.keyword" => "new_field" }
}
}
```
上面的例子中,Logstash 會刪除原始的 `my_field` 字段,並將精確值保存在 `my_field.keyword` 子字段中,最後使用 `rename` 過濾器,將 `my_field.keyword` 重命名為 `new_field`。這樣,你就可以在 Elasticsearch 中使用 `new_field` 進行精確查詢。
johnpoint, [5/15/23 4:18 PM]
logstash 為什麼創建的 es index 沒有每天滾動的名字
openAI, [5/15/23 4:18 PM]
logstash 創建 ES index 默認採用的是 Daily Index 的模式,每天一個 Index,並且附帶日期的後綴。例如,一個名為 `logstash` 的索引將被轉換成 `logstash-yyyy.mm.dd` 的名字,在寫入數據時,會根據 UTC 時區來決定對應日期的具體值。如果沒有滾動,一種可能是配置文件中未啟用,還有一個可能是 Elasticsearch 集群配置文件設置了禁止創建新 index,可以檢查一下 Elasticsearch 的配置文件中的相關設置。
johnpoint, [5/15/23 4:19 PM]
logstash 怎麼開啟 Daily Index 模式
openAI, [5/15/23 4:19 PM]
要開啟 Daily Index 模式,需要修改 Logstash 的配置文件。在 `output` 配置中,可以使用 `index` 配置項來指定 Elasticsearch 索引的名稱。為了讓索引名稱按日滾動,可使用日期格式佔位符,例如:
```
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "your-index-name-%{+YYYY.MM.dd}"
}
}
```
在上述配置中,索引名稱的格式為 `your-index-name-YYYY.MM.dd`,其中 `%{+YYYY.MM.dd}` 是一個日期格式佔位符,表示當前日期。這樣配置後,每天會自動創建一個新的索引。
還需注意的是,為了啟用 Date Index 模式,需要在 Elasticsearch 的配置文件中將 `action.auto_create_index` 設置為 `true`。