diff --git a/changelog/README.md b/changelog/README.md index c0797cd78..3ea4465d4 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -7,6 +7,7 @@ - [new feature] [#405](https://github.com/datastax/dsbulk/issues/405): Add support for Prometheus. - [improvement] [#403](https://github.com/datastax/dsbulk/issues/403): Exclude unsupported types from automatic timestamp and TTL preservation. - [improvement] [#402](https://github.com/datastax/dsbulk/issues/402): Upload DSBulk binary distributions to Maven Central. +- [improvement] [#411](https://github.com/datastax/dsbulk/issues/411): Don't check for emptiness of primary key columns. ## 1.8.0 diff --git a/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapper.java b/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapper.java index 192ad74f3..a8ea7c106 100644 --- a/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapper.java +++ b/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapper.java @@ -51,8 +51,6 @@ public class DefaultRecordMapper implements RecordMapper { private final List insertStatements; - private final ImmutableSet partitionKeyVariables; - private final ImmutableSet clusteringColumnVariables; private final ImmutableSet primaryKeyVariables; private final ProtocolVersion protocolVersion; private final Mapping mapping; @@ -99,8 +97,6 @@ public DefaultRecordMapper( boolean allowMissingFields, Function boundStatementBuilderFactory) { this.insertStatements = ImmutableList.copyOf(insertStatements); - this.partitionKeyVariables = ImmutableSet.copyOf(partitionKeyVariables); - this.clusteringColumnVariables = ImmutableSet.copyOf(clusteringColumnVariables); this.protocolVersion = protocolVersion; this.mapping = mapping; this.recordMetadata = recordMetadata; @@ -185,16 +181,8 @@ private BoundStatementBuilder bindColumn( } catch (Exception e) { throw InvalidMappingException.encodeFailed(field, variable, javaType, cqlType, raw, e); } - boolean isNull = isNull(bb, cqlType); - if (isNull || isEmpty(bb)) { - if (partitionKeyVariables.contains(variable)) { - throw isNull - ? InvalidMappingException.nullPrimaryKey(variable) - : InvalidMappingException.emptyPrimaryKey(variable); - } - } - if (isNull) { - if (clusteringColumnVariables.contains(variable)) { + if (isNull(bb, cqlType)) { + if (primaryKeyVariables.contains(variable)) { throw InvalidMappingException.nullPrimaryKey(variable); } if (nullToUnset) { @@ -220,10 +208,6 @@ private boolean isNull(ByteBuffer bb, DataType cqlType) { } } - private boolean isEmpty(ByteBuffer bb) { - return bb == null || !bb.hasRemaining(); - } - private void ensureAllFieldsPresent(Set recordFields) { for (Field field : mapping.fields()) { if (!recordFields.contains(field)) { diff --git a/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/InvalidMappingException.java b/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/InvalidMappingException.java index 3e2b3db9a..0a9c2e2d4 100644 --- a/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/InvalidMappingException.java +++ b/workflow/commons/src/main/java/com/datastax/oss/dsbulk/workflow/commons/schema/InvalidMappingException.java @@ -69,15 +69,6 @@ public static InvalidMappingException nullPrimaryKey(@NonNull CQLWord variable) + "Check that your settings (schema.mapping or schema.query) match your dataset contents."); } - @NonNull - public static InvalidMappingException emptyPrimaryKey(@NonNull CQLWord variable) { - return new InvalidMappingException( - "Primary key column " - + variable.render(VARIABLE) - + " cannot be set to empty. " - + "Check that your settings (schema.mapping or schema.query) match your dataset contents."); - } - @NonNull public static InvalidMappingException unsetPrimaryKey(@NonNull CQLWord variable) { return new InvalidMappingException( diff --git a/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapperTest.java b/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapperTest.java index 7b15f4b16..1a5d2bcd4 100644 --- a/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapperTest.java +++ b/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/schema/DefaultRecordMapperTest.java @@ -611,153 +611,6 @@ void should_return_unmappable_statement_when_missing_field() { + "or set schema.allowMissingFields to true."); } - @Test - void should_map_when_pk_column_is_empty_blob() { - when(record.fields()).thenReturn(set(F1, F2, F3)); - when(c1Def.getType()).thenReturn(DataTypes.TEXT); - when(c2Def.getType()).thenReturn(DataTypes.ASCII); - when(c3Def.getType()).thenReturn(DataTypes.BLOB); - when(record.getFieldValue(F1)).thenReturn("foo"); - when(record.getFieldValue(F2)).thenReturn("foo"); - when(record.getFieldValue(F3)).thenReturn(""); // blobs can be empty - when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1); - when(mapping.codec(C2, DataTypes.ASCII, GenericType.STRING)).thenReturn(codec2); - when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3); - when(codec1.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8))); - when(codec2.encode(any(), any())).thenReturn(ByteBuffer.wrap("foo".getBytes(UTF_8))); - when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0)); - RecordMapper mapper = - new DefaultRecordMapper( - Collections.singletonList(insertStatement), - set(C1), - set(C2, C3), - V4, - mapping, - recordMetadata, - false, - true, - false, - statement -> boundStatementBuilder); - Statement result = mapper.map(record).single().block(); - assertThat(result).isInstanceOf(MappedBoundStatement.class); - assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement); - verify(boundStatementBuilder, times(3)) - .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture()); - assertParameter(0, C1, ByteBuffer.wrap("foo".getBytes(UTF_8))); - assertParameter(1, C2, ByteBuffer.wrap("foo".getBytes(UTF_8))); - assertParameter(2, C3, ByteBuffer.allocate(0)); - } - - @Test - void should_not_map_when_partition_key_column_is_empty_string() { - when(record.fields()).thenReturn(set(F1, F2, F3)); - when(record.getFieldValue(F1)).thenReturn(""); - when(c1Def.getType()).thenReturn(DataTypes.TEXT); - when(mapping.codec(C1, DataTypes.TEXT, GenericType.STRING)).thenReturn(codec1); - when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0)); - RecordMapper mapper = - new DefaultRecordMapper( - Collections.singletonList(insertStatement), - set(C1), - set(C2, C3), - V4, - mapping, - recordMetadata, - false, - true, - false, - statement -> boundStatementBuilder); - Statement result = mapper.map(record).single().block(); - assertThat(result).isNotSameAs(boundStatement).isInstanceOf(UnmappableStatement.class); - UnmappableStatement unmappableStatement = (UnmappableStatement) result; - assertThat(unmappableStatement.getError()) - .isInstanceOf(InvalidMappingException.class) - .hasMessageContaining("Primary key column col1 cannot be set to empty"); - } - - @Test - void should_not_map_when_partition_key_column_is_empty_blob() { - when(record.fields()).thenReturn(set(F1, F2, F3)); - when(c1Def.getType()).thenReturn(DataTypes.BLOB); - when(record.getFieldValue(F1)).thenReturn(""); - when(mapping.codec(C1, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec1); - when(codec1.encode(any(), any())).thenReturn(ByteBuffer.allocate(0)); - RecordMapper mapper = - new DefaultRecordMapper( - Collections.singletonList(insertStatement), - set(C1), - set(C2, C3), - V4, - mapping, - recordMetadata, - false, - true, - false, - statement -> boundStatementBuilder); - Statement result = mapper.map(record).single().block(); - assertThat(result).isNotSameAs(boundStatement).isInstanceOf(UnmappableStatement.class); - UnmappableStatement unmappableStatement = (UnmappableStatement) result; - assertThat(unmappableStatement.getError()) - .isInstanceOf(InvalidMappingException.class) - .hasMessageContaining("Primary key column col1 cannot be set to empty"); - } - - @Test - void should_map_when_clustering_column_is_empty_string() { - when(record.fields()).thenReturn(set(F1, F2, F3)); - when(record.getFieldValue(F3)).thenReturn(""); - when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0)); - RecordMapper mapper = - new DefaultRecordMapper( - Collections.singletonList(insertStatement), - set(C1), - set(C2, C3), - V4, - mapping, - recordMetadata, - false, - true, - false, - statement -> boundStatementBuilder); - Statement result = mapper.map(record).single().block(); - assertThat(result).isInstanceOf(MappedBoundStatement.class); - assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement); - verify(boundStatementBuilder, times(3)) - .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture()); - assertParameter(0, C1, TypeCodecs.INT.encode(42, V4)); - assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4)); - assertParameter(2, C3, ByteBuffer.allocate(0)); - } - - @Test - void should_map_when_clustering_column_is_empty_blob() { - when(record.fields()).thenReturn(set(F1, F2, F3)); - when(c3Def.getType()).thenReturn(DataTypes.BLOB); - when(record.getFieldValue(F3)).thenReturn(""); - when(mapping.codec(C3, DataTypes.BLOB, GenericType.STRING)).thenReturn(codec3); - when(codec3.encode(any(), any())).thenReturn(ByteBuffer.allocate(0)); - RecordMapper mapper = - new DefaultRecordMapper( - Collections.singletonList(insertStatement), - set(C1), - set(C2, C3), - V4, - mapping, - recordMetadata, - false, - true, - false, - statement -> boundStatementBuilder); - Statement result = mapper.map(record).single().block(); - assertThat(result).isInstanceOf(MappedBoundStatement.class); - assertThat(ReflectionUtils.getInternalState(result, "delegate")).isSameAs(boundStatement); - verify(boundStatementBuilder, times(3)) - .setBytesUnsafe(variableCaptor.capture(), valueCaptor.capture()); - assertParameter(0, C1, TypeCodecs.INT.encode(42, V4)); - assertParameter(1, C2, TypeCodecs.BIGINT.encode(4242L, V4)); - assertParameter(2, C3, ByteBuffer.allocate(0)); - } - @Test void should_map_to_multiple_statements() { when(record.fields()).thenReturn(set(F1, F2, F3)); diff --git a/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/settings/SchemaSettingsTest.java b/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/settings/SchemaSettingsTest.java index c1425e662..e59f2dbff 100644 --- a/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/settings/SchemaSettingsTest.java +++ b/workflow/commons/src/test/java/com/datastax/oss/dsbulk/workflow/commons/settings/SchemaSettingsTest.java @@ -1756,9 +1756,9 @@ void should_error_when_insert_query_does_not_contain_primary_key() { settings.init(session, false, true); RecordMapper mapper = settings.createRecordMapper(session, recordMetadata, codecFactory, false); @SuppressWarnings("unchecked") - Set clusteringColumnVariables = - (Set) getInternalState(mapper, "clusteringColumnVariables"); - assertThat(clusteringColumnVariables).isEmpty(); + Set primaryKeyVariables = + (Set) getInternalState(mapper, "primaryKeyVariables"); + assertThat(primaryKeyVariables).doesNotContain(CQLWord.fromCqlIdentifier(C2)); } @Test