Skip to content

Commit

Permalink
Merge pull request #128 from HuiJing-C/master
Browse files Browse the repository at this point in the history
Support Scheduled SQL
  • Loading branch information
shabicheng authored Jun 10, 2021
2 parents bf93bc5 + 3801ffe commit ca0645c
Show file tree
Hide file tree
Showing 4 changed files with 404 additions and 0 deletions.
5 changes: 5 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,9 @@ type ClientInterface interface {
resourceIDs []string,
tags []ResourceFilterTag,
nextToken string) (respTags []*ResourceTagResponse, respNextToken string, err error)
CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error
DeleteScheduledSQL(project string, name string) error
UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error
GetScheduledSQL(project string, name string) (*ScheduledSQL, error)
ListScheduledSQL(project, name, displayName string, offset, size int) ([]*ScheduledSQL, int, int, error)
}
208 changes: 208 additions & 0 deletions client_scheduled_sql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package sls

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/url"
)

type SqlType string
type ResourcePool string
type DataFormat string
type JobType string
type Status string

const (
STANDARD SqlType = "standard"
SEARCH_QUERY SqlType = "searchQuery"
)
const (
DEFAULT ResourcePool = "default"
ENHANCED ResourcePool = "enhanced"
)
const (
LOG_TO_LOG DataFormat = "log2log"
LOG_TO_METRIC DataFormat = "log2metric"
METRIC_TO_metric DataFormat = "metric2metric"
)
const (
ALERT_JOB JobType = "Alert"
REPORT_JOB JobType = "Report"
ETL_JOB JobType = "ETL"
INGESTION_JOB JobType = "Ingestion"
REBUILD_INDEX_JOB JobType = "RebuildIndex"
AUDIT_JOB_JOB JobType = "AuditJob"
EXPORT_JOB JobType = "Export"
SCHEDULED_SQL_JOB JobType = "ScheduledSQL"
)

const (
ENABLED Status = "Enabled"
DISABLED Status = "Disabled"
)

type ScheduledSQL struct {
Name string `json:"name"`
DisplayName string `json:"displayName"`
Description string `json:"description"`
Status Status `json:"status"`
ScheduleId string `json:"scheduleId"`
Configuration *ScheduledSQLConfiguration `json:"configuration"`
Schedule *Schedule `json:"schedule"`
CreateTime int64 `json:"createTime,omitempty"`
LastModifiedTime int64 `json:"lastModifiedTime,omitempty"`
Type JobType `json:"type"`
}

type ScheduledSQLConfiguration struct {
SourceLogStore string `json:"sourceLogstore"`
DestProject string `json:"destProject"`
DestEndpoint string `json:"destEndpoint"`
DestLogStore string `json:"destLogstore"`
Script string `json:"script"`
SqlType SqlType `json:"sqlType"`
ResourcePool ResourcePool `json:"resourcePool"`
RoleArn string `json:"roleArn"`
DestRoleArn string `json:"destRoleArn"`
FromTimeExpr string `json:"fromTimeExpr"`
ToTimeExpr string `json:"toTimeExpr"`
MaxRunTimeInSeconds int32 `json:"maxRunTimeInSeconds"`
MaxRetries int32 `json:"maxRetries"`
FromTime int64 `json:"fromTime"`
ToTime int64 `json:"toTime"`
DataFormat DataFormat `json:"dataFormat"`
Parameters *ScheduledSQLParameters `json:"parameters,omitempty"`
}

func NewScheduledSQLConfiguration() *ScheduledSQLConfiguration {
return &ScheduledSQLConfiguration{
SqlType: STANDARD,
ResourcePool: DEFAULT,
FromTime: 0,
ToTime: 0,
DataFormat: LOG_TO_LOG,
}
}

type ScheduledSQLParameters struct {
TimeKey string `json:"timeKey,omitempty"`
LabelKeys string `json:"labelKeys,omitempty"`
MetricKeys string `json:"metricKeys,omitempty"`
MetricName string `json:"metricName,omitempty"`
HashLabels string `json:"hashLabels,omitempty"`
AddLabels string `json:"addLabels,omitempty"`
}

func (c *Client) CreateScheduledSQL(project string, scheduledsql *ScheduledSQL) error {
fromTime := scheduledsql.Configuration.FromTime
toTime := scheduledsql.Configuration.ToTime
timeRange := fromTime > 1451577600 && toTime > fromTime
sustained := fromTime > 1451577600 && toTime == 0
if !timeRange && !sustained {
return fmt.Errorf("invalid fromTime: %d toTime: %d, please ensure fromTime more than 1451577600", fromTime, toTime)
}
body, err := json.Marshal(scheduledsql)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}

uri := "/jobs"
r, err := c.request(project, "POST", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) DeleteScheduledSQL(project string, name string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}

uri := "/jobs/" + name
r, err := c.request(project, "DELETE", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) UpdateScheduledSQL(project string, scheduledsql *ScheduledSQL) error {
body, err := json.Marshal(scheduledsql)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}

uri := "/jobs/" + scheduledsql.Name
r, err := c.request(project, "PUT", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) GetScheduledSQL(project string, name string) (*ScheduledSQL, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := "/jobs/" + name
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
buf, _ := ioutil.ReadAll(r.Body)
scheduledSQL := &ScheduledSQL{}
if err = json.Unmarshal(buf, scheduledSQL); err != nil {
err = NewClientError(err)
}
return scheduledSQL, err
}

