Skip to content

Commit

Permalink
feat: change stream retention to create and update table (#1823)
Browse files Browse the repository at this point in the history
* feat: add change stream retention to create table

Change-Id: I6eec06f3e3178143150490aa5fd97b83b1878cd2

* Remove creating change stream enabled table in IT because we can't delete change stream enabled table

Change-Id: Ia2f92ccae3a2582da771b68a26adc1ab5f9d516e

* feat: add update table with change stream retention

Change-Id: I485507dd224eb0a3a160f7e9b5c569e1ae13ed84

* Disable change stream at end of IT so the table can be deleted

Change-Id: I9b64e208ba3bf47f39a7bac3e4a3eb851b2c9468

* Add change stream retention to create table IT

Change-Id: I8076da5631aa4dc4f97bba88c8f799303e81a65f

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Tony Tang <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Jul 5, 2023
1 parent 1893be7 commit 05fca58
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.cloud.bigtable.admin.v2.models.RestoredTableResult;
import com.google.cloud.bigtable.admin.v2.models.Table;
import com.google.cloud.bigtable.admin.v2.models.UpdateBackupRequest;
import com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest;
import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -258,6 +259,72 @@ public ApiFuture<Table> createTableAsync(CreateTableRequest request) {
this.stub.createTableCallable().futureCall(request.toProto(projectId, instanceId)));
}

/**
* Update a table with the specified configuration.
*
* <p>Sample code:
*
* <pre>{@code
* // Alter change stream retention period.
* Table table = client.updateTable(
* UpdateTableRequest.of("my-table")
* .addChangeStreamRetention(Duration.ofHours(24))
* );
*
* // Disable change stream
* Table table = client.updateTable(
* UpdateTableRequest.of("my-table")
* .disableChangeStream()
* );
* }</pre>
*
* @see UpdateTableRequest for available options.
*/
public Table updateTable(UpdateTableRequest request) {
return ApiExceptions.callAndTranslateApiException(updateTableAsync(request));
}

/**
* Asynchronously update a table with the specified configuration.
*
* <p>Sample code:
*
* <pre>{@code
* // Update table to 1 day change stream retention.
* ApiFuture<Table> tableFuture = client.createTableAsync(
* UpdateTableRequest.of("my-table")
* .addChangeStreamRetention(Duration.ofHours(24))
* );
*
* ApiFutures.addCallback(
* tableFuture,
* new ApiFutureCallback<Table>() {
* public void onSuccess(Table table) {
* System.out.println("Updated table: " + table.getTableName());
* }
*
* public void onFailure(Throwable t) {
* t.printStackTrace();
* }
* },
* MoreExecutors.directExecutor()
* );
* }</pre>
*
* @see UpdateTableRequest for available options.
*/
public ApiFuture<Table> updateTableAsync(UpdateTableRequest request) {
return ApiFutures.transform(
stub.updateTableOperationCallable().futureCall(request.toProto(projectId, instanceId)),
new ApiFunction<com.google.bigtable.admin.v2.Table, Table>() {
@Override
public Table apply(com.google.bigtable.admin.v2.Table tableProto) {
return Table.fromProto(tableProto);
}
},
MoreExecutors.directExecutor());
}

