Skip to content

Commit

Permalink
Add ability to set a lockfile for a run (closes #54)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Sep 9, 2016
1 parent 4f3c480 commit 42e7d2e
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 7 deletions.
137 changes: 137 additions & 0 deletions lock_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
//
// Copyright (c) 2015-2016 Snowplow Analytics Ltd. All rights reserved.
//
// This program is licensed to you under the Apache License Version 2.0,
// and you may not use this file except in compliance with the Apache License Version 2.0.
// You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the Apache License Version 2.0 is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
//
package main

import (
"fmt"
"github.com/snowplow/sql-runner/playbook"
"log"
"os"
"path/filepath"
"time"
)

type LockFile struct {
Path string
SoftLock bool
ConsulAddress string
locked bool
}

// InitLockFile creates a LockFile object
// which is used to ensures jobs can not run
// at the same time.
func InitLockFile(path string, softLock bool, consulAddress string) (LockFile, error) {
lockFile := LockFile{
Path: path,
SoftLock: softLock,
ConsulAddress: consulAddress,
locked: false,
}

if lockFile.LockExists() {
return lockFile, fmt.Errorf("%s found on start, previous run failed or is ongoing. Cannot start", path)
} else {
return lockFile, nil
}
}

// Lock creates a new lock file or kv entry
func (lf *LockFile) Lock() error {
if lf.locked == true {
return fmt.Errorf("LockFile is already locked!")
}

value := time.Now().UTC().Format("2006-01-02T15:04:05-0700")

log.Printf("Checking and setting the lockfile at this key '%s'", lf.Path)

if lf.ConsulAddress == "" {
// Check if dir exists
dirStr := filepath.Dir(lf.Path)
if _, err := os.Stat(dirStr); os.IsNotExist(err) {
return fmt.Errorf("directory for key does not exist")
}

// Create the file
f, err := os.OpenFile(lf.Path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
defer f.Close()
if err != nil {
return err
}

// Write a line to it
_, err = f.WriteString(value)
if err != nil {
return err
}

lf.locked = true
return nil
} else {
// Create the KV pair
err := playbook.PutStringValueToConsul(lf.ConsulAddress, lf.Path, value)
if err != nil {
return err
}

lf.locked = true
return nil
}
}

// Unlock deletes the lock or kv entry
func (lf *LockFile) Unlock() error {

log.Printf("Deleting lockfile at this key '%s'", lf.Path)

if lf.ConsulAddress == "" {
// Delete the file
err := os.Remove(lf.Path)
if err != nil {
return err
}

lf.locked = false
return nil
} else {
// Delete the KV pair
err := playbook.DeleteValueFromConsul(lf.ConsulAddress, lf.Path)
if err != nil {
return err
}

lf.locked = false
return nil
}
}

// LockExists checks if the lock file
// exists already
func (lf *LockFile) LockExists() bool {
if lf.ConsulAddress == "" {
if _, err := os.Stat(lf.Path); os.IsNotExist(err) {
return false
} else {
return true
}
} else {
value, err := playbook.GetStringValueFromConsul(lf.ConsulAddress, lf.Path)

if err != nil && value == "" {
return false
} else {
return true
}
}
}
51 changes: 51 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ func main() {

options := processFlags()

lockFile, lockErr := LockFileFromOptions(options)
if lockErr != nil {
log.Printf("Could not get LockFile: %s", lockErr.Error())
os.Exit(3)
}

pbp, pbpErr := PlaybookProviderFromOptions(options)
if pbpErr != nil {
log.Fatalf("Could not determine playbook source: %s", pbpErr.Error())
Expand All @@ -57,9 +63,24 @@ func main() {
log.Fatalf("Could not determine sql source: %s", spErr.Error())
}

// Lock it up...
if lockFile != nil {
lockErr2 := lockFile.Lock()
if lockErr2 != nil {
log.Fatalf("Error making lock: %s", lockErr2.Error())
}
}

statuses := run.Run(*pb, sp, options.fromStep, options.dryRun)
code, message := review(statuses)

// Unlock on success and soft-lock
if lockFile != nil {
if code == 0 || lockFile.SoftLock {
lockFile.Unlock()
}
}

log.Printf(message)
os.Exit(code)
}
Expand Down Expand Up @@ -120,6 +141,36 @@ func SQLProviderFromOptions(options Options) (playbook.SQLProvider, error) {
}
}

// LockFileFromOptions will check if a LockFile already
// exists and will then either:
// 1. Raise an error
// 2. Set a new lock
func LockFileFromOptions(options Options) (*LockFile, error) {

// Do nothing if dry-run
if options.dryRun == true {
return nil, nil
}

var lockPath string
var isSoftLock bool

if options.lock != "" {
lockPath = options.lock
isSoftLock = false
} else if options.softLock != "" {
lockPath = options.softLock
isSoftLock = true
} else {
// no-op
return nil, nil
}

lockFile, err := InitLockFile(lockPath, isSoftLock, options.consul)

return &lockFile, err
}

// Resolve the path to our SQL scripts
func resolveSqlRoot(sqlroot string, playbookPath string, consulAddress string) (string, error) {
consulErr1 := fmt.Errorf("Cannot use %s option with -consul argument", sqlroot)
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type Options struct {
fromStep string
dryRun bool
consul string
lock string
softLock string
variables CLIVariables
}

Expand All @@ -61,6 +63,8 @@ func (o *Options) GetFlagSet() *flag.FlagSet {
fs.StringVar(&(o.fromStep), "fromStep", "", "Starts from a given step defined in your playbook")
fs.BoolVar(&(o.dryRun), "dryRun", false, "Runs through a playbook without executing any of the SQL")
fs.StringVar(&(o.consul), "consul", "", "The address of a consul server with playbooks and SQL files stored in KV pairs")
fs.StringVar(&(o.lock), "lock", "", "Optional argument which checks and sets a lockfile to ensure this run is a singleton. Deletes lock on run completing successfully")
fs.StringVar(&(o.softLock), "softLock", "", "Optional argument, like '-lock' but the lockfile will be deleted even if the run fails")
// TODO: add format flag if/when we support TOML

return fs
Expand Down
53 changes: 47 additions & 6 deletions playbook/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ import (
"github.com/hashicorp/consul/api"
)

// Attempts to return the bytes
// of a key stored in a Consul server
func GetBytesFromConsul(address string, key string) ([]byte, error) {
func GetConsulClient(address string) (*api.Client, error) {
// Add address to config
conf := api.DefaultConfig()
conf.Address = address

// Connect to consul
client, err := api.NewClient(conf)
return api.NewClient(conf)
}

// GetBytesFromConsul attempts to return the bytes
// of a key stored in a Consul server
func GetBytesFromConsul(address string, key string) ([]byte, error) {
client, err := GetConsulClient(address)
if err != nil {
return nil, err
}
Expand All @@ -45,8 +49,8 @@ func GetBytesFromConsul(address string, key string) ([]byte, error) {
}
}

// Attempts to return the string value
// of a key stored in a Consul server
// GetStringValueFromConsul attempts to return
// the string value of a key stored in a Consul server
func GetStringValueFromConsul(address string, key string) (string, error) {
bytes, err := GetBytesFromConsul(address, key)

Expand All @@ -56,3 +60,40 @@ func GetStringValueFromConsul(address string, key string) (string, error) {
return string(bytes), nil
}
}

// PutBytesToConsul attempts to push a new
// KV pair to a Consul Server
func PutBytesToConsul(address string, key string, value []byte) error {
client, err := GetConsulClient(address)
if err != nil {
return err
}

kv := client.KV()

// Put a new KV pair to consul
p := &api.KVPair{Key: key, Value: value}
_, err = kv.Put(p, nil)
return err
}

// PutStringValueToConsul attempts to push a new
// KV pair to a Consul Server
func PutStringValueToConsul(address string, key string, value string) error {
return PutBytesToConsul(address, key, []byte(value))
}

// DeleteValueFromConsul attempts to delete a
// KV pair from a Consul Server
func DeleteValueFromConsul(address string, key string) error {
client, err := GetConsulClient(address)
if err != nil {
return err
}

kv := client.KV()

// Delete the KV pair
_, err = kv.Delete(key, nil)
return err
}
2 changes: 1 addition & 1 deletion playbook/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type PlaybookProvider interface {
GetPlaybook() (*Playbook, error)
}

// readLines reads a whole file into memory
// loadLocalFile reads a whole file into memory
// and returns a slice of its lines.
func loadLocalFile(path string) ([]byte, error) {
file, err := os.Open(path)
Expand Down

0 comments on commit 42e7d2e

Please sign in to comment.