func (c *Client) ListScheduledSQL(project, name, displayName string, offset, size int) (scheduledsqls []*ScheduledSQL, total, count int, error error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
v := url.Values{}
v.Add("jobName", name)
if displayName != "" {
v.Add("displayName", displayName)
}
v.Add("jobType", "ScheduledSQL")
v.Add("offset", fmt.Sprintf("%d", offset))
v.Add("size", fmt.Sprintf("%d", size))

uri := "/jobs?" + v.Encode()
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, 0, 0, err
}
defer r.Body.Close()

type ScheduledSqlList struct {
Total int `json:"total"`
Count int `json:"count"`
Results []*ScheduledSQL `json:"results"`
}
buf, _ := ioutil.ReadAll(r.Body)
scheduledSqlList := &ScheduledSqlList{}
if err = json.Unmarshal(buf, scheduledSqlList); err != nil {
err = NewClientError(err)
}
return scheduledSqlList.Results, scheduledSqlList.Total, scheduledSqlList.Count, err
}
140 changes: 140 additions & 0 deletions client_scheduled_sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package sls

import (
"fmt"
"os"
"testing"
"time"
)

func TestCreate(t *testing.T) {
client := makeClient()
err := setUp(client)
if err != nil {
t.Fatalf("%v", err)
}
err = client.CreateScheduledSQL("test-scheduled-sql", getScheduleSQL("111"))
if err != nil {
t.Fatalf("%v", err)
}
}

func TestDelete(t *testing.T) {
client := makeClient()
err := client.DeleteScheduledSQL("test-scheduled-sql", "test01")
if err != nil {
t.Fatalf("%v", err)
}
}

func TestUpdate(t *testing.T) {
client := makeClient()
err := client.UpdateScheduledSQL("test-scheduled-sql", getScheduleSQL("222"))
if err != nil {
t.Fatalf("%v", err)
}
}

func TestGet(t *testing.T) {
client := makeClient()
scheduledSQL, err := client.GetScheduledSQL("test-scheduled-sql", "test01")
if err != nil {
t.Fatalf("%v", err)
}
fmt.Printf("%v\n", scheduledSQL)
}

func TestList(t *testing.T) {
client := makeClient()
scheduledSQL, total, count, err := client.ListScheduledSQL("test-scheduled-sql", "", "", 0, 10)
if err != nil {
t.Fatalf("%v", err)
}
fmt.Printf("%v\n%d\n%d\n", scheduledSQL, total, count)
}

func makeClient() *Client {
return &Client{
Endpoint: "pub-cn-hangzhou-staging-share.log.aliyuncs.com",
AccessKeyID: os.Getenv("ALICLOUD_ACCESS_KEY"),
AccessKeySecret: os.Getenv("ALICLOUD_SECRET_KEY"),
}
}

func getScheduleSQL(des string) *ScheduledSQL {
return &ScheduledSQL{
Name: "test01",
DisplayName: "dis001",
Description: des,
Status: ENABLED,
Configuration: &ScheduledSQLConfiguration{
SourceLogStore: "test-source",
DestProject: "test-schedulesql",
DestEndpoint: "cn-hangzhou-intranet.log.aliyuncs.com",
DestLogStore: "test-target",
Script: "*|SELECT COUNT(__value__)",
SqlType: SEARCH_QUERY,
ResourcePool: DEFAULT,
RoleArn: os.Getenv("ROLE_ARN"),
DestRoleArn: os.Getenv("ROLE_ARN"),
FromTimeExpr: "@m-15m",
ToTimeExpr: "@m",
MaxRunTimeInSeconds: 60,
MaxRetries: 20,
FromTime: 1621828800,
ToTime: 1623311901,
DataFormat: LOG_TO_LOG,
Parameters: nil,
},
Schedule: &Schedule{
Type: "FixedRate",
Interval: "15m",
Delay: 30,
DayOfWeek: 0,
Hour: 0,
},
CreateTime: 0,
LastModifiedTime: 0,
Type: SCHEDULED_SQL_JOB,
}
}

func setUp(c *Client) error {
if ok, err := c.CheckProjectExist("test-scheduled-sql"); err != nil {
return err
} else if ok {
err := c.DeleteProject("test-scheduled-sql")
if err != nil {
return err
}
time.Sleep(time.Second * 30)
_, err = c.CreateProject("test-scheduled-sql", "test scheduled sql")
if err != nil {
return err
} else {
time.Sleep(time.Second * 60)
}
}
err1 := c.CreateLogStore("test-scheduled-sql", "test-source", 3, 2, true, 4)
if err1 != nil {
return err1
}
err2 := c.CreateLogStore("test-scheduled-sql", "test-target", 3, 2, true, 4)
if err2 != nil {
return err2
}
err3 := c.CreateIndex("test-scheduled-sql", "test-source", Index{
Keys: map[string]IndexKey{"__labels__": {
Token: []string{",", " ", "'"},
CaseSensitive: true,
Type: "text",
DocValue: true,
Chn: true,
}},
Line: nil,
})
if err3 != nil {
return err3
}
return nil
}
Loading

0 comments on commit ca0645c

Please sign in to comment.