Skip to content

Commit

Permalink
Don't check for emptiness of primary key columns (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
adutra authored Apr 4, 2022
1 parent 0a7af94 commit 5eade49
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 177 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [new feature] [#405](https://github.com/datastax/dsbulk/issues/405): Add support for Prometheus.
- [improvement] [#403](https://github.com/datastax/dsbulk/issues/403): Exclude unsupported types from automatic timestamp and TTL preservation.
- [improvement] [#402](https://github.com/datastax/dsbulk/issues/402): Upload DSBulk binary distributions to Maven Central.
- [improvement] [#411](https://github.com/datastax/dsbulk/issues/411): Don't check for emptiness of primary key columns.

## 1.8.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@
public class DefaultRecordMapper implements RecordMapper {

private final List<PreparedStatement> insertStatements;
private final ImmutableSet<CQLWord> partitionKeyVariables;
private final ImmutableSet<CQLWord> clusteringColumnVariables;
private final ImmutableSet<CQLWord> primaryKeyVariables;
private final ProtocolVersion protocolVersion;
private final Mapping mapping;
Expand Down Expand Up @@ -99,8 +97,6 @@ public DefaultRecordMapper(
boolean allowMissingFields,
Function<PreparedStatement, BoundStatementBuilder> boundStatementBuilderFactory) {
this.insertStatements = ImmutableList.copyOf(insertStatements);
this.partitionKeyVariables = ImmutableSet.copyOf(partitionKeyVariables);
this.clusteringColumnVariables = ImmutableSet.copyOf(clusteringColumnVariables);
this.protocolVersion = protocolVersion;
this.mapping = mapping;
this.recordMetadata = recordMetadata;
Expand Down Expand Up @@ -185,16 +181,8 @@ private <T> BoundStatementBuilder bindColumn(
} catch (Exception e) {
throw InvalidMappingException.encodeFailed(field, variable, javaType, cqlType, raw, e);
}
boolean isNull = isNull(bb, cqlType);
if (isNull || isEmpty(bb)) {
if (partitionKeyVariables.contains(variable)) {
throw isNull
? InvalidMappingException.nullPrimaryKey(variable)
: InvalidMappingException.emptyPrimaryKey(variable);
}
}
if (isNull) {
if (clusteringColumnVariables.contains(variable)) {
if (isNull(bb, cqlType)) {
if (primaryKeyVariables.contains(variable)) {
throw InvalidMappingException.nullPrimaryKey(variable);
}
if (nullToUnset) {
Expand All @@ -220,10 +208,6 @@ private boolean isNull(ByteBuffer bb, DataType cqlType) {
}
}

/**
 * Tells whether the given buffer carries no readable bytes.
 *
 * @param bb the buffer to inspect; may be {@code null}
 * @return {@code true} if {@code bb} is {@code null} or has nothing remaining between its
 *     position and its limit, {@code false} otherwise
 */
private boolean isEmpty(ByteBuffer bb) {
  if (bb == null) {
    return true;
  }
  // Zero remaining bytes is the same condition as !hasRemaining().
  return bb.remaining() == 0;
}

private void ensureAllFieldsPresent(Set<Field> recordFields) {
for (Field field : mapping.fields()) {
if (!recordFields.contains(field)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,6 @@ public static InvalidMappingException nullPrimaryKey(@NonNull CQLWord variable)
+ "Check that your settings (schema.mapping or schema.query) match your dataset contents.");
}

/**
 * Creates the exception describing an attempt to set a primary key column to an empty value.
 *
 * @param variable the variable whose rendered name is included in the error message
 * @return a new {@link InvalidMappingException} carrying the explanatory message
 */
@NonNull
public static InvalidMappingException emptyPrimaryKey(@NonNull CQLWord variable) {
  String message =
      "Primary key column "
          + variable.render(VARIABLE)
          + " cannot be set to empty. "
          + "Check that your settings (schema.mapping or schema.query) match your dataset contents.";
  return new InvalidMappingException(message);
}

@NonNull
public static InvalidMappingException unsetPrimaryKey(@NonNull CQLWord variable) {
return new InvalidMappingException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,153 +611,6 @@ void should_return_unmappable_statement_when_missing_field() {
+ "or set schema.allowMissingFields to true.");
}

/**
 * Checks that the record is mapped to a {@link MappedBoundStatement} when the third column is
 * typed as BLOB and its codec encodes the (empty) field value to a zero-length buffer.
 */
@Test
void should_map_when_pk_column_is_empty_blob() {
  when(record.fields()).thenReturn(set(F1, F2, F3));

  // First column: TEXT, encoded as the bytes of "foo".
  when(c1Def.getType()).thenReturn(DataTypes.TEXT);
  when(record.getFieldValue(F1)).thenReturn("foo");
  when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1);
  when(codec1.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8)));

  // Second column: ASCII, encoded as the bytes of "foo".
  when(c2Def.getType()).thenReturn(DataTypes.ASCII);
  when(record.getFieldValue(F2)).thenReturn("foo");
  when(mapping.codec(C2, DataTypes.ASCII, GenericType.STRING)).thenReturn(codec2);
  when(codec2.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8)));

  // Third column: BLOB; blobs can be empty, so the codec yields a zero-length buffer.
  when(c3Def.getType()).thenReturn(DataTypes.BLOB);
  when(record.getFieldValue(F3)).thenReturn("");
  when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3);
  when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));

  RecordMapper recordMapper =
      new DefaultRecordMapper(
          Collections.singletonList(insertStatement),
          set(C1),
          set(C2, C3),
          V4,
          mapping,
          recordMetadata,
          false,
          true,
          false,
          statement -> boundStatementBuilder);

  Statement<?> mapped = recordMapper.map(record).single().block();

  // The mapped statement must wrap the bound statement produced by the builder.
  assertThat(mapped).isInstanceOf(MappedBoundStatement.class);
  assertThat(ReflectionUtils.getInternalState(mapped, "delegate")).isSameAs(boundStatement);

  // All three variables are expected to be bound, the last one with an empty buffer.
  verify(boundStatementBuilder, times(3))
      .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
  assertParameter(0, C1, ByteBuffer.wrap("foo".getBytes(UTF_8)));
  assertParameter(1, C2, ByteBuffer.wrap("foo".getBytes(UTF_8)));
  assertParameter(2, C3, ByteBuffer.allocate(0));
}

/**
 * Checks that mapping yields an {@link UnmappableStatement} with an "empty primary key" error
 * when the TEXT column {@code C1} (passed in the first variable set) encodes to a zero-length
 * buffer.
 */
@Test
void should_not_map_when_partition_key_column_is_empty_string() {
  when(record.fields()).thenReturn(set(F1, F2, F3));

  // C1 is a TEXT column whose field value is the empty string, encoded as an empty buffer.
  when(c1Def.getType()).thenReturn(DataTypes.TEXT);
  when(record.getFieldValue(F1)).thenReturn("");
  when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1);
  when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));

  RecordMapper recordMapper =
      new DefaultRecordMapper(
          Collections.singletonList(insertStatement),
          set(C1),
          set(C2, C3),
          V4,
          mapping,
          recordMetadata,
          false,
          true,
          false,
          statement -> boundStatementBuilder);

  Statement<?> mapped = recordMapper.map(record).single().block();

  // The record must be rejected rather than turned into the regular bound statement.
  assertThat(mapped).isNotSameAs(boundStatement);
  assertThat(mapped).isInstanceOf(UnmappableStatement.class);
  assertThat(((UnmappableStatement) mapped).getError())
      .isInstanceOf(InvalidMappingException.class)
      .hasMessageContaining("Primary key column col1 cannot be set to empty");
}

