From 9f3e4290a4591f1bfeefd44c59025526b16befa6 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 25 Aug 2020 21:21:40 -0500 Subject: [PATCH] CI: Mark s3 tests parallel safe (#35895) Closes https://github.com/pandas-dev/pandas/issues/35856 --- pandas/tests/io/conftest.py | 24 +++++++++-------- pandas/tests/io/json/test_compression.py | 6 ++--- pandas/tests/io/json/test_pandas.py | 7 ++--- pandas/tests/io/test_parquet.py | 34 +++++++++++++++++------- 4 files changed, 41 insertions(+), 30 deletions(-) diff --git a/pandas/tests/io/conftest.py b/pandas/tests/io/conftest.py index 518f31d73efa9..193baa8c3ed74 100644 --- a/pandas/tests/io/conftest.py +++ b/pandas/tests/io/conftest.py @@ -34,12 +34,13 @@ def feather_file(datapath): @pytest.fixture -def s3so(): - return dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) +def s3so(worker_id): + worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw") + return dict(client_kwargs={"endpoint_url": f"http://127.0.0.1:555{worker_id}/"}) -@pytest.fixture(scope="module") -def s3_base(): +@pytest.fixture(scope="session") +def s3_base(worker_id): """ Fixture for mocking S3 interaction. @@ -61,11 +62,13 @@ def s3_base(): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_uri = "http://127.0.0.1:5555/" + worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw") + endpoint_port = f"555{worker_id}" + endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" # pipe to null to avoid logging in terminal proc = subprocess.Popen( - shlex.split("moto_server s3 -p 5555"), stdout=subprocess.DEVNULL + shlex.split(f"moto_server s3 -p {endpoint_port}"), stdout=subprocess.DEVNULL ) timeout = 5 @@ -79,7 +82,7 @@ def s3_base(): pass timeout -= 0.1 time.sleep(0.1) - yield + yield endpoint_uri proc.terminate() proc.wait() @@ -119,9 +122,8 @@ def add_tips_files(bucket_name): cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f) bucket = "pandas-test" - endpoint_uri = "http://127.0.0.1:5555/" - conn = boto3.resource("s3", endpoint_url=endpoint_uri) - cli = boto3.client("s3", endpoint_url=endpoint_uri) + conn = boto3.resource("s3", endpoint_url=s3_base) + cli = boto3.client("s3", endpoint_url=s3_base) try: cli.create_bucket(Bucket=bucket) @@ -143,7 +145,7 @@ def add_tips_files(bucket_name): s3fs.S3FileSystem.clear_instance_cache() yield conn - s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) + s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base}) try: s3.rm(bucket, recursive=True) diff --git a/pandas/tests/io/json/test_compression.py b/pandas/tests/io/json/test_compression.py index 5bb205842269e..c0e3220454bf1 100644 --- a/pandas/tests/io/json/test_compression.py +++ b/pandas/tests/io/json/test_compression.py @@ -34,7 +34,7 @@ def test_read_zipped_json(datapath): @td.skip_if_not_us_locale -def test_with_s3_url(compression, s3_resource): +def test_with_s3_url(compression, s3_resource, s3so): # Bucket "pandas-test" created in tests/io/conftest.py df = pd.read_json('{"a": [1, 2, 3], "b": [4, 5, 6]}') @@ -45,9 +45,7 @@ def test_with_s3_url(compression, s3_resource): s3_resource.Bucket("pandas-test").put_object(Key="test-1", Body=f) roundtripped_df = pd.read_json( - "s3://pandas-test/test-1", - compression=compression, - storage_options=dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}), + "s3://pandas-test/test-1", compression=compression, storage_options=s3so, ) tm.assert_frame_equal(df, roundtripped_df) diff --git a/pandas/tests/io/json/test_pandas.py b/pandas/tests/io/json/test_pandas.py index 64a666079876f..2022abbaee323 100644 --- a/pandas/tests/io/json/test_pandas.py +++ b/pandas/tests/io/json/test_pandas.py @@ -1702,17 +1702,14 @@ def test_json_multiindex(self, dataframe, expected): result = series.to_json(orient="index") assert result == expected - def test_to_s3(self, s3_resource): + def test_to_s3(self, s3_resource, s3so): import time # GH 28375 mock_bucket_name, target_file = "pandas-test", "test.json" df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]}) df.to_json( - f"s3://{mock_bucket_name}/{target_file}", - storage_options=dict( - client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"} - ), + f"s3://{mock_bucket_name}/{target_file}", storage_options=s3so, ) timeout = 5 while True: diff --git a/pandas/tests/io/test_parquet.py b/pandas/tests/io/test_parquet.py index 4e0c16c71a6a8..15f9837176315 100644 --- a/pandas/tests/io/test_parquet.py +++ b/pandas/tests/io/test_parquet.py @@ -158,10 +158,6 @@ def check_round_trip( """ write_kwargs = write_kwargs or {"compression": None} read_kwargs = read_kwargs or {} - if isinstance(path, str) and "s3://" in path: - s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}) - read_kwargs["storage_options"] = s3so - write_kwargs["storage_options"] = s3so if expected is None: expected = df @@ -555,15 +551,24 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so): write_kwargs=kw, ) - def test_s3_roundtrip(self, df_compat, s3_resource, pa): + def test_s3_roundtrip(self, df_compat, s3_resource, pa, s3so): if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"): pytest.skip() # GH #19134 - check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet") + s3so = dict(storage_options=s3so) + check_round_trip( + df_compat, + pa, + path="s3://pandas-test/pyarrow.parquet", + read_kwargs=s3so, + write_kwargs=s3so, + ) @td.skip_if_no("s3fs") @pytest.mark.parametrize("partition_col", [["A"], []]) - def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): + def test_s3_roundtrip_for_dir( + self, df_compat, s3_resource, pa, partition_col, s3so + ): # GH #26388 expected_df = df_compat.copy() @@ -587,7 +592,10 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col): pa, expected=expected_df, path="s3://pandas-test/parquet_dir", - write_kwargs={"partition_cols": partition_col, "compression": None}, + read_kwargs=dict(storage_options=s3so), + write_kwargs=dict( + partition_cols=partition_col, compression=None, storage_options=s3so + ), check_like=True, repeat=1, ) @@ -761,9 +769,15 @@ def test_filter_row_groups(self, fp): result = read_parquet(path, fp, filters=[("a", "==", 0)]) assert len(result) == 1 - def test_s3_roundtrip(self, df_compat, s3_resource, fp): + def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so): # GH #19134 - check_round_trip(df_compat, fp, path="s3://pandas-test/fastparquet.parquet") + check_round_trip( + df_compat, + fp, + path="s3://pandas-test/fastparquet.parquet", + read_kwargs=dict(storage_options=s3so), + write_kwargs=dict(compression=None, storage_options=s3so), + ) def test_partition_cols_supported(self, fp, df_full): # GH #23283