Request Response correlation through Logstash

Nuwan Chamara
4 min readJul 8, 2018

--

Let me explain how to correlate requests and responses using Logstash. My requirement is to print request and response in a single line.

In-order to implement this feature first we should have following features implemented in incoming events. I had to request my Dev team to make the changes for the logs I am reading.

  1. Timestamps
  2. Transaction type: ie request or response
  3. Transaction id: unique id

Logstash filters can be used to do this task. Requests and responses will be handled separately. Once the request is arrived the required fields to be merged to response will be stored in a map based on the transaction id. when the response is arrived request data is reed from the map and attached.

Following Logstash filters are used

  1. grok
  2. aggregate

Implementation

if [message] =~ "TRANSACTION:request" {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:body}" }
}
aggregate { #store required data in a map
task_id => "%{tid}"
code => "
map['reqBody'] = event.get('body')
map['timestamp'] = event.get('timestamp')
"
map_action => "create"
}

}
if [message] =~ "TRANSACTION:response" {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:response}" }

}
aggregate {
task_id => "%{tid}"
code => "
event.set('request', map['reqBody'])
"
map_action => "update"
end_of_task => true
push_map_as_event_on_timeout => true
timeout => 120
timeout_task_id_field => "tid"
timeout_code => "
event.set('response','Response-timeout')
event.set('type','request-response')
event.set('transaction', 'request')
"
}

}

This is the code used to correlated requests and responses. Let me explain line by line.

if [message] =~ "TRANSACTION:request" {

Filters the requests from the incoming events, simply looks for wording “TRANSACTION:request”.

grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:body}" }
}

Grok filter is used to convert log line to set of key value pairs. Above code will return an output similar to,

timestamp:2018-06-22 11:36:19,997
transaction:request
tid:e86a7610
body:{dsdsd:dsdsd,dsds:dsds}

Next aggregate filter is used to store data in a map. Actually in this phase no aggregation happens here. Just the values are persisted for future reference.

aggregate { #store required data in a map
task_id => "%{tid}"
code => "
map['reqBody'] = event.get('body')
map['timestamp'] = event.get('timestamp')
"
map_action => "create"
}

Aggregation requires a unique id as a reference, we will use our transaction id. Then we have a code part where we will use bit of ruby coding for extracting event to a map.

map['reqBody'] = event.get('body')

Body field, which is created from grok filter now stored in map field called ‘reqBody’.

map_action => "create"

This is the part we inform the aggregate filter that the mapping is started. So it will create a new session for the data.

Now we have all the fields required from the request. Can delete the request if not required. I will not add that code to focus on the title we discussing.

if [message] =~ "TRANSACTION:response" {

Now let’s see how to manage the response. An event is considered as an response if it has “TRANSACTION:response” wording in the logs.

grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:response}" }

Now will be adding another grok filter to the filtered response event. Note that the format of response is not as request. Hence we are using new grok filter. Output of this filter should be similar something as following.

timestamp:2018-06-22 11:36:19,997
transaction:response
tid:e86a7610
response:{dsdsd:dsdsd,dsds:dsds}

Let’s see how real aggregation is happened.

aggregate {                
task_id => "%{tid}"
code => "
event.set('request', map['reqBody'])
"
map_action => "update"
end_of_task => true
push_map_as_event_on_timeout => true
timeout => 120
timeout_task_id_field => "tid"
timeout_code => "
event.set('response','Response-timeout')
event.set('type','request-response')
event.set('transaction', 'request')
"
}

As I mentioned earlier aggregation is totally based on a unique id. When the aggregate filter finds the mapped transaction id, it starts processing. There are thousands of events coming in so transaction id is the only way of mapping response to a request.

event.set('request', map['reqBody'])

This code will read previously stored request data back from the map and add to the current event.

end_of_task => true

This task will tell the system that the event is completed so the resources can be freed.

The code below can be used to handle additional scenarios such as event is now arrived so we will not have a tracking the request at least.

push_map_as_event_on_timeout => true
timeout => 120
timeout_task_id_field => "tid"
timeout_code => "
event.set('response','Response-timeout')
event.set('type','request-response')
event.set('transaction', 'request')
push_map_as_event_on_timeout => true

This informs the system to emit the request mapping data if no response is arrived. Timeout value is used to inform how long to wait before emitting the map event data.

timeout_code => "

This code can be used to add some additional fields to event when timeout has happened. In my example I am setting response as “Response-timeout”. so the viewer will know timeout has happened.

Final output may look like

timestamp:2018-06-22 11:36:19,997
transaction:response
tid:e86a7610
response:{dsdsd:dsdsd,dsds:dsds}
request:{sdsdsd:dsdsds}

--

--

No responses yet