ELK Logstash Filter Writing for Response Time, Request and Response Correlation

Nuwan Chamara
Jul 1, 2018

I hope you are a bit familiar with setting up the ELK stack and getting it up and running. If so, continue reading this post.

Now my ELK setup is up and running: it reads a log file and shows the entries on a Kibana dashboard. At this point no business logic had been implemented.

The new business requirement was to produce a CSV file from a log file, containing the request, the response and the response time, and also to show them on Kibana. The problem I faced was that the request and the response arrive as two separate events in the log file. So the requirement can be split as below.

  1. Read the requests and responses from a file
  2. Correlate them, as they do not arrive in a single event
  3. Calculate the response time
  4. Create a CSV file with the request, response and response time on the same line

As we have already configured our system to read log records from a file, there is no extra work needed to get both requests and responses into Elasticsearch.

We decided to go with Logstash, as it is the tool recommended in the ELK stack for complex processing such as correlating events.

When writing Logstash filters, everything goes into a custom file with a .conf extension placed at /etc/logstash/conf.d/yourfilename.conf. Logstash will pick up any file in this folder with a .conf extension. A conf file is separated into three parts: input, filter and output.
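Just as a skeleton (a minimal sketch, with the real content filled in throughout the rest of this post), the shape of such a file is:

input {
  # how events reach Logstash (Beats in our case)
}
filter {
  # processing and correlation logic goes here
}
output {
  # where processed events are sent (Elasticsearch, CSV, ...)
}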

Input

This defines how input reaches Logstash. In my example, Filebeat sends log data to logstash:5044.

input {
  beats {
    host => "localhost"
    port => 5044
  }
}

The input for our system is Filebeat, configured to ship the log file.

Output

This part specifies where to send the data once processing is done.

As the first step I wanted to create the CSV file with whatever data I had, so I decided to use the csv output plugin as a second output in our ELK setup. This makes sure the emitted events are published both to the CSV file and to the Elasticsearch server used by Kibana.

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
  csv {
    path => "csv_reports/transactions.csv"
    fields => ["tid","user","api"]
  }
}

This is the abc.conf file located in the Logstash conf.d folder. This output section makes sure events are sent to Elasticsearch (and hence Kibana) and that the given fields are written to the CSV file.

Filter Implementation

This is where we implement our logic to cater to the business requirement. There are many Logstash filters listed on the elastic.co site that can be used to satisfy various requirements.

I will separate this part in two: the first part describes the problems I faced while writing the solution, and the second part gives the final implementation.

Filters Part I: The Effort

  1. Creating new events

My development was done locally and I did not have access to the system producing the logs, so publishing new events was done by copying and pasting the same event again and again into the log file being read.

2. Breaking the log JSON into fields

We could not use the grok filter to split the JSON data, because the JSON structure can change from request to request. The kv filter helped us split the JSON data: fields can be separated using ',' and then each value extracted using ':'.
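A minimal sketch of that kv usage, assuming the JSON body has already been captured into a body field by grok (as in the patterns later in this post):

kv {
  source => "body"       # field holding the JSON body
  field_split => ","     # split fields on commas
  value_split => ":"     # split key from value on colons
}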

Later I had to stop splitting the JSON body, because it creates fields dynamically, and Elasticsearch in turn started creating fields (and hence columns) dynamically, which uses a lot of data and space.

Also, when trying to use the csv output to write data to a file, it requires the fields to be configured up front, which does not work when the fields are dynamic.

So the conclusion was not to split the JSON at this point.

3. Sending different fields to different outputs

If you need to send different fields to different outputs, you can use the clone filter to make a copy of the event and handle it separately, checking for the type set to the clone name.
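A rough sketch of that approach; the clone filter copies the event and sets the copy's type to the clone name, and the field removal here is only a hypothetical example of handling the copy differently:

clone {
  clones => ["cloned"]    # the copy gets type => "cloned"
}
if [type] == "cloned" {
  # handle the copy separately, e.g. strip heavy fields before a second output
  mutate { remove_field => ["jwztoken"] }
}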

I did not use this for my final implementation.

4. The grok filter did not process response data

This was because responses and requests had different log formats, so I decided to handle them in separate grok filters.

5. Handling complex data types in the grok filter

Data like UUIDs have a complex pattern, and grok patterns like WORD had problems breaking them up. When the shape of the data is unknown, the easiest mechanism is the GREEDYDATA pattern, which captures everything without validating, reading up to the next grok pattern we have specified. In my case I added a comma after the GREEDYDATA, since a comma follows the value in the source.
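For example, a cut-down sketch of the pattern used later in this post captures the URN-style transaction id with GREEDYDATA, reading up to the comma and the literal APPLICATION_ID that follow it:

grok {
  match => { "message" => "API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application}" }
}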

6. Converting JSON to CSV

I wanted to convert the JSON body into a CSV-like string. I am still working on it.

7. Adding a new field to an event

ruby { code => "event.set('request2', 'new add field testing')" }

8. Correlating requests and responses

The aggregate filter is used here. It lets us persist data from the request in a map and wait until the response arrives. When the response arrives, the request data is extracted from the map and everything is sent as a single event.

9. If the response never arrives, the request goes missing too

push_map_as_event_on_timeout => true 

This parameter makes the aggregate filter emit the stored request data as an event after the given timeout value.

10. Distinguishing completed transactions from timeouts

I added a new field to the event holding the transaction type, with request, full or response as possible values. Correlated events are marked as full.

11. Events started duplicating

When a request arrived it was persisted in the map and also emitted as an event. To overcome this, once the event is persisted in the map it should be dropped before reaching the output. This also helped remove unwanted fields, such as the very long JWT token.

12. A response arrives after the timeout

This case should be made visible. The late response is still captured, but it remains visible as a separate record because the matching request has already been flushed on timeout.

13. Removing the source message

To save some space, the source message is removed after it has gone through the grok filter.
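In the final configuration this is done with grok's remove_field option; a standalone equivalent would be a mutate sketch like this:

mutate {
  remove_field => ["message"]   # the raw log line is no longer needed once grok has extracted the fields
}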

14. Getting the response time

The built-in Logstash filters did not give us the response time, so we had to fall back on the ruby filter. Once the aggregated event is created, it is passed to this filter to calculate the response time.

time2.to_time.to_i - time1.to_time.to_i

Initially I used the method above to get the response time, but the output was zero because it works in whole seconds. To calculate milliseconds I had to use the following method instead.

DateTime.parse(event.get('response-time')).to_time.to_f*1000

15. Responses coming after the timeout had null values

Responses coming after the timeout had null values, because the values being looked up belonged to the request map entry, which had already expired.

16. Other events that had nothing to do with requests and responses started being captured

Checking whether the transaction id is null fixed the issue, as those events did not carry that id.
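A sketch of that guard, assuming the correlation id has been extracted into the tid field:

if ![tid] {
  drop { }   # not a request/response event, so skip it
}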

17. Ruby code bug

The ruby scripts had some issues handling "-" characters, and getting rid of them solved the issue.

Filters Part II: The Final Implementation

These are the log entries created by the application. API_REQUEST_ID acts as the key for correlating the two events.

Request
[2018-06-22 11:36:19,886] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:request,API_REQUEST_ID:urn:uuid:e86a7610,APPLICATION_ID:9,API_NAME:smsmessaging,API_PUBLISHER:admin,API_VERSION:v1,API_CONTEXT:/smsmessaging/v1,USER_ID:admin,jwzToken:eyJ0,body: { "emailVerified":"true", "phoneNumber":"0718783373", "phoneNumberCountryCode":"", "zoneinfo":"", "BillingSegment":"", "isLostStolen":"", "idHash":"", "SubscriptionActivity":"", "LengthOfTenure":"", "pairingChange":"", "isRoaming":"No", "roamingCountry":"UK", "isDivertSet":"", "locationCountry":"", "deviceChange":""}
Response
[2018-06-22 11:36:19,997] INFO {REQUEST_RESPONSE_LOGGER} - TRANSACTION:response,API_REQUEST_ID:urn:uuid:e86a7610,body: { "msisdn":"0718783373", "imsi":"0718783373", "title":"abc", "firstName":"Nuwan", "lastName":"Senanayake", "dob":"", "identificationType":"", "identificationNumber":"", "onBehalfOf":"", "purchaseCategoryCode":"", "accountType":"", "ownerType":"", "status":""}

As you can see, the two log entry formats are different. In order to apply Logstash filtering the format must be fixed, so we have to handle requests and responses separately.

if [message] =~ "TRANSACTION:request" {

This condition checks whether the request keyword exists in the log line.

The next step is to extract key-value pairs from the log record so that it can be processed by the Logstash filters. The grok filter is used to get this done, as below.

grok {
  match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" }
}
Putting it all together, the complete filter section looks like this:

filter {
  if [message] =~ "TRANSACTION:request" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},APPLICATION_ID:%{NUMBER:application},API_NAME:%{WORD:api},API_PUBLISHER:%{WORD:publisher},API_VERSION:%{WORD:api_v},API_CONTEXT:%{GREEDYDATA:context},USER_ID:%{GREEDYDATA:user},jwzToken:%{GREEDYDATA:jwztoken},body:%{GREEDYDATA:body}" }
    }
    aggregate { # store the required data in a map
      task_id => "%{tid}"
      code => "
        map['reqBody'] = event.get('body')
        map['user'] = event.get('user')
        map['application'] = event.get('application')
        map['api'] = event.get('api')
        map['timestamp'] = event.get('timestamp')
      "
      map_action => "create"
    }
    drop { } # drop the request before it is persisted, to save indexing space in the Elasticsearch server
  }
  if [message] =~ "TRANSACTION:response" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:response-time}] %{LOGLEVEL:level} \{%{DATA:logtype}} - TRANSACTION:%{WORD:transaction},API_REQUEST_ID:%{GREEDYDATA:tid},body:%{GREEDYDATA:response}" }
      remove_field => ["message"]
    }
    aggregate {
      task_id => "%{tid}"
      code => "
        event.set('request', map['reqBody'])
        event.set('user', map['user'])
        event.set('application', map['application'])
        event.set('api', map['api'])
        event.set('request-time', map['timestamp'])
        event.set('transaction', 'full')
      "
      map_action => "update"
      end_of_task => true
      push_map_as_event_on_timeout => true
      timeout => 120
      timeout_task_id_field => "tid"
      timeout_code => "
        event.set('response','Response-timeout')
        event.set('type','request-response')
        event.set('transaction', 'request')
      "
    }
    ruby {
      init => "require 'time'"
      code => "duration = (DateTime.parse(event.get('response-time')).to_time.to_f*1000 - DateTime.parse(event.get('request-time')).to_time.to_f*1000) rescue nil; event.set('service-time', duration);"
    }
  }
}

Post Implementation for Production

Once the business requirement is satisfied, there are a few more things to do before sending this to production.

if [source] =~ "request-response-logger" {

This makes sure our filter is applied only to logs read from the request-response-logger.log file, since the same Logstash instance is used to process other logs as well.

mutate { replace => { "type" => "request-response" } }

There is a field called type that comes with the incoming event, holding a rather lengthy value. We replace it with our own text and use it later in the output section, as shown in the code snippet below.

output {
  if [type] == "request-response" {
    elasticsearch {
      hosts => [ "localhost:9200" ]
    }
    csv {
      path => "csv_reports/transactions.csv"
      fields => ["tid","user","api"]
    }
  }
}
