From 00c142919984466245a3efd2038d26f159f579be Mon Sep 17 00:00:00 2001 From: wb-cjh663673 Date: Tue, 8 Jun 2021 14:44:05 +0800 Subject: [PATCH 1/4] add scheduled sql --- client_scheduled_sql.go | 188 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 client_scheduled_sql.go diff --git a/client_scheduled_sql.go b/client_scheduled_sql.go new file mode 100644 index 00000000..170c4ced --- /dev/null +++ b/client_scheduled_sql.go @@ -0,0 +1,188 @@ +package sls + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/url" +) + +type ScheduledSQL struct { + Name string `json:"name"` + DisplayName string `json:"displayName"` + Description string `json:"description"` + State string `json:"state"` + Status string `json:"status"` + ScheduleId string `json:"scheduleId"` + Configuration *ScheduledSQLConfiguration `json:"configuration"` + Schedule *Schedule `json:"schedule"` + CreateTime int64 `json:"createTime,omitempty"` + LastModifiedTime int64 `json:"lastModifiedTime,omitempty"` +} + +type ScheduledSQLConfiguration struct { + SourceLogStore string `json:"sourceLogstore"` + DestProject string `json:"destProject"` + DestEndpoint string `json:"destEndpoint"` + DestLogStore string `json:"destLogstore"` + Script string `json:"script"` + SqlType string `json:"sqlType"` + ResourcePool string `json:"resourcePool"` + RoleArn string `json:"roleArn"` + DestRoleArn string `json:"destRoleArn"` + FromTimeExpr string `json:"fromTimeExpr"` + ToTimeExpr string `json:"toTimeExpr"` + MaxRunTimeInSeconds int32 `json:"maxRunTimeInSeconds"` + MaxRetries int32 `json:"maxRetries"` + FromTime int64 `json:"fromTime"` + ToTime int64 `json:"toTime"` + DataFormat string `json:"dataFormat"` + Parameters *ScheduledSQLParameters `json:"parameters,omitempty"` +} + +func (s *ScheduledSQL) MarshalJSON() ([]byte, error) { + body := map[string]interface{}{ + "name": s.Name, + "displayName": s.DisplayName, + "description": s.Description, + "state": s.State, + "status": s.Status, + "scheduleId": s.ScheduleId, + "configuration": s.Configuration, + "schedule": s.Schedule, + "type": "ScheduledSQL", + } + return json.Marshal(body) +} + +func NewScheduledSQLConfiguration() *ScheduledSQLConfiguration { + return &ScheduledSQLConfiguration{ + SqlType: "standard", + ResourcePool: "default", + FromTime: 0, + ToTime: 0, + DataFormat: "log2log", + } +} + +type ScheduledSQLParameters struct { + TimeKey string `json:"timeKey,omitempty"` + LabelKeys string `json:"labelKeys,omitempty"` + MetricKeys string `json:"metricKeys,omitempty"` + MetricName string `json:"metricName,omitempty"` + HashLabels string `json:"hashLabels,omitempty"` + AddLabels string `json:"addLabels,omitempty"` +} + +func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { + fromTime := scheduledsql.Configuration.FromTime + toTime := scheduledsql.Configuration.ToTime + timeRange := fromTime > 1451577600 && toTime > fromTime + sustained := fromTime > 1451577600 && toTime == 0 + if !timeRange && !sustained { + return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime) + } + body, err := json.Marshal(scheduledsql) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + + uri := "/jobs" + r, err := c.request(project, "POST", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) DeleteScheduledSQL(project string, name string) error { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := "/jobs/" + name + r, err := c.request(project, "DELETE", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { + body, err := json.Marshal(scheduledsql) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + + uri := "/jobs/" + scheduledsql.Name + r, err := c.request(project, "PUT", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) GetScheduledSQL(project string, name string) (*ScheduledSQL, error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + uri := "/jobs/" + name + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, err + } + defer r.Body.Close() + buf, _ := ioutil.ReadAll(r.Body) + scheduledSQL := &ScheduledSQL{} + if err = json.Unmarshal(buf, scheduledSQL); err != nil { + err = NewClientError(err) + } + return scheduledSQL, err +} + +func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + v := url.Values{} + v.Add("jobName", name) + if displayName != "" { + v.Add("displayName", displayName) + } + v.Add("jobType", "ScheduledSQL") + v.Add("offset", fmt.Sprintf("%d", offset)) + v.Add("size", fmt.Sprintf("%d", size)) + + uri := "/jobs?" + v.Encode() + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, 0, 0, err + } + defer r.Body.Close() + + type ScheduledSqlList struct { + Total int `json:"total"` + Count int `json:"count"` + Results []*ScheduledSQL `json:"results"` + } + buf, _ := ioutil.ReadAll(r.Body) + scheduledSqlList := &ScheduledSqlList{} + if err = json.Unmarshal(buf, scheduledSqlList); err != nil { + return nil, 0, 0, NewClientError(err) + } + return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, nil +} From 463bba4c94cc4e920e164b453977895a5c49b94c Mon Sep 17 00:00:00 2001 From: wb-cjh663673 Date: Tue, 8 Jun 2021 16:09:57 +0800 Subject: [PATCH 2/4] improve the token_auto_update_client --- client_interface.go | 5 ++++ client_scheduled_sql.go | 30 ++++++++++----------- client_scheduled_sql_test.go | 19 ++++++++++++++ token_auto_update_client.go | 51 ++++++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 client_scheduled_sql_test.go diff --git a/client_interface.go b/client_interface.go index 4dc46c56..9b90eabb 100644 --- a/client_interface.go +++ b/client_interface.go @@ -258,4 +258,9 @@ type ClientInterface interface { resourceIDs []string, tags []ResourceFilterTag, nextToken string) (respTags []*ResourceTagResponse, respNextToken string, err error) + CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error + DeleteScheduledSQL(project string, name string) error + UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error + GetScheduledSQL(project string, name string) (*ScheduledSQL, error) + ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error) } diff --git a/client_scheduled_sql.go b/client_scheduled_sql.go index 170c4ced..69932a77 100644 --- a/client_scheduled_sql.go +++ b/client_scheduled_sql.go @@ -11,7 +11,6 @@ type ScheduledSQL struct { Name string `json:"name"` DisplayName string `json:"displayName"` Description string `json:"description"` - State string `json:"state"` Status string `json:"status"` ScheduleId string `json:"scheduleId"` Configuration *ScheduledSQLConfiguration `json:"configuration"` @@ -42,15 +41,16 @@ type ScheduledSQLConfiguration struct { func (s *ScheduledSQL) MarshalJSON() ([]byte, error) { body := map[string]interface{}{ - "name": s.Name, - "displayName": s.DisplayName, - "description": s.Description, - "state": s.State, - "status": s.Status, - "scheduleId": s.ScheduleId, - "configuration": s.Configuration, - "schedule": s.Schedule, - "type": "ScheduledSQL", + "name": s.Name, + "displayName": s.DisplayName, + "description": s.Description, + "status": s.Status, + "scheduleId": s.ScheduleId, + "configuration": s.Configuration, + "schedule": s.Schedule, + "createTime": s.CreateTime, + "lastModifiedTime": s.LastModifiedTime, + "type": "ScheduledSQL", } return json.Marshal(body) } @@ -82,7 +82,7 @@ func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) if !timeRange && !sustained { return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime) } - body, err := json.Marshal(scheduledsql) + body, err := scheduledsql.MarshalJSON() if err != nil { return NewClientError(err) } @@ -116,7 +116,7 @@ func (c *Client) DeleteScheduledSQL(project string, name string) error { } func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { - body, err := json.Marshal(scheduledsql) + body, err := scheduledsql.MarshalJSON() if err != nil { return NewClientError(err) } @@ -153,7 +153,7 @@ func (c *Client) GetScheduledSQL(project string, name string) (*ScheduledSQL, er return scheduledSQL, err } -func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error) { +func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, error error) { h := map[string]string{ "x-log-bodyrawsize": "0", "Content-Type": "application/json", @@ -182,7 +182,7 @@ func (c *Client) ListScheduledSQL(project, name, displayName string, offset, siz buf, _ := ioutil.ReadAll(r.Body) scheduledSqlList := &ScheduledSqlList{} if err = json.Unmarshal(buf, scheduledSqlList); err != nil { - return nil, 0, 0, NewClientError(err) + err = NewClientError(err) } - return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, nil + return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, err } diff --git a/client_scheduled_sql_test.go b/client_scheduled_sql_test.go new file mode 100644 index 00000000..5d5cb6d5 --- /dev/null +++ b/client_scheduled_sql_test.go @@ -0,0 +1,19 @@ +package sls + +import ( + "fmt" + "os" + "testing" +) + +func TestList(t *testing.T) { + AccessKeyID := os.Getenv("ALICLOUD_ACCESS_KEY") + AccessKeySecret := os.Getenv("ALICLOUD_SECRET_KEY") + Endpoint := "cn-hangzhou.log.aliyuncs.com" + client := CreateNormalInterface(Endpoint, AccessKeyID, AccessKeySecret, "") + scheduledSQL, err := client.GetScheduledSQL("", "") + if err != nil { + fmt.Printf("%v", err) + } + fmt.Printf("%v", scheduledSQL) +} diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 78907ef5..795803f0 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -1179,3 +1179,54 @@ func (c *TokenAutoUpdateClient) ListTagResources(project string, } return } + +// ####################### Scheduled SQL API ###################### +func (c *TokenAutoUpdateClient) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.CreateScheduledSQL(project, scheduledsql) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) DeleteScheduledSQL(project string, name string) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.DeleteScheduledSQL(project, name) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.UpdateScheduledSQL(project, scheduledsql) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) GetScheduledSQL(project string, name string) (s *ScheduledSQL, err error) { + for i := 0; i < c.maxTryTimes; i++ { + s, err = c.logClient.GetScheduledSQL(project, name) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, err error) { + for i := 0; i < c.maxTryTimes; i++ { + scheduledsqls, total, count, err = c.logClient.ListScheduledSQL(project, name, displayName, offset, size) + if !c.processError(err) { + return + } + } + return +} From c9c028d8c9522a297cd408f8557a444b8c827721 Mon Sep 17 00:00:00 2001 From: wb-cjh663673 Date: Wed, 9 Jun 2021 16:22:58 +0800 Subject: [PATCH 3/4] optimize code --- client_scheduled_sql.go | 70 ++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/client_scheduled_sql.go b/client_scheduled_sql.go index 69932a77..5c192c31 100644 --- a/client_scheduled_sql.go +++ b/client_scheduled_sql.go @@ -7,16 +7,52 @@ import ( "net/url" ) +type SqlType string +type ResourcePool string +type DataFormat string +type JobType string +type Status string + +const ( + STANDARD SqlType = "standard" + SEARCH_QUERY SqlType = "searchQuery" +) +const ( + DEFAULT ResourcePool = "default" + ENHANCED ResourcePool = "enhanced" +) +const ( + LOG_TO_LOG DataFormat = "log2log" + LOG_TO_METRIC DataFormat = "log2metric" + METRIC_TO_metric DataFormat = "metric2metric" +) +const ( + ALERT_JOB JobType = "Alert" + REPORT_JOB JobType = "Report" + ETL_JOB JobType = "ETL" + INGESTION_JOB JobType = "Ingestion" + REBUILD_INDEX_JOB JobType = "RebuildIndex" + AUDIT_JOB_JOB JobType = "AuditJob" + EXPORT_JOB JobType = "Export" + SCHEDULED_SQL_JOB JobType = "ScheduledSQL" +) + +const ( + ENABLED Status = "Enabled" + DISABLED Status = "Disabled" +) + type ScheduledSQL struct { Name string `json:"name"` DisplayName string `json:"displayName"` Description string `json:"description"` - Status string `json:"status"` + Status Status `json:"status"` ScheduleId string `json:"scheduleId"` Configuration *ScheduledSQLConfiguration `json:"configuration"` Schedule *Schedule `json:"schedule"` CreateTime int64 `json:"createTime,omitempty"` LastModifiedTime int64 `json:"lastModifiedTime,omitempty"` + Type JobType `json:"type"` } type ScheduledSQLConfiguration struct { @@ -25,8 +61,8 @@ type ScheduledSQLConfiguration struct { DestEndpoint string `json:"destEndpoint"` DestLogStore string `json:"destLogstore"` Script string `json:"script"` - SqlType string `json:"sqlType"` - ResourcePool string `json:"resourcePool"` + SqlType SqlType `json:"sqlType"` + ResourcePool ResourcePool `json:"resourcePool"` RoleArn string `json:"roleArn"` DestRoleArn string `json:"destRoleArn"` FromTimeExpr string `json:"fromTimeExpr"` @@ -35,33 +71,17 @@ type ScheduledSQLConfiguration struct { MaxRetries int32 `json:"maxRetries"` FromTime int64 `json:"fromTime"` ToTime int64 `json:"toTime"` - DataFormat string `json:"dataFormat"` + DataFormat DataFormat `json:"dataFormat"` Parameters *ScheduledSQLParameters `json:"parameters,omitempty"` } -func (s *ScheduledSQL) MarshalJSON() ([]byte, error) { - body := map[string]interface{}{ - "name": s.Name, - "displayName": s.DisplayName, - "description": s.Description, - "status": s.Status, - "scheduleId": s.ScheduleId, - "configuration": s.Configuration, - "schedule": s.Schedule, - "createTime": s.CreateTime, - "lastModifiedTime": s.LastModifiedTime, - "type": "ScheduledSQL", - } - return json.Marshal(body) -} - func NewScheduledSQLConfiguration() *ScheduledSQLConfiguration { return &ScheduledSQLConfiguration{ - SqlType: "standard", - ResourcePool: "default", + SqlType: STANDARD, + ResourcePool: DEFAULT, FromTime: 0, ToTime: 0, - DataFormat: "log2log", + DataFormat: LOG_TO_LOG, } } @@ -82,7 +102,7 @@ func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) if !timeRange && !sustained { return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime) } - body, err := scheduledsql.MarshalJSON() + body, err := json.Marshal(scheduledsql) if err != nil { return NewClientError(err) } @@ -116,7 +136,7 @@ func (c *Client) DeleteScheduledSQL(project string, name string) error { } func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { - body, err := scheduledsql.MarshalJSON() + body, err := json.Marshal(scheduledsql) if err != nil { return NewClientError(err) } From 3801ffe3a60aa7a6c69454d76bd3e00ad8a4ac75 Mon Sep 17 00:00:00 2001 From: wb-cjh663673 Date: Wed, 9 Jun 2021 16:50:04 +0800 Subject: [PATCH 4/4] add test --- client_scheduled_sql_test.go | 135 +++++++++++++++++++++++++++++++++-- 1 file changed, 128 insertions(+), 7 deletions(-) diff --git a/client_scheduled_sql_test.go b/client_scheduled_sql_test.go index 5d5cb6d5..c02f3a5c 100644 --- a/client_scheduled_sql_test.go +++ b/client_scheduled_sql_test.go @@ -4,16 +4,137 @@ import ( "fmt" "os" "testing" + "time" ) +func TestCreate(t *testing.T) { + client := makeClient() + err := setUp(client) + if err != nil { + t.Fatalf("%v", err) + } + err = client.CreateScheduledSQL("test-scheduled-sql", getScheduleSQL("111")) + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestDelete(t *testing.T) { + client := makeClient() + err := client.DeleteScheduledSQL("test-scheduled-sql", "test01") + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestUpdate(t *testing.T) { + client := makeClient() + err := client.UpdateScheduledSQL("test-scheduled-sql", getScheduleSQL("222")) + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestGet(t *testing.T) { + client := makeClient() + scheduledSQL, err := client.GetScheduledSQL("test-scheduled-sql", "test01") + if err != nil { + t.Fatalf("%v", err) + } + fmt.Printf("%v\n", scheduledSQL) +} + func TestList(t *testing.T) { - AccessKeyID := os.Getenv("ALICLOUD_ACCESS_KEY") - AccessKeySecret := os.Getenv("ALICLOUD_SECRET_KEY") - Endpoint := "cn-hangzhou.log.aliyuncs.com" - client := CreateNormalInterface(Endpoint, AccessKeyID, AccessKeySecret, "") - scheduledSQL, err := client.GetScheduledSQL("", "") + client := makeClient() + scheduledSQL, total, count, err := client.ListScheduledSQL("test-scheduled-sql", "", "", 0, 10) if err != nil { - fmt.Printf("%v", err) + t.Fatalf("%v", err) + } + fmt.Printf("%v\n%d\n%d\n", scheduledSQL, total, count) +} + +func makeClient() *Client { + return &Client{ + Endpoint: "pub-cn-hangzhou-staging-share.log.aliyuncs.com", + AccessKeyID: os.Getenv("ALICLOUD_ACCESS_KEY"), + AccessKeySecret: os.Getenv("ALICLOUD_SECRET_KEY"), + } +} + +func getScheduleSQL(des string) *ScheduledSQL { + return &ScheduledSQL{ + Name: "test01", + DisplayName: "dis001", + Description: des, + Status: ENABLED, + Configuration: &ScheduledSQLConfiguration{ + SourceLogStore: "test-source", + DestProject: "test-schedulesql", + DestEndpoint: "cn-hangzhou-intranet.log.aliyuncs.com", + DestLogStore: "test-target", + Script: "*|SELECT COUNT(__value__)", + SqlType: SEARCH_QUERY, + ResourcePool: DEFAULT, + RoleArn: os.Getenv("ROLE_ARN"), + DestRoleArn: os.Getenv("ROLE_ARN"), + FromTimeExpr: "@m-15m", + ToTimeExpr: "@m", + MaxRunTimeInSeconds: 60, + MaxRetries: 20, + FromTime: 1621828800, + ToTime: 1623311901, + DataFormat: LOG_TO_LOG, + Parameters: nil, + }, + Schedule: &Schedule{ + Type: "FixedRate", + Interval: "15m", + Delay: 30, + DayOfWeek: 0, + Hour: 0, + }, + CreateTime: 0, + LastModifiedTime: 0, + Type: SCHEDULED_SQL_JOB, + } +} + +func setUp(c *Client) error { + if ok, err := c.CheckProjectExist("test-scheduled-sql"); err != nil { + return err + } else if ok { + err := c.DeleteProject("test-scheduled-sql") + if err != nil { + return err + } + time.Sleep(time.Second * 30) + _, err = c.CreateProject("test-scheduled-sql", "test scheduled sql") + if err != nil { + return err + } else { + time.Sleep(time.Second * 60) + } + } + err1 := c.CreateLogStore("test-scheduled-sql", "test-source", 3, 2, true, 4) + if err1 != nil { + return err1 + } + err2 := c.CreateLogStore("test-scheduled-sql", "test-target", 3, 2, true, 4) + if err2 != nil { + return err2 + } + err3 := c.CreateIndex("test-scheduled-sql", "test-source", Index{ + Keys: map[string]IndexKey{"__labels__": { + Token: []string{",", " ", "'"}, + CaseSensitive: true, + Type: "text", + DocValue: true, + Chn: true, + }}, + Line: nil, + }) + if err3 != nil { + return err3 } - fmt.Printf("%v", scheduledSQL) + return nil }