Skip to content

Commit

Permalink
Merge branch 'main' of github.com:cloud-barista/cm-cicada
Browse files Browse the repository at this point in the history
  • Loading branch information
ish-hcc committed Oct 14, 2024
2 parents 6a7c2ce + 2cdd9a9 commit c9c73f6
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 220 deletions.
15 changes: 3 additions & 12 deletions lib/airflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package airflow

import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -169,8 +168,7 @@ func (client *client) GetTaskInstances(dagID string, dagRunId string) (airflow.T
}()
ctx, cancel := Context()
defer cancel()
resp, http, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute()
fmt.Println("test : ", http)
resp, _, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute()
if err != nil {
logger.Println(logger.ERROR, false,
"AIRFLOW: Error occurred while getting TaskInstances. (Error: "+err.Error()+").")
Expand Down Expand Up @@ -248,8 +246,7 @@ func (client *client) ClearTaskInstance(dagID string, dagRunID string, taskID st

return logs, nil
}

func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) (airflow.EventLogCollection, error) {
func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) ([]byte, error) {
deferFunc := callDagRequestLock(dagID)
defer func() {
deferFunc()
Expand Down Expand Up @@ -300,13 +297,7 @@ func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string)
fmt.Println("Error reading response body:", err)
}

var eventlogs airflow.EventLogCollection
err = json.Unmarshal(body, &eventlogs)
if err != nil {
fmt.Println("Error unmarshal response body:", err)
}

return eventlogs, err
return body, err
}

