diff --git a/cacheflow/master/scheduler.py b/cacheflow/master/scheduler.py
index 2ca10117..3d2e300a 100644
--- a/cacheflow/master/scheduler.py
+++ b/cacheflow/master/scheduler.py
@@ -10,10 +10,17 @@ class Scheduler:
 
     def __int__(
         self,
+        controllers: List,
         block_size: int,
         num_gpu_blocks: int,
         num_cpu_blocks: int,
     ) -> None:
+        self.controllers = controllers
+        self.block_size = block_size
+        self.num_gpu_blocks = num_gpu_blocks
+        self.num_cpu_blocks = num_cpu_blocks
+
+        # Create the block space manager.
         self.block_manager = BlockSpaceManager(
             block_size=block_size,
             num_gpu_blocks=num_gpu_blocks,
@@ -31,9 +38,13 @@ class Scheduler:
 
         # Swapped sequence groups (LIFO).
         self.swapped: List[SequenceGroup] = []
-
         # Pending sequence groups (FIFO).
-        self.queue: List[SequenceGroup] = []
+        self.pending: List[SequenceGroup] = []
+
+        # Block mappings (src -> dst) to swap or copy before model execution.
+        self.blocks_to_swap_in: Dict[int, int] = {}
+        self.blocks_to_swap_out: Dict[int, int] = {}
+        self.blocks_to_copy: Dict[int, int] = {}
 
     def _free_seq(self, seq: Sequence) -> None:
         seq.status = SequenceStatus.FINISHED
@@ -52,12 +63,11 @@ class Scheduler:
             ret = self.block_manager.append(seq)
             if ret is not None:
                 src_block, dst_block = ret
-                # TODO: Issue COPY commands to the workers.
+                self.blocks_to_copy[src_block] = dst_block
 
     def _swap_in(self, seq_group: SequenceGroup) -> None:
-        # TODO: Issue SWAP_IN commands to the workers.
-        self.block_manager.swap_in(seq_group)
-        self.block_manager.append(seq_group)
+        mapping = self.block_manager.swap_in(seq_group)
+        self.blocks_to_swap_in.update(mapping)
         for seq in seq_group.seqs:
             if seq.status == SequenceStatus.SWAPPED:
                 seq.status = SequenceStatus.RUNNING
@@ -65,14 +75,14 @@ class Scheduler:
 
     def _swap_out(self, seq_group: SequenceGroup) -> None:
         assert self.block_manager.can_swap_out(seq_group)
-        # TODO: Issue SWAP_OUT commands to the workers.
-        self.block_manager.swap_out(seq_group)
+        mapping = self.block_manager.swap_out(seq_group)
+        self.blocks_to_swap_out.update(mapping)
         for seq in seq_group.seqs:
             if seq.status == SequenceStatus.RUNNING:
                 seq.status = SequenceStatus.SWAPPED
         self.swapped.append(seq_group)
 
-    def step(self) -> None:
+    def prepare(self) -> None:
         # 1. Prepare new slots for the running sequences.
         # NOTE: Here we implicitly assume FCFS scheduling.
         # That is, the most recently added sequence group is the first
@@ -100,6 +110,7 @@ class Scheduler:
         for i, seq_group in enumerate(reversed(self.swapped)):
             if self.block_manager.can_swap_in(seq_group):
                 self._swap_in(seq_group)
+                self._append(seq_group)
             else:
                 # OOM. Stop swapping.
                 self.swapped = self.swapped[:len(self.swapped) - i]
@@ -112,14 +123,30 @@ class Scheduler:
         # NOTE: Here we implicitly assume FCFS scheduling.
         # TODO(woosuk): Add a heuristic to control the maximum batch size.
         if not self.swapped:
-            for i, seq_group in enumerate(self.queue):
+            for i, seq_group in enumerate(self.pending):
                 if self.block_manager.can_allocate(seq_group):
                     self._allocate(seq_group)
                 else:
                     # FIXME: Consider the race condition.
-                    self.queue = self.queue[i:]
+                    self.pending = self.pending[i:]
                     break
 
+    def step(self) -> None:
+        # Swap-in and swap-out must not both be pending in the same step.
+        if self.blocks_to_swap_in:
+            assert not self.blocks_to_swap_out
+
+        # Execute the first stage of the pipeline.
+        self.controllers[0].execute_stage(
+            self.blocks_to_swap_in.copy(),
+            self.blocks_to_swap_out.copy(),
+            self.blocks_to_copy.copy(),
+        )
+        # Clear for the next step (copies were passed above).
+        self.blocks_to_swap_in.clear()
+        self.blocks_to_swap_out.clear()
+        self.blocks_to_copy.clear()
+
     def post_step(
         self,
         next_tokens: Dict[int, Tuple[int, int]],