Skip to content

Commit

Permalink
add the interval based sync routine that compares actual network-poli…
Browse files Browse the repository at this point in the history
…cies with the ones needed according to all the binds/labels
  • Loading branch information
metskem committed Oct 7, 2024
1 parent fc3f0b6 commit ea97bad
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The configuration for the broker consists of the following environment variables
* **CLIENT_ID** - The uaa client to use for logging in to credhub, should have credhub_admin scope.
* **CATALOG_DIR** - The directory where to find the cf catalog for the broker, the directory should contain a file called catalog.json.
* **LISTEN_PORT** - The port that the broker should listen on, default is 8080.
* **SYNC_INTERVAL_SECS** - The interval the broker will sync the required network policies (according to the service bindings) with the actual network policies, and will create the missing policies, default is 300.
* **CFAPI_URL** - The URL of the cf api (i.e. https://api.sys.mydomain.com).
* **SKIP_SSL_VALIDATION** - Skip ssl validation or not, default is false.

Expand Down
16 changes: 14 additions & 2 deletions conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ var (
Debug = false
CredhubURL = os.Getenv("CREDHUB_URL")

Catalog model.Catalog
ListenPort int
Catalog model.Catalog
ListenPort int
SyncIntervalSecs int

ClientId = os.Getenv("CLIENT_ID")
ClientSecret = os.Getenv("CLIENT_SECRET")
BrokerUser = os.Getenv("BROKER_USER")
BrokerPassword = os.Getenv("BROKER_PASSWORD")
CatalogDir = os.Getenv("CATALOG_DIR")
ListenPortStr = os.Getenv("LISTEN_PORT")
SyncIntervalSecsStr = os.Getenv("SYNC_INTERVAL_SECS")
CfApiURL = os.Getenv("CFAPI_URL")
UaaApiURL = os.Getenv("UAA_URL")
SkipSslValidationStr = os.Getenv("SKIP_SSL_VALIDATION")
Expand Down Expand Up @@ -82,6 +84,16 @@ func EnvironmentComplete() {
envComplete = false
}
}
if SyncIntervalSecsStr == "" {
SyncIntervalSecs = 300
} else {
var err error
SyncIntervalSecs, err = strconv.Atoi(SyncIntervalSecsStr)
if err != nil {
fmt.Printf("failed reading envvar SYNC_INTERVAL_SECS, err: %s\n", err)
envComplete = false
}
}

app, e := cfenv.Current()
if e != nil {
Expand Down
12 changes: 10 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package main
import (
"encoding/json"
"fmt"
"os"

"github.com/rabobank/npsb/conf"
"github.com/rabobank/npsb/server"
"github.com/rabobank/npsb/util"
"os"
"time"
)

func main() {
Expand Down Expand Up @@ -35,4 +35,12 @@ func initialize() {
fmt.Printf("failed unmarshalling catalog file %s, error: %s\n", catalogFile, err)
os.Exit(8)
}

// start the routine that checks consistency between the service instance (labels) and the actual network policies:
go func() {
for {
util.SyncLabels2Policies()
time.Sleep(time.Duration(conf.SyncIntervalSecs) * time.Second)
}
}()
}
21 changes: 20 additions & 1 deletion model/NetworkPolicies.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type NetworkPolicyLabels struct {
}

func (np NetworkPolicyLabels) String() string {
return fmt.Sprintf("%s (%s) => %s (%s):%d (%s)", np.SourceName, np.Source, np.DestinationName, np.Destination, np.Port, np.Protocol)
return fmt.Sprintf("%s (%s) => %s (%s:%d(%s))", np.SourceName, np.Source, np.DestinationName, np.Destination, np.Port, np.Protocol)
}

type NetworkPolicies struct {
Expand All @@ -35,3 +35,22 @@ type Destination struct {
Protocol string `json:"protocol"`
Port int `json:"port"`
}

type InstancesWithBinds struct {
BoundApps []Destination `json:"bound_apps"`
SrcOrDst string `json:"src_or_dst"`
NameOrSource string `json:"name_or_source"`
}

func (iwb InstancesWithBinds) String() string {
var bindStr string
for _, bind := range iwb.BoundApps {
bindStr += fmt.Sprintf("%s:%d(%s), ", bind.Id, bind.Port, bind.Protocol)
}
return fmt.Sprintf("type:%s, name/source: %s, #binds: %d : %s", iwb.SrcOrDst, iwb.NameOrSource, len(iwb.BoundApps), bindStr)
}

type PolicyServerGetResponse struct {
TotalPolicies int `json:"total_policies"`
Policies []NetworkPolicy `json:"policies"`
}
162 changes: 157 additions & 5 deletions util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"log"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand All @@ -30,7 +31,7 @@ var orgCache = make(map[string]*resource.Organization)

type CacheEntry struct {
created time.Time
guid string
name string
}

func InitCFClient() *client.Client {
Expand Down Expand Up @@ -104,7 +105,7 @@ func DumpRequest(r *http.Request) {
} else {
fmt.Println(string(body))
}
// Restore the io.ReadCloser to it's original state
// Restore the io.ReadCloser to it s original state
r.Body = io.NopCloser(bytes.NewBuffer(body))
}
}
Expand Down Expand Up @@ -141,13 +142,13 @@ func Guid2AppName(guid string) string {
}
if cacheEntry, found := guid2appNameCache[guid]; found {
PrintfIfDebug("cache hit for guid %s\n", guid)
return cacheEntry.guid
return cacheEntry.name
}
if app, err := conf.CfClient.Applications.Get(conf.CfCtx, guid); err != nil {
fmt.Printf("failed to get app by guid %s, error: %s\n", guid, err)
fmt.Printf("failed to get app by name %s, error: %s\n", guid, err)
return ""
} else {
guid2appNameCache[guid] = CacheEntry{created: time.Now(), guid: app.Name}
guid2appNameCache[guid] = CacheEntry{created: time.Now(), name: app.Name}
return app.Name
}
}
Expand Down Expand Up @@ -314,3 +315,154 @@ func Contains(elems []interface{}, v string) bool {
}
return false
}