func (client *client) GetImportErrors() (airflow.ImportErrorCollection, error) {
Expand Down
82 changes: 45 additions & 37 deletions pkg/api/rest/controller/workflow.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"encoding/json"
"errors"
"fmt"
"net/http"
Expand Down Expand Up @@ -1025,12 +1026,12 @@ func GetTaskInstances(c echo.Context) error {
}
startDate, err := time.Parse(layout, taskInstance.GetExecutionDate())
if err != nil {
fmt.Println("Error parsing execution date:", err)
fmt.Println("Error parsing start date:", err)
continue
}
endDate, err := time.Parse(layout, taskInstance.GetExecutionDate())
if err != nil {
fmt.Println("Error parsing execution date:", err)
fmt.Println("Error parsing end date:", err)
continue
}
taskInfo := model.TaskInstance{
Expand Down Expand Up @@ -1114,20 +1115,20 @@ func ClearTaskInstances(c echo.Context) error {
// @ID get-event-logs
// @Summary Get Eventlog
// @Description Get Eventlog.
// @Tags [Workflow]
// @Accept json
// @Produce json
// @Param wfId query string true "ID of the workflow."
// @Param wfRunId query string false "ID of the workflow run."
// @Param taskId query string false "ID of the task."
// @Success 200 {object} []model.EventLog "Successfully get the workflow."
// @Failure 400 {object} common.ErrorResponse "Sent bad request."
// @Failure 500 {object} common.ErrorResponse "Failed to get the workflow."
// @Router /eventlogs [get]
// @Tags [Workflow]
// @Accept json
// @Produce json
// @Param wfId path string true "ID of the workflow."
// @Param wfRunId query string false "ID of the workflow run."
// @Param taskId query string false "ID of the task."
// @Success 200 {object} []model.EventLog "Successfully get the workflow."
// @Failure 400 {object} common.ErrorResponse "Sent bad request."
// @Failure 500 {object} common.ErrorResponse "Failed to get the workflow."
// @Router /workflow/{wfId}/eventlogs [get]
func GetEventLogs(c echo.Context) error {
wfId := c.QueryParam("dag_id")
wfId := c.Param("wfId")
if wfId == "" {
return common.ReturnErrorMsg(c, "Please provide the dagId.")
return common.ReturnErrorMsg(c, "Please provide the wfId.")
}

var wfRunId, taskId, taskName string
Expand All @@ -1143,35 +1144,42 @@ func GetEventLogs(c echo.Context) error {
}
taskName = taskDBInfo.Name
}
var eventLogs []model.EventLog
logs, err := airflow.Client.GetEventLogs(wfId, wfRunId, taskName)
var eventLogs model.EventLogs;
logs, err := airflow.Client.GetEventLogs(wfId,wfRunId,taskName)
if err != nil {
return common.ReturnErrorMsg(c, "Failed to get the taskInstances: "+err.Error())
}

for _, log := range *logs.EventLogs {
taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(wfId, log.GetTaskId())
err = json.Unmarshal(logs, &eventLogs)
if err != nil {
return common.ReturnErrorMsg(c, "Failed to get the taskInstances: "+err.Error())
}
taskId := &taskDBInfo.ID
eventlog := model.EventLog{
WorkflowID: log.GetDagId(),
WorkflowRunID: wfRunId,
TaskID: *taskId,
TaskName: log.GetTaskId(),
Extra: log.GetExtra(),
Event: log.GetEvent(),
When: log.GetWhen(),
}
eventLogs = append(eventLogs, eventlog)
fmt.Println(err)
}
// logs, err := airflow.Client.GetEventLogs(wfId)
// if err != nil {
// return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error())
// }
var logList []model.EventLog
for _, eventlog := range eventLogs.EventLogs {
var taskID, RunId string
if eventlog.TaskID!= "" {
taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(wfId,eventlog.TaskID)
if err != nil {
return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error())
}
taskID = taskDBInfo.ID
}
eventlog.WorkflowID = wfId
if eventlog.RunID != ""{
RunId = eventlog.RunID
}

return c.JSONPretty(http.StatusOK, eventLogs, " ")
log := model.EventLog {
WorkflowID: eventlog.WorkflowID,
WorkflowRunID: RunId,
TaskID: taskID,
TaskName: eventlog.TaskID,
Extra: eventlog.Extra,
Event: eventlog.Event,
When: eventlog.When,
}
logList = append(logList,log)
}
return c.JSONPretty(http.StatusOK, logList, " ")
}

// GetImportErrors godoc
Expand Down
125 changes: 64 additions & 61 deletions pkg/api/rest/docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,66 +19,6 @@ const docTemplate = `{
"host": "{{.Host}}",
"basePath": "{{.BasePath}}",
"paths": {
"/eventlogs": {
"get": {
"description": "Get Eventlog.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"[Workflow]"
],
"summary": "Get Eventlog",
"operationId": "get-event-logs",
"parameters": [
{
"type": "string",
"description": "ID of the workflow.",
"name": "wfId",
"in": "query",
"required": true
},
{
"type": "string",
"description": "ID of the workflow run.",
"name": "wfRunId",
"in": "query"
},
{
"type": "string",
"description": "ID of the task.",
"name": "taskId",
"in": "query"
}
],
"responses": {
"200": {
"description": "Successfully get the workflow.",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog"
}
}
},
"400": {
"description": "Sent bad request.",
"schema": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse"
}
},
"500": {
"description": "Failed to get the workflow.",
"schema": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse"
}
}
}
}
},
"/importErrors": {
"get": {
"description": "Get the importErrors.",
Expand Down Expand Up @@ -807,6 +747,66 @@ const docTemplate = `{
}
}
},
"/workflow/{wfId}/eventlogs": {
"get": {
"description": "Get Eventlog.",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"tags": [
"[Workflow]"
],
"summary": "Get Eventlog",
"operationId": "get-event-logs",
"parameters": [
{
"type": "string",
"description": "ID of the workflow.",
"name": "wfId",
"in": "path",
"required": true
},
{
"type": "string",
"description": "ID of the workflow run.",
"name": "wfRunId",
"in": "query"
},
{
"type": "string",
"description": "ID of the task.",
"name": "taskId",
"in": "query"
}
],
"responses": {
"200": {
"description": "Successfully get the workflow.",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog"
}
}
},
"400": {
"description": "Sent bad request.",
"schema": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse"
}
},
"500": {
"description": "Failed to get the workflow.",
"schema": {
"$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse"
}
}
}
}
},
"/workflow/{wfId}/run": {
"post": {
"description": "Run the workflow.",
Expand Down Expand Up @@ -1722,7 +1722,7 @@ const docTemplate = `{
"extra": {
"type": "string"
},
"start_date": {
"run_id": {
"type": "string"
},
"task_id": {
Expand All @@ -1731,6 +1731,9 @@ const docTemplate = `{
"task_name": {
"type": "string"
},
"when": {
"type": "string"
},
"workflow_id": {
"type": "string"
},
Expand Down
Loading

0 comments on commit c9c73f6

Please sign in to comment.