Preface#
It has been a while since I last updated this blog. I have been experimenting with Logstash recently, so here are my notes.
Why Logstash? Log collection for Tencent Cloud TKE is not enabled in our testing environment, which makes troubleshooting painful. With some free time on my hands, I decided to ship the logs into Elasticsearch to make future troubleshooting easier. Tencent Cloud's log collection rules can collect pod stdout logs and deliver them to Kafka, so I gave that a try.
Deploying Logstash#
I used Docker Compose to deploy Logstash quickly.
The steps below are adapted from the deviantony/docker-elk project.
Create Directories#
mkdir -p logstash/config logstash/pipeline
Create Environment Variables#
Path: .env
ELASTIC_VERSION=8.7.1
LOGSTASH_INTERNAL_PASSWORD='changeme'
Create Dockerfile#
Path: logstash/Dockerfile
ARG ELASTIC_VERSION
# https://www.docker.elastic.co/
FROM docker.elastic.co/logstash/logstash:${ELASTIC_VERSION}
Configuration File#
Path: logstash/config/logstash.yml
---
## Default Logstash configuration from Logstash base image.
## https://github.com/elastic/logstash/blob/main/docker/data/logstash/config/logstash-full.yml
#
http.host: 0.0.0.0
node.name: logstash
Path: logstash/pipeline/logstash.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 50000
  }
}
## Add your filters / logstash plugins configuration here
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    index => "logstash-%{+YYYY-MM-dd}"
  }
}
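The date pattern in the index option is filled in from each event's @timestamp, which Logstash formats in UTC. In a UTC+8 environment, events logged before 08:00 local time therefore land in the previous day's index. Below is a minimal Python sketch of that naming rule, assuming the Joda-style pattern YYYY-MM-dd corresponds to strftime's %Y-%m-%d. The helper index_name is hypothetical and not part of Logstash.

```python
from datetime import datetime, timezone


def index_name(epoch_seconds: float, prefix: str = "logstash") -> str:
    """Mimic index => "logstash-%{+YYYY-MM-dd}" for a given event timestamp."""
    # Logstash formats @timestamp in UTC, not in the host's local time zone.
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return f"{prefix}-{ts:%Y-%m-%d}"


# 2023-05-16T10:10:00+08:00 is 02:10 UTC on the same day.
same_day = index_name(1684203000.007603)
# 2023-05-16T07:30:00+08:00 is still 23:30 UTC on 15 May.
previous_day = index_name(1684193400)
print(same_day, previous_day)
```

This is worth keeping in mind when looking for "today's" logs in Kibana early in the morning.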
Start the Service#
Path: docker-compose.yml
Only the logstash service is shown here. The elasticsearch service named under depends_on must be defined in the same Compose project, as it is in docker-elk. Once the file is in place, start the service with docker compose up -d --build.
version: '3.7'
services:
  logstash:
    build:
      context: logstash/
      args:
        ELASTIC_VERSION: ${ELASTIC_VERSION}
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro,Z
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro,Z
    ports:
      - 5044:5044
      - 50000:50000/tcp
      - 50000:50000/udp
      - 9600:9600
    environment:
      LS_JAVA_OPTS: -Xms256m -Xmx256m
      LOGSTASH_INTERNAL_PASSWORD: ${LOGSTASH_INTERNAL_PASSWORD:-}
    depends_on:
      - elasticsearch
    restart: unless-stopped
Configure Logstash Pipeline#
Configure Input#
Since the logs need to be read from Kafka, a new data source must be declared in the input block.
input {
  beats {
    port => 5044
  }
  tcp {
    port => 50000
  }
  kafka {
    bootstrap_servers => "kafka address"
    client_id => "test_logstash"
    group_id => "test_logstash"
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    topics => ["tencent-tke-log"]
    type => "bhy"
  }
}
Configure Filters#
The messages Tencent Cloud delivers to Kafka are not directly usable. A sample message looks like this:
{"@timestamp":1684203000.007603,"@filepath":"/var/log/tke-log-agent/log/stdout-containerd/xxxxxxx.log","log":"2023-05-16T10:10:00.002817673+08:00 stdout F {\"level\":\"debug\",\"time\":\"2023-05-16T10:10:00.002+0800\",\"msg\":\"xxxxxxxx\"}","kubernetes":{"pod_name":"xxxxx","namespace_name":"default","pod_id":"xxxxxx","labels":{"k8s-app":"xxxxx","pod-template-hash":"xxxxx","qcloud-app":"xxxxxxxx"},"annotations":{"qcloud-redeploy-timestamp":"1681975158658","tke.cloud.tencent.com/networks-status":"json"},"host":"xxxxx","container_name":"xxxx","docker_id":"xxxxx","container_hash":"xxxxxx","container_image":"xxxxxxxx"}}
The message needs preprocessing, because most of these fields do not belong in Elasticsearch. Logstash filters handle this; each snippet below goes inside the filter block.
Save Original Data#
This filter adds a field called source and stores the entire Kafka message in it.
mutate {
  add_field => { "source" => "%{[message]}" }
}
Parse JSON#
The json filter parses (deserializes) the JSON string in the message field, stores the result in the kafkajson field, and removes the message field.
json {
  source => "message"
  remove_field => ["message"]
  target => "kafkajson"
}
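What this first parsing step yields can be checked outside Logstash. The Python sketch below parses a shortened copy of the sample message and shows that log is still one plain string (the runtime prefix followed by JSON text), which is why a second parsing step is needed later. The variable names are illustrative only.

```python
import json

# Shortened copy of the sample Kafka message; the kubernetes block is trimmed.
message = json.dumps({
    "@timestamp": 1684203000.007603,
    "log": '2023-05-16T10:10:00.002817673+08:00 stdout F '
           '{"level":"debug","time":"2023-05-16T10:10:00.002+0800","msg":"xxxxxxxx"}',
    "kubernetes": {"pod_name": "xxxxx", "namespace_name": "default"},
})

# Rough equivalent of: json { source => "message" target => "kafkajson" }
kafkajson = json.loads(message)

log_value = kafkajson["log"]
print(type(log_value).__name__)  # the nested JSON has not been parsed yet
print(log_value[:45])            # the leading timestamp / stream / flag prefix
```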
Extract Log Metadata#
Promote the fields that identify where a log came from (namespace, pod name, app label, container image) to top-level fields.
mutate {
  rename => {
    "[kafkajson][kubernetes][namespace_name]" => "namespace"
    "[kafkajson][kubernetes][pod_name]" => "podname"
    "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app"
    "[kafkajson][kubernetes][container_image]" => "container-image"
  }
}
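The rename option moves a value: the nested key disappears and a top-level field takes its place, and a source field that does not exist is skipped. Here is a small Python sketch of that behaviour on a plain dict. The helper rename_field is hypothetical and only mimics the semantics; it is not Logstash code.

```python
from typing import Sequence


def rename_field(event: dict, path: Sequence[str], new_name: str) -> None:
    """Move the value at the nested path to event[new_name], if it exists."""
    parent = event
    for key in path[:-1]:
        parent = parent.get(key)
        if not isinstance(parent, dict):
            return  # source path is missing: nothing to rename
    if path[-1] in parent:
        event[new_name] = parent.pop(path[-1])


event = {"kafkajson": {"kubernetes": {"namespace_name": "default", "pod_name": "xxxxx"}}}
rename_field(event, ["kafkajson", "kubernetes", "namespace_name"], "namespace")
# This pod has no labels block, so the call below leaves the event unchanged.
rename_field(event, ["kafkajson", "kubernetes", "labels", "k8s-app"], "k8s-app")
print(event)
```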
Use Regular Expressions to Process Data#
Each log line arrives with the container runtime's (CRI) log prefix in front of the service's own output: a timestamp, the stream name (stdout), and an F flag marking a full line. A regular expression is used to strip that prefix. After the first json filter, the value of [kafkajson][log] looks like this:
2023-05-16T10:10:00.002817673+08:00 stdout F {"level":"debug","time":"2023-05-16T10:10:00.002+0800","msg":"xxxxx"}
mutate {
  gsub => [ "[kafkajson][log]", ".+stdout\sF\s", "" ]
}
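The pattern can be tried outside Logstash before touching the pipeline. The sketch below applies it with Python's re module, which behaves the same way for a pattern this simple, and checks that the remainder parses as JSON. The anchored variant, which also accepts stderr and the P (partial line) flag, is my own assumption about what a stricter pattern could look like. The pipeline in this post does not use it.

```python
import json
import re

line = ('2023-05-16T10:10:00.002817673+08:00 stdout F '
        '{"level":"debug","time":"2023-05-16T10:10:00.002+0800","msg":"xxxxx"}')

GREEDY = r".+stdout\sF\s"                     # same pattern as the gsub step
STRICT = r"^\S+\s(?:stdout|stderr)\s[FP]\s"   # assumed stricter alternative

stripped = re.sub(GREEDY, "", line)
payload = json.loads(stripped)
print(payload["level"])

# The greedy pattern strips up to the LAST "stdout F " on the line, so a
# message that happens to contain that text is cut in the wrong place.
tricky = '2023-05-16T10:10:00.002817673+08:00 stdout F {"msg":"saw stdout F again"}'
greedy_result = re.sub(GREEDY, "", tricky)
strict_result = re.sub(STRICT, "", tricky)
print(greedy_result)
print(strict_result)
```

For the logs in this post the two patterns give the same result. The difference only matters if a service ever prints the literal text "stdout F " in its own output.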
Extract Service Printed Logs#
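With the prefix removed, [kafkajson][log] is a plain JSON string, so a second json filter can parse it into the data field. The kafkajson field is no longer needed at that point and is removed.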
json {
  source => "[kafkajson][log]"
  remove_field => ["kafkajson"]
  target => "data"
}
Extract Log Level#
The final step is to extract the log level field to the outer layer for easier use.
mutate {
  rename => { "[data][level]" => "level" }
}
Complete Pipeline#
input {
  beats {
    port => 5044
  }
  tcp {
    port => 50000
  }
  kafka {
    bootstrap_servers => "kafka address"
    client_id => "test_logstash"
    group_id => "test_logstash"
    auto_offset_reset => "latest"
    consumer_threads => 1
    decorate_events => true
    topics => ["tencent-tke-log"]
    type => "bhy"
  }
}
filter {
  mutate {
    add_field => { "source" => "%{[message]}" }
  }
  json {
    source => "message"
    remove_field => ["message"]
    target => "kafkajson"
  }
  mutate {
    rename => {
      "[kafkajson][kubernetes][namespace_name]" => "namespace"
      "[kafkajson][kubernetes][pod_name]" => "podname"
      "[kafkajson][kubernetes][labels][k8s-app]" => "k8s-app"
      "[kafkajson][kubernetes][container_image]" => "container-image"
    }
  }
  mutate {
    gsub => [ "[kafkajson][log]", ".+stdout\sF\s", "" ]
  }
  json {
    source => "[kafkajson][log]"
    remove_field => ["kafkajson"]
    target => "data"
  }
  mutate {
    rename => { "[data][level]" => "level" }
  }
}
## Add your filters / logstash plugins configuration here
output {
  elasticsearch {
    hosts => "elasticsearch:9200"
    user => "logstash_internal"
    password => "${LOGSTASH_INTERNAL_PASSWORD}"
    index => "logstash-%{+YYYY-MM-dd}"
  }
}
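To see roughly what an event should look like once it has passed through the whole filter block, the chain can be mimicked offline. The Python sketch below is only an approximation under stated assumptions. simulate_filters is a hypothetical helper, the sample message is a shortened version of the one shown earlier, and fields that Logstash adds itself (@timestamp, @version, type, Kafka metadata, failure tags) are not modelled.

```python
import json
import re

CRI_PREFIX = re.compile(r".+stdout\sF\s")  # same pattern as the gsub step


def simulate_filters(message: str) -> dict:
    """Approximate the filter block on one raw Kafka message."""
    event = {"source": message}              # mutate add_field: keep the original
    kafkajson = json.loads(message)          # json: message -> kafkajson

    # mutate rename: promote identifying metadata to the top level.
    k8s = kafkajson.get("kubernetes", {})
    for src, dst in (("namespace_name", "namespace"),
                     ("pod_name", "podname"),
                     ("container_image", "container-image")):
        if src in k8s:
            event[dst] = k8s[src]
    labels = k8s.get("labels", {})
    if "k8s-app" in labels:
        event["k8s-app"] = labels["k8s-app"]

    # mutate gsub, then the second json filter: [kafkajson][log] -> data.
    data = json.loads(CRI_PREFIX.sub("", kafkajson.get("log", "")))

    # mutate rename: [data][level] -> level (the key moves out of data).
    if "level" in data:
        event["level"] = data.pop("level")
    event["data"] = data
    return event                             # kafkajson itself is dropped


inner = '{"level":"debug","time":"2023-05-16T10:10:00.002+0800","msg":"xxxxxxxx"}'
message = json.dumps({
    "@timestamp": 1684203000.007603,
    "log": "2023-05-16T10:10:00.002817673+08:00 stdout F " + inner,
    "kubernetes": {
        "pod_name": "xxxxx",
        "namespace_name": "default",
        "labels": {"k8s-app": "xxxxx"},
        "container_image": "xxxxxxxx",
    },
})

event = simulate_filters(message)
print(json.dumps({k: v for k, v in event.items() if k != "source"}, indent=2))
```

Comparing this output with a document in Elasticsearch is a quick way to spot a filter that did not run as expected.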
Special Thanks#
While configuring Logstash, I got a lot of help from my self-built Telegram bot, which calls the OpenAI API and is based on the GPT-3.5 language model.
Here is a conversation I had with it (sensitive content has been redacted):
johnpoint, [5/15/23 2:18 PM]
I want to extract JSON data using Logstash from the following data: 2023-05-15T13:51:00.004687384+08:00 stdout F {"level":"debug","time":"2023-05-15T13:51:00.002+0800","msg":"xxxxxxxx"}. How should the Logstash pipeline be written?
openAI, [5/15/23 2:18 PM]
Assuming your log file outputs in JSON format, you need to configure a JSON parser to parse the JSON data in the logs and convert it into usable events. Here is a simple Logstash configuration file:
```
input {
  file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
```
In the input section of Logstash, we specify the path of the file to read and start reading data from the beginning of the file.
In the filter section, we use the `json` plugin to extract the JSON object from the logs.
Finally, we use `rubydebug` in the output section for output.
Run this Logstash configuration command:
```
bin/logstash -f /path/to/your/config/file
```
This way, Logstash will read data from the specified file and output the extracted JSON objects to the console.
johnpoint, [5/15/23 2:19 PM]
This data has JSON format data, but there is some dirty data before the JSON format data. Can it be removed?
openAI, [5/15/23 2:19 PM]
Yes, when analyzing data with Logstash, you can use the Grok plugin to match the required data using regular expressions.
Here is a possible Grok expression that matches the data you provided:
```
%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}
```
This regular expression means to match a timestamp represented in ISO8601 format (like "2023-05-15T13:51:00.004687384+08:00") followed by the string "stdout F" and then a JSON formatted string. "%{GREEDYDATA:json_data}" indicates that this string will be saved into a field named "json_data".
The above Grok expression can be used in the Logstash configuration file, for example:
```
input {
  # Configure the source from which Logstash receives data
}
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}"
    }
  }
  # Other filters
}
output {
  # Configure the target to which Logstash outputs data
}
```
In this configuration file, the Grok plugin under the filter section matches the timestamp and json_data fields. Thus, Logstash can use this data for further operations, such as outputting to Elasticsearch or stdout, achieving the desired result.
Where `%{TIMESTAMP_ISO8601:timestamp}` indicates matching a timestamp in ISO8601 format and saving it into a field named "timestamp". `%{GREEDYDATA:json_data}` indicates matching any character, which can match the additional content at the end of the line that does not conform to JSON format. Since Grok will continue to match until the end of the line, it will include all content in the last field (i.e., json_data).
johnpoint, [5/15/23 2:36 PM]
What does the error "Can't get text on a START_OBJECT at 1:8" mean?
openAI, [5/15/23 2:36 PM]
This error commonly occurs when trying to parse a JSON object as a string, but what is actually received is a JSON object instead of a string. This means that the beginning of the object is a "{", not a "\". You can check the function or method that calls the code to ensure that the string being parsed is indeed a string. Additionally, check the source code of the JSON object to ensure it is formatted as expected.
johnpoint, [5/15/23 2:53 PM]
[2023-05-15T06:52:48,511][WARN ][logstash.filters.json ][main][xxxxxx] Error parsing json {:source=>"json_data", :raw=>"{\\\"level\\\":\\\"debug\\\",\\\"time\\\":\\\"2023-05-15T14:52:47.664+0800\\\",\\\"msg\\\":\\\"xxxxxxxx\\\"}\", \"@timestamp\"=>0.1684133567674155e10}", :exception=>#<LogStash::Json::ParserError: Unexpected character ('\' (code 92)): was expecting double-quote to start field name
What does this log indicate about Logstash?
openAI, [5/15/23 2:53 PM]
This log indicates that Logstash encountered a parsing issue with the input data json_data, where the input data contains characters like `\'` that do not conform to JSON standards, causing the JSON parser to fail to parse correctly. Specifically, it may be that the input data used backslashes to escape quotes but did not correctly escape the backslashes themselves. You can try preprocessing the input data to escape or remove non-conforming characters so that Logstash can parse the data correctly.
johnpoint, [5/15/23 2:53 PM]
Can you provide the corresponding pipeline configuration?
openAI, [5/15/23 2:53 PM]
Sure, here is a simple Logstash Pipeline configuration to extract the `json_data` field from the input and filter out any invalid data:
```conf
input {
  # Configure the input plugin here
  stdin {}
}
filter {
  # Extract JSON data from the json_data field
  json {
    source => "json_data"
    # Used to ignore incomplete JSON data strings
    skip_on_invalid_json => true
  }
  # Filter out events without valid JSON data
  if "_jsonparsefailure" in [tags] {
    drop {}
  }
}
output {
  # Configure the output plugin here
  stdout {}
}
```
You will need to modify and adjust according to your needs, such as changing `json_data` to the actual field name or using input and output plugins that suit you.
johnpoint, [5/15/23 2:54 PM]
The JSON data extracted using Grok has unnecessary escaped backslashes.
openAI, [5/15/23 2:54 PM]
Yes, this is because the Grok pattern treats backslashes as escape characters, so if the log already has backslashes, they will be escaped additionally. To solve this issue, you can use the gsub filter in the configuration file to remove the extra backslashes. Here’s an example configuration:
```
input {
  stdin { }
}
filter {
  grok {
    match => { "message" => '^\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{WORD:log_level}\] %{GREEDYDATA:json_data}$' }
  }
  mutate {
    gsub => ["json_data", "\\\\", "\\"]
  }
  json {
    source => "json_data"
  }
}
output {
  stdout { codec => rubydebug }
}
```
In this configuration, we first use Grok to parse the JSON data in the logs and store it in the `json_data` field. Then we use the `mutate` filter with `gsub` to replace the extra backslashes, and finally, we use the `json` filter to convert it into JSON formatted events. This way, we can avoid unnecessary extra backslashes.
johnpoint, [5/15/23 3:00 PM]
Error: Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\", \",\", \"]\" at line 36, column 34 (byte 672) after filter {\n\tjson {\n\t\tsource => \"message\"\n\t\tremove_field => [ \n\t\t\t\"message\"\n\t\t]\n\t\ttarget => \"kafkajson\"\n\t}\n\tgrok {\n\t\tmatch => {\"kafkajson\" => \"%{TIMESTAMP_ISO8601:timestamp} stdout F %{GREEDYDATA:json_data}\"}\n\t}\n\tmutate {\n\t\tgsub => [\"json_data\", \"\\\\\\\\\", \"", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:187:in `initialize'", "org/logstash/execution/JavaBasePipelineExt.java:72:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:52:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:383:in `block in converge_state'"]}
openAI, [5/15/23 3:00 PM]
This error occurs in the Logstash configuration file at line 36, column 34 (672 bytes), where it expected to see one of the characters `[ \t\r\n], "#", "{", ",", "]"` but encountered a character that could not be parsed as expected. This may be due to a syntax error or an incorrect expression. Please check your configuration file and ensure the syntax is correct and resolve any expression errors.
Additionally, since the error message does not provide enough context, you may also need to check other parts of the configuration file to determine if there are other errors.
johnpoint, [5/15/23 3:15 PM]
Can Logstash move data from one field to another?
openAI, [5/15/23 3:15 PM]
Yes, you can use the mutate plugin in Logstash's filter section with the rename option to achieve this.
For example, if the source field name is `source_field` and you want to copy it to the target field `target_field`, you can use the following configuration:
```
filter {
  mutate {
    rename => { "source_field" => "target_field" }
  }
}
```
This will copy the value of `source_field` into the new field `target_field` and remove the `source_field` itself.
johnpoint, [5/15/23 3:19 PM]
What if I want to keep this field?
openAI, [5/15/23 3:19 PM]
You can use the add_field option of the mutate plugin to create a new field in the event.
For example, if you want to create a new field "new_field" in the Logstash event and move its value from the original field "old_field":
```
filter {
  mutate {
    add_field => { "new_field" => "%{[old_field]}" }
    remove_field => [ "old_field" ]
  }
}
```
This will assign the value from "old_field" to the new "new_field" field and remove the "old_field" field. If you want to keep the source field while moving the value, you can remove the remove_field option.
johnpoint, [5/15/23 3:21 PM]
Does Logstash delete the automatically created keyword field?
openAI, [5/15/23 3:21 PM]
Logstash automatically creates a `keyword` type subfield for each string type field as the exact value of the original field. If you do not need this subfield, you can remove it in the index template by setting `"include_in_all": false` in the `mappings`. However, be aware that using this method to delete the `keyword` field will result in losing some precise query capabilities.
If you want to keep the `keyword` field in the log stream, you can use the remove_field filter to delete the original field but keep the `keyword` field:
```ruby
filter {
  mutate {
    remove_field => ["my_field"]
    rename => { "my_field.keyword" => "new_field" }
  }
}
```
In the above example, Logstash will delete the original `my_field` field and keep the exact value in the `my_field.keyword` subfield, finally using the rename filter to rename `my_field.keyword` to `new_field`. This way, you can use `new_field` for precise queries in Elasticsearch.
johnpoint, [5/15/23 4:18 PM]
Why does the ES index created by Logstash not have a daily rolling name?
openAI, [5/15/23 4:18 PM]
Logstash creates ES indexes in Daily Index mode by default, with one index per day and a date suffix. For example, an index named `logstash` will be converted to `logstash-yyyy.mm.dd`, and when writing data, it will determine the specific value of the corresponding date based on the UTC timezone. If it is not rolling, one possibility is that the configuration file has not been enabled, and another possibility is that the Elasticsearch cluster configuration file has settings that prohibit creating new indexes. You can check the relevant settings in the Elasticsearch configuration file.
johnpoint, [5/15/23 4:19 PM]
How to enable Daily Index mode in Logstash?
openAI, [5/15/23 4:19 PM]
To enable Daily Index mode, you need to modify the Logstash configuration file. In the `output` configuration, you can use the `index` configuration option to specify the name of the Elasticsearch index. To make the index name roll daily, you can use a date format placeholder, such as:
```
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your-index-name-%{+YYYY.MM.dd}"
  }
}
```
In the above configuration, the index name format is `your-index-name-YYYY.MM.dd`, where `%{+YYYY.MM.dd}` is a date format placeholder representing the current date. With this configuration, a new index will be automatically created every day.
Also, note that to enable Date Index mode, you need to set `action.auto_create_index` to `true` in the Elasticsearch configuration file.