Skip to content

Commit

Permalink
[Connectors API] Fix bug when creating a sync job via API (#104802) (#…
Browse files Browse the repository at this point in the history
…104824)

(cherry picked from commit a107107)
  • Loading branch information
jedrazb authored Jan 26, 2024
1 parent c3ec0f9 commit 4aea5fc
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 379 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/104802.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 104802
summary: "[Connectors API] Fix bug when triggering a sync job via API"
area: Application
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ setup:
connector_sync_job.get:
connector_sync_job_id: $id

- match: { connector.id: test-connector}
- match: { connector.id: test-connector }
- match: { job_type: full }
- match: { trigger_method: on_demand }
- match: { status: pending }
Expand All @@ -39,6 +39,106 @@ setup:
- exists: created_at
- exists: last_seen

---
'Create connector sync job with filtering':
- do:
connector.update_filtering:
connector_id: test-connector
body:
filtering:
- active:
advanced_snippet:
created_at: "2023-05-25T12:30:00.000Z"
updated_at: "2023-05-25T12:30:00.000Z"
value: { }
rules:
- created_at: "2023-05-25T12:30:00.000Z"
field: _
id: RULE-ACTIVE-SYNC-JOB-TEST
order: 0
policy: include
rule: regex
updated_at: "2023-05-25T12:30:00.000Z"
value: ".*"
validation:
errors: [ ]
state: valid
domain: DEFAULT
draft:
advanced_snippet:
created_at: "2023-05-25T12:30:00.000Z"
updated_at: "2023-05-25T12:30:00.000Z"
value: { }
rules:
- created_at: "2023-05-25T12:30:00.000Z"
field: _
id: RULE-DRAFT-0
order: 0
policy: include
rule: regex
updated_at: "2023-05-25T12:30:00.000Z"
value: ".*"
validation:
errors: [ ]
state: valid
- active:
advanced_snippet:
created_at: "2021-05-25T12:30:00.000Z"
updated_at: "2021-05-25T12:30:00.000Z"
value: { }
rules:
- created_at: "2021-05-25T12:30:00.000Z"
field: _
id: RULE-ACTIVE-1
order: 0
policy: include
rule: regex
updated_at: "2021-05-25T12:30:00.000Z"
value: ".*"
validation:
errors: [ ]
state: valid
domain: TEST
draft:
advanced_snippet:
created_at: "2021-05-25T12:30:00.000Z"
updated_at: "2021-05-25T12:30:00.000Z"
value: { }
rules:
- created_at: "2021-05-25T12:30:00.000Z"
field: _
id: RULE-DRAFT-1
order: 0
policy: exclude
rule: regex
updated_at: "2021-05-25T12:30:00.000Z"
value: ".*"
validation:
errors: [ ]
state: valid

- match: { result: updated }

- do:
connector_sync_job.post:
body:
id: test-connector
job_type: full
trigger_method: on_demand

- set: { id: id }

- match: { id: $id }

- do:
connector_sync_job.get:
connector_sync_job_id: $id

- match: { connector.filtering.rules.0.id: RULE-ACTIVE-SYNC-JOB-TEST }
- match: { connector.filtering.rules.0.rule: regex }
- match: { connector.filtering.validation.state: valid }
- match: { connector.filtering.advanced_snippet.created_at: "2023-05-25T12:30:00.000Z" }

