diff --git a/client_interface.go b/client_interface.go index 4dc46c56..9b90eabb 100644 --- a/client_interface.go +++ b/client_interface.go @@ -258,4 +258,9 @@ type ClientInterface interface { resourceIDs []string, tags []ResourceFilterTag, nextToken string) (respTags []*ResourceTagResponse, respNextToken string, err error) + CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error + DeleteScheduledSQL(project string, name string) error + UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error + GetScheduledSQL(project string, name string) (*ScheduledSQL, error) + ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error) } diff --git a/client_scheduled_sql.go b/client_scheduled_sql.go new file mode 100644 index 00000000..5c192c31 --- /dev/null +++ b/client_scheduled_sql.go @@ -0,0 +1,208 @@ +package sls + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/url" +) + +type SqlType string +type ResourcePool string +type DataFormat string +type JobType string +type Status string + +const ( + STANDARD SqlType = "standard" + SEARCH_QUERY SqlType = "searchQuery" +) +const ( + DEFAULT ResourcePool = "default" + ENHANCED ResourcePool = "enhanced" +) +const ( + LOG_TO_LOG DataFormat = "log2log" + LOG_TO_METRIC DataFormat = "log2metric" + METRIC_TO_metric DataFormat = "metric2metric" +) +const ( + ALERT_JOB JobType = "Alert" + REPORT_JOB JobType = "Report" + ETL_JOB JobType = "ETL" + INGESTION_JOB JobType = "Ingestion" + REBUILD_INDEX_JOB JobType = "RebuildIndex" + AUDIT_JOB_JOB JobType = "AuditJob" + EXPORT_JOB JobType = "Export" + SCHEDULED_SQL_JOB JobType = "ScheduledSQL" +) + +const ( + ENABLED Status = "Enabled" + DISABLED Status = "Disabled" +) + +type ScheduledSQL struct { + Name string `json:"name"` + DisplayName string `json:"displayName"` + Description string `json:"description"` + Status Status `json:"status"` + ScheduleId string `json:"scheduleId"` + Configuration *ScheduledSQLConfiguration `json:"configuration"` + Schedule *Schedule `json:"schedule"` + CreateTime int64 `json:"createTime,omitempty"` + LastModifiedTime int64 `json:"lastModifiedTime,omitempty"` + Type JobType `json:"type"` +} + +type ScheduledSQLConfiguration struct { + SourceLogStore string `json:"sourceLogstore"` + DestProject string `json:"destProject"` + DestEndpoint string `json:"destEndpoint"` + DestLogStore string `json:"destLogstore"` + Script string `json:"script"` + SqlType SqlType `json:"sqlType"` + ResourcePool ResourcePool `json:"resourcePool"` + RoleArn string `json:"roleArn"` + DestRoleArn string `json:"destRoleArn"` + FromTimeExpr string `json:"fromTimeExpr"` + ToTimeExpr string `json:"toTimeExpr"` + MaxRunTimeInSeconds int32 `json:"maxRunTimeInSeconds"` + MaxRetries int32 `json:"maxRetries"` + FromTime int64 `json:"fromTime"` + ToTime int64 `json:"toTime"` + DataFormat DataFormat `json:"dataFormat"` + Parameters *ScheduledSQLParameters `json:"parameters,omitempty"` +} + +func NewScheduledSQLConfiguration() *ScheduledSQLConfiguration { + return &ScheduledSQLConfiguration{ + SqlType: STANDARD, + ResourcePool: DEFAULT, + FromTime: 0, + ToTime: 0, + DataFormat: LOG_TO_LOG, + } +} + +type ScheduledSQLParameters struct { + TimeKey string `json:"timeKey,omitempty"` + LabelKeys string `json:"labelKeys,omitempty"` + MetricKeys string `json:"metricKeys,omitempty"` + MetricName string `json:"metricName,omitempty"` + HashLabels string `json:"hashLabels,omitempty"` + AddLabels string `json:"addLabels,omitempty"` +} + +func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { + fromTime := scheduledsql.Configuration.FromTime + toTime := scheduledsql.Configuration.ToTime + timeRange := fromTime > 1451577600 && toTime > fromTime + sustained := fromTime > 1451577600 && toTime == 0 + if !timeRange && !sustained { + return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime) + } + body, err := json.Marshal(scheduledsql) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + + uri := "/jobs" + r, err := c.request(project, "POST", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) DeleteScheduledSQL(project string, name string) error { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + + uri := "/jobs/" + name + r, err := c.request(project, "DELETE", uri, h, nil) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error { + body, err := json.Marshal(scheduledsql) + if err != nil { + return NewClientError(err) + } + h := map[string]string{ + "x-log-bodyrawsize": fmt.Sprintf("%v", len(body)), + "Content-Type": "application/json", + } + + uri := "/jobs/" + scheduledsql.Name + r, err := c.request(project, "PUT", uri, h, body) + if err != nil { + return err + } + r.Body.Close() + return nil +} + +func (c *Client) GetScheduledSQL(project string, name string) (*ScheduledSQL, error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + uri := "/jobs/" + name + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, err + } + defer r.Body.Close() + buf, _ := ioutil.ReadAll(r.Body) + scheduledSQL := &ScheduledSQL{} + if err = json.Unmarshal(buf, scheduledSQL); err != nil { + err = NewClientError(err) + } + return scheduledSQL, err +} + +func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, error error) { + h := map[string]string{ + "x-log-bodyrawsize": "0", + "Content-Type": "application/json", + } + v := url.Values{} + v.Add("jobName", name) + if displayName != "" { + v.Add("displayName", displayName) + } + v.Add("jobType", "ScheduledSQL") + v.Add("offset", fmt.Sprintf("%d", offset)) + v.Add("size", fmt.Sprintf("%d", size)) + + uri := "/jobs?" + v.Encode() + r, err := c.request(project, "GET", uri, h, nil) + if err != nil { + return nil, 0, 0, err + } + defer r.Body.Close() + + type ScheduledSqlList struct { + Total int `json:"total"` + Count int `json:"count"` + Results []*ScheduledSQL `json:"results"` + } + buf, _ := ioutil.ReadAll(r.Body) + scheduledSqlList := &ScheduledSqlList{} + if err = json.Unmarshal(buf, scheduledSqlList); err != nil { + err = NewClientError(err) + } + return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, err +} diff --git a/client_scheduled_sql_test.go b/client_scheduled_sql_test.go new file mode 100644 index 00000000..c02f3a5c --- /dev/null +++ b/client_scheduled_sql_test.go @@ -0,0 +1,140 @@ +package sls + +import ( + "fmt" + "os" + "testing" + "time" +) + +func TestCreate(t *testing.T) { + client := makeClient() + err := setUp(client) + if err != nil { + t.Fatalf("%v", err) + } + err = client.CreateScheduledSQL("test-scheduled-sql", getScheduleSQL("111")) + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestDelete(t *testing.T) { + client := makeClient() + err := client.DeleteScheduledSQL("test-scheduled-sql", "test01") + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestUpdate(t *testing.T) { + client := makeClient() + err := client.UpdateScheduledSQL("test-scheduled-sql", getScheduleSQL("222")) + if err != nil { + t.Fatalf("%v", err) + } +} + +func TestGet(t *testing.T) { + client := makeClient() + scheduledSQL, err := client.GetScheduledSQL("test-scheduled-sql", "test01") + if err != nil { + t.Fatalf("%v", err) + } + fmt.Printf("%v\n", scheduledSQL) +} + +func TestList(t *testing.T) { + client := makeClient() + scheduledSQL, total, count, err := client.ListScheduledSQL("test-scheduled-sql", "", "", 0, 10) + if err != nil { + t.Fatalf("%v", err) + } + fmt.Printf("%v\n%d\n%d\n", scheduledSQL, total, count) +} + +func makeClient() *Client { + return &Client{ + Endpoint: "pub-cn-hangzhou-staging-share.log.aliyuncs.com", + AccessKeyID: os.Getenv("ALICLOUD_ACCESS_KEY"), + AccessKeySecret: os.Getenv("ALICLOUD_SECRET_KEY"), + } +} + +func getScheduleSQL(des string) *ScheduledSQL { + return &ScheduledSQL{ + Name: "test01", + DisplayName: "dis001", + Description: des, + Status: ENABLED, + Configuration: &ScheduledSQLConfiguration{ + SourceLogStore: "test-source", + DestProject: "test-schedulesql", + DestEndpoint: "cn-hangzhou-intranet.log.aliyuncs.com", + DestLogStore: "test-target", + Script: "*|SELECT COUNT(__value__)", + SqlType: SEARCH_QUERY, + ResourcePool: DEFAULT, + RoleArn: os.Getenv("ROLE_ARN"), + DestRoleArn: os.Getenv("ROLE_ARN"), + FromTimeExpr: "@m-15m", + ToTimeExpr: "@m", + MaxRunTimeInSeconds: 60, + MaxRetries: 20, + FromTime: 1621828800, + ToTime: 1623311901, + DataFormat: LOG_TO_LOG, + Parameters: nil, + }, + Schedule: &Schedule{ + Type: "FixedRate", + Interval: "15m", + Delay: 30, + DayOfWeek: 0, + Hour: 0, + }, + CreateTime: 0, + LastModifiedTime: 0, + Type: SCHEDULED_SQL_JOB, + } +} + +func setUp(c *Client) error { + if ok, err := c.CheckProjectExist("test-scheduled-sql"); err != nil { + return err + } else if ok { + err := c.DeleteProject("test-scheduled-sql") + if err != nil { + return err + } + time.Sleep(time.Second * 30) + _, err = c.CreateProject("test-scheduled-sql", "test scheduled sql") + if err != nil { + return err + } else { + time.Sleep(time.Second * 60) + } + } + err1 := c.CreateLogStore("test-scheduled-sql", "test-source", 3, 2, true, 4) + if err1 != nil { + return err1 + } + err2 := c.CreateLogStore("test-scheduled-sql", "test-target", 3, 2, true, 4) + if err2 != nil { + return err2 + } + err3 := c.CreateIndex("test-scheduled-sql", "test-source", Index{ + Keys: map[string]IndexKey{"__labels__": { + Token: []string{",", " ", "'"}, + CaseSensitive: true, + Type: "text", + DocValue: true, + Chn: true, + }}, + Line: nil, + }) + if err3 != nil { + return err3 + } + return nil +} diff --git a/token_auto_update_client.go b/token_auto_update_client.go index 78907ef5..795803f0 100644 --- a/token_auto_update_client.go +++ b/token_auto_update_client.go @@ -1179,3 +1179,54 @@ func (c *TokenAutoUpdateClient) ListTagResources(project string, } return } + +// ####################### Scheduled SQL API ###################### +func (c *TokenAutoUpdateClient) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.CreateScheduledSQL(project, scheduledsql) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) DeleteScheduledSQL(project string, name string) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.DeleteScheduledSQL(project, name) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) (err error) { + for i := 0; i < c.maxTryTimes; i++ { + err = c.logClient.UpdateScheduledSQL(project, scheduledsql) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) GetScheduledSQL(project string, name string) (s *ScheduledSQL, err error) { + for i := 0; i < c.maxTryTimes; i++ { + s, err = c.logClient.GetScheduledSQL(project, name) + if !c.processError(err) { + return + } + } + return +} + +func (c *TokenAutoUpdateClient) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, err error) { + for i := 0; i < c.maxTryTimes; i++ { + scheduledsqls, total, count, err = c.logClient.ListScheduledSQL(project, name, displayName, offset, size) + if !c.processError(err) { + return + } + } + return +}