Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation for multiple outputs #59

Merged
merged 7 commits into from
Aug 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pkg/
tivan
.vagrant
telegraf
106 changes: 58 additions & 48 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@ package telegraf
import (
"fmt"
"log"
"net/url"
"os"
"sort"
"sync"
"time"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/telegraf/outputs"
"github.com/influxdb/telegraf/plugins"
)

type runningOutput struct {
name string
output outputs.Output
}

type runningPlugin struct {
name string
plugin plugins.Plugin
Expand All @@ -31,9 +35,8 @@ type Agent struct {

Config *Config

outputs []*runningOutput
plugins []*runningPlugin

conn *client.Client
}

// NewAgent returns an Agent struct based off the given Config
Expand All @@ -54,39 +57,42 @@ func NewAgent(config *Config) (*Agent, error) {
agent.Hostname = hostname
}

if config.Tags == nil {
config.Tags = map[string]string{}
}

config.Tags["host"] = agent.Hostname

return agent, nil
}

// Connect connects to the agent's config URL
func (a *Agent) Connect() error {
config := a.Config

u, err := url.Parse(config.URL)
if err != nil {
return err
for _, o := range a.outputs {
err := o.output.Connect(a.Hostname)
if err != nil {
return err
}
}
return nil
}

c, err := client.NewClient(client.Config{
URL: *u,
Username: config.Username,
Password: config.Password,
UserAgent: config.UserAgent,
Timeout: config.Timeout.Duration,
})
func (a *Agent) LoadOutputs() ([]string, error) {
var names []string

if err != nil {
return err
for _, name := range a.Config.OutputsDeclared() {
creator, ok := outputs.Outputs[name]
if !ok {
return nil, fmt.Errorf("Undefined but requested output: %s", name)
}

output := creator()

err := a.Config.ApplyOutput(name, output)
if err != nil {
return nil, err
}

a.outputs = append(a.outputs, &runningOutput{name, output})
names = append(names, name)
}

a.conn = c
sort.Strings(names)

return nil
return names, nil
}

// LoadPlugins loads the agent's plugins
Expand Down Expand Up @@ -144,17 +150,14 @@ func (a *Agent) crankParallel() error {

close(points)

var acc BatchPoints
acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database
var bp BatchPoints
bp.Time = time.Now()

for sub := range points {
acc.Points = append(acc.Points, sub.Points...)
bp.Points = append(bp.Points, sub.Points...)
}

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&bp)
}

func (a *Agent) crank() error {
Expand All @@ -171,12 +174,9 @@ func (a *Agent) crank() error {
}
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

_, err := a.conn.Write(acc.BatchPoints)
return err
return a.flush(&acc)
}

func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) error {
Expand All @@ -194,11 +194,12 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
return err
}

acc.Tags = a.Config.Tags
acc.Time = time.Now()
acc.Database = a.Config.Database

a.conn.Write(acc.BatchPoints)
err = a.flush(&acc)
if err != nil {
return err
}

select {
case <-shutdown:
Expand All @@ -209,6 +210,22 @@ func (a *Agent) crankSeparate(shutdown chan struct{}, plugin *runningPlugin) err
}
}

func (a *Agent) flush(bp *BatchPoints) error {
var wg sync.WaitGroup
var outerr error
for _, o := range a.outputs {
wg.Add(1)
go func(ro *runningOutput) {
defer wg.Done()
outerr = ro.output.Write(bp.BatchPoints)
}(o)
}

wg.Wait()

return outerr
}

