Posts Tagged ‘Logstash’
  • Starting Beats for Java developers

    Beats logo

    Last week, I wrote about how one could start developing one’s Logstash plugin coming from a Java developer background. However, with the acquisition of Packetbeat, Logstash now has help from Beats to push data to Elasticsearch. Beats are developed in Go, another challenge for traditional Java developers. This week, I tried porting my Logstash Reddit plugin to a dedicated Beat. This post documents my findings (spoiler: I found it much easier than with Ruby).

    Setting up the environment

    On OSX, installing the go executable is easy as pie:

    brew install go

    While Java or Ruby (or any language I know for that matter) can reside anywhere on the local filesystem, Go projects must all be situated in one single dedicated location, available under the $GOPATH environment variable.

    Creating the project

    As for Logstash plugins, Beats projects can be created from a template. The documentation to do that is quite straightforward. Given Go’s rigid requirements regarding location on the filesystem, just following instructions yields a new ready-to-use Go project.

    The default template code will repeatedly send an event with an incremented counter in the console:

    ./redditbeat -e -d "*"
    2016/12/13 22:55:56.013362 beat.go:267: INFO
      Home path: [/Users/i303869/projects/private/go/src/]
      Config path: [/Users/i303869/projects/private/go/src/]
      Data path: [/Users/i303869/projects/private/go/src/]
      Logs path: [/Users/i303869/projects/private/go/src/]
    2016/12/13 22:55:56.013390 beat.go:177: INFO Setup Beat: redditbeat; Version: 6.0.0-alpha1
    2016/12/13 22:55:56.013402 processor.go:43: DBG  Processors:
    2016/12/13 22:55:56.013413 beat.go:183: DBG  Initializing output plugins
    2016/12/13 22:55:56.013417 logp.go:219: INFO Metrics logging every 30s
    2016/12/13 22:55:56.013518 output.go:167: INFO Loading template enabled.
    Reading template file:
    2016/12/13 22:55:56.013888 output.go:178: INFO Loading template enabled for Elasticsearch 2.x.
    Reading template file:
    2016/12/13 22:55:56.014229 client.go:120: INFO Elasticsearch url: http://localhost:9200
    2016/12/13 22:55:56.014272 outputs.go:106: INFO Activated elasticsearch as output plugin.
    2016/12/13 22:55:56.014279 publish.go:234: DBG  Create output worker
    2016/12/13 22:55:56.014312 publish.go:276: DBG  No output is defined to store the topology.
      The server fields might not be filled.
    2016/12/13 22:55:56.014326 publish.go:291: INFO Publisher name: LSNM33795267A
    2016/12/13 22:55:56.014386 async.go:63: INFO Flush Interval set to: 1s
    2016/12/13 22:55:56.014391 async.go:64: INFO Max Bulk Size set to: 50
    2016/12/13 22:55:56.014395 async.go:72: DBG  create bulk processing worker (interval=1s, bulk size=50)
    2016/12/13 22:55:56.014449 beat.go:207: INFO redditbeat start running.
    2016/12/13 22:55:56.014459 redditbeat.go:38: INFO redditbeat is running! Hit CTRL-C to stop it.
    2016/12/13 22:55:57.370781 client.go:184: DBG  Publish: {
      "@timestamp": "2016-12-13T22:54:47.252Z",
      "beat": {
        "hostname": "LSNM33795267A",
        "name": "LSNM33795267A",
        "version": "6.0.0-alpha1"
      "counter": 1,
      "type": "redditbeat"

    Regarding command-line parameters: -e logs to the standard err, while -d "*" enables all debugging selectors. For the full list of parameters, type ./redditbeat --help.

    The code

    Go code is located in .go files (what a surprise…​) in the project sub-folder of the $GOPATH/src folder.

    Configuration type

    The first interesting file is config/config.go which defines a struct to declare possible parameters for the Beat. As for the former Logstash plugin, let’s add a subreddit parameter, and sets it default value:

    type Config struct {
        Period time.Duration `config:"period"`
        Subreddit string `config:"subreddit"`
    var DefaultConfig = Config {
        Period: 15 * time.Second,
        Subreddit: "elastic",

    Beater type

    Code for the Beat itself is found in beater/redditbean.go. The default template creates a struct for the Beat and three functions:

    1. The Beat constructor - it reads the configuration:
      func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { ... }
    2. The Run function - should loop over the main feature of the Beat:
      func (bt *Redditbeat) Run(b *beat.Beat) error { ... }
    3. The Stop function handles graceful shutdown:
      func (bt *Redditbeat) Stop() { ... }

    There’s no explicit interface implementation in Go. Just implementing all methods from an interface creates an implicit inheritance relationship. For documentation purposes, here’s the Beater interface:

    type Beater interface {
        Run(b *Beat) error

    Hence, since the Beat struct implements both Run and Stop, it is a Beater.

    There’s no concept of class in Go, so methods cannot be declared on a concrete type. However, it exists the concept of extension functions: functions that can add behavior to a type (inside a single package). It needs to declare the receiver type: this is done between the fun keyword and the function name - here, it’s the Redditbeat type (or more correctly, a pointer to the Redditbeat type, but there’s an implicit conversion).

    The constructor and the Stop function can stay as they are, whatever feature should be developed must be in the Run function. In this case, the feature is to call the Reddit REST API and send a message for every Reddit post.

    The final code looks like the following:

    func (bt *Redditbeat) Run(b *beat.Beat) error {
        bt.client = b.Publisher.Connect()
        ticker := time.NewTicker(bt.config.Period)
        reddit := "" + bt.config.Subreddit + "/.json"      (1)
        client := &http.Client {}                                                   (2)
        for {
            select {
            case <-bt.done:
                return nil
            case <-ticker.C:
            req, reqErr := http.NewRequest("GET", reddit, nil)                      (3)
            req.Header.Add("User-Agent", "Some existing header to bypass 429 HTTP") (4)
            if (reqErr != nil) {                                                    (5)
                panic(reqErr)                                                       (6)
            resp, getErr := client.Do(req)                                          (7)
            if (getErr != nil) {
            body, readErr := ioutil.ReadAll(resp.Body)                              (8)
            defer resp.Body.Close()                                                 (9)
            if (readErr != nil) {
            trimmedBody := body[len(prefix):len(body) - len(suffix)]                (10)
            messages := strings.Split(string(trimmedBody), separator)               (11)
            for i := 0; i < len(messages); i ++ {
                event := common.MapStr{                                             (12)
                    "@timestamp": common.Time(time.Now()),
                    "type":       b.Name,
                    "message":    "{" + messages[i] + "}",
                bt.client.PublishEvent(event)                                       (13)

    Here’s an explanation of the most important pieces:

    1 Create the Reddit REST URL by concatenating Strings, including the configuration Subreddit parameter. Remember that its default value has been defined in the config.go file.
    2 Get a reference on a new HTTP client type
    3 Create a new HTTP request. Note that Go allows for multiple return values.
    4 If no standard request header is set, Reddit’s API will return a 429 status code
    5 Go standard errors are not handled through exceptions but are returned along regular returned values. According to the Golang wiki:
    Indicating error conditions to callers should be done by returning error value
    6 The panic() function is similar to throwing an exception in Java, climbing up the stack until it’s handled. For more information, check the relevant documentation.
    7 Execute the HTTP request
    8 Read the response body into a byte array
    9 Close the body stream. Note the defer keyword:
    A defer statement defers the execution of a function until the surrounding function returns.
    10 Create a slice - a reference to a part of an array, of the whole response body byte array. In essence, it removes the prefix and the suffix to keep the relevant JSON value. It would be overkill to parse the byte array into JSON.
    11 Split the slice to get each JSON fragment separately
    12 Create the message as a simple dictionary structure
    13 Send it

    Configure, Build, and launch

    Default configuration parameters can be found in the redditbeat.yml file at the root of the project. Note that additional common Beat parameters are listed in the redditbeat.full.yml, along with relevant comments.

    The interesting thing about Beats is that their messages can be sent directly to Elasticsearch or to Logstash for further processing. This is configured in the aforementioned configuration file.

      period: 10s
      hosts: ["localhost:9200"]
      hosts: ["localhost:5044"]
      enabled: true

    This configuration snippet will loop over the Run method every 10 seconds and send messages to the Logstash instance running on localhost on port 5044. This can be overridden when running the Beat (see below).

    For Logstash to accept messages from Beats, the Logstash Beat plugin must be installed and Logstash input must be configured for Beats:
    input {
      beats {
        port => 5044

    To build the project, type make at the project’s root. It will create an executable that can be run.

    ./redditbeat -e -E redditbeat.subreddit=java

    The -E flag may override parameters found in the embedded redditbeat.yml configuration file (see above). Here, it sets the subreddit to be read to "java" instead of the default "elastic".

    The output looks like the following:

    2016/12/17 14:51:19.748329 client.go:184: DBG  Publish: {
      "@timestamp": "2016-12-17T14:51:19.748Z",
      "beat": {
        "hostname": "LSNM33795267A",
        "name": "LSNM33795267A",
        "version": "6.0.0-alpha1"
      "message": "{
        \"kind\": \"t3\", \"data\": {
          \"contest_mode\": false, \"banned_by\": null,
          \"domain\": \"\", \"subreddit\": \"java\", \"selftext_html\": null,
          \"selftext\": \"\", \"likes\": null, \"suggested_sort\": null, \"user_reports\": [],
          \"secure_media\": null, \"saved\": false, \"id\": \"5ipzgq\", \"gilded\": 0,
          \"secure_media_embed\": {}, \"clicked\": false, \"report_reasons\": null,
          \"author\": \"pushthestack\", \"media\": null, \"name\": \"t3_5ipzgq\", \"score\": 11,
          \"approved_by\": null, \"over_18\": false, \"removal_reason\": null, \"hidden\": false,
          \"thumbnail\": \"\", \"subreddit_id\": \"t5_2qhd7\", \"edited\": false,
          \"link_flair_css_class\": null, \"author_flair_css_class\": null, \"downs\": 0,
          \"mod_reports\": [], \"archived\": false, \"media_embed\": {}, \"is_self\": false,
          \"hide_score\": false, \"spoiler\": false,
          \"permalink\": \"/r/java/comments/5ipzgq/jdk_9_will_no_longer_bundle_javadb/\",
          \"locked\": false, \"stickied\": false, \"created\": 1481943248.0,
          \"url\": \"\",
          \"author_flair_text\": null, \"quarantine\": false,
          \"title\": \"JDK 9 will no longer bundle JavaDB\", \"created_utc\": 1481914448.0,
          \"link_flair_text\": null, \"distinguished\": null, \"num_comments\": 4,
          \"visited\": false, \"num_reports\": null, \"ups\": 11
      "type": "redditbeat"


    Strangely enough, I found developing a Beat easier than a Logstash plugin. Go is more low-level and some concepts feel really foreign (like implicit interface implementation), but the ecosystem is much simpler - as the language is more recent. Also, Beats are more versatile, in that they can send to Elasticsearch and/or Logstash.

    Categories: Development Tags: LogstashElasticsearchBeatGo
  • Starting Logstash plugin development for Java developers

    Logstash old logo

    I recently became interested in Logstash, and after playing with it for a while, I decided to create my own custom plugin for learning purpose. I chose to pull data from Reddit because a) I use it often and b) there’s no existing plugin that offers that.

    The Elasticsearch site offers quite an exhaustive documentation to create one’s own Logstash plugin. Such endeavour requires Ruby skills - not only the language syntax but also the ecosystem. Expectedly, the site assumes the reader is familiar with both. Unfortunately, that’s not my case. I’ve been developing in Java a lot, I’ve dabbled somewhat in Scala, I’m quite interested in Kotlin - in the end, I’m just a JMV developer (plus some Javascript here and there). Long talk short, I start from scratch in Ruby.

    At this stage, there are two possible approaches:

    1. Read documentation and tutorials about Ruby, Gems, bundler, the whole nine yard and come back in a few months (or more)
    2. Or learn on the spot by diving right into development

    Given that I don’t have months, and that whatever I learned is good enough, I opted for the 2nd option. This post is a sum-up of the steps I went through, in the hopes it might benefit others who find themselves in the same situation.

    The first step is not the hardest

    Though new Logstash plugins can be started from scratch, the documentation advise to start from a template. This is explained in the online procedure. The generation yields the following structure:

    $ tree logstash-input-reddit
    ├── Gemfile
    ├── LICENSE
    ├── Rakefile
    ├── lib
    │   └── logstash
    │       └── inputs
    │           └── reddit.rb
    ├── logstash-input-reddit.gemspec
    └── spec
        └── inputs
            └── reddit_spec.rb

    Not so obviously for a Ruby newbie, this structure is one of a Ruby Gem. In general, dependencies are declared in the associated Gemfile:

    source ''

    However, in this case, the gemspec directive adds one additional indirection level. Not only dependencies, but also meta-data, are declared in the associated gemspec file. This is a feature of the Bundler utility gem.

    To install dependencies, the bundler gem first needs to be installed. Aye, there’s the rub…​

    Ruby is the limit

    Trying to install the gem yields the following:

    gem install bundler
    Fetching: bundler-1.13.6.gem (100%)
    ERROR:  While executing gem ... (TypeError)
        no implicit conversion of nil into String

    The first realization - and it took a lot of time (browsing and reading), is that there are different flavours of Ruby runtimes. Simple Ruby is not enough for Logstash plugin development: it requires a dedicated runtime that runs on the JVM aka JRuby.

    The second realization is that while it’s easy to install multiple Ruby runtimes on a machine, it’s impossible to have them run at the same time. While Homebrew makes the jruby package available, it seems there’s only one single gem repository per system and it reacts very poorly to being managed by different runtimes.

    After some more browsing, I found the solution: rbenv. It not only mangages ruby itself, but also all associated executables (gem, irb, rake, etc.) by isolating every runtime. This makes possible to run my Jekyll site with the latest 2.2.3 Ruby runtime and build the plugin with JRuby on my machine. rbenv is available via Homebrew:

    This is how it goes:

    Install rbenv

    brew install rbenv

    Configure the PATH

    echo 'eval "$(rbenv init -)"' >> ~/.bash_profile

    Source the bash profile script


    List all available runtimes
    rbenv install -l
    Available versions:
    Install the desired runtime

    rbenv install jruby-

    Configure the project to use the desired runtime
    cd logstash-input-reddit
    rbenv local jruby-
    Check it’s configured
    ruby --version

    Finally, bundler can be installed:

    gem install bundler
    Successfully installed bundler-1.13.6
    1 gem installed

    And from this point on, all required gems can be installed as well:

    bundle install
    Fetching gem metadata from
    Fetching version metadata from
    Fetching dependency metadata from
    Resolving dependencies...
    Installing rake 12.0.0
    Installing public_suffix 2.0.4
    Installing rspec-wait 0.0.9
    Installing logstash-core-plugin-api 2.1.17
    Installing logstash-codec-plain 3.0.2
    Installing logstash-devutils 1.1.0
    Using logstash-input-reddit 0.1.0 from source at `.`
    Bundle complete! 2 Gemfile dependencies, 57 gems now installed.
    Use `bundle show [gemname]` to see where a bundled gem is installed.
    Post-install message from jar-dependencies:
    if you want to use the executable lock_jars then install ruby-maven gem before using lock_jars
       $ gem install ruby-maven -v '~> 3.3.11'
    or add it as a development dependency to your Gemfile
       gem 'ruby-maven', '~> 3.3.11'

    Plugin development proper

    With those requirements finally addressed, proper plugin development can start. Let’s skip finding the right API to use to make an HTTP request in Ruby or addressing Bundler warnings when installing dependencies, the final code is quite terse:

    class LogStash::Inputs::Reddit < LogStash::Inputs::Base
      config_name 'reddit'
      default :codec, 'plain'
      config :subreddit, :validate => :string, :default => 'elastic'
      config :interval, :validate => :number, :default => 10
      def register
        @host = Socket.gethostname
        @http ='', 443)
        @get ="/r/={@subreddit}/.json")
        @http.use_ssl = true
      def run(queue)
        = we can abort the loop if stop? becomes true
        while !stop?
          response = @http.request(@get)
          json = JSON.parse(response.body)
          json['data']['children'].each do |child|
            event ='message' => child, 'host' => @host)
            queue << event
          Stud.stoppable_sleep(@interval) { stop? }

    The plugin defines two configuration parameters, which subrredit will be parsed for data and the interval between 2 calls (in seconds).

    The register method initializes the class attributes, while the run method loops over:

    • Making the HTTP call to Reddit
    • Parsing the response body as JSON
    • Making dedicated fragments from the JSON, one for each post. This is particularly important because we want to index each post separately.
    • Sending each fragment as a Logstash event for indexing

    Of course, it’s very crude, there’s no error handling, it doesn’t save the timestamp of the last read post to prevent indexing duplicates, etc. In its current state, the plugin offers a lot of room for improvement, but at least it works from a MVP point-of-view.

    Building and installing

    As written above, the plugin is a Ruby gem. It can be built as any other gem:

    gem build logstash-input-reddit

    This creates a binary file named logstash-input-reddit-0.1.0.gem - name and version both come from the Bundler’s gemspec. It can be installed using the standard Logtstash plugin installation procedure:

    bin/logstash-plugin install logstash-input-reddit-0.1.0.gem

    Downstream processing

    One huge benefit of Logstash is the power of its processing pipeline. The plugin is designed to produce raw data, but the indexing should handle each field separately. Extracting fields from another field can be achieved with the mutate filter.

    Here’s one Logstash configuration snippet example, to fill some relevant fields (and to remove message):

      mutate {
        add_field => {
          "kind" => "%{message[kind]}"
          "subreddit" => "%{message[data][subreddit]}"
          "domain" => "%{message[data][domain]}"
          "selftext" => "%{message[data][selftext]}"
          "url" => "%{message[data][url]}"
          "title" => "%{message[data][title]}"
          "id" => "%{message[data][id]}"
          "author" => "%{message[data][author]}"
          "score" => "%{message[data][score]}"
          "created_utc" => "%{message[data][created_utc]}"
        remove_field => [ "message" ]

    Once the plugin has been built and installed, Logstash can be run with a config file that includes the previous snippet. It should yield something akin to the following - when used in conjunction with the rubydebug codec:

           "selftext" => "",
               "kind" => "t3",
             "author" => "nfrankel",
              "title" => "Structuring data with Logstash",
          "subreddit" => "elastic",
                "url" => "",
               "tags" => [],
              "score" => "9",
         "@timestamp" => 2016-12-07T22:32:03.320Z,
             "domain" => "",
               "host" => "LSNM33795267A",
           "@version" => "1",
                 "id" => "5f66bk",
        "created_utc" => "1.473948927E9"


    Starting from near-zero kwnowledge about the Ruby ecosystem, I’m quite happy of the result.

    The only thing I couldn’t achieve was to add 3rd libraries (like rest-client), Logstash kept complaining about not being able to install the plugin because of a missing dependency. Falling back to standard HTTP calls solved the issue.

    Also, note that the default template has some warnings on install, but they can be fixed quite easily:

    • The license should read Apache-2.0 instead of Apache License (2.0)
    • Dependencies version are open-ended ('>= 0') whereas they should be more limited i.e. '~> 2'
    • Some meta-data is missing, like the homepage

    I hope this post will be useful to other Java developers wanting to develop their own Logstash plugin.

    Categories: Development Tags: Logstashpluginruby