Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for session id on TableDataWriteChannel #2715

Merged
merged 15 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.io.Serializable;
import java.util.List;
Expand Down Expand Up @@ -56,9 +57,11 @@ public final class WriteChannelConfiguration implements LoadConfiguration, Seria
private final Boolean useAvroLogicalTypes;
private final Map<String, String> labels;
private List<String> decimalTargetTypes;
private final List<ConnectionProperty> connectionProperties;

public static final class Builder implements LoadConfiguration.Builder {
private final Boolean createSession;

public static final class Builder implements LoadConfiguration.Builder {
private TableId destinationTable;
private CreateDisposition createDisposition;
private WriteDisposition writeDisposition;
Expand All @@ -75,10 +78,14 @@ public static final class Builder implements LoadConfiguration.Builder {
private Boolean useAvroLogicalTypes;
private Map<String, String> labels;
private List<String> decimalTargetTypes;
private List<ConnectionProperty> connectionProperties;

private Boolean createSession;

private Builder() {}

private Builder(WriteChannelConfiguration writeChannelConfiguration) {
this();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is cool; didn't know about this until today, isn't this a no-op?

this.destinationTable = writeChannelConfiguration.destinationTable;
this.createDisposition = writeChannelConfiguration.createDisposition;
this.writeDisposition = writeChannelConfiguration.writeDisposition;
Expand All @@ -96,6 +103,8 @@ private Builder(WriteChannelConfiguration writeChannelConfiguration) {
this.useAvroLogicalTypes = writeChannelConfiguration.useAvroLogicalTypes;
this.labels = writeChannelConfiguration.labels;
this.decimalTargetTypes = writeChannelConfiguration.decimalTargetTypes;
this.connectionProperties = writeChannelConfiguration.connectionProperties;
this.createSession = writeChannelConfiguration.createSession;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
Expand Down Expand Up @@ -175,6 +184,13 @@ private Builder(com.google.api.services.bigquery.model.JobConfiguration configur
if (loadConfigurationPb.getDecimalTargetTypes() != null) {
this.decimalTargetTypes = loadConfigurationPb.getDecimalTargetTypes();
}
if (loadConfigurationPb.getConnectionProperties() != null) {

this.connectionProperties =
Lists.transform(
loadConfigurationPb.getConnectionProperties(), ConnectionProperty.FROM_PB_FUNCTION);
}
createSession = loadConfigurationPb.getCreateSession();
}

@Override
Expand Down Expand Up @@ -274,6 +290,16 @@ public Builder setDecimalTargetTypes(List<String> decimalTargetTypes) {
return this;
}

public Builder setConnectionProperties(List<ConnectionProperty> connectionProperties) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not crucial, but if we use lombok, it covers some boiler plate code for getters and setters:
https://projectlombok.org/features/GetterSetter

this.connectionProperties = ImmutableList.copyOf(connectionProperties);
return this;
}

public Builder setCreateSession(Boolean createSession) {
this.createSession = createSession;
return this;
}

@Override
public WriteChannelConfiguration build() {
return new WriteChannelConfiguration(this);
Expand All @@ -297,6 +323,8 @@ protected WriteChannelConfiguration(Builder builder) {
this.useAvroLogicalTypes = builder.useAvroLogicalTypes;
this.labels = builder.labels;
this.decimalTargetTypes = builder.decimalTargetTypes;
this.connectionProperties = builder.connectionProperties;
this.createSession = builder.createSession;
}

@Override
Expand Down Expand Up @@ -390,6 +418,14 @@ public List<String> getDecimalTargetTypes() {
return decimalTargetTypes;
}

public List<ConnectionProperty> getConnectionProperties() {
return connectionProperties;
}

public Boolean getCreateSession() {
return createSession;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -412,7 +448,9 @@ MoreObjects.ToStringHelper toStringHelper() {
.add("clustering", clustering)
.add("useAvroLogicalTypes", useAvroLogicalTypes)
.add("labels", labels)
.add("decimalTargetTypes", decimalTargetTypes);
.add("decimalTargetTypes", decimalTargetTypes)
.add("connectionProperties", connectionProperties)
.add("createSession", createSession);
}

@Override
Expand Down Expand Up @@ -444,7 +482,9 @@ public int hashCode() {
clustering,
useAvroLogicalTypes,
labels,
decimalTargetTypes);
decimalTargetTypes,
connectionProperties,
createSession);
}

WriteChannelConfiguration setProjectId(String projectId) {
Expand Down Expand Up @@ -519,6 +559,13 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
if (decimalTargetTypes != null) {
loadConfigurationPb.setDecimalTargetTypes(decimalTargetTypes);
}
if (connectionProperties != null) {
loadConfigurationPb.setConnectionProperties(
Lists.transform(connectionProperties, ConnectionProperty.TO_PB_FUNCTION));
}
if (createSession != null) {
loadConfigurationPb.setCreateSession(createSession);
}
jobConfiguration.setLoad(loadConfigurationPb);
return jobConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ public class WriteChannelConfigurationTest {
ImmutableMap.of("test-job-name", "test-write-channel");
private static final List<String> DECIMAL_TARGET_TYPES =
ImmutableList.of("NUMERIC", "BIGNUMERIC");

private static final boolean CREATE_SESSION = true;
private static final String KEY = "session_id";
private static final String VALUE = "session_id_1234567890";
private static final ConnectionProperty CONNECTION_PROPERTY =
ConnectionProperty.newBuilder().setKey(KEY).setValue(VALUE).build();
private static final List<ConnectionProperty> CONNECTION_PROPERTIES =
ImmutableList.of(CONNECTION_PROPERTY);
private static final WriteChannelConfiguration LOAD_CONFIGURATION_CSV =
WriteChannelConfiguration.newBuilder(TABLE_ID)
.setCreateDisposition(CREATE_DISPOSITION)
Expand All @@ -76,6 +84,8 @@ public class WriteChannelConfigurationTest {
.setClustering(CLUSTERING)
.setLabels(LABELS)
.setDecimalTargetTypes(DECIMAL_TARGET_TYPES)
.setConnectionProperties(CONNECTION_PROPERTIES)
.setCreateSession(CREATE_SESSION)
.build();

private static final DatastoreBackupOptions BACKUP_OPTIONS =
Expand Down Expand Up @@ -232,5 +242,7 @@ private void compareLoadConfiguration(
assertEquals(expected.getUseAvroLogicalTypes(), value.getUseAvroLogicalTypes());
assertEquals(expected.getLabels(), value.getLabels());
assertEquals(expected.getDecimalTargetTypes(), value.getDecimalTargetTypes());
assertEquals(expected.getConnectionProperties(), value.getConnectionProperties());
assertEquals(expected.getCreateSession(), value.getCreateSession());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.client.util.IOUtils;
import com.google.api.gax.paging.Page;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
Expand Down Expand Up @@ -60,6 +61,7 @@
import com.google.cloud.bigquery.ConnectionProperty;
import com.google.cloud.bigquery.ConnectionSettings;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.CsvOptions;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
Expand Down Expand Up @@ -141,9 +143,13 @@
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
Expand Down Expand Up @@ -711,6 +717,36 @@ public class ITBigQueryTest {
private static final List<ConnectionProperty> CONNECTION_PROPERTIES =
ImmutableList.of(CONNECTION_PROPERTY);

private static final Field ID_SCHEMA =
Field.newBuilder("id", LegacySQLTypeName.STRING)
.setMode(Mode.REQUIRED)
.setDescription("id")
.build();
private static final Field FIRST_NAME_SCHEMA =
Field.newBuilder("firstname", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.setDescription("First Name")
.build();
private static final Field LAST_NAME_SCHEMA =
Field.newBuilder("lastname", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.setDescription("LAST NAME")
.build();
private static final Field EMAIL_SCHEMA =
Field.newBuilder("email", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.setDescription("email")
.build();
private static final Field PROFESSION_SCHEMA =
Field.newBuilder("profession", LegacySQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.setDescription("profession")
.build();
private static final Schema SESSION_TABLE_SCHEMA =
Schema.of(ID_SCHEMA, FIRST_NAME_SCHEMA, LAST_NAME_SCHEMA, EMAIL_SCHEMA, PROFESSION_SCHEMA);
private static final Path csvPath =
FileSystems.getDefault().getPath("src/test/resources", "sessionTest.csv").toAbsolutePath();

private static final Set<String> PUBLIC_DATASETS =
ImmutableSet.of("github_repos", "hacker_news", "noaa_gsod", "samples", "usa_names");

Expand Down Expand Up @@ -3733,6 +3769,81 @@ public void testQuerySessionSupport() throws InterruptedException {
assertEquals(sessionId, statisticsWithSession.getSessionInfo().getSessionId());
}

@Test
public void testLoadSessionSupportWriteChannelConfiguration() throws InterruptedException {
TableId sessionTableId = TableId.of("_SESSION", "test_temp_destination_table_from_file");

WriteChannelConfiguration configuration =
WriteChannelConfiguration.newBuilder(sessionTableId)
.setFormatOptions(CsvOptions.newBuilder().setFieldDelimiter(",").build())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(SESSION_TABLE_SCHEMA)
.setCreateSession(true)
.build();
String jobName = "jobId_" + UUID.randomUUID().toString();
JobId jobId = JobId.newBuilder().setLocation("us").setJob(jobName).build();
String sessionId;

// Imports a local file into a table.
try (TableDataWriteChannel writer = bigquery.writer(jobId, configuration);
OutputStream stream = Channels.newOutputStream(writer)) {
InputStream inputStream =
ITBigQueryTest.class.getClassLoader().getResourceAsStream("sessionTest.csv");
// Can use `Files.copy(csvPath, stream);` instead.
// Using IOUtils here because graalvm can't handle resource files.
IOUtils.copy(inputStream, stream);

} catch (IOException e) {
throw new RuntimeException(e);
}
Job loadJob = bigquery.getJob(jobId);
Job completedJob = loadJob.waitFor();

assertNotNull(completedJob);
assertEquals(jobId.getJob(), completedJob.getJobId().getJob());
JobStatistics.LoadStatistics statistics = completedJob.getStatistics();

sessionId = statistics.getSessionInfo().getSessionId();
System.out.println(sessionId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im guessing this was used for debugging, and meant to be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

assertNotNull(sessionId);

// Load job in the same session.
// Should load the data to a temp table.
ConnectionProperty sessionConnectionProperty =
ConnectionProperty.newBuilder().setKey("session_id").setValue(sessionId).build();
WriteChannelConfiguration sessionConfiguration =
WriteChannelConfiguration.newBuilder(sessionTableId)
.setConnectionProperties(ImmutableList.of(sessionConnectionProperty))
.setFormatOptions(CsvOptions.newBuilder().setFieldDelimiter(",").build())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(SESSION_TABLE_SCHEMA)
.build();
String sessionJobName = "jobId_" + UUID.randomUUID().toString();
JobId sessionJobId = JobId.newBuilder().setLocation("us").setJob(sessionJobName).build();
try (TableDataWriteChannel writer = bigquery.writer(sessionJobId, sessionConfiguration);
OutputStream stream = Channels.newOutputStream(writer)) {
InputStream inputStream =
ITBigQueryTest.class.getClassLoader().getResourceAsStream("sessionTest.csv");
IOUtils.copy(inputStream, stream);
} catch (IOException e) {
throw new RuntimeException(e);
}
Job queryJobWithSession = bigquery.getJob(sessionJobId);
queryJobWithSession = queryJobWithSession.waitFor();
LoadStatistics statisticsWithSession = queryJobWithSession.getStatistics();
assertNotNull(statisticsWithSession.getSessionInfo().getSessionId());

// Checking if the data loaded to the temp table in the session
String queryTempTable = "SELECT * FROM _SESSION.test_temp_destination_table_from_file;";
QueryJobConfiguration queryJobConfigurationWithSession =
QueryJobConfiguration.newBuilder(queryTempTable)
.setConnectionProperties(ImmutableList.of(sessionConnectionProperty))
.build();
Job queryTempTableJob = bigquery.create(JobInfo.of(queryJobConfigurationWithSession));
queryTempTableJob = queryTempTableJob.waitFor();
assertNotNull(queryTempTableJob.getQueryResults());
}

@Test
public void testLoadSessionSupport() throws InterruptedException {
// Start the session
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"resources":[{"pattern": ".*.csv"}]
"resources":[{"pattern": ".*.csv"},
{"pattern": ".*src/test/resources/sessionTest.csv"}]
}
51 changes: 51 additions & 0 deletions google-cloud-bigquery/src/test/resources/sessionTest.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
id,firstname,lastname,email,profession
100,Rani,Merell,[email protected],firefighter
101,Goldie,Dex,[email protected],developer
102,Cristabel,Munn,[email protected],developer
103,Genevra,Strephon,[email protected],firefighter
104,Augustine,Thema,[email protected],doctor
105,Jemie,Gombach,[email protected],police officer
106,Maye,Stuart,[email protected],developer
107,Ayn,Carmena,[email protected],worker
108,Gale,Celestine,[email protected],doctor
109,Alex,Jerold,[email protected],firefighter
110,Violet,Giule,[email protected],firefighter
111,Starla,Uird,[email protected],doctor
112,Tarra,Pelagias,[email protected],police officer
113,Eugine,Deny,[email protected],doctor
114,Shirlee,Ricarda,[email protected],doctor
115,Ariela,Penelopa,[email protected],worker
116,Lelah,Astra,[email protected],police officer
117,Debee,Deegan,[email protected],developer
118,Pollyanna,Euridice,[email protected],worker
119,Cathie,Halsey,[email protected],firefighter
120,Rebeca,Quinn,[email protected],doctor
121,Paulita,Arquit,[email protected],police officer
122,Rebeca,Emanuel,[email protected],firefighter
123,Tera,Ilka,[email protected],firefighter
124,Orsola,Briney,[email protected],doctor
125,Paulita,Wyn,[email protected],doctor
126,Constance,Christine,[email protected],firefighter
127,Claresta,Kinnard,[email protected],developer
128,Leanna,Mendez,[email protected],developer
129,Corina,Chabot,[email protected],developer
130,Romona,Audly,[email protected],worker
131,Cordi,Lynn,[email protected],firefighter
132,Sheree,Tyson,[email protected],worker
133,Jinny,Bevin,[email protected],police officer
134,Kassey,Havens,[email protected],firefighter
135,Wanda,Thema,[email protected],developer
136,Vita,Jagir,[email protected],developer
137,Alie,Aprile,[email protected],firefighter
138,Modestia,Jena,[email protected],doctor
139,Cyndie,Pelagias,[email protected],worker
140,Ariela,Lilybelle,[email protected],firefighter
141,Jan,Parette,[email protected],firefighter
142,Merry,Horan,[email protected],developer
143,Katuscha,Candy,[email protected],police officer
144,Kerrin,Heisel,[email protected],developer
145,Nollie,Magdalen,[email protected],doctor
146,Karlee,Gordon,[email protected],developer
147,Dolli,Fadiman,[email protected],firefighter
148,Leontine,Delp,[email protected],worker
149,Ricky,Nadia,[email protected],doctor