Skip to content

Commit

Permalink
feat(circleci-plugin): incremental data collection (#7986)
Browse files Browse the repository at this point in the history
* feat(api_collector_stateful): handle case were response has records from both before & after createdAfter of last collection

* feat(circleci-plugin): incremental collection for pipelines

* feat(api_collector_stateful): expose Input collector arg for StatefulFinalizableEntity to collect data based on previous collection

* feat(circleci-plugin): incremental data collection for workflows

* feat(circleci-plugin): incremental data collection for jobs

* refactor(circleci-plugin): use common query param function

* refactor(circleci-plugin): use BuildQueryParamsWithPageToken func when collecting unfinished job details
  • Loading branch information
Nickcw6 authored Oct 21, 2024
1 parent 49d9ffc commit 98bc98b
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 79 deletions.
38 changes: 34 additions & 4 deletions backend/helpers/pluginhelper/api/api_collector_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,19 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg
createdAfter := manager.CollectorStateManager.GetSince()
isIncremental := manager.CollectorStateManager.IsIncremental()

var inputIterator Iterator
if args.CollectNewRecordsByList.BuildInputIterator != nil {
inputIterator, err = args.CollectNewRecordsByList.BuildInputIterator(isIncremental, createdAfter)
if err != nil {
return nil, err
}
}

// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
ApiClient: args.ApiClient,
// common
Input: inputIterator,
Incremental: isIncremental,
UrlTemplate: args.CollectNewRecordsByList.UrlTemplate,
Query: func(reqData *RequestData) (url.Values, errors.Error) {
Expand All @@ -169,21 +178,41 @@ func NewStatefulApiCollectorForFinalizableEntity(args FinalizableApiCollectorArg

// time filter or diff sync
if createdAfter != nil && args.CollectNewRecordsByList.GetCreated != nil {
// if the first record of the page was created before createdAfter, return emtpy set and stop
// if the first record of the page was created before createdAfter and not a zero value, return empty set and stop
firstCreated, err := args.CollectNewRecordsByList.GetCreated(items[0])
if err != nil {
return nil, err
}
if firstCreated.Before(*createdAfter) {
if firstCreated.Before(*createdAfter) && !firstCreated.IsZero() {
return nil, ErrFinishCollect
}
// if the last record was created before createdAfter, return records and stop

// If last record was created before CreatedAfter, including a zero value, check each record individually
lastCreated, err := args.CollectNewRecordsByList.GetCreated(items[len(items)-1])
if err != nil {
return nil, err
}
if lastCreated.Before(*createdAfter) {
return items, ErrFinishCollect
var validItems []json.RawMessage
// Only collect items that were created after the last successful collection to prevent duplicates
for _, item := range items {
itemCreatedAt, err := args.CollectNewRecordsByList.GetCreated(item)
if err != nil {
return nil, err
}

if itemCreatedAt.IsZero() {
// If zero then timestamp is null on the response - accept as valid for downstream processing
validItems = append(validItems, item)
continue
}

if itemCreatedAt.Before(*createdAfter) {
// Once we reach an item that was created before the last successful collection, stop & return
return validItems, ErrFinishCollect
}
validItems = append(validItems, item)
}
}
}
return items, err
Expand Down Expand Up @@ -267,6 +296,7 @@ type FinalizableApiCollectorListArgs struct {
Concurrency int // required for Undetermined Strategy, number of concurrent requests
GetNextPageCustomData func(prevReqData *RequestData, prevPageResponse *http.Response) (interface{}, errors.Error) // required for Sequential Strategy, to extract the next page cursor from the given response
GetTotalPages func(res *http.Response, args *ApiCollectorArgs) (int, errors.Error) // required for Determined Strategy, to extract the total number of pages from the given response
BuildInputIterator func(isIncremental bool, createdAfter *time.Time) (Iterator, errors.Error)
}

// FinalizableApiCollectorDetailArgs is the arguments for the detail collector
Expand Down
85 changes: 62 additions & 23 deletions backend/plugins/circleci/tasks/job_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ limitations under the License.
package tasks

import (
"encoding/json"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand All @@ -44,31 +46,68 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect jobs")

clauses := []dal.Clause{
dal.Select("id, pipeline_id"),
dal.From(&models.CircleciWorkflow{}),
dal.Where("_tool_circleci_workflows.connection_id = ? and _tool_circleci_workflows.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
}
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id, pipeline_id"), // pipeline_id not on individual job response but required for result
dal.From(&models.CircleciWorkflow{}),
dal.Where("connection_id = ? and project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
if err != nil {
return err
}
if isIncremental {
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
},
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
},
GetCreated: func(item json.RawMessage) (time.Time, errors.Error) {
var job struct { // Individual job response lacks created_at field, so have to use started_at
CreatedAt time.Time `json:"started_at"` // This will be null in some cases (e.g. queued, not_running, blocked)
}
if err := json.Unmarshal(item, &job); err != nil {
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal job")
}
return job.CreatedAt, nil
},
},
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job", // The individual job endpoint has different fields so need to recollect all jobs for a workflow
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds,
},
BuildInputIterator: func() (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("DISTINCT workflow_id"), // Only need to recollect jobs for a workflow once
dal.From(&models.CircleciJob{}),
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'not_running', 'queued', 'on_hold')", data.Options.ConnectionId, data.Options.ProjectSlug),
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/workflow/{{ .Input.Id }}/job",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a job has been deleted
db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciJob{}))
},
},
})
if err != nil {
logger.Error(err, "collect jobs error")
Expand Down
58 changes: 30 additions & 28 deletions backend/plugins/circleci/tasks/pipeline_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package tasks
import (
"encoding/json"
"net/http"
"time"

"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
Expand All @@ -44,35 +43,38 @@ func CollectPipelines(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
timeAfter := rawDataSubTaskArgs.Ctx.TaskContext().SyncPolicy().TimeAfter
logger.Info("collect pipelines")
collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/project/{{ .Params.ProjectSlug }}/pipeline",
Query: BuildQueryParamsWithPageToken,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
data := CircleciPageTokenResp[[]json.RawMessage]{}
err := api.UnmarshalResponse(res, &data)

if err != nil {
return nil, err
}
filteredItems := []json.RawMessage{}
for _, item := range data.Items {
var pipeline struct {
CreatedAt time.Time `json:"created_at"`
}
if err := json.Unmarshal(item, &pipeline); err != nil {
return nil, errors.Default.Wrap(err, "failed to unmarshal pipeline item")
}
if pipeline.CreatedAt.Before(*timeAfter) {
return filteredItems, api.ErrFinishCollect
}
filteredItems = append(filteredItems, item)
if err != nil {
return nil, err
}
filteredItems := []json.RawMessage{}
for _, item := range data.Items {
pipelineCreatedAt, err := extractCreatedAt(item)

}
return filteredItems, nil
if err != nil {
return nil, err
}
if pipelineCreatedAt.Before(*timeAfter) {
return filteredItems, api.ErrFinishCollect
}
filteredItems = append(filteredItems, item)
}
return filteredItems, nil
},
},
GetCreated: extractCreatedAt,
},
})
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion backend/plugins/circleci/tasks/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"net/http"
"net/url"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand Down Expand Up @@ -108,7 +109,7 @@ func ExtractNextPageToken(prevReqData *api.RequestData, prevPageResponse *http.R
return res.NextPageToken, nil
}