// SyncLabels2Policies - Find all ServiceInstances and their bound apps, figure out what network policies they represent, check if they exist, and if not, report and create them.
func SyncLabels2Policies() {
fmt.Printf("syncing labels to network policies...\n")
startTime := time.Now()
var allInstancesWithBinds []model.InstancesWithBinds
var totalServiceInstances int
var totalBinds int

//
// find all Instances with their binds, both source and destination
labelSelector := client.LabelSelector{}
labelSelector.Existence(conf.LabelNameType)
instanceListOption := client.ServiceInstanceListOptions{ListOptions: &client.ListOptions{LabelSel: labelSelector, PerPage: 5000}}
if instances, err := conf.CfClient.ServiceInstances.ListAll(conf.CfCtx, &instanceListOption); err != nil {
fmt.Printf("failed to list all service instances with label %s: %s\n", conf.LabelNameType, err)
} else {
if len(instances) < 1 {
PrintfIfDebug("could not find any service instances with label %s\n", conf.LabelNameType)
} else {
totalServiceInstances = len(instances)
for _, instance := range instances {
credBindingListOption := client.ServiceCredentialBindingListOptions{ListOptions: &client.ListOptions{PerPage: 1000}, ServiceInstanceGUIDs: client.Filter{Values: []string{instance.GUID}}}
if bindings, err := conf.CfClient.ServiceCredentialBindings.ListAll(conf.CfCtx, &credBindingListOption); err != nil {
fmt.Printf("failed to list service bindings for service instance %s: %s\n", instance.GUID, err)
} else {
if len(bindings) < 1 {
PrintfIfDebug("could not find any service bindings for service instance %s\n", instance.GUID)
} else {
var nameOrSource string
if instance.Metadata.Labels[conf.LabelNameName] != nil && *instance.Metadata.Labels[conf.LabelNameName] != "" {
nameOrSource = *instance.Metadata.Labels[conf.LabelNameName]
}
if instance.Metadata.Labels[conf.LabelNameSource] != nil && *instance.Metadata.Labels[conf.LabelNameSource] != "" {
nameOrSource = *instance.Metadata.Labels[conf.LabelNameSource]
}
instanceWithBinds := model.InstancesWithBinds{
BoundApps: make([]model.Destination, 0),
SrcOrDst: *instance.Metadata.Labels[conf.LabelNameType],
NameOrSource: nameOrSource,
}
for _, binding := range bindings {
totalBinds++
if instanceWithBinds.SrcOrDst == conf.LabelValueTypeSrc {
// if it is a type=source, we only need the app name
instanceWithBinds.BoundApps = append(instanceWithBinds.BoundApps, model.Destination{Id: binding.Relationships.App.Data.GUID})
} else {
port := 8080
if binding.Metadata.Labels[conf.LabelNamePort] != nil && *binding.Metadata.Labels[conf.LabelNamePort] != "" && *binding.Metadata.Labels[conf.LabelNamePort] != "0" {
port, _ = strconv.Atoi(*binding.Metadata.Labels[conf.LabelNamePort])
}
protocol := conf.LabelValueProtocolTCP
if binding.Metadata.Labels[conf.LabelNameProtocol] != nil && *binding.Metadata.Labels[conf.LabelNameProtocol] != "" {
protocol = *binding.Metadata.Labels[conf.LabelNameProtocol]
}
instanceWithBinds.BoundApps = append(instanceWithBinds.BoundApps, model.Destination{Id: binding.Relationships.App.Data.GUID, Protocol: protocol, Port: port})
}
}
allInstancesWithBinds = append(allInstancesWithBinds, instanceWithBinds)
}
}
}
}
PrintfIfDebug("found %d instances with label %s, %d instances have binds:\n", len(instances), conf.LabelNameType, len(allInstancesWithBinds))
}

//
// for each type=source instances, find the destination instances that point to this source instance, and generate the required network policies objects
var requiredNetworkPolicies []model.NetworkPolicy
for _, sourceInstance := range allInstancesWithBinds {
if sourceInstance.SrcOrDst == conf.LabelValueTypeSrc {
for _, destinationInstance := range allInstancesWithBinds {
if destinationInstance.SrcOrDst == conf.LabelValueTypeDest && destinationInstance.NameOrSource == sourceInstance.NameOrSource {
for _, sourceApp := range sourceInstance.BoundApps {
for _, destinationApp := range destinationInstance.BoundApps {
networkPolicy := model.NetworkPolicy{Source: model.Source{Id: sourceApp.Id}, Destination: model.Destination{Id: destinationApp.Id, Port: destinationApp.Port, Protocol: destinationApp.Protocol}}
// add the network policy to the list of network policies
requiredNetworkPolicies = append(requiredNetworkPolicies, networkPolicy)
// check if the network policy already exists, if not, create it
}
}
}
}
}
}
npString := ""
//for _, np := range requiredNetworkPolicies {
// npString += fmt.Sprintf("%s=>%s:%d(%s)\n", Guid2AppName(np.Source.Id), Guid2AppName(np.Destination.Id), np.Destination.Port, np.Destination.Protocol)
//}
PrintfIfDebug("found %d network policies that should exist according to labels:\n%v\n", len(requiredNetworkPolicies), npString)

//
// get all existing network policies, then for each network policy object check if a real network policy exists, if not, create it
existingNetworkPolicies := getAllNetworkPolicies()
policiesFixed := 0
for _, requiredNetworkPolicy := range requiredNetworkPolicies {
found := false
for _, existingNetworkPolicy := range existingNetworkPolicies {
if existingNetworkPolicy.Source.Id == requiredNetworkPolicy.Source.Id && existingNetworkPolicy.Destination.Id == requiredNetworkPolicy.Destination.Id && existingNetworkPolicy.Destination.Port == requiredNetworkPolicy.Destination.Port && existingNetworkPolicy.Destination.Protocol == requiredNetworkPolicy.Destination.Protocol {
found = true
break
}
}
if !found {
fmt.Printf("network policy %s=>%s:%d(%s) does not exist, creating it\n", Guid2AppName(requiredNetworkPolicy.Source.Id), Guid2AppName(requiredNetworkPolicy.Destination.Id), requiredNetworkPolicy.Destination.Port, requiredNetworkPolicy.Destination.Protocol)
err := Send2PolicyServer(conf.ActionBind, model.NetworkPolicies{Policies: []model.NetworkPolicy{requiredNetworkPolicy}})
if err != nil {
fmt.Printf("failed to create network policy %s=>%s:%d(%s): %s\n", Guid2AppName(requiredNetworkPolicy.Source.Id), Guid2AppName(requiredNetworkPolicy.Destination.Id), requiredNetworkPolicy.Destination.Port, requiredNetworkPolicy.Destination.Protocol, err)
} else {
policiesFixed++
}
}
}
endTime := time.Now()
fmt.Printf("checked %d service instances, checked %d binds, fixed %d missing network policies in %d ms\n", totalServiceInstances, totalBinds, policiesFixed, endTime.Sub(startTime).Milliseconds())
}

