johnpoint

johnpoint

(。・∀・)ノ゙嗨
github

使用 logstash 采集來自騰訊雲 tke 的日誌

前提#

好久沒有給博客除草了,正好最近折騰了下 logstash,記錄一下。

為啥要用 logstash 呢,其實是因為在測試環境上面騰訊雲 tke 的日誌沒有開啟日誌收集,所以在排查問題的時候會十分的痛苦,正好有空了就想着將日誌抽出來放進 es 裡面,方便以後排查問題,正好看到騰訊雲的日誌規則是允許將 pod 的 stdout 日誌進行採集之後投遞到 kafka 的,就小試了一下。

部署 logstash#

logstash 我選擇使用 docker-compose 來進行快速的部署。

以下是部署流程,參考自 deviantony/docker-elk 項目

創建目錄#

mkdir logstash/config logstash/pipeline -p

創建環境變量#

路徑 .env

ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'

創建 Dockerfile#

路徑 logstasg/Dockerfile

ARG ELASTIC_VERSION

# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}

配置文件#

路徑 logstash/config/logstash.yml

---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0

node.name: logstash

路徑 logstash/pipeline/logstash.conf

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}

啟動服務#

version: '3.7'

services:
  logstash:
    build:
      context: logstash/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
    ports:
      - 5044:5044
      - 50000:50000/tcp
      - 50000:50000/udp
      - 9600:9600
    environment:
      LS_JAVA_OPTS: -Xms256m -Xmx256m
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    depends_on:
      - elasticsearch
    restart: unless-stopped

配置 logstash pipeline#

配置 input#

因為日誌要從 kafka 讀取,所以要在 input 塊內聲明一個新的數據源

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka {
		bootstrap_servers => "kafka 地址"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

配置過濾器#

由於騰訊雲投遞到 kafka 的數據不是可以直接用的,數據示例如下

{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}

這時候就要對數據進行預處理,因為有很多數據是不需要添加進 es 裡面的。使用了 logstags 的 filter 進行處理。

保存原始數據#

這個過濾器添加了一個叫做 source 的字段,並且將 kafka 消息全部存儲在裡面

	mutate {
		add_field => { "source" => "%{[message]}" }
  	}

解析 json#

使用 json 插件,對 message 字段的 json 字符串進行序列化,並且將序列化後的數據存儲在 kafkajson 字段內,同時刪除 message 字段

	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}

提取日誌元數據#

將日誌中的一些定位數據提取到外層

	mutate {
    	rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
    	rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
    	rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
    	rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
  	}

使用正則表達式處理數據#

騰訊雲給出的數據會在服務自己打印的日誌前方添加時間以及 ”stdout F“,所以要使用正則表達式將我們不需要的東西做下替換

2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}
	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}

提取服務打印的日誌#

這回終於可以將日誌提取出來了

	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}

提取日誌等級#

最後一步,將日誌的等級字段提取到外層,方便使用

	mutate {
    	rename => { "[data][level]" => "level" }
  	}

完整 pipeline#

input {
	beats {
		port => 5044
	}

	tcp {
		port => 50000
	}

	kafka{
		bootstrap_servers => "kafka 地址"
		client_id => "test_logstash"
		group_id => "test_logstash"
		auto_offset_reset => "latest" 
		consumer_threads => 1
		decorate_events => true 
		topics => ["tencent-tke-log"]
		type => "bhy"
	}
}

filter {
	mutate {
		add_field => { "source" => "%{[message]}" }
  	}
	json {
		source => "message"
		remove_field => [ 
			"message"
		]
		target => "kafkajson"
	}
	mutate {
    	rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
    	rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
    	rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
    	rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
  	}
	mutate {
		gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
	}
	json {
		source => "[kafkajson][log]"
		remove_field => [ 
			"kafkajson"
		]
		target => "data"
	}
	mutate {
    	rename => { "[data][level]" => "level" }
  	}
}

## Add your filters / logstash plugins configuration here

output {
	elasticsearch {
		hosts => "elasticsearch:9200"
		user => "logstash_internal"
		password => "${LOGSTASH_INTERNAL_PASSWORD}"
		index => "logstash-%{+YYYY-MM-dd}"
	}
}

特別鳴謝#

在本次配置 logstash 的過程中,我自己搭建的、使用 openAPI API 的、基於 GPT3.5 語言模型的 telegram bot 給予了我很大的幫助。

以下是我與它的對話 (對話內容經過處理)

johnpoint, [5/15/23 2:18 PM]
我想從 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} 這些數據裡面使用logstash 提取出json數據,logstash 的pipeline該怎麼寫

openAI, [5/15/23 2:18 PM]
假設你的日誌文件是以JSON格式輸出的,你需要配置一個JSON解析器來解析日誌中的JSON數據,並將其轉換為可用的事件。以下是一個簡單的Logstash配置文件:

```
input {
  file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
      codec => rubydebug
  }
}
```