func BuildQueryParamsWithPageToken(reqData *api.RequestData) (url.Values, errors.Error) {
func BuildQueryParamsWithPageToken(reqData *api.RequestData, _ *time.Time) (url.Values, errors.Error) {
query := url.Values{}
if pageToken, ok := reqData.CustomData.(string); ok && pageToken != "" {
query.Set("page-token", pageToken)
Expand All @@ -130,3 +131,13 @@ func ignoreDeletedBuilds(res *http.Response) errors.Error {
}
return nil
}

func extractCreatedAt(item json.RawMessage) (time.Time, errors.Error) {
var entity struct {
CreatedAt time.Time `json:"created_at"`
}
if err := json.Unmarshal(item, &entity); err != nil {
return time.Time{}, errors.Default.Wrap(err, "failed to unmarshal item")
}
return entity.CreatedAt, nil
}
82 changes: 59 additions & 23 deletions backend/plugins/circleci/tasks/workflow_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ limitations under the License.
package tasks

import (
"encoding/json"
"net/http"
"reflect"
"time"

"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
Expand All @@ -44,31 +47,64 @@ func CollectWorkflows(taskCtx plugin.SubTaskContext) errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect workflows")

clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciPipeline{}),
dal.Where("_tool_circleci_pipelines.connection_id = ? and _tool_circleci_pipelines.project_slug = ? ", data.Options.ConnectionId, data.Options.ProjectSlug),
}
collector, err := api.NewStatefulApiCollectorForFinalizableEntity(api.FinalizableApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
CollectNewRecordsByList: api.FinalizableApiCollectorListArgs{
PageSize: int(data.Options.PageSize),
GetNextPageCustomData: ExtractNextPageToken,
BuildInputIterator: func(isIncremental bool, createdAfter *time.Time) (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciPipeline{}),
dal.Where("connection_id = ? AND project_slug = ?", data.Options.ConnectionId, data.Options.ProjectSlug),
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
iterator, err := api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
if err != nil {
return err
}
if isIncremental {
clauses = append(clauses, dal.Where("created_date > ?", createdAfter))
}

db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciPipeline{}))
},
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
},
GetCreated: extractCreatedAt,
},
CollectUnfinishedDetails: &api.FinalizableApiCollectorDetailArgs{
FinalizableApiCollectorCommonArgs: api.FinalizableApiCollectorCommonArgs{
UrlTemplate: "/v2/workflow/{{ .Input.Id }}",
Query: nil,
ResponseParser: func(res *http.Response) ([]json.RawMessage, errors.Error) {
var data json.RawMessage
err := api.UnmarshalResponse(res, &data)
return []json.RawMessage{data}, err
},
AfterResponse: ignoreDeletedBuilds,
},
BuildInputIterator: func() (api.Iterator, errors.Error) {
clauses := []dal.Clause{
dal.Select("id"),
dal.From(&models.CircleciWorkflow{}),
dal.Where("connection_id = ? AND project_slug = ? AND status IN ('running', 'on_hold', 'failing')", data.Options.ConnectionId, data.Options.ProjectSlug),
}

collector, err := api.NewApiCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
UrlTemplate: "/v2/pipeline/{{ .Input.Id }}/workflow",
Input: iterator,
GetNextPageCustomData: ExtractNextPageToken,
Query: BuildQueryParamsWithPageToken,
ResponseParser: ParseCircleciPageTokenResp,
AfterResponse: ignoreDeletedBuilds, // Ignore the 404 response if a workflow has been deleted
db := taskCtx.GetDal()
cursor, err := db.Cursor(clauses...)
if err != nil {
return nil, err
}
return api.NewDalCursorIterator(db, cursor, reflect.TypeOf(models.CircleciWorkflow{}))
},
},
})
if err != nil {
logger.Error(err, "collect workflows error")
Expand Down

0 comments on commit 98bc98b

Please sign in to comment.