Skip to content

Commit

Permalink
Implement changes for python/cpython#103975
Browse files Browse the repository at this point in the history
  • Loading branch information
Gobot1234 committed May 5, 2023
1 parent 2292ca9 commit 4733287
Showing 1 changed file with 175 additions and 94 deletions.
269 changes: 175 additions & 94 deletions steam/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from datetime import datetime
from io import BytesIO
from operator import attrgetter, methodcaller
from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast
from typing import TYPE_CHECKING, Any, Final, Literal, TypeGuard, cast, overload
from zipfile import BadZipFile, ZipFile
from zlib import crc32

Expand Down Expand Up @@ -45,6 +45,8 @@


if TYPE_CHECKING:
from _typeshed import StrPath

from .state import ConnectionState
from .types import manifest
from .types.vdf import VDFInt
Expand Down Expand Up @@ -177,6 +179,10 @@ def read_nowait(self, n: int = -1, /) -> bytes:
return content


def _manifest_parts(filename: str) -> list[str]:
return filename.rstrip("\x00 \n\t").split("\\")


class ManifestPath(PurePathBase, _IOMixin):
"""A :class:`pathlib.PurePath` subclass representing a binary file in a Manifest. This class is broadly compatible
with :class:`pathlib.Path`.
Expand All @@ -201,12 +207,68 @@ class ManifestPath(PurePathBase, _IOMixin):
_manifest: Manifest
_mapping: PayloadFileMapping

def __new__(cls, manifest: Manifest, mapping: PayloadFileMapping) -> Self:
# super().__new__ breaks
self: Self = super()._from_parts(mapping.filename.rstrip("\x00 \n\t").split("\\")) # type: ignore
self._mapping = mapping
self._manifest = manifest
return self
if sys.version_info < (3, 12):

def __new__(cls, *args: StrPath, manifest: Manifest, mapping: PayloadFileMapping | None = None) -> Self:
# super().__new__ breaks
self: Self = super()._from_parts(_manifest_parts(mapping.filename) if mapping is not None else args) # type: ignore
self._manifest = manifest
if mapping is not None:
self._mapping = mapping
return self

def with_segments(self, *args: StrPath) -> Self:
return self._select_from_manifest(self._from_parts(self.parts + tuple(map(os.fspath, args))))

def _select_from_manifest(self, new_self: Self) -> Self:
try:
# try and return the actual path if exists
return self._manifest._paths[new_self.parts]
except KeyError:
# else attach the manifest and return, this will not support most operations
new_self._manifest = self._manifest
return new_self

def _from_parts(self, args: tuple[str, ...]) -> Self:
new_self = super()._from_parts(args) # type: ignore
return self._select_from_manifest(new_self)

def _from_parsed_parts(self, drv: str, root: str, parts: tuple[str, ...]) -> Self:
new_self = super()._from_parsed_parts(drv, root, parts) # type: ignore
return self._select_from_manifest(new_self)

@property
def parents(self) -> tuple[Self, ...]:
"""A tuple of this path's logical parents."""
path = self
parent = self.parent
parents: list[Self] = []
while path != parent:
parents.append(parent)
path, parent = parent, parent.parent
return tuple(parents)

else:

def __init__(
self,
*args: StrPath,
manifest: Manifest,
mapping: PayloadFileMapping | None = None,
):
super().__init__(*_manifest_parts(mapping.filename) if mapping is not None else args)
self._manifest = manifest
if mapping is not None:
self._mapping = mapping

def with_segments(self, *args: StrPath) -> Self:
new_self = self.__class__(*args, manifest=self._manifest)
try:
# try and return the actual path if exists
return self._manifest._paths[new_self.parts]
except KeyError:
# else attach the manifest and return, this will not support most operations
return new_self

def __repr__(self) -> str:
return f"<{self.__class__.__name__} {str(self)!r}>"
Expand All @@ -215,36 +277,8 @@ def __repr__(self) -> str:

def __getattr__(self, name: str) -> Never:
if name in self.__annotations__: # give a more helpful error
raise AttributeError("Attempting operations on a non-existent file")
raise AttributeError(f"{self.__class__.__name__!r} object has no attribute {name!r}")

