Skip to content

Commit

Permalink
Move tcp protocol generator to packetbeat (#3447)
Browse files Browse the repository at this point in the history
The tcp procol generator is packetbeat specific. Similar to module and metricset generator it belongs inside the beat.

* The generator was migrated from cookiecutter to a python script to not have additional dependency.
* A makefile target was added to simplify the generation

In the future collect should fetch all protocols and add them automatically to the import to have it the same as for metricbeat. In addition it should be possible based on the global generator to create a packetbeat "shell" to put in own protocols.
  • Loading branch information
ruflin authored and Steffen Siering committed Jan 24, 2017
1 parent 58ed51f commit 868b622
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 50 deletions.
6 changes: 0 additions & 6 deletions generate/packetbeat/tcp-protocol/cookiecutter.json

This file was deleted.

4 changes: 4 additions & 0 deletions packetbeat/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ fields:
.PHONY: benchmark
benchmark:
go test -short -bench=. ./... -cpu=2

.PHONY: create-tcp-protocol
create-tcp-protocol:
python scripts/create_tcp_protocol.py
87 changes: 87 additions & 0 deletions packetbeat/scripts/create_tcp_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import os
import argparse

# Creates a tcp protocol

protocol = ""
plugin_type = ""
plugin_var = ""

def generate_protocol():
read_input()
process_file()

def read_input():
"""Requests input form the command line for empty variables if needed.
"""
global protocol, plugin_type, plugin_var

if protocol == "":
protocol = raw_input("Protocol Name [exampletcp]: ") or "exampletcp"

protocol = protocol.lower()

plugin_type = protocol + "Plugin"
plugin_var = protocol[0] + "p"


def process_file():

# Load path information
generator_path = os.path.dirname(os.path.realpath(__file__))
go_path = os.environ['GOPATH']

for root, dirs, files in os.walk(generator_path + '/tcp-protocol/{protocol}'):

for file in files:

full_path = root + "/" + file

## load file
content = ""
with open(full_path) as f:
content = f.read()

# process content
content = replace_variables(content)

# Write new path
new_path = replace_variables(full_path).replace(".go.tmpl", ".go")

# remove generator info from path
file_path = new_path.replace(generator_path + "/tcp-protocol/", "")

# New file path to write file content to
write_file = "protos/" + file_path

# Create parent directory if it does not exist yet
dir = os.path.dirname(write_file)
if not os.path.exists(dir):
os.makedirs(dir)

# Write file to new location
with open(write_file, 'w') as f:
f.write(content)

def replace_variables(content):
"""Replace all template variables with the actual values
"""
return content.replace("{protocol}", protocol) \
.replace("{plugin_var}", plugin_var) \
.replace("{plugin_type}", plugin_type)


if __name__ == "__main__":

parser = argparse.ArgumentParser(description="Creates a beat")
parser.add_argument("--protocol", help="Protocol name")

args = parser.parse_args()


if args.protocol is not None:
protocol = args.protocol

generate_protocol()


Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ want to create the protocol analyzer (stand-alone, within packetbeat based
project or packetbeat itself):

```
cookiecutter ${GOPATH}/src/github.com/elastic/beats/generate/packetbeat/tcp-protocol
python ${GOPATH}/src/github.com/elastic/beats/packetbeat/scripts/create_tcp_protocol.py
```

Note: If you have multiple go paths use `${GOPATH%%:*}`instead of `${GOPATH}`.

This requires [python](https://www.python.org/downloads/) and [cookiecutter](https://github.com/audreyr/cookiecutter) to be installed. More details on how to install cookiecutter can be found [here](http://cookiecutter.readthedocs.io/en/latest/installation.html).
This requires [python](https://www.python.org/downloads/) to be installed.

## Tutorial (TODO):

Expand Down Expand Up @@ -103,7 +103,7 @@ Create analyzer skeleton from code generator template.

```
$ cd ${GOPATH}/src/github.com/elastic/beats/packetbeat/protos
$ cookiecutter ${GOPATH}/src/github.com/elastic/beats/generate/packetbeat/tcp-protocol
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/script/create_tcp_protocol.py
```

Load plugin into packetbeat by adding `_ "github.com/elastic/beats/packetbeat/protos/echo"` to packetbeat import list in `$GOPATH/src/github.com/elastic/beats/packetbeat/main.go`
Expand Down Expand Up @@ -152,7 +152,7 @@ Create protocol analyzer module (use name ‘echo’ for new protocol):
```
$ mkdir proto
$ cd proto
$ cookiecutter ${GOPATH}/src/github.com/elastic/beats/generate/packetbeat/tcp-protocol
$ python ${GOPATH}/src/github.com/elastic/beats/packetbeat/script/create_tcp_protocol.py
```

### 3 Implement application layer analyzer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package {{ cookiecutter.module }}
package {protocol}

import (
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
)

type {{ cookiecutter.module }}Config struct {
type {protocol}Config struct {
config.ProtocolCommon `config:",inline"`
}

var (
defaultConfig = {{ cookiecutter.module }}Config{
defaultConfig = {protocol}Config{
ProtocolCommon: config.ProtocolCommon{
TransactionTimeout: protos.DefaultTransactionExpiration,
},
}
)

func (c *{{ cookiecutter.module }}Config) Validate() error {
func (c *{protocol}Config) Validate() error {
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package {{ cookiecutter.module }}
package {protocol}

import (
"errors"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package {{ cookiecutter.module }}
package {protocol}

import (
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -42,7 +42,7 @@ func (pub *transPub) createEvent(requ, resp *message) common.MapStr {

event := common.MapStr{
"@timestamp": common.Time(requ.Ts),
"type": "{{ cookiecutter.module }}",
"type": "{protocol}",
"status": status,
"responsetime": responseTime,
"bytes_in": requ.Size,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package {{ cookiecutter.module }}
package {protocol}

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package {{ cookiecutter.module }}
package {protocol}

import (
"time"
Expand All @@ -11,8 +11,8 @@ import (
"github.com/elastic/beats/packetbeat/publish"
)

// {{ cookiecutter.plugin_type }} application level protocol analyzer plugin
type {{ cookiecutter.plugin_type }} struct {
// {plugin_type} application level protocol analyzer plugin
type {plugin_type} struct {
ports protos.PortsConfig
parserConfig parserConfig
transConfig transactionConfig
Expand All @@ -31,24 +31,24 @@ type stream struct {
}

var (
debugf = logp.MakeDebug("{{ cookiecutter.module }}")
debugf = logp.MakeDebug("{protocol}")

// use isDebug/isDetailed to guard debugf/detailedf to minimize allocations
// (garbage collection) when debug log is disabled.
isDebug = false
)

func init() {
protos.Register("{{ cookiecutter.module }}", New)
protos.Register("{protocol}", New)
}

// New create and initializes a new {{ cookiecutter.protocol }} protocol analyzer instance.
// New create and initializes a new {protocol} protocol analyzer instance.
func New(
testMode bool,
results publish.Transactions,
cfg *common.Config,
) (protos.Plugin, error) {
p := &{{ cookiecutter.plugin_type }}{}
p := &{plugin_type}{}
config := defaultConfig
if !testMode {
if err := cfg.Unpack(&config); err != nil {
Expand All @@ -62,33 +62,33 @@ func New(
return p, nil
}

func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) init(results publish.Transactions, config *{{ cookiecutter.module }}Config) error {
if err := {{ cookiecutter.plugin_var }}.setFromConfig(config); err != nil {
func ({plugin_var} *{plugin_type}) init(results publish.Transactions, config *{protocol}Config) error {
if err := {plugin_var}.setFromConfig(config); err != nil {
return err
}
{{ cookiecutter.plugin_var }}.pub.results = results
{plugin_var}.pub.results = results

isDebug = logp.IsDebug("http")
return nil
}

func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) setFromConfig(config *{{ cookiecutter.module }}Config) error {
func ({plugin_var} *{plugin_type}) setFromConfig(config *{protocol}Config) error {

// set module configuration
if err := {{ cookiecutter.plugin_var }}.ports.Set(config.Ports); err != nil {
if err := {plugin_var}.ports.Set(config.Ports); err != nil {
return err
}

// set parser configuration
parser := &{{ cookiecutter.plugin_var }}.parserConfig
parser := &{plugin_var}.parserConfig
parser.maxBytes = tcp.TCPMaxDataInStream

// set transaction correlator configuration
trans := &{{ cookiecutter.plugin_var }}.transConfig
trans := &{plugin_var}.transConfig
trans.transactionTimeout = config.TransactionTimeout

// set transaction publisher configuration
pub := &{{ cookiecutter.plugin_var }}.pub
pub := &{plugin_var}.pub
pub.sendRequest = config.SendRequest
pub.sendResponse = config.SendResponse

Expand All @@ -97,73 +97,73 @@ func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) setFromConf

// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) ConnectionTimeout() time.Duration {
return {{ cookiecutter.plugin_var }}.transConfig.transactionTimeout
func ({plugin_var} *{plugin_type}) ConnectionTimeout() time.Duration {
return {plugin_var}.transConfig.transactionTimeout
}

// GetPorts returns the ports numbers packets shall be processed for.
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) GetPorts() []int {
return {{ cookiecutter.plugin_var }}.ports.Ports
func ({plugin_var} *{plugin_type}) GetPorts() []int {
return {plugin_var}.ports.Ports
}

// Parse processes a TCP packet. Return nil if connection
// state shall be dropped (e.g. parser not in sync with tcp stream)
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) Parse(
func ({plugin_var} *{plugin_type}) Parse(
pkt *protos.Packet,
tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
defer logp.Recover("Parse {{ cookiecutter.plugin_type }} exception")
defer logp.Recover("Parse {plugin_type} exception")

conn := {{ cookiecutter.plugin_var }}.ensureConnection(private)
conn := {plugin_var}.ensureConnection(private)
st := conn.streams[dir]
if st == nil {
st = &stream{}
st.parser.init(&{{ cookiecutter.plugin_var }}.parserConfig, func(msg *message) error {
st.parser.init(&{plugin_var}.parserConfig, func(msg *message) error {
return conn.trans.onMessage(tcptuple.IPPort(), dir, msg)
})
conn.streams[dir] = st
}

if err := st.parser.feed(pkt.Ts, pkt.Payload); err != nil {
debugf("%v, dropping TCP stream for error in direction %v.", err, dir)
{{ cookiecutter.plugin_var }}.onDropConnection(conn)
{plugin_var}.onDropConnection(conn)
return nil
}
return conn
}

// ReceivedFin handles TCP-FIN packet.
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) ReceivedFin(
func ({plugin_var} *{plugin_type}) ReceivedFin(
tcptuple *common.TCPTuple, dir uint8,
private protos.ProtocolData,
) protos.ProtocolData {
return private
}

// GapInStream handles lost packets in tcp-stream.
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) GapInStream(tcptuple *common.TCPTuple, dir uint8,
func ({plugin_var} *{plugin_type}) GapInStream(tcptuple *common.TCPTuple, dir uint8,
nbytes int,
private protos.ProtocolData,
) (protos.ProtocolData, bool) {
conn := getConnection(private)
if conn != nil {
{{ cookiecutter.plugin_var }}.onDropConnection(conn)
{plugin_var}.onDropConnection(conn)
}

return nil, true
}

// onDropConnection processes and optionally sends incomplete
// transaction in case of connection being dropped due to error
func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) onDropConnection(conn *connection) {
func ({plugin_var} *{plugin_type}) onDropConnection(conn *connection) {
}

func ({{ cookiecutter.plugin_var }} *{{ cookiecutter.plugin_type }}) ensureConnection(private protos.ProtocolData) *connection {
func ({plugin_var} *{plugin_type}) ensureConnection(private protos.ProtocolData) *connection {
conn := getConnection(private)
if conn == nil {
conn = &connection{}
conn.trans.init(&{{ cookiecutter.plugin_var }}.transConfig, {{ cookiecutter.plugin_var }}.pub.onTransaction)
conn.trans.init(&{plugin_var}.transConfig, {plugin_var}.pub.onTransaction)
}
return conn
}
Expand All @@ -180,11 +180,11 @@ func getConnection(private protos.ProtocolData) *connection {

priv, ok := private.(*connection)
if !ok {
logp.Warn("{{ cookiecutter.module }} connection type error")
logp.Warn("{protocol} connection type error")
return nil
}
if priv == nil {
logp.Warn("Unexpected: {{ cookiecutter.module }} connection data not set")
logp.Warn("Unexpected: {protocol} connection data not set")
return nil
}
return priv
Expand Down

2 comments on commit 868b622

@rwaweber
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this commit removes the dependency on cookiecutter.

I noticed that the docs located here reflect this change, but those located here do not. Hope this helps!

@ruflin
Copy link
Member Author

@ruflin ruflin commented on 868b622 Jan 30, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rwaweber The reason for the difference is that the second link goes to current which is today 5.1 branch. This branch doc must be updated with a line to checkout the 5.1 branch (will happen this week). If you use this link instead which points to master, you should see the new docs. https://www.elastic.co/guide/en/beats/libbeat/master/newbeat-generate.html#newbeat-generate

Please sign in to comment.