Add metrics to RequestOutput (#2876)
This commit is contained in:
parent
181b27d881
commit
017d9f1515
@ -64,7 +64,7 @@ def test_request_tracker():
|
||||
stream_5 = tracker.add_request("5")
|
||||
assert tracker.new_requests_event.flag
|
||||
tracker.process_request_output(
|
||||
RequestOutput("2", "output", [], [], [], finished=True))
|
||||
RequestOutput("2", "output", [], [], [], bool(finished)))
|
||||
new, finished = tracker.get_new_and_finished_requests()
|
||||
assert not tracker.new_requests_event.flag
|
||||
assert len(finished) == 1
|
||||
|
||||
@ -33,7 +33,7 @@ class FCFS(Policy):
|
||||
now: float,
|
||||
seq_group: SequenceGroup,
|
||||
) -> float:
|
||||
return now - seq_group.arrival_time
|
||||
return now - seq_group.metrics.arrival_time
|
||||
|
||||
|
||||
class PolicyFactory:
|
||||
|
||||
@ -365,10 +365,13 @@ class Scheduler:
|
||||
# This function call changes the internal states of the scheduler
|
||||
# such as self.running, self.swapped, and self.waiting.
|
||||
scheduler_outputs = self._schedule()
|
||||
now = time.time()
|
||||
|
||||
# Create input data structures.
|
||||
seq_group_metadata_list: List[SequenceGroupMetadata] = []
|
||||
for seq_group in scheduler_outputs.scheduled_seq_groups:
|
||||
seq_group.maybe_set_first_scheduled_time(now)
|
||||
|
||||
seq_data: Dict[int, SequenceData] = {}
|
||||
block_tables: Dict[int, List[int]] = {}
|
||||
for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
|
||||
|
||||
@ -728,6 +728,7 @@ class LLMEngine:
|
||||
def _process_model_outputs(
|
||||
self, output: SamplerOutput,
|
||||
scheduler_outputs: SchedulerOutputs) -> List[RequestOutput]:
|
||||
now = time.time()
|
||||
# Update the scheduled sequence groups with the model outputs.
|
||||
scheduled_seq_groups = scheduler_outputs.scheduled_seq_groups
|
||||
for seq_group, outputs in zip(scheduled_seq_groups, output):
|
||||
@ -739,6 +740,7 @@ class LLMEngine:
|
||||
# Create the outputs.
|
||||
request_outputs: List[RequestOutput] = []
|
||||
for seq_group in scheduled_seq_groups:
|
||||
seq_group.maybe_set_first_token_time(now)
|
||||
request_output = RequestOutput.from_seq_group(seq_group)
|
||||
request_outputs.append(request_output)
|
||||
for seq_group in scheduler_outputs.ignored_seq_groups:
|
||||
@ -876,11 +878,12 @@ class LLMEngine:
|
||||
# Latency Timings.
|
||||
time_last_iters = []
|
||||
for seq_group in scheduler_outputs.scheduled_seq_groups:
|
||||
# Time since last token. (n.b. updates seq_group.last_token_time)
|
||||
# Time since last token. (n.b. updates seq_group.metrics.last_token_time)
|
||||
time_last_iters.append(seq_group.get_last_latency(now))
|
||||
# Time since arrival for all finished requests.
|
||||
if seq_group.is_finished():
|
||||
time_e2e_requests.append(now - seq_group.arrival_time)
|
||||
time_e2e_requests.append(now -
|
||||
seq_group.metrics.arrival_time)
|
||||
|
||||
time_to_first_tokens = time_last_iters if prompt_run else []
|
||||
time_per_output_tokens = [] if prompt_run else time_last_iters
|
||||
|
||||
@ -1,7 +1,8 @@
|
||||
from typing import List, Optional
|
||||
import time
|
||||
|
||||
from vllm.sequence import (PromptLogprobs, SampleLogprobs, SequenceGroup,
|
||||
SequenceStatus)
|
||||
SequenceStatus, RequestMetrics)
|
||||
from vllm.lora.request import LoRARequest
|
||||
|
||||
|
||||
@ -60,6 +61,7 @@ class RequestOutput:
|
||||
prompt_logprobs: The log probabilities to return per prompt token.
|
||||
outputs: The output sequences of the request.
|
||||
finished: Whether the whole request is finished.
|
||||
metrics: Metrics associated with the request.
|
||||
lora_request: The LoRA request that was used to generate the output.
|
||||
"""
|
||||
|
||||
@ -71,6 +73,7 @@ class RequestOutput:
|
||||
prompt_logprobs: Optional[PromptLogprobs],
|
||||
outputs: List[CompletionOutput],
|
||||
finished: bool,
|
||||
metrics: Optional[RequestMetrics] = None,
|
||||
lora_request: Optional[LoRARequest] = None,
|
||||
) -> None:
|
||||
self.request_id = request_id
|
||||
@ -79,6 +82,7 @@ class RequestOutput:
|
||||
self.prompt_logprobs = prompt_logprobs
|
||||
self.outputs = outputs
|
||||
self.finished = finished
|
||||
self.metrics = metrics
|
||||
self.lora_request = lora_request
|
||||
|
||||
@classmethod
|
||||
@ -115,12 +119,15 @@ class RequestOutput:
|
||||
prompt_token_ids = seq_group.prompt_token_ids
|
||||
prompt_logprobs = seq_group.prompt_logprobs
|
||||
finished = seq_group.is_finished()
|
||||
finished_time = time.time() if finished else None
|
||||
seq_group.set_finished_time(finished_time)
|
||||
return cls(seq_group.request_id,
|
||||
prompt,
|
||||
prompt_token_ids,
|
||||
prompt_logprobs,
|
||||
outputs,
|
||||
finished,
|
||||
seq_group.metrics,
|
||||
lora_request=seq_group.lora_request)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
@ -130,4 +137,5 @@ class RequestOutput:
|
||||
f"prompt_logprobs={self.prompt_logprobs}, "
|
||||
f"outputs={self.outputs}, "
|
||||
f"finished={self.finished}, "
|
||||
f"metrics={self.metrics}, "
|
||||
f"lora_request={self.lora_request})")
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""Sequence and its related classes."""
|
||||
import copy
|
||||
import enum
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from vllm.block import LogicalTokenBlock
|
||||
@ -49,6 +50,25 @@ class SequenceStatus(enum.Enum):
|
||||
return finish_reason
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestMetrics:
|
||||
"""Metrics associated with a request.
|
||||
|
||||
Args:
|
||||
arrival_time: The time when the request arrived.
|
||||
first_scheduled_time: The time when the request was first scheduled.
|
||||
first_token_time: The time when the first token was generated.
|
||||
time_in_queue: The time the request spent in the queue.
|
||||
finished_time: The time when the request was finished.
|
||||
"""
|
||||
arrival_time: float
|
||||
last_token_time: float
|
||||
first_scheduled_time: Optional[float]
|
||||
first_token_time: Optional[float]
|
||||
time_in_queue: Optional[float]
|
||||
finished_time: Optional[float] = None
|
||||
|
||||
|
||||
class SequenceData:
|
||||
"""Data associated with a sequence.
|
||||
|
||||
@ -252,8 +272,11 @@ class SequenceGroup:
|
||||
self.request_id = request_id
|
||||
self.seqs_dict = {seq.seq_id: seq for seq in seqs}
|
||||
self.sampling_params = sampling_params
|
||||
self.arrival_time = arrival_time
|
||||
self.last_token_time = arrival_time
|
||||
self.metrics = RequestMetrics(arrival_time=arrival_time,
|
||||
last_token_time=arrival_time,
|
||||
first_scheduled_time=None,
|
||||
first_token_time=None,
|
||||
time_in_queue=None)
|
||||
self.lora_request = lora_request
|
||||
self.prefix: Optional[Prefix] = prefix
|
||||
self.prompt_logprobs: Optional[PromptLogprobs] = None
|
||||
@ -276,10 +299,25 @@ class SequenceGroup:
|
||||
|
||||
def get_last_latency(self, now: float) -> float:
|
||||
"""Gets last token latency for Request level timings."""
|
||||
latency = now - self.last_token_time
|
||||
self.last_token_time = now
|
||||
latency = now - self.metrics.last_token_time
|
||||
self.metrics.last_token_time = now
|
||||
return latency
|
||||
|
||||
def maybe_set_first_token_time(self, time: float) -> None:
|
||||
"""Sets the first token time for Request level timings."""
|
||||
if self.metrics.first_token_time is None:
|
||||
self.metrics.first_token_time = time
|
||||
|
||||
def maybe_set_first_scheduled_time(self, time: float) -> None:
|
||||
"""Sets the first scheduled time and time in queue for Request level timings."""
|
||||
if self.metrics.first_scheduled_time is None:
|
||||
self.metrics.first_scheduled_time = time
|
||||
self.metrics.time_in_queue = time - self.metrics.arrival_time
|
||||
|
||||
def set_finished_time(self, time: Optional[float]) -> None:
|
||||
"""Sets the finished time for Request level timings."""
|
||||
self.metrics.finished_time = time
|
||||
|
||||
def get_max_num_running_seqs(self) -> int:
|
||||
"""The maximum number of sequences running in parallel in the remaining
|
||||
lifetime of the request."""
|
||||
|
||||
Loading…
Reference in New Issue
Block a user