Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add functions to set missing value map in the stream writers #1966

Merged
merged 5 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,19 @@ public void run(Throwable finalStatus) {

/** Schedules the writing of rows at given offset. */
ApiFuture<AppendRowsResponse> append(
String streamName, ProtoSchema writerSchema, ProtoRows rows, long offset) {
String streamName,
ProtoSchema writerSchema,
ProtoRows rows,
long offset,
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap) {
AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
requestBuilder.setProtoRows(
ProtoData.newBuilder().setWriterSchema(writerSchema).setRows(rows).build());
if (offset >= 0) {
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamName);
requestBuilder.putAllMissingValueInterpretations(missingValueMap);
return appendInternal(requestBuilder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,11 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
streamWriter.getStreamName(),
streamWriter.getProtoSchema(),
rows,
offset,
streamWriter.getMissingValueInterpretationMap());
return ApiFutures.transform(
responseFuture,
// Add callback for update schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/**
* Sets the missing value interpretation map for the JsonStreamWriter. The input
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
*
* @param missingValueInterpretationMap the missing value interpretation map used by the
* JsonStreamWriter.
*/
public void setMissingValueInterpretationMap(
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
return streamWriter.getMissingValueInterpretationMap();
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -56,6 +57,11 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();

// Map of fields to their MissingValueInterpretation, which dictates how a field should be
// populated when it is missing from an input user row.
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
new HashMap();

/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -134,7 +140,12 @@ public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows protoRows, long offset) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker()
.append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset);
.append(
streamWriter.getStreamName(),
streamWriter.getProtoSchema(),
protoRows,
offset,
streamWriter.getMissingValueInterpretationMap());
} else {
return connectionWorkerPool().append(streamWriter, protoRows, offset);
}
Expand Down Expand Up @@ -325,6 +336,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}

/**
* Sets the missing value interpretation map for the stream writer. The input
* missingValueInterpretationMap is used for all write requests unless otherwise changed.
*
* @param missingValueInterpretationMap the missing value interpretation map used by stream
* writer.
*/
public void setMissingValueInterpretationMap(
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
this.missingValueInterpretationMap = missingValueInterpretationMap;
}

/**
* Schedules the writing of rows at the end of current stream.
*
Expand Down Expand Up @@ -397,6 +420,12 @@ public String getLocation() {
return location;
}

/** @return the missing value interpretation map used for the writer. */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
getMissingValueInterpretationMap() {
return missingValueInterpretationMap;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.junit.Before;
Expand Down Expand Up @@ -396,7 +397,7 @@ private ApiFuture<AppendRowsResponse> sendTestMessage(
ProtoSchema protoSchema,
ProtoRows protoRows,
long offset) {
return connectionWorker.append(streamName, protoSchema, protoRows, offset);
return connectionWorker.append(streamName, protoSchema, protoRows, offset, new HashMap());
}

private ProtoRows createFooProtoRows(String[] messages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1145,4 +1146,43 @@ private AppendRowsResponse createAppendResponse(long offset) {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
.build();
}

@Test
public void testAppendWithMissingValueMap() throws Exception {
TableFieldSchema field =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test-列")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
JSONObject flexible = new JSONObject();
flexible.put("test-列", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(flexible);

try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {

Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());

testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(),
missingValueMap);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -66,6 +68,7 @@

@RunWith(JUnit4.class)
public class StreamWriterTest {

private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
Expand Down Expand Up @@ -1107,4 +1110,49 @@ public void testCloseDisconnectedStream() throws Exception {
// Ensure closing the writer after disconnect succeeds.
writer.close();
}

@Test
public void testSetAndGetMissingValueInterpretationMap() throws Exception {
StreamWriter writer = getTestStreamWriter();
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
}

@Test
public void testAppendWithMissingValueMap() throws Exception {
StreamWriter writer = getTestStreamWriter();

long appendCount = 2;
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
// The first append doesn't use a missing value map.
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));

// The second append uses a missing value map.
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));

for (int i = 0; i < appendCount; i++) {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}

// Ensure that the AppendRowsRequest for the first append operation does not have a missing
// value map, and that the second AppendRowsRequest has the missing value map provided in the
// second append.
verifyAppendRequests(appendCount);
AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
assertTrue(request1.getMissingValueInterpretations().isEmpty());
assertEquals(request2.getMissingValueInterpretations(), missingValueMap);

writer.close();
}
}