-
Notifications
You must be signed in to change notification settings - Fork 4
/
transaction.py
264 lines (212 loc) · 8.88 KB
/
transaction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""
Functionality for extended lakeFS transactions to conduct versioning operations between file uploads.
"""
import logging
import random
import string
import warnings
from collections import deque
from typing import TYPE_CHECKING, Literal, TypeVar
import lakefs
from fsspec.transaction import Transaction
from lakefs.branch import Branch, Reference
from lakefs.client import Client
from lakefs.exceptions import ServerException
from lakefs.object import ObjectWriter
from lakefs.reference import Commit, ReferenceType
from lakefs.repository import Repository
from lakefs.tag import Tag
# Generic type variable; it is not referenced in the visible part of this module.
T = TypeVar("T")
# Module logger used for transaction diagnostics (debug and warning messages).
logger = logging.getLogger("lakefs-spec")
if TYPE_CHECKING:  # pragma: no cover
    # Imported for type annotations only. Presumably this avoids a runtime
    # import cycle with the ``lakefs_spec`` package — TODO confirm.
    from lakefs_spec import LakeFSFileSystem
def _ensurebranch(b: str | Branch, repository: str, client: Client) -> Branch:
    """
    Coerce a branch name or branch object into a ``Branch``.

    Parameters
    ----------
    b: str | Branch
        Either the name of a branch, or an already constructed branch object.
    repository: str
        Name of the repository containing the branch. Only used when ``b`` is a string.
    client: Client
        The lakeFS client passed to a newly constructed ``Branch``.

    Returns
    -------
    Branch
        ``b`` unchanged if it is not a string, otherwise a ``Branch`` built from
        ``repository`` and the name ``b``.
    """
    # Non-string inputs are assumed to already be branch objects.
    if not isinstance(b, str):
        return b
    return Branch(repository, b, client=client)
class LakeFSTransaction(Transaction):
    """
    A lakeFS transaction model capable of versioning operations in between file uploads.

    Parameters
    ----------
    fs: LakeFSFileSystem
        The lakeFS file system associated with the transaction.
    """

    def __init__(self, fs: "LakeFSFileSystem"):
        super().__init__(fs=fs)
        self.fs: LakeFSFileSystem
        # A deque lets ``__exit__`` pop deferred files from the left in O(1),
        # preserving the order in which the base class appended them.
        self.files: deque[ObjectWriter] = deque(self.files)

        self.repository: str | None = None
        self.base_branch: Branch | None = None
        self.automerge: bool = False
        self.delete: Literal["onsuccess", "always", "never"] = "onsuccess"
        self._ephemeral_branch: Branch | None = None

    def __call__(
        self,
        repository: str | Repository,
        base_branch: str | Branch = "main",
        branch_name: str | None = None,
        automerge: bool = True,
        delete: Literal["onsuccess", "always", "never"] = "onsuccess",
    ) -> "LakeFSTransaction":
        """
        Creates an ephemeral branch, conducts all uploads and operations on that branch,
        and optionally merges it back into the source branch.

        Parameters
        ----------
        repository: str | Repository
            The repository in which to conduct the transaction.
        base_branch: str | Branch
            The branch on which the transaction operations should be based.
        branch_name: str | None
            Name of the ephemeral transaction branch. If not given, a name of the form
            ``transaction-<6 random digits>`` is generated.
        automerge: bool
            Automatically merge the ephemeral branch into the base branch after successful
            transaction completion.
        delete: Literal["onsuccess", "always", "never"]
            Cleanup policy / deletion handling for the ephemeral branch after the transaction.
            If ``"onsuccess"``, the branch is deleted if the transaction succeeded,
            or left over if an error occurred.
            If ``"always"``, the ephemeral branch is always deleted after transaction regardless of success
            or failure.
            If ``"never"``, the transaction branch is always left in the repository.

        Returns
        -------
        LakeFSTransaction
            This transaction object, configured and ready to be entered with ``with``.

        Raises
        ------
        ValueError
            If the repository's metadata cannot be retrieved from the server
            (reported as the repository not existing).
        """
        if isinstance(repository, str):
            self.repository = repository
        else:
            self.repository = repository.id

        repo = lakefs.Repository(self.repository, client=self.fs.client)
        try:
            _ = repo.metadata
        except ServerException:
            raise ValueError(f"repository {self.repository!r} does not exist") from None

        # base branch needs to be a lakefs.Branch, since it is being diffed
        # with the ephemeral branch in __exit__.
        self.base_branch = _ensurebranch(base_branch, self.repository, self.fs.client)

        self.automerge = automerge
        self.delete = delete

        ephem_name = branch_name or "transaction-" + "".join(random.choices(string.digits, k=6))  # nosec: B311
        self._ephemeral_branch = Branch(self.repository, ephem_name, client=self.fs.client)
        return self

    def __enter__(self):
        """
        Create the ephemeral branch from the base branch and mark the file system
        as being inside a transaction.

        The branch is created with ``exist_ok=False``, so entering fails if a branch
        with the ephemeral name already exists.
        """
        logger.debug(
            f"Creating ephemeral branch {self._ephemeral_branch.id!r} "
            f"from branch {self.base_branch.id!r}."
        )
        self._ephemeral_branch.create(self.base_branch, exist_ok=False)
        self.fs._intrans = True
        return self

    def _will_delete_branch(self, success: bool) -> bool:
        """Return whether the ``delete`` policy removes the ephemeral branch for this outcome."""
        return self.delete == "always" or (success and self.delete == "onsuccess")

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Finalize the transaction.

        If the ``with`` block raised, all deferred files are discarded. Afterwards,
        if ``automerge`` is set and the block succeeded, the ephemeral branch is merged
        into the base branch when the two differ. Finally, the ephemeral branch is
        deleted according to the ``delete`` policy. The policy is honored even if the
        merge itself raises, which then counts as a failed transaction.
        """
        success = exc_type is None
        while self.files:
            # fsspec base class calls `append` on the file, which means we
            # have to pop from the left to preserve order.
            f = self.files.popleft()
            if not success:
                f.discard()

        self.fs._intrans = False
        self.fs._transaction = None

        finished = False
        try:
            if any(self._ephemeral_branch.uncommitted()):
                msg = f"Finished transaction on branch {self._ephemeral_branch.id!r} with uncommitted changes."
                # Only warn about data loss if the branch is actually going to be removed.
                if self._will_delete_branch(success):
                    msg += " Objects added but not committed are lost."
                # stacklevel=2 attributes the warning to the `with` statement, not this method.
                warnings.warn(msg, stacklevel=2)

            if success and self.automerge:
                if any(self.base_branch.diff(self._ephemeral_branch)):
                    self._ephemeral_branch.merge_into(self.base_branch)
            finished = True
        finally:
            # Runs even if the status check or merge raised, so that e.g. the
            # "always" policy still cleans up after a merge conflict.
            if self._will_delete_branch(success and finished):
                self._ephemeral_branch.delete()

    @property
    def branch(self):
        """
        The ephemeral branch of this transaction.

        This is ``None`` until the transaction has been configured by calling it.
        """
        return self._ephemeral_branch

    def commit(self, message: str, metadata: dict[str, str] | None = None) -> Reference:
        """
        Create a commit on this transaction's ephemeral branch with a commit message
        and attached metadata.

        If the branch has no uncommitted changes, no commit is created, a warning is
        logged, and the current head of the branch is returned instead.

        Parameters
        ----------
        message: str
            The commit message to attach to the newly created commit.
        metadata: dict[str, str] | None
            Optional metadata to enrich the created commit with (author, e-mail, ...).

        Returns
        -------
        Reference
            The created commit, or the branch head if there was nothing to commit.
        """
        # `any` stops at the first uncommitted change instead of listing all of them.
        if not any(self.branch.uncommitted()):
            logger.warning(f"No changes to commit on branch {self.branch.id!r}.")
            return self.branch.head
        return self.branch.commit(message, metadata=metadata)

    def merge(self, source_ref: str | Branch, into: str | Branch) -> Commit:
        """
        Merge a branch into another branch in a repository.

        In case the branch contains no changes relevant to the target branch,
        no merge happens, and the tip of the target branch is returned instead.

        Parameters
        ----------
        source_ref: str | Branch
            Source reference containing the changes to merge.
            Can be a branch name or partial commit SHA.
        into: str | Branch
            Target branch into which the changes will be merged.

        Returns
        -------
        Commit
            Either the created merge commit, or the head commit of the target branch.
        """
        source = _ensurebranch(source_ref, self.repository, self.fs.client)
        dest = _ensurebranch(into, self.repository, self.fs.client)

        if any(dest.diff(source)):
            source.merge_into(dest)
        return dest.head.get_commit()

    def revert(self, branch: str | Branch, ref: ReferenceType, parent_number: int = 1) -> Commit:
        """
        Revert a previous commit on a branch.

        Parameters
        ----------
        branch: str | Branch
            Branch on which the commit should be reverted.
        ref: ReferenceType
            The reference to revert. Either a string, or an object with an ``id``.
        parent_number: int
            If there are multiple parents to a commit, specify to which parent
            the commit should be reverted. ``parent_number = 1`` (the default)
            refers to the first parent commit of the current ``branch`` tip.

        Returns
        -------
        Commit
            The head commit of ``branch`` after the revert.
        """
        b = _ensurebranch(branch, self.repository, self.fs.client)

        ref_id = ref if isinstance(ref, str) else ref.id
        b.revert(ref_id, parent_number=parent_number)
        return b.head.get_commit()

    def rev_parse(self, ref: ReferenceType) -> Commit:
        """
        Parse a given lakeFS reference expression and obtain its corresponding commit.

        Parameters
        ----------
        ref: ReferenceType
            Reference to resolve, can be a branch, commit SHA, or tag. Either a string,
            or an object with an ``id`` (e.g. a ``Reference`` or ``Commit``).

        Returns
        -------
        Commit
            The commit referenced by the expression ``ref``.
        """
        # Same unwrapping as `revert`: any non-string reference (Reference *or* Commit)
        # is identified by its ``id``. Previously only Reference instances were unwrapped.
        ref_id = ref if isinstance(ref, str) else ref.id
        reference = lakefs.Reference(self.repository, ref_id, client=self.fs.client)
        return reference.get_commit()

    def tag(self, ref: ReferenceType, name: str) -> Tag:
        """
        Create a tag referencing a commit in a repository.

        Parameters
        ----------
        ref: ReferenceType
            Commit SHA or placeholder for a reference or commit object
            to which the new tag will point.
        name: str
            Name of the tag to be created.

        Returns
        -------
        Tag
            The requested tag.
        """
        return lakefs.Tag(self.repository, name, client=self.fs.client).create(ref)