Don't check for emptiness of primary key columns #414

Merged 2 commits on Apr 4, 2022
1 change: 1 addition & 0 deletions changelog/README.md
@@ -7,6 +7,7 @@
- [new feature] [#405](https://github.com/datastax/dsbulk/issues/405): Add support for Prometheus.
- [improvement] [#403](https://github.com/datastax/dsbulk/issues/403): Exclude unsupported types from automatic timestamp and TTL preservation.
- [improvement] [#402](https://github.com/datastax/dsbulk/issues/402): Upload DSBulk binary distributions to Maven Central.
- [improvement] [#411](https://github.com/datastax/dsbulk/issues/411): Don't check for emptiness of primary key columns.

## 1.8.0

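The changelog entry for #411 above, together with the code diffs below, means that the record mapper no longer rejects values that encode to a zero-length buffer when they are bound to primary key columns; only values that encode to null are still rejected. The standalone sketch below (not DSBulk code; the class name is invented for illustration) shows the empty-versus-null distinction at the encoded-value level that the removed check was based on:

```java
import java.nio.ByteBuffer;

// Illustrative sketch only (not DSBulk code; class name is invented).
// The CQL binary protocol distinguishes an absent (null) value from a present,
// zero-length ("empty") value such as an empty string or empty blob. The check
// removed by this PR rejected the zero-length case for primary key columns;
// the null case is still rejected, as the DefaultRecordMapper diff below shows.
public class EmptyVersusNullSketch {
  public static void main(String[] args) {
    ByteBuffer empty = ByteBuffer.allocate(0); // present, but zero bytes remaining
    ByteBuffer absent = null;                  // no value at all

    System.out.println("empty value has remaining bytes: " + empty.hasRemaining()); // false
    System.out.println("absent value is null: " + (absent == null));                // true
  }
}
```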
DefaultRecordMapper.java
@@ -51,8 +51,6 @@
public class DefaultRecordMapper implements RecordMapper {

private final List<PreparedStatement> insertStatements;
private final ImmutableSet<CQLWord> partitionKeyVariables;
private final ImmutableSet<CQLWord> clusteringColumnVariables;
private final ImmutableSet<CQLWord> primaryKeyVariables;
private final ProtocolVersion protocolVersion;
private final Mapping mapping;
@@ -99,8 +97,6 @@ public DefaultRecordMapper(
boolean allowMissingFields,
Function<PreparedStatement, BoundStatementBuilder> boundStatementBuilderFactory) {
this.insertStatements = ImmutableList.copyOf(insertStatements);
this.partitionKeyVariables = ImmutableSet.copyOf(partitionKeyVariables);
this.clusteringColumnVariables = ImmutableSet.copyOf(clusteringColumnVariables);
this.protocolVersion = protocolVersion;
this.mapping = mapping;
this.recordMetadata = recordMetadata;
@@ -185,16 +181,8 @@ private <T> BoundStatementBuilder bindColumn(
} catch (Exception e) {
throw InvalidMappingException.encodeFailed(field, variable, javaType, cqlType, raw, e);
}
boolean isNull = isNull(bb, cqlType);
if (isNull || isEmpty(bb)) {
if (partitionKeyVariables.contains(variable)) {
throw isNull
? InvalidMappingException.nullPrimaryKey(variable)
: InvalidMappingException.emptyPrimaryKey(variable);
}
}
if (isNull) {
if (clusteringColumnVariables.contains(variable)) {
if (isNull(bb, cqlType)) {
if (primaryKeyVariables.contains(variable)) {
throw InvalidMappingException.nullPrimaryKey(variable);
}
if (nullToUnset) {
@@ -220,10 +208,6 @@ private boolean isNull(ByteBuffer bb, DataType cqlType) {
}
}

private boolean isEmpty(ByteBuffer bb) {
return bb == null || !bb.hasRemaining();
}

private void ensureAllFieldsPresent(Set<Field> recordFields) {
for (Field field : mapping.fields()) {
if (!recordFields.contains(field)) {
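A condensed, hedged sketch of the check that remains in bindColumn after this change. The class, method, and exception names below are simplified stand-ins, not the DSBulk API: the real code uses CQLWord variables, InvalidMappingException, and a type-aware isNull(bb, cqlType) helper, and the nullToUnset/unset handling visible above is omitted here.

```java
import java.nio.ByteBuffer;
import java.util.Set;

// Simplified stand-in for the post-change logic; not the DSBulk API.
public class PrimaryKeyCheckSketch {

  static class MappingError extends RuntimeException {
    MappingError(String message) { super(message); }
  }

  // Only nullness is validated for primary key columns now; a present but
  // zero-length value (e.g. an empty blob) is bound as-is.
  static void checkPrimaryKey(String variable, ByteBuffer encoded, Set<String> primaryKeyVariables) {
    if (encoded == null && primaryKeyVariables.contains(variable)) {
      throw new MappingError("Primary key column " + variable + " cannot be mapped to null.");
    }
  }

  public static void main(String[] args) {
    Set<String> pk = Set.of("col1");
    checkPrimaryKey("col1", ByteBuffer.allocate(0), pk); // empty value: accepted
    try {
      checkPrimaryKey("col1", null, pk);                 // null value: still rejected
    } catch (MappingError e) {
      System.out.println(e.getMessage());
    }
  }
}
```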
InvalidMappingException.java
@@ -69,15 +69,6 @@ public static InvalidMappingException nullPrimaryKey(@NonNull CQLWord variable)
+ "Check that your settings (schema.mapping or schema.query) match your dataset contents.");
}

@NonNull
public static InvalidMappingException emptyPrimaryKey(@NonNull CQLWord variable) {
return new InvalidMappingException(
"Primary key column "
+ variable.render(VARIABLE)
+ " cannot be set to empty. "
+ "Check that your settings (schema.mapping or schema.query) match your dataset contents.");
}

@NonNull
public static InvalidMappingException unsetPrimaryKey(@NonNull CQLWord variable) {
return new InvalidMappingException(
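For orientation, a minimal sketch of the static-factory style this exception class uses; after this PR only variants such as nullPrimaryKey and unsetPrimaryKey remain. The class below is a simplified stand-in, and the message wording is approximated from the context lines above rather than copied from the real class.

```java
// Simplified stand-in, not the real InvalidMappingException class.
public class InvalidMappingExceptionSketch extends RuntimeException {

  private InvalidMappingExceptionSketch(String message) { super(message); }

  // Mirrors the surviving nullPrimaryKey factory; the emptyPrimaryKey variant
  // removed above no longer has a caller once the emptiness check is gone.
  public static InvalidMappingExceptionSketch nullPrimaryKey(String variable) {
    return new InvalidMappingExceptionSketch(
        "Primary key column "
            + variable
            + " cannot be mapped to null. "
            + "Check that your settings (schema.mapping or schema.query) match your dataset contents.");
  }
}
```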
DefaultRecordMapperTest.java
@@ -611,153 +611,6 @@ void should_return_unmappable_statement_when_missing_field()
+ "or set schema.allowMissingFields to true.");
}

@Test
void should_map_when_pk_column_is_empty_blob() {
when(record.fields()).thenReturn(set(F1, F2, F3));
when(c1Def.getType()).thenReturn(DataTypes.TEXT);
when(c2Def.getType()).thenReturn(DataTypes.ASCII);
when(c3Def.getType()).thenReturn(DataTypes.BLOB);
when(record.getFieldValue(F1)).thenReturn("foo");
when(record.getFieldValue(F2)).thenReturn("foo");
when(record.getFieldValue(F3)).thenReturn(""); // blobs can be empty
when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1);
when(mapping.codec(C2, DataTypes.ASCII, GenericType.STRING)).thenReturn(codec2);
when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3);
when(codec1.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8)));
when(codec2.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8)));
when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));
RecordMapper mapper =
new DefaultRecordMapper(
Collections.singletonList(insertStatement),
set(C1),
set(C2, C3),
V4,
mapping,
recordMetadata,
false,
true,
false,
statement -> boundStatementBuilder);
Statement<?> result = mapper.map(record).single().block();
assertThat(result).isInstanceOf(MappedBoundStatement.class);
assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement);
verify(boundStatementBuilder, times(3))
.setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
assertParameter(0, C1, ByteBuffer.wrap("foo".getBytes(UTF_8)));
assertParameter(1, C2, ByteBuffer.wrap("foo".getBytes(UTF_8)));
assertParameter(2, C3, ByteBuffer.allocate(0));
}

@Test
void should_not_map_when_partition_key_column_is_empty_string() {
when(record.fields()).thenReturn(set(F1, F2, F3));
when(record.getFieldValue(F1)).thenReturn("");
when(c1Def.getType()).thenReturn(DataTypes.TEXT);
when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1);
when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));
RecordMapper mapper =
new DefaultRecordMapper(
Collections.singletonList(insertStatement),
set(C1),
set(C2, C3),
V4,
mapping,
recordMetadata,
false,
true,
false,
statement -> boundStatementBuilder);
Statement<?> result = mapper.map(record).single().block();
assertThat(result).isNotSameAs(boundStatement).isInstanceOf(UnmappableStatement.class);
UnmappableStatement unmappableStatement = (UnmappableStatement) result;
assertThat(unmappableStatement.getError())
.isInstanceOf(InvalidMappingException.class)
.hasMessageContaining("Primary key column col1 cannot be set to empty");
}

@Test
void should_not_map_when_partition_key_column_is_empty_blob() {
when(record.fields()).thenReturn(set(F1, F2, F3));
when(c1Def.getType()).thenReturn(DataTypes.BLOB);
when(record.getFieldValue(F1)).thenReturn("");
when(mapping.codec(C1, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec1);
when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));
RecordMapper mapper =
new DefaultRecordMapper(
Collections.singletonList(insertStatement),
set(C1),
set(C2, C3),
V4,
mapping,
recordMetadata,
false,
true,
false,
statement -> boundStatementBuilder);
Statement<?> result = mapper.map(record).single().block();
assertThat(result).isNotSameAs(boundStatement).isInstanceOf(UnmappableStatement.class);
UnmappableStatement unmappableStatement = (UnmappableStatement) result;
assertThat(unmappableStatement.getError())
.isInstanceOf(InvalidMappingException.class)
.hasMessageContaining("Primary key column col1 cannot be set to empty");
}

@Test
void should_map_when_clustering_column_is_empty_string() {
when(record.fields()).thenReturn(set(F1, F2, F3));
when(record.getFieldValue(F3)).thenReturn("");
when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));
RecordMapper mapper =
new DefaultRecordMapper(
Collections.singletonList(insertStatement),
set(C1),
set(C2, C3),
V4,
mapping,
recordMetadata,
false,
true,
false,
statement -> boundStatementBuilder);
Statement<?> result = mapper.map(record).single().block();
assertThat(result).isInstanceOf(MappedBoundStatement.class);
assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement);
verify(boundStatementBuilder, times(3))
.setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
assertParameter(0, C1, TypeCodecs.INT.encode(42, V4));
assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4));
assertParameter(2, C3, ByteBuffer.allocate(0));
}

@Test
void should_map_when_clustering_column_is_empty_blob() {
when(record.fields()).thenReturn(set(F1, F2, F3));
when(c3Def.getType()).thenReturn(DataTypes.BLOB);
when(record.getFieldValue(F3)).thenReturn("");
when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3);
when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));
RecordMapper mapper =
new DefaultRecordMapper(
Collections.singletonList(insertStatement),
set(C1),
set(C2, C3),
V4,
mapping,
recordMetadata,
false,
true,
false,
statement -> boundStatementBuilder);
Statement<?> result = mapper.map(record).single().block();
assertThat(result).isInstanceOf(MappedBoundStatement.class);
assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement);
verify(boundStatementBuilder, times(3))
.setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
assertParameter(0, C1, TypeCodecs.INT.encode(42, V4));
assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4));
assertParameter(2, C3, ByteBuffer.allocate(0));
}

@Test
void should_map_to_multiple_statements() {
when(record.fields()).thenReturn(set(F1, F2, F3));
SchemaSettingsTest.java
@@ -1756,9 +1756,9 @@ void should_error_when_insert_query_does_not_contain_primary_key() {
settings.init(session, false, true);
RecordMapper mapper = settings.createRecordMapper(session, recordMetadata, codecFactory, false);
@SuppressWarnings("unchecked")
Set<CQLWord> clusteringColumnVariables =
(Set<CQLWord>) getInternalState(mapper, "clusteringColumnVariables");
assertThat(clusteringColumnVariables).isEmpty();
Set<CQLWord> primaryKeyVariables =
(Set<CQLWord>) getInternalState(mapper, "primaryKeyVariables");
assertThat(primaryKeyVariables).doesNotContain(CQLWord.fromCqlIdentifier(C2));
}

@Test