Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: rest: workflow api add #17

Merged
merged 17 commits into from
Oct 7, 2024
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependency: ## Get dependencies
lint: dependency ## Lint the files
@echo "Running linter..."
@if [ ! -f "${GOPATH}/bin/golangci-lint" ] && [ ! -f "$(GOROOT)/bin/golangci-lint" ]; then \
${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/[email protected].2; \
${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/[email protected].3; \
fi
@golangci-lint run --timeout 30m -E contextcheck -D unused

Expand Down
203 changes: 201 additions & 2 deletions lib/airflow/workflow.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package airflow

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"

Check failure on line 8 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build and deploy (1.23.0, ubuntu-22.04)

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package [io] or package [os], and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)

Check failure on line 8 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build source code (1.23.0, ubuntu-22.04)

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package [io] or package [os], and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)
"net/http"
"net/url"
"sync"

"github.com/apache/airflow-client-go/airflow"
"github.com/cloud-barista/cm-cicada/lib/config"
"github.com/cloud-barista/cm-cicada/pkg/api/rest/model"
"github.com/jollaman999/utils/fileutil"
"github.com/jollaman999/utils/logger"
"sync"
)

var dagRequests = make(map[string]*sync.Mutex)
Expand Down Expand Up @@ -71,7 +78,6 @@
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting DAGs. (Error: "+err.Error()+").")
}

return resp, err
}

Expand Down Expand Up @@ -141,3 +147,196 @@

return err
}
func (client *client) GetDAGRuns(dagID string) (airflow.DAGRunCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()
resp, _, err := client.api.DAGRunApi.GetDagRuns(ctx, dagID).Execute()
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting DAGRuns. (Error: "+err.Error()+").")
}
return resp, err
}

func (client *client) GetTaskInstances(dagID string, dagRunId string) (airflow.TaskInstanceCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()
resp, http, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute()
fmt.Println("test : ", http)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting TaskInstances. (Error: "+err.Error()+").")
}
return resp, err
}

func (client *client) GetTaskLogs(dagID, dagRunID, taskID string, taskTryNumber int) (airflow.InlineResponse200, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs, _, err := client.api.TaskInstanceApi.GetLog(ctx, dagID, dagRunID, taskID, int32(taskTryNumber)).FullContent(true).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting TaskInstance logs. (Error: "+err.Error()+").")
}

return logs, nil
}

func (client *client) ClearTaskInstance(dagID string, dagRunID string, taskID string) (airflow.TaskInstanceReferenceCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

dryRun := false
taskIds := []string{taskID}
includeDownstream := true
includeFuture := false
includeParentdag := false
includePast := false
includeSubdags := true
includeUpstream := false
onlyFailed := false
onlyRunning := false
resetDagRuns := true

clearTask := airflow.ClearTaskInstances{
DryRun: &dryRun,
TaskIds: &taskIds,
IncludeSubdags: &includeSubdags,
IncludeParentdag: &includeParentdag,
IncludeUpstream: &includeUpstream,
IncludeDownstream: &includeDownstream,
IncludeFuture: &includeFuture,
IncludePast: &includePast,
OnlyFailed: &onlyFailed,
OnlyRunning: &onlyRunning,
ResetDagRuns: &resetDagRuns,
DagRunId: *airflow.NewNullableString(&dagRunID),
}

// 요청 생성
request := client.api.DAGApi.PostClearTaskInstances(ctx, dagID)

// ClearTaskInstances 데이터 설정
request = request.ClearTaskInstances(clearTask)

// 요청 실행
logs, _, err := client.api.DAGApi.PostClearTaskInstancesExecute(request)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while clearing TaskInstance. (Error: " + err.Error() + ").")
return airflow.TaskInstanceReferenceCollection{}, err
}

return logs, nil
}

func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) (airflow.EventLogCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

localBasePath, err:=client.api.GetConfig().ServerURLWithContext(ctx, "EventLogApiService.GetEventLog")

Check failure on line 260 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build and deploy (1.23.0, ubuntu-22.04)

ineffectual assignment to err (ineffassign)

Check failure on line 260 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build source code (1.23.0, ubuntu-22.04)

ineffectual assignment to err (ineffassign)
baseURL := "http://"+client.api.GetConfig().Host+localBasePath + "/eventLogs"
queryParams := map[string]string{
"offset": "0",
"limit": "100",
"dag_id": dagID,
"run_id": dagRunId,
"task_id": taskId,
"order_by": "-when",
"excluded_events": "gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data",
}
query := url.Values{}
for key, value := range queryParams {
query.Add(key, value)
}
queryString := query.Encode()
fullURL := fmt.Sprintf("%s?%s", baseURL, queryString)
httpclient := client.api.GetConfig().HTTPClient
// fmt.Println(&httpclient.)

// 요청 생성
req, err := http.NewRequest("GET", fullURL, nil)
if err != nil {
fmt.Println("Error creating request:", err)
}
cred := ctx.Value(airflow.ContextBasicAuth).(airflow.BasicAuth)
addBasicAuth(req, cred.UserName, cred.Password)
res, err := httpclient.Do(req)
if err != nil {
fmt.Println("Error sending request:", err)
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
}

var eventlogs airflow.EventLogCollection
err = json.Unmarshal(body, &eventlogs)
if err != nil {
fmt.Println("Error unmarshal response body:", err)
}


return eventlogs, err
}

func (client *client) GetImportErrors() (airflow.ImportErrorCollection, error) {
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs,_,err := client.api.ImportErrorApi.GetImportErrors(ctx).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").")
}

return logs, nil
}


func (client *client) PatchDag(dagID string, dagBody airflow.DAG) (airflow.DAG, error){
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs,_,err := client.api.DAGApi.PatchDag(ctx, dagID).DAG(dagBody).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").")
}

return logs, nil
}

func addBasicAuth(req *http.Request, username, password string) {
auth := username + ":" + password
encodedAuth := base64.StdEncoding.EncodeToString([]byte(auth))
req.Header.Add("Authorization", "Basic "+encodedAuth)
}
22 changes: 22 additions & 0 deletions pkg/api/rest/common/url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"fmt"
"net/url"
)

func UrlDecode(text string) (string) {
decodedStr, err := url.QueryUnescape(text)
if err != nil {
fmt.Println("Error decoding URL:", err)
return err.Error()
} else {
return decodedStr
}
}

func UrlEncode(text string) (string) {
encodedStr := url.QueryEscape(text)

return encodedStr
}
Loading
Loading