/**
 * Checks that mapping yields an {@link UnmappableStatement} with an "empty primary key" error
 * when the BLOB column {@code C1} (passed in the first variable set) encodes to a zero-length
 * buffer.
 */
@Test
void should_not_map_when_partition_key_column_is_empty_blob() {
  when(record.fields()).thenReturn(set(F1, F2, F3));

  // C1 is a BLOB column whose field value is the empty string, encoded as an empty buffer.
  when(record.getFieldValue(F1)).thenReturn("");
  when(c1Def.getType()).thenReturn(DataTypes.BLOB);
  when(mapping.codec(C1, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec1);
  when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));

  RecordMapper recordMapper =
      new DefaultRecordMapper(
          Collections.singletonList(insertStatement),
          set(C1),
          set(C2, C3),
          V4,
          mapping,
          recordMetadata,
          false,
          true,
          false,
          statement -> boundStatementBuilder);

  Statement<?> mapped = recordMapper.map(record).single().block();

  // The record must be rejected rather than turned into the regular bound statement.
  assertThat(mapped).isNotSameAs(boundStatement);
  assertThat(mapped).isInstanceOf(UnmappableStatement.class);
  assertThat(((UnmappableStatement) mapped).getError())
      .isInstanceOf(InvalidMappingException.class)
      .hasMessageContaining("Primary key column col1 cannot be set to empty");
}

