From a70d683c909d5d0f9c31c190a6b7d4e0569f4a85 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Mon, 7 Oct 2024 13:57:44 -0700 Subject: [PATCH 1/8] test: restore original udf in the remote function test, use supported type in the test data (#996) * test: fix test_df_apply_axis_1_complex by converting numpy value * undo all changes * improve numpy value handling in gcf code * enable the multiindex axis=1 test back * nit reword comment * Revert "improve numpy value handling in gcf code" This reverts commit f549683a2fe98b825f56fca81e9dba4523567bc0. --------- Co-authored-by: Shobhit Singh --- tests/system/large/test_remote_function.py | 42 ++++++++++++---------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 18d2609347..2365002857 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1670,7 +1670,11 @@ def analyze(row): (3, 4): ["pq", "rs", "tu"], (5.0, "six", 7): [8, 9, 10], 'raise Exception("hacked!")': [11, 12, 13], - } + }, + # Default pandas index has non-numpy type, whereas bigframes is + # always numpy-based type, so let's use the index compatible + # with bigframes. See more details in b/369689696. + index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()), ), id="all-kinds-of-column-names", ), @@ -1681,17 +1685,22 @@ def analyze(row): "y": [1.5, 3.75, 5], "z": ["pq", "rs", "tu"], }, - index=pandas.MultiIndex.from_tuples( - [ - ("a", 100), - ("a", 200), - ("b", 300), - ] + index=pandas.MultiIndex.from_frame( + pandas.DataFrame( + { + "idx0": pandas.Series( + ["a", "a", "b"], dtype=pandas.StringDtype() + ), + "idx1": pandas.Series( + [100, 200, 300], dtype=pandas.Int64Dtype() + ), + } + ) ), ), id="multiindex", marks=pytest.mark.skip( - reason="TODO(b/368639580) revert this skip after fix" + reason="TODO: revert this skip after this pandas bug is fixed: https://github.com/pandas-dev/pandas/issues/59908" ), ), pytest.param( @@ -1701,6 +1710,10 @@ def analyze(row): [20, 3.75, "rs"], [30, 8.0, "tu"], ], + # Default pandas index has non-numpy type, whereas bigframes is + # always numpy-based type, so let's use the index compatible + # with bigframes. See more details in b/369689696. + index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()), columns=pandas.MultiIndex.from_arrays( [ ["first", "last_two", "last_two"], @@ -1729,10 +1742,8 @@ def test_df_apply_axis_1_complex(session, pd_df): def serialize_row(row): custom = { - "name": row.name.item() if hasattr(row.name, "item") else row.name, - "index": [ - idx.item() if hasattr(idx, "item") else idx for idx in row.index - ], + "name": row.name, + "index": [idx for idx in row.index], "values": [ val.item() if hasattr(val, "item") else val for val in row.values ], @@ -1756,12 +1767,7 @@ def serialize_row(row): bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() pd_result = pd_df.apply(serialize_row, axis=1) - # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' - # , ignore this mismatch by using check_dtype=False. - # - # bf_result.index[0].dtype is 'string[pyarrow]' while - # pd_result.index[0].dtype is 'object', ignore this mismatch by using - # check_index_type=False. 
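A note on the hunk above: the test data now uses an explicitly typed index, so `row.name` and `row.index` serialize the same way in pandas and in bigframes, and the `.item()` coercion is kept only for `row.values`, which can still hold numpy scalars. Below is a minimal, self-contained sketch of that coercion pattern; the `to_builtin` helper name is illustrative and not part of the patch.

    import json

    import numpy

    def to_builtin(value):
        # numpy scalars expose .item(), which returns the equivalent Python
        # builtin; plain Python values lack the method and pass through.
        return value.item() if hasattr(value, "item") else value

    row_values = [numpy.int64(10), numpy.float64(1.5), "pq"]
    # json.dumps raises TypeError on raw numpy scalars, hence the hedge.
    print(json.dumps([to_builtin(v) for v in row_values]))  # [10, 1.5, "pq"]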
+    # ignore known dtype difference between pandas and bigframes
     pandas.testing.assert_series_equal(
         pd_result, bf_result, check_dtype=False, check_index_type=False
     )

From 1162b3b1cf1297dd3f9a216864ac33139ab2e302 Mon Sep 17 00:00:00 2001
From: Shenyang Cai
Date: Mon, 7 Oct 2024 15:10:06 -0700
Subject: [PATCH 2/8] chore: implement semantic search (#1058)

---
 bigframes/operations/semantics.py             |  96 ++++-
 .../experimental/semantic_operators.ipynb     | 342 ++++++++++++++++--
 tests/system/large/operations/conftest.py     |   7 +
 .../system/large/operations/test_semantics.py |  62 ++++
 4 files changed, 481 insertions(+), 26 deletions(-)

diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py
index 1a4ff90b4e..cb24633ed5 100644
--- a/bigframes/operations/semantics.py
+++ b/bigframes/operations/semantics.py
@@ -15,7 +15,7 @@

 import re
 import typing
-from typing import List
+from typing import List, Optional

 import bigframes

@@ -279,6 +279,100 @@ def join(self, other, instruction: str, model, max_rows: int = 1000):

         return joined_df.semantics.filter(instruction, model).reset_index(drop=True)

+    def search(
+        self,
+        search_column: str,
+        query: str,
+        top_k: int,
+        model,
+        score_column: Optional[str] = None,
+    ):
+        """
+        Performs semantic search on the DataFrame.
+
+        **Examples:**
+
+        >>> import bigframes.pandas as bpd
+        >>> bpd.options.display.progress_bar = None
+
+        >>> import bigframes
+        >>> bigframes.options.experiments.semantic_operators = True
+
+        >>> import bigframes.ml.llm as llm
+        >>> model = llm.TextEmbeddingGenerator(model_name="text-embedding-004")
+
+        >>> df = bpd.DataFrame({"creatures": ["salmon", "sea urchin", "frog", "chimpanzee"]})
+        >>> df.semantics.search("creatures", "monkey", top_k=1, model=model, score_column='distance')
+            creatures  distance
+        3  chimpanzee  0.781101
+
+        [1 rows x 2 columns]
+
+        Args:
+            search_column:
+                The name of the column to search from.
+            query (str):
+                The search query.
+            top_k (int):
+                The number of nearest neighbors to return.
+            model (TextEmbeddingGenerator):
+                A TextEmbeddingGenerator provided by the Bigframes ML package.
+            score_column (Optional[str], default None):
+                The name of the additional column containing the similarity scores. If None,
+                this column won't be attached to the result.
+
+        Returns:
+            DataFrame: the DataFrame with the search result.
+
+        Raises:
+            ValueError: when the search_column is not found in the data frame.
+            TypeError: when the provided model is not TextEmbeddingGenerator.
+ """ + + if search_column not in self._df.columns: + raise ValueError(f"Column {search_column} not found") + + import bigframes.ml.llm as llm + + if not isinstance(model, llm.TextEmbeddingGenerator): + raise TypeError(f"Expect a text embedding model, but got: {type(model)}") + + embedded_df = model.predict(self._df[search_column]) + embedded_table = embedded_df.reset_index().to_gbq() + + import bigframes.pandas as bpd + + embedding_result_column = "ml_generate_embedding_result" + query_df = model.predict(bpd.DataFrame({"query_id": [query]})).rename( + columns={"content": "query_id", embedding_result_column: "embedding"} + ) + + import bigframes.bigquery as bbq + + search_result = ( + bbq.vector_search( + base_table=embedded_table, + column_to_search=embedding_result_column, + query=query_df, + top_k=top_k, + ) + .rename(columns={"content": search_column}) + .set_index("index") + ) + + search_result.index.name = self._df.index.name + + if score_column is not None: + search_result = search_result.rename(columns={"distance": score_column})[ + [search_column, score_column] + ] + else: + search_result = search_result[[search_column]] + + import bigframes.dataframe + + return typing.cast(bigframes.dataframe.DataFrame, search_result) + def _validate_model(model): from bigframes.ml.llm import GeminiTextGenerator diff --git a/notebooks/experimental/semantic_operators.ipynb b/notebooks/experimental/semantic_operators.ipynb index 80ed1a35a5..96813159b2 100644 --- a/notebooks/experimental/semantic_operators.ipynb +++ b/notebooks/experimental/semantic_operators.ipynb @@ -65,7 +65,7 @@ { "data": { "text/html": [ - "Query job b113bf05-6b85-41bc-8754-aebab5d6c65e is DONE. 0 Bytes processed. Open Job" + "Query job 3ab1bb82-0634-4a31-8ce1-0bac7c06887d is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -77,7 +77,7 @@ ], "source": [ "import bigframes.ml.llm as llm\n", - "model = llm.GeminiTextGenerator(model_name=llm._GEMINI_1P5_FLASH_001_ENDPOINT)" + "gemini_model = llm.GeminiTextGenerator(model_name=llm._GEMINI_1P5_FLASH_001_ENDPOINT)" ] }, { @@ -95,7 +95,7 @@ { "data": { "text/html": [ - "Query job 105c8fa5-6c33-48c7-9526-25fc5ac0013d is DONE. 0 Bytes processed. Open Job" + "Query job a3652c48-9c00-438b-8443-ef11b8a4c81d is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -115,7 +115,7 @@ { "data": { "text/html": [ - "Query job cf8e6bdd-1845-412b-ac13-16f03e968262 is DONE. 4 Bytes processed. Open Job" + "Query job dd133a7f-22a5-45a1-bedb-65c2cebd74ec is DONE. 4 Bytes processed. Open Job" ], "text/plain": [ "" @@ -127,7 +127,7 @@ { "data": { "text/html": [ - "Query job 28b6a9e8-5742-421b-b79c-f2fa647ebd10 is DONE. 33 Bytes processed. Open Job" + "Query job d162b1a6-76e3-4c1c-9d06-7cc8792297e1 is DONE. 33 Bytes processed. Open Job" ], "text/plain": [ "" @@ -139,7 +139,7 @@ { "data": { "text/html": [ - "Query job 3d3e530f-4dde-447e-939d-eaf3b072f75b is DONE. 33 Bytes processed. Open Job" + "Query job 3b8212fb-07ba-4e6f-afa9-db98becd59fe is DONE. 33 Bytes processed. Open Job" ], "text/plain": [ "" @@ -198,7 +198,7 @@ ], "source": [ "df = bpd.DataFrame({'country': ['USA', 'Germany'], 'city': ['Seattle', 'Berlin']})\n", - "df.semantics.filter(\"{city} is the capital of {country}\", model)" + "df.semantics.filter(\"{city} is the capital of {country}\", gemini_model)" ] }, { @@ -227,7 +227,7 @@ { "data": { "text/html": [ - "Query job 25b719ed-c1ad-45d9-9926-0a3408aff33d is DONE. 0 Bytes processed. Open Job" + "Query job ee842f5d-3572-4283-8161-f7b7cee51b06 is DONE. 0 Bytes processed. 
Open Job" ], "text/plain": [ "" @@ -247,7 +247,7 @@ { "data": { "text/html": [ - "Query job 010de4ac-7e7d-4aa2-9e73-34e2db77f2c8 is DONE. 4 Bytes processed. Open Job" + "Query job c2f1829d-b0b5-4316-942d-ab6881ad09bb is DONE. 4 Bytes processed. Open Job" ], "text/plain": [ "" @@ -259,7 +259,7 @@ { "data": { "text/html": [ - "Query job e5688060-a489-44bb-91e1-0272ca2d836b is DONE. 34 Bytes processed. Open Job" + "Query job 066c9101-c069-450f-8189-810b474fa5f2 is DONE. 34 Bytes processed. Open Job" ], "text/plain": [ "" @@ -271,7 +271,7 @@ { "data": { "text/html": [ - "Query job 7eeea068-83dd-4451-a18d-ae06525097d0 is DONE. 93 Bytes processed. Open Job" + "Query job e0bb869a-e3ef-49c2-8a9e-7dd343b06cbb is DONE. 93 Bytes processed. Open Job" ], "text/plain": [ "" @@ -340,7 +340,7 @@ } ], "source": [ - "df.semantics.map(\"What is the food made from {ingredient_1} and {ingredient_2}? One word only.\", output_column=\"food\", model=model)" + "df.semantics.map(\"What is the food made from {ingredient_1} and {ingredient_2}? One word only.\", output_column=\"food\", model=gemini_model)" ] }, { @@ -368,7 +368,7 @@ { "data": { "text/html": [ - "Query job 5467ef2f-832a-44c0-a4f8-f3b92f495348 is DONE. 0 Bytes processed. Open Job" + "Query job 2857268d-a5f6-47bc-9819-eef64c89dd83 is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -388,7 +388,7 @@ { "data": { "text/html": [ - "Query job f3123dc6-ec48-444f-a940-7b768cf6341a is DONE. 30 Bytes processed. Open Job" + "Query job ebea0da7-ed07-480b-84dd-2abbd1fb5258 is DONE. 30 Bytes processed. Open Job" ], "text/plain": [ "" @@ -400,7 +400,7 @@ { "data": { "text/html": [ - "Query job 71c00675-0639-4d75-b5d2-5fb150f1aa54 is DONE. 251 Bytes processed. Open Job" + "Query job 8971d7c6-11de-47ea-b6c9-6b59cd6b58d2 is DONE. 251 Bytes processed. Open Job" ], "text/plain": [ "" @@ -412,7 +412,7 @@ { "data": { "text/html": [ - "Query job f64ede6e-f4ed-4764-a82a-8fefce20dfca is DONE. 144 Bytes processed. Open Job" + "Query job 7f90c077-48a9-433f-9c6b-ef7bc775e8ec is DONE. 144 Bytes processed. Open Job" ], "text/plain": [ "" @@ -488,7 +488,7 @@ } ], "source": [ - "cities.semantics.join(continents, \"{city} is in {continent}\", model)" + "cities.semantics.join(continents, \"{city} is in {continent}\", gemini_model)" ] }, { @@ -500,7 +500,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 9, "metadata": {}, "outputs": [], "source": [ @@ -509,13 +509,13 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ - "Query job ca4abfb9-6fc9-4d70-b948-d426ec85e83f is DONE. 0 Bytes processed. Open Job" + "Query job f8aaae82-3622-4ec1-8e42-65a1712f26e2 is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -535,7 +535,7 @@ { "data": { "text/html": [ - "Query job 25e0cdb2-9cfd-4d5e-91ea-34116af5dd91 is DONE. 32 Bytes processed. Open Job" + "Query job 511dbbca-8514-4ba6-bcca-93fbe961a2c9 is DONE. 32 Bytes processed. Open Job" ], "text/plain": [ "" @@ -547,7 +547,7 @@ { "data": { "text/html": [ - "Query job 9b93cd37-51f2-4908-91e3-369380d43028 is DONE. 266 Bytes processed. Open Job" + "Query job 2cca3eee-db60-4faa-9f26-6656002f0200 is DONE. 266 Bytes processed. Open Job" ], "text/plain": [ "" @@ -559,7 +559,7 @@ { "data": { "text/html": [ - "Query job 552be84f-e6e7-46b3-ab74-3e8261ccb396 is DONE. 180 Bytes processed. Open Job" + "Query job 91385afd-1107-4162-973b-3a8da98541ee is DONE. 180 Bytes processed. 
Open Job" ], "text/plain": [ "" @@ -641,13 +641,305 @@ "[6 rows x 2 columns]" ] }, - "execution_count": 14, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "animals.semantics.join(animals, \"{animal_left} generally weighs heavier than {animal_right}\", gemini_model)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Semantic Search" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 48aafee2-4948-4677-ab02-a94a71b9f6e2 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "text_embedding_model = llm.TextEmbeddingGenerator(model_name=\"text-embedding-004\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job b03ec7b9-dfd6-4c0c-9eb6-484a999c913e is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
creatures
0salmon
1sea urchin
2baboons
3frog
4chimpanzee
\n", + "

5 rows × 1 columns

\n", + "
[5 rows x 1 columns in total]" + ], + "text/plain": [ + " creatures\n", + "0 salmon\n", + "1 sea urchin\n", + "2 baboons\n", + "3 frog\n", + "4 chimpanzee\n", + "\n", + "[5 rows x 1 columns]" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = bpd.DataFrame({\"creatures\": [\"salmon\", \"sea urchin\", \"baboons\", \"frog\", \"chimpanzee\"]})\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 93fbc66e-4f98-4cdb-bb3a-5af738d6fd67 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/google/home/sycai/src/python-bigquery-dataframes/bigframes/core/__init__.py:112: PreviewWarning: Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.\n", + " warnings.warn(\n" + ] + }, + { + "data": { + "text/html": [ + "Query job 34a9725a-058a-4930-bc78-8372355d0c85 is DONE. 10 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job ed533856-32fe-4012-8d72-ffc73c41418c is DONE. 30.9 kB processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job 91f67984-652b-44d4-a69a-91c3e1a3d2b1 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/google/home/sycai/src/python-bigquery-dataframes/bigframes/core/__init__.py:112: PreviewWarning: Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.\n", + " warnings.warn(\n" + ] + }, + { + "data": { + "text/html": [ + "Query job f866ce11-f832-4c25-b3a7-1dce683b7553 is DONE. 2 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job 41e22a53-6ef8-44c5-9cdd-01d99fc1c2fc is RUNNING. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job 57f809b4-7e2a-46ba-992d-d33b64b92664 is DONE. 37.2 kB processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job d512a274-0c54-4c53-a92e-20b111477697 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
creaturessimilarity score
2baboons0.773411
4chimpanzee0.781101
\n", + "

2 rows × 2 columns

\n", + "
[2 rows x 2 columns in total]" + ], + "text/plain": [ + " creatures similarity score\n", + "2 baboons 0.773411\n", + "4 chimpanzee 0.781101\n", + "\n", + "[2 rows x 2 columns]" + ] + }, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "animals.semantics.join(animals, \"{animal_left} generally weighs heavier than {animal_right}\", model)" + "df.semantics.search(\"creatures\", \"monkey\", top_k = 2, model = text_embedding_model, score_column='similarity score')" ] } ], diff --git a/tests/system/large/operations/conftest.py b/tests/system/large/operations/conftest.py index a85df4c5df..7ab3811f10 100644 --- a/tests/system/large/operations/conftest.py +++ b/tests/system/large/operations/conftest.py @@ -24,3 +24,10 @@ def gemini_flash_model(session, bq_connection) -> llm.GeminiTextGenerator: connection_name=bq_connection, model_name="gemini-1.5-flash-001", ) + + +@pytest.fixture(scope="session") +def text_embedding_generator(session, bq_connection) -> llm.TextEmbeddingGenerator: + return llm.TextEmbeddingGenerator( + session=session, connection_name=bq_connection, model_name="text-embedding-004" + ) diff --git a/tests/system/large/operations/test_semantics.py b/tests/system/large/operations/test_semantics.py index a2c73b5d26..f0e13640e0 100644 --- a/tests/system/large/operations/test_semantics.py +++ b/tests/system/large/operations/test_semantics.py @@ -302,3 +302,65 @@ def test_join_invalid_model_raise_error(): with pytest.raises(ValueError): cities.semantics.join(countries, "{city} is in {country}", None) + + +@pytest.mark.parametrize( + "score_column", + [ + pytest.param(None, id="no_score_column"), + pytest.param("distance", id="has_score_column"), + ], +) +def test_search(session, text_embedding_generator, score_column): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + data={"creatures": ["salmon", "sea urchin", "baboons", "frog", "chimpanzee"]}, + session=session, + ) + + actual_result = df.semantics.search( + "creatures", + "monkey", + top_k=2, + model=text_embedding_generator, + score_column=score_column, + ).to_pandas() + + expected_result = pd.Series( + ["baboons", "chimpanzee"], index=[2, 4], name="creatures" + ) + pandas.testing.assert_series_equal( + actual_result["creatures"], + expected_result, + check_dtype=False, + check_index_type=False, + ) + + if score_column is None: + assert len(actual_result.columns) == 1 + else: + assert score_column in actual_result.columns + + +def test_search_invalid_column_raises_error(session, text_embedding_generator): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + data={"creatures": ["salmon", "sea urchin", "baboons", "frog", "chimpanzee"]}, + session=session, + ) + + with pytest.raises(ValueError): + df.semantics.search( + "whatever", "monkey", top_k=2, model=text_embedding_generator + ) + + +def test_search_invalid_model_raises_error(session): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + data={"creatures": ["salmon", "sea urchin", "baboons", "frog", "chimpanzee"]}, + session=session, + ) + + with pytest.raises(TypeError): + df.semantics.search("creatures", "monkey", top_k=2, model=None) From 02c2da733b834b99d8044f3c5cac3ac9a85802a6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 7 Oct 2024 16:53:27 -0700 Subject: [PATCH 3/8] fix: show warning for unknown location set through .ctor (#1052) * fix: show warning for unknown location set through .ctor * update expected location in 
bigframes session --- bigframes/_config/bigquery_options.py | 2 +- tests/system/large/test_location.py | 2 +- tests/unit/_config/test_bigquery_options.py | 50 ++++++++++++++------- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 42007a388e..afb0f00a27 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -94,7 +94,7 @@ def __init__( ): self._credentials = credentials self._project = project - self._location = location + self._location = _get_validated_location(location) self._bq_connection = bq_connection self._use_regional_endpoints = use_regional_endpoints self._application_name = application_name diff --git a/tests/system/large/test_location.py b/tests/system/large/test_location.py index 2ef002d7e0..3521e4cd20 100644 --- a/tests/system/large/test_location.py +++ b/tests/system/large/test_location.py @@ -101,7 +101,7 @@ def test_bq_location_non_canonical(set_location, resolved_location): context=bigframes.BigQueryOptions(location=set_location) ) - assert session.bqclient.location == set_location + assert session.bqclient.location == resolved_location # by default global endpoint is used assert ( diff --git a/tests/unit/_config/test_bigquery_options.py b/tests/unit/_config/test_bigquery_options.py index f40c140a9e..d04b5bd575 100644 --- a/tests/unit/_config/test_bigquery_options.py +++ b/tests/unit/_config/test_bigquery_options.py @@ -97,17 +97,26 @@ def test_setter_if_session_started_but_setting_the_same_value(attribute): ], ) def test_location_set_to_valid_no_warning(valid_location): - options = bigquery_options.BigQueryOptions() - # Ensure that no warnings are emitted. - # https://docs.pytest.org/en/7.0.x/how-to/capture-warnings.html#additional-use-cases-of-warnings-in-tests - with warnings.catch_warnings(): - # Turn matching UnknownLocationWarning into exceptions. - # https://docs.python.org/3/library/warnings.html#warning-filter - warnings.simplefilter( - "error", category=bigframes.exceptions.UnknownLocationWarning - ) + # test setting location through constructor + def set_location_in_ctor(): + bigquery_options.BigQueryOptions(location=valid_location) + + # test setting location property + def set_location_property(): + options = bigquery_options.BigQueryOptions() options.location = valid_location + for op in [set_location_in_ctor, set_location_property]: + # Ensure that no warnings are emitted. + # https://docs.pytest.org/en/7.0.x/how-to/capture-warnings.html#additional-use-cases-of-warnings-in-tests + with warnings.catch_warnings(): + # Turn matching UnknownLocationWarning into exceptions. + # https://docs.python.org/3/library/warnings.html#warning-filter + warnings.simplefilter( + "error", category=bigframes.exceptions.UnknownLocationWarning + ) + op() + @pytest.mark.parametrize( [ @@ -126,11 +135,20 @@ def test_location_set_to_valid_no_warning(valid_location): ], ) def test_location_set_to_invalid_warning(invalid_location, possibility): - options = bigquery_options.BigQueryOptions() - with pytest.warns( - bigframes.exceptions.UnknownLocationWarning, - match=re.escape( - f"The location '{invalid_location}' is set to an unknown value. Did you mean '{possibility}'?" 
- ), - ): + # test setting location through constructor + def set_location_in_ctor(): + bigquery_options.BigQueryOptions(location=invalid_location) + + # test setting location property + def set_location_property(): + options = bigquery_options.BigQueryOptions() options.location = invalid_location + + for op in [set_location_in_ctor, set_location_property]: + with pytest.warns( + bigframes.exceptions.UnknownLocationWarning, + match=re.escape( + f"The location '{invalid_location}' is set to an unknown value. Did you mean '{possibility}'?" + ), + ): + op() From 650d80d1ad90927068cdb71efbfc548b416641a6 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 7 Oct 2024 17:34:28 -0700 Subject: [PATCH 4/8] feat: update LLM generators to warn user about model name instead of raising error. (#1048) * feat: update LLM generators to warn user about model name instead of raise error. * update message and format * update message and format --- bigframes/ml/llm.py | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index c12da01b54..3920da6c71 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -83,6 +83,13 @@ _ML_EMBED_TEXT_STATUS = "ml_embed_text_status" _ML_GENERATE_EMBEDDING_STATUS = "ml_generate_embedding_status" +_MODEL_NOT_SUPPORTED_WARNING = ( + "Model name '{model_name}' is not supported. " + "We are currently aware of the following models: {known_models}. " + "However, model names can change, and the supported models may be outdated. " + "You should use this model name only if you are sure that it is supported in BigQuery." +) + @typing_extensions.deprecated( "PaLM2TextGenerator is going to be deprecated. Use GeminiTextGenerator(https://cloud.google.com/python/docs/reference/bigframes/latest/bigframes.ml.llm.GeminiTextGenerator) instead. ", @@ -154,8 +161,11 @@ def _create_bqml_model(self): ) if self.model_name not in _TEXT_GENERATOR_ENDPOINTS: - raise ValueError( - f"Model name {self.model_name} is not supported. We only support {', '.join(_TEXT_GENERATOR_ENDPOINTS)}." + warnings.warn( + _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_TEXT_GENERATOR_ENDPOINTS), + ) ) options = { @@ -484,8 +494,11 @@ def _create_bqml_model(self): ) if self.model_name not in _PALM2_EMBEDDING_GENERATOR_ENDPOINTS: - raise ValueError( - f"Model name {self.model_name} is not supported. We only support {', '.join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS)}." + warnings.warn( + _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS), + ) ) endpoint = ( @@ -644,8 +657,11 @@ def _create_bqml_model(self): ) if self.model_name not in _TEXT_EMBEDDING_ENDPOINTS: - raise ValueError( - f"Model name {self.model_name} is not supported. We only support {', '.join(_TEXT_EMBEDDING_ENDPOINTS)}." + warnings.warn( + _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_TEXT_EMBEDDING_ENDPOINTS), + ) ) options = { @@ -801,8 +817,11 @@ def _create_bqml_model(self): ) if self.model_name not in _GEMINI_ENDPOINTS: - raise ValueError( - f"Model name {self.model_name} is not supported. We only support {', '.join(_GEMINI_ENDPOINTS)}." 
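The raise-to-warn substitution above is repeated for every model family in this file. Below is a standalone sketch of the resulting behavior; the wrapper function is illustrative, since the patch inlines this check in each `_create_bqml_model`.

    import warnings

    _MODEL_NOT_SUPPORTED_WARNING = (
        "Model name '{model_name}' is not supported. "
        "We are currently aware of the following models: {known_models}. "
        "However, model names can change, and the supported models may be outdated."
    )

    def _warn_if_unknown_model(model_name: str, known_models: tuple) -> None:
        # Unknown names now warn instead of raising, so newly launched
        # endpoints stay usable before the known-model list is updated.
        if model_name not in known_models:
            warnings.warn(
                _MODEL_NOT_SUPPORTED_WARNING.format(
                    model_name=model_name, known_models=", ".join(known_models)
                )
            )

    _warn_if_unknown_model("gemini-99-flash", ("gemini-pro", "gemini-1.5-flash-001"))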
+ warnings.warn( + _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_GEMINI_ENDPOINTS), + ) ) options = {"endpoint": self.model_name} @@ -1118,8 +1137,11 @@ def _create_bqml_model(self): ) if self.model_name not in _CLAUDE_3_ENDPOINTS: - raise ValueError( - f"Model name {self.model_name} is not supported. We only support {', '.join(_CLAUDE_3_ENDPOINTS)}." + warnings.warn( + _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_CLAUDE_3_ENDPOINTS), + ) ) options = { From b53607015abb79be0aa5666681f1c53b5b1bc2b5 Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Mon, 7 Oct 2024 18:14:01 -0700 Subject: [PATCH 5/8] fix: correct zero row count in DataFrame from table view (#1062) * fix: correct zero row count display in DataFrame from table view * update logic and test --- bigframes/core/nodes.py | 4 +++- tests/system/small/test_dataframe.py | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index e65040686e..d750ee63fb 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -508,6 +508,7 @@ class GbqTable: table_id: str = field() physical_schema: Tuple[bq.SchemaField, ...] = field() n_rows: int = field() + is_physical_table: bool = field() cluster_cols: typing.Optional[Tuple[str, ...]] @staticmethod @@ -523,6 +524,7 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable: table_id=table.table_id, physical_schema=schema, n_rows=table.num_rows, + is_physical_table=(table.table_type == "TABLE"), cluster_cols=None if table.clustering_fields is None else tuple(table.clustering_fields), @@ -603,7 +605,7 @@ def variables_introduced(self) -> int: @property def row_count(self) -> typing.Optional[int]: - if self.source.sql_predicate is None: + if self.source.sql_predicate is None and self.source.table.is_physical_table: return self.source.table.n_rows return None diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index cfd6efe9bd..6ee9fb8247 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1521,6 +1521,26 @@ def test_shape(scalars_dfs): assert bf_result == pd_result +@pytest.mark.parametrize( + "reference_table, test_table", + [ + ( + "bigframes-dev.bigframes_tests_sys.base_table", + "bigframes-dev.bigframes_tests_sys.base_table_view", + ), + ( + "bigframes-dev.bigframes_tests_sys.csv_native_table", + "bigframes-dev.bigframes_tests_sys.csv_external_table", + ), + ], +) +def test_view_and_external_table_shape(session, reference_table, test_table): + reference_df = session.read_gbq(reference_table) + test_df = session.read_gbq(test_table) + + assert test_df.shape == reference_df.shape + + def test_len(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_result = len(scalars_df) From 575a10a7ba0fbac76867f02da1dd65355f00d7aa Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 7 Oct 2024 21:38:01 -0700 Subject: [PATCH 6/8] fix: remove palm2 test case from llm load test (#1063) --- tests/system/load/test_llm.py | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py index 51b45485ad..4b0f50973b 100644 --- a/tests/system/load/test_llm.py +++ b/tests/system/load/test_llm.py @@ -38,30 +38,6 @@ def llm_remote_text_df(session, llm_remote_text_pandas_df): return 
session.read_pandas(llm_remote_text_pandas_df) -@pytest.mark.flaky(retries=2) -def test_llm_palm_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_df): - model = llm.PaLM2TextGenerator(model_name="text-bison", max_iterations=1) - - X_train = llm_fine_tune_df_default_index[["prompt"]] - y_train = llm_fine_tune_df_default_index[["label"]] - model.fit(X_train, y_train) - - assert model is not None - - df = model.predict(llm_remote_text_df["prompt"]).to_pandas() - utils.check_pandas_df_schema_and_index( - df, - columns=[ - "ml_generate_text_llm_result", - "ml_generate_text_rai_result", - "ml_generate_text_status", - "prompt", - ], - index=3, - ) - # TODO(ashleyxu b/335492787): After bqml rolled out version control: save, load, check parameters to ensure configuration was kept - - @pytest.mark.flaky(retries=2) def test_llm_gemini_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_df): model = llm.GeminiTextGenerator(model_name="gemini-pro", max_iterations=1) From d1b87e205d1d7c290ca2d499da1c03fe321c89f7 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Mon, 7 Oct 2024 22:39:30 -0700 Subject: [PATCH 7/8] chore: Implement Semantics agg (#1059) * chore: Implement Semantics agg * fix tests * address comments --- bigframes/operations/semantics.py | 268 +++++++++++++++--- .../experimental/semantic_operators.ipynb | 221 ++++++++++++++- .../system/large/operations/test_semantics.py | 100 +++++++ 3 files changed, 540 insertions(+), 49 deletions(-) diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py index cb24633ed5..0df7041914 100644 --- a/bigframes/operations/semantics.py +++ b/bigframes/operations/semantics.py @@ -18,6 +18,8 @@ from typing import List, Optional import bigframes +import bigframes.core.guid +import bigframes.dtypes as dtypes class Semantics: @@ -27,6 +29,171 @@ def __init__(self, df) -> None: self._df = df + def agg( + self, + instruction: str, + model, + cluster_column: typing.Optional[str] = None, + max_agg_rows: int = 10, + ): + """ + Performs an aggregation over all rows of the table. + + This method recursively aggregates the input data to produce partial answers + in parallel, until a single answer remains. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> bpd.options.experiments.semantic_operators = True + + >>> import bigframes.ml.llm as llm + >>> model = llm.GeminiTextGenerator(model_name="gemini-1.5-flash-001") + + >>> df = bpd.DataFrame( + ... { + ... "Movies": [ + ... "Titanic", + ... "The Wolf of Wall Street", + ... "Inception", + ... ], + ... "Year": [1997, 2013, 2010], + ... }) + >>> df.semantics.agg( + ... "Find the first name shared by all actors in {Movies}. One word answer.", + ... model=model, + ... ) + 0 Leonardo + + Name: Movies, dtype: string + + Args: + instruction (str): + An instruction on how to map the data. This value must contain + column references by name enclosed in braces. + For example, to reference a column named "movies", use "{movies}" in the + instruction, like: "Find actor names shared by all {movies}." + + model (bigframes.ml.llm.GeminiTextGenerator): + A GeminiTextGenerator provided by the Bigframes ML package. + + cluster_column (Optional[str], default None): + If set, aggregates each cluster before performing aggregations across + clusters. Clustering based on semantic similarity can improve accuracy + of the sementic aggregations. 
+ + max_agg_rows (int, default 10): + The maxinum number of rows to be aggregated at a time. + + Returns: + bigframes.dataframe.DataFrame: A new DataFrame with the aggregated answers. + + Raises: + NotImplementedError: when the semantic operator experiment is off. + ValueError: when the instruction refers to a non-existing column, or when + more than one columns are referred to. + """ + self._validate_model(model) + + columns = self._parse_columns(instruction) + for column in columns: + if column not in self._df.columns: + raise ValueError(f"Column {column} not found.") + if len(columns) > 1: + raise NotImplementedError( + "Semantic aggregations are limited to a single column." + ) + column = columns[0] + + if max_agg_rows <= 1: + raise ValueError( + f"Invalid value for `max_agg_rows`: {max_agg_rows}." + "It must be greater than 1." + ) + + import bigframes.bigquery as bbq + import bigframes.dataframe + import bigframes.series + + df: bigframes.dataframe.DataFrame = self._df.copy() + user_instruction = self._format_instruction(instruction, columns) + + num_cluster = 1 + if cluster_column is not None: + if cluster_column not in df.columns: + raise ValueError(f"Cluster column `{cluster_column}` not found.") + + if df[cluster_column].dtype != dtypes.INT_DTYPE: + raise TypeError( + "Cluster column must be an integer type, not " + f"{type(df[cluster_column])}" + ) + + num_cluster = len(df[cluster_column].unique()) + df = df.sort_values(cluster_column) + else: + cluster_column = bigframes.core.guid.generate_guid("pid") + df[cluster_column] = 0 + + aggregation_group_id = bigframes.core.guid.generate_guid("agg") + group_row_index = bigframes.core.guid.generate_guid("gid") + llm_prompt = bigframes.core.guid.generate_guid("prompt") + df = ( + df.reset_index(drop=True) + .reset_index() + .rename(columns={"index": aggregation_group_id}) + ) + + output_instruction = ( + "Answer user instructions using the provided context from various sources. " + "Combine all relevant information into a single, concise, well-structured response. " + f"Instruction: {user_instruction}.\n\n" + ) + + while len(df) > 1: + df[group_row_index] = (df[aggregation_group_id] % max_agg_rows + 1).astype( + dtypes.STRING_DTYPE + ) + df[aggregation_group_id] = (df[aggregation_group_id] / max_agg_rows).astype( + dtypes.INT_DTYPE + ) + df[llm_prompt] = "\t\nSource #" + df[group_row_index] + ": " + df[column] + + if len(df) > num_cluster: + # Aggregate within each partition + agg_df = bbq.array_agg( + df.groupby(by=[cluster_column, aggregation_group_id]) + ) + else: + # Aggregate cross partitions + agg_df = bbq.array_agg(df.groupby(by=[aggregation_group_id])) + agg_df[cluster_column] = agg_df[cluster_column].list[0] + + # Skip if the aggregated group only has a single item + single_row_df: bigframes.series.Series = bbq.array_to_string( + agg_df[agg_df[group_row_index].list.len() <= 1][column], + delimiter="", + ) + prompt_s: bigframes.series.Series = bbq.array_to_string( + agg_df[agg_df[group_row_index].list.len() > 1][llm_prompt], + delimiter="", + ) + prompt_s = output_instruction + prompt_s # type:ignore + + # Run model + predict_df = typing.cast( + bigframes.dataframe.DataFrame, model.predict(prompt_s) + ) + agg_df[column] = predict_df["ml_generate_text_llm_result"].combine_first( + single_row_df + ) + + agg_df = agg_df.reset_index() + df = agg_df[[aggregation_group_id, cluster_column, column]] + + return df[column] + def filter(self, instruction: str, model): """ Filters the DataFrame with the semantics of the user instruction. 
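The reduction loop in `agg` above is easier to follow without the BigQuery plumbing. Below is a local, LLM-free sketch of the same shape, where the `combine` callable stands in for the model call; both names are illustrative.

    from typing import Callable, List

    def hierarchical_agg(
        rows: List[str],
        combine: Callable[[List[str]], str],
        max_agg_rows: int = 10,
    ) -> str:
        # Each round batches at most max_agg_rows partial answers and
        # combines every batch into one, until a single answer remains.
        # (The real implementation additionally skips single-row batches
        # and can aggregate within clusters first when cluster_column is set.)
        if not rows:
            raise ValueError("rows must be non-empty")
        while len(rows) > 1:
            rows = [
                combine(rows[i : i + max_agg_rows])
                for i in range(0, len(rows), max_agg_rows)
            ]
        return rows[0]

    print(hierarchical_agg(["a", "b", "c", "d", "e"], " | ".join, max_agg_rows=2))
    # a | b | c | d | e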
@@ -35,9 +202,7 @@ def filter(self, instruction: str, model): >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None - - >>> import bigframes - >>> bigframes.options.experiments.semantic_operators = True + >>> bpd.options.experiments.semantic_operators = True >>> import bigframes.ml.llm as llm >>> model = llm.GeminiTextGenerator(model_name="gemini-1.5-flash-001") @@ -68,14 +233,22 @@ def filter(self, instruction: str, model): ValueError: when the instruction refers to a non-existing column, or when no columns are referred to. """ - _validate_model(model) + self._validate_model(model) + columns = self._parse_columns(instruction) + for column in columns: + if column not in self._df.columns: + raise ValueError(f"Column {column} not found.") + user_instruction = self._format_instruction(instruction, columns) output_instruction = "Based on the provided context, reply to the following claim by only True or False:" from bigframes.dataframe import DataFrame results = typing.cast( - DataFrame, model.predict(self._make_prompt(instruction, output_instruction)) + DataFrame, + model.predict( + self._make_prompt(columns, user_instruction, output_instruction) + ), ) return self._df[ @@ -90,9 +263,7 @@ def map(self, instruction: str, output_column: str, model): >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None - - >>> import bigframes - >>> bigframes.options.experiments.semantic_operators = True + >>> bpd.options.experiments.semantic_operators = True >>> import bigframes.ml.llm as llm >>> model = llm.GeminiTextGenerator(model_name="gemini-1.5-flash-001") @@ -129,8 +300,13 @@ def map(self, instruction: str, output_column: str, model): ValueError: when the instruction refers to a non-existing column, or when no columns are referred to. """ - _validate_model(model) + self._validate_model(model) + columns = self._parse_columns(instruction) + for column in columns: + if column not in self._df.columns: + raise ValueError(f"Column {column} not found.") + user_instruction = self._format_instruction(instruction, columns) output_instruction = ( "Based on the provided contenxt, answer the following instruction:" ) @@ -139,34 +315,15 @@ def map(self, instruction: str, output_column: str, model): results = typing.cast( Series, - model.predict(self._make_prompt(instruction, output_instruction))[ - "ml_generate_text_llm_result" - ], + model.predict( + self._make_prompt(columns, user_instruction, output_instruction) + )["ml_generate_text_llm_result"], ) from bigframes.core.reshape import concat return concat([self._df, results.rename(output_column)], axis=1) - def _make_prompt(self, user_instruction: str, output_instruction: str): - columns = _parse_columns(user_instruction) - - for column in columns: - if column not in self._df.columns: - raise ValueError(f"Column {column} not found.") - - # Replace column references with names. - user_instruction = user_instruction.format(**{col: col for col in columns}) - - prompt_df = self._df[columns].copy() - prompt_df["prompt"] = f"{output_instruction}\n{user_instruction}\nContext: " - - # Combine context from multiple columns. 
- for col in columns: - prompt_df["prompt"] += f"{col} is `" + prompt_df[col] + "`\n" - - return prompt_df["prompt"] - def join(self, other, instruction: str, model, max_rows: int = 1000): """ Joines two dataframes by applying the instruction over each pair of rows from @@ -176,9 +333,7 @@ def join(self, other, instruction: str, model, max_rows: int = 1000): >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None - - >>> import bigframes - >>> bigframes.options.experiments.semantic_operators = True + >>> bpd.options.experiments.semantic_operators = True >>> import bigframes.ml.llm as llm >>> model = llm.GeminiTextGenerator(model_name="gemini-1.5-flash-001") @@ -221,7 +376,8 @@ def join(self, other, instruction: str, model, max_rows: int = 1000): Raises: ValueError if the amount of data that will be sent for LLM processing is larger than max_rows. """ - _validate_model(model) + self._validate_model(model) + columns = self._parse_columns(instruction) joined_table_rows = len(self._df) * len(other) @@ -230,8 +386,6 @@ def join(self, other, instruction: str, model, max_rows: int = 1000): f"Number of rows that need processing is {joined_table_rows}, which exceeds row limit {max_rows}." ) - columns = _parse_columns(instruction) - left_columns = [] right_columns = [] @@ -373,18 +527,40 @@ def search( return typing.cast(bigframes.dataframe.DataFrame, search_result) + def _make_prompt( + self, columns: List[str], user_instruction: str, output_instruction: str + ): + prompt_df = self._df[columns].copy() + prompt_df["prompt"] = f"{output_instruction}\n{user_instruction}\nContext: " + + # Combine context from multiple columns. + for col in columns: + prompt_df["prompt"] += f"{col} is `" + prompt_df[col] + "`\n" + + return prompt_df["prompt"] -def _validate_model(model): - from bigframes.ml.llm import GeminiTextGenerator + def _parse_columns(self, instruction: str) -> List[str]: + """Extracts column names enclosed in curly braces from the user instruction. + For example, _parse_columns("{city} is in {continent}") == ["city", "continent"] + """ + columns = re.findall(r"(? List[str]: - columns = re.findall(r"(? str: + """Extracts column names enclosed in curly braces from the user instruction. 
+ For example, `_format_instruction(["city", "continent"], "{city} is in {continent}") + == "city is in continent"` + """ + return instruction.format(**{col: col for col in columns}) - if not columns: - raise ValueError("No column references") + @staticmethod + def _validate_model(model): + from bigframes.ml.llm import GeminiTextGenerator - return columns + if not isinstance(model, GeminiTextGenerator): + raise ValueError("Model is not GeminiText Generator") diff --git a/notebooks/experimental/semantic_operators.ipynb b/notebooks/experimental/semantic_operators.ipynb index 96813159b2..4273ceb36b 100644 --- a/notebooks/experimental/semantic_operators.ipynb +++ b/notebooks/experimental/semantic_operators.ipynb @@ -33,7 +33,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "/usr/local/google/home/sycai/src/python-bigquery-dataframes/bigframes/_config/experiment_options.py:33: UserWarning: Semantic operators are still under experiments, and are subject to change in the future.\n", + "/usr/local/google/home/chelsealin/src/bigframes3/bigframes/_config/experiment_options.py:33: UserWarning: Semantic operators are still under experiments, and are subject to change in the future.\n", " warnings.warn(\n" ] } @@ -58,14 +58,14 @@ "name": "stderr", "output_type": "stream", "text": [ - "/usr/local/google/home/sycai/src/python-bigquery-dataframes/bigframes/pandas/__init__.py:559: DefaultLocationWarning: No explicit location is set, so using location US for the session.\n", + "/usr/local/google/home/chelsealin/src/bigframes3/bigframes/pandas/__init__.py:559: DefaultLocationWarning: No explicit location is set, so using location US for the session.\n", " return global_session.get_global_session()\n" ] }, { "data": { "text/html": [ - "Query job 3ab1bb82-0634-4a31-8ce1-0bac7c06887d is DONE. 0 Bytes processed. Open Job" + "Query job aef2dd7b-bdad-4dda-91be-867e8dac2613 is DONE. 0 Bytes processed. Open Job" ], "text/plain": [ "" @@ -941,6 +941,221 @@ "source": [ "df.semantics.search(\"creatures\", \"monkey\", top_k = 2, model = text_embedding_model, score_column='similarity score')" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Semantic Aggregation" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 9922a236-2597-48d3-9188-d859d31042e4 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
MoviesYear
0Titanic1997
1The Wolf of Wall Street2013
2Killers of the Flower Moon2023
3The Revenant2015
4Inception2010
5Shuttle Island2010
6The Great Gatsby2013
\n", + "

7 rows × 2 columns

\n", + "
[7 rows x 2 columns in total]" + ], + "text/plain": [ + " Movies Year\n", + "0 Titanic 1997\n", + "1 The Wolf of Wall Street 2013\n", + "2 Killers of the Flower Moon 2023\n", + "3 The Revenant 2015\n", + "4 Inception 2010\n", + "5 Shuttle Island 2010\n", + "6 The Great Gatsby 2013\n", + "\n", + "[7 rows x 2 columns]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = bpd.DataFrame({\n", + " \"Movies\": [\n", + " \"Titanic\",\n", + " \"The Wolf of Wall Street\",\n", + " \"Killers of the Flower Moon\",\n", + " \"The Revenant\",\n", + " \"Inception\",\n", + " \"Shuttle Island\",\n", + " \"The Great Gatsby\",\n", + " ],\n", + " \"Year\": [1997, 2013, 2023, 2015, 2010, 2010, 2013],\n", + "})\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 2b50b8d0-8cf6-4930-8219-2a25ca2b0285 is DONE. 0 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/usr/local/google/home/chelsealin/src/bigframes3/bigframes/core/__init__.py:112: PreviewWarning: Interpreting JSON column(s) as StringDtype. This behavior may change in future versions.\n", + " warnings.warn(\n" + ] + }, + { + "data": { + "text/html": [ + "Query job e57370cb-1cb7-4ceb-b084-ac45dd0149dd is DONE. 2 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job 7b6ab8a2-37f8-4206-a5c4-da84d45d94b2 is DONE. 16 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job e1b92cd9-cdda-4d1e-8fb4-5b83183eb26b is DONE. 28 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Query job 643a742a-5cbf-4c3d-bd94-dd2211cb685c is DONE. 28 Bytes processed. Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "0 Leonardo \n", + "\n", + "Name: Movies, dtype: string" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "agg_df = df.semantics.agg(\"Find the shared first name of actors in {Movies}. 
One word answer.\", model=gemini_model)\n", + "agg_df" + ] } ], "metadata": { diff --git a/tests/system/large/operations/test_semantics.py b/tests/system/large/operations/test_semantics.py index f0e13640e0..f66f2bc029 100644 --- a/tests/system/large/operations/test_semantics.py +++ b/tests/system/large/operations/test_semantics.py @@ -18,6 +18,7 @@ import bigframes import bigframes.dataframe as dataframe +import bigframes.dtypes as dtypes def test_semantics_experiment_off_raise_error(): @@ -30,6 +31,105 @@ def test_semantics_experiment_off_raise_error(): df.semantics +@pytest.mark.parametrize( + ("max_agg_rows", "cluster_column"), + [ + pytest.param(1, None, id="one", marks=pytest.mark.xfail(raises=ValueError)), + pytest.param(2, None, id="two"), + pytest.param(3, None, id="three"), + pytest.param(4, None, id="four"), + pytest.param(5, "Year", id="two_w_cluster_column"), + pytest.param(6, "Year", id="three_w_cluster_column"), + pytest.param(7, "Year", id="four_w_cluster_column"), + ], +) +def test_agg_w_max_agg_rows(session, gemini_flash_model, max_agg_rows, cluster_column): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + data={ + "Movies": [ + "Titanic", + "The Wolf of Wall Street", + "Killers of the Flower Moon", + "The Revenant", + "Inception", + "Shuttle Island", + "The Great Gatsby", + ], + "Year": [1997, 2013, 2023, 2015, 2010, 2010, 2013], + }, + session=session, + ) + instruction = "Find the shared first name of actors in {Movies}. One word answer." + actual_s = df.semantics.agg( + instruction, + model=gemini_flash_model, + max_agg_rows=max_agg_rows, + cluster_column=cluster_column, + ).to_pandas() + + expected_s = pd.Series(["Leonardo \n"], dtype=dtypes.STRING_DTYPE) + expected_s.name = "Movies" + pandas.testing.assert_series_equal(actual_s, expected_s, check_index_type=False) + + +@pytest.mark.parametrize( + "instruction", + [ + pytest.param( + "No column reference", + id="zero_column", + marks=pytest.mark.xfail(raises=ValueError), + ), + pytest.param( + "{city} is in the {non_existing_column}", + id="non_existing_column", + marks=pytest.mark.xfail(raises=ValueError), + ), + pytest.param( + "{city} is in the {country}", + id="two_columns", + marks=pytest.mark.xfail(raises=NotImplementedError), + ), + ], +) +def test_agg_invalid_instruction_raise_error(instruction, gemini_flash_model): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + {"country": ["USA", "Germany"], "city": ["Seattle", "Berlin"]} + ) + df.semantics.agg(instruction, gemini_flash_model) + + +@pytest.mark.parametrize( + "cluster_column", + [ + pytest.param( + "non_existing_column", + id="non_existing_column", + marks=pytest.mark.xfail(raises=ValueError), + ), + pytest.param( + "Movies", id="non_int_column", marks=pytest.mark.xfail(raises=TypeError) + ), + ], +) +def test_agg_invalid_cluster_column_raise_error(gemini_flash_model, cluster_column): + bigframes.options.experiments.semantic_operators = True + df = dataframe.DataFrame( + data={ + "Movies": [ + "Titanic", + "The Wolf of Wall Street", + "Killers of the Flower Moon", + "The Revenant", + ], + }, + ) + instruction = "Find the shared first name of actors in {Movies}. One word answer." 
+ df.semantics.agg(instruction, gemini_flash_model, cluster_column=cluster_column) + + def test_filter(session, gemini_flash_model): bigframes.options.experiments.semantic_operators = True df = dataframe.DataFrame( From 4379438fc4f44ea847fd2c00a82af544265a30d2 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Tue, 8 Oct 2024 12:05:21 -0700 Subject: [PATCH 8/8] perf: Speedup internal tree comparisons (#1060) --- bigframes/core/nodes.py | 117 +++++++++++++++------------------------- 1 file changed, 42 insertions(+), 75 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index d750ee63fb..c152aabf29 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -51,8 +51,8 @@ class Field: dtype: bigframes.dtypes.Dtype -@dataclass(frozen=True) -class BigFrameNode: +@dataclass(eq=False, frozen=True) +class BigFrameNode(abc.ABC): """ Immutable node for representing 2D typed array as a tree of operators. @@ -95,12 +95,30 @@ def session(self): return sessions[0] return None + def _as_tuple(self) -> Tuple: + """Get all fields as tuple.""" + return tuple(getattr(self, field.name) for field in fields(self)) + + def __hash__(self) -> int: + # Custom hash that uses cache to avoid costly recomputation + return self._cached_hash + + def __eq__(self, other) -> bool: + # Custom eq that tries to short-circuit full structural comparison + if not isinstance(other, self.__class__): + return False + if self is other: + return True + if hash(self) != hash(other): + return False + return self._as_tuple() == other._as_tuple() + # BigFrameNode trees can be very deep so its important avoid recalculating the hash from scratch # Each subclass of BigFrameNode should use this property to implement __hash__ # The default dataclass-generated __hash__ method is not cached @functools.cached_property - def _node_hash(self): - return hash(tuple(hash(getattr(self, field.name)) for field in fields(self))) + def _cached_hash(self): + return hash(self._as_tuple()) @property def roots(self) -> typing.Set[BigFrameNode]: @@ -226,7 +244,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return self.transform_children(lambda x: x.prune(used_cols)) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class UnaryNode(BigFrameNode): child: BigFrameNode @@ -252,7 +270,7 @@ def order_ambiguous(self) -> bool: return self.child.order_ambiguous -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class JoinNode(BigFrameNode): left_child: BigFrameNode right_child: BigFrameNode @@ -285,9 +303,6 @@ def explicitly_ordered(self) -> bool: # Do not consider user pre-join ordering intent - they need to re-order post-join in unordered mode. return False - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: return tuple(itertools.chain(self.left_child.fields, self.right_child.fields)) @@ -320,7 +335,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return self.transform_children(lambda x: x.prune(new_used)) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ConcatNode(BigFrameNode): # TODO: Explcitly map column ids from each child children: Tuple[BigFrameNode, ...] 
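Patch 8's core trick is fully visible above: hash each node at most once via `functools.cached_property`, and let `__eq__` bail out on identity, type, or hash mismatch before paying for a full structural comparison of a deep tree. Below is a condensed, standalone sketch under the assumption of immutable, hashable fields; the `Node` class is illustrative.

    import functools
    from dataclasses import dataclass, fields

    @dataclass(eq=False, frozen=True)
    class Node:
        name: str
        children: tuple = ()

        def _as_tuple(self):
            return tuple(getattr(self, f.name) for f in fields(self))

        @functools.cached_property
        def _cached_hash(self):
            # Computed at most once per node; deep trees would otherwise
            # rehash their whole subtree on every dict or set lookup.
            return hash(self._as_tuple())

        def __hash__(self):
            return self._cached_hash

        def __eq__(self, other):
            # Cheap short circuits first; the cached hashes almost always
            # settle inequality without a recursive field comparison.
            if self is other:
                return True
            if not isinstance(other, self.__class__):
                return False
            if hash(self) != hash(other):
                return False
            return self._as_tuple() == other._as_tuple()

    a = Node("x", (Node("y"),))
    b = Node("x", (Node("y"),))
    assert a == b and hash(a) == hash(b)

Note that `functools.cached_property` stores its result in the instance `__dict__` directly, so it works on a frozen dataclass even though normal attribute assignment is blocked.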
@@ -345,9 +360,6 @@ def explicitly_ordered(self) -> bool: # Consider concat as an ordered operations (even though input frames may not be ordered) return True - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: # TODO: Output names should probably be aligned beforehand or be part of concat definition @@ -371,16 +383,13 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return self -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class FromRangeNode(BigFrameNode): # TODO: Enforce single-row, single column constraint start: BigFrameNode end: BigFrameNode step: int - def __hash__(self): - return self._node_hash - @property def roots(self) -> typing.Set[BigFrameNode]: return {self} @@ -419,7 +428,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # Input Nodex # TODO: Most leaf nodes produce fixed column names based on the datasource # They should support renaming -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class LeafNode(BigFrameNode): @property def roots(self) -> typing.Set[BigFrameNode]: @@ -451,7 +460,7 @@ class ScanList: items: typing.Tuple[ScanItem, ...] -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ReadLocalNode(LeafNode): feather_bytes: bytes data_schema: schemata.ArraySchema @@ -460,9 +469,6 @@ class ReadLocalNode(LeafNode): scan_list: ScanList session: typing.Optional[bigframes.session.Session] = None - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: return tuple(Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) @@ -547,7 +553,7 @@ class BigqueryDataSource: ## Put ordering in here or just add order_by node above? -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ReadTableNode(LeafNode): source: BigqueryDataSource # Subset of physical schema column @@ -570,9 +576,6 @@ def __post_init__(self): def session(self): return self.table_session - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: return tuple(Field(col_id, dtype) for col_id, dtype, _ in self.scan_list.items) @@ -616,15 +619,12 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return ReadTableNode(self.source, new_scan_list, self.table_session) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class CachedTableNode(ReadTableNode): # The original BFET subtree that was cached # note: this isn't a "child" node. 
original_node: BigFrameNode = field() - def __hash__(self): - return self._node_hash - def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: new_scan_list = ScanList( tuple(item for item in self.scan_list.items if item.id in used_cols) @@ -635,13 +635,10 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # Unary nodes -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class PromoteOffsetsNode(UnaryNode): col_id: bigframes.core.identifiers.ColumnId - def __hash__(self): - return self._node_hash - @property def non_local(self) -> bool: return True @@ -666,7 +663,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return self.transform_children(lambda x: x.prune(new_used)) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class FilterNode(UnaryNode): predicate: ex.Expression @@ -674,9 +671,6 @@ class FilterNode(UnaryNode): def row_preserving(self) -> bool: return False - def __hash__(self): - return self._node_hash - @property def variables_introduced(self) -> int: return 1 @@ -687,13 +681,10 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return FilterNode(pruned_child, self.predicate) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class OrderByNode(UnaryNode): by: Tuple[OrderingExpression, ...] - def __hash__(self): - return self._node_hash - @property def variables_introduced(self) -> int: return 0 @@ -716,14 +707,11 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return OrderByNode(pruned_child, self.by) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ReversedNode(UnaryNode): # useless field to make sure has distinct hash reversed: bool = True - def __hash__(self): - return self._node_hash - @property def variables_introduced(self) -> int: return 0 @@ -734,15 +722,12 @@ def relation_ops_created(self) -> int: return 0 -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class SelectionNode(UnaryNode): input_output_pairs: typing.Tuple[ typing.Tuple[ex.DerefOp, bigframes.core.identifiers.ColumnId], ... ] - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: return tuple( @@ -772,7 +757,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return SelectionNode(pruned_child, pruned_selections) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ProjectionNode(UnaryNode): """Assigns new variables (without modifying existing ones)""" @@ -788,9 +773,6 @@ def __post_init__(self): # Cannot assign to existing variables - append only! assert all(name not in self.child.schema.names for _, name in self.assignments) - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: input_types = self.child._dtype_lookup @@ -819,7 +801,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # TODO: Merge RowCount into Aggregate Node? # Row count can be compute from table metadata sometimes, so it is a bit special. -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class RowCountNode(UnaryNode): @property def row_preserving(self) -> bool: @@ -842,7 +824,7 @@ def defines_namespace(self) -> bool: return True -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class AggregateNode(UnaryNode): aggregations: typing.Tuple[ typing.Tuple[ex.Aggregation, bigframes.core.identifiers.ColumnId], ... 
@@ -854,9 +836,6 @@ class AggregateNode(UnaryNode): def row_preserving(self) -> bool: return False - def __hash__(self): - return self._node_hash - @property def non_local(self) -> bool: return True @@ -904,7 +883,7 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: return AggregateNode(pruned_child, pruned_aggs, self.by_column_ids, self.dropna) -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class WindowOpNode(UnaryNode): column_name: ex.DerefOp op: agg_ops.UnaryWindowOp @@ -913,9 +892,6 @@ class WindowOpNode(UnaryNode): never_skip_nulls: bool = False skip_reproject_unsafe: bool = False - def __hash__(self): - return self._node_hash - @property def non_local(self) -> bool: return True @@ -945,11 +921,8 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # TODO: Remove this op -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ReprojectOpNode(UnaryNode): - def __hash__(self): - return self._node_hash - @property def variables_introduced(self) -> int: return 0 @@ -960,7 +933,7 @@ def relation_ops_created(self) -> int: return 0 -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class RandomSampleNode(UnaryNode): fraction: float @@ -972,16 +945,13 @@ def deterministic(self) -> bool: def row_preserving(self) -> bool: return False - def __hash__(self): - return self._node_hash - @property def variables_introduced(self) -> int: return 1 # TODO: Explode should create a new column instead of overriding the existing one -@dataclass(frozen=True) +@dataclass(frozen=True, eq=False) class ExplodeNode(UnaryNode): column_ids: typing.Tuple[ex.DerefOp, ...] @@ -989,9 +959,6 @@ class ExplodeNode(UnaryNode): def row_preserving(self) -> bool: return False - def __hash__(self): - return self._node_hash - @functools.cached_property def fields(self) -> Tuple[Field, ...]: return tuple(