Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

docs(samples): add samples for events #155

Merged
merged 30 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
71c7360
feat: Retail. Products importing code samples
tetiana-karasova Feb 15, 2022
4ad5c3c
🦉 Updates from OwlBot
gcf-owl-bot[bot] Feb 15, 2022
db7267b
lint fix
tetiana-karasova Feb 18, 2022
e7187cc
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 18, 2022
10509ae
update copyright year
parthea Feb 18, 2022
ad00a55
remove ClientOptions
parthea Feb 18, 2022
3d5e020
update requirements
parthea Feb 18, 2022
46c9c92
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 18, 2022
67d21fd
add requirement for pytest-xdist
parthea Feb 18, 2022
163c7d9
test samples on all py3.7+ versions
parthea Feb 18, 2022
67e5202
add EVENTS_BUCKET_NAME
parthea Feb 18, 2022
e5346e7
importing trsts fix
tetiana-karasova Feb 21, 2022
24fc710
importing trsts fix
tetiana-karasova Feb 21, 2022
3fe4424
Merge branch 'main' into tutorials-events-import
kweinmeister Feb 21, 2022
d14a1f9
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 21, 2022
520b56d
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 21, 2022
83b5d9f
Merge branch 'tutorials-events-import' of https://github.com/tetiana-…
gcf-owl-bot[bot] Feb 21, 2022
536c3e7
Update import_products_gcs_test.py
tetiana-karasova Feb 23, 2022
62ea25d
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 23, 2022
f653ac7
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 23, 2022
df59a4a
Merge branch 'tutorials-events-import' of https://github.com/tetiana-…
gcf-owl-bot[bot] Feb 23, 2022
ffa3d47
add google-cloud-testutils==1.3.1 to requirements-test.txt
parthea Feb 24, 2022
ea25857
rename setup->setup_events
parthea Feb 24, 2022
7326b30
fix tests
parthea Feb 24, 2022
6804765
lint
parthea Feb 24, 2022
51f350c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 24, 2022
d4a87bc
fix flaky tests; address review feedback
parthea Feb 25, 2022
68e910e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 25, 2022
58c2b96
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Feb 25, 2022
52f8db1
Merge branch 'tutorials-events-import' of https://github.com/tetiana-…
gcf-owl-bot[bot] Feb 25, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions samples/interactive-tutorials/events/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2022 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
import test_utils.prefixer

prefixer = test_utils.prefixer.Prefixer(
"python-retail", "samples/interactive-tutorials/product"
)


@pytest.fixture(scope="session")
def table_id_prefix() -> str:
return prefixer.create_prefix()


@pytest.fixture(scope="session")
def bucket_name_prefix() -> str:
return prefixer.create_prefix()
108 changes: 108 additions & 0 deletions samples/interactive-tutorials/events/import_user_events_big_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# Copyright 2022 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os

project_id = os.environ["GOOGLE_CLOUD_PROJECT"]


def main(project_id, dataset_id, table_id):
# [START retail_import_user_events_from_big_query]
# TODO: Set project_id to your Google Cloud Platform project ID.
# project_id = "my-project"

# TODO: Set dataset_id
# dataset_id = "user_events"

# TODO: Set table_id
# table_id = "events"

# Import products into a catalog from big query table using Retail API
import time

from google.cloud.retail import (
BigQuerySource,
ImportUserEventsRequest,
UserEventInputConfig,
UserEventServiceClient,
)

default_catalog = f"projects/{project_id}/locations/global/catalogs/default_catalog"

# TO CHECK ERROR HANDLING USE THE TABLE OF INVALID USER EVENTS:
# table_id = "events_some_invalid"

# get import user events from big query request
def get_import_events_big_query_request():
# TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
# default_catalog = "invalid_catalog_name"
big_query_source = BigQuerySource()
big_query_source.project_id = project_id
big_query_source.dataset_id = dataset_id
big_query_source.table_id = table_id
big_query_source.data_schema = "user_event"

input_config = UserEventInputConfig()
input_config.big_query_source = big_query_source

import_request = ImportUserEventsRequest()
import_request.parent = default_catalog
import_request.input_config = input_config

print("---import user events from BigQuery source request---")
print(import_request)

return import_request

# call the Retail API to import user events
def import_user_events_from_big_query():
import_big_query_request = get_import_events_big_query_request()
big_query_operation = UserEventServiceClient().import_user_events(
import_big_query_request
)

print("---the operation was started:----")
print(big_query_operation.operation.name)

while not big_query_operation.done():
print("---please wait till operation is done---")
time.sleep(30)
print("---import user events operation is done---")

if big_query_operation.metadata is not None:
print("---number of successfully imported events---")
print(big_query_operation.metadata.success_count)
print("---number of failures during the importing---")
print(big_query_operation.metadata.failure_count)
else:
print("---operation.metadata is empty---")

if big_query_operation.result is not None:
print("---operation result:---")
print(big_query_operation.result())
else:
print("---operation.result is empty---")

import_user_events_from_big_query()

