Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

fix: Executor fixes #265

Merged
merged 6 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 24 additions & 37 deletions provider/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type TableExecutor struct {

// NewTableExecutor creates a new TableExecutor for given schema.Table
func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, table *schema.Table, extraFields, metadata map[string]interface{}, classifier ErrorClassifier, goroutinesSem *semaphore.Weighted, timeout time.Duration) TableExecutor {

var classifiers = []ErrorClassifier{defaultErrorClassifier}
if classifier != nil {
classifiers = append([]ErrorClassifier{classifier}, classifiers...)
Expand All @@ -77,50 +76,34 @@ func NewTableExecutor(resourceName string, db Storage, logger hclog.Logger, tabl

// Resolve is the root function of table executor which starts an execution of a Table resolving it, and it's relations.
func (e TableExecutor) Resolve(ctx context.Context, meta schema.ClientMeta) (uint64, diag.Diagnostics) {
var clients []schema.ClientMeta

if e.Table.Multiplex != nil {
if clients := e.Table.Multiplex(meta); len(clients) > 0 {
return e.doMultiplexResolve(ctx, clients)
}
clients = e.Table.Multiplex(meta)
}

if e.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, e.timeout)
defer cancel()
if len(clients) == 0 {
clients = append(clients, meta)
}
return e.withLogger(append(meta.Logger().ImpliedArgs(), "multiplex", false)...).callTableResolve(ctx, meta, nil)

return e.doMultiplexResolve(ctx, clients)
}

// withTable allows to create a new TableExecutor for received *schema.Table
func (e TableExecutor) withTable(t *schema.Table, kv ...interface{}) *TableExecutor {
var c [2]schema.ColumnList
c[0], c[1] = e.Db.Dialect().Columns(t).Sift()
return &TableExecutor{
ResourceName: e.ResourceName,
Table: t,
Db: e.Db,
Logger: e.Logger.With(kv...),
classifiers: e.classifiers,
extraFields: e.extraFields,
executionStart: e.executionStart,
columns: c,
goroutinesSem: e.goroutinesSem,
}
cpy := e
cpy.Table = t
cpy.Logger = cpy.Logger.With(kv...)
cpy.columns = c

return &cpy
}

func (e TableExecutor) withLogger(kv ...interface{}) *TableExecutor {
return &TableExecutor{
ResourceName: e.ResourceName,
Table: e.Table,
Db: e.Db,
Logger: e.Logger.With(kv...),
classifiers: e.classifiers,
extraFields: e.extraFields,
executionStart: e.executionStart,
columns: e.columns,
goroutinesSem: e.goroutinesSem,
}

cpy := e
cpy.Logger = cpy.Logger.With(kv...)
return &cpy
}

// doMultiplexResolve resolves table with multiplexed clients appending all diagnostics returned from each multiplex.
Expand Down Expand Up @@ -286,9 +269,13 @@ func (e TableExecutor) callTableResolve(ctx context.Context, client schema.Clien
if parent == nil {
e.Logger.Info("fetched successfully", "count", nc)
}
if err := e.cleanupStaleData(ctx, client, parent); err != nil {
return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name)))

if !diags.HasErrors() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a strong gut feeling we should avoid this change for now:

  • Even a single column (e.g. in a child relation) that failed to be resolved can cause "error" diags, right? So a single column with a null value in any child relation will cause failure of cleanup .

Seems dangerous in any case. I'd really avoid doing this unless we have a specific reason to do it (e.g. customers complining, sentry errors, ....).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the column resolver returns error diags, it doesn't continue to complete all other tasks so (if there are any) the new rows are incomplete anyway. no postresourceresolver called, no internal resolvers (cq_id!) called. https://github.com/cloudquery/cq-provider-sdk/blob/main/provider/execution/execution.go#L367-L370

not sure if it will lead to more rows or not on edge cases.

if err := e.cleanupStaleData(ctx, client, parent); err != nil {
return nc, diags.Add(fromError(err, diag.WithType(diag.DATABASE), diag.WithSummary("failed to cleanup stale data on table %q", e.Table.Name)))
}
}

return nc, diags
}

Expand All @@ -299,8 +286,8 @@ func (e TableExecutor) resolveResources(ctx context.Context, meta schema.ClientM
diags diag.Diagnostics
)

for _, o := range objects {
resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, o, e.metadata, e.executionStart)
for i := range objects {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems unnecessary, no?

Copy link
Member Author

@disq disq May 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the interface{} isn't a pointer then this change is necessary (and more future proof) due to the way for for range works in go.

resource := schema.NewResourceData(e.Db.Dialect(), e.Table, parent, objects[i], e.metadata, e.executionStart)
// Before inserting resolve all table column resolvers
resolveDiags := e.resolveResourceValues(ctx, meta, resource)
diags = diags.Add(resolveDiags)
Expand Down
4 changes: 3 additions & 1 deletion provider/schema/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Meta struct {
FetchId string `json:"fetch_id,omitempty"`
}

const FetchIdMetaKey = "cq_fetch_id"

var (
cqMeta = Column{
Name: "cq_meta",
Expand All @@ -31,7 +33,7 @@ var (
mi := Meta{
LastUpdate: time.Now().UTC(),
}
if val, ok := resource.GetMeta("cq_fetch_id"); ok {
if val, ok := resource.GetMeta(FetchIdMetaKey); ok {
if s, ok := val.(string); ok {
mi.FetchId = s
}
Expand Down