前提#
久しぶりにブログを更新しました。最近 logstash をいじっていたので、記録しておきます。
なぜ logstash を使うのかというと、実はテスト環境で Tencent Cloud TKE のログが収集されていなかったため、問題を調査する際に非常に苦労しました。ちょうど時間ができたので、ログを抽出して ES に入れ、今後の問題調査を楽にしようと思いました。Tencent Cloud のログルールでは、pod の stdout ログを収集して kafka に送信することが許可されていることを知り、ちょっと試してみました。
logstash のデプロイ#
logstash は docker-compose を使用して迅速にデプロイすることにしました。
以下はデプロイの手順で、deviantony/docker-elkプロジェクトを参考にしています。
ディレクトリの作成#
mkdir logstash/config logstash/pipeline -p
環境変数の作成#
パス .env
ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'
Dockerfile の作成#
パス logstasg/Dockerfile
ARG ELASTIC_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}
設定ファイル#
パス logstash/config/logstash.yml
---
## LogstashベースイメージからのデフォルトのLogstash設定。
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0
node.name: logstash
パス logstash/pipeline/logstash.conf
input {
beats {
port => 5044
}
tcp {
port => 50000
}
}
## フィルター / logstashプラグインの設定をここに追加
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
サービスの起動#
version: '3.7'
services:
logstash:
build:
context: logstash/
args:
ELASTIC_VERSION: ${ELASTIC_VERSION}
volumes:
- ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
- ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
ports:
- 5044:5044
- 50000:50000/tcp
- 50000:50000/udp
- 9600:9600
environment:
LS_JAVA_OPTS: -Xms256m -Xmx256m
LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
depends_on:
- elasticsearch
restart: unless-stopped
logstash パイプラインの設定#
input の設定#
ログを kafka から読み取る必要があるため、input ブロック内に新しいデータソースを宣言する必要があります。
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka {
bootstrap_servers => "kafkaのアドレス"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
フィルターの設定#
Tencent Cloud から kafka に送信されたデータはそのまま使用できないため、データの前処理が必要です。データの例は以下の通りです。
{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}
この時点でデータを前処理する必要があります。多くのデータは ES に追加する必要がありません。logstags のフィルターを使用して処理します。
元のデータを保存#
このフィルターは、source というフィールドを追加し、kafka メッセージをすべてその中に保存します。
mutate {
add_field => { "source" => "%{[message]}" }
}
JSON の解析#
json プラグインを使用して、message フィールドの JSON 文字列をシリアル化し、シリアル化されたデータを kafkajson フィールドに保存し、message フィールドを削除します。
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
ログのメタデータを抽出#
ログ内のいくつかの位置データを外側に抽出します。
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
正規表現を使用してデータを処理#
Tencent Cloud から提供されたデータは、サービスが自分で出力したログの前に時間と "stdout F" を追加するため、正規表現を使用して不要な部分を置き換えます。
2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxx\"}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
サービスが出力したログを抽出#
これでログを抽出できるようになりました。
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
ログレベルを抽出#
最後のステップとして、ログのレベルフィールドを外側に抽出し、使用しやすくします。
mutate {
rename => { "[data][level]" => "level" }
}
完全なパイプライン#
input {
beats {
port => 5044
}
tcp {
port => 50000
}
kafka{
bootstrap_servers => "kafkaのアドレス"
client_id => "test_logstash"
group_id => "test_logstash"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["tencent-tke-log"]
type => "bhy"
}
}
filter {
mutate {
add_field => { "source" => "%{[message]}" }
}
json {
source => "message"
remove_field => [
"message"
]
target => "kafkajson"
}
mutate {
rename => { "[kafkajson][kubernetes][namespace_name]" => "namespace" }
rename => { "[kafkajson][kubernetes][pod_name]" => "podname" }
rename => { "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app" }
rename => { "[kafkajson][kubernetes][container_image]" => "container-image" }
}
mutate {
gsub => [ "[kafkajson][log]",".+stdout\sF\s","" ]
}
json {
source => "[kafkajson][log]"
remove_field => [
"kafkajson"
]
target => "data"
}
mutate {
rename => { "[data][level]" => "level" }
}
}
## フィルター / logstashプラグインの設定をここに追加
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "logstash_internal"
password => "${LOGSTASH_INTERNAL_PASSWORD}"
index => "logstash-%{+YYYY-MM-dd}"
}
}
特別な感謝#
今回の logstash の設定プロセスで、私が構築した、openAPI API を使用した、GPT3.5 言語モデルに基づく telegram ボットが大いに助けてくれました。
以下は私とそのボットの対話(対話内容は加工されています)。
johnpoint, [5/15/23 2:18 PM]
2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"} からjsonデータをlogstashで抽出したいのですが、logstashのパイプラインはどう書けばいいですか?
openAI, [5/15/23 2:18 PM]
あなたのログファイルがJSON形式で出力されていると仮定すると、ログ内のJSONデータを解析し、使用可能なイベントに変換するためにJSONパーサーを設定する必要があります。以下はシンプルなLogstash設定ファイルです:
```
input {
file {
path => "/path/to/your/logfile"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
json {
source => "message"
}
}
output {
stdout {
codec => rubydebug
}
}
```
logstashのinput部分では、読み取るファイルのパスを指定し、ファイルの開始位置からデータを読み込むようにします。
filter部分では、`json`プラグインを使用してログからJSONオブジェクトを抽出します。
最後に、output部分で`rubydebug`を使用して出力します。
このLogstashの設定コマンドを実行します:
```
bin/logstash -f /path/to/your/config/file
```
これにより、logstashは指定したファイルからデータを読み取り、抽出したJSONオブジェクトをコンソールに出力します。
johnpoint, [5/15/23 2:19 PM]
このデータにはjson形式のデータがありますが、json形式のデータの前にいくつかのゴミデータがあります。取り除くことはできますか?
openAI, [5/15/23 2:19 PM]
はい、logstashでデータを分析する際に、grokプラグインを使用して正規表現で必要なデータをマッチさせることができます。
以下は、あなたが提供したデータに合致する可能性のあるgrok表現です:
```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```
この正規表現の意味は、ISO8601形式で表されたタイムスタンプ(例:"2023-05-15T13:51:00.004687384+08:00")と、文字列"stdout F"で終わる内容をマッチさせ、その後にJSON形式の文字列が続くことを示しています。"%{GREEDYDATA:json_data}"は、この文字列を"json_data"という名前のフィールドに保存することを意味します。
上記のgrok表現はLogstashの設定ファイルで使用できます。例えば:
```
input {
# logstashがデータを受信するソースを設定
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
}
}
# 他のフィルター
}
output {
# logstashがデータを出力するターゲットを設定
}
```
この設定ファイルでは、filterの下のgrokプラグインを使用してtimestampとjson_dataの2つのフィールドをマッチさせました。これにより、logstashはこれらのデータを使用してさらに操作を行うことができ、Elasticsearchやstdoutに出力することができます。あなたが望む結果を得ることができます。
その中で、`%{TIMESTAMP_ISO8601:timestamp}`はISO8601形式のタイムスタンプをマッチさせ、それを"timestamp"という名前のフィールドに保存します。`%{GREEDYDATA:json_data}`は任意の文字をマッチさせ、行末に付加されたJSON形式に合致しない内容をマッチさせます。grokは行末までマッチし続けるため、マッチング中に最後のフィールド(すなわちjson_data)に含まれるすべての内容が含まれます。
johnpoint, [5/15/23 2:36 PM]
Can't get text on a START_OBJECT at 1:8というエラーは何ですか?
openAI, [5/15/23 2:36 PM]
このエラーは、JSONオブジェクトを文字列として解析しようとした場合によく見られますが、実際には文字列ではなくJSONオブジェクトを取得していることを意味します。これは、そのオブジェクトの始まりが"{"であり、"で始まることを期待していたためです。コードで呼び出している関数やメソッドを確認し、解析している文字列が確かに文字列であることを確認してください。また、JSONオブジェクトのソースコードを確認し、期待される形式で書かれていることを確認してください。
johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
このログはlogstashに何か問題があることを示していますか?
openAI, [5/15/23 2:53 PM]
このログは、Logstashが入力データjson_dataを解析する際にエンコーディングの問題に直面していることを示しています。入力データには`\'`など、JSON規格に合わない文字が含まれており、JSONパーサーが正しく解析できません。具体的には、入力データで引用符が反斜線でエスケープされているが、反斜線自体が正しくエスケープされていない可能性があります。入力データを前処理して、JSON規格に合わない文字をエスケープまたは削除し、Logstashがデータを正しく解析できるようにすることをお勧めします。
johnpoint, [5/15/23 2:53 PM]
対応するパイプライン設定を教えてもらえますか?
openAI, [5/15/23 2:53 PM]
もちろん、以下は入力から`json_data`フィールドを抽出し、無効なデータをフィルタリングするためのシンプルなLogstashパイプライン設定です:
```conf
input {
# ここで入力プラグインを設定
stdin {}
}
filter {
# json_dataフィールドからJSONデータを抽出
json {
source => "json_data"
# 不完全なJSONデータ文字列を無視するため
skip_on_invalid_json => true
}
# 有効なJSONデータがないイベントをフィルタリング
if "_jsonparsefailure" in [tags] {
drop {}
}
}
output {
# ここで出力プラグインを設定
stdout {}
}
```
必要に応じて、`json_data`を実際のフィールド名に変更したり、適切な入力および出力プラグインを使用するように調整してください。
johnpoint, [5/15/23 2:54 PM]
grokで抽出したjsonデータには不要なエスケープ反斜線が多くなります。
openAI, [5/15/23 2:54 PM]
はい、これはGrokパターンマッチング時に反斜線がエスケープ文字として扱われるため、ログに元々反斜線が含まれている場合、追加でエスケープされることになります。この問題を解決するために、設定ファイルでgsubフィルターを使用して余分な反斜線を取り除くことができます。以下はその例です:
```
input {
stdin { }
}
filter {
grok {
match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
}
mutate {
gsub => ["json_data", "\\\\", "\\"]
}
json {
source => "json_data"
}
}
output {
stdout { codec => rubydebug }
}
```
この設定では、最初にGrokを使用してログ内のJSONデータを解析し、`json_data`フィールドに保存します。次に、`mutate`フィルターの`gsub`を使用して余分な反斜線を置き換え、最後に`json`フィルターを使用してJSON形式のイベントに変換します。これにより、不要な反斜線が追加されるのを防ぐことができます。
johnpoint, [5/15/23 3:00 PM]
エラーが発生しました Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]}
openAI, [5/15/23 3:00 PM]
このエラーは、Logstash設定ファイルの36行目、34列目(672バイト)で、`[ \t\r\n], "#", "{", ",", "]"`のいずれかの文字が期待されていましたが、実際には期待通りに解析できない文字に遭遇したことを示しています。これは構文エラーまたは表現が不正であることが原因です。設定ファイルを確認し、構文が正しいことを確認し、表現のエラーを解決してください。
また、エラーメッセージは十分なコンテキストを提供しないため、設定ファイルの他の部分にも他のエラーがないか確認する必要があります。
johnpoint, [5/15/23 3:15 PM]
logstashはフィールドのデータを別のフィールドに移動できますか?
openAI, [5/15/23 3:15 PM]
はい、logstashのフィルタープラグインのmutateプラグインのrenameオプションを使用して実現できます。
例えば、データソースのフィールド名が`source_field`で、その値をターゲットフィールド`target_field`にコピーしたい場合、以下の設定を使用できます:
```
filter {
mutate {
rename => { "source_field" => "target_field" }
}
}
```
これにより、`source_field`の値が新しいフィールド`target_field`にコピーされ、`source_field`フィールド自体は削除されます。
johnpoint, [5/15/23 3:19 PM]
このフィールドを保持したい場合はどうすればいいですか?
openAI, [5/15/23 3:19 PM]
mutateプラグインのadd_fieldオプションを使用して、イベント内に新しいフィールドを作成できます。
例えば、Logstashイベント内に新しいフィールド"new_field"を作成し、元のフィールド"old_field"からその値を移動したい場合:
```
filter {
mutate {
add_field => { "new_field" => "%{[old_field]}" }
remove_field => [ "old_field" ]
}
}
```
これにより、"old_field"の値が新しい"new_field"フィールドに割り当てられ、"old_field"フィールドは削除されます。元のフィールドを保持したまま値を移動したい場合は、`remove_field`オプションを削除してください。
johnpoint, [5/15/23 3:21 PM]
logstashは自動的に作成されたkeywordフィールドを削除します。
openAI, [5/15/23 3:21 PM]
Logstashでは、デフォルトで各文字列型フィールドに対して`keyword`型のサブフィールドが作成され、元のフィールドの正確な値として使用されます。このサブフィールドが不要な場合は、インデックステンプレート(template)で`"include_in_all": false`を設定することで削除できます。ただし、この方法で`keyword`フィールドを削除すると、部分的な正確なクエリの能力を失うことに注意してください。
ログストリーム内で`keyword`フィールドを保持したい場合は、`remove_field`フィルターを使用して元のフィールドを削除し、`keyword`フィールドを保持することができます:
```ruby
filter {
mutate {
remove_field => ["my_field"]
rename => { "my_field.keyword" => "new_field" }
}
}
```
上記の例では、Logstashは元の`my_field`フィールドを削除し、正確な値を`my_field.keyword`サブフィールドに保存し、最後に`rename`フィルターを使用して`my_field.keyword`を`new_field`に名前変更します。これにより、Elasticsearchで`new_field`を使用して正確なクエリを行うことができます。
johnpoint, [5/15/23 4:18 PM]
logstashが作成したesインデックスが毎日ロールオーバーする名前になっていません。
openAI, [5/15/23 4:18 PM]
logstashがESインデックスを作成する際、デフォルトではDaily Indexモードを採用しており、毎日1つのインデックスを作成し、日付のサフィックスが付加されます。例えば、`logstash`という名前のインデックスは`logstash-yyyy.mm.dd`という名前に変換され、データを書き込む際にはUTCタイムゾーンに基づいて対応する日付の具体的な値が決まります。ロールオーバーが行われない場合、設定ファイルで無効になっている可能性があり、またはElasticsearchクラスターの設定ファイルで新しいインデックスの作成が禁止されている可能性があります。Elasticsearchの設定ファイルの関連設定を確認してください。
johnpoint, [5/15/23 4:19 PM]
logstashはDaily Indexモードをどのように有効にしますか?
openAI, [5/15/23 4:19 PM]
Daily Indexモードを有効にするには、Logstashの設定ファイルを変更する必要があります。`output`設定内で、インデックスの名前を指定するために`index`設定項目を使用します。インデックス名を日付ごとにロールオーバーさせるためには、日付形式のプレースホルダーを使用します。例えば:
```
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "your-index-name-%{+YYYY.MM.dd}"
}
}
```
上記の設定では、インデックス名の形式は`your-index-name-YYYY.MM.dd`であり、`%{+YYYY.MM.dd}`は現在の日付を示す日付形式のプレースホルダーです。このように設定すると、毎日自動的に新しいインデックスが作成されます。
また、Daily Indexモードを有効にするためには、Elasticsearchの設定ファイルで`action.auto_create_index`を`true`に設定する必要があります。