From df1c9a05a00d927b69d2fbf885f32e0b4f228359 Mon Sep 17 00:00:00 2001 From: aqan213 <55431633+aqan213@users.noreply.github.com> Date: Wed, 15 Apr 2020 18:17:50 +0800 Subject: [PATCH] merge develop branch to master (#556) * upgrade prometheus client_golang to v1.2.1 * Change all the sql statements and import sqlx libary (#549) * MetricsCollector: Only process HTTPStartStopEvents with peerType client (#541) * MetricsCollector: Only process HTTPStartStopEvents with peerType client ### Description: Only process `HTTPStartStop` events with `peerType` `Client`, as they contain the app instance index in the `instanceIndex` field. Check for a non-deprecated tag `peer_type` before checking deprecated tags, and accept any envelope if `peer_type` is not present in either #### Problems fixed: `HTTPStartStop` events with `peerType` `Server` would be counted against instance `0` and thus duplicate the number of requests counted by the App Autoscaler. * bug fix: Schedules can not be removed in synchronization when an application is deleted directly #553 (#554) * bug fix: Schedules can not be removed in synchronization when an application is deleted directly #553 * modify log * Revert "upgrade prometheus client_golang to v1.2.1" (#557) * Revert "Merge pull request #552 from qibobo/dev_qy_upgrade_prometheusgo" This reverts commit 12d173b43762eb52ccc43c4a5bd08312b89cdd9d, reversing changes made to bc8e77c49cec993f616aea6199af9d29328078c6. * Revert "upgrade prometheus client_golang to v1.2.1" This reverts commit 994eab534be621a988946c13939f5aceafe19595. * golangapiserver: Actually test detach policy endpoint (#560) * metricsgateway ut panics because of testhelpers/websocket_handler #558 (#559) * metricsgateway ut panics because of testhelpers/websocket_handler #558 * init con first * add mysql support for golang components (#555) * add mysql support for golang components * fix typo e to err * update db changelog file with mysql * fix .travis.yml and add comment to Connection function * fix integration test * fix err in integration test * remove dbms from scalingengine.db.changelog.yml * refine the helper.go and related test cases * Format code and fix appmetrics empty error * add transaction rollback Co-authored-by: qibobo Co-authored-by: ying Co-authored-by: Silvestre Zabala Co-authored-by: qibobo --- .gitmodules | 6 + .travis.yml | 15 +- scheduler/db/databasechangelog.yaml | 11 ++ .../scheduler/service/ScheduleManager.java | 14 +- .../service/ScheduleManagerTest.java | 7 +- src/autoscaler/api/cmd/api/api_suite_test.go | 5 +- .../publicapiserver/public_api_server_test.go | 8 +- src/autoscaler/db/db.go | 1 + src/autoscaler/db/db_suite_test.go | 13 ++ src/autoscaler/db/helper.go | 148 ++++++++++++++++++ src/autoscaler/db/helper_test.go | 87 ++++++++++ src/autoscaler/db/sqldb/appmetric_sqldb.go | 95 ++++++++--- .../db/sqldb/appmetric_sqldb_test.go | 28 +++- src/autoscaler/db/sqldb/binding_sqldb.go | 37 +++-- src/autoscaler/db/sqldb/binding_sqldb_test.go | 18 ++- .../db/sqldb/instancemetrics_sqldb.go | 119 +++++++++----- .../db/sqldb/instancemetrics_sqldb_test.go | 40 +++-- src/autoscaler/db/sqldb/lock_sqldb.go | 50 ++++-- src/autoscaler/db/sqldb/lock_sqldb_test.go | 17 +- src/autoscaler/db/sqldb/policy_sqldb.go | 40 +++-- src/autoscaler/db/sqldb/policy_sqldb_test.go | 24 ++- .../db/sqldb/scalingengine_sqldb.go | 45 +++--- .../db/sqldb/scalingengine_sqldb_test.go | 16 +- src/autoscaler/db/sqldb/scheduler_sqldb.go | 7 +- .../db/sqldb/scheduler_sqldb_test.go | 16 +- src/autoscaler/db/sqldb/sqldb_suite_test.go | 65 +++++--- .../eventgenerator_suite_test.go | 11 +- .../db/dataaggregator.db.changelog.yml | 60 ++++++- .../metricscollector_suite_test.go | 10 +- .../collector/app_streamer.go | 2 +- .../db/metricscollector.db.changelog.yml | 69 +++++++- .../metricsforwarder_suite_test.go | 12 +- .../metricsgateway_suite_test.go | 18 ++- .../metricsgateway/helpers/ws_helper_test.go | 4 + src/autoscaler/metricsgateway/nozzle.go | 7 +- src/autoscaler/metricsgateway/nozzle_test.go | 27 ++++ .../metricsserver/metricsserver_suite_test.go | 10 +- .../cmd/operator/operator_suite_test.go | 6 +- .../scalingengine/scalingengine_suite_test.go | 18 ++- .../cmd/scalingengine/scalingengine_test.go | 2 +- .../db/scalingengine.db.changelog.yml | 121 +++++++++++++- src/autoscaler/scalingengine/scalingengine.go | 12 +- .../scalingengine/scalingengine_test.go | 8 +- .../scalingengine/server/scaling_handler.go | 17 +- .../server/scaling_handler_test.go | 23 +-- .../scalingengine/server/server_test.go | 4 +- src/autoscaler/sync/sync_suite_test.go | 8 +- .../testhelpers/websocket_handler.go | 5 +- src/github.com/go-sql-driver/mysql | 1 + src/github.com/jmoiron/sqlx | 1 + src/integration/components.go | 48 ++++-- ...tegration_golangapi_eventgenerator_test.go | 15 +- ...ntegration_golangapi_metricsserver_test.go | 2 +- ...ntegration_golangapi_scalingengine_test.go | 2 +- src/integration/integration_suite_test.go | 45 ++++-- 55 files changed, 1172 insertions(+), 328 deletions(-) create mode 100644 scheduler/db/databasechangelog.yaml create mode 100644 src/autoscaler/db/db_suite_test.go create mode 100644 src/autoscaler/db/helper.go create mode 100644 src/autoscaler/db/helper_test.go create mode 160000 src/github.com/go-sql-driver/mysql create mode 160000 src/github.com/jmoiron/sqlx diff --git a/.gitmodules b/.gitmodules index 8f76e67d6..f4515068c 100644 --- a/.gitmodules +++ b/.gitmodules @@ -148,3 +148,9 @@ [submodule "src/github.com/juju/ratelimit"] path = src/github.com/juju/ratelimit url = https://github.com/juju/ratelimit +[submodule "src/github.com/jmoiron/sqlx"] + path = src/github.com/jmoiron/sqlx + url = https://github.com/jmoiron/sqlx +[submodule "src/github.com/go-sql-driver/mysql"] + path = src/github.com/go-sql-driver/mysql + url = https://github.com/go-sql-driver/mysql \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 228ea3761..afe0aa569 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ dist: xenial env: global: - - DBURL=postgres://postgres@localhost/autoscaler?sslmode=disable - NODE_VERSION=6.2 - GO_VERSION=1.13.3 - LOGLEVEL=info @@ -41,21 +40,32 @@ before_script: - sudo dpkg -i mysql-apt-config_0.8.14-1_all.deb - sudo apt-get update -q - sudo apt-get install -q -y --allow-unauthenticated -o Dpkg::Options::=--force-confnew mysql-server + - echo -e "[mysqld]\nsql_mode=NO_ENGINE_SUBSTITUTION,STRICT_ALL_TABLES\n[server]\ninnodb_log_file_size=256MB\ninnodb_buffer_pool_size=512MB\nmax_allowed_packet=32MB" | sudo tee -a /etc/mysql/my.cnf - sudo systemctl restart mysql - sudo mysql_upgrade - mysql --version - mysql -u root -e "CREATE DATABASE autoscaler;" - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=api/db/api.db.changelog.yml --username=root update + - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=servicebroker/db/servicebroker.db.changelog.json --username=root update - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=scheduler/db/scheduler.changelog-master.yaml --username=root update - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=scheduler/db/quartz.changelog-master.yaml --username=root update + - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=src/autoscaler/metricscollector/db/metricscollector.db.changelog.yml --username=root update + - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=src/autoscaler/eventgenerator/db/dataaggregator.db.changelog.yml --username=root update + - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=src/autoscaler/scalingengine/db/scalingengine.db.changelog.yml --username=root update + - java -cp 'db/target/lib/*' liquibase.integration.commandline.Main --url jdbc:mysql://127.0.0.1/autoscaler --driver=com.mysql.cj.jdbc.Driver --changeLogFile=src/autoscaler/operator/db/operator.db.changelog.yml --username=root update jobs: include: - name: unit test script: # Unit test - pushd src/autoscaler + - sudo cp /var/lib/mysql/ca.pem /tmp/ca.pem + - export DBURL="postgres://postgres@localhost/autoscaler?sslmode=disable" + - ginkgo -r -race -randomizeAllSpecs + - export DBURL="root@tcp(localhost)/autoscaler?tls=false" - ginkgo -r -race -randomizeAllSpecs - popd + - sudo rm /tmp/ca.pem - pushd scheduler - mvn test - mvn test -Dspring.profiles.active=mysql @@ -66,6 +76,9 @@ jobs: - pushd scheduler - mvn package -DskipTests - popd + - export DBURL="postgres://postgres@localhost/autoscaler?sslmode=disable" + - ginkgo -r -race -randomizeAllSpecs src/integration + - export DBURL="root@tcp(localhost)/autoscaler?tls=false" - ginkgo -r -race -randomizeAllSpecs src/integration # Tests for legacy components (node apiserver, broker and metricscollector) diff --git a/scheduler/db/databasechangelog.yaml b/scheduler/db/databasechangelog.yaml new file mode 100644 index 000000000..ae241475f --- /dev/null +++ b/scheduler/db/databasechangelog.yaml @@ -0,0 +1,11 @@ +databaseChangeLog: + - changeSet: + id: 1 + author: aqan213 + dbms: mysql + changes: + - addPrimaryKey: + columnNames: "id,author,filename" + constraintName: "PK_DATABASECHANGELOG" + schemaName: autoscaler + tableName: DATABASECHANGELOG \ No newline at end of file diff --git a/scheduler/src/main/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManager.java b/scheduler/src/main/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManager.java index a222585d2..e41235c3b 100644 --- a/scheduler/src/main/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManager.java +++ b/scheduler/src/main/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManager.java @@ -479,15 +479,11 @@ private void notifyScalingEngineForDelete(String appId, long scheduleId) { try { restOperations.delete(scalingEnginePathActiveSchedule); } catch (HttpStatusCodeException hce) { - if (hce.getStatusCode() == HttpStatus.NOT_FOUND) { - message = messageBundleResourceHelper - .lookupMessage("scalingengine.notification.activeschedule.notFound", appId, scheduleId); - logger.info(message, hce); - } else { - String errorMessage = messageBundleResourceHelper.lookupMessage("scalingengine.notification.error", - hce.getResponseBodyAsString(), appId, scheduleId, "delete"); - throw new SchedulerInternalException(errorMessage, hce); - } + + String errorMessage = messageBundleResourceHelper.lookupMessage("scalingengine.notification.error", + hce.getResponseBodyAsString(), appId, scheduleId, "delete"); + throw new SchedulerInternalException(errorMessage, hce); + } catch (ResourceAccessException rae) { String errorMessage = messageBundleResourceHelper.lookupMessage("scalingengine.notification.error", rae.getMessage(), appId, scheduleId, "delete"); diff --git a/scheduler/src/test/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManagerTest.java b/scheduler/src/test/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManagerTest.java index 6723ec014..382dfc700 100644 --- a/scheduler/src/test/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManagerTest.java +++ b/scheduler/src/test/java/org/cloudfoundry/autoscaler/scheduler/service/ScheduleManagerTest.java @@ -535,7 +535,7 @@ public void testDeleteSchedules_when_activeSchedule_not_found_in_scalingEngine() String scalingEnginePathActiveSchedule = scalingEngineUrl + "/v1/apps/" + appId + "/active_schedules/" + scheduleId; mockServer.expect(ExpectedCount.times(1), requestTo(scalingEnginePathActiveSchedule)) - .andExpect(method(HttpMethod.DELETE)).andRespond(withStatus(HttpStatus.NOT_FOUND)); + .andExpect(method(HttpMethod.DELETE)).andRespond(withStatus(HttpStatus.OK)); scheduleManager.deleteSchedules(appId); @@ -554,11 +554,6 @@ public void testDeleteSchedules_when_activeSchedule_not_found_in_scalingEngine() Mockito.verify(activeScheduleDao, Mockito.times(1)).deleteActiveSchedulesByAppId(appId); mockServer.verify(); - - String expectedMessage = messageBundleResourceHelper - .lookupMessage("scalingengine.notification.activeschedule.notFound", appId, scheduleId); - assertThat(logCaptor.getValue().getMessage().getFormattedMessage(), Is.is(expectedMessage)); - assertThat("Log level should be INFO", logCaptor.getValue().getLevel(), Is.is(Level.INFO)); } @Test diff --git a/src/autoscaler/api/cmd/api/api_suite_test.go b/src/autoscaler/api/cmd/api/api_suite_test.go index 60b17ba1d..272473a73 100644 --- a/src/autoscaler/api/cmd/api/api_suite_test.go +++ b/src/autoscaler/api/cmd/api/api_suite_test.go @@ -58,7 +58,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { ap, err := gexec.Build("autoscaler/api/cmd/api", "-race") Expect(err).NotTo(HaveOccurred()) - apDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + apDB, err := sql.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = apDB.Exec("DELETE FROM binding") diff --git a/src/autoscaler/api/publicapiserver/public_api_server_test.go b/src/autoscaler/api/publicapiserver/public_api_server_test.go index 6b2beba50..05e112a13 100644 --- a/src/autoscaler/api/publicapiserver/public_api_server_test.go +++ b/src/autoscaler/api/publicapiserver/public_api_server_test.go @@ -372,7 +372,7 @@ var _ = Describe("PublicApiServer", func() { }) It("should succeed", func() { verifyResponse(httpClient, serverUrl, "/v1/apps/"+TEST_APP_ID+"/policy", - map[string]string{"Authorization": TEST_USER_TOKEN}, http.MethodPut, policy, http.StatusOK) + map[string]string{"Authorization": TEST_USER_TOKEN}, http.MethodDelete, policy, http.StatusOK) }) }) @@ -424,9 +424,9 @@ func verifyResponse(httpClient *http.Client, serverUrl *url.URL, path string, he req.Header.Set(headerName, headerValue) } } - Expect(err).NotTo(HaveOccurred()) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) rsp, err := httpClient.Do(req) - Expect(err).NotTo(HaveOccurred()) - Expect(rsp.StatusCode).To(Equal(expectResponseStatusCode)) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + ExpectWithOffset(1, rsp.StatusCode).To(Equal(expectResponseStatusCode)) } diff --git a/src/autoscaler/db/db.go b/src/autoscaler/db/db.go index 218e128a0..a5d5893f1 100644 --- a/src/autoscaler/db/db.go +++ b/src/autoscaler/db/db.go @@ -9,6 +9,7 @@ import ( ) const PostgresDriverName = "postgres" +const MysqlDriverName = "mysql" type OrderType uint8 diff --git a/src/autoscaler/db/db_suite_test.go b/src/autoscaler/db/db_suite_test.go new file mode 100644 index 000000000..71cd18053 --- /dev/null +++ b/src/autoscaler/db/db_suite_test.go @@ -0,0 +1,13 @@ +package db_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestDb(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Db Suite") +} \ No newline at end of file diff --git a/src/autoscaler/db/helper.go b/src/autoscaler/db/helper.go new file mode 100644 index 000000000..78b8a6b28 --- /dev/null +++ b/src/autoscaler/db/helper.go @@ -0,0 +1,148 @@ +package db + +import ( + "fmt" + "strings" + "crypto/tls" + "crypto/x509" + "io/ioutil" + "net/url" + "github.com/go-sql-driver/mysql" + +) + +type Database struct { + DriverName string + DSN string +} + +type MySQLConfig struct { + config *mysql.Config + cert string +} + +/** + This function is used to generate db connection info, for example, + For mysql: + input dbUrl: 'username:password@tcp(localhost:3306)/autoscaler?tls=custom&sslrootcert=db_ca.crt' + return: + &Database{DriverName: "mysql", DSN:"username:password@tcp(localhost:3306)/autoscaler?parseTime=true&tls=custom"} + +For postgres: + input dbUrl: postgres://postgres:password@localhost:5432/autoscaler?sslmode=disable + return: + &Database{DriverName: "postgres", DSN:"postgres://postgres:password@localhost:5432/autoscaler?sslmode=disable" + **/ + func GetConnection(dbUrl string) (*Database, error) { + database := &Database{} + + database.DriverName = detectDirver(dbUrl) + + switch database.DriverName { + case MysqlDriverName: + cfg, err := parseMySQLURL(dbUrl) + if err != nil { + return nil, err + } + + err = registerConfig(cfg) + if err != nil { + return nil, err + } + database.DSN = cfg.config.FormatDSN() + case PostgresDriverName: + database.DSN = dbUrl + } + return database, nil +} + +func registerConfig(cfg *MySQLConfig) error { + tlsValue := cfg.config.TLSConfig + if _, isBool := readBool(tlsValue); isBool || tlsValue == "" || strings.ToLower(tlsValue) == "skip-verify" || strings.ToLower(tlsValue) == "preferred" { + // Do nothing here + return nil + } else if cfg.cert != "" { + certBytes, err := ioutil.ReadFile(cfg.cert) + if err != nil { + return err + } + caCertPool := x509.NewCertPool() + if ok := caCertPool.AppendCertsFromPEM(certBytes); !ok { + return err + } + + tlsConfig := tls.Config{} + tlsConfig.RootCAs = caCertPool + if tlsValue == "verify_identity" { + tlsConfig.ServerName = strings.Split(cfg.config.Addr,":")[0] + } + + err = mysql.RegisterTLSConfig(tlsValue, &tlsConfig) + if err != nil { + return err + } + + } else { + return fmt.Errorf("sql ca file is not provided") + } + return nil +} + +func readBool(input string) (value bool, valid bool) { + switch input { + case "1", "true", "TRUE", "True": + return true, true + case "0", "false", "FALSE", "False": + return false, true + } + return +} + +func detectDirver(dbUrl string)(driver string) { + if strings.Contains(dbUrl, "postgres"){ + return PostgresDriverName + } else { + return MysqlDriverName + } +} + +// parseMySQLURL can parse the query parameters and remove invalid 'sslrootcert', it return mysql.Config and the cert file. +func parseMySQLURL(dbUrl string)( *MySQLConfig, error) { + var caCert string + var tlsValue string + if strings.Contains(dbUrl,"?"){ + u, err := url.ParseQuery(strings.Split(dbUrl,"?")[1]) + if err != nil { + return nil, err + } + urlParam := url.Values{} + for k, v := range u { + if k == "sslrootcert" { + caCert = v[0] + continue + } + if k == "tls" { + tlsValue = v[0] + continue + } + urlParam.Add(k, v[0]) + } + dbUrl = fmt.Sprintf("%s?%s",strings.Split(dbUrl,"?")[0], urlParam.Encode()) + } + + config, err := mysql.ParseDSN(dbUrl) + if err != nil { + return nil, err + } + config.ParseTime = true + + if tlsValue != "" { + config.TLSConfig = tlsValue + } + + return &MySQLConfig{ + config: config, + cert: caCert, + }, nil +} + diff --git a/src/autoscaler/db/helper_test.go b/src/autoscaler/db/helper_test.go new file mode 100644 index 000000000..e3a2b5895 --- /dev/null +++ b/src/autoscaler/db/helper_test.go @@ -0,0 +1,87 @@ +package db_test + +import ( + . "autoscaler/db" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Helper", func() { + var ( + dbUrl string + err error + database *Database + certPath string + ) + + Describe("GetConnection", func() { + + JustBeforeEach(func() { + database, err = GetConnection(dbUrl) + }) + Context("when mysql query parameters are provided", func() { + BeforeEach(func() { + dbUrl="root@tcp(localhost:3306)/autoscaler?tls=preferred" + }) + It("returns mysql database object", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(database).To(Equal(&Database{ + DriverName: "mysql", + DSN: "root@tcp(localhost:3306)/autoscaler?parseTime=true&tls=preferred", + })) + }) + }) + + Context("when mysql query parameters are not provided", func() { + BeforeEach(func() { + dbUrl="root@tcp(localhost:3306)/autoscaler" + }) + It("returns mysql database object", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(database).To(Equal(&Database{ + DriverName: "mysql", + DSN: "root@tcp(localhost:3306)/autoscaler?parseTime=true", + })) + }) + + }) + + Context("when need to verify mysql server, cert is provided ", func() { + BeforeEach(func() { + certPath="../../../test-certs/api.crt" + dbUrl="root@tcp(localhost:3306)/autoscaler?tls=verify-ca&sslrootcert="+certPath + }) + It("returns mysql database connection", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(database).To(Equal(&Database{ + DriverName: "mysql", + DSN: "root@tcp(localhost:3306)/autoscaler?parseTime=true&tls=verify-ca", + })) + }) + }) + + Context("when need to verify mysql server, cert is not provided ", func() { + BeforeEach(func() { + dbUrl="root@tcp(localhost:3306)/autoscaler?tls=verify-ca" + }) + It("should error", func() { + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql ca file is not provided")) + }) + }) + + Context("when postgres dburl is provided", func() { + BeforeEach(func() { + dbUrl="postgres://postgres:password@localhost:5432/autoscaler?sslmode=disable" + }) + It("returns postgres database object", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(database).To(Equal(&Database{ + DriverName: "postgres", + DSN: "postgres://postgres:password@localhost:5432/autoscaler?sslmode=disable", + })) + }) + }) + }) +}) diff --git a/src/autoscaler/db/sqldb/appmetric_sqldb.go b/src/autoscaler/db/sqldb/appmetric_sqldb.go index acb403fe1..f528b49ff 100644 --- a/src/autoscaler/db/sqldb/appmetric_sqldb.go +++ b/src/autoscaler/db/sqldb/appmetric_sqldb.go @@ -4,9 +4,11 @@ import ( "autoscaler/db" "autoscaler/models" "context" + "strings" "code.cloudfoundry.org/lager" . "github.com/lib/pq" + "github.com/jmoiron/sqlx" "database/sql" "time" @@ -15,13 +17,17 @@ import ( type AppMetricSQLDB struct { dbConfig db.DatabaseConfig logger lager.Logger - sqldb *sql.DB + sqldb *sqlx.DB } func NewAppMetricSQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*AppMetricSQLDB, error) { var err error + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("open-AppMetric-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -53,7 +59,7 @@ func (adb *AppMetricSQLDB) Close() error { return nil } func (adb *AppMetricSQLDB) SaveAppMetric(appMetric *models.AppMetric) error { - query := "INSERT INTO app_metric(app_id, metric_type, unit, timestamp, value) values($1, $2, $3, $4, $5)" + query := adb.sqldb.Rebind("INSERT INTO app_metric(app_id, metric_type, unit, timestamp, value) values(?, ?, ?, ?, ?)") _, err := adb.sqldb.Exec(query, appMetric.AppId, appMetric.MetricType, appMetric.Unit, appMetric.Timestamp, appMetric.Value) if err != nil { @@ -70,31 +76,68 @@ func (adb *AppMetricSQLDB) SaveAppMetricsInBulk(appMetrics []*models.AppMetric) adb.logger.Error("failed-to-start-transaction", err) return err } - stmt, err := txn.Prepare(CopyIn("app_metric", "app_id", "metric_type", "unit", "timestamp", "value")) - if err != nil { - adb.logger.Error("failed-to-prepare-statement", err) - txn.Rollback() - return err - } - for _, appMetric := range appMetrics { - _, err := stmt.Exec(appMetric.AppId, appMetric.MetricType, appMetric.Unit, appMetric.Timestamp, appMetric.Value) + + switch adb.sqldb.DriverName() { + case "postgres": + stmt, err := txn.Prepare(CopyIn("app_metric", "app_id", "metric_type", "unit", "timestamp", "value")) if err != nil { - adb.logger.Error("failed-to-execute", err) + adb.logger.Error("failed-to-prepare-statement", err) + txn.Rollback() + return err + } + for _, appMetric := range appMetrics { + _, err := stmt.Exec(appMetric.AppId, appMetric.MetricType, appMetric.Unit, appMetric.Timestamp, appMetric.Value) + if err != nil { + adb.logger.Error("failed-to-execute", err) + txn.Rollback() + return err + } + } + _, err = stmt.Exec() + if err != nil { + adb.logger.Error("failed-to-execute-statement", err) + txn.Rollback() + return err + } + err = stmt.Close() + if err != nil { + adb.logger.Error("failed-to-close-statement", err) + txn.Rollback() + return err + } + + case "mysql": + sqlStr :="INSERT INTO app_metric(app_id,metric_type,unit,timestamp,value)VALUES" + vals := []interface{}{} + if appMetrics == nil || len(appMetrics)==0 { + txn.Rollback() + return nil + } + for _, appMetric := range appMetrics { + sqlStr += "(?, ?, ?, ?, ?)," + vals = append(vals, appMetric.AppId, appMetric.MetricType, appMetric.Unit, appMetric.Timestamp, appMetric.Value) + } + sqlStr = strings.TrimSuffix(sqlStr, ",") + + stmt, err := txn.Prepare(sqlStr) + if err != nil { + adb.logger.Error("failed-to-prepare-statement", err) + txn.Rollback() + return err + } + + _, err = stmt.Exec(vals...) + if err != nil { + adb.logger.Error("failed-to-execute-statement", err) + txn.Rollback() + return err + } + err = stmt.Close() + if err != nil { + adb.logger.Error("failed-to-close-statement", err) txn.Rollback() return err } - } - _, err = stmt.Exec() - if err != nil { - adb.logger.Error("failed-to-execute-statement", err) - txn.Rollback() - return err - } - err = stmt.Close() - if err != nil { - adb.logger.Error("failed-to-close-statement", err) - txn.Rollback() - return err } err = txn.Commit() @@ -118,7 +161,7 @@ func (adb *AppMetricSQLDB) RetrieveAppMetrics(appIdP string, metricTypeP string, endP = time.Now().UnixNano() } - query := "SELECT app_id,metric_type,value,unit,timestamp FROM app_metric WHERE app_id=$1 AND metric_type=$2 AND timestamp>=$3 AND timestamp<=$4 ORDER BY timestamp " + orderStr + query := adb.sqldb.Rebind("SELECT app_id,metric_type,value,unit,timestamp FROM app_metric WHERE app_id=? AND metric_type=? AND timestamp>=? AND timestamp<=? ORDER BY timestamp " + orderStr) appMetricList := []*models.AppMetric{} rows, err := adb.sqldb.Query(query, appIdP, metricTypeP, startP, endP) if err != nil { @@ -150,7 +193,7 @@ func (adb *AppMetricSQLDB) RetrieveAppMetrics(appIdP string, metricTypeP string, } func (adb *AppMetricSQLDB) PruneAppMetrics(before int64) error { - query := "DELETE FROM app_metric WHERE timestamp <= $1" + query := adb.sqldb.Rebind("DELETE FROM app_metric WHERE timestamp <= ?") _, err := adb.sqldb.Exec(query, before) if err != nil { adb.logger.Error("prune-metrics-from-app_metric-table", err, lager.Data{"query": query, "before": before}) diff --git a/src/autoscaler/db/sqldb/appmetric_sqldb_test.go b/src/autoscaler/db/sqldb/appmetric_sqldb_test.go index 5e6c7a021..e39e0c890 100644 --- a/src/autoscaler/db/sqldb/appmetric_sqldb_test.go +++ b/src/autoscaler/db/sqldb/appmetric_sqldb_test.go @@ -8,9 +8,9 @@ import ( "autoscaler/db" . "autoscaler/db/sqldb" "autoscaler/models" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -59,12 +59,20 @@ var _ = Describe("AppMetricSQLDB", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) - + + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when db url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) @@ -115,6 +123,16 @@ var _ = Describe("AppMetricSQLDB", func() { err = adb.Close() Expect(err).NotTo(HaveOccurred()) }) + Context("When inserting an empty array of app_metric", func() { + BeforeEach(func() { + appMetrics := []*models.AppMetric{} + err = adb.SaveAppMetricsInBulk(appMetrics) + }) + It("Should return nil", func(){ + Expect(err).To(BeNil()) + }) + }) + Context("When inserting an array of app_metric", func() { BeforeEach(func() { appMetrics := []*models.AppMetric{ diff --git a/src/autoscaler/db/sqldb/binding_sqldb.go b/src/autoscaler/db/sqldb/binding_sqldb.go index 819588d4b..6bc59c6d7 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb.go +++ b/src/autoscaler/db/sqldb/binding_sqldb.go @@ -6,6 +6,8 @@ import ( "code.cloudfoundry.org/lager" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "autoscaler/db" ) @@ -13,11 +15,16 @@ import ( type BindingSQLDB struct { dbConfig db.DatabaseConfig logger lager.Logger - sqldb *sql.DB + sqldb *sqlx.DB } func NewBindingSQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*BindingSQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("open-binding-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -51,7 +58,7 @@ func (bdb *BindingSQLDB) Close() error { } func (bdb *BindingSQLDB) CreateServiceInstance(serviceInstanceId string, orgId string, spaceId string) error { - query := "SELECT org_id, space_id FROM service_instance WHERE service_instance_id = $1" + query := bdb.sqldb.Rebind("SELECT org_id, space_id FROM service_instance WHERE service_instance_id =?") rows, err := bdb.sqldb.Query(query, serviceInstanceId) if err != nil { bdb.logger.Error("create-service-instance", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId, "orgid": orgId, "spaceid": spaceId}) @@ -76,9 +83,9 @@ func (bdb *BindingSQLDB) CreateServiceInstance(serviceInstanceId string, orgId s } rows.Close() - query = "INSERT INTO service_instance" + + query = bdb.sqldb.Rebind("INSERT INTO service_instance" + "(service_instance_id, org_id, space_id) " + - " VALUES($1, $2, $3)" + " VALUES(?, ?, ?)") _, err = bdb.sqldb.Exec(query, serviceInstanceId, orgId, spaceId) if err != nil { @@ -88,7 +95,7 @@ func (bdb *BindingSQLDB) CreateServiceInstance(serviceInstanceId string, orgId s } func (bdb *BindingSQLDB) DeleteServiceInstance(serviceInstanceId string) error { - query := "SELECT FROM service_instance WHERE service_instance_id = $1" + query := bdb.sqldb.Rebind("SELECT * FROM service_instance WHERE service_instance_id =?") rows, err := bdb.sqldb.Query(query, serviceInstanceId) if err != nil { bdb.logger.Error("delete-service-instance", err, lager.Data{"query": query, "serviceinstanceid": serviceInstanceId}) @@ -97,7 +104,7 @@ func (bdb *BindingSQLDB) DeleteServiceInstance(serviceInstanceId string) error { if rows.Next() { rows.Close() - query = "DELETE FROM service_instance WHERE service_instance_id = $1" + query = bdb.sqldb.Rebind("DELETE FROM service_instance WHERE service_instance_id =?") _, err = bdb.sqldb.Exec(query, serviceInstanceId) if err != nil { @@ -110,7 +117,7 @@ func (bdb *BindingSQLDB) DeleteServiceInstance(serviceInstanceId string) error { } func (bdb *BindingSQLDB) CreateServiceBinding(bindingId string, serviceInstanceId string, appId string) error { - query := "SELECT FROM binding WHERE app_id = $1" + query := bdb.sqldb.Rebind("SELECT * FROM binding WHERE app_id =?") rows, err := bdb.sqldb.Query(query, appId) if err != nil { bdb.logger.Error("create-service-binding", err, lager.Data{"query": query, "appId": appId, "serviceId": serviceInstanceId, "bindingId": bindingId}) @@ -123,9 +130,9 @@ func (bdb *BindingSQLDB) CreateServiceBinding(bindingId string, serviceInstanceI } rows.Close() - query = "INSERT INTO binding" + + query = bdb.sqldb.Rebind("INSERT INTO binding" + "(binding_id, service_instance_id, app_id, created_at) " + - "VALUES($1, $2, $3, $4)" + "VALUES(?, ?, ?, ?)") _, err = bdb.sqldb.Exec(query, bindingId, serviceInstanceId, appId, time.Now()) if err != nil { @@ -134,7 +141,7 @@ func (bdb *BindingSQLDB) CreateServiceBinding(bindingId string, serviceInstanceI return err } func (bdb *BindingSQLDB) DeleteServiceBinding(bindingId string) error { - query := "SELECT FROM binding WHERE binding_id = $1" + query := bdb.sqldb.Rebind("SELECT * FROM binding WHERE binding_id =?") rows, err := bdb.sqldb.Query(query, bindingId) if err != nil { bdb.logger.Error("delete-service-binding", err, lager.Data{"query": query, "bindingId": bindingId}) @@ -143,7 +150,7 @@ func (bdb *BindingSQLDB) DeleteServiceBinding(bindingId string) error { if rows.Next() { rows.Close() - query = "DELETE FROM binding WHERE binding_id = $1" + query = bdb.sqldb.Rebind("DELETE FROM binding WHERE binding_id =?") _, err = bdb.sqldb.Exec(query, bindingId) if err != nil { @@ -156,7 +163,7 @@ func (bdb *BindingSQLDB) DeleteServiceBinding(bindingId string) error { return db.ErrDoesNotExist } func (bdb *BindingSQLDB) DeleteServiceBindingByAppId(appId string) error { - query := "DELETE FROM binding WHERE app_id = $1" + query := bdb.sqldb.Rebind("DELETE FROM binding WHERE app_id =?") _, err := bdb.sqldb.Exec(query, appId) if err != nil { @@ -167,7 +174,7 @@ func (bdb *BindingSQLDB) DeleteServiceBindingByAppId(appId string) error { } func (bdb *BindingSQLDB) CheckServiceBinding(appId string) bool { var count int - query := "SELECT COUNT(*) FROM binding WHERE app_id=$1" + query := bdb.sqldb.Rebind("SELECT COUNT(*) FROM binding WHERE app_id=?") bdb.sqldb.QueryRow(query, appId).Scan(&count) return count > 0 } @@ -176,7 +183,7 @@ func (bdb *BindingSQLDB) GetDBStatus() sql.DBStats { } func (bdb *BindingSQLDB) GetAppIdByBindingId(bindingId string) (string, error) { var appId string - query := "SELECT app_id FROM binding WHERE binding_id=$1" + query := bdb.sqldb.Rebind("SELECT app_id FROM binding WHERE binding_id=?") err := bdb.sqldb.QueryRow(query, bindingId).Scan(&appId) if err != nil { bdb.logger.Error("get-appid-from-binding-table", err, lager.Data{"query": query, "bindingId": bindingId}) diff --git a/src/autoscaler/db/sqldb/binding_sqldb_test.go b/src/autoscaler/db/sqldb/binding_sqldb_test.go index 912aab947..1e2d7ac29 100644 --- a/src/autoscaler/db/sqldb/binding_sqldb_test.go +++ b/src/autoscaler/db/sqldb/binding_sqldb_test.go @@ -6,9 +6,9 @@ import ( "database/sql" "os" "time" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -53,12 +53,20 @@ var _ = Describe("BindingSqldb", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when db url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) @@ -160,7 +168,7 @@ var _ = Describe("BindingSqldb", func() { Context("When service instance doesn't exist", func() { It("should error", func() { Expect(err).To(HaveOccurred()) - Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) + Expect(err.Error()).To(ContainSubstring("foreign key constraint")) Expect(hasServiceBinding(testBindingId, testInstanceId)).NotTo(BeTrue()) }) }) diff --git a/src/autoscaler/db/sqldb/instancemetrics_sqldb.go b/src/autoscaler/db/sqldb/instancemetrics_sqldb.go index d3d1fdf63..8da8044f5 100644 --- a/src/autoscaler/db/sqldb/instancemetrics_sqldb.go +++ b/src/autoscaler/db/sqldb/instancemetrics_sqldb.go @@ -6,20 +6,27 @@ import ( "code.cloudfoundry.org/lager" . "github.com/lib/pq" + "github.com/jmoiron/sqlx" "context" "database/sql" "time" + "strings" ) type InstanceMetricsSQLDB struct { logger lager.Logger dbConfig db.DatabaseConfig - sqldb *sql.DB + sqldb *sqlx.DB } func NewInstanceMetricsSQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*InstanceMetricsSQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("failed-open-instancemetrics-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -53,7 +60,7 @@ func (idb *InstanceMetricsSQLDB) Close() error { } func (idb *InstanceMetricsSQLDB) SaveMetric(metric *models.AppInstanceMetric) error { - query := "INSERT INTO appinstancemetrics(appid, instanceindex, collectedat, name, unit, value, timestamp) values($1, $2, $3, $4, $5, $6, $7)" + query := idb.sqldb.Rebind("INSERT INTO appinstancemetrics(appid, instanceindex, collectedat, name, unit, value, timestamp) values(?, ?, ?, ?, ?, ?, ?)") _, err := idb.sqldb.Exec(query, metric.AppId, metric.InstanceIndex, metric.CollectedAt, metric.Name, metric.Unit, metric.Value, metric.Timestamp) if err != nil { @@ -70,34 +77,68 @@ func (idb *InstanceMetricsSQLDB) SaveMetricsInBulk(metrics []*models.AppInstance idb.logger.Error("failed-to-start-transaction", err) return err } + switch idb.sqldb.DriverName() { + case "postgres": + stmt, err := txn.Prepare(CopyIn("appinstancemetrics", "appid", "instanceindex", "collectedat", "name", "unit", "value", "timestamp")) + if err != nil { + idb.logger.Error("failed-to-prepare-statement", err) + txn.Rollback() + return err + } + for _, metric := range metrics { + _, err := stmt.Exec(metric.AppId, metric.InstanceIndex, metric.CollectedAt, metric.Name, metric.Unit, metric.Value, metric.Timestamp) + if err != nil { + idb.logger.Error("failed-to-execute", err) + txn.Rollback() + return err + } + } - stmt, err := txn.Prepare(CopyIn("appinstancemetrics", "appid", "instanceindex", "collectedat", "name", "unit", "value", "timestamp")) - if err != nil { - idb.logger.Error("failed-to-prepare-statement", err) - txn.Rollback() - return err - } - for _, metric := range metrics { - _, err := stmt.Exec(metric.AppId, metric.InstanceIndex, metric.CollectedAt, metric.Name, metric.Unit, metric.Value, metric.Timestamp) + _, err = stmt.Exec() if err != nil { - idb.logger.Error("failed-to-execute", err) + idb.logger.Error("failed-to-execute-statement", err) txn.Rollback() return err } - } - _, err = stmt.Exec() - if err != nil { - idb.logger.Error("failed-to-execute-statement", err) - txn.Rollback() - return err - } + err = stmt.Close() + if err != nil { + idb.logger.Error("failed-to-close-statement", err) + txn.Rollback() + return err + } + case "mysql": + sqlStr :="INSERT INTO appinstancemetrics(appid, instanceindex, collectedat, name, unit, value, timestamp)VALUES" + vals := []interface{}{} + if metrics == nil || len(metrics) == 0 { + txn.Rollback() + return nil + } + for _, metric := range metrics { + sqlStr += "(?, ?, ?, ?, ?, ?, ?)," + vals = append(vals, metric.AppId, metric.InstanceIndex, metric.CollectedAt, metric.Name, metric.Unit, metric.Value, metric.Timestamp) + } + sqlStr = strings.TrimSuffix(sqlStr, ",") - err = stmt.Close() - if err != nil { - idb.logger.Error("failed-to-close-statement", err) - txn.Rollback() - return err + stmt, err := txn.Prepare(sqlStr) + if err != nil { + idb.logger.Error("failed-to-prepare-statement", err) + txn.Rollback() + return err + } + + _, err = stmt.Exec(vals...) + if err != nil { + idb.logger.Error("failed-to-execute-statement", err) + txn.Rollback() + return err + } + err = stmt.Close() + if err != nil { + idb.logger.Error("failed-to-close-statement", err) + txn.Rollback() + return err + } } err = txn.Commit() @@ -117,20 +158,20 @@ func (idb *InstanceMetricsSQLDB) RetrieveInstanceMetrics(appid string, instanceI } else { orderStr = db.DESCSTR } - query := "SELECT instanceindex, collectedat, unit, value, timestamp FROM appinstancemetrics WHERE " + - " appid = $1 " + - " AND name = $2 " + - " AND timestamp >= $3" + - " AND timestamp <= $4" + - " ORDER BY timestamp " + orderStr + ", instanceindex" - - queryByInstanceIndex := "SELECT instanceindex, collectedat, unit, value, timestamp FROM appinstancemetrics WHERE " + - " appid = $1 " + - " AND instanceindex = $2" + - " AND name = $3 " + - " AND timestamp >= $4" + - " AND timestamp <= $5" + - " ORDER BY timestamp " + orderStr + query := idb.sqldb.Rebind("SELECT instanceindex, collectedat, unit, value, timestamp FROM appinstancemetrics WHERE " + + " appid = ? " + + " AND name = ? " + + " AND timestamp >= ?" + + " AND timestamp <= ?" + + " ORDER BY timestamp " + orderStr + ", instanceindex") + + queryByInstanceIndex := idb.sqldb.Rebind("SELECT instanceindex, collectedat, unit, value, timestamp FROM appinstancemetrics WHERE " + + " appid = ? " + + " AND instanceindex = ?" + + " AND name = ? " + + " AND timestamp >= ?" + + " AND timestamp <= ?" + + " ORDER BY timestamp " + orderStr) if end < 0 { end = time.Now().UnixNano() @@ -185,7 +226,7 @@ func (idb *InstanceMetricsSQLDB) RetrieveInstanceMetrics(appid string, instanceI return mtrcs, nil } func (idb *InstanceMetricsSQLDB) PruneInstanceMetrics(before int64) error { - query := "DELETE FROM appinstancemetrics WHERE timestamp <= $1" + query := idb.sqldb.Rebind("DELETE FROM appinstancemetrics WHERE timestamp <= ?") _, err := idb.sqldb.Exec(query, before) if err != nil { idb.logger.Error("failed-prune-instancemetric-from-appinstancemetrics-table", err, lager.Data{"query": query, "before": before}) diff --git a/src/autoscaler/db/sqldb/instancemetrics_sqldb_test.go b/src/autoscaler/db/sqldb/instancemetrics_sqldb_test.go index a25542973..abb73b34b 100644 --- a/src/autoscaler/db/sqldb/instancemetrics_sqldb_test.go +++ b/src/autoscaler/db/sqldb/instancemetrics_sqldb_test.go @@ -4,9 +4,9 @@ import ( "autoscaler/db" . "autoscaler/db/sqldb" "autoscaler/models" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/onsi/gomega/gstruct" @@ -65,12 +65,20 @@ var _ = Describe("InstancemetricsSqldb", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) @@ -159,6 +167,15 @@ var _ = Describe("InstancemetricsSqldb", func() { err = idb.Close() Expect(err).NotTo(HaveOccurred()) }) + Context("When inserting an empty array of metrics", func() { + BeforeEach(func() { + metrics := []*models.AppInstanceMetric{} + err = idb.SaveMetricsInBulk(metrics) + }) + It("Should return nil", func(){ + Expect(err).To(BeNil()) + }) + }) Context("When inserting an array of metrics", func() { BeforeEach(func() { @@ -279,12 +296,6 @@ var _ = Describe("InstancemetricsSqldb", func() { err = idb.SaveMetric(metric) Expect(err).NotTo(HaveOccurred()) - metric.InstanceIndex = 0 - metric.CollectedAt = 222222 - metric.Value = "654321" - metric.Timestamp = 111100 - err = idb.SaveMetric(metric) - Expect(err).NotTo(HaveOccurred()) start = 0 end = -1 @@ -343,7 +354,7 @@ var _ = Describe("InstancemetricsSqldb", func() { orderType = db.ASC instanceIndex = -1 }) - It("removes duplicates and returns all the instance metrics of the app ordered by timestamp asc, instanceindex asc", func() { + It("returns all the instance metrics of the app ordered by timestamp asc, instanceindex asc", func() { Expect(err).NotTo(HaveOccurred()) Expect(mtrcs).To(HaveLen(4)) Expect(*mtrcs[0]).To(gstruct.MatchAllFields(gstruct.Fields{ @@ -393,7 +404,7 @@ var _ = Describe("InstancemetricsSqldb", func() { orderType = db.DESC instanceIndex = -1 }) - It("removes duplicates and returns all the instance metrics of the app ordered by timestamp desc, instanceindex asc", func() { + It("returns all the instance metrics of the app ordered by timestamp desc, instanceindex asc", func() { Expect(err).NotTo(HaveOccurred()) Expect(mtrcs).To(HaveLen(4)) @@ -474,7 +485,7 @@ var _ = Describe("InstancemetricsSqldb", func() { BeforeEach(func() { instanceIndex = 1 }) - It("removes duplicates and returns all the instance metrics of the app ordered by timestamp desc", func() { + It("returns all the instance metrics of the app ordered by timestamp desc", func() { Expect(err).NotTo(HaveOccurred()) Expect(mtrcs).To(HaveLen(2)) Expect(*mtrcs[1]).To(gstruct.MatchAllFields(gstruct.Fields{ @@ -651,7 +662,7 @@ var _ = Describe("InstancemetricsSqldb", func() { metric.InstanceIndex = 0 metric.CollectedAt = 222222 metric.Value = "654321" - metric.Timestamp = 111100 + metric.Timestamp = 111110 err = idb.SaveMetric(metric) Expect(err).NotTo(HaveOccurred()) }) @@ -712,3 +723,4 @@ var _ = Describe("InstancemetricsSqldb", func() { }) }) + diff --git a/src/autoscaler/db/sqldb/lock_sqldb.go b/src/autoscaler/db/sqldb/lock_sqldb.go index 5184c6c9f..96d6b31a5 100644 --- a/src/autoscaler/db/sqldb/lock_sqldb.go +++ b/src/autoscaler/db/sqldb/lock_sqldb.go @@ -7,6 +7,8 @@ import ( "code.cloudfoundry.org/lager" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "autoscaler/db" "autoscaler/models" @@ -16,11 +18,16 @@ type LockSQLDB struct { dbConfig db.DatabaseConfig logger lager.Logger table string - sqldb *sql.DB + sqldb *sqlx.DB } func NewLockSQLDB(dbConfig db.DatabaseConfig, table string, logger lager.Logger) (*LockSQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("open-lock-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -61,13 +68,19 @@ func (ldb *LockSQLDB) fetch(tx *sql.Tx) (*models.Lock, error) { timestamp time.Time ttl time.Duration ) - tquery := "LOCK TABLE " + ldb.table + " IN ACCESS EXCLUSIVE MODE" - if _, err := tx.Exec(tquery); err != nil { - ldb.logger.Error("failed-to-set-table-level-lock", err) - return &models.Lock{}, err + + if ldb.sqldb.DriverName() == "postgres" { + tquery := "LOCK TABLE " + ldb.table + " IN ACCESS EXCLUSIVE MODE" + if _, err := tx.Exec(tquery); err != nil { + ldb.logger.Error("failed-to-set-table-level-lock", err) + return &models.Lock{}, err + } } - query := "SELECT owner,lock_timestamp,ttl FROM " + ldb.table + " LIMIT 1 FOR UPDATE NOWAIT" + query := "SELECT owner,lock_timestamp,ttl FROM " + ldb.table + " LIMIT 1 FOR UPDATE" + if ldb.sqldb.DriverName() == "postgres" { + query = query +" NOWAIT " + } row := tx.QueryRow(query) err := row.Scan(&owner, ×tamp, &ttl) if err != nil { @@ -84,7 +97,7 @@ func (ldb *LockSQLDB) fetch(tx *sql.Tx) (*models.Lock, error) { func (ldb *LockSQLDB) remove(owner string, tx *sql.Tx) error { ldb.logger.Debug("removing-lock", lager.Data{"Owner": owner}) - query := "DELETE FROM " + ldb.table + " WHERE owner = $1" + query := ldb.sqldb.Rebind("DELETE FROM " + ldb.table + " WHERE owner = ?") if _, err := tx.Exec(query, owner); err != nil { ldb.logger.Error("failed-to-delete-lock-details-during-release-lock", err) return err @@ -99,7 +112,7 @@ func (ldb *LockSQLDB) insert(lockDetails *models.Lock, tx *sql.Tx) error { ldb.logger.Error("error-getting-timestamp-while-inserting-lock-details", err) return err } - query := "INSERT INTO " + ldb.table + " (owner,lock_timestamp,ttl) VALUES ($1,$2,$3)" + query := ldb.sqldb.Rebind("INSERT INTO " + ldb.table + " (owner,lock_timestamp,ttl) VALUES (?,?,?)") if _, err = tx.Exec(query, lockDetails.Owner, currentTimestamp, int64(lockDetails.Ttl/time.Second)); err != nil { ldb.logger.Error("failed-to-insert-lock-details", err) return err @@ -114,7 +127,7 @@ func (ldb *LockSQLDB) renew(owner string, tx *sql.Tx) error { ldb.logger.Error("error-getting-timestamp-while-renewing-lock", err) return err } - updatequery := "UPDATE " + ldb.table + " SET lock_timestamp=$1 where owner=$2" + updatequery := ldb.sqldb.Rebind("UPDATE " + ldb.table + " SET lock_timestamp=? where owner=?") if _, err = tx.Exec(updatequery, currentTimestamp, owner); err != nil { ldb.logger.Error("failed-to-update-lock-details-during-lock-renewal", err) return err @@ -125,7 +138,7 @@ func (ldb *LockSQLDB) renew(owner string, tx *sql.Tx) error { func (ldb *LockSQLDB) Release(owner string) error { ldb.logger.Debug("releasing-lock", lager.Data{"Owner": owner}) err := ldb.transact(ldb.sqldb, func(tx *sql.Tx) error { - query := "DELETE FROM " + ldb.table + " WHERE owner = $1" + query := ldb.sqldb.Rebind("DELETE FROM " + ldb.table + " WHERE owner = ?") if _, err := tx.Exec(query, owner); err != nil { ldb.logger.Error("failed-to-delete-lock-details-during-release-lock", err) return err @@ -202,7 +215,17 @@ func (ldb *LockSQLDB) Lock(lock *models.Lock) (bool, error) { func (ldb *LockSQLDB) getDatabaseTimestamp(tx *sql.Tx) (time.Time, error) { var currentTimestamp time.Time - err := tx.QueryRow("SELECT NOW() AT TIME ZONE 'utc'").Scan(¤tTimestamp) + var query string + switch ldb.sqldb.DriverName() { + case "postgres": + query = "SELECT NOW() AT TIME ZONE 'utc'" + case "mysql": + query = "SELECT UTC_TIMESTAMP()" + default: + return time.Time{}, nil + } + + err := tx.QueryRow(query).Scan(¤tTimestamp) if err != nil { ldb.logger.Error("failed-fetching-timestamp", err) return time.Time{}, err @@ -210,7 +233,7 @@ func (ldb *LockSQLDB) getDatabaseTimestamp(tx *sql.Tx) (time.Time, error) { return currentTimestamp, nil } -func (ldb *LockSQLDB) transact(db *sql.DB, f func(tx *sql.Tx) error) error { +func (ldb *LockSQLDB) transact(db *sqlx.DB, f func(tx *sql.Tx) error) error { var err error for attempts := 0; attempts < 3; attempts++ { err = func() error { @@ -245,3 +268,4 @@ func (ldb *LockSQLDB) transact(db *sql.DB, f func(tx *sql.Tx) error) error { return err } + diff --git a/src/autoscaler/db/sqldb/lock_sqldb_test.go b/src/autoscaler/db/sqldb/lock_sqldb_test.go index 44fb35359..f9ba73961 100644 --- a/src/autoscaler/db/sqldb/lock_sqldb_test.go +++ b/src/autoscaler/db/sqldb/lock_sqldb_test.go @@ -7,10 +7,9 @@ import ( "os" "code.cloudfoundry.org/lager" - - "time" - "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -49,16 +48,24 @@ var _ = Describe("LockSqldb", func() { } }) - Context("when lock db url is not correct", func() { + Context("when db url is not correct", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when lock db url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/db/sqldb/policy_sqldb.go b/src/autoscaler/db/sqldb/policy_sqldb.go index 97196492e..447047d2e 100644 --- a/src/autoscaler/db/sqldb/policy_sqldb.go +++ b/src/autoscaler/db/sqldb/policy_sqldb.go @@ -8,16 +8,23 @@ import ( "code.cloudfoundry.org/lager" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" ) type PolicySQLDB struct { dbConfig db.DatabaseConfig logger lager.Logger - sqldb *sql.DB + sqldb *sqlx.DB } func NewPolicySQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*PolicySQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("open-policy-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -103,7 +110,7 @@ func (pdb *PolicySQLDB) RetrievePolicies() ([]*models.PolicyJson, error) { func (pdb *PolicySQLDB) GetAppPolicy(appId string) (*models.ScalingPolicy, error) { var policyJson []byte - query := "SELECT policy_json FROM policy_json WHERE app_id = $1" + query := pdb.sqldb.Rebind("SELECT policy_json FROM policy_json WHERE app_id =?") err := pdb.sqldb.QueryRow(query, appId).Scan(&policyJson) if err == sql.ErrNoRows { return nil, nil @@ -124,9 +131,14 @@ func (pdb *PolicySQLDB) GetAppPolicy(appId string) (*models.ScalingPolicy, error } func (pdb *PolicySQLDB) SaveAppPolicy(appId string, policyJSON string, policyGuid string) error { - query := "INSERT INTO policy_json (app_id, policy_json, guid) VALUES ($1,$2, $3) " + - "ON CONFLICT(app_id) DO UPDATE SET policy_json=EXCLUDED.policy_json, guid=EXCLUDED.guid" - + var query string + queryPrefix := "INSERT INTO policy_json (app_id, policy_json, guid) VALUES (?,?,?) " + switch pdb.sqldb.DriverName(){ + case "postgres": + query = pdb.sqldb.Rebind(queryPrefix + "ON CONFLICT(app_id) DO UPDATE SET policy_json=EXCLUDED.policy_json, guid=EXCLUDED.guid") + case "mysql": + query = pdb.sqldb.Rebind(queryPrefix + "ON DUPLICATE KEY UPDATE policy_json=VALUES(policy_json), guid=VALUES(guid)") + } _, err := pdb.sqldb.Exec(query, appId, policyJSON, policyGuid) if err != nil { pdb.logger.Error("save-app-policy", err, lager.Data{"query": query, "app_id": appId, "policyJSON": policyJSON, "policyGuid": policyGuid}) @@ -135,7 +147,7 @@ func (pdb *PolicySQLDB) SaveAppPolicy(appId string, policyJSON string, policyGui } func (pdb *PolicySQLDB) DeletePolicy(appId string) error { - query := "DELETE FROM policy_json WHERE app_id = $1" + query := pdb.sqldb.Rebind("DELETE FROM policy_json WHERE app_id =?") _, err := pdb.sqldb.Exec(query, appId) if err != nil { pdb.logger.Error("failed-to-delete-application-details", err, lager.Data{"query": query, "appId": appId}) @@ -150,7 +162,7 @@ func (pdb *PolicySQLDB) GetDBStatus() sql.DBStats { func (pdb *PolicySQLDB) GetCredential(appId string) (*models.Credential, error) { var password string var username string - query := "SELECT username,password from credentials WHERE id = $1" + query := pdb.sqldb.Rebind("SELECT username,password from credentials WHERE id =?") err := pdb.sqldb.QueryRow(query, appId).Scan(&username, &password) if err != nil { pdb.logger.Error("get-custom-metrics-creds-from-credentials-table", err, lager.Data{"query": query}) @@ -162,8 +174,14 @@ func (pdb *PolicySQLDB) GetCredential(appId string) (*models.Credential, error) }, nil } func (pdb *PolicySQLDB) SaveCredential(appId string, cred models.Credential) error { - query := "INSERT INTO credentials (id, username, password, updated_at) VALUES ($1, $2, $3, CURRENT_TIMESTAMP) " + - "ON CONFLICT(id) DO UPDATE SET username=EXCLUDED.username, password=EXCLUDED.password, updated_at=CURRENT_TIMESTAMP" + var query string + queryPrefix := "INSERT INTO credentials (id, username, password, updated_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP) " + switch pdb.sqldb.DriverName() { + case "postgres": + query = pdb.sqldb.Rebind(queryPrefix + "ON CONFLICT(id) DO UPDATE SET username=EXCLUDED.username, password=EXCLUDED.password, updated_at=CURRENT_TIMESTAMP") + case "mysql": + query = pdb.sqldb.Rebind(queryPrefix + "ON DUPLICATE KEY UPDATE username=VALUES(username), password=VALUES(password), updated_at=CURRENT_TIMESTAMP") + } _, err := pdb.sqldb.Exec(query, appId, cred.Username, cred.Password) if err != nil { pdb.logger.Error("save-custom-metric-credential", err, lager.Data{"query": query, "app_id": appId}) @@ -171,7 +189,7 @@ func (pdb *PolicySQLDB) SaveCredential(appId string, cred models.Credential) err return err } func (pdb *PolicySQLDB) DeleteCredential(appId string) error { - query := "DELETE FROM credentials WHERE id = $1" + query := pdb.sqldb.Rebind("DELETE FROM credentials WHERE id =?") _, err := pdb.sqldb.Exec(query, appId) if err != nil { pdb.logger.Error("failed-to-delete-custom-metric-credential", err, lager.Data{"query": query, "appId": appId}) diff --git a/src/autoscaler/db/sqldb/policy_sqldb_test.go b/src/autoscaler/db/sqldb/policy_sqldb_test.go index 0fd1f898a..41fed1546 100644 --- a/src/autoscaler/db/sqldb/policy_sqldb_test.go +++ b/src/autoscaler/db/sqldb/policy_sqldb_test.go @@ -5,9 +5,9 @@ import ( . "autoscaler/db/sqldb" "autoscaler/models" "database/sql" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -62,12 +62,20 @@ var _ = Describe("PolicySQLDB", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when db url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) @@ -207,6 +215,10 @@ var _ = Describe("PolicySQLDB", func() { insertPolicy("second-app-id", scalingPolicy) insertPolicy("third-app-id", scalingPolicy) policies, err = pdb.RetrievePolicies() + for i, policy := range policies { + policy.PolicyStr = formatPolicyString(policy.PolicyStr) + policies[i] = policy + } }) Context("when retriving all the policies", func() { @@ -265,7 +277,7 @@ var _ = Describe("PolicySQLDB", func() { }) It("saves the policy", func() { Expect(err).NotTo(HaveOccurred()) - Expect(getAppPolicy("an-app-id")).To(Equal(policyJsonStr)) + Expect(formatPolicyString(getAppPolicy("an-app-id"))).To(Equal(formatPolicyString(policyJsonStr))) }) }) @@ -298,7 +310,7 @@ var _ = Describe("PolicySQLDB", func() { }) It("updates the policy", func() { Expect(err).NotTo(HaveOccurred()) - Expect(getAppPolicy("an-app-id")).To(Equal(policyJsonStr)) + Expect(formatPolicyString(getAppPolicy("an-app-id"))).To(Equal(formatPolicyString(policyJsonStr))) }) }) }) diff --git a/src/autoscaler/db/sqldb/scalingengine_sqldb.go b/src/autoscaler/db/sqldb/scalingengine_sqldb.go index c98f2fc43..8c01f8b8c 100644 --- a/src/autoscaler/db/sqldb/scalingengine_sqldb.go +++ b/src/autoscaler/db/sqldb/scalingengine_sqldb.go @@ -6,6 +6,8 @@ import ( "code.cloudfoundry.org/lager" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "database/sql" "time" @@ -14,11 +16,16 @@ import ( type ScalingEngineSQLDB struct { dbConfig db.DatabaseConfig logger lager.Logger - sqldb *sql.DB + sqldb *sqlx.DB } func NewScalingEngineSQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*ScalingEngineSQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + + sqldb, err := sqlx.Open(database.DriverName, database.DSN) if err != nil { logger.Error("open-scaling-engine-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err @@ -51,9 +58,9 @@ func (sdb *ScalingEngineSQLDB) Close() error { } func (sdb *ScalingEngineSQLDB) SaveScalingHistory(history *models.AppScalingHistory) error { - query := "INSERT INTO scalinghistory" + - "(appid, timestamp, scalingtype, status, oldinstances, newinstances, reason, message, error) " + - " VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)" + query := sdb.sqldb.Rebind("INSERT INTO scalinghistory" + + "(appid, timestamp, scalingtype, status, oldinstances, newinstances, reason, message, error) " + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)") _, err := sdb.sqldb.Exec(query, history.AppId, history.Timestamp, history.ScalingType, history.Status, history.OldInstances, history.NewInstances, history.Reason, history.Message, history.Error) @@ -71,11 +78,11 @@ func (sdb *ScalingEngineSQLDB) RetrieveScalingHistories(appId string, start int6 orderStr = db.ASCSTR } - query := "SELECT timestamp, scalingtype, status, oldinstances, newinstances, reason, message, error FROM scalinghistory WHERE" + - " appid = $1 " + - " AND timestamp >= $2" + - " AND timestamp <= $3" + - " ORDER BY timestamp " + orderStr + query := sdb.sqldb.Rebind("SELECT timestamp, scalingtype, status, oldinstances, newinstances, reason, message, error FROM scalinghistory WHERE" + + " appid = ? " + + " AND timestamp >= ?" + + " AND timestamp <= ?" + + " ORDER BY timestamp " + orderStr) if end < 0 { end = time.Now().UnixNano() @@ -121,7 +128,7 @@ func (sdb *ScalingEngineSQLDB) RetrieveScalingHistories(appId string, start int6 } func (sdb *ScalingEngineSQLDB) PruneScalingHistories(before int64) error { - query := "DELETE FROM scalinghistory WHERE timestamp <= $1" + query := sdb.sqldb.Rebind("DELETE FROM scalinghistory WHERE timestamp <= ?") _, err := sdb.sqldb.Exec(query, before) if err != nil { sdb.logger.Error("failed-prune-scaling-histories-from-scalinghistory-table", err, lager.Data{"query": query, "before": before}) @@ -130,7 +137,7 @@ func (sdb *ScalingEngineSQLDB) PruneScalingHistories(before int64) error { } func (sdb *ScalingEngineSQLDB) CanScaleApp(appId string) (bool, int64, error) { - query := "SELECT expireat FROM scalingcooldown WHERE appid = $1" + query := sdb.sqldb.Rebind("SELECT expireat FROM scalingcooldown WHERE appid = ?") rows, err := sdb.sqldb.Query(query, appId) if err != nil { sdb.logger.Error("can-scale-app-query-record", err, lager.Data{"query": query, "appid": appId}) @@ -154,13 +161,13 @@ func (sdb *ScalingEngineSQLDB) CanScaleApp(appId string) (bool, int64, error) { } func (sdb *ScalingEngineSQLDB) UpdateScalingCooldownExpireTime(appId string, expireAt int64) error { - _, err := sdb.sqldb.Exec("DELETE FROM scalingcooldown WHERE appid = $1", appId) + _, err := sdb.sqldb.Exec(sdb.sqldb.Rebind("DELETE FROM scalingcooldown WHERE appid = ?"), appId) if err != nil { sdb.logger.Error("update-scaling-cooldown-time-delete", err, lager.Data{"appid": appId}) return err } - _, err = sdb.sqldb.Exec("INSERT INTO scalingcooldown(appid, expireat) values($1, $2)", appId, expireAt) + _, err = sdb.sqldb.Exec(sdb.sqldb.Rebind("INSERT INTO scalingcooldown(appid, expireat) values(?, ?)"), appId, expireAt) if err != nil { sdb.logger.Error("update-scaling-cooldown-time-insert", err, lager.Data{"appid": appId, "expireAt": expireAt}) return err @@ -169,8 +176,8 @@ func (sdb *ScalingEngineSQLDB) UpdateScalingCooldownExpireTime(appId string, exp } func (sdb *ScalingEngineSQLDB) GetActiveSchedule(appId string) (*models.ActiveSchedule, error) { - query := "SELECT scheduleid, instancemincount, instancemaxcount, initialmininstancecount" + - " FROM activeschedule WHERE appid = $1" + query := sdb.sqldb.Rebind("SELECT scheduleid, instancemincount, instancemaxcount, initialmininstancecount" + + " FROM activeschedule WHERE appid = ?") var scheduleId string var instanceMin, instanceMax, instanceMinInitial int @@ -214,7 +221,7 @@ func (sdb *ScalingEngineSQLDB) GetActiveSchedules() (map[string]string, error) { } func (sdb *ScalingEngineSQLDB) RemoveActiveSchedule(appId string) error { - query := "DELETE FROM activeschedule WHERE appid = $1" + query := sdb.sqldb.Rebind("DELETE FROM activeschedule WHERE appid = ?") _, err := sdb.sqldb.Exec(query, appId) if err != nil { sdb.logger.Error("failed-remove-active-scheudle", err, lager.Data{"appid": appId}) @@ -229,8 +236,8 @@ func (sdb *ScalingEngineSQLDB) SetActiveSchedule(appId string, schedule *models. return err } - query := "INSERT INTO activeschedule(appid, scheduleid, instancemincount, instancemaxcount, initialmininstancecount) " + - " VALUES ($1, $2, $3, $4, $5)" + query := sdb.sqldb.Rebind("INSERT INTO activeschedule(appid, scheduleid, instancemincount, instancemaxcount, initialmininstancecount) " + + " VALUES (?, ?, ?, ?, ?)") _, err = sdb.sqldb.Exec(query, appId, schedule.ScheduleId, schedule.InstanceMin, schedule.InstanceMax, schedule.InstanceMinInitial) if err != nil { diff --git a/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go b/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go index cee877c13..2b7c1f73b 100644 --- a/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go +++ b/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go @@ -4,9 +4,9 @@ import ( "autoscaler/db" . "autoscaler/db/sqldb" "autoscaler/models" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -60,12 +60,20 @@ var _ = Describe("ScalingEngineSqldb", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/db/sqldb/scheduler_sqldb.go b/src/autoscaler/db/sqldb/scheduler_sqldb.go index f9bda65f2..fce4b7537 100644 --- a/src/autoscaler/db/sqldb/scheduler_sqldb.go +++ b/src/autoscaler/db/sqldb/scheduler_sqldb.go @@ -6,6 +6,7 @@ import ( "code.cloudfoundry.org/lager" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" "database/sql" "strconv" @@ -18,7 +19,11 @@ type SchedulerSQLDB struct { } func NewSchedulerSQLDB(dbConfig db.DatabaseConfig, logger lager.Logger) (*SchedulerSQLDB, error) { - sqldb, err := sql.Open(db.PostgresDriverName, dbConfig.URL) + database, err := db.GetConnection(dbConfig.URL) + if err != nil { + return nil, err + } + sqldb, err := sql.Open(database.DriverName, database.DSN) if err != nil { logger.Error("failed-open-scheduler-db", err, lager.Data{"dbConfig": dbConfig}) return nil, err diff --git a/src/autoscaler/db/sqldb/scheduler_sqldb_test.go b/src/autoscaler/db/sqldb/scheduler_sqldb_test.go index ebbeb6b3d..4060ac4d2 100644 --- a/src/autoscaler/db/sqldb/scheduler_sqldb_test.go +++ b/src/autoscaler/db/sqldb/scheduler_sqldb_test.go @@ -4,9 +4,9 @@ import ( "autoscaler/db" . "autoscaler/db/sqldb" "autoscaler/models" - - "code.cloudfoundry.org/lager" "github.com/lib/pq" + "github.com/go-sql-driver/mysql" + "code.cloudfoundry.org/lager" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -49,12 +49,20 @@ var _ = Describe("SchedulerSqldb", func() { BeforeEach(func() { dbConfig.URL = "postgres://not-exist-user:not-exist-password@localhost/autoscaler?sslmode=disable" }) - It("should error", func() { + It("should throw an error", func() { Expect(err).To(BeAssignableToTypeOf(&pq.Error{})) }) - }) + Context("when mysql db url is not correct", func() { + BeforeEach(func() { + dbConfig.URL = "not-exist-user:not-exist-password@tcp(localhost)/autoscaler?tls=false" + }) + It("should throw an error", func() { + Expect(err).To(BeAssignableToTypeOf(&mysql.MySQLError{})) + }) + }) + Context("when url is correct", func() { It("should not error", func() { Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/db/sqldb/sqldb_suite_test.go b/src/autoscaler/db/sqldb/sqldb_suite_test.go index f5743c8b9..18daf960f 100644 --- a/src/autoscaler/db/sqldb/sqldb_suite_test.go +++ b/src/autoscaler/db/sqldb/sqldb_suite_test.go @@ -13,11 +13,13 @@ import ( "autoscaler/models" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/jmoiron/sqlx" ) -var dbHelper *sql.DB +var dbHelper *sqlx.DB func TestSqldb(t *testing.T) { RegisterFailHandler(Fail) @@ -31,8 +33,12 @@ var _ = BeforeSuite(func() { if dbUrl == "" { Fail("environment variable $DBURL is not set") } + database, err := db.GetConnection(dbUrl) + if err != nil { + Fail("failed to parse database connection: "+ err.Error()) + } - dbHelper, e = sql.Open(db.PostgresDriverName, dbUrl) + dbHelper, e = sqlx.Open(database.DriverName, database.DSN) if e != nil { Fail("can not connect database: " + e.Error()) } @@ -62,7 +68,7 @@ func cleanInstanceMetricsTable() { } func hasInstanceMetric(appId string, index int, name string, timestamp int64) bool { - query := "SELECT * FROM appinstancemetrics WHERE appid = $1 AND instanceindex = $2 AND name = $3 AND timestamp = $4" + query := dbHelper.Rebind("SELECT * FROM appinstancemetrics WHERE appid = ? AND instanceindex = ? AND name = ? AND timestamp = ?") rows, e := dbHelper.Query(query, appId, index, name, timestamp) if e != nil { Fail("can not query table appinstancemetrics: " + e.Error()) @@ -95,7 +101,7 @@ func cleanServiceInstanceTable() { } func hasServiceInstance(serviceInstanceId string) bool { - query := "SELECT * FROM service_instance WHERE service_instance_id = $1 " + query := dbHelper.Rebind("SELECT * FROM service_instance WHERE service_instance_id = ?") rows, e := dbHelper.Query(query, serviceInstanceId) if e != nil { Fail("can not query table service_instance: " + e.Error()) @@ -105,7 +111,7 @@ func hasServiceInstance(serviceInstanceId string) bool { } func hasServiceBinding(bindingId string, serviceInstanceId string) bool { - query := "SELECT * FROM binding WHERE binding_id = $1 AND service_instance_id = $2 " + query := dbHelper.Rebind("SELECT * FROM binding WHERE binding_id = ? AND service_instance_id = ? ") rows, e := dbHelper.Query(query, bindingId, serviceInstanceId) if e != nil { Fail("can not query table binding: " + e.Error()) @@ -127,7 +133,7 @@ func insertPolicy(appId string, scalingPolicy *models.ScalingPolicy) { Fail("failed to marshall scaling policy" + e.Error()) } - query := "INSERT INTO policy_json(app_id, policy_json, guid) VALUES($1, $2, $3)" + query := dbHelper.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) VALUES(?, ?, ?)") _, e = dbHelper.Exec(query, appId, string(policyJson), "1234") if e != nil { @@ -136,7 +142,7 @@ func insertPolicy(appId string, scalingPolicy *models.ScalingPolicy) { } func getAppPolicy(appId string) string { - query := "SELECT policy_json FROM policy_json WHERE app_id=$1 " + query := dbHelper.Rebind("SELECT policy_json FROM policy_json WHERE app_id=? ") rows, err := dbHelper.Query(query, appId) if err != nil { Fail("failed to get policy" + err.Error()) @@ -160,7 +166,7 @@ func cleanAppMetricTable() { } func hasAppMetric(appId, metricType string, timestamp int64, value string) bool { - query := "SELECT * FROM app_metric WHERE app_id = $1 AND metric_type = $2 AND timestamp = $3 AND value = $4" + query := dbHelper.Rebind("SELECT * FROM app_metric WHERE app_id = ? AND metric_type = ? AND timestamp = ? AND value = ?") rows, e := dbHelper.Query(query, appId, metricType, timestamp, value) if e != nil { Fail("can not query table app_metric: " + e.Error()) @@ -186,7 +192,7 @@ func cleanScalingHistoryTable() { } func hasScalingHistory(appId string, timestamp int64) bool { - query := "SELECT * FROM scalinghistory WHERE appid = $1 AND timestamp = $2" + query := dbHelper.Rebind("SELECT * FROM scalinghistory WHERE appid = ? AND timestamp = ?") rows, e := dbHelper.Query(query, appId, timestamp) if e != nil { Fail("can not query table scalinghistory: " + e.Error()) @@ -212,7 +218,7 @@ func cleanScalingCooldownTable() { } func hasScalingCooldownRecord(appId string, expireAt int64) bool { - query := "SELECT * FROM scalingcooldown WHERE appid = $1 AND expireat = $2" + query := dbHelper.Rebind("SELECT * FROM scalingcooldown WHERE appid = ? AND expireat = ?") rows, e := dbHelper.Query(query, appId, expireAt) if e != nil { Fail("can not query table scalingcooldown: " + e.Error()) @@ -231,8 +237,8 @@ func cleanActiveScheduleTable() error { } func insertActiveSchedule(appId, scheduleId string, instanceMin, instanceMax, instanceMinInitial int) error { - query := "INSERT INTO activeschedule(appid, scheduleid, instancemincount, instancemaxcount, initialmininstancecount) " + - " VALUES ($1, $2, $3, $4, $5)" + query := dbHelper.Rebind("INSERT INTO activeschedule(appid, scheduleid, instancemincount, instancemaxcount, initialmininstancecount) " + + " VALUES (?, ?, ?, ?, ?)") _, e := dbHelper.Exec(query, appId, scheduleId, instanceMin, instanceMax, instanceMinInitial) return e } @@ -246,12 +252,12 @@ func insertSchedulerActiveSchedule(id int, appId string, startJobIdentifier int, var e error var query string if instanceMinInitial <= 0 { - query = "INSERT INTO app_scaling_active_schedule(id, app_id, start_job_identifier, instance_min_count, instance_max_count) " + - " VALUES ($1, $2, $3, $4, $5)" + query = dbHelper.Rebind("INSERT INTO app_scaling_active_schedule(id, app_id, start_job_identifier, instance_min_count, instance_max_count) " + + " VALUES (?, ?, ?, ?, ?)") _, e = dbHelper.Exec(query, id, appId, startJobIdentifier, instanceMin, instanceMax) } else { - query = "INSERT INTO app_scaling_active_schedule(id, app_id, start_job_identifier, instance_min_count, instance_max_count, initial_min_instance_count) " + - " VALUES ($1, $2, $3, $4, $5, $6)" + query = dbHelper.Rebind("INSERT INTO app_scaling_active_schedule(id, app_id, start_job_identifier, instance_min_count, instance_max_count, initial_min_instance_count) " + + " VALUES (?, ?, ?, ?, ?, ?)") _, e = dbHelper.Exec(query, id, appId, startJobIdentifier, instanceMin, instanceMax, instanceMinInitial) } return e @@ -262,13 +268,13 @@ func insertCredential(appid string, username string, password string) error { var err error var query string - query = "INSERT INTO credentials(id, username, password, updated_at) values($1, $2, $3, $4)" + query = dbHelper.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") _, err = dbHelper.Exec(query, appid, username, password, "2011-05-18 15:36:38") return err } func getCredential(appId string) (string, string, error) { - query := "SELECT username,password FROM credentials WHERE id=$1 " + query := dbHelper.Rebind("SELECT username,password FROM credentials WHERE id=? ") rows, err := dbHelper.Query(query, appId) if err != nil { Fail("failed to get credential" + err.Error()) @@ -284,7 +290,7 @@ func getCredential(appId string) (string, string, error) { return username, password, nil } func hasCredential(appId string) bool { - query := "SELECT * FROM credentials WHERE id=$1" + query := dbHelper.Rebind("SELECT * FROM credentials WHERE id=?") rows, e := dbHelper.Query(query, appId) if e != nil { Fail("can not query table credentials: " + e.Error()) @@ -301,7 +307,7 @@ func cleanCredentialTable() error { } func insertLockDetails(lock *models.Lock) (sql.Result, error) { - query := "INSERT INTO test_lock (owner,lock_timestamp,ttl) VALUES ($1,$2,$3)" + query := dbHelper.Rebind("INSERT INTO test_lock (owner,lock_timestamp,ttl) VALUES (?,?,?)") result, err := dbHelper.Exec(query, lock.Owner, lock.LastModifiedTimestamp, int64(lock.Ttl/time.Second)) return result, err } @@ -342,7 +348,7 @@ func validateLockInDB(ownerid string, expectedLock *models.Lock) error { ttl time.Duration owner string ) - query := "SELECT owner,lock_timestamp,ttl FROM test_lock WHERE owner=$1" + query := dbHelper.Rebind("SELECT owner,lock_timestamp,ttl FROM test_lock WHERE owner=?") row := dbHelper.QueryRow(query, ownerid) err := row.Scan(&owner, ×tamp, &ttl) if err != nil { @@ -366,7 +372,7 @@ func validateLockNotInDB(owner string) error { timestamp time.Time ttl time.Duration ) - query := "SELECT owner,lock_timestamp,ttl FROM test_lock WHERE owner=$1" + query := dbHelper.Rebind("SELECT owner,lock_timestamp,ttl FROM test_lock WHERE owner=?") row := dbHelper.QueryRow(query, owner) err := row.Scan(&owner, ×tamp, &ttl) if err != nil { @@ -377,3 +383,18 @@ func validateLockNotInDB(owner string) error { } return fmt.Errorf("lock exists with owner (%s)", owner) } + +func formatPolicyString(policyStr string) string { + scalingPolicy := &models.ScalingPolicy{} + err := json.Unmarshal([]byte(policyStr),&scalingPolicy) + if err != nil { + fmt.Errorf("failed to unmarshal policyJson string %s", policyStr) + return "" + } + policyJsonStr, err := json.Marshal(scalingPolicy) + if err != nil { + fmt.Errorf("failed to marshal ScalingPolicy %v", scalingPolicy) + return "" + } + return string(policyJsonStr) +} diff --git a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go index 82ea61e52..f844e61eb 100644 --- a/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go +++ b/src/autoscaler/eventgenerator/cmd/eventgenerator/eventgenerator_suite_test.go @@ -8,7 +8,6 @@ import ( "autoscaler/eventgenerator/config" "autoscaler/helpers" "autoscaler/models" - "database/sql" "fmt" "io/ioutil" "net/http" @@ -23,6 +22,9 @@ import ( "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "gopkg.in/yaml.v2" ) @@ -113,7 +115,10 @@ var _ = SynchronizedAfterSuite(func() { }) func initDB() { - egDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + egDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = egDB.Exec("DELETE FROM app_metric") @@ -137,7 +142,7 @@ func initDB() { } ] }`, breachDurationSecs) - query := "INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)" + query := egDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") _, err = egDB.Exec(query, testAppId, policy, "1234") Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/eventgenerator/db/dataaggregator.db.changelog.yml b/src/autoscaler/eventgenerator/db/dataaggregator.db.changelog.yml index 9dc9da2a5..a96a60581 100644 --- a/src/autoscaler/eventgenerator/db/dataaggregator.db.changelog.yml +++ b/src/autoscaler/eventgenerator/db/dataaggregator.db.changelog.yml @@ -2,6 +2,7 @@ databaseChangeLog: - changeSet: id: 1 author: qiyang + dbms: postgresql changes: - createTable: tableName: app_metric @@ -57,6 +58,7 @@ databaseChangeLog: - changeSet: id: 3 author: tanmoypal + dbms: postgres changes: - createIndex: columns: @@ -76,4 +78,60 @@ databaseChangeLog: author: byang changes: - dropTable: - tableName: eg_lock \ No newline at end of file + tableName: eg_lock + - changeSet: + id: 5 + author: aqan213 + dbms: mysql + changes: + - createTable: + tableName: app_metric + columns: + - column: + name: app_id + type: varchar(255) + constraints: + nullable: false + - column: + name: metric_type + type: varchar(100) + constraints: + nullable: false + - column: + name: unit + type: varchar(20) + constraints: + nullable: false + - column: + name: timestamp + type: bigint + constraints: + nullable: false + - column: + name: value + type: varchar(255) + constraints: + nullable: true + - addPrimaryKey: + columnNames: "app_id,metric_type,timestamp" + constraintName: "PK_appmetrics" + schemaName: autoscaler + tableName: app_metric + - changeSet: + id: 6 + author: aqan213 + dbms: mysql + changes: + - createIndex: + columns: + - column: + name: app_id + type: varchar(255) + - column: + name: metric_type + type: varchar(100) + - column: + name: timestamp + type: bigint + indexName: index_app_metrics + tableName: app_metric diff --git a/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go b/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go index b25fd02f9..2c693f500 100644 --- a/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go +++ b/src/autoscaler/metricscollector/cmd/metricscollector/metricscollector_suite_test.go @@ -1,7 +1,6 @@ package main_test import ( - "database/sql" "io/ioutil" "log" "mime/multipart" @@ -22,6 +21,8 @@ import ( "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "gopkg.in/yaml.v2" "autoscaler/cf" @@ -53,7 +54,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { mc, err := gexec.Build("autoscaler/metricscollector/cmd/metricscollector", "-race") Expect(err).NotTo(HaveOccurred()) - mcDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + mcDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = mcDB.Exec("DELETE FROM appinstancemetrics") @@ -67,7 +71,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { "instance_min_count": 1, "instance_max_count": 5 }` - query := "INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)" + query := mcDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") _, err = mcDB.Exec(query, "an-app-id", policy, "1234") Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/metricscollector/collector/app_streamer.go b/src/autoscaler/metricscollector/collector/app_streamer.go index 7c376b221..98fec002a 100644 --- a/src/autoscaler/metricscollector/collector/app_streamer.go +++ b/src/autoscaler/metricscollector/collector/app_streamer.go @@ -107,7 +107,7 @@ func (as *appStreamer) processEvent(event *events.Envelope) { } else if event.GetEventType() == events.Envelope_HttpStartStop { as.logger.Debug("process-event-get-httpstartstop-event", lager.Data{"event": event}) ss := event.GetHttpStartStop() - if ss != nil { + if ss != nil && ss.GetPeerType() == events.PeerType_Client { as.numRequests[ss.GetInstanceIndex()]++ as.sumReponseTimes[ss.GetInstanceIndex()] += (ss.GetStopTimestamp() - ss.GetStartTimestamp()) } diff --git a/src/autoscaler/metricscollector/db/metricscollector.db.changelog.yml b/src/autoscaler/metricscollector/db/metricscollector.db.changelog.yml index 7732283ab..f756470fb 100644 --- a/src/autoscaler/metricscollector/db/metricscollector.db.changelog.yml +++ b/src/autoscaler/metricscollector/db/metricscollector.db.changelog.yml @@ -7,6 +7,7 @@ databaseChangeLog: - not: - tableExists: tableName: appinstancemetrics + dbms: postgresql changes: - createTable: tableName: appinstancemetrics @@ -87,4 +88,70 @@ databaseChangeLog: author: byang changes: - dropTable: - tableName: mc_lock \ No newline at end of file + tableName: mc_lock + - changeSet: + id: 4 + author: aqan213 + preConditions: + - onFail: MARK_RAN + - not: + - tableExists: + tableName: appinstancemetrics + dbms: mysql + changes: + - createTable: + tableName: appinstancemetrics + columns: + - column: + name: appid + type: varchar(255) + constraints: + nullable: false + - column: + name: instanceindex + type: int + constraints: + nullable: false + - column: + name: collectedat + type: bigint + constraints: + nullable: false + - column: + name: name + type: varchar(100) + constraints: + nullable: false + - column: + name: unit + type: varchar(20) + constraints: + nullable: false + - column: + name: value + type: varchar(255) + constraints: + nullable: false + - column: + name: timestamp + type: bigint + constraints: + nullable: false + - addPrimaryKey: + columnNames: "appid,instanceindex,name,timestamp" + constraintName: "PK_appinstancemetrics" + schemaName: autoscaler + tableName: appinstancemetrics + - createIndex: + columns: + - column: + name: appid + type: varchar(255) + - column: + name: name + type: varchar(100) + - column: + name: timestamp + type: bigint + indexName: idx_instance_metrics + tableName: appinstancemetrics diff --git a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go index f1204acaf..c26dbb42c 100644 --- a/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go +++ b/src/autoscaler/metricsforwarder/cmd/metricsforwarder/metricsforwarder_suite_test.go @@ -1,7 +1,6 @@ package main_test import ( - "database/sql" "net/http" "path/filepath" @@ -19,6 +18,8 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "gopkg.in/yaml.v2" "autoscaler/db" @@ -51,7 +52,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { mf, err := gexec.Build("autoscaler/metricsforwarder/cmd/metricsforwarder", "-race") Expect(err).NotTo(HaveOccurred()) - policyDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + policyDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = policyDB.Exec("DELETE from policy_json") @@ -75,7 +79,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { } ] }` - query := "INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)" + query := policyDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") _, err = policyDB.Exec(query, "an-app-id", policy, "1234") username = "username" @@ -83,7 +87,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { encryptedUsername, _ := bcrypt.GenerateFromPassword([]byte(username), 8) encryptedPassword, _ := bcrypt.GenerateFromPassword([]byte(password), 8) - query = "INSERT INTO credentials(id, username, password, updated_at) values($1, $2, $3, $4)" + query = policyDB.Rebind("INSERT INTO credentials(id, username, password, updated_at) values(?, ?, ?, ?)") _, err = policyDB.Exec(query, "an-app-id", encryptedUsername, encryptedPassword, "2011-06-18 15:36:38") Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/metricsgateway/cmd/metricsgateway/metricsgateway_suite_test.go b/src/autoscaler/metricsgateway/cmd/metricsgateway/metricsgateway_suite_test.go index efd230c71..65e4f1500 100644 --- a/src/autoscaler/metricsgateway/cmd/metricsgateway/metricsgateway_suite_test.go +++ b/src/autoscaler/metricsgateway/cmd/metricsgateway/metricsgateway_suite_test.go @@ -2,7 +2,6 @@ package main_test import ( "autoscaler/models" - "database/sql" "fmt" "io/ioutil" "log" @@ -20,6 +19,7 @@ import ( "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" "google.golang.org/grpc/grpclog" + "github.com/jmoiron/sqlx" "gopkg.in/yaml.v2" "testing" @@ -56,10 +56,13 @@ var ( fakeMetricServer *ghttp.Server metricServerAddress string - testAppId = "test-app-id" - envelopes []*loggregator_v2.Envelope = []*loggregator_v2.Envelope{ - &loggregator_v2.Envelope{ + testAppId = "test-app-id" + envelopes = []*loggregator_v2.Envelope{ + { SourceId: testAppId, + DeprecatedTags: map[string]*loggregator_v2.Value{ + "peer_type": {Data: &loggregator_v2.Value_Text{Text: "Client"}}, + }, Message: &loggregator_v2.Envelope_Timer{ Timer: &loggregator_v2.Timer{ Name: "http", @@ -101,7 +104,10 @@ var _ = SynchronizedAfterSuite(func() { }) func initDB() { - mgDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + mgDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = mgDB.Exec("DELETE from policy_json") @@ -122,7 +128,7 @@ func initDB() { } ] }`) - query := "INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)" + query := mgDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") _, err = mgDB.Exec(query, testAppId, policy, "1234") Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/metricsgateway/helpers/ws_helper_test.go b/src/autoscaler/metricsgateway/helpers/ws_helper_test.go index 6bb8b0c9a..9c7785f53 100644 --- a/src/autoscaler/metricsgateway/helpers/ws_helper_test.go +++ b/src/autoscaler/metricsgateway/helpers/ws_helper_test.go @@ -105,6 +105,10 @@ var _ = Describe("WsHelper", func() { err = wsHelper.SetupConn() Expect(err).NotTo(HaveOccurred()) + err = wsHelper.Ping() + Expect(err).ShouldNot(HaveOccurred()) + Eventually(pingPongChan, 5*time.Second, 1*time.Second).Should(Receive(Equal(1))) + }) It("close the websocket connection", func() { wsHelper.CloseConn() diff --git a/src/autoscaler/metricsgateway/nozzle.go b/src/autoscaler/metricsgateway/nozzle.go index 823f464d9..435cb1539 100644 --- a/src/autoscaler/metricsgateway/nozzle.go +++ b/src/autoscaler/metricsgateway/nozzle.go @@ -121,7 +121,12 @@ func (n *Nozzle) filterEnvelopes(envelops []*loggregator_v2.Envelope) { n.envelopChan <- e } case *loggregator_v2.Envelope_Timer: - if e.GetTimer().GetName() == "http" { + peerType := e.GetTags()["peer_type"] + if peerTypeFromDeprecatedTags := e.GetDeprecatedTags()["peer_type"]; peerType == "" && peerTypeFromDeprecatedTags != nil { + peerType = peerTypeFromDeprecatedTags.GetText() + } + + if e.GetTimer().GetName() == "http" && (peerType == "" || peerType == "Client") { n.logger.Debug("filter-envelopes-get-httpstartstop", lager.Data{"index": n.index, "appID": e.SourceId, "message": e.Message}) n.envelopChan <- e } diff --git a/src/autoscaler/metricsgateway/nozzle_test.go b/src/autoscaler/metricsgateway/nozzle_test.go index ac8bb714d..a515164f5 100644 --- a/src/autoscaler/metricsgateway/nozzle_test.go +++ b/src/autoscaler/metricsgateway/nozzle_test.go @@ -110,6 +110,22 @@ var _ = Describe("Nozzle", func() { httpStartStopEnvelope = loggregator_v2.Envelope{ SourceId: testAppId, + DeprecatedTags: map[string]*loggregator_v2.Value{ + "peer_type": {Data: &loggregator_v2.Value_Text{Text: "Client"}}, + }, + Message: &loggregator_v2.Envelope_Timer{ + Timer: &loggregator_v2.Timer{ + Name: "http", + Start: 1542325492043447110, + Stop: 1542325492045491009, + }, + }, + } + serverHttpStartStopEnvelope = loggregator_v2.Envelope{ + SourceId: testAppId, + DeprecatedTags: map[string]*loggregator_v2.Value{ + "peer_type": {Data: &loggregator_v2.Value_Text{Text: "Server"}}, + }, Message: &loggregator_v2.Envelope_Timer{ Timer: &loggregator_v2.Timer{ Name: "http", @@ -255,6 +271,17 @@ var _ = Describe("Nozzle", func() { }) }) + Context("when there is a server httpstartstop timer envelope", func() { + BeforeEach(func() { + envelopes = []*loggregator_v2.Envelope{ + &serverHttpStartStopEnvelope, + } + }) + It("should not accept the envelope", func() { + Eventually(envelopChan).ShouldNot(Receive()) + }) + }) + Context("when there is non httpstartstop timer envelope", func() { BeforeEach(func() { envelopes = []*loggregator_v2.Envelope{ diff --git a/src/autoscaler/metricsserver/cmd/metricsserver/metricsserver_suite_test.go b/src/autoscaler/metricsserver/cmd/metricsserver/metricsserver_suite_test.go index fe051420e..227a8758d 100644 --- a/src/autoscaler/metricsserver/cmd/metricsserver/metricsserver_suite_test.go +++ b/src/autoscaler/metricsserver/cmd/metricsserver/metricsserver_suite_test.go @@ -1,7 +1,6 @@ package main_test import ( - "database/sql" "io/ioutil" "net/http" "os" @@ -16,6 +15,8 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "gopkg.in/yaml.v2" "autoscaler/db" @@ -44,7 +45,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { ms, err := gexec.Build("autoscaler/metricsserver/cmd/metricsserver", "-race") Expect(err).NotTo(HaveOccurred()) - msDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + msDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = msDB.Exec("DELETE FROM appinstancemetrics") @@ -58,7 +62,7 @@ var _ = SynchronizedBeforeSuite(func() []byte { "instance_min_count": 1, "instance_max_count": 5 }` - query := "INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)" + query := msDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)") _, err = msDB.Exec(query, "an-app-id", policy, "1234") Expect(err).NotTo(HaveOccurred()) diff --git a/src/autoscaler/operator/cmd/operator/operator_suite_test.go b/src/autoscaler/operator/cmd/operator/operator_suite_test.go index 0185f08a6..6aee1841e 100644 --- a/src/autoscaler/operator/cmd/operator/operator_suite_test.go +++ b/src/autoscaler/operator/cmd/operator/operator_suite_test.go @@ -15,6 +15,7 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" + _ "github.com/go-sql-driver/mysql" "gopkg.in/yaml.v2" "autoscaler/cf" @@ -199,7 +200,10 @@ func (pr *OperatorRunner) KillWithFire() { } func (pr *OperatorRunner) ClearLockDatabase() { - lockDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) + Expect(err).NotTo(HaveOccurred()) + + lockDB, err := sql.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) _, err = lockDB.Exec("DELETE FROM operator_lock") diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go index e30445e1f..72f4d87e9 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_suite_test.go @@ -12,9 +12,10 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/gexec" "github.com/onsi/gomega/ghttp" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" "gopkg.in/yaml.v2" - "database/sql" "fmt" "io/ioutil" "net/http" @@ -107,19 +108,22 @@ var _ = SynchronizedBeforeSuite( configFile = writeConfig(&conf) - testDB, err := sql.Open(db.PostgresDriverName, os.Getenv("DBURL")) + database, err := db.GetConnection(os.Getenv("DBURL")) Expect(err).NotTo(HaveOccurred()) - _, err = testDB.Exec("DELETE FROM scalinghistory WHERE appid = $1", appId) + testDB, err := sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) - _, err = testDB.Exec("DELETE from policy_json WHERE app_id = $1", appId) + _, err = testDB.Exec(testDB.Rebind("DELETE FROM scalinghistory WHERE appid = ?"), appId) Expect(err).NotTo(HaveOccurred()) - _, err = testDB.Exec("DELETE from activeschedule WHERE appid = $1", appId) + _, err = testDB.Exec(testDB.Rebind("DELETE from policy_json WHERE app_id = ?"), appId) Expect(err).NotTo(HaveOccurred()) - _, err = testDB.Exec("DELETE from app_scaling_active_schedule WHERE app_id = $1", appId) + _, err = testDB.Exec(testDB.Rebind("DELETE from activeschedule WHERE appid = ?"), appId) + Expect(err).NotTo(HaveOccurred()) + + _, err = testDB.Exec(testDB.Rebind("DELETE from app_scaling_active_schedule WHERE app_id = ?"), appId) Expect(err).NotTo(HaveOccurred()) policy := ` @@ -127,7 +131,7 @@ var _ = SynchronizedBeforeSuite( "instance_min_count": 1, "instance_max_count": 5 }` - _, err = testDB.Exec("INSERT INTO policy_json(app_id, policy_json, guid) values($1, $2, $3)", appId, policy, "1234") + _, err = testDB.Exec(testDB.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) values(?, ?, ?)"), appId, policy, "1234") Expect(err).NotTo(HaveOccurred()) err = testDB.Close() diff --git a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go index 9ebf38a01..b228d5037 100644 --- a/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go +++ b/src/autoscaler/scalingengine/cmd/scalingengine/scalingengine_test.go @@ -190,7 +190,7 @@ var _ = Describe("Main", func() { rsp, err = httpClient.Do(req) Expect(err).NotTo(HaveOccurred()) - Expect(rsp.StatusCode).To(Equal(http.StatusNoContent)) + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) rsp.Body.Close() }) }) diff --git a/src/autoscaler/scalingengine/db/scalingengine.db.changelog.yml b/src/autoscaler/scalingengine/db/scalingengine.db.changelog.yml index e491029fc..017184990 100644 --- a/src/autoscaler/scalingengine/db/scalingengine.db.changelog.yml +++ b/src/autoscaler/scalingengine/db/scalingengine.db.changelog.yml @@ -2,6 +2,7 @@ databaseChangeLog: - changeSet: id: 1 author: byang + dbms: postgresql changes: - createTable: tableName: scalinghistory @@ -67,6 +68,7 @@ databaseChangeLog: - changeSet: id: 3 author: byang + dbms: postgresql changes: - createTable: tableName: activeschedule @@ -125,4 +127,121 @@ databaseChangeLog: type: bigint defaultValue: 0 constraints: - nullable: false \ No newline at end of file + nullable: false + - changeSet: + id: 5 + author: aqan213 + dbms: mysql + changes: + - createTable: + tableName: scalinghistory + columns: + - column: + name: appid + type: varchar(255) + constraints: + nullable: false + - column: + name: timestamp + type: bigint + constraints: + nullable: false + - column: + name: scalingtype + type: int + constraints: + nullable: false + - column: + name: status + type: int + constraints: + nullable: false + - column: + name: oldinstances + type: int + constraints: + nullable: false + - column: + name: newinstances + type: int + constraints: + nullable: false + - column: + name: reason + type: varchar(255) + constraints: + nullable: false + - column: + name: message + type: varchar(255) + constraints: + nullable: true + - column: + name: error + type: varchar(255) + constraints: + nullable: true + - addPrimaryKey: + columnNames: "appid,timestamp" + constraintName: "PK_scalinghistory" + schemaName: autoscaler + tableName: scalinghistory + - createTable: + tableName: scalingcooldown + columns: + - column: + name: appid + type: varchar(255) + constraints: + nullable: false + - column: + name: expireat + type: bigint + constraints: + nullable: false + - addPrimaryKey: + columnNames: "appid,expireat" + constraintName: "PK_scalingcooldown" + schemaName: autoscaler + tableName: scalingcooldown + - changeSet: + id: 6 + author: aqan213 + dbms: mysql + changes: + - createTable: + tableName: activeschedule + columns: + - column: + name: scheduleid + type: varchar(255) + constraints: + nullable: false + - column: + name: appid + type: varchar(250) + constraints: + primaryKey: true + nullable: false + - column: + name: createdat + type: timestamp + constraints: + nullable: false + defaultValueComputed: now() + - column: + name: instancemincount + type: integer + constraints: + nullable: false + - column: + name: instancemaxcount + type: integer + constraints: + nullable: false + - column: + name: initialmininstancecount + type: integer + constraints: + nullable: false + diff --git a/src/autoscaler/scalingengine/scalingengine.go b/src/autoscaler/scalingengine/scalingengine.go index 4979081e9..9a211fda3 100644 --- a/src/autoscaler/scalingengine/scalingengine.go +++ b/src/autoscaler/scalingengine/scalingengine.go @@ -33,6 +33,8 @@ type scalingEngine struct { type ActiveScheduleNotFoundError struct { } +type AppNotFoundError struct { +} func (ase *ActiveScheduleNotFoundError) Error() string { return fmt.Sprintf("active schedule not found") @@ -306,9 +308,8 @@ func (s *scalingEngine) RemoveActiveSchedule(appId string, scheduleId string) er } if (currentSchedule == nil) || (currentSchedule.ScheduleId != scheduleId) { - err = &ActiveScheduleNotFoundError{} - logger.Error("failed-to-remove-active-schedule", err) - return err + logger.Info("active-schedule-not-found", lager.Data{"appId": appId, "scheduleId": scheduleId}) + return nil } err = s.scalingEngineDB.RemoveActiveSchedule(appId) @@ -330,6 +331,11 @@ func (s *scalingEngine) RemoveActiveSchedule(appId string, scheduleId string) er appEntity, err := s.cfClient.GetApp(appId) if err != nil { + if _, ok := err.(*models.AppNotFoundErr); ok { + logger.Info("app-not-found", lager.Data{"appId": appId}) + history.Status = models.ScalingStatusIgnored + return nil + } logger.Error("failed-to-get-app-info", err) history.Status = models.ScalingStatusFailed history.Error = "failed to get app info: " + err.Error() diff --git a/src/autoscaler/scalingengine/scalingengine_test.go b/src/autoscaler/scalingengine/scalingengine_test.go index 32f518651..02fa166b2 100644 --- a/src/autoscaler/scalingengine/scalingengine_test.go +++ b/src/autoscaler/scalingengine/scalingengine_test.go @@ -967,8 +967,8 @@ var _ = Describe("ScalingEngine", func() { BeforeEach(func() { scalingEngineDB.GetActiveScheduleReturns(nil, nil) }) - It("should error", func() { - Expect(err).To(BeAssignableToTypeOf(&ActiveScheduleNotFoundError{})) + It("should not have any error", func() { + Expect(err).NotTo(HaveOccurred()) Expect(scalingEngineDB.RemoveActiveScheduleCallCount()).To(BeZero()) }) }) @@ -977,8 +977,8 @@ var _ = Describe("ScalingEngine", func() { BeforeEach(func() { scalingEngineDB.GetActiveScheduleReturns(&models.ActiveSchedule{ScheduleId: "a-different-schedule-id"}, nil) }) - It("should error", func() { - Expect(err).To(BeAssignableToTypeOf(&ActiveScheduleNotFoundError{})) + It("should not have any error", func() { + Expect(err).NotTo(HaveOccurred()) Expect(scalingEngineDB.RemoveActiveScheduleCallCount()).To(BeZero()) }) }) diff --git a/src/autoscaler/scalingengine/server/scaling_handler.go b/src/autoscaler/scalingengine/server/scaling_handler.go index e8a03274f..6cba52007 100644 --- a/src/autoscaler/scalingengine/server/scaling_handler.go +++ b/src/autoscaler/scalingengine/server/scaling_handler.go @@ -220,22 +220,13 @@ func (h *ScalingHandler) RemoveActiveSchedule(w http.ResponseWriter, r *http.Req if err != nil { logger.Error("failed-to-remove-active-schedule", err) - switch err.(type) { - case *scalingengine.ActiveScheduleNotFoundError: - handlers.WriteJSONResponse(w, http.StatusNotFound, models.ErrorResponse{ - Code: "Not-Found", - Message: "Active schedule not found", - }) - default: - handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{ - Code: "Interal-Server-Error", - Message: "Error removing active schedule"}) - - } + handlers.WriteJSONResponse(w, http.StatusInternalServerError, models.ErrorResponse{ + Code: "Interal-Server-Error", + Message: "Error removing active schedule"}) return } - w.WriteHeader(http.StatusNoContent) + w.WriteHeader(http.StatusOK) } func (h *ScalingHandler) GetActiveSchedule(w http.ResponseWriter, r *http.Request, vars map[string]string) { diff --git a/src/autoscaler/scalingengine/server/scaling_handler_test.go b/src/autoscaler/scalingengine/server/scaling_handler_test.go index 2ed9822be..4dd516343 100644 --- a/src/autoscaler/scalingengine/server/scaling_handler_test.go +++ b/src/autoscaler/scalingengine/server/scaling_handler_test.go @@ -4,7 +4,6 @@ import ( "autoscaler/db" "autoscaler/fakes" "autoscaler/models" - "autoscaler/scalingengine" . "autoscaler/scalingengine/server" "code.cloudfoundry.org/lager/lagertest" @@ -514,26 +513,8 @@ var _ = Describe("ScalingHandler", func() { }) Context("when removing active schedule succeeds", func() { - It("returns 204", func() { - Expect(resp.Code).To(Equal(http.StatusNoContent)) - }) - }) - - Context("when active schedule is not found", func() { - BeforeEach(func() { - scalingEngine.RemoveActiveScheduleReturns(&scalingengine.ActiveScheduleNotFoundError{}) - }) - - It("returns 404", func() { - Expect(resp.Code).To(Equal(http.StatusNotFound)) - - errJson := &models.ErrorResponse{} - err = json.Unmarshal(resp.Body.Bytes(), errJson) - Expect(err).ToNot(HaveOccurred()) - Expect(errJson).To(Equal(&models.ErrorResponse{ - Code: "Not-Found", - Message: "Active schedule not found", - })) + It("returns 200", func() { + Expect(resp.Code).To(Equal(http.StatusOK)) }) }) diff --git a/src/autoscaler/scalingengine/server/server_test.go b/src/autoscaler/scalingengine/server/server_test.go index a513a3edf..1ae67aa32 100644 --- a/src/autoscaler/scalingengine/server/server_test.go +++ b/src/autoscaler/scalingengine/server/server_test.go @@ -190,9 +190,9 @@ var _ = Describe("Server", func() { method = http.MethodDelete }) Context("when requesting correctly", func() { - It("should return 204", func() { + It("should return 200", func() { Expect(err).ToNot(HaveOccurred()) - Expect(rsp.StatusCode).To(Equal(http.StatusNoContent)) + Expect(rsp.StatusCode).To(Equal(http.StatusOK)) rsp.Body.Close() }) }) diff --git a/src/autoscaler/sync/sync_suite_test.go b/src/autoscaler/sync/sync_suite_test.go index da0f5c998..ec24e8b41 100644 --- a/src/autoscaler/sync/sync_suite_test.go +++ b/src/autoscaler/sync/sync_suite_test.go @@ -9,6 +9,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + _ "github.com/go-sql-driver/mysql" "testing" ) @@ -30,7 +31,12 @@ var _ = BeforeSuite(func() { Fail("environment variable $DBURL is not set") } - dbHelper, e = sql.Open(db.PostgresDriverName, dbUrl) + database, e := db.GetConnection(dbUrl) + if e != nil { + Fail("failed to get database URL and drivername: " + e.Error()) + } + + dbHelper, e = sql.Open(database.DriverName, database.DSN) if e != nil { Fail("can not connect database: " + e.Error()) } diff --git a/src/autoscaler/testhelpers/websocket_handler.go b/src/autoscaler/testhelpers/websocket_handler.go index 370e1a89e..48c2efe32 100644 --- a/src/autoscaler/testhelpers/websocket_handler.go +++ b/src/autoscaler/testhelpers/websocket_handler.go @@ -30,7 +30,10 @@ func NewWebsocketHandler(m chan []byte, pingPongChan chan int, keepAlive time.Du func (h *WebsocketHandler) CloseWSConnection() { h.conLock.Lock() defer h.conLock.Unlock() - h.wsConn.Close() + if h.wsConn != nil { + h.wsConn.Close() + + } } func (h *WebsocketHandler) ServeWebsocket(rw http.ResponseWriter, r *http.Request) { upgrader := websocket.Upgrader{ diff --git a/src/github.com/go-sql-driver/mysql b/src/github.com/go-sql-driver/mysql new file mode 160000 index 000000000..b66d043e6 --- /dev/null +++ b/src/github.com/go-sql-driver/mysql @@ -0,0 +1 @@ +Subproject commit b66d043e6c8986ca01241b990326db395f9c0afd diff --git a/src/github.com/jmoiron/sqlx b/src/github.com/jmoiron/sqlx new file mode 160000 index 000000000..2ba0fc60e --- /dev/null +++ b/src/github.com/jmoiron/sqlx @@ -0,0 +1 @@ +Subproject commit 2ba0fc60eb4a54030f3a6d73ff0a047349c7eeca diff --git a/src/integration/components.go b/src/integration/components.go index cd9e82336..d6911d341 100644 --- a/src/integration/components.go +++ b/src/integration/components.go @@ -21,10 +21,12 @@ import ( "os/exec" "path/filepath" "time" + "strings" . "github.com/onsi/gomega" "github.com/tedsuo/ifrit/ginkgomon" yaml "gopkg.in/yaml.v2" + "github.com/go-sql-driver/mysql" ) const ( @@ -286,25 +288,43 @@ func (components *Components) PrepareGolangApiServerConfig(dbURI string, publicA } func (components *Components) PrepareSchedulerConfig(dbUri string, scalingEngineUri string, tmpDir string, httpClientTimeout time.Duration) string { - dbUrl, _ := url.Parse(dbUri) - scheme := dbUrl.Scheme - host := dbUrl.Host - path := dbUrl.Path - userInfo := dbUrl.User - userName := userInfo.Username() - password, _ := userInfo.Password() - if scheme == "postgres" { - scheme = "postgresql" + var ( + driverClassName string + userName string + password string + jdbcDBUri string + ) + if strings.Contains(dbUri, "postgres") { + dbUrl, _ := url.Parse(dbUri) + scheme := dbUrl.Scheme + host := dbUrl.Host + path := dbUrl.Path + userInfo := dbUrl.User + userName = userInfo.Username() + password, _ = userInfo.Password() + if scheme == "postgres" { + scheme = "postgresql" + } + jdbcDBUri = fmt.Sprintf("jdbc:%s://%s%s", scheme, host, path) + driverClassName = "org.postgresql.Driver" + }else { + cfg, _ := mysql.ParseDSN(dbUri) + scheme := "mysql" + host := cfg.Addr + path := cfg.DBName + userName = cfg.User + password = cfg.Passwd + jdbcDBUri = fmt.Sprintf("jdbc:%s://%s/%s", scheme, host, path) + driverClassName = "com.mysql.cj.jdbc.Driver" } - jdbcDBUri := fmt.Sprintf("jdbc:%s://%s%s", scheme, host, path) - settingStrTemplate := ` + settingStrTemplate := ` #datasource for application and quartz -spring.datasource.driverClassName=org.postgresql.Driver +spring.datasource.driverClassName=%s spring.datasource.url=%s spring.datasource.username=%s spring.datasource.password=%s #policy db -spring.policyDbDataSource.driverClassName=org.postgresql.Driver +spring.policyDbDataSource.driverClassName=%s spring.policyDbDataSource.url=%s spring.policyDbDataSource.username=%s spring.policyDbDataSource.password=%s @@ -343,7 +363,7 @@ spring.aop.auto=false endpoints.enabled=false spring.data.jpa.repositories.enabled=false ` - settingJsonStr := fmt.Sprintf(settingStrTemplate, jdbcDBUri, userName, password, jdbcDBUri, userName, password, scalingEngineUri, testCertDir, testCertDir, testCertDir, testCertDir, components.Ports[Scheduler], components.Ports[Scheduler], int(httpClientTimeout/time.Second)) + settingJsonStr := fmt.Sprintf(settingStrTemplate, driverClassName, jdbcDBUri, userName, password, driverClassName, jdbcDBUri, userName, password, scalingEngineUri, testCertDir, testCertDir, testCertDir, testCertDir, components.Ports[Scheduler], components.Ports[Scheduler], int(httpClientTimeout/time.Second)) cfgFile, err := os.Create(filepath.Join(tmpDir, "application.properties")) Expect(err).NotTo(HaveOccurred()) ioutil.WriteFile(cfgFile.Name(), []byte(settingJsonStr), 0777) diff --git a/src/integration/integration_golangapi_eventgenerator_test.go b/src/integration/integration_golangapi_eventgenerator_test.go index 285b21901..4841e5941 100644 --- a/src/integration/integration_golangapi_eventgenerator_test.go +++ b/src/integration/integration_golangapi_eventgenerator_test.go @@ -145,7 +145,7 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { metric.Timestamp = 555555 insertAppMetric(metric) - metric.Timestamp = 555555 + metric.Timestamp = 555556 insertAppMetric(metric) metric.Timestamp = 333333 @@ -160,7 +160,7 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { metric.Timestamp = 444444 insertAppMetric(metric) //add some other appId - metric.AppId = "some-other-app-id" + metric.AppId = getRandomId() metric.MetricType = models.MetricNameMemoryUsed metric.Unit = models.UnitMegaBytes metric.Timestamp = 444444 @@ -214,7 +214,7 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { MetricType: models.MetricNameMemoryUsed, Unit: models.UnitMegaBytes, Value: "123456", - Timestamp: 555555, + Timestamp: 555556, }, }, } @@ -270,7 +270,7 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { MetricType: models.MetricNameMemoryUsed, Unit: models.UnitMegaBytes, Value: "123456", - Timestamp: 555555, + Timestamp: 555556, }, models.AppMetric{ AppId: appId, @@ -308,8 +308,8 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { } checkAggregatedMetricResult(components.Ports[GolangAPIServer], pathVariables, parameters, result) - By("get the results from 444444 to 555555") - parameters = map[string]string{"start-time": "444444", "end-time": "555555", "order-direction": "asc", "page": "1", "results-per-page": "10"} + By("get the results from 444444 to 555556") + parameters = map[string]string{"start-time": "444444", "end-time": "555556", "order-direction": "asc", "page": "1", "results-per-page": "10"} result = AppAggregatedMetricResult{ TotalResults: 3, TotalPages: 1, @@ -334,7 +334,7 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { MetricType: models.MetricNameMemoryUsed, Unit: models.UnitMegaBytes, Value: "123456", - Timestamp: 555555, + Timestamp: 555556, }, }, } @@ -343,3 +343,4 @@ var _ = Describe("Integration_GolangApi_EventGenerator", func() { }) }) }) + diff --git a/src/integration/integration_golangapi_metricsserver_test.go b/src/integration/integration_golangapi_metricsserver_test.go index 2c8a08dc1..34a649bbd 100644 --- a/src/integration/integration_golangapi_metricsserver_test.go +++ b/src/integration/integration_golangapi_metricsserver_test.go @@ -178,7 +178,7 @@ var _ = Describe("Integration_GolangApi_MetricsServer", func() { metric.InstanceIndex = 1 insertAppInstanceMetric(metric) //add some other appId - metric.AppId = "some-other-app-id" + metric.AppId = getRandomId() metric.Name = models.MetricNameMemoryUsed metric.Unit = models.UnitMegaBytes metric.Timestamp = 444444 diff --git a/src/integration/integration_golangapi_scalingengine_test.go b/src/integration/integration_golangapi_scalingengine_test.go index 0fa6d6d9a..cb43dacb6 100644 --- a/src/integration/integration_golangapi_scalingengine_test.go +++ b/src/integration/integration_golangapi_scalingengine_test.go @@ -169,7 +169,7 @@ var _ = Describe("Integration_GolangApi_ScalingEngine", func() { insertScalingHistory(history) //add some other app id - history.AppId = "some-other-app-id" + history.AppId = getRandomId() history.Timestamp = 444444 insertScalingHistory(history) diff --git a/src/integration/integration_suite_test.go b/src/integration/integration_suite_test.go index dcbb77585..988aee74c 100644 --- a/src/integration/integration_suite_test.go +++ b/src/integration/integration_suite_test.go @@ -8,7 +8,7 @@ import ( as_testhelpers "autoscaler/testhelpers" "bytes" - "database/sql" + //"database/sql" "encoding/json" "fmt" "io/ioutil" @@ -31,6 +31,8 @@ import ( "github.com/cloudfoundry/sonde-go/events" "github.com/gogo/protobuf/proto" _ "github.com/lib/pq" + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/onsi/gomega/gexec" @@ -67,7 +69,7 @@ var ( appSummaryRegPath = regexp.MustCompile(`^/v2/apps/.*/summary$`) appInstanceRegPath = regexp.MustCompile(`^/v2/apps/.*$`) checkUserSpaceRegPath = regexp.MustCompile(`^/v2/users/.+/spaces.*$`) - dbHelper *sql.DB + dbHelper *sqlx.DB fakeCCNOAAUAA *ghttp.Server messagesToSend chan []byte streamingDoneChan chan bool @@ -117,7 +119,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Fail("environment variable $DBURL is not set") } - dbHelper, err = sql.Open(db.PostgresDriverName, dbUrl) + database, err := db.GetConnection(dbUrl) + Expect(err).NotTo(HaveOccurred()) + + dbHelper, err = sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) clearDatabase() @@ -132,7 +137,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Expect(err).NotTo(HaveOccurred()) dbUrl = os.Getenv("DBURL") - dbHelper, err = sql.Open(db.PostgresDriverName, dbUrl) + database, err := db.GetConnection(dbUrl) + Expect(err).NotTo(HaveOccurred()) + + dbHelper, err = sqlx.Open(database.DriverName, database.DSN) Expect(err).NotTo(HaveOccurred()) LOGLEVEL = os.Getenv("LOGLEVEL") @@ -570,22 +578,22 @@ func clearDatabase() { } func insertPolicy(appId string, policyStr string, guid string) { - query := "INSERT INTO policy_json(app_id, policy_json, guid) VALUES($1, $2, $3)" + query := dbHelper.Rebind("INSERT INTO policy_json(app_id, policy_json, guid) VALUES(?, ?, ?)") _, err := dbHelper.Exec(query, appId, policyStr, guid) Expect(err).NotTo(HaveOccurred()) } func deletePolicy(appId string) { - query := "DELETE FROM policy_json WHERE app_id=$1" + query := dbHelper.Rebind("DELETE FROM policy_json WHERE app_id=?") _, err := dbHelper.Exec(query, appId) Expect(err).NotTo(HaveOccurred()) } func insertScalingHistory(history *models.AppScalingHistory) { - query := "INSERT INTO scalinghistory" + + query := dbHelper.Rebind("INSERT INTO scalinghistory" + "(appid, timestamp, scalingtype, status, oldinstances, newinstances, reason, message, error) " + - " VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9)" + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)") _, err := dbHelper.Exec(query, history.AppId, history.Timestamp, history.ScalingType, history.Status, history.OldInstances, history.NewInstances, history.Reason, history.Message, history.Error) @@ -593,36 +601,36 @@ func insertScalingHistory(history *models.AppScalingHistory) { } func getScalingHistoryCount(appId string, oldInstanceCount int, newInstanceCount int) int { var count int - query := "SELECT COUNT(*) FROM scalinghistory WHERE appid=$1 AND oldinstances=$2 AND newinstances=$3" + query := dbHelper.Rebind("SELECT COUNT(*) FROM scalinghistory WHERE appid=? AND oldinstances=? AND newinstances=?") err := dbHelper.QueryRow(query, appId, oldInstanceCount, newInstanceCount).Scan(&count) Expect(err).NotTo(HaveOccurred()) return count } func getScalingHistoryTotalCount(appId string) int { var count int - query := "SELECT COUNT(*) FROM scalinghistory WHERE appid=$1" + query := dbHelper.Rebind("SELECT COUNT(*) FROM scalinghistory WHERE appid=?") err := dbHelper.QueryRow(query, appId).Scan(&count) Expect(err).NotTo(HaveOccurred()) return count } func insertAppInstanceMetric(appInstanceMetric *models.AppInstanceMetric) { - query := "INSERT INTO appinstancemetrics" + + query := dbHelper.Rebind("INSERT INTO appinstancemetrics" + "(appid, instanceindex, collectedat, name, unit, value, timestamp) " + - "VALUES($1, $2, $3, $4, $5, $6, $7)" + "VALUES(?, ?, ?, ?, ?, ?, ?)") _, err := dbHelper.Exec(query, appInstanceMetric.AppId, appInstanceMetric.InstanceIndex, appInstanceMetric.CollectedAt, appInstanceMetric.Name, appInstanceMetric.Unit, appInstanceMetric.Value, appInstanceMetric.Timestamp) Expect(err).NotTo(HaveOccurred()) } func insertAppMetric(appMetrics *models.AppMetric) { - query := "INSERT INTO app_metric" + + query := dbHelper.Rebind("INSERT INTO app_metric" + "(app_id, metric_type, unit, value, timestamp) " + - "VALUES($1, $2, $3, $4, $5)" + "VALUES(?, ?, ?, ?, ?)") _, err := dbHelper.Exec(query, appMetrics.AppId, appMetrics.MetricType, appMetrics.Unit, appMetrics.Value, appMetrics.Timestamp) Expect(err).NotTo(HaveOccurred()) } func getAppInstanceMetricTotalCount(appId string) int { var count int - query := "SELECT COUNT(*) FROM appinstancemetrics WHERE appid=$1" + query := dbHelper.Rebind("SELECT COUNT(*) FROM appinstancemetrics WHERE appid=?") err := dbHelper.QueryRow(query, appId).Scan(&count) Expect(err).NotTo(HaveOccurred()) return count @@ -630,7 +638,7 @@ func getAppInstanceMetricTotalCount(appId string) int { func getAppMetricTotalCount(appId string) int { var count int - query := "SELECT COUNT(*) FROM app_metric WHERE app_id=$1" + query := dbHelper.Rebind("SELECT COUNT(*) FROM app_metric WHERE app_id=?") err := dbHelper.QueryRow(query, appId).Scan(&count) Expect(err).NotTo(HaveOccurred()) return count @@ -638,7 +646,7 @@ func getAppMetricTotalCount(appId string) int { func getCredentialsCount(appId string) int { var count int - query := "SELECT COUNT(*) FROM credentials WHERE id=$1" + query := dbHelper.Rebind("SELECT COUNT(*) FROM credentials WHERE id=?") err := dbHelper.QueryRow(query, appId).Scan(&count) Expect(err).NotTo(HaveOccurred()) return count @@ -855,6 +863,9 @@ func createHTTPTimerEnvelope(appId string, start int64, end int64) []*loggregato Stop: end, }, }, + DeprecatedTags: map[string]*loggregator_v2.Value{ + "peer_type": {Data: &loggregator_v2.Value_Text{Text: "Client"}}, + }, }, }