diff --git a/lib/airflow/workflow.go b/lib/airflow/workflow.go index 9e58265..142217b 100644 --- a/lib/airflow/workflow.go +++ b/lib/airflow/workflow.go @@ -2,7 +2,6 @@ package airflow import ( "encoding/base64" - "encoding/json" "errors" "fmt" "io" @@ -169,8 +168,7 @@ func (client *client) GetTaskInstances(dagID string, dagRunId string) (airflow.T }() ctx, cancel := Context() defer cancel() - resp, http, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute() - fmt.Println("test : ", http) + resp, _, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute() if err != nil { logger.Println(logger.ERROR, false, "AIRFLOW: Error occurred while getting TaskInstances. (Error: "+err.Error()+").") @@ -248,8 +246,7 @@ func (client *client) ClearTaskInstance(dagID string, dagRunID string, taskID st return logs, nil } - -func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) (airflow.EventLogCollection, error) { +func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) ([]byte, error) { deferFunc := callDagRequestLock(dagID) defer func() { deferFunc() @@ -300,13 +297,7 @@ func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) fmt.Println("Error reading response body:", err) } - var eventlogs airflow.EventLogCollection - err = json.Unmarshal(body, &eventlogs) - if err != nil { - fmt.Println("Error unmarshal response body:", err) - } - - return eventlogs, err + return body, err } func (client *client) GetImportErrors() (airflow.ImportErrorCollection, error) { diff --git a/pkg/api/rest/controller/workflow.go b/pkg/api/rest/controller/workflow.go index 5452d95..5792393 100644 --- a/pkg/api/rest/controller/workflow.go +++ b/pkg/api/rest/controller/workflow.go @@ -1,6 +1,7 @@ package controller import ( + "encoding/json" "errors" "fmt" "net/http" @@ -1025,12 +1026,12 @@ func GetTaskInstances(c echo.Context) error { } startDate, err := time.Parse(layout, taskInstance.GetExecutionDate()) if err != nil { - fmt.Println("Error parsing execution date:", err) + fmt.Println("Error parsing start date:", err) continue } endDate, err := time.Parse(layout, taskInstance.GetExecutionDate()) if err != nil { - fmt.Println("Error parsing execution date:", err) + fmt.Println("Error parsing end date:", err) continue } taskInfo := model.TaskInstance{ @@ -1114,20 +1115,20 @@ func ClearTaskInstances(c echo.Context) error { // @ID get-event-logs // @Summary Get Eventlog // @Description Get Eventlog. -// @Tags [Workflow] -// @Accept json -// @Produce json -// @Param wfId query string true "ID of the workflow." -// @Param wfRunId query string false "ID of the workflow run." -// @Param taskId query string false "ID of the task." -// @Success 200 {object} []model.EventLog "Successfully get the workflow." -// @Failure 400 {object} common.ErrorResponse "Sent bad request." -// @Failure 500 {object} common.ErrorResponse "Failed to get the workflow." -// @Router /eventlogs [get] +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "ID of the workflow." +// @Param wfRunId query string false "ID of the workflow run." +// @Param taskId query string false "ID of the task." +// @Success 200 {object} []model.EventLog "Successfully get the workflow." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the workflow." +// @Router /workflow/{wfId}/eventlogs [get] func GetEventLogs(c echo.Context) error { - wfId := c.QueryParam("dag_id") + wfId := c.Param("wfId") if wfId == "" { - return common.ReturnErrorMsg(c, "Please provide the dagId.") + return common.ReturnErrorMsg(c, "Please provide the wfId.") } var wfRunId, taskId, taskName string @@ -1143,35 +1144,42 @@ func GetEventLogs(c echo.Context) error { } taskName = taskDBInfo.Name } - var eventLogs []model.EventLog - logs, err := airflow.Client.GetEventLogs(wfId, wfRunId, taskName) + var eventLogs model.EventLogs; + logs, err := airflow.Client.GetEventLogs(wfId,wfRunId,taskName) if err != nil { return common.ReturnErrorMsg(c, "Failed to get the taskInstances: "+err.Error()) } - - for _, log := range *logs.EventLogs { - taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(wfId, log.GetTaskId()) + err = json.Unmarshal(logs, &eventLogs) if err != nil { - return common.ReturnErrorMsg(c, "Failed to get the taskInstances: "+err.Error()) - } - taskId := &taskDBInfo.ID - eventlog := model.EventLog{ - WorkflowID: log.GetDagId(), - WorkflowRunID: wfRunId, - TaskID: *taskId, - TaskName: log.GetTaskId(), - Extra: log.GetExtra(), - Event: log.GetEvent(), - When: log.GetWhen(), - } - eventLogs = append(eventLogs, eventlog) + fmt.Println(err) } - // logs, err := airflow.Client.GetEventLogs(wfId) - // if err != nil { - // return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) - // } + var logList []model.EventLog + for _, eventlog := range eventLogs.EventLogs { + var taskID, RunId string + if eventlog.TaskID!= "" { + taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(wfId,eventlog.TaskID) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskID = taskDBInfo.ID + } + eventlog.WorkflowID = wfId + if eventlog.RunID != ""{ + RunId = eventlog.RunID + } - return c.JSONPretty(http.StatusOK, eventLogs, " ") + log := model.EventLog { + WorkflowID: eventlog.WorkflowID, + WorkflowRunID: RunId, + TaskID: taskID, + TaskName: eventlog.TaskID, + Extra: eventlog.Extra, + Event: eventlog.Event, + When: eventlog.When, + } + logList = append(logList,log) + } + return c.JSONPretty(http.StatusOK, logList, " ") } // GetImportErrors godoc diff --git a/pkg/api/rest/docs/docs.go b/pkg/api/rest/docs/docs.go index f1565b8..76c97de 100644 --- a/pkg/api/rest/docs/docs.go +++ b/pkg/api/rest/docs/docs.go @@ -19,66 +19,6 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { - "/eventlogs": { - "get": { - "description": "Get Eventlog.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "[Workflow]" - ], - "summary": "Get Eventlog", - "operationId": "get-event-logs", - "parameters": [ - { - "type": "string", - "description": "ID of the workflow.", - "name": "wfId", - "in": "query", - "required": true - }, - { - "type": "string", - "description": "ID of the workflow run.", - "name": "wfRunId", - "in": "query" - }, - { - "type": "string", - "description": "ID of the task.", - "name": "taskId", - "in": "query" - } - ], - "responses": { - "200": { - "description": "Successfully get the workflow.", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" - } - } - }, - "400": { - "description": "Sent bad request.", - "schema": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" - } - }, - "500": { - "description": "Failed to get the workflow.", - "schema": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" - } - } - } - } - }, "/importErrors": { "get": { "description": "Get the importErrors.", @@ -807,6 +747,66 @@ const docTemplate = `{ } } }, + "/workflow/{wfId}/eventlogs": { + "get": { + "description": "Get Eventlog.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Eventlog", + "operationId": "get-event-logs", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow run.", + "name": "wfRunId", + "in": "query" + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get the workflow.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflow.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/run": { "post": { "description": "Run the workflow.", @@ -1722,7 +1722,7 @@ const docTemplate = `{ "extra": { "type": "string" }, - "start_date": { + "run_id": { "type": "string" }, "task_id": { @@ -1731,6 +1731,9 @@ const docTemplate = `{ "task_name": { "type": "string" }, + "when": { + "type": "string" + }, "workflow_id": { "type": "string" }, diff --git a/pkg/api/rest/docs/swagger.json b/pkg/api/rest/docs/swagger.json index cd47d20..3313e4c 100644 --- a/pkg/api/rest/docs/swagger.json +++ b/pkg/api/rest/docs/swagger.json @@ -12,66 +12,6 @@ }, "basePath": "/cicada", "paths": { - "/eventlogs": { - "get": { - "description": "Get Eventlog.", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "tags": [ - "[Workflow]" - ], - "summary": "Get Eventlog", - "operationId": "get-event-logs", - "parameters": [ - { - "type": "string", - "description": "ID of the workflow.", - "name": "wfId", - "in": "query", - "required": true - }, - { - "type": "string", - "description": "ID of the workflow run.", - "name": "wfRunId", - "in": "query" - }, - { - "type": "string", - "description": "ID of the task.", - "name": "taskId", - "in": "query" - } - ], - "responses": { - "200": { - "description": "Successfully get the workflow.", - "schema": { - "type": "array", - "items": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" - } - } - }, - "400": { - "description": "Sent bad request.", - "schema": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" - } - }, - "500": { - "description": "Failed to get the workflow.", - "schema": { - "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" - } - } - } - } - }, "/importErrors": { "get": { "description": "Get the importErrors.", @@ -800,6 +740,66 @@ } } }, + "/workflow/{wfId}/eventlogs": { + "get": { + "description": "Get Eventlog.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Eventlog", + "operationId": "get-event-logs", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow run.", + "name": "wfRunId", + "in": "query" + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get the workflow.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflow.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/run": { "post": { "description": "Run the workflow.", @@ -1715,7 +1715,7 @@ "extra": { "type": "string" }, - "start_date": { + "run_id": { "type": "string" }, "task_id": { @@ -1724,6 +1724,9 @@ "task_name": { "type": "string" }, + "when": { + "type": "string" + }, "workflow_id": { "type": "string" }, diff --git a/pkg/api/rest/docs/swagger.yaml b/pkg/api/rest/docs/swagger.yaml index 6a19208..51ad0de 100644 --- a/pkg/api/rest/docs/swagger.yaml +++ b/pkg/api/rest/docs/swagger.yaml @@ -120,12 +120,14 @@ definitions: type: string extra: type: string - start_date: + run_id: type: string task_id: type: string task_name: type: string + when: + type: string workflow_id: type: string workflow_run_id: @@ -396,46 +398,6 @@ info: title: CM-Cicada REST API version: latest paths: - /eventlogs: - get: - consumes: - - application/json - description: Get Eventlog. - operationId: get-event-logs - parameters: - - description: ID of the workflow. - in: query - name: wfId - required: true - type: string - - description: ID of the workflow run. - in: query - name: wfRunId - type: string - - description: ID of the task. - in: query - name: taskId - type: string - produces: - - application/json - responses: - "200": - description: Successfully get the workflow. - schema: - items: - $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog' - type: array - "400": - description: Sent bad request. - schema: - $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' - "500": - description: Failed to get the workflow. - schema: - $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' - summary: Get Eventlog - tags: - - '[Workflow]' /importErrors: get: consumes: @@ -892,6 +854,46 @@ paths: summary: Update Workflow tags: - '[Workflow]' + /workflow/{wfId}/eventlogs: + get: + consumes: + - application/json + description: Get Eventlog. + operationId: get-event-logs + parameters: + - description: ID of the workflow. + in: path + name: wfId + required: true + type: string + - description: ID of the workflow run. + in: query + name: wfRunId + type: string + - description: ID of the task. + in: query + name: taskId + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the workflow. + schema: + items: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog' + type: array + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the workflow. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get Eventlog + tags: + - '[Workflow]' /workflow/{wfId}/run: post: consumes: diff --git a/pkg/api/rest/model/workflow.go b/pkg/api/rest/model/workflow.go index 1712a13..1412428 100644 --- a/pkg/api/rest/model/workflow.go +++ b/pkg/api/rest/model/workflow.go @@ -149,15 +149,20 @@ type TaskInstanceReference struct { type TaskLog struct { Content string `json:"content,omitempty"` } +type EventLogs struct { + EventLogs []EventLog `json:"event_logs"` + TotalEntries int `json:"total_entries"` +} type EventLog struct { - WorkflowRunID string `json:"workflow_run_id,omitempty"` - WorkflowID string `json:"workflow_id"` - TaskID string `json:"task_id"` - TaskName string `json:"task_name"` - Event string `json:"event,omitempty"` - When time.Time `json:"start_date,omitempty"` - Extra string `json:"extra,omitempty"` + WorkflowRunID string `json:"workflow_run_id"` + RunID string `json:"run_id,omitempty"` + WorkflowID string `json:"workflow_id"` + TaskID string `json:"task_id"` + TaskName string `json:"task_name"` + Event string `json:"event,omitempty"` + When time.Time `json:"when,omitempty"` + Extra string `json:"extra,omitempty"` } func (d Data) Value() (driver.Value, error) { diff --git a/pkg/api/rest/route/workflow.go b/pkg/api/rest/route/workflow.go index dffbcc4..7943427 100644 --- a/pkg/api/rest/route/workflow.go +++ b/pkg/api/rest/route/workflow.go @@ -32,6 +32,6 @@ func Workflow(e *echo.Echo) { e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/runs", controller.GetWorkflowRuns) e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/taskInstances", controller.GetTaskInstances) e.POST("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/task/:taskId/clear", controller.ClearTaskInstances) - e.GET("/"+strings.ToLower(common.ShortModuleName)+"/eventlogs", controller.GetEventLogs) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/eventlogs", controller.GetEventLogs) e.GET("/"+strings.ToLower(common.ShortModuleName)+"/importErrors", controller.GetImportErrors) }