From c60dc196bde1fe4730699a5b2b0d0eda23f71a55 Mon Sep 17 00:00:00 2001 From: guilherme Date: Sun, 28 Jan 2024 22:32:24 +0000 Subject: [PATCH] main loop + query bug fix + handlers --- cmd/main.go | 21 +++- internal/config/config.go | 4 +- internal/config/{settings.go => events.go} | 2 +- internal/config/sources.go | 2 +- internal/database/models.go | 12 +- internal/watcher/fetch.go | 4 +- internal/watcher/watcher.go | 128 ++++++++++++++------- 7 files changed, 116 insertions(+), 57 deletions(-) rename internal/config/{settings.go => events.go} (98%) diff --git a/cmd/main.go b/cmd/main.go index 88b4ea3..f62b18b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,6 +5,8 @@ import ( "github.com/gweebg/ipwatcher/internal/config" "github.com/gweebg/ipwatcher/internal/database" "github.com/gweebg/ipwatcher/internal/utils" + "github.com/gweebg/ipwatcher/internal/watcher" + "log" ) func main() { @@ -26,7 +28,7 @@ func main() { configFlags["version"] = flag.String( "version", "v4", - "version of the IP protocol, supports 'v4' | 'v6' | 'all'", + "version of the IP protocol, supports 'v4' | 'v6'", ) configFlags["config"] = flag.String( @@ -35,10 +37,10 @@ func main() { "path to the configuration file", ) - configFlags["exec"] = flag.String( + configFlags["exec"] = flag.Bool( "exec", - "on_change", - "run executable/script upon an event, supports 'on_change' | 'on_same' | 'always' | 'never'", + false, + "enable execution of configuration defined actions", ) configFlags["api"] = flag.Bool( @@ -55,6 +57,11 @@ func main() { flag.Parse() + version := configFlags["version"].(*string) + if version == nil || (*version != "v4" && *version != "v6") { + log.Fatalf("flag 'version' must be either 'v4' or 'v6', not '%v'\n", *version) + } + config.Init(configFlags) database.ConnectDatabase() @@ -63,4 +70,10 @@ func main() { err := db.AutoMigrate(&database.AddressEntry{}) utils.Check(err, "could not run AutoMigrate") + w := watcher.NewWatcher() + + log.Printf("started watcher") + go w.Watch() + + select {} } diff --git a/internal/config/config.go b/internal/config/config.go index 31bb0c5..11f0340 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -27,11 +27,11 @@ func Init(userFlags map[string]interface{}) { config.Set("flags."+key, rv.Elem().Interface()) } // append the flags set by the user to the configuration object - parsedSources, err := GetSources() + parsedSources, err := getSources() utils.Check(err, "") config.Set("sources", parsedSources) - parsedEvents, err := GetEvents() + parsedEvents, err := getEvents() utils.Check(err, "") config.Set("watcher.events", parsedEvents) diff --git a/internal/config/settings.go b/internal/config/events.go similarity index 98% rename from internal/config/settings.go rename to internal/config/events.go index 176dca7..0f3a980 100644 --- a/internal/config/settings.go +++ b/internal/config/events.go @@ -55,7 +55,7 @@ type Events struct { OnError *EventHandler `mapstructure:"on_error"` } -func GetEvents() (*Events, error) { +func getEvents() (*Events, error) { config := GetConfig() if config == nil { diff --git a/internal/config/sources.go b/internal/config/sources.go index 31b5227..2e32122 100644 --- a/internal/config/sources.go +++ b/internal/config/sources.go @@ -31,7 +31,7 @@ type Source struct { Field *string `mapstructure:"field"` } -func GetSources() ([]Source, error) { +func getSources() ([]Source, error) { var sources []Source diff --git a/internal/database/models.go b/internal/database/models.go index 5c5b3e5..3ec448a 100644 --- a/internal/database/models.go +++ b/internal/database/models.go @@ -15,8 +15,6 @@ type AddressEntry struct { // PreviousAddress is the previous address, before the update PreviousAddress string `json:"previous_address"` - // Source is the URL of the API that provided the updated address - Source string `json:"source"` // Version specifies the version of the address this record refers to Version string `json:"version"` // CreatedAt is the UNIX time when the address update was detected @@ -24,10 +22,14 @@ type AddressEntry struct { } // Create is the function that creates a new AddressEntry record onto the database -func (e AddressEntry) Create(address string) (*AddressEntry, error) { +func (e AddressEntry) Create(address string, version string, previous string) (*AddressEntry, error) { database := GetDatabase() - entry := AddressEntry{Address: address} + entry := AddressEntry{ + Address: address, + Version: version, + PreviousAddress: previous, + } if err := database.Create(&entry).Error; err != nil { return nil, err @@ -45,7 +47,7 @@ func (e AddressEntry) First(addressVersion string) (*AddressEntry, error) { query := database. Where("version = ?", addressVersion). - Order("id desc"). + Order("created_at DESC"). First(&entry) if query.Error != nil { diff --git a/internal/watcher/fetch.go b/internal/watcher/fetch.go index d5455c0..e7756b6 100644 --- a/internal/watcher/fetch.go +++ b/internal/watcher/fetch.go @@ -11,14 +11,12 @@ import ( "strings" "github.com/gweebg/ipwatcher/internal/config" - "github.com/gweebg/ipwatcher/internal/utils" ) func RequestAddress(version string) (string, error) { conf := config.GetConfig() - sources, err := config.GetSources() - utils.Check(err, "") + sources := conf.Get("sources").([]config.Source) address := "" diff --git a/internal/watcher/watcher.go b/internal/watcher/watcher.go index 21618df..4ddbbdb 100644 --- a/internal/watcher/watcher.go +++ b/internal/watcher/watcher.go @@ -1,32 +1,25 @@ package watcher import ( + "errors" "github.com/gweebg/ipwatcher/internal/config" + "github.com/gweebg/ipwatcher/internal/database" + "github.com/gweebg/ipwatcher/internal/utils" + "log" "time" ) -type Action func() -type EventHandlers struct { - OnChange Action - OnMatch Action - OnError Action -} - -func (e EventHandlers) Load() *EventHandlers { - return nil -} - type Watcher struct { // Version indicates the versions the watcher is supposed to track (v4|v6|all) Version string - // AllowNotification enables notifications via email upon an event - AllowNotification bool - // AllowApi exposes a API with the database records - AllowApi bool - // AllowExec enables the execution of the actions defined in the configuration file - AllowExec bool + // allowApi exposes an API with the database records + allowApi bool + // allowNotif enables notification via email + allowNotif bool + // allowExec enables the execution of actions upon an event + allowExec bool // Timeout represents the duration between each address query Timeout time.Duration @@ -35,9 +28,6 @@ type Watcher struct { ticker *time.Ticker // tickerQuitChan allows the stop of the ticker tickerQuitChan chan struct{} - - // Handlers contains the event handles for determined actions - Handlers *EventHandlers } func NewWatcher() *Watcher { @@ -47,44 +37,100 @@ func NewWatcher() *Watcher { timeout := time.Duration( c.GetInt("watcher.timeout")) * time.Second - allowExec := c.GetBool("flags.exec") - allowNotif := c.GetBool("flags.notify") - return &Watcher{ Version: c.GetString("flags.version"), - AllowNotification: allowNotif, - AllowApi: c.GetBool("flags.api"), - AllowExec: allowExec, + allowApi: c.GetBool("flags.api"), + allowNotif: c.GetBool("flags.notify"), + allowExec: c.GetBool("flags.exec"), Timeout: timeout, ticker: time.NewTicker(timeout), tickerQuitChan: make(chan struct{}), - - Handlers: nil, } } -func (w *Watcher) Watch() error { - go w.Loop() +func (w *Watcher) HandleEvent(eventType string) error { + + c := config.GetConfig() + + var handler *config.EventHandler + events := c.Get("watcher.events").(*config.Events) + + switch eventType { + + case "OnChange": + handler = events.OnChange + case "OnMatch": + handler = events.OnMatch + case "OnError": + handler = events.OnError + + default: + return errors.New("unknown event type: " + eventType) + } + + if handler != nil && w.allowExec { + + for _, exec := range handler.Actions { + if handler.Notify && w.allowNotif { + log.Printf("sent notification to destination\n") + } + log.Printf("executing '%s %s %s'\n\n", exec.Type, exec.Path, exec.Args) + } + } + return nil } -func (w *Watcher) Loop() { +func (w *Watcher) Watch() { + // todo : stop upon ctrl-c + + var records = new(database.AddressEntry) + + go func() { + + for { + select { + + case <-w.ticker.C: - for { - select { + // get the address from the desired source + address, err := RequestAddress(w.Version) + if err != nil { + log.Printf("weh weh weh") + } - case <-w.ticker.C: - // ! Request address - // ! Get previous address - // ! Compare addresses - // ! Execute actions + // get latest address record of the database + previousAddress, err := records.First(w.Version) - case <-w.tickerQuitChan: - w.ticker.Stop() + // if the database is empty, then we insert the current address + if previousAddress == nil { + _, err = records.Create(address, w.Version, address) + continue + } + // compare addresses and handle accordingly + if address != previousAddress.Address { + + _, err = records.Create(address, w.Version, previousAddress.Address) // insert new record onto the database + utils.Check(err, "") + + err = w.HandleEvent("OnChange") + + } else { + err = w.HandleEvent("OnMatch") + } + + if err != nil { + err = w.HandleEvent("OnError") + } + + case <-w.tickerQuitChan: + w.ticker.Stop() + + } } - } + }() }