Skip to content

Commit

Permalink
main loop + query bug fix + handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
gweebg committed Jan 28, 2024
1 parent 328ece5 commit c60dc19
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 57 deletions.
21 changes: 17 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/gweebg/ipwatcher/internal/config"
"github.com/gweebg/ipwatcher/internal/database"
"github.com/gweebg/ipwatcher/internal/utils"
"github.com/gweebg/ipwatcher/internal/watcher"
"log"
)

func main() {
Expand All @@ -26,7 +28,7 @@ func main() {
configFlags["version"] = flag.String(
"version",
"v4",
"version of the IP protocol, supports 'v4' | 'v6' | 'all'",
"version of the IP protocol, supports 'v4' | 'v6'",
)

configFlags["config"] = flag.String(
Expand All @@ -35,10 +37,10 @@ func main() {
"path to the configuration file",
)

configFlags["exec"] = flag.String(
configFlags["exec"] = flag.Bool(
"exec",
"on_change",
"run executable/script upon an event, supports 'on_change' | 'on_same' | 'always' | 'never'",
false,
"enable execution of configuration defined actions",
)

configFlags["api"] = flag.Bool(
Expand All @@ -55,6 +57,11 @@ func main() {

flag.Parse()

version := configFlags["version"].(*string)
if version == nil || (*version != "v4" && *version != "v6") {
log.Fatalf("flag 'version' must be either 'v4' or 'v6', not '%v'\n", *version)
}

config.Init(configFlags)

database.ConnectDatabase()
Expand All @@ -63,4 +70,10 @@ func main() {
err := db.AutoMigrate(&database.AddressEntry{})
utils.Check(err, "could not run AutoMigrate")

w := watcher.NewWatcher()

log.Printf("started watcher")
go w.Watch()

select {}
}
4 changes: 2 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ func Init(userFlags map[string]interface{}) {
config.Set("flags."+key, rv.Elem().Interface())
} // append the flags set by the user to the configuration object

parsedSources, err := GetSources()
parsedSources, err := getSources()
utils.Check(err, "")
config.Set("sources", parsedSources)

parsedEvents, err := GetEvents()
parsedEvents, err := getEvents()
utils.Check(err, "")
config.Set("watcher.events", parsedEvents)

Expand Down
2 changes: 1 addition & 1 deletion internal/config/settings.go → internal/config/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Events struct {
OnError *EventHandler `mapstructure:"on_error"`
}

func GetEvents() (*Events, error) {
func getEvents() (*Events, error) {

config := GetConfig()
if config == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/config/sources.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type Source struct {
Field *string `mapstructure:"field"`
}

func GetSources() ([]Source, error) {
func getSources() ([]Source, error) {

var sources []Source

Expand Down
12 changes: 7 additions & 5 deletions internal/database/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,21 @@ type AddressEntry struct {
// PreviousAddress is the previous address, before the update
PreviousAddress string `json:"previous_address"`

// Source is the URL of the API that provided the updated address
Source string `json:"source"`
// Version specifies the version of the address this record refers to
Version string `json:"version"`
// CreatedAt is the UNIX time when the address update was detected
CreatedAt uint64 `gorm:"autoCreateTime" json:"at"`
}

// Create is the function that creates a new AddressEntry record onto the database
func (e AddressEntry) Create(address string) (*AddressEntry, error) {
func (e AddressEntry) Create(address string, version string, previous string) (*AddressEntry, error) {

database := GetDatabase()
entry := AddressEntry{Address: address}
entry := AddressEntry{
Address: address,
Version: version,
PreviousAddress: previous,
}

if err := database.Create(&entry).Error; err != nil {
return nil, err
Expand All @@ -45,7 +47,7 @@ func (e AddressEntry) First(addressVersion string) (*AddressEntry, error) {

query := database.
Where("version = ?", addressVersion).
Order("id desc").
Order("created_at DESC").
First(&entry)

if query.Error != nil {
Expand Down
4 changes: 1 addition & 3 deletions internal/watcher/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
"strings"

"github.com/gweebg/ipwatcher/internal/config"
"github.com/gweebg/ipwatcher/internal/utils"
)

func RequestAddress(version string) (string, error) {

conf := config.GetConfig()
sources, err := config.GetSources()
utils.Check(err, "")
sources := conf.Get("sources").([]config.Source)

address := ""

Expand Down
128 changes: 87 additions & 41 deletions internal/watcher/watcher.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,25 @@
package watcher

import (
"errors"
"github.com/gweebg/ipwatcher/internal/config"
"github.com/gweebg/ipwatcher/internal/database"
"github.com/gweebg/ipwatcher/internal/utils"
"log"
"time"
)

type Action func()
type EventHandlers struct {
OnChange Action
OnMatch Action
OnError Action
}

func (e EventHandlers) Load() *EventHandlers {
return nil
}

type Watcher struct {

// Version indicates the versions the watcher is supposed to track (v4|v6|all)
Version string

// AllowNotification enables notifications via email upon an event
AllowNotification bool
// AllowApi exposes a API with the database records
AllowApi bool
// AllowExec enables the execution of the actions defined in the configuration file
AllowExec bool
// allowApi exposes an API with the database records
allowApi bool
// allowNotif enables notification via email
allowNotif bool
// allowExec enables the execution of actions upon an event
allowExec bool

// Timeout represents the duration between each address query
Timeout time.Duration
Expand All @@ -35,9 +28,6 @@ type Watcher struct {
ticker *time.Ticker
// tickerQuitChan allows the stop of the ticker
tickerQuitChan chan struct{}

// Handlers contains the event handles for determined actions
Handlers *EventHandlers
}

func NewWatcher() *Watcher {
Expand All @@ -47,44 +37,100 @@ func NewWatcher() *Watcher {
timeout := time.Duration(
c.GetInt("watcher.timeout")) * time.Second

allowExec := c.GetBool("flags.exec")
allowNotif := c.GetBool("flags.notify")

return &Watcher{
Version: c.GetString("flags.version"),

AllowNotification: allowNotif,
AllowApi: c.GetBool("flags.api"),
AllowExec: allowExec,
allowApi: c.GetBool("flags.api"),
allowNotif: c.GetBool("flags.notify"),
allowExec: c.GetBool("flags.exec"),

Timeout: timeout,
ticker: time.NewTicker(timeout),
tickerQuitChan: make(chan struct{}),

Handlers: nil,
}
}

func (w *Watcher) Watch() error {
go w.Loop()
func (w *Watcher) HandleEvent(eventType string) error {

c := config.GetConfig()

var handler *config.EventHandler
events := c.Get("watcher.events").(*config.Events)

switch eventType {

case "OnChange":
handler = events.OnChange
case "OnMatch":
handler = events.OnMatch
case "OnError":
handler = events.OnError

default:
return errors.New("unknown event type: " + eventType)
}

if handler != nil && w.allowExec {

for _, exec := range handler.Actions {
if handler.Notify && w.allowNotif {
log.Printf("sent notification to destination\n")
}
log.Printf("executing '%s %s %s'\n\n", exec.Type, exec.Path, exec.Args)
}
}

return nil
}

func (w *Watcher) Loop() {
func (w *Watcher) Watch() {
// todo : stop upon ctrl-c

var records = new(database.AddressEntry)

go func() {

for {
select {

case <-w.ticker.C:

for {
select {
// get the address from the desired source
address, err := RequestAddress(w.Version)
if err != nil {
log.Printf("weh weh weh")
}

case <-w.ticker.C:
// ! Request address
// ! Get previous address
// ! Compare addresses
// ! Execute actions
// get latest address record of the database
previousAddress, err := records.First(w.Version)

case <-w.tickerQuitChan:
w.ticker.Stop()
// if the database is empty, then we insert the current address
if previousAddress == nil {
_, err = records.Create(address, w.Version, address)
continue
}

// compare addresses and handle accordingly
if address != previousAddress.Address {

_, err = records.Create(address, w.Version, previousAddress.Address) // insert new record onto the database
utils.Check(err, "")

err = w.HandleEvent("OnChange")

} else {
err = w.HandleEvent("OnMatch")
}

if err != nil {
err = w.HandleEvent("OnError")
}

case <-w.tickerQuitChan:
w.ticker.Stop()

}
}
}

}()
}

0 comments on commit c60dc19

Please sign in to comment.