From 42e7d2e06e33c2f6cdd2661fc7c19e7c2f16a32f Mon Sep 17 00:00:00 2001 From: Josh Date: Fri, 9 Sep 2016 16:42:08 +0200 Subject: [PATCH] Add ability to set a lockfile for a run (closes #54) --- lock_file.go | 137 +++++++++++++++++++++++++++++++++++++++++++ main.go | 51 ++++++++++++++++ options.go | 4 ++ playbook/consul.go | 53 +++++++++++++++-- playbook/provider.go | 2 +- 5 files changed, 240 insertions(+), 7 deletions(-) create mode 100644 lock_file.go diff --git a/lock_file.go b/lock_file.go new file mode 100644 index 0000000..43886cd --- /dev/null +++ b/lock_file.go @@ -0,0 +1,137 @@ +// +// Copyright (c) 2015-2016 Snowplow Analytics Ltd. All rights reserved. +// +// This program is licensed to you under the Apache License Version 2.0, +// and you may not use this file except in compliance with the Apache License Version 2.0. +// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the Apache License Version 2.0 is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. +// +package main + +import ( + "fmt" + "github.com/snowplow/sql-runner/playbook" + "log" + "os" + "path/filepath" + "time" +) + +type LockFile struct { + Path string + SoftLock bool + ConsulAddress string + locked bool +} + +// InitLockFile creates a LockFile object +// which is used to ensures jobs can not run +// at the same time. +func InitLockFile(path string, softLock bool, consulAddress string) (LockFile, error) { + lockFile := LockFile{ + Path: path, + SoftLock: softLock, + ConsulAddress: consulAddress, + locked: false, + } + + if lockFile.LockExists() { + return lockFile, fmt.Errorf("%s found on start, previous run failed or is ongoing. Cannot start", path) + } else { + return lockFile, nil + } +} + +// Lock creates a new lock file or kv entry +func (lf *LockFile) Lock() error { + if lf.locked == true { + return fmt.Errorf("LockFile is already locked!") + } + + value := time.Now().UTC().Format("2006-01-02T15:04:05-0700") + + log.Printf("Checking and setting the lockfile at this key '%s'", lf.Path) + + if lf.ConsulAddress == "" { + // Check if dir exists + dirStr := filepath.Dir(lf.Path) + if _, err := os.Stat(dirStr); os.IsNotExist(err) { + return fmt.Errorf("directory for key does not exist") + } + + // Create the file + f, err := os.OpenFile(lf.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + defer f.Close() + if err != nil { + return err + } + + // Write a line to it + _, err = f.WriteString(value) + if err != nil { + return err + } + + lf.locked = true + return nil + } else { + // Create the KV pair + err := playbook.PutStringValueToConsul(lf.ConsulAddress, lf.Path, value) + if err != nil { + return err + } + + lf.locked = true + return nil + } +} + +// Unlock deletes the lock or kv entry +func (lf *LockFile) Unlock() error { + + log.Printf("Deleting lockfile at this key '%s'", lf.Path) + + if lf.ConsulAddress == "" { + // Delete the file + err := os.Remove(lf.Path) + if err != nil { + return err + } + + lf.locked = false + return nil + } else { + // Delete the KV pair + err := playbook.DeleteValueFromConsul(lf.ConsulAddress, lf.Path) + if err != nil { + return err + } + + lf.locked = false + return nil + } +} + +// LockExists checks if the lock file +// exists already +func (lf *LockFile) LockExists() bool { + if lf.ConsulAddress == "" { + if _, err := os.Stat(lf.Path); os.IsNotExist(err) { + return false + } else { + return true + } + } else { + value, err := playbook.GetStringValueFromConsul(lf.ConsulAddress, lf.Path) + + if err != nil && value == "" { + return false + } else { + return true + } + } +} diff --git a/main.go b/main.go index a1f4171..0f5c7c9 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,12 @@ func main() { options := processFlags() + lockFile, lockErr := LockFileFromOptions(options) + if lockErr != nil { + log.Printf("Could not get LockFile: %s", lockErr.Error()) + os.Exit(3) + } + pbp, pbpErr := PlaybookProviderFromOptions(options) if pbpErr != nil { log.Fatalf("Could not determine playbook source: %s", pbpErr.Error()) @@ -57,9 +63,24 @@ func main() { log.Fatalf("Could not determine sql source: %s", spErr.Error()) } + // Lock it up... + if lockFile != nil { + lockErr2 := lockFile.Lock() + if lockErr2 != nil { + log.Fatalf("Error making lock: %s", lockErr2.Error()) + } + } + statuses := run.Run(*pb, sp, options.fromStep, options.dryRun) code, message := review(statuses) + // Unlock on success and soft-lock + if lockFile != nil { + if code == 0 || lockFile.SoftLock { + lockFile.Unlock() + } + } + log.Printf(message) os.Exit(code) } @@ -120,6 +141,36 @@ func SQLProviderFromOptions(options Options) (playbook.SQLProvider, error) { } } +// LockFileFromOptions will check if a LockFile already +// exists and will then either: +// 1. Raise an error +// 2. Set a new lock +func LockFileFromOptions(options Options) (*LockFile, error) { + + // Do nothing if dry-run + if options.dryRun == true { + return nil, nil + } + + var lockPath string + var isSoftLock bool + + if options.lock != "" { + lockPath = options.lock + isSoftLock = false + } else if options.softLock != "" { + lockPath = options.softLock + isSoftLock = true + } else { + // no-op + return nil, nil + } + + lockFile, err := InitLockFile(lockPath, isSoftLock, options.consul) + + return &lockFile, err +} + // Resolve the path to our SQL scripts func resolveSqlRoot(sqlroot string, playbookPath string, consulAddress string) (string, error) { consulErr1 := fmt.Errorf("Cannot use %s option with -consul argument", sqlroot) diff --git a/options.go b/options.go index f4e7c78..d8bf9c0 100644 --- a/options.go +++ b/options.go @@ -43,6 +43,8 @@ type Options struct { fromStep string dryRun bool consul string + lock string + softLock string variables CLIVariables } @@ -61,6 +63,8 @@ func (o *Options) GetFlagSet() *flag.FlagSet { fs.StringVar(&(o.fromStep), "fromStep", "", "Starts from a given step defined in your playbook") fs.BoolVar(&(o.dryRun), "dryRun", false, "Runs through a playbook without executing any of the SQL") fs.StringVar(&(o.consul), "consul", "", "The address of a consul server with playbooks and SQL files stored in KV pairs") + fs.StringVar(&(o.lock), "lock", "", "Optional argument which checks and sets a lockfile to ensure this run is a singleton. Deletes lock on run completing successfully") + fs.StringVar(&(o.softLock), "softLock", "", "Optional argument, like '-lock' but the lockfile will be deleted even if the run fails") // TODO: add format flag if/when we support TOML return fs diff --git a/playbook/consul.go b/playbook/consul.go index b30f89f..1ac1150 100644 --- a/playbook/consul.go +++ b/playbook/consul.go @@ -17,15 +17,19 @@ import ( "github.com/hashicorp/consul/api" ) -// Attempts to return the bytes -// of a key stored in a Consul server -func GetBytesFromConsul(address string, key string) ([]byte, error) { +func GetConsulClient(address string) (*api.Client, error) { // Add address to config conf := api.DefaultConfig() conf.Address = address // Connect to consul - client, err := api.NewClient(conf) + return api.NewClient(conf) +} + +// GetBytesFromConsul attempts to return the bytes +// of a key stored in a Consul server +func GetBytesFromConsul(address string, key string) ([]byte, error) { + client, err := GetConsulClient(address) if err != nil { return nil, err } @@ -45,8 +49,8 @@ func GetBytesFromConsul(address string, key string) ([]byte, error) { } } -// Attempts to return the string value -// of a key stored in a Consul server +// GetStringValueFromConsul attempts to return +// the string value of a key stored in a Consul server func GetStringValueFromConsul(address string, key string) (string, error) { bytes, err := GetBytesFromConsul(address, key) @@ -56,3 +60,40 @@ func GetStringValueFromConsul(address string, key string) (string, error) { return string(bytes), nil } } + +// PutBytesToConsul attempts to push a new +// KV pair to a Consul Server +func PutBytesToConsul(address string, key string, value []byte) error { + client, err := GetConsulClient(address) + if err != nil { + return err + } + + kv := client.KV() + + // Put a new KV pair to consul + p := &api.KVPair{Key: key, Value: value} + _, err = kv.Put(p, nil) + return err +} + +// PutStringValueToConsul attempts to push a new +// KV pair to a Consul Server +func PutStringValueToConsul(address string, key string, value string) error { + return PutBytesToConsul(address, key, []byte(value)) +} + +// DeleteValueFromConsul attempts to delete a +// KV pair from a Consul Server +func DeleteValueFromConsul(address string, key string) error { + client, err := GetConsulClient(address) + if err != nil { + return err + } + + kv := client.KV() + + // Delete the KV pair + _, err = kv.Delete(key, nil) + return err +} diff --git a/playbook/provider.go b/playbook/provider.go index 67a9373..48e0a4a 100644 --- a/playbook/provider.go +++ b/playbook/provider.go @@ -21,7 +21,7 @@ type PlaybookProvider interface { GetPlaybook() (*Playbook, error) } -// readLines reads a whole file into memory +// loadLocalFile reads a whole file into memory // and returns a slice of its lines. func loadLocalFile(path string) ([]byte, error) { file, err := os.Open(path)