Skip to content

Commit

Permalink
Merge pull request #100 from aliyun/etl_branch
Browse files Browse the repository at this point in the history
add etl open api
  • Loading branch information
tangkai1719 authored Nov 3, 2020
2 parents b2ded96 + 353a857 commit 71610a3
Show file tree
Hide file tree
Showing 2 changed files with 361 additions and 0 deletions.
213 changes: 213 additions & 0 deletions client_etl_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package sls

import (
"encoding/json"
"fmt"
"io/ioutil"
"time"
)

type ETL struct {
Configuration ETLConfiguration `json:"configuration"`
Description string `json:"description"`
DisplayName string `json:"displayName"`
Name string `json:"name"`
Schedule ETLSchedule `json:"schedule"`
Type string `json:"type"`
Status string `json:"status"`
CreateTime int32 `json:"createTime,omitempty"`
LastModifiedTime int32 `json:"lastModifiedTime,omitempty"`
}

type ETLConfiguration struct {
AccessKeyId string `json:"accessKeyId"`
AccessKeySecret string `json:"accessKeySecret"`
FromTime int64 `json:"fromTime"`
Logstore string `json:"logstore"`
Parameters map[string]string `json:"parameters"`
RoleArn string `json:"roleArn,omitempty"`
Script string `json:"script"`
ToTime int32 `json:"toTime"`
Version int8 `json:"version"`
ETLSinks []ETLSink `json:"sinks"`
}

type ETLSchedule struct {
Type string `json:"type"`
}

type ETLSink struct {
AccessKeyId string `json:"accessKeyId"`
AccessKeySecret string `json:"accessKeySecret"`
Endpoint string `json:"endpoint"`
Logstore string `json:"logstore"`
Name string `json:"name"`
Project string `json:"project"`
RoleArn string `json:"roleArn,omitempty"`
}

type ListETLResponse struct {
Total int `json:"total"`
Count int `json:"count"`
Results []*ETL `json:"results"`
}


func NewETL(endpoint, accessKeyId, accessKeySecret, logstore, name, project string) ETL {
sink := ETLSink{
AccessKeyId:accessKeyId,
AccessKeySecret:accessKeySecret,
Endpoint:endpoint,
Logstore:logstore,
Name:name,
Project:project,
}
config := ETLConfiguration {
AccessKeyId:accessKeyId,
AccessKeySecret:accessKeySecret,
FromTime: time.Now().Unix(),
Script: "e_set('new','aliyun')",
Version:2,
Logstore:logstore,
ETLSinks:[]ETLSink{sink},
Parameters: map[string]string{},

}
schedule := ETLSchedule{
Type:"Resident",
}
etljob := ETL {
Configuration:config,
DisplayName:"displayname",
Description:"go sdk case",
Name:name,
Schedule:schedule,
Type:"ETL",

}
return etljob
}



func (c *Client) CreateETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := "/jobs"

r, err := c.request(project, "POST", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) GetETL(project string, etlName string) (ETLJob *ETL, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := "/jobs/" + etlName
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
buf, _ := ioutil.ReadAll(r.Body)
etlJob := &ETL{}
if err = json.Unmarshal(buf, etlJob); err != nil {
err = NewClientError(err)
}
return etlJob, nil
}

func (c *Client) UpdateETL(project string, etljob ETL) error {
body, err := json.Marshal(etljob)
if err != nil {
return NewClientError(err)
}
h := map[string]string{
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
"Content-Type": "application/json",
}
uri := "/jobs/" + etljob.Name
r, err := c.request(project, "PUT", uri, h, body)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) DeleteETL(project string, etlName string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}
uri := "/jobs/" + etlName
r, err := c.request(project, "DELETE", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) ListETL(project string, offset int, size int) (*ListETLResponse, error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}

uri := fmt.Sprintf("/jobs?offset=%d&size=%d", offset, size)
r, err := c.request(project, "GET", uri, h, nil)
if err != nil {
return nil, err
}
defer r.Body.Close()
buf, _ := ioutil.ReadAll(r.Body)

listETLResponse := &ListETLResponse{}
if err = json.Unmarshal(buf, listETLResponse); err != nil {
err = NewClientError(err)
}
return listETLResponse, err
}

func (c *Client) StartETL(project, name string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}

uri := fmt.Sprintf("/jobs/%s?action=START", name)
r, err := c.request(project, "PUT", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}

func (c *Client) StopETL(project, name string) error {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Content-Type": "application/json",
}

