Parsing and Centralizing Elasticsearch Logs with Logstash

NOTE: this configuration was tested with Logstash 2.2 on logs generated by Elasticsearch 2.2.

No, it’s not an endless loop waiting to happen: the plan here is to use Logstash to parse Elasticsearch logs and send them to another Elasticsearch cluster or to a log analytics service like Logsene (which conveniently exposes the Elasticsearch API, so you can use it without having to run and manage your own Elasticsearch cluster).

If you’re looking for an ELK stack intro and you think you’re in the wrong place, try our 5-minute Logstash tutorial. Still, if you have non-trivial amounts of data, you might end up here again, because you’ll probably need to centralize Elasticsearch logs for the same reasons you centralize other logs:

  • to avoid SSH-ing into each server to figure out why something went wrong
  • to better understand issues such as slow indexing or searching (via slowlogs, for instance)
  • to search quickly in big logs

In this post, we’ll describe how to use Logstash’s file input to tail the main Elasticsearch log and the slowlogs. We’ll use grok and other filters to parse different parts of those logs into their own fields, and we’ll send the resulting structured events to Logsene/Elasticsearch via the elasticsearch output. In the end, you’ll be able to do things like slicing and dicing slowlogs with Kibana.

TL;DR note: scroll down to the FAQ section for the whole config with comments.

Tailing Files

First, we’ll point the file input to *.log in Elasticsearch’s log directory. This works nicely with the default rotation, which renames old logs to something like cluster-name.log.SOMEDATE. We’ll use start_position => "beginning" to index existing content as well (this setting only applies to files Logstash hasn’t seen before). We’ll also add the multiline codec to parse exceptions nicely, telling it that every line not starting with a [ sign belongs to the same event as the previous line.

input {
  file {
    path => "/var/log/elasticsearch/*.log"
    type => "elasticsearch"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}
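To make the two behaviors above concrete, here is a small Python sketch (not part of the Logstash setup) that mimics them: which file names the *.log glob picks up, and how the multiline codec settings group raw lines into events. The helper names, the file names and the exception lines used to try it out are made up for illustration, and the sketch ignores details such as when the real codec flushes its last buffered event.

```python
import re
from fnmatch import fnmatchcase


# 1) File selection, as in path => "/var/log/elasticsearch/*.log".
#    Only the file name part is checked here. Rotated files such as
#    cluster-name.log.SOMEDATE no longer end in ".log", so the glob skips them.
def is_tailed(file_name):
    return fnmatchcase(file_name, "*.log")


# 2) multiline codec: pattern => "^\[", negate => true, what => "previous".
#    A line starting with "[" begins a new event; any other line (for example
#    a stack trace line) is appended to the previous event.
NEW_EVENT = re.compile(r"^\[")


def group_multiline(lines):
    events = []
    for line in lines:
        if NEW_EVENT.match(line) or not events:
            events.append(line)          # start a new event
        else:
            events[-1] += "\n" + line    # continuation of the previous event
    return events
```

Feeding it a WARN line followed by a stack trace yields a single event whose text spans all of those lines, which is what lets grok see the whole exception later on.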

Parsing Generic Content

A typical Elasticsearch log line looks like this:

[2016-03-18 09:33:57,702][INFO ][node                     ] [logstash001] starting ...

while a slowlog is a bit more structured, like:

[2016-02-24 10:45:49,124][WARN ][index.search.slowlog.query] took[2.9ms], took_millis[2], types[config], stats[], search_type[QUERY_AND_FETCH], total_shards[1], source[{"size":1000,"sort":[{"timestamp":{"order":"desc","ignore_unmapped":true}}]}], extra_source[],

The leading fields, such as timestamp and severity, are common to both, so we’ll parse them first:

filter {
  if [type] == "elasticsearch" {
    grok {
      match => [ "message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}(?<message>(.|\r|\n)*)" ]
      overwrite => [ "message" ]
    }
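If you want to check what this grok expression extracts without running Logstash, the Python sketch below translates it into a regular expression. The translations of TIMESTAMP_ISO8601, DATA and SPACE are simplified approximations of the real grok patterns, parse_header is a hypothetical helper name, and returning None stands in for grok tagging the event with _grokparsefailure.

```python
import re

# Simplified stand-ins for the grok patterns used above. The real grok
# definitions are more permissive; these are approximations for testing only.
TIMESTAMP_ISO8601 = r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?"
DATA = r".*?"    # non-greedy, does not cross newlines
SPACE = r"\s*"

HEADER = re.compile(
    r"\[(?P<timestamp>" + TIMESTAMP_ISO8601 + r")\]"
    + r"\[(?P<severity>" + DATA + r")" + SPACE + r"\]"
    + r"\[(?P<source>" + DATA + r")" + SPACE + r"\]"
    + SPACE
    + r"(?P<message>[\s\S]*)"  # like (.|\r|\n)*: the rest, newlines included
)


def parse_header(line):
    """Return timestamp/severity/source/message, or None if the line doesn't match."""
    m = HEADER.search(line)  # unanchored, like grok's match
    return m.groupdict() if m else None
```

On the sample line above, this yields severity INFO, source node and message "[logstash001] starting ...". The padding inside the brackets ends up in SPACE rather than in the fields because DATA is non-greedy.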

For the main Elasticsearch logs, the message field now contains the actual message, without the timestamp, severity, and log source, which are now in their own fields. We can parse the message further, though: some logs have a node name in them, and some even have an index name:

    if "_grokparsefailure" not in [tags] {
      grok {  # regular logs
        match => [
          "message", "^\[%{DATA:node}\] %{SPACE}\[%{DATA:index}\]%{SPACE}(?<short_message>(.|\r|\n)*)",
          "message", "^\[%{DATA:node}\]%{SPACE}(?<short_message>(.|\r|\n)*)" ]
        tag_on_failure => []
      }
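The two match entries are tried in order, and grok stops at the first one that matches (its break_on_match option defaults to true). Here is a Python approximation of that fallback. parse_node_and_index is a hypothetical name, and the index line used to try it out is invented for illustration.

```python
import re

# Python approximations of the two grok alternatives above (DATA -> .*?,
# SPACE -> \s*, (.|\r|\n)* -> [\s\S]*), kept in the same order.
NODE_PATTERNS = [
    re.compile(r"^\[(?P<node>.*?)\] \s*\[(?P<index>.*?)\]\s*(?P<short_message>[\s\S]*)"),
    re.compile(r"^\[(?P<node>.*?)\]\s*(?P<short_message>[\s\S]*)"),
]


def parse_node_and_index(message):
    """Try each pattern in order and stop at the first match.

    With tag_on_failure => [], a miss adds no tag, so this simply
    returns an empty dict when neither pattern matches.
    """
    for pattern in NODE_PATTERNS:
        m = pattern.search(message)
        if m:
            return m.groupdict()
    return {}
```

A slowlog message such as took[2.9ms], ... matches neither pattern, so it passes through unchanged and without a failure tag, ready for the slowlog grok.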

Parsing Slowlogs

For a query slowlog, the message field now looks like this:

took[2.9ms], took_millis[2], types[config], stats[], search_type[QUERY_AND_FETCH], total_shards[1], source[{"size":1000,"sort":[{"timestamp":{"order":"desc","ignore_unmapped":true}}]}], extra_source[],

So we’ll parse it with grok:

      grok {  # slow logs
        match => [ "message", "took\[%{DATA:took}\], took_millis\[%{NUMBER:took_millis}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\]," ]
        tag_on_failure => []
        add_tag => [ "elasticsearch-slowlog" ]
      }
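Here is the same slowlog pattern as a Python regular expression, applied to the sample message above. As before, this is an approximation (DATA becomes a non-greedy .*?, NUMBER a simplified number regex), and parse_slowlog is a hypothetical helper. The non-greedy source capture still returns the whole JSON query, even though the query contains ] characters, because the match has to continue with ], extra_source[. Like this sketch, grok stores took_millis and total_shards as strings unless you add a type suffix such as %{NUMBER:took_millis:int}.

```python
import re

# Simplified stand-in for grok's NUMBER (the real pattern is more permissive).
NUMBER = r"\d+(?:\.\d+)?"

SLOWLOG = re.compile(
    r"took\[(?P<took>.*?)\], "
    + r"took_millis\[(?P<took_millis>" + NUMBER + r")\], "
    + r"types\[(?P<types>.*?)\], "
    + r"stats\[(?P<stats>.*?)\], "
    + r"search_type\[(?P<search_type>.*?)\], "
    + r"total_shards\[(?P<total_shards>" + NUMBER + r")\], "
    + r"source\[(?P<source_query>.*?)\], "
    + r"extra_source\[(?P<extra_source>.*?)\],"
)


def parse_slowlog(message):
    """Return slowlog fields plus the tag added by add_tag, or None on no match."""
    m = SLOWLOG.search(message)
    if not m:
        return None  # tag_on_failure => []: no tag, event left as-is
    fields = m.groupdict()
    fields["tags"] = ["elasticsearch-slowlog"]
    return fields
```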

Adjusting the Timestamp

At this point, each log’s timestamp (the time logged by the application) is in the timestamp field, while the standard @timestamp was added by Logstash when it read that event. If you want @timestamp to hold the application-generated timestamp, you can do it with the date filter:

      date { # use timestamp from the log
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
        target => "@timestamp"
      }
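The Joda-style pattern "YYYY-MM-dd HH:mm:ss,SSS" corresponds to the strptime format in the Python sketch below, which parses the sample timestamp from earlier. to_timestamp is a hypothetical helper, and treating the value as UTC is an assumption made for the example. The date filter itself uses the Logstash host’s time zone unless you set its timezone option, so make sure that matches the zone your Elasticsearch nodes log in.

```python
from datetime import datetime, timezone

# strptime equivalent of the date filter pattern "YYYY-MM-dd HH:mm:ss,SSS".
# %f accepts the 3-digit millisecond part and stores it as microseconds.
ES_LOG_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def to_timestamp(value, tz=timezone.utc):
    """Parse an Elasticsearch log timestamp into a timezone-aware datetime.

    Assumption: the logged time is interpreted in `tz` (UTC by default).
    """
    return datetime.strptime(value, ES_LOG_TIME_FORMAT).replace(tzinfo=tz)
```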

At this point, we can use the mutate filter to remove the original timestamp field, which now holds the same information as @timestamp:

      mutate {
        remove_field => [ "timestamp" ]  # remove unused stuff
      }

Sending Events to Logsene/Elasticsearch

Below is an elasticsearch output configuration that works well with Logsene and Logstash 2.0+. For your own Elasticsearch cluster, specify its host name and port instead, comment out the ssl line, and set index to an index name of your choice instead of the Logsene token:

output {
  elasticsearch {
    hosts => "logsene-receiver.sematext.com:443"
    ssl => "true"
    index => "LOGSENE_APP_TOKEN_GOES_HERE"
    manage_template => false
  }
}
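The elasticsearch output ships events in batches through the Elasticsearch Bulk API. The Python sketch below only builds such a request body, without sending it, to show the shape of the data: one action line plus one document line per event, newline-delimited, with a trailing newline. bulk_body is a hypothetical helper, and using the event’s type (elasticsearch) as the document type is an assumption; Logstash also takes care of batching, retries and the HTTP details left out here.

```python
import json


def bulk_body(events, index, doc_type):
    """Build an Elasticsearch Bulk API request body (newline-delimited JSON).

    Each event becomes two lines: an "index" action with its metadata,
    then the document itself. The body must end with a newline.
    """
    lines = []
    for event in events:
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(event))
    return "\n".join(lines) + "\n"
```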

FAQ

Q: Cool, this works well for logs. How about monitoring Elasticsearch metrics, like how much heap is used or how many cache hits I get?
A: Check out our SPM, which can monitor lots of applications, including Elasticsearch. If you’re a Logsene user, too, you’ll be able to correlate logs and metrics.
Q: I find this logging and parsing stuff really exciting.
A: Me too. If you want to join us, we’re hiring worldwide.
Q: I’m here from the TL;DR note. Can I get the complete config?
A: Here you go (please check the comments for things you might want to change):

input {
  file {
    path => "/var/log/elasticsearch/*.log" # tail ES log and slowlogs
    type => "elasticsearch"
    start_position => "beginning" # parse existing logs, too
    codec => multiline { # put the whole exception in a single event
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}

filter {
  if [type] == "elasticsearch" {
    grok {
      match => [ "message", "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{DATA:severity}%{SPACE}\]\[%{DATA:source}%{SPACE}\]%{SPACE}(?<message>(.|\r|\n)*)" ]
      overwrite => [ "message" ]
    }

    if "_grokparsefailure" not in [tags] {
      grok {  # regular logs
        match => [
          "message", "^\[%{DATA:node}\] %{SPACE}\[%{DATA:index}\]%{SPACE}(?<short_message>(.|\r|\n)*)",
          "message", "^\[%{DATA:node}\]%{SPACE}(?<short_message>(.|\r|\n)*)" ]
        tag_on_failure => []
      }

      grok {  # slow logs
        match => [ "message", "took\[%{DATA:took}\], took_millis\[%{NUMBER:took_millis}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\]," ]
        tag_on_failure => []
        add_tag => [ "elasticsearch-slowlog" ]
      }

      date { # use timestamp from the log
        match => [ "timestamp", "YYYY-MM-dd HH:mm:ss,SSS" ]
        target => "@timestamp"
      }

      mutate {
        remove_field => [ "timestamp" ]  # remove unused stuff
      }
    }
  }
}

output {
  elasticsearch { # send everything to Logsene/Elasticsearch
    hosts => "logsene-receiver.sematext.com:443"
    ssl => "true"
    index => "TOKEN_GOES_HERE" # fill in your token (click Integration from your Logsene app)
    manage_template => false
  }
}

13 thoughts on “Parsing and Centralizing Elasticsearch Logs with Logstash”

  1. Radu,

    This string listed in your file is not valid Regex: “(?(.|\r|\n)*)” Am I missing something? (We’re using Elastic 1.4.x soon to upgrade.)

    However, the rest of it gave me a nice foundation to build on. Thanks for doing the work on this.

    1. Hi Dave,

      Oh, I’m sorry! It was either a copy-paste error or (more likely) WordPress didn’t escape the regex properly. That bit should be “(?<message>(.|\r|\n)*)” or “(?<short_message>(.|\r|\n)*)” respectively. I’ve updated the post and it should work to just copy-paste from it.

      Thanks for the catch!

  2. In this snippet from the filter:
    “source[(?(.|r|n)*], extra_source[)%{DATA:extra_source}”

    Is it correct that there’s an open bracket after the first instance of ‘extra_source’ ? I’m checking to make sure I have my brackets matched and I’m not seeing a closing bracket for the grok { match => [array] } array within this block:
    if [source] == “index.search.slowlog.fetch” or [source] == “index.search.slowlog.query” {block}
    Thanks

    1. Hi Jeremy,

      Sorry for all this formatting mess, I need to fix it. In the meantime, here’s the filter part that I’ve tested on Elasticsearch 2.2 (logs change from one version to another): https://gist.github.com/radu-gheorghe/5404512ca75029ead9b9

      So the answer to your specific question is that I would close the bracket and add the extra comma, to match the slowlog exactly. But maybe with the 1.x version on which this post is based some slowlogs looked different so I preferred to leave it open.

    2. Hi again, Jeremy,

      I’ve just updated the post to work for Logstash 2.x and ES 2.x. I hope it will work well for you.

      1. Thanks Radu! We’re actually still on logstash 1.5 and ES 1.7.3, so perhaps the initial syntax was correct and my IDE just got confused by it. I’ll give it a shot and see how it goes. And hopefully we’ll get up to 2.x soon. We just moved from ES 0.90.5 to 1.7.3 a few months ago, so we’re getting there, slowly. Thanks again!

        1. You’re welcome, Jeremy. Please let me know if you have any trouble with 1.x and I will dig back to the old post.

  3. Thanks. Looks like just what I was working on.
    I also added to this sending exception counts to statsd:

    filter {
      # … the above processing …

      # These two groks can be handled with one grok and "break_on_match => false",
      # but there is a bug in the implementation in 1.4.2 which should be fixed in
      # 1.5.0; until then we use two separate grok calls
      grok {
        match => [
          "message", "^[a-zA-Z0-9.]+.(?<final_exception>[a-zA-Z0-9]*Exception): "
        ]
        tag_on_failure => []
      }

      if [final_exception] {
        grok {
          match => [
            "message", "^Caused by: [a-zA-Z0-9.]+.(?<original_exception>[a-zA-Z0-9]*Exception): "
          ]
          tag_on_failure => []
        }
      }
    }

    output {

      # count each individual exception type per node and whether it was the root
      # cause exception or not
      if [final_exception] {
        statsd {
          host => "statsd.host.name"
          increment => "elasticsearch.%{node}.final_exception.%{final_exception}"
        }
      }

      if [original_exception] {
        statsd {
          host => "statsd.host.name"
          increment => "elasticsearch.%{node}.original_exception.%{original_exception}"
        }
      }

      # keep also a total exception count per node
      if [final_exception] or [original_exception] {
        statsd {
          host => "statsd.host.name"
          increment => "elasticsearch.%{node}.exceptions"
        }
      }
    }
