Skip to content

Commit

Permalink
Merge pull request #17 from yby654/develop
Browse files Browse the repository at this point in the history
api: rest: workflow api add
  • Loading branch information
ish-hcc authored Oct 7, 2024
2 parents 50049b8 + efc761c commit eb0d8a1
Show file tree
Hide file tree
Showing 9 changed files with 1,944 additions and 9 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependency: ## Get dependencies
lint: dependency ## Lint the files
@echo "Running linter..."
@if [ ! -f "${GOPATH}/bin/golangci-lint" ] && [ ! -f "$(GOROOT)/bin/golangci-lint" ]; then \
${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/[email protected].2; \
${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/[email protected].3; \
fi
@golangci-lint run --timeout 30m -E contextcheck -D unused

Expand Down
203 changes: 201 additions & 2 deletions lib/airflow/workflow.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package airflow

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io/ioutil"

Check failure on line 8 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build and deploy (1.23.0, ubuntu-22.04)

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package [io] or package [os], and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)
"net/http"
"net/url"
"sync"

"github.com/apache/airflow-client-go/airflow"
"github.com/cloud-barista/cm-cicada/lib/config"
"github.com/cloud-barista/cm-cicada/pkg/api/rest/model"
"github.com/jollaman999/utils/fileutil"
"github.com/jollaman999/utils/logger"
"sync"
)

var dagRequests = make(map[string]*sync.Mutex)
Expand Down Expand Up @@ -71,7 +78,6 @@ func (client *client) GetDAGs() (airflow.DAGCollection, error) {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting DAGs. (Error: "+err.Error()+").")
}

return resp, err
}

Expand Down Expand Up @@ -141,3 +147,196 @@ func (client *client) DeleteDAG(dagID string, deleteFolderOnly bool) error {

return err
}
func (client *client) GetDAGRuns(dagID string) (airflow.DAGRunCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()
resp, _, err := client.api.DAGRunApi.GetDagRuns(ctx, dagID).Execute()
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting DAGRuns. (Error: "+err.Error()+").")
}
return resp, err
}

func (client *client) GetTaskInstances(dagID string, dagRunId string) (airflow.TaskInstanceCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()
resp, http, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute()
fmt.Println("test : ", http)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting TaskInstances. (Error: "+err.Error()+").")
}
return resp, err
}

func (client *client) GetTaskLogs(dagID, dagRunID, taskID string, taskTryNumber int) (airflow.InlineResponse200, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs, _, err := client.api.TaskInstanceApi.GetLog(ctx, dagID, dagRunID, taskID, int32(taskTryNumber)).FullContent(true).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting TaskInstance logs. (Error: "+err.Error()+").")
}

return logs, nil
}

func (client *client) ClearTaskInstance(dagID string, dagRunID string, taskID string) (airflow.TaskInstanceReferenceCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

dryRun := false
taskIds := []string{taskID}
includeDownstream := true
includeFuture := false
includeParentdag := false
includePast := false
includeSubdags := true
includeUpstream := false
onlyFailed := false
onlyRunning := false
resetDagRuns := true

clearTask := airflow.ClearTaskInstances{
DryRun: &dryRun,
TaskIds: &taskIds,
IncludeSubdags: &includeSubdags,
IncludeParentdag: &includeParentdag,
IncludeUpstream: &includeUpstream,
IncludeDownstream: &includeDownstream,
IncludeFuture: &includeFuture,
IncludePast: &includePast,
OnlyFailed: &onlyFailed,
OnlyRunning: &onlyRunning,
ResetDagRuns: &resetDagRuns,
DagRunId: *airflow.NewNullableString(&dagRunID),
}

// 요청 생성
request := client.api.DAGApi.PostClearTaskInstances(ctx, dagID)

// ClearTaskInstances 데이터 설정
request = request.ClearTaskInstances(clearTask)

// 요청 실행
logs, _, err := client.api.DAGApi.PostClearTaskInstancesExecute(request)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while clearing TaskInstance. (Error: " + err.Error() + ").")
return airflow.TaskInstanceReferenceCollection{}, err
}

return logs, nil
}

func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) (airflow.EventLogCollection, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
}()
ctx, cancel := Context()
defer cancel()

localBasePath, err:=client.api.GetConfig().ServerURLWithContext(ctx, "EventLogApiService.GetEventLog")

Check failure on line 260 in lib/airflow/workflow.go

View workflow job for this annotation

GitHub Actions / Build and deploy (1.23.0, ubuntu-22.04)

ineffectual assignment to err (ineffassign)
baseURL := "http://"+client.api.GetConfig().Host+localBasePath + "/eventLogs"
queryParams := map[string]string{
"offset": "0",
"limit": "100",
"dag_id": dagID,
"run_id": dagRunId,
"task_id": taskId,
"order_by": "-when",
"excluded_events": "gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data",
}
query := url.Values{}
for key, value := range queryParams {
query.Add(key, value)
}
queryString := query.Encode()
fullURL := fmt.Sprintf("%s?%s", baseURL, queryString)
httpclient := client.api.GetConfig().HTTPClient
// fmt.Println(&httpclient.)

// 요청 생성
req, err := http.NewRequest("GET", fullURL, nil)
if err != nil {
fmt.Println("Error creating request:", err)
}
cred := ctx.Value(airflow.ContextBasicAuth).(airflow.BasicAuth)
addBasicAuth(req, cred.UserName, cred.Password)
res, err := httpclient.Do(req)
if err != nil {
fmt.Println("Error sending request:", err)
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
}

var eventlogs airflow.EventLogCollection
err = json.Unmarshal(body, &eventlogs)
if err != nil {
fmt.Println("Error unmarshal response body:", err)
}


return eventlogs, err
}

func (client *client) GetImportErrors() (airflow.ImportErrorCollection, error) {
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs,_,err := client.api.ImportErrorApi.GetImportErrors(ctx).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").")
}

return logs, nil
}


func (client *client) PatchDag(dagID string, dagBody airflow.DAG) (airflow.DAG, error){
ctx, cancel := Context()
defer cancel()

// TaskInstanceApi 인스턴스를 사용하여 로그 요청
logs,_,err := client.api.DAGApi.PatchDag(ctx, dagID).DAG(dagBody).Execute()
logger.Println(logger.INFO, false,logs)
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").")
}

return logs, nil
}

func addBasicAuth(req *http.Request, username, password string) {
auth := username + ":" + password
encodedAuth := base64.StdEncoding.EncodeToString([]byte(auth))
req.Header.Add("Authorization", "Basic "+encodedAuth)
}
22 changes: 22 additions & 0 deletions pkg/api/rest/common/url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package common

import (
"fmt"
"net/url"
)

func UrlDecode(text string) (string) {
decodedStr, err := url.QueryUnescape(text)
if err != nil {
fmt.Println("Error decoding URL:", err)
return err.Error()
} else {
return decodedStr
}
}

func UrlEncode(text string) (string) {
encodedStr := url.QueryEscape(text)

return encodedStr
}
Loading

0 comments on commit eb0d8a1

Please sign in to comment.