From 6273f7471d91ca918a1c9a476d94ed797ea21dea Mon Sep 17 00:00:00 2001 From: sang Date: Thu, 29 Feb 2024 01:40:54 -0800 Subject: [PATCH 1/2] ip --- .buildkite/test-pipeline.yaml | 3 + tests/core/test_block_manager.py | 324 +++++++++++++++++++++++++++++++ tests/core/utils.py | 39 ++++ 3 files changed, 366 insertions(+) create mode 100644 tests/core/test_block_manager.py create mode 100644 tests/core/utils.py diff --git a/.buildkite/test-pipeline.yaml b/.buildkite/test-pipeline.yaml index c65ab04b8ddda..15f971b66e3bd 100644 --- a/.buildkite/test-pipeline.yaml +++ b/.buildkite/test-pipeline.yaml @@ -13,6 +13,9 @@ steps: - label: Basic Correctness Test command: pytest -v -s --forked basic_correctness + +- label: Core Test + command: pytest -v -s core - label: Distributed Comm Ops Test command: pytest -v -s --forked test_comm_ops.py diff --git a/tests/core/test_block_manager.py b/tests/core/test_block_manager.py new file mode 100644 index 0000000000000..3f695e48a76ee --- /dev/null +++ b/tests/core/test_block_manager.py @@ -0,0 +1,324 @@ +# This library may only be used in the Anyscale Platform. +# Notwithstanding the terms of any license or notice within this container, +# you may not modify, copy or remove this file. +# Your right to use this library is subject to the +# Anyscale Terms of Service (anyscale.com/terms) +# or other written agreement between you and Anyscale. + +# Copyright (2023 and onwards) Anyscale, Inc. +# This Software includes software developed at Anyscale (anyscale.com/) +# and its use is subject to the included LICENSE file. + +import pytest +import time +from typing import List + +from vllm import SamplingParams +from vllm.block import PhysicalTokenBlock +from vllm.core.block_manager import BlockAllocator, BlockSpaceManager, AllocStatus +from vllm.utils import Device +from vllm.sequence import SampleLogprob, Sequence, SequenceGroup, SequenceStatus +from tests.utils import round_up_to_next_block + +from .utils import create_dummy_prompt + + +def test_block_allocator_allocate(): + block_size = 4 + num_cpu_blocks = 4 + cpu_allocator = BlockAllocator(Device.CPU, block_size, num_cpu_blocks) + + # Allocate all available cpu blocks. + num_free = num_cpu_blocks + assert cpu_allocator.get_num_free_blocks() == num_free + for _ in range(num_cpu_blocks): + block = cpu_allocator.allocate() + num_free -= 1 + assert block not in cpu_allocator.free_blocks + assert cpu_allocator.get_num_free_blocks() == num_free + + with pytest.raises(ValueError): + cpu_allocator.allocate() + + +def test_block_allocator_free(): + block_size = 4 + num_cpu_blocks = 4 + cpu_allocator = BlockAllocator(Device.CPU, block_size, num_cpu_blocks) + + # Allocate all available cpu blocks. + blocks: List[PhysicalTokenBlock] = [] + for _ in range(num_cpu_blocks): + block = cpu_allocator.allocate() + blocks.append(block) + assert block not in cpu_allocator.free_blocks + + # Free all allocated cpu blocks. + num_free = 0 + assert cpu_allocator.get_num_free_blocks() == num_free + for block in blocks: + cpu_allocator.free(block) + num_free += 1 + assert block in cpu_allocator.free_blocks + assert cpu_allocator.get_num_free_blocks() == num_free + + with pytest.raises(ValueError): + cpu_allocator.free(block) + + +def test_allocate(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + # Allocate same sequence group to all available gpu blocks. 
+ for i in range(num_gpu_blocks): + _, seq_group = create_dummy_prompt(str(i), block_size) + assert block_manager.can_allocate(seq_group) + block_manager.allocate(seq_group) + assert block_manager.can_allocate(seq_group) != AllocStatus.OK + + # Allocate same sequence group to all available gpu blocks. + # Use watermark to reserve one gpu block. + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=1 / num_gpu_blocks) + for i in range(num_gpu_blocks - 1): + _, seq_group = create_dummy_prompt(str(i), block_size) + assert block_manager.can_allocate(seq_group) + block_manager.allocate(seq_group) + assert block_manager.can_allocate(seq_group) != AllocStatus.OK + + +def test_append_slot_single_seq(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + # Allocate single seq to gpu block. + prompt, seq_group = create_dummy_prompt("1", block_size) + block_manager.allocate(seq_group) + + # Nothing to append. Sequence has no new logical blocks. + assert block_manager.can_append_slots(seq_group) + before_blocks = block_manager.get_num_free_gpu_blocks() + assert not block_manager.append_slots(prompt) + after_blocks = block_manager.get_num_free_gpu_blocks() + assert before_blocks == after_blocks + + # Add block_size number of new tokens and append slot. + for i in range(block_size): + token_id = i + 5 + prompt.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + + assert block_manager.can_append_slots(seq_group) + before_blocks = block_manager.get_num_free_gpu_blocks() + assert not block_manager.append_slots(prompt) + after_blocks = block_manager.get_num_free_gpu_blocks() + assert before_blocks - after_blocks == 1 + + +@pytest.mark.parametrize("prompt_len", [1, 10, 100]) +@pytest.mark.parametrize("num_unprocessed_tokens", [1, 10, 100]) +@pytest.mark.parametrize("block_size", [1, 8, 16, 32]) +def test_append_multiple_slot_single_seq(prompt_len: int, + num_unprocessed_tokens: int, + block_size: int): + """Verify correct allocation when multiple tokens need to be processed. + """ + num_cpu_blocks = 0 + num_gpu_blocks = 8192 // block_size + block_manager = BlockSpaceManager(block_size=block_size, + num_gpu_blocks=num_gpu_blocks, + num_cpu_blocks=num_cpu_blocks, + watermark=0) + + # Allocate single seq to gpu block. + prompt, seq_group = create_dummy_prompt(request_id="1", + prompt_length=prompt_len, + block_size=block_size) + block_manager.allocate(seq_group) + + # Nothing to append. Sequence has no new logical blocks. 
+ assert block_manager.can_append_slots(seq_group) + before_blocks = block_manager.get_num_free_gpu_blocks() + assert not block_manager.append_slots(prompt) + after_blocks = block_manager.get_num_free_gpu_blocks() + assert before_blocks == after_blocks + + # Append new tokens, expect correct number of new blocks + new_token_ids = list(range(num_unprocessed_tokens)) + + prompt.append_token_ids( + new_token_ids, + [{token_id: SampleLogprob(logprob=0.0) + for token_id in new_token_ids}]) + + old_seq_len_in_blocks = round_up_to_next_block(prompt_len, block_size) + new_seq_len_in_blocks = round_up_to_next_block( + prompt_len + num_unprocessed_tokens, block_size) + num_expected_new_blocks = new_seq_len_in_blocks - old_seq_len_in_blocks + + assert block_manager.can_append_slots(seq_group) + before_blocks = block_manager.get_num_free_gpu_blocks() + assert not block_manager.append_slots(prompt) + after_blocks = block_manager.get_num_free_gpu_blocks() + assert before_blocks - after_blocks == num_expected_new_blocks + + +def test_append_slot_cow(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + # Allocate prompt to gpu block. + prompt = Sequence(1, "one two three", [1, 2, 3], block_size) + child = prompt.fork(2) + token_id = 4 + child.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + seq_group = SequenceGroup("1", [prompt, child], SamplingParams(), + time.time(), time.perf_counter) + block_manager.allocate(seq_group) + + # Append slot for child token. + # Last block being modified is shared. Copy on write occurs. + assert block_manager.can_append_slots(seq_group) + before_blocks = block_manager.get_num_free_gpu_blocks() + cow_src_dst = block_manager.append_slots(child) + + assert len(cow_src_dst) > 0 + for src_block, dst_block in cow_src_dst.items(): + assert src_block != dst_block + + after_blocks = block_manager.get_num_free_gpu_blocks() + assert before_blocks - after_blocks == 1 + + +def test_fork(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + prompt, seq_group = create_dummy_prompt("1", + block_size - 1, + block_size=block_size) + block_manager.allocate(seq_group) + + # Fork prompt and copy block tables. + child = prompt.fork(2) + block_manager.fork(prompt, child) + assert block_manager.get_block_table( + prompt) == block_manager.get_block_table(child) + token_id = 4 + # Append token to child. Block is shared so copy on write occurs. + child.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + block_manager.append_slots(child) + assert block_manager.get_block_table( + prompt) != block_manager.get_block_table(child) + + +def test_swap(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1) + prompt.status = SequenceStatus.RUNNING + block_manager.allocate(seq_group) + + # Emulate a forward pass by appending a single token. + # The block manager then knows how many unprocessed + # tokens will be written in the next forward pass. + token_id = 0 + prompt.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + + # Swap seq group from GPU -> CPU. 
+ gpu_blocks = block_manager.get_block_table(prompt) + assert block_manager.can_swap_out(seq_group) + before_cpu_blocks = block_manager.get_num_free_cpu_blocks() + before_gpu_blocks = block_manager.get_num_free_gpu_blocks() + mapping = block_manager.swap_out(seq_group) + assert list(mapping.keys()) == gpu_blocks + after_cpu_blocks = block_manager.get_num_free_cpu_blocks() + after_gpu_blocks = block_manager.get_num_free_gpu_blocks() + assert before_cpu_blocks == after_cpu_blocks + len(gpu_blocks) + assert before_gpu_blocks + len(gpu_blocks) == after_gpu_blocks + prompt.status = SequenceStatus.SWAPPED + + # Swap seq group from CPU -> GPU. + cpu_blocks = block_manager.get_block_table(prompt) + assert block_manager.can_swap_in(seq_group) + before_cpu_blocks = block_manager.get_num_free_cpu_blocks() + before_gpu_blocks = block_manager.get_num_free_gpu_blocks() + mapping = block_manager.swap_in(seq_group) + assert list(mapping.keys()) == cpu_blocks + after_cpu_blocks = block_manager.get_num_free_cpu_blocks() + after_gpu_blocks = block_manager.get_num_free_gpu_blocks() + assert before_cpu_blocks + len(cpu_blocks) == after_cpu_blocks + assert before_gpu_blocks == after_gpu_blocks + len(cpu_blocks) + + +def test_free(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + prompt, seq_group = create_dummy_prompt("1", block_size) + block_manager.allocate(seq_group) + + # Free allocated seq. + prompt_blocks = len(block_manager.get_block_table(prompt)) + before_blocks = block_manager.get_num_free_gpu_blocks() + block_manager.free(prompt) + after_blocks = block_manager.get_num_free_gpu_blocks() + assert after_blocks == before_blocks + prompt_blocks + + # Block table for freed seq is deleted. + with pytest.raises(KeyError): + block_manager.get_block_table(prompt) + + +def test_reset(): + block_size = 4 + num_cpu_blocks = 4 + num_gpu_blocks = 4 + block_manager = BlockSpaceManager(block_size, + num_cpu_blocks, + num_gpu_blocks, + watermark=0) + + # Allocate same seq group on all available gpu blocks. + original_blocks = block_manager.get_num_free_gpu_blocks() + for i in range(num_gpu_blocks): + _, seq_group = create_dummy_prompt(str(i), block_size) + block_manager.allocate(seq_group) + assert block_manager.get_num_free_gpu_blocks() == 0 + + # Resetting block manager frees all allocated blocks. + block_manager.reset() + assert block_manager.get_num_free_gpu_blocks() == original_blocks diff --git a/tests/core/utils.py b/tests/core/utils.py new file mode 100644 index 0000000000000..9e56f95094df8 --- /dev/null +++ b/tests/core/utils.py @@ -0,0 +1,39 @@ +# This library may only be used in the Anyscale Platform. +# Notwithstanding the terms of any license or notice within this container, +# you may not modify, copy or remove this file. +# Your right to use this library is subject to the +# Anyscale Terms of Service (anyscale.com/terms) +# or other written agreement between you and Anyscale. + +# Copyright (2023 and onwards) Anyscale, Inc. +# This Software includes software developed at Anyscale (anyscale.com/) +# and its use is subject to the included LICENSE file. 
+ +import time +from typing import Tuple + +from vllm import SamplingParams +from vllm.sequence import Sequence, SequenceGroup + + +def create_dummy_prompt( + request_id: str, + prompt_length: int, + block_size: int = None, + num_processed_token_ids: int = 0) -> Tuple[Sequence, SequenceGroup]: + if not block_size: + block_size = prompt_length + + # Create dummy prompt sequence with tokens 0...block_size-1 + # and prompt "0 ... block_size". + prompt_tokens = list(range(prompt_length)) + prompt_str = " ".join([str(t) for t in prompt_tokens]) + prompt = Sequence(int(request_id), + prompt_str, + prompt_tokens, + block_size, + num_processed_token_ids=num_processed_token_ids) + seq_group = SequenceGroup(request_id, [prompt], SamplingParams(), + time.time(), time.perf_counter()) + + return prompt, seq_group From e17a29d6dfa9f8550bcc21ba894428568154a34e Mon Sep 17 00:00:00 2001 From: sang Date: Thu, 29 Feb 2024 02:20:32 -0800 Subject: [PATCH 2/2] upstream tests. --- tests/core/__init__.py | 0 tests/core/test_block_manager.py | 92 +++-------------- tests/core/test_scheduler.py | 170 +++++++++++++++++++++++++++++++ tests/core/utils.py | 26 ++--- 4 files changed, 192 insertions(+), 96 deletions(-) create mode 100644 tests/core/__init__.py create mode 100644 tests/core/test_scheduler.py diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/core/test_block_manager.py b/tests/core/test_block_manager.py index 3f695e48a76ee..ecdf3025cffdf 100644 --- a/tests/core/test_block_manager.py +++ b/tests/core/test_block_manager.py @@ -1,14 +1,3 @@ -# This library may only be used in the Anyscale Platform. -# Notwithstanding the terms of any license or notice within this container, -# you may not modify, copy or remove this file. -# Your right to use this library is subject to the -# Anyscale Terms of Service (anyscale.com/terms) -# or other written agreement between you and Anyscale. - -# Copyright (2023 and onwards) Anyscale, Inc. -# This Software includes software developed at Anyscale (anyscale.com/) -# and its use is subject to the included LICENSE file. - import pytest import time from typing import List @@ -17,8 +6,7 @@ from vllm.block import PhysicalTokenBlock from vllm.core.block_manager import BlockAllocator, BlockSpaceManager, AllocStatus from vllm.utils import Device -from vllm.sequence import SampleLogprob, Sequence, SequenceGroup, SequenceStatus -from tests.utils import round_up_to_next_block +from vllm.sequence import Sequence, SequenceGroup, SequenceStatus from .utils import create_dummy_prompt @@ -109,72 +97,24 @@ def test_append_slot_single_seq(): block_manager.allocate(seq_group) # Nothing to append. Sequence has no new logical blocks. - assert block_manager.can_append_slots(seq_group) + assert block_manager.can_append_slot(seq_group) before_blocks = block_manager.get_num_free_gpu_blocks() - assert not block_manager.append_slots(prompt) + assert not block_manager.append_slot(prompt) after_blocks = block_manager.get_num_free_gpu_blocks() assert before_blocks == after_blocks # Add block_size number of new tokens and append slot. 
for i in range(block_size): token_id = i + 5 - prompt.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + prompt.append_token_id(token_id, {token_id: 0.0}) - assert block_manager.can_append_slots(seq_group) + assert block_manager.can_append_slot(seq_group) before_blocks = block_manager.get_num_free_gpu_blocks() - assert not block_manager.append_slots(prompt) + assert not block_manager.append_slot(prompt) after_blocks = block_manager.get_num_free_gpu_blocks() assert before_blocks - after_blocks == 1 -@pytest.mark.parametrize("prompt_len", [1, 10, 100]) -@pytest.mark.parametrize("num_unprocessed_tokens", [1, 10, 100]) -@pytest.mark.parametrize("block_size", [1, 8, 16, 32]) -def test_append_multiple_slot_single_seq(prompt_len: int, - num_unprocessed_tokens: int, - block_size: int): - """Verify correct allocation when multiple tokens need to be processed. - """ - num_cpu_blocks = 0 - num_gpu_blocks = 8192 // block_size - block_manager = BlockSpaceManager(block_size=block_size, - num_gpu_blocks=num_gpu_blocks, - num_cpu_blocks=num_cpu_blocks, - watermark=0) - - # Allocate single seq to gpu block. - prompt, seq_group = create_dummy_prompt(request_id="1", - prompt_length=prompt_len, - block_size=block_size) - block_manager.allocate(seq_group) - - # Nothing to append. Sequence has no new logical blocks. - assert block_manager.can_append_slots(seq_group) - before_blocks = block_manager.get_num_free_gpu_blocks() - assert not block_manager.append_slots(prompt) - after_blocks = block_manager.get_num_free_gpu_blocks() - assert before_blocks == after_blocks - - # Append new tokens, expect correct number of new blocks - new_token_ids = list(range(num_unprocessed_tokens)) - - prompt.append_token_ids( - new_token_ids, - [{token_id: SampleLogprob(logprob=0.0) - for token_id in new_token_ids}]) - - old_seq_len_in_blocks = round_up_to_next_block(prompt_len, block_size) - new_seq_len_in_blocks = round_up_to_next_block( - prompt_len + num_unprocessed_tokens, block_size) - num_expected_new_blocks = new_seq_len_in_blocks - old_seq_len_in_blocks - - assert block_manager.can_append_slots(seq_group) - before_blocks = block_manager.get_num_free_gpu_blocks() - assert not block_manager.append_slots(prompt) - after_blocks = block_manager.get_num_free_gpu_blocks() - assert before_blocks - after_blocks == num_expected_new_blocks - - def test_append_slot_cow(): block_size = 4 num_cpu_blocks = 4 @@ -188,20 +128,17 @@ def test_append_slot_cow(): prompt = Sequence(1, "one two three", [1, 2, 3], block_size) child = prompt.fork(2) token_id = 4 - child.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + child.append_token_id(token_id, {token_id: 0.0}) seq_group = SequenceGroup("1", [prompt, child], SamplingParams(), time.time(), time.perf_counter) block_manager.allocate(seq_group) # Append slot for child token. # Last block being modified is shared. Copy on write occurs. - assert block_manager.can_append_slots(seq_group) + assert block_manager.can_append_slot(seq_group) before_blocks = block_manager.get_num_free_gpu_blocks() - cow_src_dst = block_manager.append_slots(child) - - assert len(cow_src_dst) > 0 - for src_block, dst_block in cow_src_dst.items(): - assert src_block != dst_block + src_block, dst_block = block_manager.append_slot(child) + assert src_block != dst_block after_blocks = block_manager.get_num_free_gpu_blocks() assert before_blocks - after_blocks == 1 @@ -228,8 +165,8 @@ def test_fork(): prompt) == block_manager.get_block_table(child) token_id = 4 # Append token to child. 
Block is shared so copy on write occurs. - child.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) - block_manager.append_slots(child) + child.append_token_id(token_id, {token_id: 0.0}) + block_manager.append_slot(child) assert block_manager.get_block_table( prompt) != block_manager.get_block_table(child) @@ -244,14 +181,15 @@ def test_swap(): watermark=0) prompt, seq_group = create_dummy_prompt("1", prompt_length=block_size - 1) - prompt.status = SequenceStatus.RUNNING + prompt.status = SequenceStatus.WAITING block_manager.allocate(seq_group) # Emulate a forward pass by appending a single token. # The block manager then knows how many unprocessed # tokens will be written in the next forward pass. token_id = 0 - prompt.append_token_id(token_id, {token_id: SampleLogprob(0.0)}) + prompt.status = SequenceStatus.RUNNING + prompt.append_token_id(token_id, {token_id: 0.0}) # Swap seq group from GPU -> CPU. gpu_blocks = block_manager.get_block_table(prompt) diff --git a/tests/core/test_scheduler.py b/tests/core/test_scheduler.py new file mode 100644 index 0000000000000..6322b2f2d5e9e --- /dev/null +++ b/tests/core/test_scheduler.py @@ -0,0 +1,170 @@ +from typing import List +import pytest # noqa + +from vllm.config import CacheConfig, SchedulerConfig +from vllm.core.scheduler import Scheduler +from vllm.sequence import SequenceGroup + +from .utils import create_dummy_prompt + + +def test_scheduler_add_seq_group(): + block_size = 4 + scheduler_config = SchedulerConfig(100, 64, 1, 256) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 4 + cache_config.num_gpu_blocks = 4 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add seq group to scheduler. + num_seq_group = 4 + for i in range(num_seq_group): + _, seq_group = create_dummy_prompt(str(i), block_size) + scheduler.add_seq_group(seq_group) + assert scheduler.get_num_unfinished_seq_groups() == i + 1 + + +def test_scheduler_abort_seq_group(): + block_size = 4 + scheduler_config = SchedulerConfig(100, 64, 1, 256) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 4 + cache_config.num_gpu_blocks = 4 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add multiple seq groups to scheduler. + num_seq_group = 4 + request_ids = set() + for i in range(num_seq_group): + _, seq_group = create_dummy_prompt(str(i), block_size) + scheduler.add_seq_group(seq_group) + request_ids.add(str(i)) + + # Abort all added seq groups. + assert scheduler.get_num_unfinished_seq_groups() == num_seq_group + scheduler.abort_seq_group(request_ids) + assert scheduler.get_num_unfinished_seq_groups() == 0 + + +def test_scheduler_schedule_simple(): + block_size = 4 + num_seq_group = 4 + max_model_len = 16 + scheduler_config = SchedulerConfig(64, num_seq_group, max_model_len, 256) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 8 + cache_config.num_gpu_blocks = 8 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add seq groups to scheduler. + running: List[SequenceGroup] = [] + for i in range(num_seq_group): + _, seq_group = create_dummy_prompt(str(i), prompt_length=block_size) + scheduler.add_seq_group(seq_group) + running.append(seq_group) + + # Schedule seq groups prompts. 
+ seq_group_meta, out = scheduler.schedule() + assert set(out.scheduled_seq_groups) == set(running) + assert out.num_batched_tokens == num_seq_group * seq_group.get_seqs( + )[0].get_len() + assert (not out.blocks_to_copy and not out.blocks_to_swap_in + and not out.blocks_to_swap_out) + assert len(seq_group_meta) == num_seq_group + + # Schedule seq groups generation. + seq_group_meta, out = scheduler.schedule() + assert set(out.scheduled_seq_groups) == set(running) + assert out.num_batched_tokens == num_seq_group + assert (not out.blocks_to_copy and not out.blocks_to_swap_in + and not out.blocks_to_swap_out) + assert len(seq_group_meta) == num_seq_group + + +def test_scheduler_schedule_preempt_abort(): + block_size = 4 + max_model_len = 16 + scheduler_config = SchedulerConfig(64, 2, max_model_len, 256) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 2 + cache_config.num_gpu_blocks = 2 + scheduler = Scheduler(scheduler_config, cache_config, None) + + # Add seq groups to scheduler. + seq_a, seq_group_a = create_dummy_prompt("1", block_size) + seq_b, seq_group_b = create_dummy_prompt("2", block_size) + scheduler.add_seq_group(seq_group_a) + scheduler.add_seq_group(seq_group_b) + + # Schedule seq groups prompts. + seq_group_meta, out = scheduler.schedule() + assert out.scheduled_seq_groups == [seq_group_a, seq_group_b] + assert out.num_batched_tokens == seq_group_a.get_seqs()[0].get_len() * 2 + assert (not out.blocks_to_copy and not out.blocks_to_swap_in + and not out.blocks_to_swap_out) + assert len(seq_group_meta) == 2 + assert scheduler.get_num_unfinished_seq_groups() == 2 + + # Append "generated" tokens, allowing the sequence to mark prompt tokens as + # processed. + token_id = 0 + seq_a.append_token_id(token_id, {token_id: 0.0}) + seq_b.append_token_id(token_id, {token_id: 0.0}) + + # Schedule seq groups generation and preempt seq group b. + seq_group_meta, out = scheduler.schedule() + assert out.scheduled_seq_groups == [seq_group_a] + assert out.num_batched_tokens == 1 + assert (not out.blocks_to_copy and not out.blocks_to_swap_in + and not out.blocks_to_swap_out) + assert len(seq_group_meta) == 1 + assert scheduler.get_num_unfinished_seq_groups() == 2 + + # Abort seq group a. Re-schedule seq group b prompt with recomputation. + scheduler.abort_seq_group("1") + seq_group_meta, out = scheduler.schedule() + assert out.scheduled_seq_groups == [seq_group_b] + assert out.num_batched_tokens == seq_group_b.get_seqs()[0].get_len() + assert (not out.blocks_to_copy and not out.blocks_to_swap_in + and not out.blocks_to_swap_out) + assert len(seq_group_meta) == 1 + assert scheduler.get_num_unfinished_seq_groups() == 1 + + +def test_scheduler_max_seqs(): + block_size = 4 + num_seq_group = 4 + max_seq_group = 2 + max_model_len = 16 + scheduler_config = SchedulerConfig(64, max_seq_group, max_model_len, 256) + cache_config = CacheConfig(block_size, 1.0, 1, "auto") + cache_config.num_cpu_blocks = 8 + cache_config.num_gpu_blocks = 8 + scheduler = Scheduler(scheduler_config, cache_config, None) + + all_seq_groups: List[SequenceGroup] = [] + # Add seq groups to scheduler. + for i in range(num_seq_group): + _, seq_group = create_dummy_prompt(str(i), prompt_length=block_size) + all_seq_groups.append(seq_group) + + # Append 1 seq group + scheduler.add_seq_group(all_seq_groups[0]) + + # Schedule seq groups prompts. + _, out = scheduler.schedule() + assert set(out.scheduled_seq_groups) == set([all_seq_groups[0]]) + + # Schedule seq groups generation. 
+ _, out = scheduler.schedule() + assert set(out.scheduled_seq_groups) == set([all_seq_groups[0]]) + + # Append 2 more seq group + scheduler.add_seq_group(all_seq_groups[1]) + scheduler.add_seq_group(all_seq_groups[2]) + + # Schedule seq groups prompts. + # Only 1 seq group should be scheduled since max_seq_group is 2 + # and one is prompting. + _, out = scheduler.schedule() + assert set(out.scheduled_seq_groups) == set([all_seq_groups[1]]) diff --git a/tests/core/utils.py b/tests/core/utils.py index 9e56f95094df8..9c0cfe1a7cf66 100644 --- a/tests/core/utils.py +++ b/tests/core/utils.py @@ -1,14 +1,3 @@ -# This library may only be used in the Anyscale Platform. -# Notwithstanding the terms of any license or notice within this container, -# you may not modify, copy or remove this file. -# Your right to use this library is subject to the -# Anyscale Terms of Service (anyscale.com/terms) -# or other written agreement between you and Anyscale. - -# Copyright (2023 and onwards) Anyscale, Inc. -# This Software includes software developed at Anyscale (anyscale.com/) -# and its use is subject to the included LICENSE file. - import time from typing import Tuple @@ -19,8 +8,7 @@ def create_dummy_prompt( request_id: str, prompt_length: int, - block_size: int = None, - num_processed_token_ids: int = 0) -> Tuple[Sequence, SequenceGroup]: + block_size: int = None) -> Tuple[Sequence, SequenceGroup]: if not block_size: block_size = prompt_length @@ -28,12 +16,12 @@ def create_dummy_prompt( # and prompt "0 ... block_size". prompt_tokens = list(range(prompt_length)) prompt_str = " ".join([str(t) for t in prompt_tokens]) - prompt = Sequence(int(request_id), - prompt_str, - prompt_tokens, - block_size, - num_processed_token_ids=num_processed_token_ids) + prompt = Sequence(int(request_id), prompt_str, prompt_tokens, block_size) seq_group = SequenceGroup(request_id, [prompt], SamplingParams(), - time.time(), time.perf_counter()) + time.time(), None, None) return prompt, seq_group + + +def round_up_to_next_block(seq_len: int, block_size: int) -> int: + return (seq_len + block_size - 1) // block_size
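
Note on the round_up_to_next_block helper added above: it is a plain ceiling division giving the number of blocks needed to hold a given number of tokens. Below is a minimal, self-contained sketch of the block-growth arithmetic that the append tests rely on; the variable names (prompt_len, num_new_tokens) are illustrative only and not part of the patch.

    def round_up_to_next_block(seq_len: int, block_size: int) -> int:
        # Ceiling division: number of blocks needed to hold seq_len tokens.
        return (seq_len + block_size - 1) // block_size

    # Example: a 10-token prompt with block_size=4 occupies 3 blocks.
    # Appending 7 more tokens grows the sequence to 17 tokens, i.e. 5 blocks,
    # so the block manager would be expected to allocate 5 - 3 = 2 new blocks.
    prompt_len, num_new_tokens, block_size = 10, 7, 4
    old_blocks = round_up_to_next_block(prompt_len, block_size)
    new_blocks = round_up_to_next_block(prompt_len + num_new_tokens, block_size)
    assert new_blocks - old_blocks == 2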