Parsing and Centralizing Elasticsearch Logs with Logstash

By Radu Gheorghe

NOTE: this configuration was tested with Logstash 2.2 on logs generated by Elasticsearch 2.2

No, it’s not an endless loop waiting to happen: the plan here is to use Logstash to parse Elasticsearch logs and send them to another Elasticsearch cluster or to a log analytics service like Logsene (which conveniently exposes the Elasticsearch API, so you can use it without having to run and manage your own Elasticsearch cluster).

If you’re looking for an ELK stack intro and think you’re in the wrong place, try our 5-minute Logstash tutorial. Still, if you have non-trivial amounts of data, you might end up here again, because you’ll probably need to centralize Elasticsearch logs for the same reasons you centralize other logs:

  • to avoid SSH-ing into each server to figure out why something went wrong
  • to better understand issues such as slow indexing or searching (via slowlogs, for instance)
  • to search quickly in big logs

In this post, we’ll describe how to use Logstash’s file input to tail the main Elasticsearch log and the slowlogs. We’ll use grok and other filters to parse different parts of those logs into their own fields and we’ll send the resulting structured events to Logsene/Elasticsearch via the elasticsearch output. In the end, you’ll be able to do things like slowlog slicing and dicing with Kibana:

[Image: logstash_elasticsearch]

TL;DR note: scroll down to the FAQ section for the whole config with comments.

Tailing Files

First, we’ll point the file input to *.log in Elasticsearch’s log directory. This works nicely with the default rotation, which renames old logs to something like cluster-name.log.SOMEDATE, so rotated files no longer match the pattern. We’ll set start_position => "beginning" to index existing content as well. We’ll also add the multiline codec to parse exceptions nicely, telling it that every line not starting with a [ sign belongs to the same event as the previous line.

input {
  file {
    path => "/var/log/elasticsearch/*.log"
    type => "elasticsearch"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
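To make the multiline rule concrete, here is a minimal Python sketch of the same grouping logic. It is not how Logstash implements the codec; the function name group_events and the continuation lines in the sample are made up for illustration, and only the first sample line comes from an actual Elasticsearch log shown in this post.

```python
import re

# Mirrors pattern => "^\[", negate => true, what => "previous":
# a line that does NOT start with "[" is glued to the previous event.
EVENT_START = re.compile(r"^\[")

def group_events(lines):
    """Group raw log lines into events (hypothetical helper, for illustration)."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            events.append(line)          # a new event begins here
        else:
            events[-1] += "\n" + line    # continuation of the previous event
    return events

START_LINE = "[2016-03-18 09:33:57,702][INFO ][node                     ] [logstash001] starting ..."

# The continuation lines below are invented placeholders, not real Elasticsearch output.
sample = [
    START_LINE,
    "made-up continuation line 1",
    "    made-up continuation line 2",
    START_LINE,
]

events = group_events(sample)
print(len(events))
```

With this sample, the two placeholder lines end up in the first event, which is what keeps a whole stack trace together in one message.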

Parsing Generic Content

A typical Elasticsearch log comes in the form of:

[2016-03-18 09:33:57,702][INFO ][node                     ] [logstash001] starting ...

while a slowlog is a bit more structured, like:

[2016-02-24 10:45:49,124][WARN ][index.search.slowlog.query] took[2.9ms], took_millis[2], types[config], stats[], search_type[QUERY_AND_FETCH], total_shards[1], source[{"size":1000,"sort":[{"timestamp":{"order":"desc","ignore_unmapped":true}}]}], extra_source[],

But the leading fields, such as timestamp and severity, are common to both formats, so we’ll parse them first:

filter {
  if [type] == "elasticsearch" {
    grok {
      match => [ "message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}(?<message>(.|\r|\n)*)" ]
      overwrite => [ "message" ]
    }
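If you want to see what this grok expression captures without starting Logstash, the following Python sketch approximates it with a plain regular expression. It assumes that DATA behaves like a lazy .*? and SPACE like \s*, and it uses a simplified stand-in for TIMESTAMP_ISO8601 that only covers the format shown above, so treat it as an illustration rather than an exact translation.

```python
import re

# Rough regex equivalent of the grok pattern above (an approximation).
HEADER = re.compile(
    r"\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]"  # simplified TIMESTAMP_ISO8601
    r"\[(?P<severity>.*?)\s*\]"          # DATA followed by padding spaces
    r"\[(?P<source>.*?)\s*\]"            # logger name, also padded
    r"\s*(?P<message>(?:.|\r|\n)*)"      # everything else, including newlines
)

line = "[2016-03-18 09:33:57,702][INFO ][node                     ] [logstash001] starting ..."

fields = HEADER.search(line).groupdict()
for name, value in fields.items():
    print(name, "=>", repr(value))
```

Note how the padding inside the brackets is dropped: severity becomes INFO and source becomes node, while message keeps only what follows the three bracketed fields.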

For the main Elasticsearch logs, the message field now contains the actual message, without the timestamp, severity, and log source, which are now in their own fields. We can parse the message further, though, because some logs contain a node name and some also contain an index name:

    if "_grokparsefailure" not in [tags] {
      grok {  # regular logs
        match => [
          "message", "^\[%{DATA:node}\] %{SPACE}\[%{DATA:index}\]%{SPACE}(?<short_message>(.|\r|\n)*)",
          "message", "^\[%{DATA:node}\]%{SPACE}(?<short_message>(.|\r|\n)*)" ]
        tag_on_failure => []
      }
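Grok tries the patterns in the order they are listed and stops at the first one that matches, which is why the more specific node-plus-index pattern comes first. The Python sketch below imitates that behavior with approximate regex equivalents. The helper parse_message and the sample line containing my_index are hypothetical; only the "starting ..." message comes from the log shown earlier.

```python
import re

# Approximate regex equivalents of the two grok patterns, most specific first.
PATTERNS = [
    re.compile(r"^\[(?P<node>.*?)\] \s*\[(?P<index>.*?)\]\s*(?P<short_message>(?:.|\r|\n)*)"),
    re.compile(r"^\[(?P<node>.*?)\]\s*(?P<short_message>(?:.|\r|\n)*)"),
]

def parse_message(message):
    """Return the fields of the first matching pattern (hypothetical helper)."""
    for pattern in PATTERNS:
        match = pattern.search(message)
        if match:
            return match.groupdict()
    return {}  # like tag_on_failure => []: no match, no fields, no failure tag

node_only = parse_message("[logstash001] starting ...")
# Made-up message, used only to exercise the node + index pattern.
node_and_index = parse_message("[logstash001] [my_index] hypothetical message")
slowlog = parse_message("took[2.9ms], took_millis[2], types[config]")

print(node_only)
print(node_and_index)
print(slowlog)
```

Slowlog messages do not start with a bracketed node name, so neither pattern matches them and they pass through untouched.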

Parsing Slowlogs

For a query slowlog, the message field now looks like this:

took[2.9ms], took_millis[2], types[config], stats[], search_type[QUERY_AND_FETCH], total_shards[1], source[{"size":1000,"sort":[{"timestamp":{"order":"desc","ignore_unmapped":true}}]}], extra_source[],

So we’ll parse it with grok:

      grok {  # slow logs
        match => [ "message", "took\[%{DATA:took}\], took_millis\[%{NUMBER:took_millis}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\]," ]
        tag_on_failure => []
        add_tag => [ "elasticsearch-slowlog" ]
      }
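As a quick sanity check of what ends up in each field, here is the same slowlog message run through an approximate Python regex (again assuming DATA is a lazy .*? and simplifying NUMBER to digits only). Keep in mind that grok captures are strings unless you add a type suffix such as %{NUMBER:took_millis:int}; the sketch converts explicitly instead.

```python
import json
import re

# Approximate regex equivalent of the slowlog grok pattern above.
SLOWLOG = re.compile(
    r"took\[(?P<took>.*?)\], took_millis\[(?P<took_millis>\d+)\], "
    r"types\[(?P<types>.*?)\], stats\[(?P<stats>.*?)\], "
    r"search_type\[(?P<search_type>.*?)\], total_shards\[(?P<total_shards>\d+)\], "
    r"source\[(?P<source_query>.*?)\], extra_source\[(?P<extra_source>.*?)\],"
)

message = (
    'took[2.9ms], took_millis[2], types[config], stats[], '
    'search_type[QUERY_AND_FETCH], total_shards[1], '
    'source[{"size":1000,"sort":[{"timestamp":{"order":"desc","ignore_unmapped":true}}]}], '
    'extra_source[],'
)

fields = SLOWLOG.search(message).groupdict()
took_millis = int(fields["took_millis"])      # numeric captures arrive as strings
query = json.loads(fields["source_query"])    # the logged query is plain JSON

print(fields["search_type"], took_millis, query["size"])
```

The lazy match on source_query stops only at the literal "], extra_source[" sequence, so the square brackets inside the JSON query do not cut the capture short.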

Adjusting the Timestamp

At this point, each log’s timestamp (the time logged by the application) is in the timestamp field, while the standard @timestamp was added by Logstash when it read that event. If you want @timestamp to hold the application-generated timestamp, you can do it with the date filter:

      date { # use timestamp from the log
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
        target => "@timestamp"
      }
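The format string simply describes the layout of the Elasticsearch timestamp: date, time, then a comma and milliseconds. For comparison, this small Python sketch parses the same timestamp with the equivalent strptime format. It only illustrates the format, not the date filter itself. Since the logged timestamp carries no time zone, both Python and Logstash have to assume one; the date filter has a timezone option if the default is not what you want.

```python
from datetime import datetime

# Same layout as "YYYY-MM-dd HH:mm:ss,SSS"; %f accepts the three millisecond digits.
LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S,%f"

parsed = datetime.strptime("2016-03-18 09:33:57,702", LOG_TIMESTAMP_FORMAT)
print(parsed.isoformat())
```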

Now we can use the mutate filter to remove the original timestamp field, which holds the same information as @timestamp:

      mutate {
        remove_field => [ "timestamp" ]  # remove unused stuff
      }

Sending Events to Logsene/Elasticsearch

Below is an elasticsearch output configuration that works well with Logsene and Logstash 2.0+. For an external Elasticsearch cluster, you can simply specify the host name and port (and comment out the SSL line):

output {
  elasticsearch {
    hosts => "logsene-receiver.sematext.com:443"
    ssl => "true"
    index => "LOGSENE_APP_TOKEN_GOES_HERE"
    manage_template => false
  }
}

FAQ

Q: Cool, this works well for logs. How about monitoring Elasticsearch metrics like how much heap is used or how many cache hits I get?
A: Check out our SPM, which can monitor lots of applications, including Elasticsearch. If you’re a Logsene user, too, you’ll be able to correlate logs and metrics.
Q: I find this logging and parsing stuff really exciting.
A: Me too. If you want to join us, we’re hiring worldwide.
Q: I’m here from the TL;DR note. Can I get the complete config?
A: Here you go (please check the comments for things you might want to change):

input {
  file {
    path => "/var/log/elasticsearch/*.log" # tail ES log and slowlogs
    type => "elasticsearch"
    start_position => "beginning" # parse existing logs, too
    codec => multiline { # put the whole exception in a single event
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}

filter {
  if [type] == "elasticsearch" {
    grok {
      match => [ "message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}(?<message>(.|\r|\n)*)" ]
      overwrite => [ "message" ]
    }

    if "_grokparsefailure" not in [tags] {
      grok {  # regular logs
        match => [
          "message", "^\[%{DATA:node}\] %{SPACE}\[%{DATA:index}\]%{SPACE}(?<short_message>(.|\r|\n)*)",
          "message", "^\[%{DATA:node}\]%{SPACE}(?<short_message>(.|\r|\n)*)" ]
        tag_on_failure => []
      }

      grok {  # slow logs
        match => [ "message", "took\[%{DATA:took}\], took_millis\[%{NUMBER:took_millis}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\]," ]
        tag_on_failure => []
        add_tag => [ "elasticsearch-slowlog" ]
      }

      date { # use timestamp from the log
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
        target => "@timestamp"
      }

      mutate {
        remove_field => [ "timestamp" ]  # remove unused stuff
      }
    }
  }
}

output {
  elasticsearch { # send everything to Logsene/Elasticsearch
    hosts => "logsene-receiver.sematext.com:443"
    ssl => "true"
    index => "TOKEN_GOES_HERE" # fill in your token (click Integration from your Logsene app)
    manage_template => false
  }
}