Skip to content

Commit

Permalink
Add disallow deletion AdminFlag (#6518)
Browse files Browse the repository at this point in the history
* Add AdminFlagValue

* Add disallow deletion AdminFlag

See #3218

* Add disallow new upload AdminFlag

* Convert `AdminFlagValue` to enum
  • Loading branch information
calvin authored and ewdurbin committed Sep 12, 2019
1 parent 7212190 commit 30f2389
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 14 deletions.
6 changes: 4 additions & 2 deletions tests/unit/accounts/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
TokenMissing,
TooManyFailedLogins,
)
from warehouse.admin.flags import AdminFlag
from warehouse.admin.flags import AdminFlag, AdminFlagValue

from ...common.db.accounts import EmailFactory, UserFactory

Expand Down Expand Up @@ -903,7 +903,9 @@ def test_register_redirect(self, db_request, monkeypatch):

def test_register_fails_with_admin_flag_set(self, db_request):
# This flag was already set via migration, just need to enable it
flag = db_request.db.query(AdminFlag).get("disallow-new-user-registration")
flag = db_request.db.query(AdminFlag).get(
AdminFlagValue.DISALLOW_NEW_USER_REGISTRATION.value
)
flag.enabled = True

db_request.method = "POST"
Expand Down
11 changes: 9 additions & 2 deletions tests/unit/admin/test_flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import enum

from ...common.db.admin import AdminFlagFactory


class TestAdminFlagValues(enum.Enum):
NOT_A_REAL_FLAG = "not-a-real-flag"
THIS_FLAG_IS_ENABLED = "this-flag-is-enabled"


class TestAdminFlag:
def test_default(self, db_request):
assert not db_request.flags.enabled("not-a-real-flag")
assert not db_request.flags.enabled(TestAdminFlagValues.NOT_A_REAL_FLAG)

def test_enabled(self, db_request):
AdminFlagFactory(id="this-flag-is-enabled")

assert db_request.flags.enabled("this-flag-is-enabled")
assert db_request.flags.enabled(TestAdminFlagValues.THIS_FLAG_IS_ENABLED)
25 changes: 23 additions & 2 deletions tests/unit/forklift/test_legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from wtforms.form import Form
from wtforms.validators import ValidationError

from warehouse.admin.flags import AdminFlag
from warehouse.admin.flags import AdminFlag, AdminFlagValue
from warehouse.admin.squats import Squat
from warehouse.classifiers.models import Classifier
from warehouse.forklift import legacy
Expand Down Expand Up @@ -753,6 +753,25 @@ def test_is_duplicate_false(self, pyramid_config, db_request):


class TestFileUpload:
def test_fails_disallow_new_upload(self, pyramid_config, pyramid_request):
pyramid_config.testing_securitypolicy(userid=1)
pyramid_request.flags = pretend.stub(
enabled=lambda value: value == AdminFlagValue.DISALLOW_NEW_UPLOAD
)
pyramid_request.help_url = pretend.call_recorder(lambda **kw: "/the/help/url/")
pyramid_request.user = pretend.stub(primary_email=pretend.stub(verified=True))

with pytest.raises(HTTPForbidden) as excinfo:
legacy.file_upload(pyramid_request)

resp = excinfo.value

assert resp.status_code == 403
assert resp.status == (
"403 New uploads are temporarily disabled. "
"See /the/help/url/ for details"
)

@pytest.mark.parametrize("version", ["2", "3", "-1", "0", "dog", "cat"])
def test_fails_invalid_version(self, pyramid_config, pyramid_request, version):
pyramid_config.testing_securitypolicy(userid=1)
Expand Down Expand Up @@ -1118,7 +1137,9 @@ def test_fails_with_stdlib_names(self, pyramid_config, db_request, name):
def test_fails_with_admin_flag_set(self, pyramid_config, db_request):
admin_flag = (
db_request.db.query(AdminFlag)
.filter(AdminFlag.id == "disallow-new-project-registration")
.filter(
AdminFlag.id == AdminFlagValue.DISALLOW_NEW_PROJECT_REGISTRATION.value
)
.first()
)
admin_flag.enabled = True
Expand Down
135 changes: 135 additions & 0 deletions tests/unit/manage/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import warehouse.utils.otp as otp