/**
 * Checks that the record is mapped to a {@link MappedBoundStatement} when {@code C3} (passed in
 * the second variable set) has an empty field value that its codec encodes to a zero-length
 * buffer.
 */
@Test
void should_map_when_clustering_column_is_empty_string() {
  when(record.fields()).thenReturn(set(F1, F2, F3));

  // Only the third field is overridden here: empty string in, empty buffer out.
  when(record.getFieldValue(F3)).thenReturn("");
  when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));

  RecordMapper recordMapper =
      new DefaultRecordMapper(
          Collections.singletonList(insertStatement),
          set(C1),
          set(C2, C3),
          V4,
          mapping,
          recordMetadata,
          false,
          true,
          false,
          statement -> boundStatementBuilder);

  Statement<?> mapped = recordMapper.map(record).single().block();

  // The mapped statement must wrap the bound statement produced by the builder.
  assertThat(mapped).isInstanceOf(MappedBoundStatement.class);
  assertThat(ReflectionUtils.getInternalState(mapped, "delegate")).isSameAs(boundStatement);

  // Expected values for C1 and C2 presumably come from the shared fixture setup, which is not
  // visible in this hunk.
  verify(boundStatementBuilder, times(3))
      .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
  assertParameter(0, C1, TypeCodecs.INT.encode(42, V4));
  assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4));
  assertParameter(2, C3, ByteBuffer.allocate(0));
}

/**
 * Checks that the record is mapped to a {@link MappedBoundStatement} when {@code C3} (passed in
 * the second variable set) is typed as BLOB and its codec encodes the empty field value to a
 * zero-length buffer.
 */
@Test
void should_map_when_clustering_column_is_empty_blob() {
  when(record.fields()).thenReturn(set(F1, F2, F3));

  // Third column becomes a BLOB: empty string in, empty buffer out.
  when(record.getFieldValue(F3)).thenReturn("");
  when(c3Def.getType()).thenReturn(DataTypes.BLOB);
  when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3);
  when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0));

  RecordMapper recordMapper =
      new DefaultRecordMapper(
          Collections.singletonList(insertStatement),
          set(C1),
          set(C2, C3),
          V4,
          mapping,
          recordMetadata,
          false,
          true,
          false,
          statement -> boundStatementBuilder);

  Statement<?> mapped = recordMapper.map(record).single().block();

  // The mapped statement must wrap the bound statement produced by the builder.
  assertThat(mapped).isInstanceOf(MappedBoundStatement.class);
  assertThat(ReflectionUtils.getInternalState(mapped, "delegate")).isSameAs(boundStatement);

  // Expected values for C1 and C2 presumably come from the shared fixture setup, which is not
  // visible in this hunk.
  verify(boundStatementBuilder, times(3))
      .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture());
  assertParameter(0, C1, TypeCodecs.INT.encode(42, V4));
  assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4));
  assertParameter(2, C3, ByteBuffer.allocate(0));
}

@Test
void should_map_to_multiple_statements() {
when(record.fields()).thenReturn(set(F1, F2, F3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1756,9 +1756,9 @@ void should_error_when_insert_query_does_not_contain_primary_key() {
settings.init(session, false, true);
RecordMapper mapper = settings.createRecordMapper(session, recordMetadata, codecFactory, false);
@SuppressWarnings("unchecked")
Set<CQLWord> clusteringColumnVariables =
(Set<CQLWord>) getInternalState(mapper, "clusteringColumnVariables");
assertThat(clusteringColumnVariables).isEmpty();
Set<CQLWord> primaryKeyVariables =
(Set<CQLWord>) getInternalState(mapper, "primaryKeyVariables");
assertThat(primaryKeyVariables).doesNotContain(CQLWord.fromCqlIdentifier(C2));
}

@Test
Expand Down

0 comments on commit 5eade49

Please sign in to comment.