Skip to content

Commit

Permalink
feat: add functions to set missing value map in the stream writers (#…
Browse files Browse the repository at this point in the history
…1966)

* feat: add functions to set missing value map in the stream writers

* fix syntax error

* fix lint errors

---------

Co-authored-by: Rishabh Nayak <[email protected]>
  • Loading branch information
risnayak and Rishabh Nayak authored Feb 21, 2023
1 parent 3d67443 commit 98d7e44
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
* <p>TODO: support updated schema
*/
class ConnectionWorker implements AutoCloseable {

private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

// Maximum time to wait on inflight quota before erroring out.
Expand Down Expand Up @@ -280,6 +281,8 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
requestBuilder.setOffset(Int64Value.of(offset));
}
requestBuilder.setWriteStream(streamWriter.getStreamName());
requestBuilder.putAllMissingValueInterpretations(
streamWriter.getMissingValueInterpretationMap());
return appendInternal(streamWriter, requestBuilder.build());
}

Expand Down Expand Up @@ -853,6 +856,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {

// Class that wraps AppendRowsRequest and its corresponding Response future.
static final class AppendRequestAndResponse {

final SettableApiFuture<AppendRowsResponse> appendResult;
final AppendRowsRequest message;
final long messageSize;
Expand Down Expand Up @@ -884,6 +888,7 @@ public Load getLoad() {
*/
@AutoValue
public abstract static class Load {

// Consider the load on this worker to be overwhelmed when above some percentage of
// in-flight bytes or in-flight requests count.
private static double overwhelmedInflightCount = 0.2;
Expand Down Expand Up @@ -957,6 +962,7 @@ static void setMaxInflightQueueWaitTime(long waitTime) {

@AutoValue
abstract static class TableSchemaAndTimestamp {

// The timestamp at which the updated schema was reported in a response.
abstract long updateTimeStamp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public long getInflightWaitSeconds() {
return streamWriter.getInflightWaitSeconds();
}

/**
 * Configures how missing values are interpreted by this JsonStreamWriter. The call is forwarded
 * to the underlying stream writer, and the supplied map is used for all append requests unless
 * otherwise changed.
 *
 * @param missingValueInterpretationMap the missing value interpretation map used by the
 *     JsonStreamWriter.
 */
public void setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  this.streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
}

/**
 * Reads the missing value interpretation map back from the underlying stream writer.
 *
 * @return the missing value interpretation map used for the writer.
 */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
    getMissingValueInterpretationMap() {
  return this.streamWriter.getMissingValueInterpretationMap();
}

/** Sets all StreamWriter settings. */
private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class StreamWriter implements AutoCloseable {
// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();

// Map of fields to their MissingValueInterpretation, which dictates how a field should be
// populated when it is missing from an input user row. Starts out empty.
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
    new HashMap<>();

/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -336,6 +342,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
}
}

/**
 * Sets the missing value interpretation map for the stream writer. The input
 * missingValueInterpretationMap is used for all write requests unless otherwise changed by
 * calling this method again.
 *
 * <p>The entries are copied into a new map, so modifying the argument after this call does not
 * affect the writer.
 *
 * @param missingValueInterpretationMap the missing value interpretation map used by stream
 *     writer; must not be null.
 * @throws NullPointerException if missingValueInterpretationMap is null.
 */
public void setMissingValueInterpretationMap(
    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
  // Reject null here rather than storing it and failing later when the map is read.
  Objects.requireNonNull(missingValueInterpretationMap, "missingValueInterpretationMap");
  // Defensive copy: do not alias the caller's mutable map.
  this.missingValueInterpretationMap = new HashMap<>(missingValueInterpretationMap);
}

/**
* Schedules the writing of rows at the end of current stream.
*
Expand Down Expand Up @@ -419,6 +437,12 @@ public String getLocation() {
return location;
}

/**
 * Returns the missing value interpretation map currently held by this writer.
 *
 * @return the missing value interpretation map used for the writer.
 */
public Map<String, AppendRowsRequest.MissingValueInterpretation>
    getMissingValueInterpretationMap() {
  return this.missingValueInterpretationMap;
}

/**
* @return if a stream writer can no longer be used for writing. It is due to either the
* StreamWriter is explicitly closed or the underlying connection is broken when connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1290,4 +1291,43 @@ private AppendRowsResponse createAppendResponse(long offset) {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
.build();
}

/**
 * Verifies that a missing value interpretation map set on a JsonStreamWriter can be read back
 * and is carried on the AppendRowsRequest produced by a subsequent append.
 */
@Test
public void testAppendWithMissingValueMap() throws Exception {
  TableFieldSchema field =
      TableFieldSchema.newBuilder()
          .setType(TableFieldSchema.Type.STRING)
          .setMode(TableFieldSchema.Mode.NULLABLE)
          .setName("test-列")
          .build();
  TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
  JSONObject flexible = new JSONObject();
  flexible.put("test-列", "allen");
  JSONArray jsonArr = new JSONArray();
  jsonArr.put(flexible);

  try (JsonStreamWriter writer =
      getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {

    Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
    missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
    missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
    writer.setMissingValueInterpretationMap(missingValueMap);
    assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());

    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
            .build());

    ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
    // get() blocks until the append has completed, so the request can be inspected afterwards.
    assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
    // Expected value first, so a failure message reports the values the right way round.
    assertEquals(
        missingValueMap,
        testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -67,6 +69,7 @@

@RunWith(JUnit4.class)
public class StreamWriterTest {

private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
Expand Down Expand Up @@ -1227,6 +1230,51 @@ public void testCloseDisconnectedStream() throws Exception {
writer.close();
}

/**
 * Verifies that the map passed to setMissingValueInterpretationMap is what
 * getMissingValueInterpretationMap subsequently returns.
 */
@Test
public void testSetAndGetMissingValueInterpretationMap() throws Exception {
  Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
  missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
  missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);

  // StreamWriter is AutoCloseable; try-with-resources closes it even if the assertion fails.
  try (StreamWriter writer = getTestStreamWriter()) {
    writer.setMissingValueInterpretationMap(missingValueMap);
    assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
  }
}

/**
 * Verifies that an append issued before a missing value interpretation map is set carries no
 * map, and that an append issued after it is set carries the configured map.
 */
@Test
public void testAppendWithMissingValueMap() throws Exception {
  long appendCount = 2;
  testBigQueryWrite.addResponse(createAppendResponse(0));
  testBigQueryWrite.addResponse(createAppendResponse(1));

  Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap<>();
  missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
  missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);

  // StreamWriter is AutoCloseable; try-with-resources closes it even if an assertion fails.
  try (StreamWriter writer = getTestStreamWriter()) {
    List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
    // The first append doesn't use a missing value map.
    futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));

    // The second append uses a missing value map.
    writer.setMissingValueInterpretationMap(missingValueMap);
    futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));

    for (int i = 0; i < appendCount; i++) {
      assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
    }

    // Ensure that the AppendRowsRequest for the first append operation does not have a missing
    // value map, and that the second AppendRowsRequest has the missing value map that was set
    // before the second append.
    verifyAppendRequests(appendCount);
    AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
    AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
    assertTrue(request1.getMissingValueInterpretations().isEmpty());
    // Expected value first, so a failure message reports the values the right way round.
    assertEquals(missingValueMap, request2.getMissingValueInterpretations());
  }
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseMultiplexing() throws Exception {
StreamWriter writer =
Expand Down

0 comments on commit 98d7e44

Please sign in to comment.