from warehouse.accounts.interfaces import IPasswordBreachedService, IUserService
from warehouse.admin.flags import AdminFlagValue
from warehouse.macaroons.interfaces import IMacaroonService
from warehouse.manage import views
from warehouse.packaging.models import (
Expand Down Expand Up @@ -2014,6 +2015,7 @@ def test_delete_project_no_confirm(self):
project = pretend.stub(normalized_name="foo")
request = pretend.stub(
POST={},
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
route_path=lambda *a, **kw: "/foo/bar/",
)
Expand All @@ -2023,6 +2025,9 @@ def test_delete_project_no_confirm(self):
assert exc.value.status_code == 303
assert exc.value.headers["Location"] == "/foo/bar/"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
Expand All @@ -2031,6 +2036,7 @@ def test_delete_project_wrong_confirm(self):
project = pretend.stub(normalized_name="foo")
request = pretend.stub(
POST={"confirm_project_name": "bar"},
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
route_path=lambda *a, **kw: "/foo/bar/",
)
Expand All @@ -2040,13 +2046,46 @@ def test_delete_project_wrong_confirm(self):
assert exc.value.status_code == 303
assert exc.value.headers["Location"] == "/foo/bar/"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call(
"Could not delete project - 'bar' is not the same as 'foo'",
queue="error",
)
]

def test_delete_project_disallow_deletion(self):
project = pretend.stub(name="foo", normalized_name="foo")
request = pretend.stub(
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)

result = views.delete_project(project, request)
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]

assert request.route_path.calls == [
pretend.call("manage.project.settings", project_name="foo")
]

def test_delete_project(self, db_request):
project = ProjectFactory.create(name="foo")

Expand Down Expand Up @@ -2159,6 +2198,7 @@ def test_manage_project_releases(self, db_request):
filename=f"foobar-{release.version}.tar.gz",
packagetype="sdist",
)
db_request.flags = pretend.stub(enabled=pretend.call_recorder(lambda *a: False))

assert views.manage_project_releases(project, db_request) == {
"project": project,
Expand All @@ -2182,6 +2222,48 @@ def test_manage_project_release(self):
"files": files,
}

def test_delete_project_release_disallow_deletion(self, monkeypatch):
release = pretend.stub(
version="1.2.3",
canonical_version="1.2.3",
project=pretend.stub(
name="foobar", record_event=pretend.call_recorder(lambda *a, **kw: None)
),
)
request = pretend.stub(
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
method="POST",
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
view = views.ManageProjectRelease(release, request)

result = view.delete_project_release()
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]

assert request.route_path.calls == [
pretend.call(
"manage.project.release",
project_name=release.project.name,
version=release.version,
)
]

def test_delete_project_release(self, monkeypatch):
release = pretend.stub(
version="1.2.3",
Expand All @@ -2197,6 +2279,7 @@ def test_delete_project_release(self, monkeypatch):
delete=pretend.call_recorder(lambda a: None),
add=pretend.call_recorder(lambda a: None),
),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
user=pretend.stub(username=pretend.stub()),
Expand All @@ -2215,6 +2298,9 @@ def test_delete_project_release(self, monkeypatch):