# [END retail_import_user_events_from_big_query]


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("dataset_id", nargs="?", default="user_events")
parser.add_argument("table_id", nargs="?", default="events")
args = parser.parse_args()
main(project_id, args.dataset_id, args.table_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright 2022 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import subprocess

from setup_events.setup_cleanup import (
create_bq_dataset,
create_bq_table,
delete_bq_table,
upload_data_to_bq_table,
)
from setup_events.update_user_events_json import update_events_timestamp


def test_import_products_bq(table_id_prefix):
dataset = "user_events"
valid_products_table = f"{table_id_prefix}events"
product_schema = "../resources/events_schema.json"
valid_products_source_file = "../resources/user_events.json"

try:
update_events_timestamp("../resources/user_events.json")
update_events_timestamp("../resources/user_events_some_invalid.json")
create_bq_dataset(dataset)
create_bq_table(dataset, valid_products_table, product_schema)
upload_data_to_bq_table(
dataset, valid_products_table, valid_products_source_file, product_schema
)
output = str(
subprocess.check_output(
f"python import_user_events_big_query.py {dataset} {valid_products_table}",
shell=True,
)
)
finally:
delete_bq_table(dataset, valid_products_table)

assert re.match(
'.*import user events from BigQuery source request.*?parent: "projects/.*?/locations/global/catalogs/default_catalog.*',
output,
)
assert re.match(
".*import user events from BigQuery source request.*?input_config.*?big_query_source.*",
output,
)
assert re.match(
".*the operation was started.*?projects/.*?/locations/global/catalogs/default_catalog/operations/import-user-events.*",
output,
)
assert re.match(".*import user events operation is done.*", output)
assert re.match(".*number of successfully imported events.*", output)
assert re.match(".*number of failures during the importing.*?0.*", output)
assert re.match(".*operation result.*?errors_config.*", output)
115 changes: 115 additions & 0 deletions samples/interactive-tutorials/events/import_user_events_gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright 2022 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import os


def main(bucket_name):
# [START retail_import_user_events_from_gcs]
# Import user events into a catalog from GCS using Retail API

import time

from google.cloud.retail import (
GcsSource,
ImportErrorsConfig,
ImportUserEventsRequest,
UserEventInputConfig,
UserEventServiceClient,
)

# Read the project number from the environment variable
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")

# Read bucket name from the environment variable
bucket_name = os.getenv("EVENTS_BUCKET_NAME")

# TODO: Developer set the bucket_name
# bucket_name = 'user_events_bucket'

default_catalog = "projects/{0}/locations/global/catalogs/default_catalog".format(
project_id
)

gcs_bucket = "gs://{}".format(bucket_name)
gcs_errors_bucket = "{}/error".format(gcs_bucket)
gcs_events_object = "user_events.json"

# TO CHECK ERROR HANDLING USE THE JSON WITH INVALID PRODUCT
# gcs_events_object = "user_events_some_invalid.json"

# get import user events from gcs request
def get_import_events_gcs_request(gcs_object_name: str):
# TO CHECK ERROR HANDLING PASTE THE INVALID CATALOG NAME HERE:
# default_catalog = "invalid_catalog_name"
gcs_source = GcsSource()
gcs_source.input_uris = [f"{gcs_bucket}/{gcs_object_name}"]

input_config = UserEventInputConfig()
input_config.gcs_source = gcs_source

errors_config = ImportErrorsConfig()
errors_config.gcs_prefix = gcs_errors_bucket

import_request = ImportUserEventsRequest()
import_request.parent = default_catalog
import_request.input_config = input_config
import_request.errors_config = errors_config

print("---import user events from google cloud source request---")
print(import_request)

return import_request

# call the Retail API to import user events
def import_user_events_from_gcs():
import_gcs_request = get_import_events_gcs_request(gcs_events_object)
gcs_operation = UserEventServiceClient().import_user_events(import_gcs_request)

print("---the operation was started:----")
print(gcs_operation.operation.name)

while not gcs_operation.done():
print("---please wait till operation is done---")
time.sleep(30)

print("---import user events operation is done---")

if gcs_operation.metadata is not None:
print("---number of successfully imported events---")
print(gcs_operation.metadata.success_count)
print("---number of failures during the importing---")
print(gcs_operation.metadata.failure_count)
else:
print("---operation.metadata is empty---")

if gcs_operation.result is not None:
print("---operation result:---")
print(gcs_operation.result())
else:
print("---operation.result is empty---")

import_user_events_from_gcs()


# [END retail_import_user_events_from_gcs]

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"bucket_name", nargs="?", default=os.environ["EVENTS_BUCKET_NAME"]
)
args = parser.parse_args()
main(args.bucket_name)
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2022 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import subprocess

from setup_events.setup_cleanup import create_bucket, delete_bucket, upload_blob
from setup_events.update_user_events_json import update_events_timestamp


def test_import_events_gcs(bucket_name_prefix):
# gcs buckets have a limit of 63 characters. Get the last 60 characters
bucket_name = bucket_name_prefix[63:]

try:
update_events_timestamp("../resources/user_events.json")
update_events_timestamp("../resources/user_events_some_invalid.json")
create_bucket(bucket_name)
upload_blob(bucket_name, "../resources/user_events.json")

output = str(
subprocess.check_output("python import_user_events_gcs.py", shell=True)
)
finally:
delete_bucket(bucket_name)

assert re.match(
'.*import user events from google cloud source request.*?parent: "projects/.*?/locations/global/catalogs/default_catalog.*',
output,
)
assert re.match(
".*import user events from google cloud source request.*?input_config.*?gcs_source.*",
output,
)
assert re.match(
".*the operation was started.*?projects/.*?/locations/global/catalogs/default_catalog/operations/import-user-events.*",
output,
)
assert re.match(".*import user events operation is done.*", output)
assert re.match(".*number of successfully imported events.*?4.*", output)
assert re.match(".*number of failures during the importing.*?0.*", output)
assert re.match(".*operation result.*?errors_config.*", output)
Loading