Skip to content

Commit

Permalink
Improve mutex handling
Browse files Browse the repository at this point in the history
Also switch to hashicorps golang-lru for storing installRequests
  • Loading branch information
klump committed Feb 7, 2022
1 parent 2a5adf3 commit 846cb6b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 68 deletions.
149 changes: 82 additions & 67 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"time"

"github.com/gorilla/mux"
lru "github.com/hashicorp/golang-lru"
)

type SSHPorts struct {
Expand All @@ -49,25 +50,30 @@ type Machine struct {
InstallerAddr string `json:"installerAddress,omitempty"`

installRequest *InstallRequest
mutex sync.RWMutex
}

func (m *Machine) State() string {
if m.installRequest == nil {
return "provisioned"
}

return m.installRequest.State.Current()
return m.installRequest.State.String()
}

// When calling this function, you should hold a read-lock on the db object
func (m *Machine) getInstallRequest(db *tateruDB) {
installRequest, ok := db.installRequests[m.UUID]
m.mutex.Lock()
defer m.mutex.Unlock()

installRequest, ok := db.installRequests.Peek(m.UUID)
if ok {
m.installRequest = &installRequest
ir := installRequest.(InstallRequest)
m.installRequest = &ir

// TODO: implement a custom JSON encoder instead of manually duplicating attributes
m.InstallerAddr = installRequest.InstallerAddr
m.SSHPorts = installRequest.SSHPorts
m.InstallerAddr = m.installRequest.InstallerAddr
m.SSHPorts = m.installRequest.SSHPorts
}
}

Expand Down Expand Up @@ -107,7 +113,6 @@ func NewInstallRequestStateMachine() *StateMachine {
}

type InstallRequest struct {
LastUpdate time.Time
Nonce string
State *StateMachine
SSHPubKey string
Expand Down Expand Up @@ -139,18 +144,30 @@ type CallbackResponse struct {
type tateruDB struct {
machinesMutex sync.RWMutex
machines []Machine
installRequests map[string]InstallRequest
installRequests *lru.Cache
indexTmpl *template.Template
}

func (db *tateruDB) GetMachineByUUID(uuid string) (Machine, bool) {
db.machinesMutex.RLock()
defer db.machinesMutex.RUnlock()

for _, m := range db.machines {
if m.UUID == uuid {
return m, true
}
}

return Machine{}, false
}
func (db *tateruDB) HandleIndex(w http.ResponseWriter, r *http.Request) {
w.Header().Add("content-type", "text/html; charset=utf-8")

var d struct {
Machines []Machine
}
db.machinesMutex.RLock()

db.machinesMutex.RLock()
machs := []Machine{}
for _, m := range db.machines {
m.getInstallRequest(db)
Expand All @@ -165,6 +182,7 @@ func (db *tateruDB) HandleIndex(w http.ResponseWriter, r *http.Request) {
return
}
db.machinesMutex.RUnlock()

w.Write(b.Bytes())
return
}
Expand Down Expand Up @@ -220,28 +238,23 @@ func (db *tateruDB) Poll() {
db.machines = machs
db.machinesMutex.Unlock()

// TODO: expire old install requests?

time.Sleep(time.Second * 30)
}
}

func (db *tateruDB) HandleMachinesAPI(w http.ResponseWriter, r *http.Request) {
w.Header().Add("content-type", "application/json; charset=utf-8")

db.machinesMutex.RLock()
defer db.machinesMutex.RUnlock()

machines := []Machine{}

// Filter machines if applicable
filterAlias := ""
var filterAlias string
query := r.URL.Query()
if query.Has("alias") {
// Only use one (the first) alias
filterAlias = query.Get("alias")
}

db.machinesMutex.RLock()
machines := []Machine{}
for _, machine := range db.machines {
if filterAlias != "" && filterAlias != machine.Name {
continue
Expand All @@ -253,11 +266,13 @@ func (db *tateruDB) HandleMachinesAPI(w http.ResponseWriter, r *http.Request) {
}

b, err := json.MarshalIndent(machines, "", " ")
db.machinesMutex.RUnlock()
if err != nil {
http.Error(w, "Failed to render JSON", http.StatusInternalServerError)
log.Printf("Failed to marshal machines JSON: %v", err)
return
}

w.Write(b)
return
}
Expand All @@ -267,22 +282,15 @@ func (db *tateruDB) HandleFetchMachineAPI(w http.ResponseWriter, r *http.Request

uuid := vars["uuid"]

db.machinesMutex.RLock()
defer db.machinesMutex.RUnlock()

var machine *Machine
for _, m := range db.machines {
if m.UUID == uuid {
machine = &m
}
}

if machine == nil {
machine, found := db.GetMachineByUUID(uuid)
if !found {
http.Error(w, "No machine with this UUID found", http.StatusNotFound)
return
}

machine.mutex.RLock()
b, err := json.MarshalIndent(machine, "", " ")
machine.mutex.RUnlock()
if err != nil {
http.Error(w, "Failed to render JSON", http.StatusInternalServerError)
log.Printf("Failed to marshal machine JSON: %v", err)
Expand All @@ -299,16 +307,8 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques

uuid := vars["uuid"]

db.machinesMutex.Lock()
defer db.machinesMutex.Unlock()

machine := Machine{}
for _, m := range db.machines {
if m.UUID == uuid {
machine = m
}
}
if machine.UUID != uuid {
machine, found := db.GetMachineByUUID(uuid)
if !found {
http.Error(w, "No machine with this UUID found", http.StatusNotFound)
return
}
Expand All @@ -322,13 +322,19 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques
return
}

installRequest := InstallRequest{
LastUpdate: time.Now(),
State: NewInstallRequestStateMachine(),
SSHPubKey: bir.SSHPubKey,
Nonce: bir.Nonce,
machine.getInstallRequest(db)
// Create (new) installRequest when there is none or a new nonce is used
machine.mutex.Lock()
if machine.installRequest == nil || machine.installRequest.Nonce != bir.Nonce {
installRequest := InstallRequest{
State: NewInstallRequestStateMachine(),
SSHPubKey: bir.SSHPubKey,
Nonce: bir.Nonce,
}
db.installRequests.Add(uuid, installRequest)
machine.installRequest = &installRequest
}
db.installRequests[uuid] = installRequest
machine.mutex.Unlock()

// Send boot-installer request to manager for machine
managerBir := ManagerBootInstallerRequest{
Expand All @@ -342,7 +348,9 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques
}

client := &http.Client{}
machine.mutex.RLock()
bootInstallerURL := fmt.Sprintf("%s/v1/machines/%s/boot-installer", machine.ManagedBy, machine.UUID)
machine.mutex.RUnlock()
resp, err := client.Post(bootInstallerURL, "application/json", bytes.NewBuffer(b))
if err != nil {
http.Error(w, "Error when sending boot-installer request to manager", http.StatusInternalServerError)
Expand All @@ -361,11 +369,13 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques
return
}

installRequest.LastUpdate = time.Now()
installRequest.State.Transition("SENT_BOOT_INSTALLER_REQUEST_TO_MANAGER")
db.installRequests[uuid] = installRequest
machine.mutex.Lock()
machine.installRequest.State.Transition("SENT_BOOT_INSTALLER_REQUEST_TO_MANAGER")
// Update cached installRequest
db.installRequests.Add(uuid, *(machine.installRequest))
machine.mutex.Unlock()

installRequest.State.WaitFor("booted")
machine.installRequest.State.WaitFor("booted")

return
}
Expand All @@ -375,26 +385,21 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re

uuid := vars["uuid"]

db.machinesMutex.Lock()
defer db.machinesMutex.Unlock()

machine := Machine{}
for _, m := range db.machines {
if m.UUID == uuid {
machine = m
}
}
if machine.UUID != uuid {
machine, found := db.GetMachineByUUID(uuid)
if !found {
http.Error(w, "No machine with this UUID found", http.StatusNotFound)
return
}

installRequest, ok := db.installRequests[uuid]
if !ok {
machine.getInstallRequest(db)
machine.mutex.RLock()
if machine.installRequest == nil {
machine.mutex.RUnlock()
http.Error(w, "No install request found for this machine", http.StatusNotFound)
log.Printf("Received installer callback for machine '%s', but there was previous InstallerRequest", machine.UUID)
return
}
machine.mutex.RUnlock()

// Parse payload
var cr CallbackRequest
Expand All @@ -405,7 +410,7 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re
return
}

// Update machine with information
// Update installRequest with information
installerAddr := r.Header.Get("X-Forwarded-For")
if installerAddr == "" {
installerAddr = r.RemoteAddr
Expand All @@ -417,24 +422,34 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re
installerAddr = parts[0]
}
}
installRequest.InstallerAddr = installerAddr
installRequest.SSHPorts = cr.SSHPorts
machine.mutex.Lock()
machine.installRequest.InstallerAddr = installerAddr
machine.installRequest.SSHPorts = cr.SSHPorts
machine.installRequest.State.Transition("RECEIVED_INSTALLER_CALLBACK")
// Update cached installRequest
db.installRequests.Add(uuid, *(machine.installRequest))

cresp := CallbackResponse{
SSHPubKey: installRequest.SSHPubKey,
SSHPubKey: machine.installRequest.SSHPubKey,
}
b, err := json.MarshalIndent(cresp, "", " ")
machine.mutex.Unlock()
if err != nil {
http.Error(w, "Failed to render JSON", http.StatusInternalServerError)
log.Printf("Failed to marshal CallbackResponse JSON: %v", err)
return
}

installRequest.LastUpdate = time.Now()
installRequest.State.Transition("RECEIVED_INSTALLER_CALLBACK")
db.installRequests[uuid] = installRequest

w.Header().Add("content-type", "application/json; charset=utf-8")
w.Write(b)
return
}

func (db *tateruDB) installRequestEvict(key interface{}, value interface{}) {
installRequest := value.(*InstallRequest)

state := installRequest.State.Current()
if state != "provisioned" {
log.Printf("ERROR: InstallRequest for %s was evicted while still running (state was %s)!", key.(string), state)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/handlers v1.5.1
github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru v0.5.4
gopkg.in/gorilla/handlers.v1 v1.4.0
gopkg.in/gorilla/mux.v1 v1.6.2
gopkg.in/yaml.v2 v2.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/gorilla/handlers.v1 v1.4.0 h1:dD+J4RGXwT7R17TX4hqDCoGMIKDbapDJENUwAgyTZow=
gopkg.in/gorilla/handlers.v1 v1.4.0/go.mod h1:o7WIirxsm7EPn28G+NWxv3L0Y7mue08OdaxiocakJz8=
Expand Down
7 changes: 6 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
lru "github.com/hashicorp/golang-lru"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -57,7 +58,11 @@ func main() {
if err := yaml.Unmarshal([]byte(cfile), &cfg); err != nil {
log.Fatalf("yaml.Unmarshal failed: %v", err)
}
db := &tateruDB{indexTmpl: indexTmpl, installRequests: make(map[string]InstallRequest)}
db := &tateruDB{indexTmpl: indexTmpl}
db.installRequests, err = lru.NewWithEvict(256, db.installRequestEvict)
if err != nil {
log.Fatalf("installRequest cache init error: %v", err)
}
go db.Poll()
router := mux.NewRouter()
rf, _ := fs.Sub(resources, "resources")
Expand Down

0 comments on commit 846cb6b

Please sign in to comment.