// TestAllPlugins verifies that we can 'Gather' from all plugins with the
// default configuration
func (a *Agent) TestAllPlugins() error {
Expand Down Expand Up @@ -267,13 +284,6 @@ func (a *Agent) Test() error {

// Run runs the agent daemon, gathering every Interval
func (a *Agent) Run(shutdown chan struct{}) error {
if a.conn == nil {
err := a.Connect()
if err != nil {
return err
}
}

var wg sync.WaitGroup

for _, plugin := range a.plugins {
Expand Down
14 changes: 8 additions & 6 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/influxdb/telegraf"
_ "github.com/influxdb/telegraf/outputs/all"
_ "github.com/influxdb/telegraf/plugins/all"
)

Expand Down Expand Up @@ -61,6 +62,11 @@ func main() {
ag.Debug = true
}

outputs, err := ag.LoadOutputs()
if err != nil {
log.Fatal(err)
}

plugins, err := ag.LoadPlugins()
if err != nil {
log.Fatal(err)
Expand Down Expand Up @@ -96,19 +102,15 @@ func main() {
close(shutdown)
}()

log.Print("InfluxDB Agent running")
log.Print("Telegraf Agent running")
log.Printf("Loaded outputs: %s", strings.Join(outputs, " "))
log.Printf("Loaded plugins: %s", strings.Join(plugins, " "))
if ag.Debug {
log.Printf("Debug: enabled")
log.Printf("Agent Config: Interval:%s, Debug:%#v, Hostname:%#v\n",
ag.Interval, ag.Debug, ag.Hostname)
}

if config.URL != "" {
log.Printf("Sending metrics to: %s", config.URL)
log.Printf("Tags enabled: %v", config.ListTags())
}

if *fPidfile != "" {
f, err := os.Create(*fPidfile)
if err != nil {
Expand Down
83 changes: 44 additions & 39 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,25 @@ func (d *Duration) UnmarshalTOML(b []byte) error {
return nil
}

// Config specifies the URL/user/password for the database that telegraf
// Config specifies the outputs that telegraf
// will be logging to, as well as all the plugins that the user has
// specified
type Config struct {
URL string
Username string
Password string
Database string
UserAgent string
Timeout Duration
Tags map[string]string

agent *ast.Table
plugins map[string]*ast.Table
outputs map[string]*ast.Table
}

// Plugins returns the configured plugins as a map of name -> plugin toml
func (c *Config) Plugins() map[string]*ast.Table {
return c.plugins
}

// Outputs returns the configured outputs as a map of name -> output toml
func (c *Config) Outputs() map[string]*ast.Table {
return c.outputs
}

// ConfiguredPlugin containing a name, interval, and drop/pass prefix lists
type ConfiguredPlugin struct {
Name string
Expand Down Expand Up @@ -86,6 +84,15 @@ func (cp *ConfiguredPlugin) ShouldPass(measurement string) bool {
return true
}

// ApplyOutput loads the toml config into the given interface
func (c *Config) ApplyOutput(name string, v interface{}) error {
if c.outputs[name] != nil {
return toml.UnmarshalTable(c.outputs[name], v)
}

return nil
}

// ApplyAgent loads the toml config into the given interface
func (c *Config) ApplyAgent(v interface{}) error {
if c.agent != nil {
Expand Down Expand Up @@ -151,15 +158,24 @@ func (c *Config) ApplyPlugin(name string, v interface{}) (*ConfiguredPlugin, err

// PluginsDeclared returns the name of all plugins declared in the config.
func (c *Config) PluginsDeclared() []string {
var plugins []string
return declared(c.plugins)
}

// OutputsDeclared returns the name of all outputs declared in the config.
func (c *Config) OutputsDeclared() []string {
return declared(c.outputs)
}

func declared(endpoints map[string]*ast.Table) []string {
var names []string

for name := range c.plugins {
plugins = append(plugins, name)
for name, _ := range endpoints {
names = append(names, name)
}

sort.Strings(plugins)
sort.Strings(names)

return plugins
return names
}

// DefaultConfig returns an empty default configuration
Expand All @@ -183,6 +199,7 @@ func LoadConfig(path string) (*Config, error) {

c := &Config{
plugins: make(map[string]*ast.Table),
outputs: make(map[string]*ast.Table),
}

for name, val := range tbl.Fields {
Expand All @@ -192,13 +209,16 @@ func LoadConfig(path string) (*Config, error) {
}

switch name {
case "influxdb":
err := toml.UnmarshalTable(subtbl, c)
if err != nil {
return nil, err
}
case "agent":
c.agent = subtbl
case "outputs":
for outputName, outputVal := range subtbl.Fields {
outputSubtbl, ok := outputVal.(*ast.Table)
if !ok {
return nil, errInvalidConfig
}
c.outputs[outputName] = outputSubtbl
}
default:
c.plugins[name] = subtbl
}
Expand All @@ -207,20 +227,6 @@ func LoadConfig(path string) (*Config, error) {
return c, nil
}

// ListTags returns a string of tags specified in the config,
// line-protocol style
func (c *Config) ListTags() string {
var tags []string

for k, v := range c.Tags {
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
}

sort.Strings(tags)

return strings.Join(tags, " ")
}

type hasConfig interface {
BasicConfig() string
}
Expand Down Expand Up @@ -253,8 +259,11 @@ var header = `# Telegraf configuration
# NOTE: The configuration has a few required parameters. They are marked
# with 'required'. Be sure to edit those to make this configuration work.

# OUTPUTS
[outputs]

# Configuration for influxdb server to send metrics to
[influxdb]
[outputs.influxdb]
# The full HTTP endpoint URL for your InfluxDB instance
url = "http://localhost:8086" # required.

Expand All @@ -271,12 +280,8 @@ database = "telegraf" # required.

# Set the user agent for the POSTs (can be useful for log differentiation)
# user_agent = "telegraf"
# tags = { "dc": "us-east-1" }

# Tags can also be specified via a normal map, but only one form at a time:

# [influxdb.tags]
# dc = "us-east-1"
# tags = { "dc" = "us-east-1" }

# Configuration for telegraf itself
# [agent]
Expand Down
Loading