From b4fab71957803582814367476d08798acf15c5bb Mon Sep 17 00:00:00 2001 From: Anand Swaminathan Date: Tue, 15 Oct 2019 12:34:32 -0700 Subject: [PATCH] Implementation of Weighted Clusters and domain based routing (#16) --- Gopkg.lock | 191 +++++++++--------- Gopkg.toml | 2 +- cmd/entrypoints/clusterresource.go | 7 +- pkg/clusterresource/controller.go | 8 +- pkg/executioncluster/execution_target.go | 43 +--- pkg/executioncluster/factory.go | 40 ---- .../impl/cluster_execution_target_provider.go | 46 +++++ pkg/executioncluster/impl/factory.go | 24 +++ pkg/executioncluster/{ => impl}/in_cluster.go | 16 +- .../{ => impl}/in_cluster_test.go | 12 +- .../impl/random_cluster_selector.go | 173 ++++++++++++++++ .../impl/random_cluster_selector_test.go | 121 +++++++++++ pkg/executioncluster/interfaces/cluster.go | 11 + .../interfaces/execution_target_provider.go | 11 + .../mocks/execution_target_provider.go | 17 ++ .../random_cluster_selector.go | 71 ------- .../random_cluster_selector_test.go | 59 ------ .../testdata/clusters_config.yaml | 30 +++ pkg/rpc/adminservice/base.go | 6 +- .../interfaces/cluster_configuration.go | 10 +- pkg/workflowengine/impl/propeller_executor.go | 10 +- .../impl/propeller_executor_test.go | 28 ++- 22 files changed, 594 insertions(+), 342 deletions(-) delete mode 100644 pkg/executioncluster/factory.go create mode 100644 pkg/executioncluster/impl/cluster_execution_target_provider.go create mode 100644 pkg/executioncluster/impl/factory.go rename pkg/executioncluster/{ => impl}/in_cluster.go (63%) rename pkg/executioncluster/{ => impl}/in_cluster_test.go (68%) create mode 100644 pkg/executioncluster/impl/random_cluster_selector.go create mode 100644 pkg/executioncluster/impl/random_cluster_selector_test.go create mode 100644 pkg/executioncluster/interfaces/cluster.go create mode 100644 pkg/executioncluster/interfaces/execution_target_provider.go create mode 100644 pkg/executioncluster/mocks/execution_target_provider.go delete mode 100644 pkg/executioncluster/random_cluster_selector.go delete mode 100644 pkg/executioncluster/random_cluster_selector_test.go create mode 100644 pkg/executioncluster/testdata/clusters_config.yaml diff --git a/Gopkg.lock b/Gopkg.lock index 3e2117eeff..b5aae042ac 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,12 @@ [[projects]] - digest = "1:80004fcc5cf64e591486b3e11b406f1e0d17bf85d475d64203c8494f5da4fcd1" + digest = "1:26ee1e365ea8f312ee11e170fc6675bac0dd3d4adf2406e753d0a43527e1afb8" name = "cloud.google.com/go" packages = ["compute/metadata"] pruneopts = "UT" - revision = "ceeb313ad77b789a7fa5287b36a1d127b69b7093" - version = "v0.44.3" + revision = "cfe8f6d1fe6976d03af790d7a8b9bf6aa73287bd" + version = "v0.47.0" [[projects]] digest = "1:94d4ae958b3d2ab476bef4bed53c1dcc3cb0fb2639bd45dd08b40e57139192e5" @@ -18,7 +18,7 @@ version = "v10.2.1-beta" [[projects]] - digest = "1:0aa68ac7d88c06b85442e07b9e4d56cb5e332df2360fa2a5441b2edc5f1ae32b" + digest = "1:0f857a863c24bb1c277a5f3f8cb8a30e65b841405f69590132ee23ed9e7e6fbe" name = "github.com/Azure/go-autorest" packages = [ "autorest", @@ -29,8 +29,8 @@ "tracing", ] pruneopts = "UT" - revision = "5e7a399d8bbf4953ab0c8e3167d7fd535fd74ce1" - version = "v13.0.0" + revision = "740293c019d8314ce3378d456b4327fa646297e6" + version = "v13.2.0" [[projects]] digest = "1:4d8aa8bc01f60d0fd7f764e1838f26dbc5a5dec428217f936726007cdf3929f0" @@ -54,7 +54,7 @@ version = "v1.0.7" [[projects]] - digest = "1:313b743d54588010f7c6f5e00bbfe00ad0a2d63a075cb7d71ea85eaf8f91efa7" + digest = "1:e490a9e2f55d80388dce0be4cc64a17c720aecf8ca0a067a92ba83949457ba14" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -107,8 +107,8 @@ "service/sts/stsiface", ] pruneopts = "UT" - revision = "d57c8d96f72d9475194ccf18d2ba70ac294b0cb3" - version = "v1.23.13" + revision = "e42fa244c725b2a423a8e9cf1c86828c0bf9400a" + version = "v1.25.12" [[projects]] branch = "master" @@ -136,19 +136,19 @@ [[projects]] branch = "master" - digest = "1:f98385a9b77f6cacae716a59c04e6ac374d101466d4369c4e8cc706a39c4bb2e" + digest = "1:615811fa58b830195c12ad4509eb40d0daf34872879cfb8bd3d65e08df2d0c3f" name = "github.com/bradfitz/gomemcache" packages = ["memcache"] pruneopts = "UT" - revision = "551aad21a6682b95329c1f5bd62ee5060d64f7e8" + revision = "a41fca850d0b6f392931a78cbae438803ea0b886" [[projects]] - digest = "1:998cf998358a303ac2430c386ba3fd3398477d6013153d3c6e11432765cc9ae6" + digest = "1:47056e6fe3a3524238d48ddcb6d38bfcfa6db346a11d73d83f1620a8cbf69793" name = "github.com/cespare/xxhash" packages = ["."] pruneopts = "UT" - revision = "3b82fb7d186719faeedd0c2864f868c74fbf79a1" - version = "v2.0.0" + revision = "de209a9ffae3256185a6bb135d1a0ada7b2b5f09" + version = "v2.1.0" [[projects]] digest = "1:00eb5d8bd96289512920ac43367d5bee76bbca2062da34862a98b26b92741896" @@ -192,22 +192,22 @@ [[projects]] branch = "master" - digest = "1:78a5b63751bd99054bee07a498f6aa54da0a909922f9365d1aa3339091efa70a" + digest = "1:925a2ad8acf10a486cdae4366eaf45847b16d6d7448e654814d8f1d51adeefe4" name = "github.com/fsnotify/fsnotify" packages = ["."] pruneopts = "UT" - revision = "1485a34d5d5723fea214f5710708e19a831720e4" + revision = "4bf2d1fec78374803a39307bfb8d340688f4f28e" [[projects]] - digest = "1:4d02824a56d268f74a6b6fdd944b20b58a77c3d70e81008b3ee0c4f1a6777340" + digest = "1:582e25eccee928dc12416ea4c23b6dae8f3b5687730632aa1473ebebe80a2359" name = "github.com/gogo/protobuf" packages = [ "proto", "sortkeys", ] pruneopts = "UT" - revision = "ba06b47c162d49f2af050fb4c75bcbc86a159d5c" - version = "v1.2.1" + revision = "5628607bb4c51c3157aacc3a50f0ab707582b805" + version = "v1.3.1" [[projects]] branch = "master" @@ -218,15 +218,21 @@ revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998" [[projects]] - digest = "1:b532ee3f683c057e797694b5bfeb3827d89e6adf41c53dbc80e549bca76364ea" + branch = "master" + digest = "1:b7cb6054d3dff43b38ad2e92492f220f57ae6087ee797dca298139776749ace8" + name = "github.com/golang/groupcache" + packages = ["lru"] + pruneopts = "UT" + revision = "404acd9df4cc9859d64fb9eed42e5c026187287a" + +[[projects]] + digest = "1:e3839df32927e8d3403cd5aa7253d966e8ff80fc8f10e2e35d146461cd83fcfa" name = "github.com/golang/protobuf" packages = [ + "descriptor", "jsonpb", "proto", "protoc-gen-go/descriptor", - "protoc-gen-go/generator", - "protoc-gen-go/generator/internal/remap", - "protoc-gen-go/plugin", "ptypes", "ptypes/any", "ptypes/duration", @@ -282,7 +288,7 @@ revision = "903027f87de7054953efcdb8ba70d5dc02df38c7" [[projects]] - digest = "1:73513cdd52d6f0768201cebbf82612aa39a9d8022bc6337815cd504e532281b7" + digest = "1:64275948f03cdff02b7eca878ecc0f9539a6df62d12356d3d3d90c7e7b8f9023" name = "github.com/grpc-ecosystem/go-grpc-middleware" packages = [ ".", @@ -291,8 +297,8 @@ "util/metautils", ] pruneopts = "UT" - revision = "c250d6563d4d4c20252cd865923440e829844f4e" - version = "v1.0.0" + revision = "dd15ed025b6054e5253963e355991f3070d4e593" + version = "v1.1.0" [[projects]] digest = "1:9b7a07ac7577787a8ecc1334cb9f34df1c76ed82a917d556c5713d3ab84fbc43" @@ -303,7 +309,7 @@ version = "v1.2.0" [[projects]] - digest = "1:9da9ffdf93e29e054fb3b066e3c258e8ed090f6bec4bba1e86aeb9b1ba0056a9" + digest = "1:eb9de289c8c6ff3bc3c07a0cb36506b20dc7295cc2a5d72d98d4b96ea0e1be18" name = "github.com/grpc-ecosystem/grpc-gateway" packages = [ "internal", @@ -312,16 +318,8 @@ "utilities", ] pruneopts = "UT" - revision = "a9bbe40ed238db18f710b0e3d2970348c8fcec41" - version = "v1.10.0" - -[[projects]] - digest = "1:7fae9ec96d10b2afce0da23c378c8b3389319b7f92fa092f2621bba3078cfb4b" - name = "github.com/hashicorp/golang-lru" - packages = ["simplelru"] - pruneopts = "UT" - revision = "7f827b33c0f158ec5dfbba01bb0b14a4541fd81d" - version = "v0.5.3" + revision = "ece8fdf051b731392b407fdb9a9b1b9ffb6f9793" + version = "v1.11.3" [[projects]] digest = "1:c0d19ab64b32ce9fe5cf4ddceba78d5bc9807f0016db6b1183599da3dcc24d10" @@ -343,12 +341,12 @@ version = "v1.0.0" [[projects]] - digest = "1:a0cefd27d12712af4b5018dc7046f245e1e3b5760e2e848c30b171b570708f9b" + digest = "1:78d28d5b84a26159c67ea51996a230da4bc07cac648adaae1dfb5fc0ec8e40d3" name = "github.com/imdario/mergo" packages = ["."] pruneopts = "UT" - revision = "7c29201646fa3de8506f701213473dd407f19646" - version = "v0.3.7" + revision = "1afb36080aec31e0d1528973ebe6721b191b0369" + version = "v0.3.8" [[projects]] digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" @@ -359,15 +357,15 @@ version = "v1.0" [[projects]] - digest = "1:da6718abe4d47b1132d98bf3f9b18e302d537bf6daf02bd40804d9295a3f32bd" + digest = "1:a0216296da5c3191358b114fa8d2490477f5dff7380541551495b4f2a16989b3" name = "github.com/jinzhu/gorm" packages = [ ".", "dialects/postgres", ] pruneopts = "UT" - revision = "836fb2c19d84dac7b0272958dfb9af7cf0d0ade4" - version = "v1.9.10" + revision = "81c17a7e2529c59efc4e74c5b32c1fb71fb12fa2" + version = "v1.9.11" [[projects]] digest = "1:01ed62f8f4f574d8aff1d88caee113700a2b44c42351943fa73cc1808f736a50" @@ -440,16 +438,16 @@ version = "v0.16.0" [[projects]] - digest = "1:09785a77f804b9b5524cfec6d6240ea0ce53251a38eb55abeb616bcfdd85de99" + digest = "1:938998e14bd5e42c54f3b640a41d869eb79029ad7c623fa47c604b8480c781fc" name = "github.com/lyft/flyteplugins" packages = ["go/tasks/v1/types"] pruneopts = "UT" - revision = "9156da396c7af5b34b4411c3ec99470864425b18" + revision = "301315810dbc9361567645c6961b97d5c968f2a3" source = "https://github.com/lyft/flyteplugins" - version = "v0.1.1" + version = "v0.1.10" [[projects]] - digest = "1:3dfb37d4f608c21e5f1d14de40b82d919b76c5044cc6daf38f94a98162e899c7" + digest = "1:af952e5d7b139f49e58826a68455b9b3b38f54e29f51563c890272a70777e5f0" name = "github.com/lyft/flytepropeller" packages = [ "pkg/apis/flyteworkflow", @@ -466,12 +464,12 @@ "pkg/utils", ] pruneopts = "UT" - revision = "40db32eaa4dc75293560e50c51c2120c9c41d4bb" + revision = "eb1504e4bd6307626a955af48db5bbd2589f0725" source = "https://github.com/lyft/flytepropeller" - version = "v0.1.0" + version = "v0.1.10" [[projects]] - digest = "1:3218b76036eebb079cc456504891ab7b5edace6bc8ce8473b507a5cfd7a6f81e" + digest = "1:85376f177ff6ee109741ae6e86882b78c56f9d1000befcec3e0540180f25d148" name = "github.com/lyft/flytestdlib" packages = [ "atomic", @@ -486,13 +484,14 @@ "profutils", "promutils", "promutils/labeled", + "random", "storage", "version", ] pruneopts = "UT" - revision = "7292f20ec17b42f104fd61d7f0120e17bcacf751" + revision = "2a121b0e3bc7fb85e36a5088e23ef59d401a78be" source = "https://github.com/lyft/flytestdlib" - version = "v0.2.16" + version = "v0.2.22" [[projects]] digest = "1:2a0da3440db3f2892609d99cd0389c2776a3fef24435f7b7b58bfc9030aa86ca" @@ -514,12 +513,12 @@ version = "v0.0.9" [[projects]] - digest = "1:36325ebb862e0382f2f14feef409ba9351271b89ada286ae56836c603d43b59c" + digest = "1:d62282425ffb75047679d7e2c3b980eea7f82c05ef5fb9142ee617ebac6e7432" name = "github.com/mattn/go-isatty" packages = ["."] pruneopts = "UT" - revision = "e1f7b56ace729e4a73a29a6b4fac6cd5fcda7ab3" - version = "v0.0.9" + revision = "88ba11cfdc67c7588b30042edf244b2875f892b6" + version = "v0.0.10" [[projects]] digest = "1:ff5ebae34cfbf047d505ee150de27e60570e8c394b3b8fdbb720ff6ac71985fc" @@ -562,12 +561,12 @@ revision = "a24ef33bc9b7e59ae4bed9e87a51d7bc76122731" [[projects]] - digest = "1:93131d8002d7025da13582877c32d1fc302486775a1b06f62241741006428c5e" + digest = "1:bbd3997f0121200f72b64d7a3826eb8a0b910d6a4c19894c9fe2852b9e5eaf3b" name = "github.com/pelletier/go-toml" packages = ["."] pruneopts = "UT" - revision = "728039f679cbcd4f6a54e080d2219a4c4928c546" - version = "v1.4.0" + revision = "8fe62057ea2d46ce44254c98e84e810044dbe197" + version = "v1.5.0" [[projects]] digest = "1:cf31692c14422fa27c83a05292eb5cbe0fb2775972e8f1f8446a71549bd8980b" @@ -606,7 +605,7 @@ revision = "14fe0d1b01d4d5fc031dd4bec1823bd3ebbe8016" [[projects]] - digest = "1:8dcedf2e8f06c7f94e48267dea0bc0be261fa97b377f3ae3e87843a92a549481" + digest = "1:f119e3205d3a1f0f19dbd7038eb37528e2c6f0933269dc344e305951fb87d632" name = "github.com/prometheus/common" packages = [ "expfmt", @@ -614,11 +613,11 @@ "model", ] pruneopts = "UT" - revision = "31bed53e4047fd6c510e43a941f90cb31be0972a" - version = "v0.6.0" + revision = "287d3e634a1e550c9e463dd7e5a75a422c614505" + version = "v0.7.0" [[projects]] - digest = "1:8232537905152d6a0b116b9af5a0868fcac0e84eb02ec5a150624c077bdedb0b" + digest = "1:a210815b437763623ecca8eb91e6a0bf4f2d6773c5a6c9aec0e28f19e5fd6deb" name = "github.com/prometheus/procfs" packages = [ ".", @@ -626,8 +625,8 @@ "internal/util", ] pruneopts = "UT" - revision = "00ec24a6a2d86e7074629c8384715dbb05adccd8" - version = "v0.0.4" + revision = "499c85531f756d1129edd26485a5f73871eeb308" + version = "v0.0.5" [[projects]] digest = "1:274f67cb6fed9588ea2521ecdac05a6d62a8c51c074c1fccc6a49a40ba80e925" @@ -681,12 +680,12 @@ version = "v1.1.0" [[projects]] - digest = "1:c1b1102241e7f645bc8e0c22ae352e8f0dc6484b6cb4d132fa9f24174e0119e2" + digest = "1:524b71991fc7d9246cc7dc2d9e0886ccb97648091c63e30eef619e6862c955dd" name = "github.com/spf13/pflag" packages = ["."] pruneopts = "UT" - revision = "298182f68c66c05229eb03ac171abe6e309ee79a" - version = "v1.0.3" + revision = "2e9d26c8c37aae03e3f9d4e90b7116f5accb7cab" + version = "v1.0.5" [[projects]] digest = "1:2532daa308722c7b65f4566e634dac2ddfaa0a398a17d8418e96ef2af3939e37" @@ -715,7 +714,7 @@ version = "v1.4.0" [[projects]] - digest = "1:74055050ea547bb04600be79cc501965cb3de8988018262f2ca430f0a0b48ec3" + digest = "1:b984f402fbabb0e9eb0476f0ecfa51d0b2ff11cd0ac03538d6284091033b39ae" name = "go.opencensus.io" packages = [ ".", @@ -736,8 +735,8 @@ "trace/tracestate", ] pruneopts = "UT" - revision = "9c377598961b706d1542bd2d84d538b5094d596e" - version = "v0.22.0" + revision = "59d1ce35d30f3c25ba762169da2a37eab6ffa041" + version = "v0.22.1" [[projects]] branch = "master" @@ -745,11 +744,11 @@ name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "UT" - revision = "9756ffdc24725223350eb3266ffb92590d28f278" + revision = "87dc89f01550277dc22b74ffcf4cd89fa2f40f4c" [[projects]] branch = "master" - digest = "1:e93fe09ca93cf16f8b2dc48053f56c2f91ed4f3fd16bfaf9596b6548c7b48a7f" + digest = "1:bf89144f888e7e621c7eaba03e68c5426b22fb008bf70d2ebf417beea541bb58" name = "golang.org/x/net" packages = [ "context", @@ -762,7 +761,7 @@ "trace", ] pruneopts = "UT" - revision = "ba9fcec4b297b415637633c5a6e8fa592e4a16c3" + revision = "491137f692577e390404f177a0515c9f86f79754" [[projects]] branch = "master" @@ -780,14 +779,14 @@ [[projects]] branch = "master" - digest = "1:db4d094dcdda93745779828d4f7536085eae66f9ebcba842bda762883db08800" + digest = "1:9f5be1dbb5091bb62d83fbb6db8e794adb8f0e05424679d6f15a17b670e38b4c" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "UT" - revision = "1e83adbbebd0f5dc971915fd7e5db032c3d2b731" + revision = "b09406accb4736d857a32bf9444cd7edae2ffa79" [[projects]] digest = "1:8d8faad6b12a3a4c819a3f9618cb6ee1fa1cfc33253abeeea8b55336721e3405" @@ -816,32 +815,32 @@ [[projects]] branch = "master" - digest = "1:9fdc2b55e8e0fafe4b41884091e51e77344f7dc511c5acedcfd98200003bff90" + digest = "1:cdd088b35bbf78713a6861a44a1bbe97e581861b6b8835c7f2211bbeca3671f6" name = "golang.org/x/time" packages = ["rate"] pruneopts = "UT" - revision = "9d24e82272b4f38b78bc8cff74fa936d31ccd8ef" + revision = "c4c64cad1fd0a1a8dab2523e04e61d35308e131e" [[projects]] branch = "master" - digest = "1:218feb07b42ba85b991b6f2decbc81e7fa6bec9d59cb0c617be40c65dd5edf22" + digest = "1:7f43c34476621e07944957f19514e61e3822acb1dd5cb8823e2717ccececf40a" name = "google.golang.org/api" packages = [ - "gensupport", "googleapi", "googleapi/internal/uritemplates", "googleapi/transport", "internal", + "internal/gensupport", "option", "storage/v1", "transport/http", "transport/http/internal/propagation", ] pruneopts = "UT" - revision = "d1c9f49851b5339dea6bf7e4076b60a66e62be1f" + revision = "5e0e3f4a3bb8f1c7d2bf1465670c5c0b3c45c277" [[projects]] - digest = "1:498b722d33dde4471e7d6e5d88a5e7132d2a8306fea5ff5ee82d1f418b4f41ed" + digest = "1:3c03b58f57452764a4499c55c582346c0ee78c8a5033affe5bdfd9efd3da5bd1" name = "google.golang.org/appengine" packages = [ ".", @@ -856,12 +855,12 @@ "urlfetch", ] pruneopts = "UT" - revision = "5f2a59506353b8d5ba8cbbcd9f3c1f41f1eaf079" - version = "v1.6.2" + revision = "971852bfffca25b069c31162ae8f247a3dba083b" + version = "v1.6.5" [[projects]] branch = "master" - digest = "1:1233ed1b527b0ff66c3df5879f7e80b1d8631e030cc45821b77fc25acd0d72a6" + digest = "1:802a0ee14db7898b2bf4a1bea346b8dcb68e900dcf89d501cc65339439ef60d4" name = "google.golang.org/genproto" packages = [ "googleapis/api/annotations", @@ -870,10 +869,10 @@ "protobuf/field_mask", ] pruneopts = "UT" - revision = "24fa4b261c55da65468f2abfdae2b024eef27dfb" + revision = "548a555dbc03994223efbaba0090152849259498" [[projects]] - digest = "1:3b97661db2e5d4c87f7345e875ea28f911e54c715ba0a74be08e1649d67e05cd" + digest = "1:6cd77d0b616d2dcebd363dfecba593f27b0151fc82cdb5fbfb96c5a7cfbc95b5" name = "google.golang.org/grpc" packages = [ ".", @@ -911,8 +910,8 @@ "tap", ] pruneopts = "UT" - revision = "6eaf6f47437a6b4e2153a190160ef39a92c7eceb" - version = "v1.23.0" + revision = "f6d0f9ee430895e87ef1ceb5ac8f39725bafceef" + version = "v1.24.0" [[projects]] digest = "1:1048ae210f190cd7b6aea19a92a055bd6112b025dd49f560579dfdfd76c8c42e" @@ -931,12 +930,12 @@ version = "v0.9.1" [[projects]] - digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96" + digest = "1:59f10c1537d2199d9115d946927fe31165959a95190849c82ff11e05803528b0" name = "gopkg.in/yaml.v2" packages = ["."] pruneopts = "UT" - revision = "51d6538a90f86fe93ac480b35f37b2be17fef232" - version = "v2.2.2" + revision = "f221b8435cfb71e54062f6c6e99e9ade30b124d5" + version = "v2.2.4" [[projects]] digest = "1:86ad5797d1189de342ed6988fbb76b92dc0429a4d677ad69888d6137efa5712e" @@ -1063,12 +1062,12 @@ version = "kubernetes-1.14.6" [[projects]] - digest = "1:ccb9be4c583b6ec848eb98aa395a4e8c8f8ad9ebb823642c0dd1c1c45939a5bb" + digest = "1:93e82f25d75aba18436ad1ac042cb49493f096011f2541075721ed6f9e05c044" name = "k8s.io/klog" packages = ["."] pruneopts = "UT" - revision = "3ca30a56d8a775276f9cdae009ba326fdc05af7f" - version = "v0.4.0" + revision = "2ca9ad30301bf30a8a6e0fa2110db6b8df699a91" + version = "v1.0.0" [[projects]] branch = "master" @@ -1076,7 +1075,7 @@ name = "k8s.io/utils" packages = ["integer"] pruneopts = "UT" - revision = "69764acb6e8e900b7c05296c5d3c9c19545475f9" + revision = "8d271d903fe4c290aa361acfb242cff7bcee96f1" [[projects]] digest = "1:19c282f372fb82ff1e3599e1ecfa13d18a7d836a401abf38c11f5124d697a656" @@ -1147,6 +1146,7 @@ "github.com/lyft/flytestdlib/profutils", "github.com/lyft/flytestdlib/promutils", "github.com/lyft/flytestdlib/promutils/labeled", + "github.com/lyft/flytestdlib/random", "github.com/lyft/flytestdlib/storage", "github.com/magiconair/properties/assert", "github.com/mitchellh/mapstructure", @@ -1165,7 +1165,6 @@ "k8s.io/apimachinery/pkg/api/resource", "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/apimachinery/pkg/runtime/schema", - "k8s.io/apimachinery/pkg/util/rand", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/client-go/kubernetes/scheme", diff --git a/Gopkg.toml b/Gopkg.toml index 45cd61f256..27aec45fe8 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -66,7 +66,7 @@ [[override]] name = "github.com/lyft/flytestdlib" source = "https://github.com/lyft/flytestdlib" - version = "^v0.2.12" + version = "^v0.2.22" [[constraint]] name = "github.com/magiconair/properties" diff --git a/cmd/entrypoints/clusterresource.go b/cmd/entrypoints/clusterresource.go index 1da4a6fae6..cc16961243 100644 --- a/cmd/entrypoints/clusterresource.go +++ b/cmd/entrypoints/clusterresource.go @@ -3,9 +3,8 @@ package entrypoints import ( "context" - "github.com/lyft/flyteadmin/pkg/executioncluster" - "github.com/lyft/flyteadmin/pkg/clusterresource" + executioncluster "github.com/lyft/flyteadmin/pkg/executioncluster/impl" "github.com/lyft/flyteadmin/pkg/runtime" @@ -57,7 +56,7 @@ var controllerRunCmd = &cobra.Command{ scope.NewSubScope("cluster"), cfg.KubeConfig, cfg.Master, - configuration.ClusterConfiguration()) + configuration) clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope) clusterResourceController.Run() @@ -89,7 +88,7 @@ var controllerSyncCmd = &cobra.Command{ scope.NewSubScope("cluster"), cfg.KubeConfig, cfg.Master, - configuration.ClusterConfiguration()) + configuration) clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope) err := clusterResourceController.Sync(ctx) diff --git a/pkg/clusterresource/controller.go b/pkg/clusterresource/controller.go index b3d5361dfc..d1ae876a89 100644 --- a/pkg/clusterresource/controller.go +++ b/pkg/clusterresource/controller.go @@ -11,9 +11,9 @@ import ( "strings" "time" - "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" - "github.com/lyft/flyteadmin/pkg/executioncluster" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/lyft/flyteadmin/pkg/common" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin" @@ -68,7 +68,7 @@ type NamespaceCache = map[NamespaceName]LastModTimeCache type controller struct { db repositories.RepositoryInterface config runtimeInterfaces.Configuration - executionCluster executioncluster.ClusterInterface + executionCluster interfaces.ClusterInterface poller chan struct{} metrics controllerMetrics lastAppliedTemplateDir string @@ -340,7 +340,7 @@ func newMetrics(scope promutils.Scope) controllerMetrics { } } -func NewClusterResourceController(db repositories.RepositoryInterface, executionCluster executioncluster.ClusterInterface, scope promutils.Scope) Controller { +func NewClusterResourceController(db repositories.RepositoryInterface, executionCluster interfaces.ClusterInterface, scope promutils.Scope) Controller { config := runtime.NewConfigurationProvider() return &controller{ db: db, diff --git a/pkg/executioncluster/execution_target.go b/pkg/executioncluster/execution_target.go index 1d3c59fe75..d594f19e12 100644 --- a/pkg/executioncluster/execution_target.go +++ b/pkg/executioncluster/execution_target.go @@ -1,17 +1,17 @@ package executioncluster import ( - "github.com/lyft/flyteadmin/pkg/flytek8s" - runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned" - "github.com/lyft/flytestdlib/promutils" - "k8s.io/client-go/rest" + "github.com/lyft/flytestdlib/random" + "sigs.k8s.io/controller-runtime/pkg/client" ) // Spec to determine the execution target type ExecutionTargetSpec struct { - TargetID string + TargetID string + ExecutionID *core.WorkflowExecutionIdentifier } // Client object of the target execution cluster @@ -22,35 +22,6 @@ type ExecutionTarget struct { Enabled bool } -func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) { - fc, err := flyteclient.NewForConfig(kubeConfiguration) - if err != nil { - scope.MustNewCounter( - "flyteclient_initialization_error", - "count of errors encountered initializing a flyte client from kube config").Inc() - return nil, err - } - return fc, nil -} - -// Creates a new Execution target for a cluster based on config passed in. -func NewExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*ExecutionTarget, error) { - kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster) - if err != nil { - return nil, err - } - flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf) - if err != nil { - return nil, err - } - client, err := client.New(kubeConf, client.Options{}) - if err != nil { - return nil, err - } - return &ExecutionTarget{ - FlyteClient: flyteClient, - Client: client, - ID: k8sCluster.Name, - Enabled: k8sCluster.Enabled, - }, nil +func (e ExecutionTarget) Compare(to random.Comparable) bool { + return e.ID < to.(ExecutionTarget).ID } diff --git a/pkg/executioncluster/factory.go b/pkg/executioncluster/factory.go deleted file mode 100644 index ec687f6f21..0000000000 --- a/pkg/executioncluster/factory.go +++ /dev/null @@ -1,40 +0,0 @@ -package executioncluster - -import ( - "github.com/lyft/flytestdlib/promutils" - - runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces" -) - -type ClusterInterface interface { - GetTarget(*ExecutionTargetSpec) (*ExecutionTarget, error) - GetAllValidTargets() []ExecutionTarget -} - -func GetEnabledClusters(clusterConfig runtime.ClusterConfiguration) []runtime.ClusterConfig { - enabledClusters := make([]runtime.ClusterConfig, 0) - for _, cluster := range clusterConfig.GetClusterConfigs() { - if cluster.Enabled { - enabledClusters = append(enabledClusters, cluster) - } - } - return enabledClusters -} - -func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, clusterConfig runtime.ClusterConfiguration) ClusterInterface { - enabledClusters := GetEnabledClusters(clusterConfig) - switch len(enabledClusters) { - case 0: - cluster, err := NewInCluster(scope, kubeConfig, master) - if err != nil { - panic(err) - } - return cluster - default: - cluster, err := NewRandomClusterSelector(scope, clusterConfig) - if err != nil { - panic(err) - } - return cluster - } -} diff --git a/pkg/executioncluster/impl/cluster_execution_target_provider.go b/pkg/executioncluster/impl/cluster_execution_target_provider.go new file mode 100644 index 0000000000..cc1154bea4 --- /dev/null +++ b/pkg/executioncluster/impl/cluster_execution_target_provider.go @@ -0,0 +1,46 @@ +package impl + +import ( + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/lyft/flyteadmin/pkg/flytek8s" + runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned" + "github.com/lyft/flytestdlib/promutils" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type clusterExecutionTargetProvider struct{} + +// Creates a new Execution target for a cluster based on config passed in. +func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) { + kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster) + if err != nil { + return nil, err + } + flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf) + if err != nil { + return nil, err + } + client, err := client.New(kubeConf, client.Options{}) + if err != nil { + return nil, err + } + return &executioncluster.ExecutionTarget{ + FlyteClient: flyteClient, + Client: client, + ID: k8sCluster.Name, + Enabled: k8sCluster.Enabled, + }, nil +} + +func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) { + fc, err := flyteclient.NewForConfig(kubeConfiguration) + if err != nil { + scope.MustNewCounter( + "flyteclient_initialization_error", + "count of errors encountered initializing a flyte client from kube config").Inc() + return nil, err + } + return fc, nil +} diff --git a/pkg/executioncluster/impl/factory.go b/pkg/executioncluster/impl/factory.go new file mode 100644 index 0000000000..4c85379d1c --- /dev/null +++ b/pkg/executioncluster/impl/factory.go @@ -0,0 +1,24 @@ +package impl + +import ( + executioncluster_interface "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flytestdlib/promutils" +) + +func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration) executioncluster_interface.ClusterInterface { + switch len(config.ClusterConfiguration().GetClusterConfigs()) { + case 0: + cluster, err := NewInCluster(scope, kubeConfig, master) + if err != nil { + panic(err) + } + return cluster + default: + cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, config.ApplicationConfiguration().GetDomainsConfig()) + if err != nil { + panic(err) + } + return cluster + } +} diff --git a/pkg/executioncluster/in_cluster.go b/pkg/executioncluster/impl/in_cluster.go similarity index 63% rename from pkg/executioncluster/in_cluster.go rename to pkg/executioncluster/impl/in_cluster.go index 617e38a1db..05a28713d5 100644 --- a/pkg/executioncluster/in_cluster.go +++ b/pkg/executioncluster/impl/in_cluster.go @@ -1,8 +1,10 @@ -package executioncluster +package impl import ( "fmt" + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" "github.com/lyft/flyteadmin/pkg/flytek8s" "github.com/lyft/flytestdlib/promutils" "github.com/pkg/errors" @@ -10,23 +12,23 @@ import ( ) type InCluster struct { - target ExecutionTarget + target executioncluster.ExecutionTarget } -func (i InCluster) GetTarget(spec *ExecutionTargetSpec) (*ExecutionTarget, error) { +func (i InCluster) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { if spec != nil && spec.TargetID != "" { return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID)) } return &i.target, nil } -func (i InCluster) GetAllValidTargets() []ExecutionTarget { - return []ExecutionTarget{ +func (i InCluster) GetAllValidTargets() []executioncluster.ExecutionTarget { + return []executioncluster.ExecutionTarget{ i.target, } } -func NewInCluster(scope promutils.Scope, kubeConfig, master string) (ClusterInterface, error) { +func NewInCluster(scope promutils.Scope, kubeConfig, master string) (interfaces.ClusterInterface, error) { clientConfig, err := flytek8s.GetRestClientConfig(kubeConfig, master, nil) if err != nil { return nil, err @@ -40,7 +42,7 @@ func NewInCluster(scope promutils.Scope, kubeConfig, master string) (ClusterInte return nil, err } return &InCluster{ - target: ExecutionTarget{ + target: executioncluster.ExecutionTarget{ Client: client, FlyteClient: flyteClient, }, diff --git a/pkg/executioncluster/in_cluster_test.go b/pkg/executioncluster/impl/in_cluster_test.go similarity index 68% rename from pkg/executioncluster/in_cluster_test.go rename to pkg/executioncluster/impl/in_cluster_test.go index 08ce08fa59..414fab1e00 100644 --- a/pkg/executioncluster/in_cluster_test.go +++ b/pkg/executioncluster/impl/in_cluster_test.go @@ -1,14 +1,16 @@ -package executioncluster +package impl import ( "testing" + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/stretchr/testify/assert" ) func TestInClusterGetTarget(t *testing.T) { cluster := InCluster{ - target: ExecutionTarget{ + target: executioncluster.ExecutionTarget{ ID: "t1", }, } @@ -19,15 +21,15 @@ func TestInClusterGetTarget(t *testing.T) { func TestInClusterGetRemoteTarget(t *testing.T) { cluster := InCluster{ - target: ExecutionTarget{}, + target: executioncluster.ExecutionTarget{}, } - _, err := cluster.GetTarget(&ExecutionTargetSpec{TargetID: "t1"}) + _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "t1"}) assert.EqualError(t, err, "remote target t1 is not supported") } func TestInClusterGetAllValidTargets(t *testing.T) { cluster := InCluster{ - target: ExecutionTarget{ + target: executioncluster.ExecutionTarget{ ID: "t1", }, } diff --git a/pkg/executioncluster/impl/random_cluster_selector.go b/pkg/executioncluster/impl/random_cluster_selector.go new file mode 100644 index 0000000000..0450fa060b --- /dev/null +++ b/pkg/executioncluster/impl/random_cluster_selector.go @@ -0,0 +1,173 @@ +package impl + +import ( + "context" + "fmt" + "hash/fnv" + "math/rand" + + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + + "github.com/lyft/flytestdlib/random" + + runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flytestdlib/promutils" +) + +// Implementation of Random cluster selector +// Selects cluster based on weights and domains. +type RandomClusterSelector struct { + domainWeightedRandomMap map[string]random.WeightedRandomList + executionTargetMap map[string]executioncluster.ExecutionTarget +} + +func getRandSource(seed string) (rand.Source, error) { + h := fnv.New64a() + _, err := h.Write([]byte(seed)) + if err != nil { + return nil, err + } + hashedSeed := int64(h.Sum64()) + return rand.NewSource(hashedSeed), nil +} + +func getValidDomainMap(validDomains runtime.DomainsConfig) map[string]runtime.Domain { + domainMap := make(map[string]runtime.Domain) + for _, domain := range validDomains { + domainMap[domain.ID] = domain + } + return domainMap +} + +func getExecutionTargetMap(scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, clusterConfig runtime.ClusterConfiguration) (map[string]executioncluster.ExecutionTarget, error) { + executionTargetMap := make(map[string]executioncluster.ExecutionTarget) + for _, cluster := range clusterConfig.GetClusterConfigs() { + if _, ok := executionTargetMap[cluster.Name]; ok { + return nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name) + } + executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster) + if err != nil { + return nil, err + } + executionTargetMap[cluster.Name] = *executionTarget + } + return executionTargetMap, nil +} + +func getDomainsForCluster(cluster runtime.ClusterConfig, domainMap map[string]runtime.Domain) ([]string, error) { + if len(cluster.AllowedDomains) == 0 { + allDomains := make([]string, len(domainMap)) + index := 0 + for id := range domainMap { + allDomains[index] = id + index++ + } + return allDomains, nil + } + for _, allowedDomain := range cluster.AllowedDomains { + if _, ok := domainMap[allowedDomain]; !ok { + return nil, fmt.Errorf("invalid domain %s", allowedDomain) + } + } + return cluster.AllowedDomains, nil +} + +func getDomainWeightedRandomForCluster(ctx context.Context, scope promutils.Scope, executionTargetProvider interfaces.ExecutionTargetProvider, + clusterConfig runtime.ClusterConfiguration, + domainMap map[string]runtime.Domain) (map[string]random.WeightedRandomList, error) { + domainEntriesMap := make(map[string][]random.Entry) + for _, cluster := range clusterConfig.GetClusterConfigs() { + // If cluster is not enabled, it is not eligible for selection + if !cluster.Enabled { + continue + } + executionTarget, err := executionTargetProvider.GetExecutionTarget(scope, cluster) + if err != nil { + return nil, err + } + targetEntry := random.Entry{ + Item: *executionTarget, + Weight: cluster.Weight, + } + clusterDomains, err := getDomainsForCluster(cluster, domainMap) + if err != nil { + return nil, err + } + for _, domain := range clusterDomains { + if _, ok := domainEntriesMap[domain]; ok { + domainEntriesMap[domain] = append(domainEntriesMap[domain], targetEntry) + } else { + domainEntriesMap[domain] = []random.Entry{targetEntry} + } + } + } + domainWeightedRandomMap := make(map[string]random.WeightedRandomList) + for domain, entries := range domainEntriesMap { + weightedRandomList, err := random.NewWeightedRandom(ctx, entries) + if err != nil { + return nil, err + } + domainWeightedRandomMap[domain] = weightedRandomList + } + return domainWeightedRandomMap, nil +} + +func (s RandomClusterSelector) GetAllValidTargets() []executioncluster.ExecutionTarget { + v := make([]executioncluster.ExecutionTarget, 0) + for _, value := range s.executionTargetMap { + if value.Enabled { + v = append(v, value) + } + } + return v +} + +func (s RandomClusterSelector) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) { + if spec == nil || (spec.TargetID == "" && spec.ExecutionID == nil) { + return nil, fmt.Errorf("invalid executionTargetSpec %v", spec) + } + if spec.TargetID != "" { + if val, ok := s.executionTargetMap[spec.TargetID]; ok { + return &val, nil + } + return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID) + } + if spec.ExecutionID != nil { + if weightedRandomList, ok := s.domainWeightedRandomMap[spec.ExecutionID.GetDomain()]; ok { + executionName := spec.ExecutionID.GetName() + if executionName != "" { + randSrc, err := getRandSource(executionName) + if err != nil { + return nil, err + } + result, err := weightedRandomList.GetWithSeed(randSrc) + if err != nil { + return nil, err + } + execTarget := result.(executioncluster.ExecutionTarget) + return &execTarget, nil + } + execTarget := weightedRandomList.Get().(executioncluster.ExecutionTarget) + return &execTarget, nil + } + } + return nil, fmt.Errorf("invalid executionTargetSpec %v", *spec) + +} + +func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration, executionTargetProvider interfaces.ExecutionTargetProvider, domainConfig *runtime.DomainsConfig) (interfaces.ClusterInterface, error) { + executionTargetMap, err := getExecutionTargetMap(scope, executionTargetProvider, clusterConfig) + if err != nil { + return nil, err + } + domainMap := getValidDomainMap(*domainConfig) + domainWeightedRandomMap, err := getDomainWeightedRandomForCluster(context.Background(), scope, executionTargetProvider, clusterConfig, domainMap) + if err != nil { + return nil, err + } + return &RandomClusterSelector{ + domainWeightedRandomMap: domainWeightedRandomMap, + executionTargetMap: executionTargetMap, + }, nil +} diff --git a/pkg/executioncluster/impl/random_cluster_selector_test.go b/pkg/executioncluster/impl/random_cluster_selector_test.go new file mode 100644 index 0000000000..d655da9209 --- /dev/null +++ b/pkg/executioncluster/impl/random_cluster_selector_test.go @@ -0,0 +1,121 @@ +package impl + +import ( + "context" + "go/build" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/lyft/flyteadmin/pkg/executioncluster" + interfaces2 "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/executioncluster/mocks" + "github.com/lyft/flyteadmin/pkg/runtime" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/config" + "github.com/lyft/flytestdlib/config/viper" + "github.com/lyft/flytestdlib/promutils" + + "github.com/stretchr/testify/assert" +) + +var defaultDomains = []interfaces.Domain{{ID: "d1", Name: "d1"}, {ID: "d2", Name: "d2"}, {ID: "d3", Name: "domain3"}} + +func initTestConfig(fileName string) error { + var searchPaths []string + for _, goPath := range strings.Split(build.Default.GOPATH, string(os.PathListSeparator)) { + searchPaths = append(searchPaths, filepath.Join(goPath, "src/github.com/lyft/flyteadmin/pkg/executioncluster/testdata/", fileName)) + } + + configAccessor := viper.NewAccessor(config.Options{ + SearchPaths: searchPaths, + StrictMode: false, + }) + return configAccessor.UpdateConfig(context.Background()) +} + +func getRandomClusterSelectorForTest(t *testing.T, domainsConfig interfaces.DomainsConfig) interfaces2.ClusterInterface { + var clusterScope promutils.Scope + err := initTestConfig("clusters_config.yaml") + assert.NoError(t, err) + + configProvider := runtime.NewConfigurationProvider() + randomCluster, err := NewRandomClusterSelector(clusterScope, configProvider.ClusterConfiguration(), &mocks.MockExecutionTargetProvider{}, &domainsConfig) + assert.NoError(t, err) + return randomCluster +} + +func TestRandomClusterSelectorGetTarget(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "testcluster"}) + assert.Nil(t, err) + assert.Equal(t, "testcluster", target.ID) + assert.False(t, target.Enabled) + target, err = cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "testcluster2"}) + assert.Nil(t, err) + assert.Equal(t, "testcluster2", target.ID) + assert.True(t, target.Enabled) +} + +func TestRandomClusterSelectorGetTargetForDomain(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ + Domain: "d1", + }}) + assert.Nil(t, err) + assert.Equal(t, "testcluster2", target.ID) + assert.True(t, target.Enabled) +} + +func TestRandomClusterSelectorGetTargetForDomainAndExecution(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ + Domain: "d2", + Name: "exec", + }}) + assert.Nil(t, err) + assert.Equal(t, "testcluster3", target.ID) + assert.True(t, target.Enabled) +} + +func TestRandomClusterSelectorGetTargetForDomainAndExecution2(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + target, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ + Domain: "d2", + Name: "exec2", + }}) + assert.Nil(t, err) + assert.Equal(t, "testcluster2", target.ID) + assert.True(t, target.Enabled) +} + +func TestRandomClusterSelectorGetTargetForInvalidDomain(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{ExecutionID: &core.WorkflowExecutionIdentifier{ + Domain: "d4", + Name: "exec", + }}) + assert.EqualError(t, err, "invalid executionTargetSpec { domain:\"d4\" name:\"exec\" }") +} + +func TestRandomClusterSelectorGetRandomTarget(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + _, err := cluster.GetTarget(nil) + assert.NotNil(t, err) + assert.EqualError(t, err, "invalid executionTargetSpec ") +} + +func TestRandomClusterSelectorGetRemoteTarget(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + _, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "cluster-3"}) + assert.NotNil(t, err) + assert.EqualError(t, err, "invalid cluster target cluster-3") +} + +func TestRandomClusterSelectorGetAllValidTargets(t *testing.T) { + cluster := getRandomClusterSelectorForTest(t, defaultDomains) + targets := cluster.GetAllValidTargets() + assert.Equal(t, 2, len(targets)) +} diff --git a/pkg/executioncluster/interfaces/cluster.go b/pkg/executioncluster/interfaces/cluster.go new file mode 100644 index 0000000000..1b09767c9f --- /dev/null +++ b/pkg/executioncluster/interfaces/cluster.go @@ -0,0 +1,11 @@ +package interfaces + +import ( + "github.com/lyft/flyteadmin/pkg/executioncluster" +) + +// Interface for the Execution Cluster +type ClusterInterface interface { + GetTarget(*executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) + GetAllValidTargets() []executioncluster.ExecutionTarget +} diff --git a/pkg/executioncluster/interfaces/execution_target_provider.go b/pkg/executioncluster/interfaces/execution_target_provider.go new file mode 100644 index 0000000000..e56d9cf9ca --- /dev/null +++ b/pkg/executioncluster/interfaces/execution_target_provider.go @@ -0,0 +1,11 @@ +package interfaces + +import ( + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flytestdlib/promutils" +) + +type ExecutionTargetProvider interface { + GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) +} diff --git a/pkg/executioncluster/mocks/execution_target_provider.go b/pkg/executioncluster/mocks/execution_target_provider.go new file mode 100644 index 0000000000..ee83ffa41d --- /dev/null +++ b/pkg/executioncluster/mocks/execution_target_provider.go @@ -0,0 +1,17 @@ +package mocks + +import ( + "github.com/lyft/flyteadmin/pkg/executioncluster" + "github.com/lyft/flyteadmin/pkg/runtime/interfaces" + "github.com/lyft/flytestdlib/promutils" +) + +type MockExecutionTargetProvider struct{} + +// Creates a new Execution target for a cluster based on config passed in. +func (c *MockExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster interfaces.ClusterConfig) (*executioncluster.ExecutionTarget, error) { + return &executioncluster.ExecutionTarget{ + ID: k8sCluster.Name, + Enabled: k8sCluster.Enabled, + }, nil +} diff --git a/pkg/executioncluster/random_cluster_selector.go b/pkg/executioncluster/random_cluster_selector.go deleted file mode 100644 index 6b5f7db018..0000000000 --- a/pkg/executioncluster/random_cluster_selector.go +++ /dev/null @@ -1,71 +0,0 @@ -package executioncluster - -import ( - "fmt" - - runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces" - "github.com/lyft/flytestdlib/promutils" - "k8s.io/apimachinery/pkg/util/rand" -) - -type RandomClusterSelector struct { - executionTargetMap map[string]ExecutionTarget - totalEnabledClusterCount int -} - -func getExecutionTargetMap(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration) (map[string]ExecutionTarget, error) { - executionTargetMap := make(map[string]ExecutionTarget) - for _, cluster := range clusterConfig.GetClusterConfigs() { - if _, ok := executionTargetMap[cluster.Name]; ok { - return nil, fmt.Errorf("duplicate clusters for name %s", cluster.Name) - } - executionTarget, err := NewExecutionTarget(scope, cluster) - if err != nil { - return nil, err - } - executionTargetMap[cluster.Name] = *executionTarget - } - return executionTargetMap, nil -} - -func (s RandomClusterSelector) GetAllValidTargets() []ExecutionTarget { - v := make([]ExecutionTarget, 0, len(s.executionTargetMap)) - for _, value := range s.executionTargetMap { - if value.Enabled { - v = append(v, value) - } - } - return v -} - -func (s RandomClusterSelector) GetTarget(spec *ExecutionTargetSpec) (*ExecutionTarget, error) { - if spec != nil && spec.TargetID != "" { - if val, ok := s.executionTargetMap[spec.TargetID]; ok { - return &val, nil - } - return nil, fmt.Errorf("invalid cluster target %s", spec.TargetID) - } - targetIdx := rand.Intn(s.totalEnabledClusterCount) - index := 0 - for _, val := range s.executionTargetMap { - if val.Enabled { - if index == targetIdx { - return &val, nil - } - index++ - } - } - return nil, nil -} - -func NewRandomClusterSelector(scope promutils.Scope, clusterConfig runtime.ClusterConfiguration) (ClusterInterface, error) { - executionTargetMap, err := getExecutionTargetMap(scope, clusterConfig) - if err != nil { - return nil, err - } - enabledClusters := GetEnabledClusters(clusterConfig) - return &RandomClusterSelector{ - executionTargetMap: executionTargetMap, - totalEnabledClusterCount: len(enabledClusters), - }, nil -} diff --git a/pkg/executioncluster/random_cluster_selector_test.go b/pkg/executioncluster/random_cluster_selector_test.go deleted file mode 100644 index 6a12ca4389..0000000000 --- a/pkg/executioncluster/random_cluster_selector_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package executioncluster - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func getRandomClusterSelectorForTest() RandomClusterSelector { - return RandomClusterSelector{ - executionTargetMap: map[string]ExecutionTarget{ - "cluster-1": { - ID: "t1", - Enabled: true, - }, - "cluster-2": { - ID: "t2", - Enabled: true, - }, - "cluster-disabled": { - ID: "t4", - }, - }, - totalEnabledClusterCount: 2, - } -} - -func TestRandomClusterSelectorGetTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest() - target, err := cluster.GetTarget(&ExecutionTargetSpec{TargetID: "cluster-1"}) - assert.Nil(t, err) - assert.Equal(t, "t1", target.ID) - assert.True(t, target.Enabled) - target, err = cluster.GetTarget(&ExecutionTargetSpec{TargetID: "cluster-disabled"}) - assert.Nil(t, err) - assert.Equal(t, "t4", target.ID) - assert.False(t, target.Enabled) -} - -func TestRandomClusterSelectorGetRandomTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest() - target, err := cluster.GetTarget(nil) - assert.Nil(t, err) - assert.NotNil(t, target) - assert.NotEmpty(t, target.ID) -} - -func TestRandomClusterSelectorGetRemoteTarget(t *testing.T) { - cluster := getRandomClusterSelectorForTest() - _, err := cluster.GetTarget(&ExecutionTargetSpec{TargetID: "cluster-3"}) - assert.NotNil(t, err) - assert.EqualError(t, err, "invalid cluster target cluster-3") -} - -func TestRandomClusterSelectorGetAllValidTargets(t *testing.T) { - cluster := getRandomClusterSelectorForTest() - targets := cluster.GetAllValidTargets() - assert.Equal(t, 2, len(targets)) -} diff --git a/pkg/executioncluster/testdata/clusters_config.yaml b/pkg/executioncluster/testdata/clusters_config.yaml new file mode 100644 index 0000000000..b17d94a3f6 --- /dev/null +++ b/pkg/executioncluster/testdata/clusters_config.yaml @@ -0,0 +1,30 @@ +clusters: + clusterConfigs: + - name: "testcluster" + endpoint: "testcluster_endpoint" + auth: + type: "file_path" + tokenPath: "/path/to/testcluster/token" + certPath: "/path/to/testcluster/cert" + - name: "testcluster2" + endpoint: "testcluster2_endpoint" + weight: 0.5 + enabled: true + allowedDomains: + - "d1" + - "d2" + auth: + type: "file_path" + tokenPath: "/path/to/testcluster2/token" + certPath: "/path/to/testcluster2/cert" + - name: "testcluster3" + endpoint: "testcluster3_endpoint" + enabled: true + weight: 0.5 + allowedDomains: + - "d2" + - "d3" + auth: + type: "file_path" + tokenPath: "/path/to/testcluster3/token" + certPath: "/path/to/testcluster3/cert" diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 0ff2311d60..dc36a13d28 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -5,7 +5,7 @@ import ( "fmt" "runtime/debug" - "github.com/lyft/flyteadmin/pkg/executioncluster" + executionCluster "github.com/lyft/flyteadmin/pkg/executioncluster/impl" "github.com/lyft/flytestdlib/profutils" @@ -77,11 +77,11 @@ func NewAdminServer(kubeConfig, master string) *AdminService { db := repositories.GetRepository( repositories.POSTGRES, dbConfig, adminScope.NewSubScope("database")) storeConfig := storage.GetConfig() - executionCluster := executioncluster.GetExecutionCluster( + executionCluster := executionCluster.GetExecutionCluster( adminScope.NewSubScope("executor").NewSubScope("cluster"), kubeConfig, master, - configuration.ClusterConfiguration()) + configuration) workflowExecutor := workflowengine.NewFlytePropeller( applicationConfiguration.RoleNameKey, executionCluster, diff --git a/pkg/runtime/interfaces/cluster_configuration.go b/pkg/runtime/interfaces/cluster_configuration.go index b67c31c690..a28ef1c736 100644 --- a/pkg/runtime/interfaces/cluster_configuration.go +++ b/pkg/runtime/interfaces/cluster_configuration.go @@ -8,10 +8,12 @@ import ( // Holds details about a cluster used for workflow execution. type ClusterConfig struct { - Name string `json:"name"` - Endpoint string `json:"endpoint"` - Auth Auth `json:"auth"` - Enabled bool `json:"enabled"` + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Auth Auth `json:"auth"` + Enabled bool `json:"enabled"` + Weight float32 `json:"weight"` + AllowedDomains []string `json:"allowedDomains"` } type Auth struct { diff --git a/pkg/workflowengine/impl/propeller_executor.go b/pkg/workflowengine/impl/propeller_executor.go index 22ed3bc7ed..83c7364253 100644 --- a/pkg/workflowengine/impl/propeller_executor.go +++ b/pkg/workflowengine/impl/propeller_executor.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + interfaces2 "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/executioncluster" "github.com/lyft/flyteadmin/pkg/workflowengine/interfaces" @@ -38,7 +40,7 @@ type propellerMetrics struct { } type FlytePropeller struct { - executionCluster executioncluster.ClusterInterface + executionCluster interfaces2.ClusterInterface builder interfaces.FlyteWorkflowInterface roleNameKey string metrics propellerMetrics @@ -115,7 +117,9 @@ func (c *FlytePropeller) ExecuteWorkflow(ctx context.Context, input interfaces.E annotations := addMapValues(input.Annotations, flyteWf.Annotations) flyteWf.Annotations = annotations - var executionTargetSpec executioncluster.ExecutionTargetSpec + executionTargetSpec := executioncluster.ExecutionTargetSpec{ + ExecutionID: input.ExecutionID, + } targetCluster, err := c.executionCluster.GetTarget(&executionTargetSpec) if err != nil { return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create workflow in propeller %v", err) @@ -181,7 +185,7 @@ func newPropellerMetrics(scope promutils.Scope) propellerMetrics { } } -func NewFlytePropeller(roleNameKey string, executionCluster executioncluster.ClusterInterface, +func NewFlytePropeller(roleNameKey string, executionCluster interfaces2.ClusterInterface, scope promutils.Scope) interfaces.Executor { return &FlytePropeller{ diff --git a/pkg/workflowengine/impl/propeller_executor_test.go b/pkg/workflowengine/impl/propeller_executor_test.go index e566bde292..cc2c065e7b 100644 --- a/pkg/workflowengine/impl/propeller_executor_test.go +++ b/pkg/workflowengine/impl/propeller_executor_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + interfaces2 "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces" + "github.com/lyft/flyteadmin/pkg/executioncluster" cluster_mock "github.com/lyft/flyteadmin/pkg/executioncluster/mocks" @@ -37,7 +39,7 @@ var clusterName = "C1" var acceptedAt = time.Now() -func getFlytePropellerForTest(execCluster executioncluster.ClusterInterface, builder *FlyteWorkflowBuilderTest) *FlytePropeller { +func getFlytePropellerForTest(execCluster interfaces2.ClusterInterface, builder *FlyteWorkflowBuilderTest) *FlytePropeller { return &FlytePropeller{ executionCluster: execCluster, builder: builder, @@ -107,7 +109,7 @@ func (b *FakeK8FlyteClient) FlyteworkflowV1alpha1() v1alpha12.FlyteworkflowV1alp return &fakeFlyteWF } -func getFakeExecutionCluster() executioncluster.ClusterInterface { +func getFakeExecutionCluster() interfaces2.ClusterInterface { fakeCluster := cluster_mock.MockCluster{} fakeCluster.SetGetTargetCallback(func(spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { return &executioncluster.ExecutionTarget{ @@ -119,7 +121,19 @@ func getFakeExecutionCluster() executioncluster.ClusterInterface { } func TestExecuteWorkflowHappyCase(t *testing.T) { - cluster := getFakeExecutionCluster() + cluster := cluster_mock.MockCluster{} + execID := core.WorkflowExecutionIdentifier{ + Project: "p", + Domain: "d", + Name: "n", + } + cluster.SetGetTargetCallback(func(spec *executioncluster.ExecutionTargetSpec) (target *executioncluster.ExecutionTarget, e error) { + assert.Equal(t, execID, *spec.ExecutionID) + return &executioncluster.ExecutionTarget{ + ID: "C1", + FlyteClient: &FakeK8FlyteClient{}, + }, nil + }) fakeFlyteWorkflow := FakeFlyteWorkflow{ createCallback: func(workflow *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) { assert.EqualValues(t, map[string]string{ @@ -137,16 +151,12 @@ func TestExecuteWorkflowHappyCase(t *testing.T) { assert.Equal(t, "p-d", namespace) return &fakeFlyteWorkflow } - propeller := getFlytePropellerForTest(cluster, &FlyteWorkflowBuilderTest{}) + propeller := getFlytePropellerForTest(&cluster, &FlyteWorkflowBuilderTest{}) execInfo, err := propeller.ExecuteWorkflow( context.Background(), interfaces.ExecuteWorkflowInput{ - ExecutionID: &core.WorkflowExecutionIdentifier{ - Project: "p", - Domain: "d", - Name: "n", - }, + ExecutionID: &execID, WfClosure: core.CompiledWorkflowClosure{ Primary: &core.CompiledWorkflow{ Template: &core.WorkflowTemplate{},