Skip to content

Commit

Permalink
Support major compaction in ManualCompaction
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed May 7, 2024
1 parent b183480 commit b39f805
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
10 changes: 8 additions & 2 deletions pymilvus/client/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,13 +1525,19 @@ def load_balance(
check_status(status)

@retry_on_rpc_failure()
def compact(self, collection_name: str, timeout: Optional[float] = None, **kwargs) -> int:
def compact(
self,
collection_name: str,
timeout: Optional[float] = None,
is_major: Optional[bool] = False,
**kwargs,
) -> int:
request = Prepare.describe_collection_request(collection_name)
rf = self._stub.DescribeCollection.future(request, timeout=timeout)
response = rf.result()
check_status(response.status)

req = Prepare.manual_compaction(response.collectionID)
req = Prepare.manual_compaction(response.collectionID, is_major)
future = self._stub.ManualCompaction.future(req, timeout=timeout)
response = future.result()
check_status(response.status)
Expand Down
3 changes: 2 additions & 1 deletion pymilvus/client/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,12 +961,13 @@ def load_balance_request(
)

@classmethod
def manual_compaction(cls, collection_id: int):
def manual_compaction(cls, collection_id: int, is_major: bool):
if collection_id is None or not isinstance(collection_id, int):
raise ParamError(message=f"collection_id value {collection_id} is illegal")

request = milvus_types.ManualCompactionRequest()
request.collectionID = collection_id
request.majorCompaction = is_major

return request

Expand Down
18 changes: 14 additions & 4 deletions pymilvus/client/stub.py
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ def load_balance(
**kwargs,
)

def compact(self, collection_name, timeout=None, **kwargs) -> int:
def compact(self, collection_name, timeout=None, is_major=False, **kwargs) -> int:
"""
Do compaction for the collection.
Expand All @@ -1054,15 +1054,20 @@ def compact(self, collection_name, timeout=None, **kwargs) -> int:
:param timeout: The timeout for this method, unit: second
:type timeout: int
:param is_major: trigger major compaction
:type is_major: bool
:return: the compaction ID
:rtype: int
:raises MilvusException: If collection name not exist.
"""
with self._connection() as handler:
return handler.compact(collection_name, timeout=timeout, **kwargs)
return handler.compact(collection_name, timeout=timeout, is_major=is_major, **kwargs)

def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> CompactionState:
def get_compaction_state(
self, compaction_id: int, timeout=None, is_major=False, **kwargs
) -> CompactionState:
"""
Get compaction states of a targeted compaction id
Expand All @@ -1072,14 +1077,19 @@ def get_compaction_state(self, compaction_id: int, timeout=None, **kwargs) -> Co
:param timeout: The timeout for this method, unit: second
:type timeout: int
:param is_major: get major compaction
:type is_major: bool
:return: the state of the compaction
:rtype: CompactionState
:raises MilvusException: If compaction_id doesn't exist.
"""

with self._connection() as handler:
return handler.get_compaction_state(compaction_id, timeout=timeout, **kwargs)
return handler.get_compaction_state(
compaction_id, timeout=timeout, is_major=is_major, **kwargs
)

def wait_for_compaction_completed(
self, compaction_id: int, timeout=None, **kwargs
Expand Down
28 changes: 25 additions & 3 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1489,37 +1489,53 @@ def drop_index(self, timeout: Optional[float] = None, **kwargs):
)
index.drop(timeout=timeout, **kwargs)

def compact(self, timeout: Optional[float] = None, **kwargs):
def compact(self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs):
"""Compact merge the small segments in a collection
Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.
is_major (``bool``, optional): An optional setting to trigger major compaction.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
self.compaction_id = conn.compact(self._name, timeout=timeout, **kwargs)
if is_major:
self.major_compaction_id = conn.compact(
self._name, timeout=timeout, is_major=is_major, **kwargs
)
else:
self.compaction_id = conn.compact(
self._name, timeout=timeout, is_major=is_major, **kwargs
)

def get_compaction_state(self, timeout: Optional[float] = None, **kwargs) -> CompactionState:
def get_compaction_state(
self, timeout: Optional[float] = None, is_major: Optional[bool] = False, **kwargs
) -> CompactionState:
"""Get the current compaction state
Args:
timeout (``float``, optional): An optional duration of time in seconds to allow
for the RPC. When timeout is set to None, client waits until server response
or error occur.
is_major (``bool``, optional): An optional setting to get major compaction state.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_major:
return conn.get_compaction_state(self.major_compaction_id, timeout=timeout, **kwargs)
return conn.get_compaction_state(self.compaction_id, timeout=timeout, **kwargs)

def wait_for_compaction_completed(
self,
timeout: Optional[float] = None,
is_major: Optional[bool] = False,
**kwargs,
) -> CompactionState:
"""Block until the current collection's compaction completed
Expand All @@ -1529,10 +1545,16 @@ def wait_for_compaction_completed(
for the RPC. When timeout is set to None, client waits until server response
or error occur.
is_major (``bool``, optional): An optional setting to get major compaction state.
Raises:
MilvusException: If anything goes wrong.
"""
conn = self._get_connection()
if is_major:
return conn.wait_for_compaction_completed(
self.major_compaction_id, timeout=timeout, **kwargs
)
return conn.wait_for_compaction_completed(self.compaction_id, timeout=timeout, **kwargs)

def get_compaction_plans(self, timeout: Optional[float] = None, **kwargs) -> CompactionPlans:
Expand Down

0 comments on commit b39f805

Please sign in to comment.