From ae356774ab60af7ffa665a7b80e28bd73a9a7f48 Mon Sep 17 00:00:00 2001 From: Woosuk Kwon Date: Wed, 10 May 2023 01:57:07 -0700 Subject: [PATCH] Avoid sorting waiting queue & Minor code cleaning (#93) --- cacheflow/core/scheduler.py | 23 +++++++++------------ cacheflow/core/server.py | 1 - cacheflow/frontend/simple_frontend.py | 1 - cacheflow/model_executor/memory_analyzer.py | 1 - 4 files changed, 10 insertions(+), 16 deletions(-) diff --git a/cacheflow/core/scheduler.py b/cacheflow/core/scheduler.py index 7656ae97..02e864e6 100644 --- a/cacheflow/core/scheduler.py +++ b/cacheflow/core/scheduler.py @@ -1,21 +1,16 @@ import enum -import os -import pickle import time -from typing import Any, Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from cacheflow.core.block_manager import BlockSpaceManager from cacheflow.logger import init_logger from cacheflow.core.policy import PolicyFactory from cacheflow.sampling_params import SamplingParams -from cacheflow.sequence import Sequence -from cacheflow.sequence import SequenceGroup -from cacheflow.sequence import SequenceGroupMetadata -from cacheflow.sequence import SequenceOutputs -from cacheflow.sequence import SequenceStatus - +from cacheflow.sequence import (Sequence, SequenceGroup, SequenceGroupMetadata, + SequenceOutputs, SequenceStatus) logger = init_logger(__name__) + _LOGGING_INTERVAL_SEC = 10 @@ -129,7 +124,6 @@ class Scheduler: # Swap in the sequence groups in the SWAPPED state if possible. self.swapped = self.policy.sort_by_priority(now, self.swapped) - # FCFS while self.swapped and not blocks_to_swap_out: seq_group = self.swapped[0] # If the sequence group has been preempted in this step, stop. @@ -162,7 +156,9 @@ class Scheduler: # This is because we want to bound the amount of CPU memory taken by # the swapped sequence groups. if not self.swapped: - self.waiting = self.policy.sort_by_priority(now, self.waiting) + # Optimization: We do not sort the waiting queue since the preempted + # sequence groups are added to the front and the new sequence groups + # are added to the back. while self.waiting: seq_group = self.waiting[0] # If the sequence group has been preempted in this step, stop. @@ -347,7 +343,6 @@ class Scheduler: self.block_manager.allocate(seq_group) for seq in seq_group.seqs: seq.status = SequenceStatus.RUNNING - # FIXME(woosuk): Support interactive generation. if seq_group.group_id not in self.num_steps: self.num_steps[seq_group.group_id] = 0 @@ -404,7 +399,9 @@ class Scheduler: for seq in seqs: seq.status = SequenceStatus.WAITING self.block_manager.free(seq) - self.waiting.append(seq_group) + # NOTE: For FCFS, we insert the preempted sequence group to the front + # of the waiting queue. + self.waiting.insert(0, seq_group) def _preempt_by_swap( self, diff --git a/cacheflow/core/server.py b/cacheflow/core/server.py index 2f968f8b..a35a27bc 100644 --- a/cacheflow/core/server.py +++ b/cacheflow/core/server.py @@ -17,7 +17,6 @@ from cacheflow.sequence import SequenceGroup from cacheflow.utils import get_gpu_memory, get_cpu_memory from cacheflow.worker.controller import Controller, DeviceID - logger = init_logger(__name__) diff --git a/cacheflow/frontend/simple_frontend.py b/cacheflow/frontend/simple_frontend.py index da363953..9d65e4f0 100644 --- a/cacheflow/frontend/simple_frontend.py +++ b/cacheflow/frontend/simple_frontend.py @@ -7,7 +7,6 @@ from cacheflow.sampling_params import SamplingParams from cacheflow.sequence import Sequence, SequenceGroup from cacheflow.utils import Counter - logger = init_logger(__name__) diff --git a/cacheflow/model_executor/memory_analyzer.py b/cacheflow/model_executor/memory_analyzer.py index bc85d658..fb910e64 100644 --- a/cacheflow/model_executor/memory_analyzer.py +++ b/cacheflow/model_executor/memory_analyzer.py @@ -4,7 +4,6 @@ from transformers import AutoConfig from cacheflow.logger import init_logger from cacheflow.model_executor.utils import get_dtype_size - logger = init_logger(__name__) _GiB = 1 << 30