From 017d9f15151ce571a5f4fd381699c72a872636ec Mon Sep 17 00:00:00 2001
From: Antoni Baum
Date: Tue, 20 Feb 2024 21:55:57 -0800
Subject: [PATCH] Add metrics to RequestOutput (#2876)

---
 tests/async_engine/test_request_tracker.py |  2 +-
 vllm/core/policy.py                         |  2 +-
 vllm/core/scheduler.py                      |  3 ++
 vllm/engine/llm_engine.py                   |  7 +++-
 vllm/outputs.py                             | 10 ++++-
 vllm/sequence.py                            | 46 ++++++++++++++++++++--
 6 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/tests/async_engine/test_request_tracker.py b/tests/async_engine/test_request_tracker.py
index 3e4d53c5..4043558b 100644
--- a/tests/async_engine/test_request_tracker.py
+++ b/tests/async_engine/test_request_tracker.py
@@ -64,7 +64,7 @@ def test_request_tracker():
     stream_5 = tracker.add_request("5")
     assert tracker.new_requests_event.flag
     tracker.process_request_output(
-        RequestOutput("2", "output", [], [], [], finished=True))
+        RequestOutput("2", "output", [], [], [], bool(finished)))
     new, finished = tracker.get_new_and_finished_requests()
     assert not tracker.new_requests_event.flag
     assert len(finished) == 1
diff --git a/vllm/core/policy.py b/vllm/core/policy.py
index 99f183b4..2e9ebbda 100644
--- a/vllm/core/policy.py
+++ b/vllm/core/policy.py
@@ -33,7 +33,7 @@ class FCFS(Policy):
         now: float,
         seq_group: SequenceGroup,
     ) -> float:
-        return now - seq_group.arrival_time
+        return now - seq_group.metrics.arrival_time
 
 
 class PolicyFactory:
diff --git a/vllm/core/scheduler.py b/vllm/core/scheduler.py
index 4fdf9ec3..5dde9097 100644
--- a/vllm/core/scheduler.py
+++ b/vllm/core/scheduler.py
@@ -365,10 +365,13 @@ class Scheduler:
         # This function call changes the internal states of the scheduler
         # such as self.running, self.swapped, and self.waiting.
         scheduler_outputs = self._schedule()
+        now = time.time()
 
         # Create input data structures.
         seq_group_metadata_list: List[SequenceGroupMetadata] = []
         for seq_group in scheduler_outputs.scheduled_seq_groups:
+            seq_group.maybe_set_first_scheduled_time(now)
+
             seq_data: Dict[int, SequenceData] = {}
             block_tables: Dict[int, List[int]] = {}
             for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 2fa04f72..f0de40f5 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -728,6 +728,7 @@ class LLMEngine:
     def _process_model_outputs(
             self, output: SamplerOutput,
             scheduler_outputs: SchedulerOutputs) -> List[RequestOutput]:
+        now = time.time()
         # Update the scheduled sequence groups with the model outputs.
         scheduled_seq_groups = scheduler_outputs.scheduled_seq_groups
         for seq_group, outputs in zip(scheduled_seq_groups, output):
@@ -739,6 +740,7 @@ class LLMEngine:
         # Create the outputs.
         request_outputs: List[RequestOutput] = []
         for seq_group in scheduled_seq_groups:
+            seq_group.maybe_set_first_token_time(now)
             request_output = RequestOutput.from_seq_group(seq_group)
             request_outputs.append(request_output)
         for seq_group in scheduler_outputs.ignored_seq_groups:
@@ -876,11 +878,12 @@ class LLMEngine:
             # Latency Timings.
             time_last_iters = []
             for seq_group in scheduler_outputs.scheduled_seq_groups:
-                # Time since last token. (n.b. updates seq_group.last_token_time)
+                # Time since last token. (n.b. updates seq_group.metrics.last_token_time)
                 time_last_iters.append(seq_group.get_last_latency(now))
                 # Time since arrival for all finished requests.
                 if seq_group.is_finished():
-                    time_e2e_requests.append(now - seq_group.arrival_time)
+                    time_e2e_requests.append(now -
+                                             seq_group.metrics.arrival_time)
 
             time_to_first_tokens = time_last_iters if prompt_run else []
             time_per_output_tokens = [] if prompt_run else time_last_iters
diff --git a/vllm/outputs.py b/vllm/outputs.py
index 534e9d5e..a6de2a5a 100644
--- a/vllm/outputs.py
+++ b/vllm/outputs.py
@@ -1,7 +1,8 @@
 from typing import List, Optional
+import time
 
 from vllm.sequence import (PromptLogprobs, SampleLogprobs, SequenceGroup,
-                           SequenceStatus)
+                           SequenceStatus, RequestMetrics)
 from vllm.lora.request import LoRARequest
 
 
@@ -60,6 +61,7 @@ class RequestOutput:
         prompt_logprobs: The log probabilities to return per prompt token.
         outputs: The output sequences of the request.
         finished: Whether the whole request is finished.
+        metrics: Metrics associated with the request.
         lora_request: The LoRA request that was used to generate the output.
     """
 
@@ -71,6 +73,7 @@
         prompt_logprobs: Optional[PromptLogprobs],
         outputs: List[CompletionOutput],
         finished: bool,
+        metrics: Optional[RequestMetrics] = None,
         lora_request: Optional[LoRARequest] = None,
     ) -> None:
         self.request_id = request_id
@@ -79,6 +82,7 @@
         self.prompt_logprobs = prompt_logprobs
         self.outputs = outputs
         self.finished = finished
+        self.metrics = metrics
         self.lora_request = lora_request
 
     @classmethod
@@ -115,12 +119,15 @@
         prompt_token_ids = seq_group.prompt_token_ids
         prompt_logprobs = seq_group.prompt_logprobs
         finished = seq_group.is_finished()
+        finished_time = time.time() if finished else None
+        seq_group.set_finished_time(finished_time)
         return cls(seq_group.request_id,
                    prompt,
                    prompt_token_ids,
                    prompt_logprobs,
                    outputs,
                    finished,
+                   seq_group.metrics,
                    lora_request=seq_group.lora_request)
 
     def __repr__(self) -> str:
@@ -130,4 +137,5 @@
                 f"prompt_logprobs={self.prompt_logprobs}, "
                 f"outputs={self.outputs}, "
                 f"finished={self.finished}, "
+                f"metrics={self.metrics}, "
                 f"lora_request={self.lora_request})")
diff --git a/vllm/sequence.py b/vllm/sequence.py
index 9669562c..44adb058 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -1,6 +1,7 @@
 """Sequence and its related classes."""
 import copy
 import enum
+from dataclasses import dataclass
 from typing import Dict, List, Optional, Union
 
 from vllm.block import LogicalTokenBlock
@@ -49,6 +50,25 @@ class SequenceStatus(enum.Enum):
         return finish_reason
 
 
+@dataclass
+class RequestMetrics:
+    """Metrics associated with a request.
+
+    Args:
+        arrival_time: The time when the request arrived.
+        first_scheduled_time: The time when the request was first scheduled.
+        first_token_time: The time when the first token was generated.
+        time_in_queue: The time the request spent in the queue.
+        finished_time: The time when the request was finished.
+    """
+    arrival_time: float
+    last_token_time: float
+    first_scheduled_time: Optional[float]
+    first_token_time: Optional[float]
+    time_in_queue: Optional[float]
+    finished_time: Optional[float] = None
+
+
 class SequenceData:
     """Data associated with a sequence.
 
@@ -252,8 +272,11 @@ class SequenceGroup:
         self.request_id = request_id
         self.seqs_dict = {seq.seq_id: seq for seq in seqs}
         self.sampling_params = sampling_params
-        self.arrival_time = arrival_time
-        self.last_token_time = arrival_time
+        self.metrics = RequestMetrics(arrival_time=arrival_time,
+                                      last_token_time=arrival_time,
+                                      first_scheduled_time=None,
+                                      first_token_time=None,
+                                      time_in_queue=None)
         self.lora_request = lora_request
         self.prefix: Optional[Prefix] = prefix
         self.prompt_logprobs: Optional[PromptLogprobs] = None
@@ -276,10 +299,25 @@
 
     def get_last_latency(self, now: float) -> float:
         """Gets last token latency for Request level timings."""
-        latency = now - self.last_token_time
-        self.last_token_time = now
+        latency = now - self.metrics.last_token_time
+        self.metrics.last_token_time = now
         return latency
 
+    def maybe_set_first_token_time(self, time: float) -> None:
+        """Sets the first token time for Request level timings."""
+        if self.metrics.first_token_time is None:
+            self.metrics.first_token_time = time
+
+    def maybe_set_first_scheduled_time(self, time: float) -> None:
+        """Sets the first scheduled time and time in queue for Request level timings."""
+        if self.metrics.first_scheduled_time is None:
+            self.metrics.first_scheduled_time = time
+            self.metrics.time_in_queue = time - self.metrics.arrival_time
+
+    def set_finished_time(self, time: Optional[float]) -> None:
+        """Sets the finished time for Request level timings."""
+        self.metrics.finished_time = time
+
     def get_max_num_running_seqs(self) -> int:
         """The maximum number of sequences running in parallel in the
         remaining lifetime of the request."""
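
Note (not part of the patch): a minimal sketch of how the new per-request timings could be read back once this change is applied, using vLLM's offline LLM entry point. The model name, prompt, and sampling settings below are placeholders chosen for the example, and the derived intervals are one possible interpretation of the wall-clock stamps set above.

    # Illustrative sketch only; model and prompt are placeholders.
    from vllm import LLM, SamplingParams

    llm = LLM(model="facebook/opt-125m")
    outputs = llm.generate(["Hello, my name is"],
                           SamplingParams(max_tokens=16))

    for out in outputs:
        m = out.metrics  # RequestMetrics added by this patch (may be None)
        if m is None:
            continue
        # All fields are time.time() wall-clock stamps; derive intervals
        # relative to arrival_time.
        ttft = (m.first_token_time - m.arrival_time
                if m.first_token_time is not None else None)
        e2e = (m.finished_time - m.arrival_time
               if m.finished_time is not None else None)
        print(f"request {out.request_id}: queued={m.time_in_queue} "
              f"ttft={ttft} e2e={e2e}")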