Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] One shot mode (backport #25972) #28421

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Bundle synthetics deps with heartbeat docker image. {pull}23274[23274]
- Add mime type detection for http responses. {pull}22976[22976]
- Support JSON expressions / validation of JSON arrays. {pull}28073[28073]
- Experimental 'run once' mode. {pull}25972[25972]

*Journalbeat*

Expand Down
12 changes: 12 additions & 0 deletions heartbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ heartbeat.monitors:
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
#service.name: my-apm-service-name

# Experimental: Configure monitors that run exactly once.
# If enabled, heartbeat.monitors will be ignored
# Heartbeat will run these monitors once then exit.
#heartbeat.run_once:
#- type: http
#id: my-monitor
#name: My Monitor
#urls: ["http://localhost:9200"]
# NOTE: you must still provide the schedule field! Heartbeat
# Uses this to determine the contents of the monitor.timespan field
#schedule: '@every 10s'

{{header "Elasticsearch template setting"}}

setup.template.settings:
Expand Down
72 changes: 71 additions & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ package beater
import (
"errors"
"fmt"
"sync"
"syscall"
"time"

"github.com/elastic/beats/v7/heartbeat/config"
"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/scheduler"
"github.com/elastic/beats/v7/libbeat/autodiscover"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -34,6 +37,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/x-pack/functionbeat/function/core"

_ "github.com/elastic/beats/v7/libbeat/processors/script"
)
Expand Down Expand Up @@ -81,10 +85,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// Run executes the beat.
func (bt *Heartbeat) Run(b *beat.Beat) error {
logp.Info("heartbeat is running! Hit CTRL-C to stop it.")

groups, _ := syscall.Getgroups()
logp.Info("Effective user/group ids: %d/%d, with groups: %v", syscall.Geteuid(), syscall.Getegid(), groups)

if bt.config.RunOnce != nil {
err := bt.runRunOnce(b)
if err != nil {
return err
}
return nil
}

stopStaticMonitors, err := bt.RunStaticMonitors(b)
if err != nil {
return err
Expand Down Expand Up @@ -126,6 +137,65 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return nil
}

// runRunOnce runs the given config then exits immediately after any queued events have been sent to ES
func (bt *Heartbeat) runRunOnce(b *beat.Beat) error {
logp.Info("Starting run_once run. This is an experimental feature and may be changed or removed in the future!")
cfgs := bt.config.RunOnce

publishClient, err := core.NewSyncClient(logp.NewLogger("run_once mode"), b.Publisher, beat.ClientConfig{})
if err != nil {
return fmt.Errorf("could not create sync client: %w", err)
}
defer publishClient.Close()

wg := &sync.WaitGroup{}
for _, cfg := range cfgs {
err := runRunOnceSingleConfig(cfg, publishClient, wg)
if err != nil {
logp.Warn("error running run_once config: %s", err)
}
}

wg.Wait()
publishClient.Wait()

logp.Info("Ending run_once run")

return nil
}

func runRunOnceSingleConfig(cfg *common.Config, publishClient *core.SyncClient, wg *sync.WaitGroup) (err error) {
sf, err := stdfields.ConfigToStdMonitorFields(cfg)
if err != nil {
return fmt.Errorf("could not get stdmon fields: %w", err)
}
pluginFactory, exists := plugin.GlobalPluginsReg.Get(sf.Type)
if !exists {
return fmt.Errorf("no plugin for type: %s", sf.Type)
}
plugin, err := pluginFactory.Make(sf.Type, cfg)
if err != nil {
return err
}

results := plugin.RunWrapped(sf)

wg.Add(1)
go func() {
defer wg.Done()
defer plugin.Close()
for {
event := <-results
if event == nil {
break
}
publishClient.Publish(*event)
}
}()

return nil
}

// RunStaticMonitors runs the `heartbeat.monitors` portion of the yaml config if present.
func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) (stop func(), err error) {
factory := monitors.NewFactory(b.Info, bt.scheduler, true)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Config defines the structure of heartbeat.yml.
type Config struct {
// Modules is a list of module specific configuration data.
RunOnce []*common.Config `config:"run_once"`
Monitors []*common.Config `config:"monitors"`
ConfigMonitors *common.Config `config:"config.monitors"`
Scheduler Scheduler `config:"scheduler"`
Expand Down
34 changes: 34 additions & 0 deletions heartbeat/docs/heartbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,37 @@ include::monitors/monitor-tcp.asciidoc[]
include::monitors/monitor-http.asciidoc[]

include::monitors/monitor-browser.asciidoc[]

[float]
[[run-once-mode]]
=== Run Once Mode (Experimental)

You can configure {beatname_uc} run monitors exactly once then exit, bypassing the scheduler. This is referred to as running {beatname_uc} in "run once" mode. This is an experimental feature
and is subject to change.

[source,yaml]
----------------------------------------------------------------------
# heartbeat.yml
heartbeat.run_once:
- type: icmp
id: ping-myhost
name: My Host Ping
hosts: ["myhost"]
# Note that schedule is still needed to inform heartbeat when the next
# expected check is to be run. This is needed to populate the monitor.timespan field used by the Uptime app.
schedule: '@every 5s'
- type: tcp
id: myhost-tcp-echo
name: My Host TCP Echo
hosts: ["myhost:777"] # default TCP Echo Protocol
check.send: "Check"
check.receive: "Check"
schedule: '@every 5s'
- type: http
id: service-status
name: Service Status
service.name: my-apm-service-name
hosts: ["http://localhost:80/service/status"]
check.response.status: [200]
schedule: '@every 5s'
----------------------------------------------------------------------
12 changes: 12 additions & 0 deletions heartbeat/heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ heartbeat.monitors:
# Name of corresponding APM service, if Elastic APM is in use for the monitored service.
#service.name: my-apm-service-name

# Experimental: Configure monitors that run exactly once.
# If enabled, heartbeat.monitors will be ignored
# Heartbeat will run these monitors once then exit.
#heartbeat.run_once:
#- type: http
#id: my-monitor
#name: My Monitor
#urls: ["http://localhost:9200"]
# NOTE: you must still provide the schedule field! Heartbeat
# Uses this to determine the contents of the monitor.timespan field
#schedule: '@every 10s'

# ======================= Elasticsearch template setting =======================

setup.template.settings:
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func create(
js[i] = wrappers.WithURLField(u, job)
}

return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(config.Hosts)}, nil
return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil
}

func newRoundTripper(config *Config) (http.RoundTripper, error) {
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
j = append(j, wrappers.WithURLField(u, job))
}

return plugin.Plugin{Jobs: j, Close: nil, Endpoints: len(jf.config.Hosts)}, nil
return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil
}

