
Starting Beats for Java developers

Last week, I wrote about how a Java developer could start developing a Logstash plugin. However, with the acquisition of Packetbeat, Logstash now has help from Beats to push data to Elasticsearch. Beats are developed in Go, another challenge for traditional Java developers. This week, I tried porting my Logstash Reddit plugin to a dedicated Beat. This post documents my findings (spoiler: I found it much easier than with Ruby).

Setting up the environment

On OSX, installing the go executable is easy as pie:

brew install go

While Java or Ruby projects (or projects in any language I know, for that matter) can reside anywhere on the local filesystem, all Go projects must be located in a single dedicated workspace, referenced by the $GOPATH environment variable: each project lives under $GOPATH/src.
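
To make the layout rule concrete, here is a small sketch computing where a GOPATH-era project is expected to live: $GOPATH/src followed by the project’s import path. The github.com/ajavageek/redditbeat import path comes from the log output below; projectDir is a hypothetical helper, while build.Default.GOPATH is the standard library’s view of $GOPATH. Since $GOPATH may list several workspaces, the sketch only uses the first one.

```go
package main

import (
	"fmt"
	"go/build"
	"path/filepath"
)

// projectDir is a hypothetical helper returning where a GOPATH-era project
// with the given import path must live: $GOPATH/src/<import path>.
func projectDir(gopath, importPath string) string {
	return filepath.Join(gopath, "src", filepath.FromSlash(importPath))
}

func main() {
	// $GOPATH may list several workspaces; this sketch only uses the first one.
	workspaces := filepath.SplitList(build.Default.GOPATH)
	if len(workspaces) == 0 {
		fmt.Println("GOPATH is not set")
		return
	}
	fmt.Println(projectDir(workspaces[0], "github.com/ajavageek/redditbeat"))
}
```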

Creating the project

As with Logstash plugins, Beats projects can be created from a template. The documentation to do that is quite straightforward. Given Go’s rigid requirements regarding location on the filesystem, just following the instructions yields a new, ready-to-use Go project.

The default template code repeatedly sends an event with an incremented counter, which is visible in the console:

./redditbeat -e -d "*"
2016/12/13 22:55:56.013362 beat.go:267: INFO
  Home path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat]
  Config path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat]
  Data path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/data]
  Logs path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/logs]
2016/12/13 22:55:56.013390 beat.go:177: INFO Setup Beat: redditbeat; Version: 6.0.0-alpha1
2016/12/13 22:55:56.013402 processor.go:43: DBG  Processors:
2016/12/13 22:55:56.013413 beat.go:183: DBG  Initializing output plugins
2016/12/13 22:55:56.013417 logp.go:219: INFO Metrics logging every 30s
2016/12/13 22:55:56.013518 output.go:167: INFO Loading template enabled.
Reading template file:
  /Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/redditbeat.template.json
2016/12/13 22:55:56.013888 output.go:178: INFO Loading template enabled for Elasticsearch 2.x.
Reading template file:
  /Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/redditbeat.template-es2x.json
2016/12/13 22:55:56.014229 client.go:120: INFO Elasticsearch url: http://localhost:9200
2016/12/13 22:55:56.014272 outputs.go:106: INFO Activated elasticsearch as output plugin.
2016/12/13 22:55:56.014279 publish.go:234: DBG  Create output worker
2016/12/13 22:55:56.014312 publish.go:276: DBG  No output is defined to store the topology.
  The server fields might not be filled.
2016/12/13 22:55:56.014326 publish.go:291: INFO Publisher name: LSNM33795267A
2016/12/13 22:55:56.014386 async.go:63: INFO Flush Interval set to: 1s
2016/12/13 22:55:56.014391 async.go:64: INFO Max Bulk Size set to: 50
2016/12/13 22:55:56.014395 async.go:72: DBG  create bulk processing worker (interval=1s, bulk size=50)
2016/12/13 22:55:56.014449 beat.go:207: INFO redditbeat start running.
2016/12/13 22:55:56.014459 redditbeat.go:38: INFO redditbeat is running! Hit CTRL-C to stop it.
2016/12/13 22:55:57.370781 client.go:184: DBG  Publish: {
  "@timestamp": "2016-12-13T22:54:47.252Z",
  "beat": {
    "hostname": "LSNM33795267A",
    "name": "LSNM33795267A",
    "version": "6.0.0-alpha1"
  },
  "counter": 1,
  "type": "redditbeat"
}

Regarding command-line parameters: -e logs to standard error, while -d "*" enables all debugging selectors. For the full list of parameters, type ./redditbeat --help.

The code

Go code is located in .go files (what a surprise…) in the project folder under $GOPATH/src.

Configuration type

The first interesting file is config/config.go, which defines a struct declaring the possible parameters of the Beat. As for the former Logstash plugin, let’s add a subreddit parameter, and set its default value:

package config

import "time"

type Config struct {
    Period    time.Duration `config:"period"`
    Subreddit string        `config:"subreddit"`
}

var DefaultConfig = Config{
    Period:    15 * time.Second,
    Subreddit: "elastic",
}
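
The backquoted strings after each field are struct tags, the closest Go equivalent to Java annotations: a library can read them at runtime through reflection, which is presumably how the Beat’s configuration keys get mapped onto these fields. The sketch below reads the config tags with the standard reflect package; configKeys is a hypothetical helper, not part of libbeat.

```go
package main

import (
	"fmt"
	"reflect"
	"time"
)

// Same struct as in config/config.go.
type Config struct {
	Period    time.Duration `config:"period"`
	Subreddit string        `config:"subreddit"`
}

// configKeys is a hypothetical helper listing the keys declared
// through the `config` struct tags of a struct value.
func configKeys(v interface{}) []string {
	t := reflect.TypeOf(v)
	keys := make([]string, 0, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		keys = append(keys, t.Field(i).Tag.Get("config"))
	}
	return keys
}

func main() {
	fmt.Println(configKeys(Config{})) // prints [period subreddit]
}
```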

Beater type

Code for the Beat itself is found in beater/redditbeat.go. The default template creates a struct for the Beat and three functions:

  1. The Beat constructor - it reads the configuration:
    func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { ... }
  2. The Run function - it loops over the main feature of the Beat:
    func (bt *Redditbeat) Run(b *beat.Beat) error { ... }
  3. The Stop function handles graceful shutdown:
    func (bt *Redditbeat) Stop() { ... }

There’s no explicit interface implementation in Go: a type that implements all the methods of an interface implicitly satisfies it, without any equivalent of Java’s implements keyword. For documentation purposes, here’s the Beater interface:

type Beater interface {
    Run(b *Beat) error
    Stop()
}

Hence, since the Redditbeat type (more precisely, a pointer to it) implements both Run and Stop, it is a Beater.
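
A quick way to see implicit interface satisfaction at work is a compile-time assertion. The following self-contained sketch simplifies the interface (Run takes no *Beat parameter, so no libbeat import is needed) and uses a done channel, like the template’s Redditbeat struct; nothing in it is libbeat’s actual code. Since the methods have pointer receivers, it’s *Redditbeat, not Redditbeat, that satisfies Beater.

```go
package main

import "fmt"

// Beater mirrors the shape of libbeat's interface, simplified so the sketch
// compiles without libbeat: Run takes no *Beat parameter here.
type Beater interface {
	Run() error
	Stop()
}

// Redditbeat never mentions Beater: there is no "implements" clause.
type Redditbeat struct {
	done chan struct{}
}

// Run blocks until Stop closes the done channel.
func (bt *Redditbeat) Run() error {
	<-bt.done
	return nil
}

// Stop signals Run to return.
func (bt *Redditbeat) Stop() {
	close(bt.done)
}

// Compile-time check: the build fails if *Redditbeat stops satisfying Beater.
var _ Beater = (*Redditbeat)(nil)

func main() {
	var b Beater = &Redditbeat{done: make(chan struct{})}
	b.Stop()             // close the channel first...
	fmt.Println(b.Run()) // ...so Run returns immediately: prints <nil>
}
```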

There’s no concept of class in Go, so methods are not declared inside a type’s body. Instead, Go offers something close to extension functions: functions that add behavior to a type, provided they are declared in the same package as that type. Such a function declares its receiver type between the func keyword and the function name - here, it’s the Redditbeat type (or more precisely, a pointer to the Redditbeat type; Go implicitly takes the address when such a method is called on an addressable Redditbeat value).
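
The following sketch, built around a hypothetical Counter type, shows both kinds of receivers and the implicit address-taking: Increment modifies the value it’s called on through a pointer receiver, while Value works on a copy. The same-package rule means that, for instance, a method with a time.Duration receiver cannot be declared outside the time package.

```go
package main

import "fmt"

// Counter is a hypothetical type used only for this illustration.
type Counter struct {
	n int
}

// Increment has a pointer receiver: it modifies the Counter it is called on.
func (c *Counter) Increment() {
	c.n++
}

// Value has a value receiver: it works on a copy of the Counter.
func (c Counter) Value() int {
	return c.n
}

func main() {
	var c Counter          // a plain, addressable value, not a pointer
	c.Increment()          // Go implicitly calls (&c).Increment()
	c.Increment()
	fmt.Println(c.Value()) // prints 2
}
```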

The constructor and the Stop function can stay as they are: whatever feature should be developed goes into the Run function. In this case, the feature is to call the Reddit REST API and send a message for every Reddit post.

The final code looks like the following:

// The Redditbeat struct, as well as the prefix, suffix and separator values,
// are defined elsewhere in the project (not shown here)
import (
    "io/ioutil"
    "net/http"
    "strings"
    "time"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/elastic/beats/libbeat/common"
)

func (bt *Redditbeat) Run(b *beat.Beat) error {
    bt.client = b.Publisher.Connect()
    ticker := time.NewTicker(bt.config.Period)
    defer ticker.Stop()                                                     // release the ticker once Run returns
    reddit := "https://www.reddit.com/r/" + bt.config.Subreddit + "/.json"  // (1)
    client := &http.Client{}                                                // (2)
    for {
        select {
        case <-bt.done:                                                     // closed by Stop()
            return nil
        case <-ticker.C:                                                    // wait for the next tick
        }
        req, reqErr := http.NewRequest("GET", reddit, nil)                  // (3)
        if reqErr != nil {                                                  // (5)
            panic(reqErr)                                                   // (6)
        }
        req.Header.Add("User-Agent", "Some existing header to bypass 429 HTTP") // (4)
        resp, getErr := client.Do(req)                                      // (7)
        if getErr != nil {
            panic(getErr)
        }
        body, readErr := ioutil.ReadAll(resp.Body)                          // (8)
        resp.Body.Close()                                                   // (9)
        if readErr != nil {
            panic(readErr)
        }
        trimmedBody := body[len(prefix) : len(body)-len(suffix)]            // (10)
        messages := strings.Split(string(trimmedBody), separator)           // (11)
        for _, message := range messages {
            event := common.MapStr{                                         // (12)
                "@timestamp": common.Time(time.Now()),
                "type":       b.Name,
                "message":    "{" + message + "}",
            }
            bt.client.PublishEvent(event)                                   // (13)
        }
    }
}

Here’s an explanation of the most important pieces:

1 Create the Reddit REST URL by concatenating strings, including the Subreddit configuration parameter. Remember that its default value has been defined in the config.go file.
2 Get a pointer to a new HTTP client
3 Create a new HTTP request. Note that Go allows for multiple return values.
4 Without a custom User-Agent header, Reddit’s API returns a 429 (Too Many Requests) status code
5 Go standard errors are not handled through exceptions, but are returned alongside regular return values. According to the Golang wiki:
Indicating error conditions to callers should be done by returning error value
Hence, the error is checked before the request is used.
6 The panic() function is similar to throwing an exception in Java: it unwinds the stack until it’s recovered (through recover()), and crashes the program otherwise. For more information, check the relevant documentation.
7 Execute the HTTP request
8 Read the whole response body into a byte slice
9 Close the body stream. Note that Go offers a defer keyword:
A defer statement defers the execution of a function until the surrounding function returns.
Here, the surrounding function is Run, which only returns when the Beat stops: a deferred Close() inside the loop would pile up one pending call per tick. Hence, the body is closed explicitly, while defer is used for ticker.Stop(), which must only run once Run returns.
10 Create a slice (a reference to a part of an underlying array) of the response body. In essence, it removes the prefix and the suffix to keep only the relevant JSON values. It would be overkill to parse the byte array into JSON.
11 Split the slice to get each JSON fragment separately
12 Create the message as a simple dictionary structure
13 Send it
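
The prefix and suffix trimming relies on the exact layout of Reddit’s response. If that proves brittle, the standard encoding/json package offers a middle ground between string slicing and full parsing: json.RawMessage keeps each post as raw JSON, without decoding its fields. This is an alternative sketch, not what redditbeat does; it assumes Reddit’s listing format, where posts are the elements of data.children, and the listing and posts names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// listing models only the parts of a Reddit listing needed here;
// each child post is kept as raw, undecoded JSON.
type listing struct {
	Data struct {
		Children []json.RawMessage `json:"children"`
	} `json:"data"`
}

// posts returns one JSON string per post found in a listing body.
func posts(body []byte) ([]string, error) {
	var l listing
	if err := json.Unmarshal(body, &l); err != nil {
		return nil, err
	}
	out := make([]string, 0, len(l.Data.Children))
	for _, child := range l.Data.Children {
		out = append(out, string(child)) // braces included, no wrapping needed
	}
	return out, nil
}

func main() {
	body := []byte(`{"kind":"Listing","data":{"children":[{"kind":"t3","data":{"id":"a"}},{"kind":"t3","data":{"id":"b"}}]}}`)
	msgs, err := posts(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(msgs)) // prints 2
	fmt.Println(msgs[0])   // prints {"kind":"t3","data":{"id":"a"}}
}
```

Each returned string could then be used as the event’s message, in place of the "{" + message + "}" concatenation.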

Configure, Build, and launch

Default configuration parameters can be found in the redditbeat.yml file at the root of the project. Note that additional common Beat parameters are listed in redditbeat.full.yml, along with relevant comments.

The interesting thing about Beats is that their messages can be sent directly to Elasticsearch or to Logstash for further processing. This is configured in the aforementioned configuration file.

redditbeat:
  period: 10s
output.elasticsearch:
  hosts: ["localhost:9200"]
output.logstash:
  hosts: ["localhost:5044"]
  enabled: true

With this configuration snippet, the Run loop ticks every 10 seconds, and messages are sent to the Logstash instance running on localhost, on port 5044. These values can be overridden when running the Beat (see below).

For Logstash to accept messages from Beats, the Logstash Beats input plugin must be installed, and the Logstash input must be configured for Beats:

input {
  beats {
    port => 5044
  }
}

To build the project, type make at the project’s root. This creates a redditbeat executable, which can then be run:

./redditbeat -e -E redditbeat.subreddit=java

The -E flag overrides parameters found in the redditbeat.yml configuration file (see above). Here, it sets the subreddit to read to "java" instead of the default "elastic".
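
Conceptually, each dot in an -E setting walks one level down the YAML hierarchy, so redditbeat.subreddit=java targets the subreddit key nested under redditbeat. The sketch below illustrates that idea on a plain nested map; the override helper is hypothetical, not libbeat’s implementation, and it keeps every value as a plain string, whereas the real flag may interpret types.

```go
package main

import (
	"fmt"
	"strings"
)

// override is a hypothetical helper applying a "dotted.key=value" setting to
// a nested map, creating (or replacing) intermediate maps as needed.
func override(cfg map[string]interface{}, setting string) error {
	kv := strings.SplitN(setting, "=", 2)
	if len(kv) != 2 {
		return fmt.Errorf("expected key=value, got %q", setting)
	}
	keys := strings.Split(kv[0], ".")
	node := cfg
	for _, k := range keys[:len(keys)-1] {
		child, ok := node[k].(map[string]interface{})
		if !ok { // missing or not a map: start a fresh level
			child = map[string]interface{}{}
			node[k] = child
		}
		node = child
	}
	node[keys[len(keys)-1]] = kv[1] // value stays a string in this sketch
	return nil
}

func main() {
	cfg := map[string]interface{}{
		"redditbeat": map[string]interface{}{"period": "10s", "subreddit": "elastic"},
	}
	if err := override(cfg, "redditbeat.subreddit=java"); err != nil {
		panic(err)
	}
	fmt.Println(cfg["redditbeat"].(map[string]interface{})["subreddit"]) // prints java
}
```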

The output looks like the following:

2016/12/17 14:51:19.748329 client.go:184: DBG  Publish: {
  "@timestamp": "2016-12-17T14:51:19.748Z",
  "beat": {
    "hostname": "LSNM33795267A",
    "name": "LSNM33795267A",
    "version": "6.0.0-alpha1"
  },
  "message": "{
    \"kind\": \"t3\", \"data\": {
      \"contest_mode\": false, \"banned_by\": null,
      \"domain\": \"blogs.oracle.com\", \"subreddit\": \"java\", \"selftext_html\": null,
      \"selftext\": \"\", \"likes\": null, \"suggested_sort\": null, \"user_reports\": [],
      \"secure_media\": null, \"saved\": false, \"id\": \"5ipzgq\", \"gilded\": 0,
      \"secure_media_embed\": {}, \"clicked\": false, \"report_reasons\": null,
      \"author\": \"pushthestack\", \"media\": null, \"name\": \"t3_5ipzgq\", \"score\": 11,
      \"approved_by\": null, \"over_18\": false, \"removal_reason\": null, \"hidden\": false,
      \"thumbnail\": \"\", \"subreddit_id\": \"t5_2qhd7\", \"edited\": false,
      \"link_flair_css_class\": null, \"author_flair_css_class\": null, \"downs\": 0,
      \"mod_reports\": [], \"archived\": false, \"media_embed\": {}, \"is_self\": false,
      \"hide_score\": false, \"spoiler\": false,
      \"permalink\": \"/r/java/comments/5ipzgq/jdk_9_will_no_longer_bundle_javadb/\",
      \"locked\": false, \"stickied\": false, \"created\": 1481943248.0,
      \"url\": \"https://blogs.oracle.com/java-platform-group/entry/deferring_to_derby_in_jdk\",
      \"author_flair_text\": null, \"quarantine\": false,
      \"title\": \"JDK 9 will no longer bundle JavaDB\", \"created_utc\": 1481914448.0,
      \"link_flair_text\": null, \"distinguished\": null, \"num_comments\": 4,
      \"visited\": false, \"num_reports\": null, \"ups\": 11
    }
  }",
  "type": "redditbeat"
}

Conclusion

Strangely enough, I found developing a Beat easier than a Logstash plugin. Go is more low-level and some concepts feel really foreign (like implicit interface implementation), but the ecosystem is much simpler - as the language is more recent. Also, Beats are more versatile, in that they can send to Elasticsearch and/or Logstash.

The complete source code for this post can be found on GitHub.
Nicolas Fränkel

Developer Advocate with 15+ years of experience consulting for many different customers, in a wide range of contexts (such as telecoms, banking, insurance, large retail and the public sector). Usually working on Java/Java EE and Spring technologies, but with focused interests like Rich Internet Applications, Testing, CI/CD and DevOps. Also doubles as a trainer and triples as a book author.
