-
Notifications
You must be signed in to change notification settings - Fork 16
docs(samples): add UpdateUserEventsJson implementation #388
Changes from all commits
b998aa2
4bedf81
68c44f0
c6cafa2
213a52f
756a39c
47cf534
8bd8e92
108a637
5f976e4
475ef6f
1df9f7a
8ee5596
d759ca0
54b8798
23b8ff4
8fffe9e
923cfb2
bb2dc2a
3ea57ae
611aadf
74f9c75
df6c157
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,8 +23,8 @@ | |
package events; | ||
|
||
import com.google.api.gax.rpc.InvalidArgumentException; | ||
import com.google.api.gax.rpc.PermissionDeniedException; | ||
import com.google.cloud.ServiceOptions; | ||
import com.google.cloud.bigquery.BigQueryException; | ||
import com.google.cloud.retail.v2.GcsSource; | ||
import com.google.cloud.retail.v2.ImportErrorsConfig; | ||
import com.google.cloud.retail.v2.ImportMetadata; | ||
|
@@ -34,91 +34,95 @@ | |
import com.google.cloud.retail.v2.UserEventServiceClient; | ||
import com.google.longrunning.Operation; | ||
import com.google.longrunning.OperationsClient; | ||
import events.setup.EventsCreateGcsBucket; | ||
import java.io.IOException; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public class ImportUserEventsGcs { | ||
|
||
public static void main(String[] args) throws IOException, InterruptedException { | ||
// TODO(developer): Replace these variables before running the sample. | ||
String projectId = ServiceOptions.getDefaultProjectId(); | ||
String defaultCatalog = | ||
String.format("projects/%s/locations/global/catalogs/default_catalog", projectId); | ||
// TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE: defaultCatalog = | ||
// "invalid_catalog_name" | ||
String gcsEventsObject = "user_events.json"; | ||
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: gcsEventsObject = | ||
// "user_events_some_invalid.json" | ||
String bucketName = System.getenv("EVENTS_BUCKET_NAME"); | ||
String gcsUserEventsObject = "user_events.json"; | ||
// TO CHECK ERROR HANDLING USE THE JSON WITH INVALID USER EVENT: | ||
// gcsEventsObject = "user_events_some_invalid.json" | ||
|
||
importUserEventsFromGcs(gcsEventsObject, defaultCatalog); | ||
importUserEventsFromGcs(defaultCatalog, bucketName, gcsUserEventsObject); | ||
} | ||
|
||
public static void importUserEventsFromGcs(String gcsEventsObject, String defaultCatalog) | ||
public static void importUserEventsFromGcs( | ||
String defaultCatalog, String bucketName, String gcsUserEventsObject) | ||
throws IOException, InterruptedException { | ||
try { | ||
String gcsBucket = String.format("gs://%s", EventsCreateGcsBucket.getBucketName()); | ||
String gcsErrorsBucket = String.format("%s/error", gcsBucket); | ||
|
||
GcsSource gcsSource = | ||
GcsSource.newBuilder() | ||
.addInputUris(String.format("%s/%s", gcsBucket, gcsEventsObject)) | ||
.build(); | ||
|
||
UserEventInputConfig inputConfig = | ||
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build(); | ||
|
||
ImportErrorsConfig errorsConfig = | ||
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build(); | ||
|
||
ImportUserEventsRequest importRequest = | ||
ImportUserEventsRequest.newBuilder() | ||
.setParent(defaultCatalog) | ||
.setInputConfig(inputConfig) | ||
.setErrorsConfig(errorsConfig) | ||
.build(); | ||
|
||
System.out.printf("Import user events from google cloud source request: %s%n", importRequest); | ||
|
||
// Initialize client that will be used to send requests. This client only needs to be created | ||
// once, and can be reused for multiple requests. After completing all of your requests, call | ||
// the "close" method on the client to safely clean up any remaining background resources. | ||
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) { | ||
String operationName = | ||
serviceClient.importUserEventsCallable().call(importRequest).getName(); | ||
|
||
System.out.printf("OperationName = %s\n", operationName); | ||
|
||
OperationsClient operationsClient = serviceClient.getOperationsClient(); | ||
Operation operation = operationsClient.getOperation(operationName); | ||
|
||
while (!operation.getDone()) { | ||
// Keep polling the operation periodically until the import task is done. | ||
int awaitDuration = 30000; | ||
Thread.sleep(awaitDuration); | ||
operation = operationsClient.getOperation(operationName); | ||
} | ||
|
||
if (operation.hasMetadata()) { | ||
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class); | ||
System.out.printf( | ||
"Number of successfully imported events: %s\n", metadata.getSuccessCount()); | ||
System.out.printf( | ||
"Number of failures during the importing: %s\n", metadata.getFailureCount()); | ||
} | ||
|
||
if (operation.hasResponse()) { | ||
ImportUserEventsResponse response = | ||
operation.getResponse().unpack(ImportUserEventsResponse.class); | ||
System.out.printf("Operation result: %s%n", response); | ||
} | ||
} catch (InvalidArgumentException e) { | ||
String gcsBucket = String.format("gs://%s", bucketName); | ||
String gcsErrorsBucket = String.format("%s/error", gcsBucket); | ||
|
||
GcsSource gcsSource = | ||
GcsSource.newBuilder() | ||
.addInputUris(String.format("%s/%s", gcsBucket, gcsUserEventsObject)) | ||
.build(); | ||
|
||
UserEventInputConfig inputConfig = | ||
UserEventInputConfig.newBuilder().setGcsSource(gcsSource).build(); | ||
|
||
System.out.println("GRS source: " + gcsSource.getInputUrisList()); | ||
|
||
ImportErrorsConfig errorsConfig = | ||
ImportErrorsConfig.newBuilder().setGcsPrefix(gcsErrorsBucket).build(); | ||
|
||
ImportUserEventsRequest importRequest = | ||
ImportUserEventsRequest.newBuilder() | ||
.setParent(defaultCatalog) | ||
.setInputConfig(inputConfig) | ||
.setErrorsConfig(errorsConfig) | ||
.build(); | ||
System.out.printf("Import user events from google cloud source request: %s%n", importRequest); | ||
|
||
// Initialize client that will be used to send requests. This client only | ||
// needs to be created once, and can be reused for multiple requests. After | ||
// completing all of your requests, call the "close" method on the client to | ||
// safely clean up any remaining background resources. | ||
try (UserEventServiceClient serviceClient = UserEventServiceClient.create()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sure to include the comment regarding client best practices. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
String operationName = serviceClient.importUserEventsCallable().call(importRequest).getName(); | ||
|
||
System.out.println("The operation was started."); | ||
System.out.printf("OperationName = %s%n", operationName); | ||
|
||
OperationsClient operationsClient = serviceClient.getOperationsClient(); | ||
Operation operation = operationsClient.getOperation(operationName); | ||
|
||
long assuredBreak = System.currentTimeMillis() + 60000; // 60 seconds delay | ||
|
||
while (!operation.getDone() || System.currentTimeMillis() < assuredBreak) { | ||
System.out.println("Please wait till operation is done."); | ||
TimeUnit.SECONDS.sleep(30); | ||
operation = operationsClient.getOperation(operationName); | ||
} | ||
|
||
if (operation.hasMetadata()) { | ||
ImportMetadata metadata = operation.getMetadata().unpack(ImportMetadata.class); | ||
System.out.printf( | ||
"Given GCS input path was not found. %n%s%n " | ||
+ "Please run CreateTestResources class to create resources.", | ||
e.getMessage()); | ||
"Number of successfully imported events: %s%n", metadata.getSuccessCount()); | ||
System.out.printf( | ||
"Number of failures during the importing: %s%n", metadata.getFailureCount()); | ||
} else { | ||
System.out.println("Metadata is empty."); | ||
} | ||
|
||
if (operation.hasResponse()) { | ||
ImportUserEventsResponse response = | ||
operation.getResponse().unpack(ImportUserEventsResponse.class); | ||
System.out.printf("Operation result: %s%n", response); | ||
} else { | ||
System.out.println("Operation result is empty."); | ||
} | ||
} catch (BigQueryException e) { | ||
System.out.printf("Exception message: %s", e.getMessage()); | ||
} catch (InvalidArgumentException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When can this "InvalidArgumentException" happen? Can we add a try/catch for this exception just where it's relevant? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's relevant in that case when bucket has no specified file. |
||
System.out.printf( | ||
"%s%n'%s' file does not exist in the bucket. Please " | ||
+ "make sure you have followed the setting up instructions.", | ||
e.getMessage(), gcsUserEventsObject); | ||
} catch (PermissionDeniedException e) { | ||
System.out.println(e.getMessage()); | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should just be a separate test in your tests file. You can have one where you call a valid file (e.g.
importUimportUserEventsFromGcs("your-catalog", "user-events.json
) and one where you call an invalid file (e.g.importUimportUserEventsFromGcs("your-catalog", "some-invalid-file.json
). You shouldn't be depending on someone to edit a comment out, because that means it's not being run the in the CI.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.