在logstash的input部分,我們指定讀取的文件的路徑,以及從文件的開始位置開始讀入數據。
在filter部分,我們使用`json`插件從日誌中提取json對象。
最後,我們在output部分使用`rubydebug`進行輸出。

運行這個Logstash的配置命令:

```
bin/logstash -f /path/to/your/config/file
```

這樣,logstash就會從你指定的文件中讀取數據,並把提取的JSON對象輸出到控制台上。

johnpoint, [5/15/23 2:19 PM]
這個數據有json格式的數據,但是在json格式的數據前面還有一些髒數據,是否可以去除?

openAI, [5/15/23 2:19 PM]
可以,在使用 logstash 分析數據的時候,可以使用 grok 這個插件,通過正則表達式匹配出需要的數據。

以下是一個可能符合你提供的數據的 grok 表達式:

```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```

這個正則表達式的含義是匹配以 ISO8601 格式表示的時間戳(如 "2023-05-15T13:51:00.004687384+08:00")和以字符串 "stdout F " 結尾的內容,然後跟著一個 Json 格式的字符串。"%{GREEDYDATA:json_data}" 表示將這個字符串保存到一個名為 "json_data" 的字段中。

上述 grok 表達式可以在 Logstash 的配置文件中使用,例如:

```
input {
  # 配置 logstash 接收數據的源
}

filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
    }
  }
  # 其他過濾器
}

output {
  # 配置 logstash 輸出數據的目標
}
```

這個配置文件中,通過 filter 下的 grok 插件匹配出了 timestamp 和 json_data 兩個字段。從而可以使用 logstash 利用這些數據進行更進一步的操作,如輸出到 Elasticsearch 或者 stdout。達到你想要的結果。

其中 `%{TIMESTAMP_ISO8601:timestamp}` 表示匹配 ISO8601 格式的時間戳,並將其保存到一個名為 "timestamp" 的字段中。`%{GREEDYDATA:json_data}` 表示匹配任意字符,可以匹配行末附加的不符合 Json 格式的內容。因為 grok 會一直匹配到行尾,所以匹配過程中會包括最後一個字段(即 json_data)中包含的所有內容。

johnpoint, [5/15/23 2:36 PM]
返回 Can't get text on a START_OBJECT at 1:8 是什麼錯誤

openAI, [5/15/23 2:36 PM]
這個錯誤常見於嘗試將 JSON 對象作為字符串進行解析的情況下,但是實際上得到的是 JSON 對象,而不是字符串。這意味著該對象的開頭是一個“{”,而不是一個“"”。可以檢查代碼調用的函數或方法,以確保解析的字符串確實是一個字符串。另外,可以檢查 JSON 對象的源代碼,以確保它是按照預期格式進行編寫的。

johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json    ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
這行日誌表示 logstash 有什麼問題呢

openAI, [5/15/23 2:53 PM]
這行日誌表示 Logstash 在解析輸入數據 json_data 時遇到了編碼問題,其中輸入數據包含了 `\'` 等不符合 JSON 規範的字符,導致 JSON 解析器無法正確解析。具體來說,可能是輸入數據中使用了反斜線對引號進行轉義,而又沒有正確地對反斜線本身進行轉義。可以嘗試對輸入數據進行預處理,將不符合 JSON 規範的字符進行轉義或刪除,以便讓 Logstash 能夠正確解析數據。

johnpoint, [5/15/23 2:53 PM]
可以給出相對應 的 pipeline 配置嗎

openAI, [5/15/23 2:53 PM]
好的,以下是一個簡單的 Logstash Pipeline 配置,用於從輸入中提取 `json_data` 字段,並過濾掉任何無效數據:

```conf
input {
  # 在這裡配置輸入插件
  stdin {}
}

filter {
  # 從 json_data 字段提取 JSON 數據
  json {
    source => "json_data"
    # 用於忽略不完整的 JSON 數據字符串
    skip_on_invalid_json => true
  }

  #過濾掉沒有有效 JSON 數據的事件
  if "_jsonparsefailure" in [tags] {
    drop {}
  }
}

output {
  # 在這裡配置輸出插件
  stdout {}
}
```

您需要根據自己的需求進行相應的修改和調整,比如將 `json_data` 改成實際的字段名,或者使用適合您的輸入和輸出插件。

johnpoint, [5/15/23 2:54 PM]
使用grok提取的json數據會多出不必要的轉義反斜杠

openAI, [5/15/23 2:54 PM]
是的,這是因為 Grok 模式匹配時會將反斜杠視為轉義字符,因此如果日誌中原本就有反斜杠,會被額外轉義。為了解決這個問題,可以在配置文件中使用 gsub 過濾器去除多餘的反斜杠。示例配置如下:

```
input {
  stdin { }
}

filter {
  grok {
    match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
  }
  mutate {
    gsub => ["json_data", "\\\\", "\\"]
  }
  json {
    source => "json_data"
  }
}

output {
  stdout { codec => rubydebug }
}
```

這個配置中首先使用 Grok 解析出日誌中的 JSON 數據,並將其存儲到 `json_data` 字段中。然後使用 `mutate` 過濾器中的 `gsub` 將多餘的反斜杠替換掉,最後使用 `json` 過濾器將其轉換成 JSON 格式的事件。這樣就可以避免多出不必要的反斜杠了。

johnpoint, [5/15/23 3:00 PM]
報錯了 Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]