uri := fmt.Sprintf("/jobs/%s?action=STOP", name)
fmt.Println(uri)
r, err := c.request(project, "PUT", uri, h, nil)
if err != nil {
return err
}
r.Body.Close()
return nil
}
148 changes: 148 additions & 0 deletions client_etl_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package sls

import (
"github.com/stretchr/testify/suite"
"os"
"testing"
"time"
)

func TestETLJobV2(t *testing.T) {
suite.Run(t, new(ETLJobTestV2Suite))
}

type ETLJobTestV2Suite struct {
suite.Suite
endpoint string
projectName string
logstoreName string
accessKeyID string
accessKeySecret string
targetLogstoreName string
etlName string
client *Client
}

func (s *ETLJobTestV2Suite) SetupTest() {
s.endpoint = os.Getenv("LOG_TEST_ENDPOINT")
s.projectName = os.Getenv("LOG_TEST_PROJECT")
s.logstoreName = os.Getenv("LOG_TEST_LOGSTORE")
s.targetLogstoreName = os.Getenv("LOG_TEST_TARGET_LOGSTORE")
s.accessKeyID = os.Getenv("LOG_TEST_ACCESS_KEY_ID")
s.accessKeySecret = os.Getenv("LOG_TEST_ACCESS_KEY_SECRET")
s.client = &Client{
AccessKeyID: s.accessKeyID,
AccessKeySecret: s.accessKeySecret,
Endpoint: s.endpoint,
}
}

func (s *ETLJobTestV2Suite) createETLJobV2() error {
sink := ETLSink{
AccessKeyId: s.accessKeyID,
AccessKeySecret: s.accessKeySecret,
Endpoint: s.endpoint,
Logstore: s.logstoreName,
Name: "aliyun-etl-test",
Project: s.projectName,
}
config := ETLConfiguration{
AccessKeyId: s.accessKeyID,
AccessKeySecret: s.accessKeySecret,
FromTime: time.Now().Unix(),
Script: "e_set('aliyun','new')",
Version: 2,
Logstore: s.logstoreName,
ETLSinks: []ETLSink{sink},
Parameters: map[string]string{},
}
schedule := ETLSchedule{
Type: "Resident",
}
etljob := ETL{
Configuration: config,
DisplayName: "displayName",
Description: "go sdk case",
Name: s.etlName,
Schedule: schedule,
Type: "ETL",
}
return s.client.CreateETL(s.projectName, etljob)
}

func (s *ETLJobTestV2Suite) TestClient_UpdateETLJobV2() {
err := s.createETLJobV2()
s.Require().Nil(err)
etljob, err := s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
etljob.DisplayName = "update"
etljob.Description = "update description"
etljob.Configuration.Script = "e_set('update','update')"
err = s.client.UpdateETL(s.projectName, *etljob)
s.Require().Nil(err)
etljob, err = s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
s.Require().Equal("update", etljob.DisplayName)
s.Require().Equal("update description", etljob.Description)
err = s.client.DeleteETL(s.projectName, s.etlName)
s.Require().Nil(err)
}

func (s *ETLJobTestV2Suite) TestClient_DeleteETLJobV2() {
err := s.createETLJobV2()
s.Require().Nil(err)
_, err = s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
err = s.client.DeleteETL(s.projectName, s.etlName)
s.Require().Nil(err)
time.Sleep(time.Second * 100)
_, err = s.client.GetETL(s.projectName, s.etlName)
s.Require().NotNil(err)

}

func (s *ETLJobTestV2Suite) TestClient_ListETLJobV2() {
err := s.createETLJobV2()
s.Require().Nil(err)
etljobList, err := s.client.ListETL(s.projectName, 0, 100)
s.Require().Nil(err)
s.Require().Equal(1, etljobList.Total)
s.Require().Equal(1, etljobList.Count)
err = s.client.DeleteETL(s.projectName, s.etlName)
s.Require().Nil(err)

}

func (s *ETLJobTestV2Suite) TestClient_StartStopETLJobV2() {
err := s.createETLJobV2()
s.Require().Nil(err)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "RUNNING" {
break
}
}

err = s.client.StopETL(s.projectName, s.etlName)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "STOPPED" {
break
}
}
err = s.client.StartETL(s.projectName, s.etlName)
for {
etljob, err := s.client.GetETL(s.projectName, s.etlName)
s.Require().Nil(err)
time.Sleep(10 * time.Second)
if etljob.Status == "RUNNING" {
break
}
}


}

0 comments on commit 71610a3

Please sign in to comment.