Skip to content

Commit

Permalink
migrate CF API from v2 to v3
Browse files Browse the repository at this point in the history
  • Loading branch information
metskem committed Oct 14, 2024
1 parent ee9f345 commit a20120d
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 375 deletions.
36 changes: 16 additions & 20 deletions conf/Config.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package conf

import (
"context"
"fmt"
"github.com/cloudfoundry-community/go-cfenv"
"github.com/cloudfoundry/go-cfclient/v3/client"
"github.com/cloudfoundry/go-cfclient/v3/config"
"github.com/rabobank/scheduler-service-broker/model"
"os"
"strconv"
)

var (
Catalog model.Catalog
ListenPort int
Debug = false
httpTimeoutStr = os.Getenv("SSB_HTTP_TIMEOUT")
HttpTimeout int
HttpTimeoutDefault = 10
ClientId = os.Getenv("SSB_CLIENT_ID")
CfApiURL = os.Getenv("SSB_CFAPI_URL")
tokenRefreshIntervalStr = os.Getenv("SSB_TOKEN_REFRESH_INTERVAL")
TokenRefreshInterval int64
SchedulerEndpoint = os.Getenv("SSB_SCHEDULER_ENDPOINT")
Catalog model.Catalog
ListenPort int
Debug = false
httpTimeoutStr = os.Getenv("SSB_HTTP_TIMEOUT")
HttpTimeout int
HttpTimeoutDefault = 10
ClientId = os.Getenv("SSB_CLIENT_ID")
CfApiURL = os.Getenv("SSB_CFAPI_URL")
SchedulerEndpoint = os.Getenv("SSB_SCHEDULER_ENDPOINT")

DebugStr = os.Getenv("SSB_DEBUG")
BrokerUser = os.Getenv("SSB_BROKER_USER")
Expand All @@ -35,6 +36,10 @@ var (
BrokerPassword string
DBPassword string
ClientSecret string

CfClient *client.Client
CfConfig *config.Config
CfCtx = context.Background()
)

const BasicAuthRealm = "scheduler-service-broker"
Expand Down Expand Up @@ -62,15 +67,6 @@ func EnvironmentComplete() {
envComplete = false
fmt.Println("missing envvar: SSB_CFAPI_URL")
}
if len(tokenRefreshIntervalStr) == 0 {
TokenRefreshInterval = 90
} else {
var err error
TokenRefreshInterval, err = strconv.ParseInt(tokenRefreshIntervalStr, 0, 64)
if err != nil {
panic(err)
}
}
if SchedulerEndpoint == "" {
envComplete = false
fmt.Println("missing envvar: SSB_SCHEDULER_ENDPOINT")
Expand Down
5 changes: 3 additions & 2 deletions controllers/Calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"fmt"
"github.com/rabobank/scheduler-service-broker/conf"
"github.com/rabobank/scheduler-service-broker/cron"
"github.com/rabobank/scheduler-service-broker/db"
"github.com/rabobank/scheduler-service-broker/model"
Expand All @@ -15,7 +16,7 @@ func CallCreate(w http.ResponseWriter, r *http.Request) {
if req.Name == "" {
util.WriteHttpResponse(w, http.StatusBadRequest, "the name of the call was empty")
} else {
if _, err := util.CfClient.AppByGuid(req.AppGUID); err != nil {
if _, err := conf.CfClient.Applications.Get(conf.CfCtx, req.AppGUID); err != nil {
util.WriteHttpResponse(w, http.StatusBadRequest, fmt.Sprintf("app with guid %s not found: %s", req.AppGUID, err))
} else {
if !util.IsAppBoundToSchedulerService(req.AppGUID) {
Expand Down Expand Up @@ -62,7 +63,7 @@ func CallGet(w http.ResponseWriter, r *http.Request) {
var calls = make([]model.Call, 0)
for _, call := range result {
appName := "<unknown>"
if app, err := util.CfClient.GetAppByGuid(call.AppGuid); err == nil {
if app, err := conf.CfClient.Applications.Get(conf.CfCtx, call.AppGuid); err == nil {
appName = app.Name
}
calls = append(calls, model.Call{CallName: call.Name, AppName: appName, Url: call.Url, AuthHeader: call.AuthHeader})
Expand Down
5 changes: 3 additions & 2 deletions controllers/Jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controllers

import (
"fmt"
"github.com/rabobank/scheduler-service-broker/conf"
"github.com/rabobank/scheduler-service-broker/cron"
"github.com/rabobank/scheduler-service-broker/db"
"github.com/rabobank/scheduler-service-broker/model"
Expand All @@ -16,7 +17,7 @@ func JobCreate(w http.ResponseWriter, r *http.Request) {
if req.Name == "" {
util.WriteHttpResponse(w, http.StatusBadRequest, "the name of the job was empty")
} else {
if _, err = util.CfClient.AppByGuid(req.AppGUID); err != nil {
if _, err = conf.CfClient.Applications.Get(conf.CfCtx, req.AppGUID); err != nil {
util.WriteHttpResponse(w, http.StatusBadRequest, fmt.Sprintf("app with guid %s not found: %s", req.AppGUID, err))
} else {
if !util.IsAppBoundToSchedulerService(req.AppGUID) {
Expand Down Expand Up @@ -63,7 +64,7 @@ func JobGet(w http.ResponseWriter, r *http.Request) {
var jobs = make([]model.Job, 0)
for _, job := range result {
appName := "<unknown>"
if app, err := util.CfClient.GetAppByGuid(job.AppGuid); err == nil {
if app, err := conf.CfClient.Applications.Get(conf.CfCtx, job.AppGuid); err == nil {
appName = app.Name
}
jobs = append(jobs, model.Job{JobName: job.Name, AppName: appName, Command: job.Command, MemoryInMB: job.MemoryInMB, DiskInMB: job.DiskInMB})
Expand Down
25 changes: 13 additions & 12 deletions controllers/ServiceBindingController.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controllers
import (
"fmt"
"github.com/gorilla/mux"
"github.com/rabobank/scheduler-service-broker/conf"
"github.com/rabobank/scheduler-service-broker/cron"
"github.com/rabobank/scheduler-service-broker/db"
"github.com/rabobank/scheduler-service-broker/model"
Expand All @@ -27,42 +28,42 @@ func CreateServiceBinding(w http.ResponseWriter, r *http.Request) {
func DeleteServiceBinding(w http.ResponseWriter, r *http.Request) {
serviceInstanceId := mux.Vars(r)["service_instance_guid"]
serviceBindingId := mux.Vars(r)["service_binding_guid"]
if serviceInstance, err := util.CfClient.GetServiceInstanceByGuid(serviceInstanceId); err != nil {
if serviceInstance, err := conf.CfClient.ServiceInstances.Get(conf.CfCtx, serviceInstanceId); err != nil {
fmt.Printf("could not find service instance with guid %s: %s\n", serviceInstanceId, err)
util.WriteHttpResponse(w, http.StatusGone, model.DummyResponse{DummyField: "bye bye"})
} else {
if serviceBinding, err := util.CfClient.GetServiceBindingByGuid(serviceBindingId); err != nil {
if serviceBinding, err := conf.CfClient.ServiceCredentialBindings.Get(conf.CfCtx, serviceBindingId); err != nil {
fmt.Printf("could not find service binding with guid %s: %s\n", serviceBindingId, err)
util.WriteHttpResponse(w, http.StatusGone, model.DummyResponse{DummyField: "bye bye"})
} else {
// delete the jobs , calls and schedules for this app
jobs, jobError := db.GetJobs(serviceInstance.SpaceGuid, "")
_ = db.DeleteJobBySpaceGuidAndAppGuid(serviceInstance.SpaceGuid, serviceBinding.AppGuid)
jobs, jobError := db.GetJobs(serviceInstance.Relationships.Space.Data.GUID, "")
_ = db.DeleteJobBySpaceGuidAndAppGuid(serviceInstance.Relationships.Space.Data.GUID, serviceBinding.Relationships.App.Data.GUID)
// delete the job from cron as well (we need the jobname to do that)
var jobOrCallName string
if jobError == nil {
for _, job := range jobs {
jobOrCallName = job.Name
if job.AppGuid == serviceBinding.AppGuid {
cron.DeleteJobByNameAndSpaceGuid(job.Name, serviceInstance.SpaceGuid)
if job.AppGuid == serviceBinding.Relationships.App.Data.GUID {
cron.DeleteJobByNameAndSpaceGuid(job.Name, serviceInstance.Relationships.Space.Data.GUID)
}
}
} else {
fmt.Printf("no jobs found for deletion (app name: %s, spaceguid: %s)", jobOrCallName, serviceInstance.SpaceGuid)
fmt.Printf("no jobs found for deletion (app name: %s, spaceguid: %s)", jobOrCallName, serviceInstance.Relationships.Space.Data.GUID)
}

calls, callError := db.GetCalls(serviceInstance.SpaceGuid, "")
_ = db.DeleteCallBySpaceGuidAndAppGuid(serviceInstance.SpaceGuid, serviceBinding.AppGuid)
calls, callError := db.GetCalls(serviceInstance.Relationships.Space.Data.GUID, "")
_ = db.DeleteCallBySpaceGuidAndAppGuid(serviceInstance.Relationships.Space.Data.GUID, serviceBinding.Relationships.App.Data.GUID)
// delete the call from cron as well (we need the callname to do that)
if callError == nil {
for _, call := range calls {
jobOrCallName = call.Name
if call.AppGuid == serviceBinding.AppGuid {
cron.DeleteCallByNameAndSpaceGuid(call.Name, serviceInstance.SpaceGuid)
if call.AppGuid == serviceBinding.Relationships.App.Data.GUID {
cron.DeleteCallByNameAndSpaceGuid(call.Name, serviceInstance.Relationships.Space.Data.GUID)
}
}
} else {
fmt.Printf("no calls found for deletion (app name: %s, spaceguid: %s)", jobOrCallName, serviceInstance.SpaceGuid)
fmt.Printf("no calls found for deletion (app name: %s, spaceguid: %s)", jobOrCallName, serviceInstance.Relationships.Space.Data.GUID)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions controllers/ServiceInstanceController.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
)

func Catalog(w http.ResponseWriter, r *http.Request) {
_ = r // get rid of compiler warning
util.WriteHttpResponse(w, http.StatusOK, conf.Catalog)
}

Expand Down
65 changes: 19 additions & 46 deletions cron/runner.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package cron

import (
"encoding/json"
"fmt"
"github.com/cloudfoundry/go-cfclient/v3/client"
"github.com/cloudfoundry/go-cfclient/v3/resource"
"github.com/rabobank/scheduler-service-broker/conf"
"github.com/rabobank/scheduler-service-broker/db"
"github.com/rabobank/scheduler-service-broker/model"
Expand Down Expand Up @@ -180,19 +181,13 @@ func StartHousekeeping() {
_ = db.DeleteHistoryByAge(conf.MaxHistoriesDays)
_ = db.DeleteScheduleByAge(conf.MaxHistoriesDays)
}
req := util.CfClient.NewRequest(http.MethodGet, "/v3/tasks?order_by=-created_at&per_page=100") // TODO should the per_page be configurable with an envvar?
if resp, err := util.CfClient.DoRequest(req); err != nil {
taskListOptions := client.TaskListOptions{ListOptions: &client.ListOptions{PerPage: 100, OrderBy: "-created_at"}}
if tasks, err := conf.CfClient.Tasks.ListAll(conf.CfCtx, &taskListOptions); err != nil {
fmt.Println(err)
} else {
body, _ := io.ReadAll(resp.Body)
response := model.TaskListResponse{}
if err = json.Unmarshal(body, &response); err != nil {
fmt.Println(err)
} else {
fmt.Printf("found %d cf tasks in %d pages...", response.Pagination.TotalResults, response.Pagination.TotalPages)
if err = db.UpdateState(response); err != nil {
fmt.Printf("failed to update histories state: %s\n", err)
}
fmt.Printf("found %d cf tasks", len(tasks))
if err = db.UpdateState(tasks); err != nil {
fmt.Printf("failed to update histories state: %s\n", err)
}
}
}
Expand All @@ -209,13 +204,13 @@ func DoCall(scheduledTime time.Time, call model.SchedulableCall) {
}
// do the actual call to the URL
transport := http.Transport{IdleConnTimeout: time.Second}
client := http.Client{Timeout: time.Duration(conf.HttpTimeout) * time.Second, Transport: &transport}
httpClient := http.Client{Timeout: time.Duration(conf.HttpTimeout) * time.Second, Transport: &transport}
callUrl, _ := url.Parse(call.Url)
req := http.Request{Method: http.MethodGet, URL: callUrl}
if call.AuthHeader != "" {
req = http.Request{Method: http.MethodGet, URL: callUrl, Header: map[string][]string{"Authorization": {call.AuthHeader}}}
}
resp, err := client.Do(&req)
resp, err := httpClient.Do(&req)
historyRecord.ExecutionEndTime = time.Now()
if err != nil {
fmt.Printf("failed calling url \"%s\": %s\n", callUrl, err)
Expand Down Expand Up @@ -244,44 +239,22 @@ func DoCall(scheduledTime time.Time, call model.SchedulableCall) {
}

func DoJob(scheduledTime time.Time, job model.SchedulableJob) {
historyRecord := model.History{
Guid: util.GenerateGUID(),
ScheduledTime: scheduledTime,
ExecutionStartTime: time.Now(),
ScheduleGuid: job.ScheduleGuid,
CreatedAt: time.Now(),
}
// prepare the post body, the memory and disk are optional
postBodyDiskPart := ""
postBodyMemoryPart := ""
if job.DiskInMB > 0 {
postBodyDiskPart = fmt.Sprintf(",\"disk_in_mb\":\"%d\"", job.DiskInMB)
}
if job.MemoryInMB > 0 {
postBodyMemoryPart = fmt.Sprintf(",\"memory_in_mb\":\"%d\"", job.MemoryInMB)
}
postBody := fmt.Sprintf("{ \"command\": \"%s\" %s %s}", strings.ReplaceAll(job.Command, `"`, `\"`), postBodyDiskPart, postBodyMemoryPart)
historyRecord := model.History{Guid: util.GenerateGUID(), ScheduledTime: scheduledTime, ExecutionStartTime: time.Now(), ScheduleGuid: job.ScheduleGuid, CreatedAt: time.Now()}

// run the actual cf task (https://v3-apidocs.cloudfoundry.org/version/3.112.0/index.html#create-a-task):
req := util.CfClient.NewRequestWithBody(http.MethodPost, fmt.Sprintf("/v3/apps/%s/tasks", job.AppGuid), strings.NewReader(postBody))
if resp, err := util.CfClient.DoRequest(req); err != nil {
// run the actual cf task
taskCreateRequest := resource.TaskCreate{Command: &job.Command, MemoryInMB: &job.MemoryInMB, DiskInMB: &job.DiskInMB}
if task, err := conf.CfClient.Tasks.Create(conf.CfCtx, job.AppGuid, &taskCreateRequest); err != nil {
fmt.Printf("failed running cmd %s in app with guid %s: %s\n", job.Command, job.AppGuid, err)
} else {
historyRecord.ExecutionEndTime = time.Now()
body, _ := io.ReadAll(resp.Body)
response := model.TaskCreateResponse{}
if err = json.Unmarshal(body, &response); err != nil {
fmt.Printf("failed running cmd %s in app with guid %s: %s\n", job.Command, job.AppGuid, err)
} else {
util.PrintfIfDebug("response from schedule task: %s\n", body)
historyRecord.State = response.State
historyRecord.TaskGuid = response.GUID
historyRecord.Message = ""
if response.Result.FailureReason != nil {
historyRecord.Message = util.LastXChars(fmt.Sprintf("%s", response.Result.FailureReason), 255)
}
historyRecord.State = task.State
historyRecord.TaskGuid = task.GUID
historyRecord.Message = ""
if task.Result.FailureReason != nil {
historyRecord.Message = util.LastXChars(fmt.Sprintf("%s", *task.Result.FailureReason), 255)
}
}

if job.ScheduleGuid == "" {
// only when we did a "cf run-job"
var err error
Expand Down
25 changes: 13 additions & 12 deletions db/Histories.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/cloudfoundry/go-cfclient/v3/resource"
"github.com/rabobank/scheduler-service-broker/model"
"github.com/rabobank/scheduler-service-broker/util"
"time"
Expand All @@ -12,7 +13,7 @@ import (
func InsertHistory(history model.History) (string, error) {
var err error
db := GetDB()
defer db.Close()
defer func() { _ = db.Close() }()
history.Guid = util.GenerateGUID()
util.PrintfIfDebug("inserting history: %s\n", history)
var scheduleGuid sql.NullString
Expand All @@ -37,7 +38,7 @@ func GetJobHistories(spaceguid, name string) ([]model.History, error) {
}
result := make([]model.History, 0)
db := GetDB()
defer db.Close()
defer func() { _ = db.Close() }()
var rows *sql.Rows
rows, err = db.Query("select h.guid,h.scheduled_time,h.execution_start_time,h.execution_end_time,h.message,h.state,h.schedule_guid,h.task_guid,h.created_at from histories h, schedulables a, schedules s, jobs j where h.schedule_guid=s.guid and s.schedulable_guid=a.guid and a.guid=j.guid and j.spaceguid like ? and j.name like ? order by h.execution_end_time desc", spaceguid, name)
if err != nil {
Expand All @@ -59,7 +60,7 @@ func GetCallHistories(spaceguid, name string) ([]model.History, error) {
}
result := make([]model.History, 0)
db := GetDB()
defer db.Close()
defer func() { _ = db.Close() }()
var rows *sql.Rows
rows, err = db.Query("select h.guid,h.scheduled_time,h.execution_start_time,h.execution_end_time,h.message,h.state,h.schedule_guid,h.task_guid,h.created_at from histories h, schedulables a, schedules s, calls c where h.schedule_guid=s.guid and s.schedulable_guid=a.guid and a.guid=c.guid and c.spaceguid=? and c.name=? order by h.execution_end_time desc", spaceguid, name)
if err != nil {
Expand All @@ -74,7 +75,7 @@ func GetCallHistories(spaceguid, name string) ([]model.History, error) {
func histories2array(rows *sql.Rows) []model.History {
result := make([]model.History, 0)
if rows != nil {
defer rows.Close()
defer func() { _ = rows.Close() }()
var guid, message, state, scheduleGuid, taskGuid string
var scheduledTime, executionStartTime, executionEndTime, createdAt time.Time
for rows.Next() {
Expand Down Expand Up @@ -104,7 +105,7 @@ func DeleteHistoryByAge(maxDays int64) error {
cutOffDate := time.Now().Add(time.Duration(-nanoseconds))
fmt.Printf("cleaning histories table for rows older than %d days (older than %s)\n", maxDays, cutOffDate.Format(time.RFC3339))
db := GetDB()
defer db.Close()
defer func() { _ = db.Close() }()
var result sql.Result
if result, err = db.Exec("delete from histories where execution_start_time<?", cutOffDate); err != nil {
fmt.Printf("failed to clean up histories: %s\n", err)
Expand All @@ -116,19 +117,19 @@ func DeleteHistoryByAge(maxDays int64) error {
return nil
}

func UpdateState(response model.TaskListResponse) error {
func UpdateState(tasks []*resource.Task) error {
var err error
db := GetDB()
defer db.Close()
defer func() { _ = db.Close() }()
totErrCount := 0
var totUpdates int64
if response.Pagination.TotalResults > 0 {
for _, resource := range response.Resources {
if len(tasks) > 0 {
for _, task := range tasks {
message := ""
if resource.State == "FAILED" {
message = util.LastXChars(resource.Result.FailureReason, 255)
if task.State == "FAILED" {
message = util.LastXChars(*task.Result.FailureReason, 255)
}
if result, err := db.Exec("update histories set state=?, message=? where task_guid=?", resource.State, message, resource.GUID); err != nil {
if result, err := db.Exec("update histories set state=?, message=? where task_guid=?", task.State, message, task.GUID); err != nil {
totErrCount++
fmt.Printf("failed to clean up histories: %s\n", err)
} else {
Expand Down
Loading

0 comments on commit a20120d

Please sign in to comment.