Skip to content

Commit

Permalink
CI: Mark s3 tests parallel safe (#35895)
Browse files — browse the repository at this point in the history
Closes #35856
  • Loading branch information
TomAugspurger authored Aug 26, 2020
1 parent c74ed38 commit 9f3e429
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 30 deletions.
24 changes: 13 additions & 11 deletions pandas/tests/io/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ def feather_file(datapath):


@pytest.fixture
def s3so():
return dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
def s3so(worker_id):
worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
return dict(client_kwargs={"endpoint_url": f"http://127.0.0.1:555{worker_id}/"})


@pytest.fixture(scope="module")
def s3_base():
@pytest.fixture(scope="session")
def s3_base(worker_id):
"""
Fixture for mocking S3 interaction.
Expand All @@ -61,11 +62,13 @@ def s3_base():
# Launching moto in server mode, i.e., as a separate process
# with an S3 endpoint on localhost

endpoint_uri = "http://127.0.0.1:5555/"
worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
endpoint_port = f"555{worker_id}"
endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

# pipe to null to avoid logging in terminal
proc = subprocess.Popen(
shlex.split("moto_server s3 -p 5555"), stdout=subprocess.DEVNULL
shlex.split(f"moto_server s3 -p {endpoint_port}"), stdout=subprocess.DEVNULL
)

timeout = 5
Expand All @@ -79,7 +82,7 @@ def s3_base():
pass
timeout -= 0.1
time.sleep(0.1)
yield
yield endpoint_uri

proc.terminate()
proc.wait()
Expand Down Expand Up @@ -119,9 +122,8 @@ def add_tips_files(bucket_name):
cli.put_object(Bucket=bucket_name, Key=s3_key, Body=f)

bucket = "pandas-test"
endpoint_uri = "http://127.0.0.1:5555/"
conn = boto3.resource("s3", endpoint_url=endpoint_uri)
cli = boto3.client("s3", endpoint_url=endpoint_uri)
conn = boto3.resource("s3", endpoint_url=s3_base)
cli = boto3.client("s3", endpoint_url=s3_base)

try:
cli.create_bucket(Bucket=bucket)
Expand All @@ -143,7 +145,7 @@ def add_tips_files(bucket_name):
s3fs.S3FileSystem.clear_instance_cache()
yield conn

s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})

try:
s3.rm(bucket, recursive=True)
Expand Down
6 changes: 2 additions & 4 deletions pandas/tests/io/json/test_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_read_zipped_json(datapath):


@td.skip_if_not_us_locale
def test_with_s3_url(compression, s3_resource):
def test_with_s3_url(compression, s3_resource, s3so):
# Bucket "pandas-test" created in tests/io/conftest.py

df = pd.read_json('{"a": [1, 2, 3], "b": [4, 5, 6]}')
Expand All @@ -45,9 +45,7 @@ def test_with_s3_url(compression, s3_resource):
s3_resource.Bucket("pandas-test").put_object(Key="test-1", Body=f)

roundtripped_df = pd.read_json(
"s3://pandas-test/test-1",
compression=compression,
storage_options=dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}),
"s3://pandas-test/test-1", compression=compression, storage_options=s3so,
)
tm.assert_frame_equal(df, roundtripped_df)

Expand Down
7 changes: 2 additions & 5 deletions pandas/tests/io/json/test_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -1702,17 +1702,14 @@ def test_json_multiindex(self, dataframe, expected):
result = series.to_json(orient="index")
assert result == expected

def test_to_s3(self, s3_resource):
def test_to_s3(self, s3_resource, s3so):
import time

# GH 28375
mock_bucket_name, target_file = "pandas-test", "test.json"
df = DataFrame({"x": [1, 2, 3], "y": [2, 4, 6]})
df.to_json(
f"s3://{mock_bucket_name}/{target_file}",
storage_options=dict(
client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"}
),
f"s3://{mock_bucket_name}/{target_file}", storage_options=s3so,
)
timeout = 5
while True:
Expand Down
34 changes: 24 additions & 10 deletions pandas/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ def check_round_trip(
"""
write_kwargs = write_kwargs or {"compression": None}
read_kwargs = read_kwargs or {}
if isinstance(path, str) and "s3://" in path:
s3so = dict(client_kwargs={"endpoint_url": "http://127.0.0.1:5555/"})
read_kwargs["storage_options"] = s3so
write_kwargs["storage_options"] = s3so

if expected is None:
expected = df
Expand Down Expand Up @@ -555,15 +551,24 @@ def test_s3_roundtrip_explicit_fs(self, df_compat, s3_resource, pa, s3so):
write_kwargs=kw,
)

def test_s3_roundtrip(self, df_compat, s3_resource, pa):
def test_s3_roundtrip(self, df_compat, s3_resource, pa, s3so):
if LooseVersion(pyarrow.__version__) <= LooseVersion("0.17.0"):
pytest.skip()
# GH #19134
check_round_trip(df_compat, pa, path="s3://pandas-test/pyarrow.parquet")
s3so = dict(storage_options=s3so)
check_round_trip(
df_compat,
pa,
path="s3://pandas-test/pyarrow.parquet",
read_kwargs=s3so,
write_kwargs=s3so,
)

@td.skip_if_no("s3fs")
@pytest.mark.parametrize("partition_col", [["A"], []])
def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
def test_s3_roundtrip_for_dir(
self, df_compat, s3_resource, pa, partition_col, s3so
):
# GH #26388
expected_df = df_compat.copy()

Expand All @@ -587,7 +592,10 @@ def test_s3_roundtrip_for_dir(self, df_compat, s3_resource, pa, partition_col):
pa,
expected=expected_df,
path="s3://pandas-test/parquet_dir",
write_kwargs={"partition_cols": partition_col, "compression": None},
read_kwargs=dict(storage_options=s3so),
write_kwargs=dict(
partition_cols=partition_col, compression=None, storage_options=s3so
),
check_like=True,
repeat=1,
)
Expand Down Expand Up @@ -761,9 +769,15 @@ def test_filter_row_groups(self, fp):
result = read_parquet(path, fp, filters=[("a", "==", 0)])
assert len(result) == 1

def test_s3_roundtrip(self, df_compat, s3_resource, fp):
def test_s3_roundtrip(self, df_compat, s3_resource, fp, s3so):
# GH #19134
check_round_trip(df_compat, fp, path="s3://pandas-test/fastparquet.parquet")
check_round_trip(
df_compat,
fp,
path="s3://pandas-test/fastparquet.parquet",
read_kwargs=dict(storage_options=s3so),
write_kwargs=dict(compression=None, storage_options=s3so),
)

def test_partition_cols_supported(self, fp, df_full):
# GH #23283
Expand Down

0 comments on commit 9f3e429

Please sign in to comment.