From 846cb6b0d5f8489e5a62a39a00a0679ae87c08fe Mon Sep 17 00:00:00 2001 From: Alexander Kratzsch Date: Fri, 28 Jan 2022 23:32:12 +0100 Subject: [PATCH] Improve mutex handling Also switch to hashicorps golang-lru for storing installRequests --- database.go | 149 +++++++++++++++++++++++++++++----------------------- go.mod | 1 + go.sum | 2 + main.go | 7 ++- 4 files changed, 91 insertions(+), 68 deletions(-) diff --git a/database.go b/database.go index aa8489e..b2e4448 100644 --- a/database.go +++ b/database.go @@ -31,6 +31,7 @@ import ( "time" "github.com/gorilla/mux" + lru "github.com/hashicorp/golang-lru" ) type SSHPorts struct { @@ -49,6 +50,7 @@ type Machine struct { InstallerAddr string `json:"installerAddress,omitempty"` installRequest *InstallRequest + mutex sync.RWMutex } func (m *Machine) State() string { @@ -56,18 +58,22 @@ func (m *Machine) State() string { return "provisioned" } - return m.installRequest.State.Current() + return m.installRequest.State.String() } // When calling this function, you should hold a read-lock on the db object func (m *Machine) getInstallRequest(db *tateruDB) { - installRequest, ok := db.installRequests[m.UUID] + m.mutex.Lock() + defer m.mutex.Unlock() + + installRequest, ok := db.installRequests.Peek(m.UUID) if ok { - m.installRequest = &installRequest + ir := installRequest.(InstallRequest) + m.installRequest = &ir // TODO: implement a custom JSON encoder instead of manually duplicating attributes - m.InstallerAddr = installRequest.InstallerAddr - m.SSHPorts = installRequest.SSHPorts + m.InstallerAddr = m.installRequest.InstallerAddr + m.SSHPorts = m.installRequest.SSHPorts } } @@ -107,7 +113,6 @@ func NewInstallRequestStateMachine() *StateMachine { } type InstallRequest struct { - LastUpdate time.Time Nonce string State *StateMachine SSHPubKey string @@ -139,18 +144,30 @@ type CallbackResponse struct { type tateruDB struct { machinesMutex sync.RWMutex machines []Machine - installRequests map[string]InstallRequest + installRequests *lru.Cache indexTmpl *template.Template } +func (db *tateruDB) GetMachineByUUID(uuid string) (Machine, bool) { + db.machinesMutex.RLock() + defer db.machinesMutex.RUnlock() + + for _, m := range db.machines { + if m.UUID == uuid { + return m, true + } + } + + return Machine{}, false +} func (db *tateruDB) HandleIndex(w http.ResponseWriter, r *http.Request) { w.Header().Add("content-type", "text/html; charset=utf-8") var d struct { Machines []Machine } - db.machinesMutex.RLock() + db.machinesMutex.RLock() machs := []Machine{} for _, m := range db.machines { m.getInstallRequest(db) @@ -165,6 +182,7 @@ func (db *tateruDB) HandleIndex(w http.ResponseWriter, r *http.Request) { return } db.machinesMutex.RUnlock() + w.Write(b.Bytes()) return } @@ -220,8 +238,6 @@ func (db *tateruDB) Poll() { db.machines = machs db.machinesMutex.Unlock() - // TODO: expire old install requests? - time.Sleep(time.Second * 30) } } @@ -229,19 +245,16 @@ func (db *tateruDB) Poll() { func (db *tateruDB) HandleMachinesAPI(w http.ResponseWriter, r *http.Request) { w.Header().Add("content-type", "application/json; charset=utf-8") - db.machinesMutex.RLock() - defer db.machinesMutex.RUnlock() - - machines := []Machine{} - // Filter machines if applicable - filterAlias := "" + var filterAlias string query := r.URL.Query() if query.Has("alias") { // Only use one (the first) alias filterAlias = query.Get("alias") } + db.machinesMutex.RLock() + machines := []Machine{} for _, machine := range db.machines { if filterAlias != "" && filterAlias != machine.Name { continue @@ -253,11 +266,13 @@ func (db *tateruDB) HandleMachinesAPI(w http.ResponseWriter, r *http.Request) { } b, err := json.MarshalIndent(machines, "", " ") + db.machinesMutex.RUnlock() if err != nil { http.Error(w, "Failed to render JSON", http.StatusInternalServerError) log.Printf("Failed to marshal machines JSON: %v", err) return } + w.Write(b) return } @@ -267,22 +282,15 @@ func (db *tateruDB) HandleFetchMachineAPI(w http.ResponseWriter, r *http.Request uuid := vars["uuid"] - db.machinesMutex.RLock() - defer db.machinesMutex.RUnlock() - - var machine *Machine - for _, m := range db.machines { - if m.UUID == uuid { - machine = &m - } - } - - if machine == nil { + machine, found := db.GetMachineByUUID(uuid) + if !found { http.Error(w, "No machine with this UUID found", http.StatusNotFound) return } + machine.mutex.RLock() b, err := json.MarshalIndent(machine, "", " ") + machine.mutex.RUnlock() if err != nil { http.Error(w, "Failed to render JSON", http.StatusInternalServerError) log.Printf("Failed to marshal machine JSON: %v", err) @@ -299,16 +307,8 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques uuid := vars["uuid"] - db.machinesMutex.Lock() - defer db.machinesMutex.Unlock() - - machine := Machine{} - for _, m := range db.machines { - if m.UUID == uuid { - machine = m - } - } - if machine.UUID != uuid { + machine, found := db.GetMachineByUUID(uuid) + if !found { http.Error(w, "No machine with this UUID found", http.StatusNotFound) return } @@ -322,13 +322,19 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques return } - installRequest := InstallRequest{ - LastUpdate: time.Now(), - State: NewInstallRequestStateMachine(), - SSHPubKey: bir.SSHPubKey, - Nonce: bir.Nonce, + machine.getInstallRequest(db) + // Create (new) installRequest when there is none or a new nonce is used + machine.mutex.Lock() + if machine.installRequest == nil || machine.installRequest.Nonce != bir.Nonce { + installRequest := InstallRequest{ + State: NewInstallRequestStateMachine(), + SSHPubKey: bir.SSHPubKey, + Nonce: bir.Nonce, + } + db.installRequests.Add(uuid, installRequest) + machine.installRequest = &installRequest } - db.installRequests[uuid] = installRequest + machine.mutex.Unlock() // Send boot-installer request to manager for machine managerBir := ManagerBootInstallerRequest{ @@ -342,7 +348,9 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques } client := &http.Client{} + machine.mutex.RLock() bootInstallerURL := fmt.Sprintf("%s/v1/machines/%s/boot-installer", machine.ManagedBy, machine.UUID) + machine.mutex.RUnlock() resp, err := client.Post(bootInstallerURL, "application/json", bytes.NewBuffer(b)) if err != nil { http.Error(w, "Error when sending boot-installer request to manager", http.StatusInternalServerError) @@ -361,11 +369,13 @@ func (db *tateruDB) HandleBootInstallerAPI(w http.ResponseWriter, r *http.Reques return } - installRequest.LastUpdate = time.Now() - installRequest.State.Transition("SENT_BOOT_INSTALLER_REQUEST_TO_MANAGER") - db.installRequests[uuid] = installRequest + machine.mutex.Lock() + machine.installRequest.State.Transition("SENT_BOOT_INSTALLER_REQUEST_TO_MANAGER") + // Update cached installRequest + db.installRequests.Add(uuid, *(machine.installRequest)) + machine.mutex.Unlock() - installRequest.State.WaitFor("booted") + machine.installRequest.State.WaitFor("booted") return } @@ -375,26 +385,21 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re uuid := vars["uuid"] - db.machinesMutex.Lock() - defer db.machinesMutex.Unlock() - - machine := Machine{} - for _, m := range db.machines { - if m.UUID == uuid { - machine = m - } - } - if machine.UUID != uuid { + machine, found := db.GetMachineByUUID(uuid) + if !found { http.Error(w, "No machine with this UUID found", http.StatusNotFound) return } - installRequest, ok := db.installRequests[uuid] - if !ok { + machine.getInstallRequest(db) + machine.mutex.RLock() + if machine.installRequest == nil { + machine.mutex.RUnlock() http.Error(w, "No install request found for this machine", http.StatusNotFound) log.Printf("Received installer callback for machine '%s', but there was previous InstallerRequest", machine.UUID) return } + machine.mutex.RUnlock() // Parse payload var cr CallbackRequest @@ -405,7 +410,7 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re return } - // Update machine with information + // Update installRequest with information installerAddr := r.Header.Get("X-Forwarded-For") if installerAddr == "" { installerAddr = r.RemoteAddr @@ -417,24 +422,34 @@ func (db *tateruDB) HandleInstallerCallbackAPI(w http.ResponseWriter, r *http.Re installerAddr = parts[0] } } - installRequest.InstallerAddr = installerAddr - installRequest.SSHPorts = cr.SSHPorts + machine.mutex.Lock() + machine.installRequest.InstallerAddr = installerAddr + machine.installRequest.SSHPorts = cr.SSHPorts + machine.installRequest.State.Transition("RECEIVED_INSTALLER_CALLBACK") + // Update cached installRequest + db.installRequests.Add(uuid, *(machine.installRequest)) cresp := CallbackResponse{ - SSHPubKey: installRequest.SSHPubKey, + SSHPubKey: machine.installRequest.SSHPubKey, } b, err := json.MarshalIndent(cresp, "", " ") + machine.mutex.Unlock() if err != nil { http.Error(w, "Failed to render JSON", http.StatusInternalServerError) log.Printf("Failed to marshal CallbackResponse JSON: %v", err) return } - installRequest.LastUpdate = time.Now() - installRequest.State.Transition("RECEIVED_INSTALLER_CALLBACK") - db.installRequests[uuid] = installRequest - w.Header().Add("content-type", "application/json; charset=utf-8") w.Write(b) return } + +func (db *tateruDB) installRequestEvict(key interface{}, value interface{}) { + installRequest := value.(*InstallRequest) + + state := installRequest.State.Current() + if state != "provisioned" { + log.Printf("ERROR: InstallRequest for %s was evicted while still running (state was %s)!", key.(string), state) + } +} diff --git a/go.mod b/go.mod index 74691f3..36cd7da 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gorilla/context v1.1.1 // indirect github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 + github.com/hashicorp/golang-lru v0.5.4 gopkg.in/gorilla/handlers.v1 v1.4.0 gopkg.in/gorilla/mux.v1 v1.6.2 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 835d841..705638a 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,8 @@ github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/gorilla/handlers.v1 v1.4.0 h1:dD+J4RGXwT7R17TX4hqDCoGMIKDbapDJENUwAgyTZow= gopkg.in/gorilla/handlers.v1 v1.4.0/go.mod h1:o7WIirxsm7EPn28G+NWxv3L0Y7mue08OdaxiocakJz8= diff --git a/main.go b/main.go index cc3e1fa..bd6a33f 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" + lru "github.com/hashicorp/golang-lru" "gopkg.in/yaml.v2" ) @@ -57,7 +58,11 @@ func main() { if err := yaml.Unmarshal([]byte(cfile), &cfg); err != nil { log.Fatalf("yaml.Unmarshal failed: %v", err) } - db := &tateruDB{indexTmpl: indexTmpl, installRequests: make(map[string]InstallRequest)} + db := &tateruDB{indexTmpl: indexTmpl} + db.installRequests, err = lru.NewWithEvict(256, db.installRequestEvict) + if err != nil { + log.Fatalf("installRequest cache init error: %v", err) + } go db.Poll() router := mux.NewRouter() rf, _ := fs.Sub(resources, "resources")