def _select_from_manifest(self, new_self: Self) -> Self:
try:
# try and return the actual path if exists
return self._manifest._paths[new_self.parts]
except KeyError:
# else attach the manifest and return, this will not support most operations
new_self._manifest = self._manifest
return new_self

def _from_parts(self, args: tuple[str, ...]) -> Self:
new_self = super()._from_parts(args) # type: ignore
return self._select_from_manifest(new_self)

def _from_parsed_parts(self, drv: str, root: str, parts: tuple[str, ...]) -> Self:
new_self = super()._from_parsed_parts(drv, root, parts) # type: ignore
return self._select_from_manifest(new_self)

@property
def parents(self) -> tuple[Self, ...]:
"""A tuple of this path's logical parents."""
path = self
parent = self.parent
parents: list[Self] = []
while path != parent:
parents.append(parent)
path, parent = parent, parent.parent
return tuple(parents)
raise ValueError("Attempting operations on a non-existent file")
raise AttributeError(f"{self.__class__.__name__!r} object has no attribute {name!r}", name=name, obj=self)

@property
def size(self) -> int:
Expand Down Expand Up @@ -302,9 +336,66 @@ def readlink(self) -> Self:
if not self.is_symlink():
raise OSError(errno.EINVAL, os.strerror(errno.EINVAL), str(self))

link_parts = tuple(self._mapping.linktarget.rstrip("\x00 \n\t").split("\\"))
link_parts = tuple(_manifest_parts(self._mapping.linktarget))
return self._manifest._paths[link_parts]

@overload
def resolve(self, *, strict: bool = False) -> Self: # type: ignore
...

def resolve(self, *, strict: bool = False, _follow_symlinks: bool = True) -> Self:
"""Return the canonical path of the symbolic link, eliminating any symbolic links encountered in the path.
Similar to :meth:`pathlib.Path.resolve`
Parameters
----------
strict
Whether to raise an error if a path doesn't exist.
Raises
------
FileNotFoundError
If ``strict`` is ``True`` and the path doesn't exist.
RuntimeError
If a recursive path is detected.
"""
new_parts: list[str] = []
seen = set[tuple[str, ...]]()
idx = 0
raw_parts = list(self.parts)
if not raw_parts:
raise RuntimeError("Cannot resolve empty path")

for part in raw_parts:
match part:
case "." | "":
idx += 1
continue
case "..":
raw_parts.insert(idx + 1, raw_parts[idx - 1])
idx += 1
continue
new_parts.append(part)

path = self.with_segments(*new_parts)

if not hasattr(path, "_mapping"):
if strict:
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), str(self))
elif path.is_symlink() and _follow_symlinks:
new_parts = _manifest_parts(path._mapping.linktarget)

if (tuple_parts := tuple(new_parts)) in seen:
raise RuntimeError("Recursive path detected. Cannot resolve")
seen.add(tuple_parts)
idx += 1

return path # type: ignore # cannot be unbound

def exists(self, *, follow_symlinks: bool = True) -> bool:
"""Return whether this path exists. Similar to :meth:`pathlib.Path.exists`."""
return hasattr(self.resolve(strict=False, _follow_symlinks=follow_symlinks), "_mapping")

def iterdir(self) -> Generator[Self, None, None]:
"""Iterate over this path. Similar to :meth:`pathlib.Path.iterdir`."""
for path in self._manifest._paths.values():
Expand Down Expand Up @@ -508,8 +599,8 @@ def __len__(self) -> int:

@cached_slot_property("_cs_paths")
def _paths(self) -> dict[tuple[str, ...], ManifestPath]:
return {("/",): ManifestPath(self, PayloadFileMapping("/", flags=DepotFileFlag.Directory))} | {
(path := ManifestPath(self, mapping)).parts: path for mapping in self._payload.mappings
return {("/",): ManifestPath(manifest=self, mapping=PayloadFileMapping("/", flags=DepotFileFlag.Directory))} | {
(path := ManifestPath(manifest=self, mapping=mapping)).parts: path for mapping in self._payload.mappings
}

@property
Expand Down Expand Up @@ -592,48 +683,31 @@ async def fetch_manifest(
return manifest


@dataclass(slots=True)
class Branch:
"""Represents a branch on for a Steam app. Branches are specific builds of an application that have made available
publicly or privately through Steam.
Read more on `steamworks <https://partner.steamgames.com/doc/store/application/branches>`_.
"""

__slots__ = (
"name",
"build_id",
"password_required",
"updated_at",
"description",
"depots",
"password",
)

def __init__(
self,
name: str,
build_id: int,
updated_at: datetime | None,
password_required: bool,
description: str | None,
):
self.name = name
"""The name of the branch."""
self.build_id = build_id
"""
The branch's build ID. This is a globally incrementing number. Build IDs are updated when a new build of an
application is pushed.
"""
self.password_required = password_required
"""Whether a password is required to access this branch."""
self.updated_at = updated_at
"""The time this branch was last updated."""
self.description = description
"""This branch's description."""
self.depots: list[Depot] = []
"""This branch's depots."""
self.password: str | None = None
"""This branch's password."""
name: str
"""The name of the branch."""
build_id: int
"""
The branch's build ID. This is a globally incrementing number. Build IDs are updated when a new build of an
application is pushed.
"""
updated_at: datetime | None
"""The time this branch was last updated."""
password_required: bool
"""Whether a password is required to access this branch."""
description: str | None
"""This branch's description."""
depots: list[Depot] = field(default_factory=list)
"""This branch's depots."""
password: str | None = None
"""This branch's password."""

def __repr__(self) -> str:
return f"<{self.__class__.__name__} name={self.name!r} build_id={self.build_id}>"
Expand All @@ -648,25 +722,20 @@ async def fetch_manifests(self) -> list[Manifest]:
return await asyncio.gather(*(manifest.fetch() for manifest in self.manifests)) # type: ignore # typeshed lies


@dataclass(slots=True)
class ManifestInfo:
"""Represents information about a manifest."""

__slots__ = ("_state", "id", "branch", "depot")
depot: Depot
_state: ConnectionState
id: ManifestID
"""The manifest's ID."""
branch: Branch
"""The branch this manifest is for."""
size: int | None
download_size: int | None
depot: Depot = field(init=False)
"""The depot this manifest is for."""

def __init__(
self,
state: ConnectionState,
id: ManifestID,
branch: Branch,
):
self._state = state
self.id = id
"""The manifest's ID."""
self.branch = branch
"""The branch this manifest is for."""

def __repr__(self) -> str:
return f"<{self.__class__.__name__} name={self.name!r} id={self.id}>"

Expand All @@ -688,14 +757,14 @@ def __init__(self, state: ConnectionState, encrypted_id: str, branch: Branch):
self.encrypted_id = encrypted_id
self.branch = branch

@cached_slot_property
def id(self) -> int:
@cached_slot_property # type: ignore
def id(self) -> ManifestID:
if self.branch.password is None:
raise ValueError("Cannot access the id of this depot as the password is not set.")
cipher = Cipher(algorithms.AES(self.branch.password.encode("UTF-8")), modes.ECB())
decryptor = cipher.decryptor()
to_unpack = utils.unpad(decryptor.update(bytes.fromhex(self.encrypted_id) + decryptor.finalize()))
return struct.unpack("<Q", to_unpack)[0]
return ManifestID(*struct.unpack("<Q", to_unpack))

@staticmethod
def _get_id(depots: manifest.Depot, branch: Branch) -> VDFInt | None:
Expand All @@ -705,7 +774,7 @@ def _get_id(depots: manifest.Depot, branch: Branch) -> VDFInt | None:
return None


@dataclass(repr=False, slots=True)
@dataclass(slots=True)
class HeadlessDepot:
"""Represents a depot without a branch."""

Expand Down Expand Up @@ -1068,7 +1137,19 @@ def __repr__(self) -> str:
resolved = [f"{name}={getattr(self, name)!r}" for name in attrs]
return f"<{self.__class__.__name__} {' '.join(resolved)}>"

async def apps(self, *, language: Language | None = None) -> list[PartialApp[None]]:
@overload
async def apps(self, *, language: Language) -> list[PartialApp[str]]:
...

@overload
async def apps(self, *, language: None = ...) -> list[PartialApp[None]]:
...

async def apps( # type: ignore[reportIncompatibleMethodOverride]
self,
*,
language: Language | None = None,
) -> list[PartialApp[None]] | list[PartialApp[str]]:
if language is not None:
return await super().apps(language=language)
return self._apps

0 comments on commit 4733287

Please sign in to comment.