## Logstash config that polls DC Capital Bikeshare data with http_poller and splits the XML response into one event per station.
## Example of pulling data from DC Capital bikeshare to Elasticsearch in real time
## HTTP Poller -> XML Splitting -> Elasticsearch
input {
  ## Pull data from Capital Bikeshare every 60 seconds.
  http_poller {
    urls => {
      ## NOTE(review): the URL is empty in this copy of the config. Set it to the
      ## Capital Bikeshare station XML feed before running; an empty URL cannot be polled.
      bikeshare_dc => ""
    }
    ## Abort a request that takes longer than 30 seconds.
    request_timeout => 30
    ## Seconds between polls.
    interval => 60
    ## Keep the response body as one plain-text "message"; the xml filter parses it later.
    codec => "plain"
    ## Store the poller's request/response metadata under this field
    ## (it is dropped again by the mutate filter's remove_field list).
    metadata_target => "http_poller_metadata"
  }
}
filter {
  ## Interpret the message payload as XML and store the parsed tree under "parsed".
  xml {
    source => "message"
    target => "parsed"
  }

  ## Split out each "station" record in the XML into a separate event.
  split {
    field => "[parsed][station]"
    ## Copy the per-station values to top-level fields of each split event.
    add_field => {
      ## Generate a unique id from the station id combined with the sensor time
      ## (lastCommWithServer) to prevent duplicates.
      id                 => "%{[parsed][station][id]}-%{[parsed][station][lastCommWithServer]}"
      stationName        => "%{[parsed][station][name]}"
      lastCommWithServer => "%{[parsed][station][lastCommWithServer]}"
      lat                => "%{[parsed][station][lat]}"
      long               => "%{[parsed][station][long]}"
      numBikes           => "%{[parsed][station][nbBikes]}"
      numEmptyDocks      => "%{[parsed][station][nbEmptyDocks]}"
    }
  }

  mutate {
    ## Convert the numeric fields from strings to the appropriate data type.
    convert => {
      "numBikes"      => "integer"
      "numEmptyDocks" => "integer"
      "lat"           => "float"
      "long"          => "float"
    }
    ## Put the geospatial value in the correct [ longitude, latitude ] format.
    add_field => { "location" => [ "%{[long]}", "%{[lat]}" ] }
    ## Get rid of the extra fields we don't need.
    remove_field => [ "message", "parsed", "lat", "long", "host", "http_poller_metadata" ]
  }

  ## Use the embedded Unix timestamp (milliseconds) as the event time,
  ## then drop the source field once it has been parsed.
  date {
    match => ["lastCommWithServer", "UNIX_MS"]
    remove_field => ["lastCommWithServer"]
  }
}
output {
  ## Uncomment for full per-event debugging output:
  # stdout { codec => rubydebug }
  ## Write events to stdout using the compact "dots" codec.
  stdout { codec => dots }

  elasticsearch {
    ## Use a time-aware index name (one index per day).
    index => "bikestatus-dc-%{+YYYY.MM.dd}"
    ## NOTE(review): `protocol` is only accepted by older versions of the
    ## elasticsearch output plugin; confirm against the Logstash version in use.
    protocol => "http"
    ## Not super important, but it makes sense to override the default, which is "log".
    document_type => "bikestatus"
    ## Use the generated id as the document id to prevent duplicates.
    document_id => "%{[id]}"
  }
}
## INDEX Template (apply this before loading data!)
# PUT _template/bikestatus
# {
# "template": "bikestatus-*",
# "settings": {
# "number_of_shards": 1,
# "number_of_replicas": 0
# },
# "mappings": {
# "_default_": {
# "dynamic_templates": [
# {
# "string_fields": {
# "mapping": {
# "index": "not_analyzed",
# "omit_norms": true,
# "type": "string",
# "doc_values": true
# },
# "match_mapping_type": "string",
# "match": "*"
# }
# }
# ],
# "_all": {
# "enabled": false
# },
# "properties": {
# "@timestamp": {
# "type": "date",
# "format": "dateOptionalTime",
# "doc_values": true
# },
# "location": {
# "type": "geo_point",
# "geohash": true,
# "fielddata" : {
# "format" : "compressed",
# "precision" : "20m"
# }
# },
# "numBikes": { "type": "integer","doc_values": true },
# "numEmptyDocks": { "type": "integer","doc_values": true }
# }
# }
# }
# }
## Kibana index name pattern (enter this when configuring the index in Kibana)
## [bikestatus-dc-]YYYY.MM.DD
