diff --git a/blueprints/concurrency-limiting/base/config.libsonnet b/blueprints/concurrency-limiting/base/config.libsonnet index ef45b52a75..da2ca52d15 100644 --- a/blueprints/concurrency-limiting/base/config.libsonnet +++ b/blueprints/concurrency-limiting/base/config.libsonnet @@ -13,7 +13,6 @@ commonConfig { max_concurrency: '__REQUIRED_FIELD__', selectors: commonConfig.selectors_defaults, parameters: { - limit_by_label_key: 'limit_by_label_key', max_inflight_duration: '__REQUIRED_FIELD__', }, request_parameters: {}, diff --git a/blueprints/concurrency-limiting/base/gen/definitions.json b/blueprints/concurrency-limiting/base/gen/definitions.json index bfd9947efb..c13412fe4b 100644 --- a/blueprints/concurrency-limiting/base/gen/definitions.json +++ b/blueprints/concurrency-limiting/base/gen/definitions.json @@ -67,7 +67,6 @@ "parameters": { "description": "Parameters.", "default": { - "limit_by_label_key": "limit_by_label_key", "max_inflight_duration": "__REQUIRED_FIELD__" }, "type": "object", diff --git a/blueprints/concurrency-limiting/base/gen/values.yaml b/blueprints/concurrency-limiting/base/gen/values.yaml index 2d6fb53eb2..df065c97f7 100644 --- a/blueprints/concurrency-limiting/base/gen/values.yaml +++ b/blueprints/concurrency-limiting/base/gen/values.yaml @@ -29,7 +29,6 @@ policy: # Type: aperture.spec.v1.ConcurrencyLimiterParameters # Required: True parameters: - limit_by_label_key: "limit_by_label_key" max_inflight_duration: __REQUIRED_FIELD__ # Request Parameters. # Type: aperture.spec.v1.ConcurrencyLimiterRequestParameters diff --git a/blueprints/concurrency-scheduling/base/config.libsonnet b/blueprints/concurrency-scheduling/base/config.libsonnet index 3f88e008e1..50a4851940 100644 --- a/blueprints/concurrency-scheduling/base/config.libsonnet +++ b/blueprints/concurrency-scheduling/base/config.libsonnet @@ -13,14 +13,9 @@ commonConfig { max_concurrency: '__REQUIRED_FIELD__', selectors: commonConfig.selectors_defaults, concurrency_limiter: { - limit_by_label_key: 'limit_by_label_key', max_inflight_duration: '__REQUIRED_FIELD__', }, - scheduler: { - tokens_label_key: 'tokens', - priority_label_key: 'priority', - workload_label_key: 'workload', - }, + scheduler: {}, alerter: { alert_name: 'Too many inflight requests', }, diff --git a/blueprints/concurrency-scheduling/base/gen/definitions.json b/blueprints/concurrency-scheduling/base/gen/definitions.json index 81afa83720..988b45e953 100644 --- a/blueprints/concurrency-scheduling/base/gen/definitions.json +++ b/blueprints/concurrency-scheduling/base/gen/definitions.json @@ -61,7 +61,6 @@ "concurrency_limiter": { "description": "Concurrency Limiter Parameters.", "default": { - "limit_by_label_key": "limit_by_label_key", "max_inflight_duration": "__REQUIRED_FIELD__" }, "type": "object", @@ -75,11 +74,7 @@ }, "scheduler": { "description": "Scheduler configuration.", - "default": { - "priority_label_key": "priority", - "tokens_label_key": "tokens", - "workload_label_key": "workload" - }, + "default": {}, "type": "object", "$ref": "../../../gen/jsonschema/_definitions.json#/definitions/Scheduler" }, diff --git a/blueprints/concurrency-scheduling/base/gen/values.yaml b/blueprints/concurrency-scheduling/base/gen/values.yaml index a59ce455bd..8e0f4dc89d 100644 --- a/blueprints/concurrency-scheduling/base/gen/values.yaml +++ b/blueprints/concurrency-scheduling/base/gen/values.yaml @@ -25,7 +25,6 @@ policy: # Type: aperture.spec.v1.ConcurrencyLimiterParameters # Required: True concurrency_limiter: - limit_by_label_key: "limit_by_label_key" max_inflight_duration: __REQUIRED_FIELD__ # Max concurrency. # Type: float64 @@ -33,10 +32,7 @@ policy: max_concurrency: __REQUIRED_FIELD__ # Scheduler configuration. # Type: aperture.spec.v1.Scheduler - scheduler: - priority_label_key: "priority" - tokens_label_key: "tokens" - workload_label_key: "workload" + scheduler: {} # Flow selectors to match requests against. # Type: []aperture.spec.v1.Selector # Required: True diff --git a/blueprints/quota-scheduling/base/config.libsonnet b/blueprints/quota-scheduling/base/config.libsonnet index c7ea95f588..b1530677c4 100644 --- a/blueprints/quota-scheduling/base/config.libsonnet +++ b/blueprints/quota-scheduling/base/config.libsonnet @@ -15,14 +15,9 @@ commonConfig { fill_amount: '__REQUIRED_FIELD__', selectors: commonConfig.selectors_defaults, rate_limiter: { - limit_by_label_key: 'limit_key', interval: '__REQUIRED_FIELD__', }, - scheduler: { - tokens_label_key: 'tokens', - priority_label_key: 'priority', - workload_label_key: 'workload', - }, + scheduler: {}, alerter: { alert_name: 'More than 90% of requests are being rate limited', }, diff --git a/blueprints/quota-scheduling/base/gen/definitions.json b/blueprints/quota-scheduling/base/gen/definitions.json index 5bb9048839..97c841de2a 100644 --- a/blueprints/quota-scheduling/base/gen/definitions.json +++ b/blueprints/quota-scheduling/base/gen/definitions.json @@ -78,19 +78,14 @@ "rate_limiter": { "description": "Rate Limiter Parameters.", "default": { - "interval": "__REQUIRED_FIELD__", - "limit_by_label_key": "limit_key" + "interval": "__REQUIRED_FIELD__" }, "type": "object", "$ref": "../../../gen/jsonschema/_definitions.json#/definitions/RateLimiterParameters" }, "scheduler": { "description": "Scheduler configuration.", - "default": { - "priority_label_key": "priority", - "tokens_label_key": "tokens", - "workload_label_key": "workload" - }, + "default": {}, "type": "object", "$ref": "../../../gen/jsonschema/_definitions.json#/definitions/Scheduler" }, diff --git a/blueprints/quota-scheduling/base/gen/values.yaml b/blueprints/quota-scheduling/base/gen/values.yaml index 90b1967ecb..3758adf906 100644 --- a/blueprints/quota-scheduling/base/gen/values.yaml +++ b/blueprints/quota-scheduling/base/gen/values.yaml @@ -34,13 +34,9 @@ policy: # Required: True rate_limiter: interval: __REQUIRED_FIELD__ - limit_by_label_key: "limit_key" # Scheduler configuration. # Type: aperture.spec.v1.Scheduler - scheduler: - priority_label_key: "priority" - tokens_label_key: "tokens" - workload_label_key: "workload" + scheduler: {} # Flow selectors to match requests against. # Type: []aperture.spec.v1.Selector # Required: True diff --git a/blueprints/rate-limiting/base/config.libsonnet b/blueprints/rate-limiting/base/config.libsonnet index 2c8a24c3c6..01c88eff09 100644 --- a/blueprints/rate-limiting/base/config.libsonnet +++ b/blueprints/rate-limiting/base/config.libsonnet @@ -15,7 +15,6 @@ commonConfig { fill_amount: '__REQUIRED_FIELD__', selectors: commonConfig.selectors_defaults, parameters: { - limit_by_label_key: 'limit_key', interval: '__REQUIRED_FIELD__', }, request_parameters: {}, diff --git a/blueprints/rate-limiting/base/gen/definitions.json b/blueprints/rate-limiting/base/gen/definitions.json index 48de2877f8..244a1bd2ab 100644 --- a/blueprints/rate-limiting/base/gen/definitions.json +++ b/blueprints/rate-limiting/base/gen/definitions.json @@ -78,8 +78,7 @@ "parameters": { "description": "Parameters.", "default": { - "interval": "__REQUIRED_FIELD__", - "limit_by_label_key": "limit_key" + "interval": "__REQUIRED_FIELD__" }, "type": "object", "$ref": "../../../gen/jsonschema/_definitions.json#/definitions/RateLimiterParameters" diff --git a/blueprints/rate-limiting/base/gen/values.yaml b/blueprints/rate-limiting/base/gen/values.yaml index 691f746fdf..3eea9e97c4 100644 --- a/blueprints/rate-limiting/base/gen/values.yaml +++ b/blueprints/rate-limiting/base/gen/values.yaml @@ -34,7 +34,6 @@ policy: # Required: True parameters: interval: __REQUIRED_FIELD__ - limit_by_label_key: "limit_key" # Request Parameters. # Type: aperture.spec.v1.RateLimiterRequestParameters request_parameters: {} diff --git a/docs/content/aperture-for-infra/guides/api-quota-management/assets/inter-service-rate-limiting/policy.yaml b/docs/content/aperture-for-infra/guides/api-quota-management/assets/inter-service-rate-limiting/policy.yaml index ec8e6cdc92..1baf23bb0f 100644 --- a/docs/content/aperture-for-infra/guides/api-quota-management/assets/inter-service-rate-limiting/policy.yaml +++ b/docs/content/aperture-for-infra/guides/api-quota-management/assets/inter-service-rate-limiting/policy.yaml @@ -26,9 +26,6 @@ spec: num_sync: 4 limit_by_label_key: api_key scheduler: - priority_label_key: priority - tokens_label_key: tokens - workload_label_key: workload workloads: - label_matcher: match_labels: diff --git a/docs/content/guides/assets/managing-quotas/policy.yaml b/docs/content/guides/assets/managing-quotas/policy.yaml index 0797259d12..e16e241b7f 100644 --- a/docs/content/guides/assets/managing-quotas/policy.yaml +++ b/docs/content/guides/assets/managing-quotas/policy.yaml @@ -24,7 +24,6 @@ spec: limit_by_label_key: user_id scheduler: priority_label_key: priority - tokens_label_key: tokens workload_label_key: workload selectors: - control_point: quota-scheduling-feature diff --git a/docs/content/guides/assets/openai/policy.yaml b/docs/content/guides/assets/openai/policy.yaml index 63750eeaa4..a20200d372 100644 --- a/docs/content/guides/assets/openai/policy.yaml +++ b/docs/content/guides/assets/openai/policy.yaml @@ -24,8 +24,6 @@ spec: limit_by_label_key: api_key scheduler: priority_label_key: priority - tokens_label_key: tokens - workload_label_key: workload selectors: - control_point: openai label_matcher: diff --git a/docs/content/reference/aperture-cli/policies/assets/raw_values.yaml b/docs/content/reference/aperture-cli/policies/assets/raw_values.yaml index 691f746fdf..3eea9e97c4 100644 --- a/docs/content/reference/aperture-cli/policies/assets/raw_values.yaml +++ b/docs/content/reference/aperture-cli/policies/assets/raw_values.yaml @@ -34,7 +34,6 @@ policy: # Required: True parameters: interval: __REQUIRED_FIELD__ - limit_by_label_key: "limit_key" # Request Parameters. # Type: aperture.spec.v1.RateLimiterRequestParameters request_parameters: {} diff --git a/docs/content/reference/blueprints/concurrency-limiting/base.md b/docs/content/reference/blueprints/concurrency-limiting/base.md index 0c2bfeddc4..f1ee1f92c4 100644 --- a/docs/content/reference/blueprints/concurrency-limiting/base.md +++ b/docs/content/reference/blueprints/concurrency-limiting/base.md @@ -121,7 +121,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/concurrency description='Parameters.' type='Object (aperture.spec.v1.ConcurrencyLimiterParameters)' reference='../../configuration/spec#concurrency-limiter-parameters' - value='{"limit_by_label_key": "limit_by_label_key", "max_inflight_duration": "__REQUIRED_FIELD__"}' + value='{"max_inflight_duration": "__REQUIRED_FIELD__"}' /> diff --git a/docs/content/reference/blueprints/concurrency-scheduling/base.md b/docs/content/reference/blueprints/concurrency-scheduling/base.md index 4afd98aa04..d64011e9b9 100644 --- a/docs/content/reference/blueprints/concurrency-scheduling/base.md +++ b/docs/content/reference/blueprints/concurrency-scheduling/base.md @@ -107,7 +107,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/concurrency description='Concurrency Limiter Parameters.' type='Object (aperture.spec.v1.ConcurrencyLimiterParameters)' reference='../../configuration/spec#concurrency-limiter-parameters' - value='{"limit_by_label_key": "limit_by_label_key", "max_inflight_duration": "__REQUIRED_FIELD__"}' + value='{"max_inflight_duration": "__REQUIRED_FIELD__"}' /> @@ -135,7 +135,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/concurrency description='Scheduler configuration.' type='Object (aperture.spec.v1.Scheduler)' reference='../../configuration/spec#scheduler' - value='{"priority_label_key": "priority", "tokens_label_key": "tokens", "workload_label_key": "workload"}' + value='{}' /> diff --git a/docs/content/reference/blueprints/quota-scheduling/base.md b/docs/content/reference/blueprints/quota-scheduling/base.md index 687b340032..2d31e57c77 100644 --- a/docs/content/reference/blueprints/quota-scheduling/base.md +++ b/docs/content/reference/blueprints/quota-scheduling/base.md @@ -135,7 +135,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/quota-sched description='Rate Limiter Parameters.' type='Object (aperture.spec.v1.RateLimiterParameters)' reference='../../configuration/spec#rate-limiter-parameters' - value='{"interval": "__REQUIRED_FIELD__", "limit_by_label_key": "limit_key"}' + value='{"interval": "__REQUIRED_FIELD__"}' /> @@ -149,7 +149,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/quota-sched description='Scheduler configuration.' type='Object (aperture.spec.v1.Scheduler)' reference='../../configuration/spec#scheduler' - value='{"priority_label_key": "priority", "tokens_label_key": "tokens", "workload_label_key": "workload"}' + value='{}' /> diff --git a/docs/content/reference/blueprints/rate-limiting/base.md b/docs/content/reference/blueprints/rate-limiting/base.md index c5aca5e1d5..5af13ac4d6 100644 --- a/docs/content/reference/blueprints/rate-limiting/base.md +++ b/docs/content/reference/blueprints/rate-limiting/base.md @@ -135,7 +135,7 @@ href={`https://github.com/fluxninja/aperture/tree/${aver}/blueprints/rate-limiti description='Parameters.' type='Object (aperture.spec.v1.RateLimiterParameters)' reference='../../configuration/spec#rate-limiter-parameters' - value='{"interval": "__REQUIRED_FIELD__", "limit_by_label_key": "limit_key"}' + value='{"interval": "__REQUIRED_FIELD__"}' /> diff --git a/pkg/scheduler/wfq.go b/pkg/scheduler/wfq.go index 4779e1e218..0d198379b3 100644 --- a/pkg/scheduler/wfq.go +++ b/pkg/scheduler/wfq.go @@ -528,11 +528,32 @@ func (pMetrics *preemptionMetrics) onQueueEntry(request *Request, qRequest *queu // Update metrics for preemption and delay // WARNING: Unsafe and should be called with scheduler lock. func (pMetrics *preemptionMetrics) onQueueExit(request *Request, qRequest *queuedRequest, allowed bool) { + initMetrics := func(labels prometheus.Labels) error { + var err error + _, err = pMetrics.workloadPreemptedTokensSummary.GetMetricWith(labels) + if err != nil { + return fmt.Errorf("%w: failed to get workload_preempted_tokens summary", err) + } + _, err = pMetrics.workloadDelayedTokensSummary.GetMetricWith(labels) + if err != nil { + return fmt.Errorf("%w: failed to get workload_delayed_tokens summary", err) + } + _, err = pMetrics.workloadOnTimeCounter.GetMetricWith(labels) + if err != nil { + return fmt.Errorf("%w: failed to get workload_on_time_total counter", err) + } + return nil + } + publishSummary := func(summary *prometheus.SummaryVec, value float64) { if summary == nil { return } metricsLabels := appendWorkloadLabel(pMetrics.metricsLabels, request.FairnessLabel) + err := initMetrics(metricsLabels) + if err != nil { + log.Error().Err(err).Msg("Failed to initialize metrics") + } observer, err := summary.GetMetricWith(metricsLabels) if err != nil { log.Error().Err(err).Msg("Failed to get workload preempted tokens summary") @@ -546,6 +567,10 @@ func (pMetrics *preemptionMetrics) onQueueExit(request *Request, qRequest *queue return } metricsLabels := appendWorkloadLabel(pMetrics.metricsLabels, request.FairnessLabel) + err := initMetrics(metricsLabels) + if err != nil { + log.Error().Err(err).Msg("Failed to initialize metrics") + } counter, err := counterVec.GetMetricWith(metricsLabels) if err != nil { log.Error().Err(err).Msg("Failed to get workload on time counter") diff --git a/playground/Tiltfile b/playground/Tiltfile index e128dcf94e..950819d09c 100644 --- a/playground/Tiltfile +++ b/playground/Tiltfile @@ -379,6 +379,9 @@ DEP_TREE = { { "workload": "aperture-go-example", "resource_deps": ["agent", "controller", "cluster-bootstrap"], + "extra_objects": [ + ("^aperture-go-example", "serviceaccount", APERTURE_GO_EXAMPLE_NS) + ], } ] }, @@ -1133,6 +1136,10 @@ def declare_resources(resources, dep_tree, inv_dep_tree, race_arg, cloud_extensi namespace = app_config.get("namespace", None) if tkenv: base_dir = app_config.get("base_dir") + if not namespace and "playground/tanka/apps/aperture-go-example" in tkenv and APERTURE_GO_EXAMPLE_NS + ":namespace" not in managed_namespaces: + namespace_create(APERTURE_GO_EXAMPLE_NS, labels=ns_labels) + namespace = APERTURE_GO_EXAMPLE_NS + managed_namespaces += [namespace + ":namespace"] render_tanka(tkenv, env_vars, base_dir, namespace, cloud_extension, values) elif renderer == "yaml": render_yaml(manifests_path, env_vars) diff --git a/playground/scenarios/concurreny-limiter/load-generator/test.js b/playground/scenarios/concurrency-limiter/load-generator/test.js similarity index 100% rename from playground/scenarios/concurreny-limiter/load-generator/test.js rename to playground/scenarios/concurrency-limiter/load-generator/test.js diff --git a/playground/scenarios/concurreny-limiter/metadata.json b/playground/scenarios/concurrency-limiter/metadata.json similarity index 100% rename from playground/scenarios/concurreny-limiter/metadata.json rename to playground/scenarios/concurrency-limiter/metadata.json diff --git a/playground/scenarios/concurreny-limiter/policies/concurrency-limiting-cr.yaml b/playground/scenarios/concurrency-limiter/policies/concurrency-limiting-cr.yaml similarity index 100% rename from playground/scenarios/concurreny-limiter/policies/concurrency-limiting-cr.yaml rename to playground/scenarios/concurrency-limiter/policies/concurrency-limiting-cr.yaml diff --git a/playground/scenarios/concurreny-limiter/policies/concurrency-limiting.yaml b/playground/scenarios/concurrency-limiter/policies/concurrency-limiting.yaml similarity index 100% rename from playground/scenarios/concurreny-limiter/policies/concurrency-limiting.yaml rename to playground/scenarios/concurrency-limiter/policies/concurrency-limiting.yaml diff --git a/playground/scenarios/concurrency-scheduler/policies/concurrency-scheduler-cr.yaml b/playground/scenarios/concurrency-scheduler/policies/concurrency-scheduler-cr.yaml index a73a9f0f69..b9b6bb55ab 100644 --- a/playground/scenarios/concurrency-scheduler/policies/concurrency-scheduler-cr.yaml +++ b/playground/scenarios/concurrency-scheduler/policies/concurrency-scheduler-cr.yaml @@ -20,9 +20,6 @@ spec: accept_percentage: signal_name: ACCEPT_PERCENTAGE scheduler: - priority_label_key: priority - tokens_label_key: tokens - workload_label_key: workload workloads: - label_matcher: match_labels: diff --git a/playground/scenarios/postgres-concurrency/load-generator/test.js b/playground/scenarios/postgres-concurrency/load-generator/test.js new file mode 100644 index 0000000000..e69f291597 --- /dev/null +++ b/playground/scenarios/postgres-concurrency/load-generator/test.js @@ -0,0 +1,50 @@ +import { randomIntBetween } from "https://jslib.k6.io/k6-utils/1.4.0/index.js"; +import { check, sleep } from "k6"; +import { vu } from "k6/execution"; +import http from "k6/http"; + +export let vuStages = [ + { duration: "10s", target: 10 }, + { duration: "2m", target: 50 }, + { duration: "10s", target: 10 }, +]; + +export let options = { + discardResponseBodies: true, + scenarios: { + guests: { + executor: "ramping-vus", + stages: vuStages, + env: { USER_TYPE: "guest" }, + }, + subscribers: { + executor: "ramping-vus", + stages: vuStages, + env: { USER_TYPE: "subscriber" }, + }, + }, +}; + +export default function () { + let userType = __ENV.USER_TYPE; + let userId = vu.idInTest; + const url = "http://aperture-go-example.aperture-go-example.svc.cluster.local:80/postgres"; + const headers = { + "Content-Type": "application/json", + Cookie: + "session=eyJ1c2VyIjoia2Vub2JpIn0.YbsY4Q.kTaKRTyOIfVlIbNB48d9YH6Q0wo", + "User-Type": userType, + "User-Id": userId, + }; + const body = {} + let res = http.request("POST", url, JSON.stringify(body), { + headers: headers, + }); + const ret = check(res, { + "http status was 200": res.status === 200, + }); + if (!ret) { + // sleep for 10ms to 25ms + sleep(randomIntBetween(0.01, 0.025)); + } +} diff --git a/playground/scenarios/postgres-concurrency/metadata.json b/playground/scenarios/postgres-concurrency/metadata.json new file mode 100644 index 0000000000..7d88f822a8 --- /dev/null +++ b/playground/scenarios/postgres-concurrency/metadata.json @@ -0,0 +1,31 @@ +{ + "renderer": "tanka", + "tkenv": "playground/tanka/apps/aperture-go-example", + "needs": ["postgresql"], + "aperture_policies": [ + { + "policy_name": "concurrency-scheduling", + "values_file": "policies/concurrency-scheduling.yaml" + } + ], + "images": [ + { + "ref": "aperture-go-example", + "context": "sdks/aperture-go", + "dockerfile": "Dockerfile.manual", + "ssh": "default" + } + ], + "child_resources": [ + { + "workload": "aperture-go-example", + "resource_deps": [ + "agent", + "controller", + "cluster-bootstrap", + "postgresql" + ], + "extra_objects": ["aperture-go-example:serviceaccount"] + } + ] +} diff --git a/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling-cr.yaml b/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling-cr.yaml new file mode 100644 index 0000000000..fc67c6ab45 --- /dev/null +++ b/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling-cr.yaml @@ -0,0 +1,49 @@ +apiVersion: fluxninja.com/v1alpha1 +kind: Policy +metadata: + labels: + fluxninja.com/validate: "true" + name: concurrency-scheduling +spec: + circuit: + components: + - flow_control: + concurrency_scheduler: + concurrency_limiter: + max_idle_time: 7200s + max_inflight_duration: 60s + in_ports: + max_concurrency: + constant_signal: + value: 2 + out_ports: + accept_percentage: + signal_name: ACCEPT_PERCENTAGE + scheduler: + denied_response_status_code: BadRequest + priority_label_key: priority + workload_label_key: userType + selectors: + - agent_group: default + control_point: postgres + - decider: + in_ports: + lhs: + signal_name: ACCEPT_PERCENTAGE + rhs: + constant_signal: + value: 90 + operator: gte + out_ports: + output: + signal_name: ACCEPT_PERCENTAGE_ALERT + - alerter: + in_ports: + signal: + signal_name: ACCEPT_PERCENTAGE_ALERT + parameters: + alert_name: Too many inflight requests + evaluation_interval: 1s + resources: + flow_control: + classifiers: [] diff --git a/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling.yaml b/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling.yaml new file mode 100644 index 0000000000..b296a7bd07 --- /dev/null +++ b/playground/scenarios/postgres-concurrency/policies/concurrency-scheduling.yaml @@ -0,0 +1,27 @@ +# yaml-language-server: $schema=../../../../blueprints/concurrency-scheduling/base/gen/definitions.json +# Generated values file for concurrency-scheduling/base blueprint +# Documentation/Reference for objects and parameters can be found at: +# https://docs.fluxninja.com/reference/blueprints/concurrency-scheduling/base + +blueprint: concurrency-scheduling/base +uri: ../../../../blueprints +policy: + components: [] + policy_name: concurrency-scheduling + resources: + flow_control: + classifiers: [] + concurrency_scheduler: + alerter: + alert_name: "Too many inflight requests" + concurrency_limiter: + max_inflight_duration: 60s + max_idle_time: "7200s" + max_concurrency: 2 + scheduler: + workload_label_key: "userType" + priority_label_key: "priority" + denied_response_status_code: BadRequest + selectors: + - agent_group: default + control_point: postgres diff --git a/playground/scenarios/postgres-concurrency/values.yaml b/playground/scenarios/postgres-concurrency/values.yaml new file mode 100644 index 0000000000..73e693097d --- /dev/null +++ b/playground/scenarios/postgres-concurrency/values.yaml @@ -0,0 +1,4 @@ +sdk: + extraEnv: + APERTURE_ENABLE_POSTGRES: 'true' + POSTGRES_CONNECTION_STRING: 'postgres://postgres:secretpassword@postgresql.postgresql.svc.cluster.local:5432/postgres?sslmode=disable' diff --git a/playground/scenarios/quota-scheduler/policies/quota-scheduler-cr.yaml b/playground/scenarios/quota-scheduler/policies/quota-scheduler-cr.yaml index d49442a87d..bab0394a59 100644 --- a/playground/scenarios/quota-scheduler/policies/quota-scheduler-cr.yaml +++ b/playground/scenarios/quota-scheduler/policies/quota-scheduler-cr.yaml @@ -26,9 +26,6 @@ spec: num_sync: 4 limit_by_label_key: http.request.header.api_key scheduler: - priority_label_key: priority - tokens_label_key: tokens - workload_label_key: workload workloads: - label_matcher: match_labels: diff --git a/playground/tanka/apps/aperture-go-example/mixins.libsonnet b/playground/tanka/apps/aperture-go-example/mixins.libsonnet index 8ce2a7d025..7b96d3ce95 100644 --- a/playground/tanka/apps/aperture-go-example/mixins.libsonnet +++ b/playground/tanka/apps/aperture-go-example/mixins.libsonnet @@ -1,8 +1,12 @@ local sdk = import 'apps/aperture-sdk-example/main.libsonnet'; +local valuesStr = std.extVar('VALUES'); +local values = if valuesStr != '' then std.parseYaml(valuesStr) else {}; +local sdkValues = if std.objectHas(values, 'sdk') then values.sdk else {}; + sdk { - values+:: { + values+:: sdkValues { image+: { repository: 'docker.io/fluxninja/aperture-go-example', }, diff --git a/playground/tanka/lib/apps/aperture-sdk-example/aperture-sdk-example.libsonnet b/playground/tanka/lib/apps/aperture-sdk-example/aperture-sdk-example.libsonnet index 4e749eb777..6d5151b300 100644 --- a/playground/tanka/lib/apps/aperture-sdk-example/aperture-sdk-example.libsonnet +++ b/playground/tanka/lib/apps/aperture-sdk-example/aperture-sdk-example.libsonnet @@ -2,9 +2,11 @@ local k = import 'k.libsonnet'; local container = k.core.v1.container; local containerPort = k.core.v1.containerPort; +local namespace = k.core.v1.namespace; local deployment = k.apps.v1.deployment; local service = k.core.v1.service; local servicePort = k.core.v1.servicePort; +local serviceAccount = k.core.v1.serviceAccount; local defaults = { environment:: { @@ -22,12 +24,14 @@ local defaults = { port: 80, }, labels: { - 'app.kubernetes.io/name': $.environment.name, + 'app.kubernetes.io/name': 'example', + 'app.kubernetes.io/instance': $.environment.name, }, app_port: 8080, agent: { address: 'aperture-agent.aperture-agent.svc.cluster.local:8080', }, + extraEnv: {}, }, }; @@ -40,22 +44,27 @@ function(values={}, environment={}) { container.new(_environment.name, image='%(repository)s:%(tag)s' % _values.image) + container.withImagePullPolicy(_values.image.pullPolicy) + container.withPorts([ - containerPort.newNamed(_values.app_port, 'http'), + containerPort.newNamed(_values.app_port, 'srvhttp'), ]) + container.withEnvMap({ APERTURE_APP_PORT: std.toString(_values.app_port), APERTURE_AGENT_ADDRESS: _values.agent.address, APERTURE_AGENT_INSECURE: 'true', - }), + } + _values.extraEnv), ]) + + deployment.metadata.withLabels(_values.labels) + deployment.metadata.withNamespace(_environment.namespace) + deployment.spec.selector.withMatchLabels(_values.labels) - + deployment.spec.template.metadata.withLabels(_values.labels), + + deployment.spec.template.metadata.withLabels(_values.labels) + + deployment.spec.template.spec.withServiceAccountName(_environment.name), service: service.new($.deployment.metadata.name, selector=_values.labels, ports=[ - local portName = 'http'; - servicePort.newNamed(name=portName, port=_values.service.port, targetPort=portName), + servicePort.newNamed(name='http', port=_values.service.port, targetPort='srvhttp'), ]) - + service.metadata.withNamespace(_environment.namespace) - + service.metadata.withLabels(_values.labels), + + service.spec.withSelector(_values.labels) + + service.metadata.withNamespace(_environment.namespace), + serviceAccount: + serviceAccount.new($.deployment.metadata.name) + + serviceAccount.metadata.withNamespace(_environment.namespace) + + serviceAccount.metadata.withLabels(_values.labels), } diff --git a/sdks/aperture-go/Dockerfile.manual b/sdks/aperture-go/Dockerfile.manual new file mode 100644 index 0000000000..4e000a2326 --- /dev/null +++ b/sdks/aperture-go/Dockerfile.manual @@ -0,0 +1,25 @@ +# syntax=docker/dockerfile:1 +FROM golang:1.21.5 AS builder + +WORKDIR /src +COPY --link . . +RUN cd examples/manual && go mod download && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o "example" . + +# Final image +FROM debian:bullseye-slim + +RUN apt-get update \ + && apt-get install -y --no-install-recommends \ + ca-certificates \ + wget \ + curl \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=builder /src/examples/manual/example /local/bin/example +COPY --from=builder /src/examples/manual/init.sql /init.sql + +HEALTHCHECK --interval=5s --timeout=60s --retries=3 --start-period=5s \ + CMD wget --no-verbose --tries=1 --spider 127.0.0.1:8080/health || exit 1 + +CMD ["/local/bin/example"] diff --git a/sdks/aperture-go/examples/manual/go.mod b/sdks/aperture-go/examples/manual/go.mod new file mode 100644 index 0000000000..b6b315f683 --- /dev/null +++ b/sdks/aperture-go/examples/manual/go.mod @@ -0,0 +1,36 @@ +module github.com/fluxninja/aperture-go/v2/manual + +go 1.21.5 + +require ( + github.com/fluxninja/aperture-go/v2 v2.0.0 + github.com/gorilla/mux v1.8.1 + github.com/lib/pq v1.10.9 + google.golang.org/grpc v1.59.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.2.1 // indirect + github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect + github.com/fluxninja/aperture/api/v2 v2.0.0-20240105045107-1a352382e8d8 // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/sdk v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect + go.opentelemetry.io/proto/otlp v1.0.0 // indirect + golang.org/x/net v0.19.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect + google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) + +replace github.com/fluxninja/aperture-go/v2 => ../../ diff --git a/sdks/aperture-go/examples/manual/go.sum b/sdks/aperture-go/examples/manual/go.sum new file mode 100644 index 0000000000..19d9493790 --- /dev/null +++ b/sdks/aperture-go/examples/manual/go.sum @@ -0,0 +1,66 @@ +github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= +github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA= +github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE= +github.com/fluxninja/aperture/api/v2 v2.0.0-20240105045107-1a352382e8d8 h1:wPAL1p5DeZFBl63UZPkYfWezdhBR2FK3OIEPmoXx1BY= +github.com/fluxninja/aperture/api/v2 v2.0.0-20240105045107-1a352382e8d8/go.mod h1:KSjIteqXmGJl1WOxQeBF9/K6/0sMHfKsRl5VOQkxyNg= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1 h1:6UKoz5ujsI55KNpsJH3UwCq3T8kKbZwNZBNPuTTje8U= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.1/go.mod h1:YvJ2f6MplWDhfxiUC3KpyTy76kYUZA4W3pTv/wdKQ9Y= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0 h1:cl5P5/GIfFh4t6xyruOgJP5QiA1pw4fYYdv6nc6CBWw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.21.0/go.mod h1:zgBdWWAu7oEEMC06MMKc5NLbA/1YDXV1sMpSqEeLQLg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqheXEFWAZ7O8A7m+J0aPTmpJN3YQ7qetUAdkkkKpk= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f h1:Vn+VyHU5guc9KjB5KrjI2q0wCOWEOIh0OEsleqakHJg= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f/go.mod h1:nWSwAFPb+qfNJXsoeO3Io7zf4tMSfN8EA8RlDA04GhY= +google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4 h1:ZcOkrmX74HbKFYnpPY8Qsw93fC29TbJXspYKaBkSXDQ= +google.golang.org/genproto/googleapis/api v0.0.0-20231127180814-3a041ad873d4/go.mod h1:k2dtGpRrbsSyKcNPKKI5sstZkrNCZwpU/ns96JoHbGg= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sdks/aperture-go/examples/manual/init.sql b/sdks/aperture-go/examples/manual/init.sql new file mode 100644 index 0000000000..a38fcd304c --- /dev/null +++ b/sdks/aperture-go/examples/manual/init.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS users ( + id TEXT, + type TEXT, + PRIMARY KEY (id) +); diff --git a/sdks/aperture-go/examples/manual_flow/main.go b/sdks/aperture-go/examples/manual/main.go similarity index 60% rename from sdks/aperture-go/examples/manual_flow/main.go rename to sdks/aperture-go/examples/manual/main.go index 7a67a7eaa4..6d952c4c60 100644 --- a/sdks/aperture-go/examples/manual_flow/main.go +++ b/sdks/aperture-go/examples/manual/main.go @@ -4,16 +4,21 @@ import ( "context" "crypto/tls" "crypto/x509" + "database/sql" + "encoding/json" + "fmt" "log" "net" "net/http" "os" "os/signal" "strconv" + "strings" "syscall" "time" "github.com/gorilla/mux" + _ "github.com/lib/pq" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/connectivity" @@ -24,13 +29,14 @@ import ( ) const ( - defaultAppPort = "8080" + defaultAppPort = "8099" ) // app struct contains the server and the Aperture client. type app struct { server *http.Server apertureClient aperture.Client + db *sql.DB } // START: grpcOptions @@ -60,9 +66,34 @@ func grpcOptions(insecureMode, skipVerify bool) []grpc.DialOption { // END: grpcOptions +func runInitScript(db *sql.DB, scriptPath string) error { + // Read the init script + script, err := os.ReadFile(scriptPath) + if err != nil { + return fmt.Errorf("failed to read init script: %w", err) + } + + // Split the script into separate queries + queries := strings.Split(string(script), ";") + + // Execute each query + for _, query := range queries { + query = strings.TrimSpace(query) // Remove leading/trailing whitespace + if query != "" { + _, err := db.Exec(query) + if err != nil { + return fmt.Errorf("failed to execute query: %w", err) + } + } + } + + return nil +} + func main() { ctx := context.Background() + apertureAgentAddress := getEnvOrDefault("APERTURE_AGENT_ADDRESS", "localhost:8089") apertureAgentInsecure := getEnvOrDefault("APERTURE_AGENT_INSECURE", "false") apertureAgentInsecureBool, _ := strconv.ParseBool(apertureAgentInsecure) apertureAgentSkipVerify := getEnvOrDefault("APERTURE_AGENT_SKIP_VERIFY", "false") @@ -71,9 +102,8 @@ func main() { // START: clientConstructor opts := aperture.Options{ - Address: "ORGANIZATION.app.fluxninja.com:443", + Address: apertureAgentAddress, DialOptions: grpcOptions(apertureAgentInsecureBool, apertureAgentSkipVerifyBool), - APIKey: getEnvOrDefault("APERTURE_API_KEY", ""), } // initialize Aperture Client with the provided options. @@ -84,21 +114,43 @@ func main() { // END: clientConstructor + pgsqlDB := &sql.DB{} + enablePostgres := getEnvOrDefault("APERTURE_ENABLE_POSTGRES", "false") + if enablePostgres == "true" { + pgsqlURL := getEnvOrDefault("POSTGRES_CONNECTION_STRING", "") + if pgsqlURL == "" { + log.Fatalf("failed to get postgres connection string") + } + + pgsqlDB, err = sql.Open("postgres", pgsqlURL) + if err != nil { + log.Fatalf("failed to open postgres connection: %v", err) + } + err = pgsqlDB.Ping() + if err != nil { + log.Fatalf("failed to ping postgres: %v", err) + } + + err = runInitScript(pgsqlDB, "/init.sql") + if err != nil { + log.Fatalf("failed to run init script: %v", err) + } + } + appPort := getEnvOrDefault("APERTURE_APP_PORT", defaultAppPort) // Create a server with passing it the Aperture client. mux := mux.NewRouter() a := &app{ server: &http.Server{ - Addr: net.JoinHostPort("localhost", appPort), + Addr: net.JoinHostPort("0.0.0.0", appPort), Handler: mux, }, apertureClient: apertureClient, + db: pgsqlDB, } - // Adding the http middleware to be executed before the actual business logic execution. - superRouter := mux.PathPrefix("/super").Subrouter() - superRouter.HandleFunc("", a.SuperHandler) - + mux.HandleFunc("/super", a.SuperHandler) + mux.HandleFunc("/postgres", a.PostgresHandler) mux.HandleFunc("/connected", a.ConnectedHandler) mux.HandleFunc("/health", a.HealthHandler) @@ -120,11 +172,13 @@ func main() { if err := a.server.Shutdown(ctx); err != nil { log.Fatalf("Failed to shutdown server: %+v", err) } + if err := pgsqlDB.Close(); err != nil { + log.Fatalf("Failed to close postgres connection: %+v", err) + } } // SuperHandler handles HTTP requests on /super endpoint. func (a *app) SuperHandler(w http.ResponseWriter, r *http.Request) { - // START: manualFlowNoCaching // START: defineLabels @@ -157,17 +211,14 @@ func (a *app) SuperHandler(w http.ResponseWriter, r *http.Request) { log.Println("Flow Accepted Processing work") w.WriteHeader(http.StatusAccepted) - w.Write([]byte("Super!")) - } else { - // handle flow rejection by Aperture Agent log.Println("Flow Rejected") flow.SetStatus(aperture.Error) w.WriteHeader(http.StatusForbidden) } - endResponse:= flow.End() + endResponse := flow.End() if endResponse.Error != nil { log.Printf("Failed to end flow: %+v", endResponse.Error) } @@ -177,6 +228,74 @@ func (a *app) SuperHandler(w http.ResponseWriter, r *http.Request) { // END: manualFlowNoCaching } +// PostgresHandler handles HTTP requests on /postgres endpoint. +func (a *app) PostgresHandler(w http.ResponseWriter, r *http.Request) { + if a.db == nil { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + + priority := "1" + + userID := r.Header.Get("User-Id") + if userID == "" { + userID = "kenobi" + } + + userType := r.Header.Get("User-Type") + if userType == "" { + userType = "jedi" + } else if userType == "guest" { + priority = "50" + } else if userType == "subscriber" { + priority = "200" + } + + labels := map[string]string{ + "userId": userID, + "userType": userType, + "priority": priority, + } + + flowParams := aperture.FlowParams{ + Labels: labels, + RampMode: false, + } + + ctxTimeout, cancel := context.WithTimeout(r.Context(), 10*time.Second) + defer cancel() + + log.Printf("Starting flow with params: %+v", flowParams) + flow := a.apertureClient.StartFlow(ctxTimeout, "postgres", flowParams) + if flow.ShouldRun() { + time.Sleep(2 * time.Second) + + log.Println("Flow Accepted Processing work") + + _, err := a.db.Exec("INSERT into users (id, type) VALUES ($1, $2)", userID, userType) + if err != nil { + log.Printf("Failed to insert into postgres: %+v", err) + flow.SetStatus(aperture.Error) + w.WriteHeader(http.StatusInternalServerError) + } + + w.WriteHeader(http.StatusAccepted) + } else { + log.Println("Flow Rejected") + flow.SetStatus(aperture.Error) + w.WriteHeader(http.StatusForbidden) + } + + checkResponse := flow.CheckResponse() + checkResponseBytes, _ := json.Marshal(checkResponse) + log.Printf("Flow check response: %+v", string(checkResponseBytes)) + + endResponse := flow.End() + if endResponse.Error != nil { + log.Printf("Failed to end flow: %+v", endResponse.Error) + } +} + // ConnectedHandler handles HTTP requests on /connected endpoint. func (a *app) ConnectedHandler(w http.ResponseWriter, r *http.Request) { a.apertureClient.GetGRPClientConn().Connect() diff --git a/sdks/aperture-go/sdk/flow.go b/sdks/aperture-go/sdk/flow.go index 538f6a4ca8..a50922b0cf 100644 --- a/sdks/aperture-go/sdk/flow.go +++ b/sdks/aperture-go/sdk/flow.go @@ -310,7 +310,7 @@ func (f *flow) End() EndResponse { ) f.span.End() - inflightRequests := make([]*checkv1.InflightRequestRef, len(f.checkResponse.GetLimiterDecisions())) + inflightRequests := []*checkv1.InflightRequestRef{} for _, decision := range f.checkResponse.GetLimiterDecisions() { if decision.GetConcurrencyLimiterInfo() != nil {