Skip to content

Commit

Permalink
feat: add routing header for multiplexed connection (#2035)
Browse files Browse the repository at this point in the history
  • Loading branch information
yirutang authored Mar 10, 2023
1 parent e4c5e97 commit 1f2752f
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.concurrent.GuardedBy;

/**
Expand Down Expand Up @@ -219,11 +221,29 @@ class ConnectionWorker implements AutoCloseable {
private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
private long testOnlyAppendLoopSleepTime = 0;

private static String projectMatching = "projects/[^/]+/";
private static Pattern streamPatternProject = Pattern.compile(projectMatching);

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

static String extractProjectName(String streamName) {
Matcher streamMatcher = streamPatternProject.matcher(streamName);
if (streamMatcher.find()) {
return streamMatcher.group();
} else {
throw new IllegalStateException(
String.format("The passed in stream name does not match standard format %s", streamName));
}
}

static String getRoutingHeader(String streamName, String location) {
String project = extractProjectName(streamName);
return project + "locations/" + location;
}

public ConnectionWorker(
String streamName,
String location,
Expand Down Expand Up @@ -259,6 +279,10 @@ public ConnectionWorker(
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
if (this.location == null) {
newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName);
} else {
newHeaders.put(
"x-goog-request-params",
"write_location=" + getRoutingHeader(this.streamName, this.location));
}
BigQueryWriteSettings stubSettings =
clientSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,4 +710,10 @@ public void testLongTimeIdleWontFail() throws Exception {
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
}
}

@Test
public void testLocationName() throws Exception {
assertEquals(
"projects/p1/locations/us", ConnectionWorker.getRoutingHeader(TEST_STREAM_1, "us"));
}
}

0 comments on commit 1f2752f

Please sign in to comment.