func (jf *jobFactory) pingIPFactory(config *Config) func(*net.IPAddr) jobs.Job {
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func createWithResolver(
return plugin.Plugin{}, err
}

return plugin.Plugin{Jobs: js, Close: nil, Endpoints: len(jc.endpoints)}, nil
return plugin.Plugin{Jobs: js, Endpoints: len(jc.endpoints)}, nil
}

// jobFactory is where most of the logic here lives. It provides a common context around
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
return plugin.PluginFactory{
Name: "test",
Aliases: []string{"testAlias"},
Builder: func(s string, config *common.Config) (plugin.Plugin, error) {
Make: func(s string, config *common.Config) (plugin.Plugin, error) {
built.Inc()
// Declare a real config block with a required attr so we can see what happens when it doesn't work
unpacked := struct {
Expand All @@ -160,7 +160,7 @@ func mockPluginBuilder() (plugin.PluginFactory, *atomic.Int, *atomic.Int) {
closed.Inc()
return nil
}
return plugin.Plugin{Jobs: j, Close: closer, Endpoints: 1}, err
return plugin.Plugin{Jobs: j, DoClose: closer, Endpoints: 1}, err
},
Stats: plugin.NewPluginCountersRecorder("test", reg)},
built,
Expand Down
56 changes: 49 additions & 7 deletions heartbeat/monitors/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,67 @@ import (

"github.com/elastic/beats/v7/heartbeat/hbregistry"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/plugin"
)

// PluginFactory represents an uninstantiated plug in instance generated from a monitor config. Invoking the Make function creates a plug-in instance.
type PluginFactory struct {
Name string
Aliases []string
Builder PluginFactoryCreate
Make PluginMake
Stats RegistryRecorder
}

type PluginFactoryCreate func(string, *common.Config) (p Plugin, err error)
type PluginMake func(string, *common.Config) (p Plugin, err error)

// Plugin describes a configured instance of a plug-in with its jobs already instantiated.
type Plugin struct {
Jobs []jobs.Job
Close func() error
DoClose func() error
Endpoints int
}

// Close closes the plugin, invoking any DoClose hooks if avialable.
func (p Plugin) Close() error {
if p.DoClose != nil {
return p.DoClose()
}
return nil
}

// RunWrapped runs the plug-in with the provided wrappers returning a channel of resultant events.
func (p Plugin) RunWrapped(fields stdfields.StdMonitorFields) chan *beat.Event {
wj := wrappers.WrapCommon(p.Jobs, fields)
results := make(chan *beat.Event)

var runJob func(j jobs.Job)
runJob = func(j jobs.Job) {
e := &beat.Event{}
conts, err := j(e)
// No error handling since WrapCommon handles all errors
if err != nil {
panic(fmt.Sprintf("unexpected error on wrapped job!: %s", err))
}
results <- e
for _, c := range conts {
runJob(c)
}
}

go func() {
for _, j := range wj {
runJob(j)
}
close(results)
}()

return results
}

var pluginKey = "heartbeat.monitor"

// stateGlobalRecorder records statistics across all plugin types
Expand All @@ -70,7 +112,7 @@ func init() {
}

stats := statsForPlugin(p.Name)
return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Builder, stats})
return GlobalPluginsReg.Register(PluginFactory{p.Name, p.Aliases, p.Make, stats})
})
}

Expand Down Expand Up @@ -98,9 +140,9 @@ func NewPluginsReg() *PluginsReg {
}

// Register registers a new active (as opposed to passive) monitor.
func Register(name string, builder PluginFactoryCreate, aliases ...string) {
func Register(name string, make PluginMake, aliases ...string) {
stats := statsForPlugin(name)
if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, builder, stats}); err != nil {
if err := GlobalPluginsReg.Add(PluginFactory{name, aliases, make, stats}); err != nil {
panic(err)
}
}
Expand Down Expand Up @@ -161,5 +203,5 @@ func (r *PluginsReg) MonitorNames() []string {
}

func (e *PluginFactory) Create(cfg *common.Config) (p Plugin, err error) {
return e.Builder(e.Name, cfg)
return e.Make(e.Name, cfg)
}
Loading