assert request.db.delete.calls == [pretend.call(release)]
assert request.db.add.calls == [pretend.call(journal_obj)]
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert journal_cls.calls == [
pretend.call(
name=release.project.name,
Expand Down Expand Up @@ -2247,6 +2333,7 @@ def test_delete_project_release_no_confirm(self):
POST={"confirm_version": ""},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2261,6 +2348,9 @@ def test_delete_project_release_no_confirm(self):
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.route_path.calls == [
pretend.call(
"manage.project.release",
Expand All @@ -2275,6 +2365,7 @@ def test_delete_project_release_bad_confirm(self):
POST={"confirm_version": "invalid"},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2301,6 +2392,42 @@ def test_delete_project_release_bad_confirm(self):
)
]

def test_delete_project_release_file_disallow_deletion(self):
release = pretend.stub(version="1.2.3", project=pretend.stub(name="foobar"))
request = pretend.stub(
method="POST",
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: True)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
view = views.ManageProjectRelease(release, request)

result = view.delete_project_release_file()

assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]

assert request.session.flash.calls == [
pretend.call(
(
"Project deletion temporarily disabled. "
"See https://pypi.org/help#admin-intervention for details."
),
queue="error",
)
]
assert request.route_path.calls == [
pretend.call(
"manage.project.release",
project_name=release.project.name,
version=release.version,
)
]

def test_delete_project_release_file(self, db_request):
user = UserFactory.create()

Expand Down Expand Up @@ -2359,6 +2486,7 @@ def test_delete_project_release_file_no_confirm(self):
POST={"confirm_project_name": ""},
method="POST",
db=pretend.stub(delete=pretend.call_recorder(lambda a: None)),
flags=pretend.stub(enabled=pretend.call_recorder(lambda *a: False)),
route_path=pretend.call_recorder(lambda *a, **kw: "/the-redirect"),
session=pretend.stub(flash=pretend.call_recorder(lambda *a, **kw: None)),
)
Expand All @@ -2370,6 +2498,9 @@ def test_delete_project_release_file_no_confirm(self):
assert result.headers["Location"] == "/the-redirect"

assert request.db.delete.calls == []
assert request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert request.session.flash.calls == [
pretend.call("Confirm the request", queue="error")
]
Expand All @@ -2396,6 +2527,7 @@ def no_result_found():
filter=lambda *a: pretend.stub(one=no_result_found)
),
)
db_request.flags = pretend.stub(enabled=pretend.call_recorder(lambda *a: False))
db_request.route_path = pretend.call_recorder(lambda *a, **kw: "/the-redirect")
db_request.session = pretend.stub(
flash=pretend.call_recorder(lambda *a, **kw: None)
Expand All @@ -2409,6 +2541,9 @@ def no_result_found():
assert result.headers["Location"] == "/the-redirect"

assert db_request.db.delete.calls == []
assert db_request.flags.enabled.calls == [
pretend.call(AdminFlagValue.DISALLOW_DELETION)
]
assert db_request.session.flash.calls == [
pretend.call("Could not find file", queue="error")
]
Expand Down
3 changes: 2 additions & 1 deletion tests/unit/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from sqlalchemy.exc import OperationalError

from warehouse import db
from warehouse.admin.flags import AdminFlagValue
from warehouse.db import (
DEFAULT_ISOLATION,
DatabaseNotAvailable,
Expand Down Expand Up @@ -273,7 +274,7 @@ def test_create_session_read_only_mode(
)

assert _create_session(request) is session_obj
assert get.calls == [pretend.call("read-only")]
assert get.calls == [pretend.call(AdminFlagValue.READ_ONLY.value)]
assert request.tm.doom.calls == doom_calls


Expand Down
3 changes: 2 additions & 1 deletion warehouse/accounts/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
TooManyFailedLogins,
)
from warehouse.accounts.models import Email, User
from warehouse.admin.flags import AdminFlagValue
from warehouse.cache.origin import origin_cache
from warehouse.email import send_email_verification_email, send_password_reset_email
from warehouse.packaging.models import Project, Release
Expand Down Expand Up @@ -377,7 +378,7 @@ def register(request, _form_class=RegistrationForm):
if request.method == "POST" and request.POST.get("confirm_form"):
return HTTPSeeOther(request.route_path("index"))

if request.flags.enabled("disallow-new-user-registration"):
if request.flags.enabled(AdminFlagValue.DISALLOW_NEW_USER_REGISTRATION):
request.session.flash(
(
"New user registration temporarily disabled. "
Expand Down
Loading

0 comments on commit 30f2389

Please sign in to comment.