// getAllNetworkPolicies - query the policy server and return all network-policies
func getAllNetworkPolicies() []model.NetworkPolicy {
polServerResponse := &model.PolicyServerGetResponse{}
policyServerEndpoint := conf.CfApiURL + "/networking/v0/external/policies"
tokenSource, _ := conf.CfClient.CreateOAuth2TokenSource(conf.CfCtx)
token, _ := tokenSource.Token()
requestHeader := map[string][]string{"Content-Type": {"application/json"}, "Authorization": {token.AccessToken}}
requestUrl, _ := url.Parse(policyServerEndpoint)
httpRequest := http.Request{Method: http.MethodGet, URL: requestUrl, Header: requestHeader}
var httpClient http.Client
if conf.SkipSslValidation {
// Create new Transport that ignores untrusted CA's
clientAllowUntrusted := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
httpClient = http.Client{Transport: clientAllowUntrusted, Timeout: 30 * time.Second}
} else {
httpClient = http.Client{Timeout: 30 * time.Second}
}
response, err := httpClient.Do(&httpRequest)
if err != nil || (response != nil && response.StatusCode != http.StatusOK) {
if err != nil {
fmt.Printf("request to policy server failed: %s \n", err)
}
if response != nil && response.StatusCode != http.StatusOK {
fmt.Printf("request to policy server failed with response code %d\n", response.StatusCode)
}
} else {
defer func() { _ = response.Body.Close() }()
bodyBytes, _ := io.ReadAll(response.Body)
if err = json.Unmarshal(bodyBytes, polServerResponse); err != nil {
fmt.Printf("Failed to parse GET response from Policy Server: %s\n", err)
}
}
return polServerResponse.Policies
}

0 comments on commit ea97bad

Please sign in to comment.