/**
* Creates, updates and drops column families as specified in the request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package com.google.cloud.bigtable.admin.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.admin.v2.ChangeStreamConfig;
import com.google.bigtable.admin.v2.ColumnFamily;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.cloud.bigtable.admin.v2.models.GCRules.GCRule;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* Fluent wrapper for {@link com.google.bigtable.admin.v2.CreateTableRequest}
Expand Down Expand Up @@ -76,6 +78,22 @@ public CreateTableRequest addSplit(ByteString key) {
return this;
}

/** Add change stream retention period between 1 day and 7 days */
public CreateTableRequest addChangeStreamRetention(Duration retention) {
Preconditions.checkNotNull(retention);
requestBuilder
.getTableBuilder()
.setChangeStreamConfig(
ChangeStreamConfig.newBuilder()
.setRetentionPeriod(
com.google.protobuf.Duration.newBuilder()
.setSeconds(retention.getSeconds())
.setNanos(retention.getNano())
.build())
.build());
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/** Wrapper for {@link Table} protocol buffer object */
public final class Table {
Expand Down Expand Up @@ -103,6 +104,8 @@ public com.google.bigtable.admin.v2.Table.ClusterState.ReplicationState toProto(
private final Map<String, ReplicationState> replicationStatesByClusterId;
private final List<ColumnFamily> columnFamilies;

private final Duration changeStreamRetention;

@InternalApi
public static Table fromProto(@Nonnull com.google.bigtable.admin.v2.Table proto) {
ImmutableMap.Builder<String, ReplicationState> replicationStates = ImmutableMap.builder();
Expand All @@ -120,18 +123,31 @@ public static Table fromProto(@Nonnull com.google.bigtable.admin.v2.Table proto)
columnFamilies.add(ColumnFamily.fromProto(entry.getKey(), entry.getValue()));
}

Duration changeStreamConfig = null;
if (proto.hasChangeStreamConfig()) {
changeStreamConfig =
Duration.ofSeconds(
proto.getChangeStreamConfig().getRetentionPeriod().getSeconds(),
proto.getChangeStreamConfig().getRetentionPeriod().getNanos());
}

return new Table(
TableName.parse(proto.getName()), replicationStates.build(), columnFamilies.build());
TableName.parse(proto.getName()),
replicationStates.build(),
columnFamilies.build(),
changeStreamConfig);
}

private Table(
TableName tableName,
Map<String, ReplicationState> replicationStatesByClusterId,
List<ColumnFamily> columnFamilies) {
List<ColumnFamily> columnFamilies,
Duration changeStreamRetention) {
this.instanceId = tableName.getInstance();
this.id = tableName.getTable();
this.replicationStatesByClusterId = replicationStatesByClusterId;
this.columnFamilies = columnFamilies;
this.changeStreamRetention = changeStreamRetention;
}

/** Gets the table's id. */
Expand All @@ -152,6 +168,10 @@ public List<ColumnFamily> getColumnFamilies() {
return columnFamilies;
}

public Duration getChangeStreamRetention() {
return changeStreamRetention;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -164,11 +184,13 @@ public boolean equals(Object o) {
return Objects.equal(id, table.id)
&& Objects.equal(instanceId, table.instanceId)
&& Objects.equal(replicationStatesByClusterId, table.replicationStatesByClusterId)
&& Objects.equal(columnFamilies, table.columnFamilies);
&& Objects.equal(columnFamilies, table.columnFamilies)
&& Objects.equal(changeStreamRetention, table.changeStreamRetention);
}

@Override
public int hashCode() {
return Objects.hashCode(id, instanceId, replicationStatesByClusterId, columnFamilies);
return Objects.hashCode(
id, instanceId, replicationStatesByClusterId, columnFamilies, changeStreamRetention);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigtable.admin.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.admin.v2.ChangeStreamConfig;
import com.google.cloud.bigtable.admin.v2.internal.NameUtil;
import com.google.common.base.Preconditions;
import java.util.Objects;
import org.threeten.bp.Duration;

/**
* Wrapper for {@link com.google.bigtable.admin.v2.UpdateTableRequest}
*
* <p>Allows for updating table:
*
* <ul>
* <li>Change stream retention period.
* </ul>
*/
public class UpdateTableRequest {

private final String tableId;

private final com.google.bigtable.admin.v2.UpdateTableRequest.Builder requestBuilder =
com.google.bigtable.admin.v2.UpdateTableRequest.newBuilder();

public static UpdateTableRequest of(String tableId) {
return new UpdateTableRequest(tableId);
}

private UpdateTableRequest(String tableId) {
this.tableId = tableId;
}

/** Update change stream retention period between 1 day and 7 days. */
public UpdateTableRequest addChangeStreamRetention(Duration retention) {
Preconditions.checkNotNull(retention);
if (!retention.isZero()) {
requestBuilder
.getTableBuilder()
.setChangeStreamConfig(
ChangeStreamConfig.newBuilder()
.setRetentionPeriod(
com.google.protobuf.Duration.newBuilder()
.setSeconds(retention.getSeconds())
.setNanos(retention.getNano())
.build())
.build());
requestBuilder.getUpdateMaskBuilder().addPaths("change_stream_config.retention_period");
} else {
requestBuilder.getTableBuilder().clearChangeStreamConfig();
requestBuilder.getUpdateMaskBuilder().addPaths("change_stream_config");
}
return this;
}

/** Disable change stream for table */
public UpdateTableRequest disableChangeStreamRetention() {
return addChangeStreamRetention(Duration.ZERO);
}

@InternalApi
public com.google.bigtable.admin.v2.UpdateTableRequest toProto(
String projectId, String instanceId) {
requestBuilder
.getTableBuilder()
.setName(NameUtil.formatTableName(projectId, instanceId, tableId));
return requestBuilder.build();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof UpdateTableRequest)) return false;
UpdateTableRequest that = (UpdateTableRequest) o;
return Objects.equals(requestBuilder, that.requestBuilder);
}

@Override
public int hashCode() {
return Objects.hash(requestBuilder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.gax.rpc.testing.FakeOperationSnapshot;
import com.google.bigtable.admin.v2.Backup.State;
import com.google.bigtable.admin.v2.BackupInfo;
import com.google.bigtable.admin.v2.ChangeStreamConfig;
import com.google.bigtable.admin.v2.ColumnFamily;
import com.google.bigtable.admin.v2.CreateBackupMetadata;
import com.google.bigtable.admin.v2.DeleteBackupRequest;
Expand All @@ -45,6 +46,7 @@
import com.google.bigtable.admin.v2.Table.ClusterState;
import com.google.bigtable.admin.v2.Table.View;
import com.google.bigtable.admin.v2.TableName;
import com.google.bigtable.admin.v2.UpdateTableMetadata;
import com.google.cloud.Identity;
import com.google.cloud.Policy;
import com.google.cloud.Role;
Expand All @@ -68,6 +70,7 @@
import com.google.common.io.BaseEncoding;
import com.google.longrunning.Operation;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Empty;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;
Expand Down Expand Up @@ -117,6 +120,13 @@ public class BigtableTableAdminClientTests {
com.google.bigtable.admin.v2.CreateTableRequest, com.google.bigtable.admin.v2.Table>
mockCreateTableCallable;

@Mock
private OperationCallable<
com.google.bigtable.admin.v2.UpdateTableRequest,
com.google.bigtable.admin.v2.Table,
UpdateTableMetadata>
mockUpdateTableOperationCallable;

@Mock
private UnaryCallable<
com.google.bigtable.admin.v2.ModifyColumnFamiliesRequest,
Expand Down Expand Up @@ -204,6 +214,40 @@ public void testCreateTable() {
assertThat(result).isEqualTo(Table.fromProto(expectedResponse));
}

@Test
public void testUpdateTable() {
// Setup
Mockito.when(mockStub.updateTableOperationCallable())
.thenReturn(mockUpdateTableOperationCallable);

com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest request =
com.google.cloud.bigtable.admin.v2.models.UpdateTableRequest.of(TABLE_ID)
.addChangeStreamRetention(org.threeten.bp.Duration.ofHours(24));

com.google.bigtable.admin.v2.Table expectedResponse =
com.google.bigtable.admin.v2.Table.newBuilder()
.setName(TABLE_NAME)
.setChangeStreamConfig(
ChangeStreamConfig.newBuilder()
.setRetentionPeriod(Duration.newBuilder().setSeconds(86400).build())
.build())
.build();

mockOperationResult(
mockUpdateTableOperationCallable,
request.toProto(PROJECT_ID, INSTANCE_ID),
expectedResponse,
UpdateTableMetadata.newBuilder().setName(TABLE_NAME).build());

// Execute
Table actualResult = adminClient.updateTable(request);

// Verify
assertThat(actualResult.getId()).isEqualTo(TABLE_ID);
assertThat(actualResult.getChangeStreamRetention())
.isEqualTo(org.threeten.bp.Duration.ofHours(24));
}

@Test
public void testModifyFamilies() {
// Setup
Expand Down
Loading

0 comments on commit 05fca58

Please sign in to comment.