---
'Create connector sync job with complex connector document':

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -47,6 +48,7 @@
* <li>An error string capturing the latest error encountered during the connector's operation, if any.</li>
* <li>A {@link ConnectorFeatures} object encapsulating the set of features enabled for this connector.</li>
* <li>A list of {@link ConnectorFiltering} objects for applying filtering rules to the data processed by the connector.</li>
* <li>An optional {@link FilteringRules} object that represents active filtering rules applied to a sync job.</li>
* <li>The name of the Elasticsearch index where the synchronized data is stored or managed.</li>
* <li>A boolean flag 'isNative' indicating whether the connector is a native Elasticsearch connector.</li>
* <li>The language associated with the connector.</li>
Expand Down Expand Up @@ -79,6 +81,8 @@ public class Connector implements NamedWriteable, ToXContentObject {
private final ConnectorFeatures features;
private final List<ConnectorFiltering> filtering;
@Nullable
private final FilteringRules syncJobFiltering;
@Nullable
private final String indexName;
private final boolean isNative;
@Nullable
Expand Down Expand Up @@ -110,6 +114,7 @@ public class Connector implements NamedWriteable, ToXContentObject {
* @param error Information about the last error encountered by the connector, if any.
* @param features Features enabled for the connector.
* @param filtering Filtering settings applied by the connector.
* @param syncJobFiltering Filtering settings used by a sync job, it contains subset of data from 'filtering'.
* @param indexName Name of the index associated with the connector.
* @param isNative Flag indicating whether the connector is a native type.
* @param language The language supported by the connector.
Expand All @@ -132,6 +137,7 @@ private Connector(
String error,
ConnectorFeatures features,
List<ConnectorFiltering> filtering,
FilteringRules syncJobFiltering,
String indexName,
boolean isNative,
String language,
Expand All @@ -152,7 +158,8 @@ private Connector(
this.description = description;
this.error = error;
this.features = features;
this.filtering = Objects.requireNonNull(filtering, "[filtering] cannot be null");
this.filtering = filtering;
this.syncJobFiltering = syncJobFiltering;
this.indexName = indexName;
this.isNative = isNative;
this.language = language;
Expand All @@ -176,6 +183,7 @@ public Connector(StreamInput in) throws IOException {
this.error = in.readOptionalString();
this.features = in.readOptionalWriteable(ConnectorFeatures::new);
this.filtering = in.readOptionalCollectionAsList(ConnectorFiltering::new);
this.syncJobFiltering = in.readOptionalWriteable(FilteringRules::new);
this.indexName = in.readOptionalString();
this.isNative = in.readBoolean();
this.language = in.readOptionalString();
Expand Down Expand Up @@ -388,6 +396,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(error);
out.writeOptionalWriteable(features);
out.writeOptionalCollection(filtering);
out.writeOptionalWriteable(syncJobFiltering);
out.writeOptionalString(indexName);
out.writeBoolean(isNative);
out.writeOptionalString(language);
Expand Down Expand Up @@ -434,6 +443,10 @@ public List<ConnectorFiltering> getFiltering() {
return filtering;
}

public FilteringRules getSyncJobFiltering() {
return syncJobFiltering;
}

public String getIndexName() {
return indexName;
}
Expand Down Expand Up @@ -497,6 +510,7 @@ public boolean equals(Object o) {
&& Objects.equals(error, connector.error)
&& Objects.equals(features, connector.features)
&& Objects.equals(filtering, connector.filtering)
&& Objects.equals(syncJobFiltering, connector.syncJobFiltering)
&& Objects.equals(indexName, connector.indexName)
&& Objects.equals(language, connector.language)
&& Objects.equals(lastSeen, connector.lastSeen)
Expand All @@ -520,6 +534,7 @@ public int hashCode() {
error,
features,
filtering,
syncJobFiltering,
indexName,
isNative,
language,
Expand Down Expand Up @@ -550,6 +565,7 @@ public static class Builder {
private String error;
private ConnectorFeatures features;
private List<ConnectorFiltering> filtering = List.of(ConnectorFiltering.getDefaultConnectorFilteringConfig());
private FilteringRules syncJobFiltering;
private String indexName;
private boolean isNative = false;
private String language;
Expand Down Expand Up @@ -603,6 +619,11 @@ public Builder setFiltering(List<ConnectorFiltering> filtering) {
return this;
}

public Builder setSyncJobFiltering(FilteringRules syncJobFiltering) {
this.syncJobFiltering = syncJobFiltering;
return this;
}

public Builder setIndexName(String indexName) {
this.indexName = indexName;
return this;
Expand Down Expand Up @@ -676,6 +697,7 @@ public Connector build() {
error,
features,
filtering,
syncJobFiltering,
indexName,
isNative,
language,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ public ConnectorFiltering(StreamInput in) throws IOException {
this.draft = new FilteringRules(in);
}

public FilteringRules getActive() {
return active;
}

public String getDomain() {
return domain;
}

public FilteringRules getDraft() {
return draft;
}

private static final ParseField ACTIVE_FIELD = new ParseField("active");
private static final ParseField DOMAIN_FIELD = new ParseField("domain");
private static final ParseField DRAFT_FIELD = new ParseField("draft");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public FilteringRules(StreamInput in) throws IOException {
this.filteringValidationInfo = new FilteringValidationInfo(in);
}

public FilteringAdvancedSnippet getAdvancedSnippet() {
return advancedSnippet;
}

public List<FilteringRule> getRules() {
return rules;
}

public FilteringValidationInfo getFilteringValidationInfo() {
return filteringValidationInfo;
}

private static final ParseField ADVANCED_SNIPPET_FIELD = new ParseField("advanced_snippet");
private static final ParseField RULES_FIELD = new ParseField("rules");
private static final ParseField VALIDATION_FIELD = new ParseField("validation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.Connector;
import org.elasticsearch.xpack.application.connector.ConnectorConfiguration;
import org.elasticsearch.xpack.application.connector.ConnectorFiltering;
import org.elasticsearch.xpack.application.connector.ConnectorIngestPipeline;
import org.elasticsearch.xpack.application.connector.ConnectorSyncStatus;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand Down Expand Up @@ -343,21 +342,23 @@ private static Instant parseNullableInstant(XContentParser p) throws IOException
String syncJobConnectorId = Strings.isNullOrEmpty(connectorId) ? parsedConnectorId : connectorId;

return new Connector.Builder().setConnectorId(syncJobConnectorId)
.setFiltering((List<ConnectorFiltering>) args[i++])
.setFiltering(null)
.setSyncJobFiltering((FilteringRules) args[i++])
.setIndexName((String) args[i++])
.setLanguage((String) args[i++])
.setPipeline((ConnectorIngestPipeline) args[i++])
.setServiceType((String) args[i++])
.setConfiguration((Map<String, ConnectorConfiguration>) args[i++])
.setConfiguration((Map<String, ConnectorConfiguration>) args[i])
.build();
}
);

static {
SYNC_JOB_CONNECTOR_PARSER.declareString(optionalConstructorArg(), Connector.ID_FIELD);
SYNC_JOB_CONNECTOR_PARSER.declareObjectArray(
SYNC_JOB_CONNECTOR_PARSER.declareObjectOrNull(
optionalConstructorArg(),
(p, c) -> ConnectorFiltering.fromXContent(p),
(p, c) -> FilteringRules.fromXContent(p),
null,
Connector.FILTERING_FIELD
);
SYNC_JOB_CONNECTOR_PARSER.declareStringOrNull(optionalConstructorArg(), Connector.INDEX_NAME_FIELD);
Expand Down Expand Up @@ -491,8 +492,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (connector.getConnectorId() != null) {
builder.field(Connector.ID_FIELD.getPreferredName(), connector.getConnectorId());
}
if (connector.getFiltering() != null) {
builder.field(Connector.FILTERING_FIELD.getPreferredName(), connector.getFiltering());
if (connector.getSyncJobFiltering() != null) {
builder.field(Connector.FILTERING_FIELD.getPreferredName(), connector.getSyncJobFiltering());
}
if (connector.getIndexName() != null) {
builder.field(Connector.INDEX_NAME_FIELD.getPreferredName(), connector.getIndexName());
Expand Down
Loading

0 comments on commit 4aea5fc

Please sign in to comment.