openAI, [5/15/23 3:00 PM]
這個錯誤是在 Logstash 配置文件中,第36行,第34列(672個字節)處,期望看到 `[ \t\r\n]", "#", "{", ",", "]"` 中的一个字符,但是實際遇到的字符無法按預期解析。這可能是因為語法錯誤或表達式不正確導致的。請檢查您的配置文件並確保語法正確,並解決任何表達式錯誤。

另外,由於錯誤信息無法提供足夠的上下文,您還可能需要檢查其他部分的配置文件以確定是否存在其他錯誤。

johnpoint, [5/15/23 3:15 PM]
logstash 可以將一個字段的數據移動到另一個字段裡面去嗎

openAI, [5/15/23 3:15 PM]
可以的,可以使用logstash的filter插件中的mutate插件的rename選項來實現.

例如,如果數據來源的字段名為`source_field`,想要將其複製到目標字段`target_field`,可以使用以下配置:

```
filter {
  mutate {
    rename => { "source_field" => "target_field" }
  }
}
```

這將把`source_field`的值複製到新字段`target_field`中,並刪除`source_field`字段本身。

johnpoint, [5/15/23 3:19 PM]
如果我要保留這個字段呢

openAI, [5/15/23 3:19 PM]
可以使用 mutate 插件的 add_field 選項,用於在事件中創建新字段。

比如,如果您想要在 Logstash 事件中創建一個新字段 "new_field" 並從原始字段 "old_field" 中移動其值:

```
filter {
   mutate {
     add_field => { "new_field" => "%{[old_field]}" }
     remove_field => [ "old_field" ]
   }
}
```

這將將 "old_field" 中的值賦給新的 "new_field" 字段,並將刪除 "old_field" 字段。如果您想在保留源字段的同時移動值,可以將 "remove_field" 選項刪除。

johnpoint, [5/15/23 3:21 PM]
logstash 刪除自動創建 keyword 字段

openAI, [5/15/23 3:21 PM]
Logstash 中默認會為每個字符串類型的字段創建一個 `keyword` 類型的子字段,作為原始字段的精確值。如果你不需要這個子字段,可以在索引模板(template)中將它移除:在 `mappings` 中設置 `"include_in_all": false` 即可。但需要注意的是,使用該方法刪除 `keyword` 字段會導致失去部分精確查詢的能力。

如果你想要在日誌流中保留 `keyword` 字段,可以使用 `remove_field` 過濾器來刪除原始字段,但保留 `keyword` 字段: 

```ruby
filter {
  mutate {
    remove_field => ["my_field"]
    rename => { "my_field.keyword" => "new_field" }
  }
}
```

上面的例子中,Logstash 會刪除原始的 `my_field` 字段,並將精確值保存在 `my_field.keyword` 子字段中,最後使用 `rename` 過濾器,將 `my_field.keyword` 重命名為 `new_field`。這樣,你就可以在 Elasticsearch 中使用 `new_field` 進行精確查詢。

johnpoint, [5/15/23 4:18 PM]
logstash 為什麼創建的 es index 沒有每天滾動的名字

openAI, [5/15/23 4:18 PM]
logstash 創建 ES index 默認採用的是 Daily Index 的模式,每天一個 Index,並且附帶日期的後綴。例如,一個名為 `logstash` 的索引將被轉換成 `logstash-yyyy.mm.dd` 的名字,在寫入數據時,會根據 UTC 時區來決定對應日期的具體值。如果沒有滾動,一種可能是配置文件中未啟用,還有一個可能是 Elasticsearch 集群配置文件設置了禁止創建新 index,可以檢查一下 Elasticsearch 的配置文件中的相關設置。

johnpoint, [5/15/23 4:19 PM]
logstash 怎麼開啟 Daily Index 模式

openAI, [5/15/23 4:19 PM]
要開啟 Daily Index 模式,需要修改 Logstash 的配置文件。在 `output` 配置中,可以使用 `index` 配置項來指定 Elasticsearch 索引的名稱。為了讓索引名稱按日滾動,可使用日期格式佔位符,例如:

```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your-index-name-%{+YYYY.MM.dd}"
  }
}
```

在上述配置中,索引名稱的格式為 `your-index-name-YYYY.MM.dd`,其中 `%{+YYYY.MM.dd}` 是一個日期格式佔位符,表示當前日期。這樣配置後,每天會自動創建一個新的索引。

還需注意的是,為了啟用 Date Index 模式,需要在 Elasticsearch 的配置文件中將 `action.auto_create_index` 設置為 `true`。

參考文檔#

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。