
    &`iB                        d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZmZmZ d dlmZmZ d dlmZmZ d dlmZ d	Z ej        e          Z e G d
 d                      Z! G d d          Z" G d d          Z#	 	 	 ddedee$         dee$         de$dee$         f
dZ%	 ddedede$dee$         fdZ&dee         defdZ'deeef         deeef         fdZ(dS )     N)defaultdict)	dataclass)chain)
	AwaitableCallableDefaultDictDictHashableIterableListOptionalTupleUnion)
TimeSeriesTimeStampedValue)*METRICS_PUSHER_GRACEFUL_SHUTDOWN_TIMEOUT_SSERVE_LOGGER_NAME)AggregationFunctionqueuedc                   D    e Zd ZU eeeg ef         f         ed<   eed<   dS )_MetricsTask	task_func
interval_sN)__name__
__module____qualname__r   r   r   __annotations__float     t/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/_private/metrics_utils.pyr   r   "   s=         XxI667777r    r   c                       e Zd ZdZdddeeegdf                  fdZede	j
        fd            Zd Zd	efd
Zd	edeeeg ef         f         deddfdZd Zd ZdS )MetricsPusherz+Periodically runs registered asyncio tasks.N)async_sleepr$   c                    |pt           j        | _        t                      | _        t                      | _        d | _        d S N)asynciosleep_async_sleepdict_tasks_async_tasks_stop_event)selfr$   s     r!   __init__zMetricsPusher.__init__+   s:    
 (87=/3vv59VV 59r    returnc                 N    | j         t          j                    | _         | j         S r&   )r-   r'   Eventr.   s    r!   
stop_eventzMetricsPusher.stop_event8   s"    #&}Dr    c                 8    | j                                          d S r&   )r4   clearr3   s    r!   startzMetricsPusher.start?   s    r    namec                   K   t          j        | j                                                  }	 |                                rdS 	 | j        |         j        }t          j        |          r |             d{V  n
 |             n7# t          $ r*}t          
                    d| d|            Y d}~nd}~ww xY wt          j        |                     | j        |         j                            }t          j        ||gt           j                   d{V  |                                s|                                 )zPeriodically runs `task_func` every `interval_s` until `stop_event` is set.

        If `task_func` raises an error, an exception will be logged.
        Supports both sync and async task functions.
        TNzFailed to run metrics task 'z': )return_when)r'   create_taskr4   waitdoner+   r   iscoroutinefunction	Exceptionlogger	exceptionr)   r   FIRST_COMPLETEDcancel)r.   r8   wait_for_stop_eventr   e
sleep_tasks         r!   metrics_taskzMetricsPusher.metrics_taskB   s      &1$/2F2F2H2HII	$"'')) N K-7	.y99  #)++%%%%%%%%IKKK N N N  !L!L!L!L!LMMMMMMMMN !,!!$+d"3">?? J ,01#3         
 ??$$ $!!###/	$s   AB 
B< B77B<r   r   c                     t          ||          | j        |<   || j        vs| j        |                                         r1t	          j        |                     |                    | j        |<   dS dS )a  Register a sync or async task under the provided name, or update it.

        This method is idempotent - if a task is already registered with
        the specified name, it will update it with the most recent info.

        Args:
            name: Unique name for the task.
            task_func: Either a sync function or async function (coroutine function).
            interval_s: Interval in seconds between task executions.
        N)r   r+   r,   r=   r'   r;   rG   )r.   r8   r   r   s       r!   register_or_update_taskz%MetricsPusher.register_or_update_taskc   su    " )J??Dt(((D,=d,C,H,H,J,J(&-&9$:K:KD:Q:Q&R&RDd### )(r    c                     | j                                          | j                                         | j                                         d S r&   )r4   setr+   r6   r,   r3   s    r!   
stop_taskszMetricsPusher.stop_tasksx   sE    !!!!!r    c                 8  K   | j                                          | j        rEt          j        t          | j                                                  t                     d{V  | j        	                                 | j        	                                 dS )zkShutdown metrics pusher gracefully.

        This method will ensure idempotency of shutdown call.
        )timeoutN)
r4   rK   r,   r'   r<   listvaluesr   r+   r6   r3   s    r!   graceful_shutdownzMetricsPusher.graceful_shutdown}   s       	 	,T&--//00B         
 	!!!!!r    )r   r   r   __doc__r   r   intr/   propertyr'   r2   r4   r7   strrG   r   r   rI   rL   rQ   r   r    r!   r#   r#   (   s       55
 8<9 9 9 hud{349 9 9 9  GM       X      $s $ $ $ $BSS 8B	M#::;S 	S
 
S S S S*" " "
" " " " "r    r#   c                   N   e Zd ZdZd Zdeeef         defdZdefdZ	ded	ed
e
fdZdee         deee         gef         d
eee         ef         fdZded
ee         fdZdee         d
eee         ef         fdZdee         d
eee         ef         fdZded
efdZdS )InMemoryMetricsStorez-A very simple, in memory time series databasec                 8    t          t                    | _        d S r&   )r   rO   datar3   s    r!   r/   zInMemoryMetricsStore.__init__   s    7B47H7H			r    data_points	timestampc                     |                                 D ]4\  }}t          j        | j        |         t	          ||                     5dS )aN  Push new data points to the store.

        Args:
            data_points: dictionary containing the metrics values. The
              key should uniquely identify this time series
              and to be used to perform aggregation.
            timestamp: the unix epoch timestamp the metrics are
              collected at.
        axN)itemsbisectinsortrY   r   )r.   rZ   r[   r8   values        r!   add_metrics_pointz&InMemoryMetricsStore.add_metrics_point   sZ     ',,.. 	S 	SKD%MDIdO/?	5/Q/QRRRRR	S 	Sr    start_timestamp_sc                     t          | j                                                  D ]P\  }}t          |          dk    s|d         j        |k     r	| j        |= 2|                     ||          | j        |<   QdS )a  Prune keys and compact data that are outdated.

        For keys that haven't had new data recorded after the timestamp,
        remove them from the database.
        For keys that have, compact the datapoints that were recorded
        before the timestamp.
        r   N)rO   rY   r`   lenr[   _get_datapoints)r.   re   key
datapointss       r!   prune_keys_and_compact_dataz0InMemoryMetricsStore.prune_keys_and_compact_data   s      $DIOO$5$566 	N 	NOC:!##z"~'?BS'S'SIcNN!%!5!5c;L!M!M	#		N 	Nr    rj   window_start_timestamp_sr0   c                 z    | j         |         }t          j        |t          |d                    }||d         S )z<Get all data points given key after window_start_timestamp_sr   )r[   rc   r]   N)rY   ra   r   )r.   rj   rm   rk   idxs        r!   ri   z$InMemoryMetricsStore._get_datapoints   sP    
 Ys^
m2!  
 
 
 #$$r    keysaggregate_fnc                      d fd} |            }	 t          |          }n# t          $ r Y dS w xY w |t          |g|                    }|fS )a  Reduce the entire set of timeseries values across the specified keys.

        Args:
            keys: Iterable of keys to aggregate across.
            aggregate_fn: Function to apply across all float values, e.g., sum, max.

        Returns:
            A tuple of (float, int) where the first element is the aggregated value
            and the second element is the number of valid keys used.
            Returns (None, 0) if no valid keys have data.

        Example:
        Suppose the store contains:
        >>> store = InMemoryMetricsStore()
        >>> store.data.update({
        ...     "a": [TimeStampedValue(0, 1.0), TimeStampedValue(1, 2.0)],
        ...     "b": [],
        ...     "c": [TimeStampedValue(0, 10.0)],
        ... })

        Using sum across keys:

        >>> store._aggregate_reduce(keys=["a", "b", "c"], aggregate_fn=sum)
        (13.0, 2)

        Here:
        - The aggregated value is 1.0 + 2.0 + 10.0 = 13.0
        - Only keys "a" and "c" contribute values, so report_count = 2
        r   c               3   x   K   D ]3} j                             | g           }|s dz  |D ]}|j        V  4dS )zPGenerator that yields values from valid keys without storing them all in memory.   NrY   getrc   )rj   seriestimestamp_valuerp   r.   valid_key_counts      r!   _values_generatorzAInMemoryMetricsStore._aggregate_reduce.<locals>._values_generator   sr        0 0sB// 1$'- 0 0O)/////00 0r    )Nr   )nextStopIterationr   )r.   rp   rq   rz   
values_genfirst_valueaggregated_resultry   s   ``     @r!   _aggregate_reducez&InMemoryMetricsStore._aggregate_reduce   s    D 
	0 
	0 
	0 
	0 
	0 
	0 
	0 '&((
	z**KK 	 	 	77	
 )L}j)I)IJJ /11s   ( 
66c                 l    | j                             |d          sdS | j         |         d         j        S )z%Get the latest value for a given key.Nrg   ru   )r.   rj   s     r!   
get_latestzInMemoryMetricsStore.get_latest   s5    
 y}}S$'' 	4y~b!''r    c                 8    |                      |t                    S )a  Sum the entire set of timeseries values across the specified keys.
        Args:
            keys: Iterable of keys to aggregate across.
        Returns:
            A tuple of (float, int) where the first element is the sum across
            all values found at `keys`, and the second is the number of valid
            keys used to compute the sum.
            Returns (None, 0) if no valid keys have data.
        )r   sumr.   rp   s     r!   aggregate_sumz"InMemoryMetricsStore.aggregate_sum  s     %%dC000r    c                 B    |                      |t          j                  S )a  Average the entire set of timeseries values across the specified keys.

        Args:
            keys: Iterable of keys to aggregate across.
        Returns:
            A tuple of (float, int) where the first element is the mean across
            all values found at `keys`, and the second is the number of valid
            keys used to compute the mean.
            Returns (None, 0) if no valid keys have data.
        )r   
statisticsmeanr   s     r!   aggregate_avgz"InMemoryMetricsStore.aggregate_avg  s     %%dJO<<<r    c                 ^    | j                             |g           }|sdS t          |          S )zNCount the number of values across all timeseries values at the specified keys.r   )rY   rv   rh   )r.   rj   rw   s      r!   timeseries_countz%InMemoryMetricsStore.timeseries_count$  s2    
 sB'' 	16{{r    N)r   r   r   rR   r/   r	   r
   r   rd   rl   r   ri   r   r   r   r   rS   r   r   r   r   r   r   r    r!   rW   rW      s       77I I IST(E/-B Su S S S SNU N N N N  7< 	       :2x :2 0%78:2 
x#	$	:2 :2 :2 :2x(( 
%( ( ( (1x 1 
x#	$1 1 1 1=x = 
x#	$= = = =  
     r    rW         ?step_serieswindow_start
window_endlast_window_sr0   c                 l   | sdS || d         j         }|| d         j         |z   }||k    rdS d}d}d}|}| D ]X}|j         |k    r|j        }|j         |k    r n7t          |j         |          }	|	|z
  }
|
dk    r|||
z  z  }||
z  }|j        }|	}Y||k     r||z
  }
|||
z  z  }||
z  }|dk    r||z  ndS )a  
    Compute time-weighted average of a step function over a time interval.

    Args:
        step_series: Step function as list of (timestamp, value) points, sorted by time.
            Values are right-continuous (constant until next change).
        window_start: Start of averaging window (inclusive). If None, uses the start of the series.
        window_end: End of averaging window (exclusive). If None, uses the end of the series.
        last_window_s: when window_end is None, uses the last_window_s to compute the end of the window.
    Returns:
        Time-weighted average over the interval, or None if no data overlaps.
    Nr   rg           )r[   rc   min)r   r   r   r   total_weighted_valuetotal_durationcurrent_valuecurrent_timepointsegment_enddurations              r!   time_weighted_averager   /  s5   $  t "1~/ _.>
\!!tNML  # #?l**!KM?j((E %/:66-a<< MH$<< h&N" j  , 88("4BQ4F4F.00DPr    
timeseriesaggregation_functionc                    |t           j        k    rt          | |          S |t           j        k    r| rt	          d | D                       ndS |t           j        k    r| rt          d | D                       ndS t          d|           )z@Aggregate the values in a timeseries using a specified function.)r   c              3   $   K   | ]}|j         V  d S r&   rc   .0tss     r!   	<genexpr>z'aggregate_timeseries.<locals>.<genexpr>x  $      1128111111r    Nc              3   $   K   | ]}|j         V  d S r&   r   r   s     r!   r   z'aggregate_timeseries.<locals>.<genexpr>z  r   r    zInvalid aggregation function: )r   MEANr   MAXmaxMINr   
ValueError)r   r   r   s      r!   aggregate_timeseriesr   o  s     2777$Z}MMMM	!4!8	8	85?Is11j111111TI	!4!8	8	85?Is11j111111TIP:NPPQQQr    replicas_timeseriesc                    d | D             }|sg S t          |          dk    r|d         S g }dgt          |          z  }t          |          D ]Z\  }}|rSt          |          }	 t          |          }t	          j        ||j        ||j        |f           J# t          $ r Y Vw xY w[g }d}	|rt	          j	        |          \  }
}}}||         }|||<   |	||z
  z  }		 t          |          }t	          j        ||j        ||j        |f           n# t          $ r Y nw xY w||k    rZt          |
d          }|r%|d         j        |k    rt          ||	          |d<   n#|                    t          ||	                     ||S )aJ  
    Merge multiple gauge time series (right-continuous, LOCF) into an
    instantaneous total time series as a step function.

    This approach treats each replica's gauge as right-continuous, last-observation-
    carried-forward (LOCF), which matches gauge semantics. It produces an exact
    instantaneous total across replicas without bias from arbitrary windowing.

    Uses a k-way merge algorithm for O(n log k) complexity where k is the number
    of timeseries and n is the total number of events.

    Timestamps are rounded to 10ms precision (2 decimal places) and datapoints
    with the same rounded timestamp are combined, keeping the most recent value.

    Args:
        replicas_timeseries: List of time series, one per replica. Each time series
            is a list of TimeStampedValue objects sorted by timestamp.

    Returns:
        A list of TimeStampedValue representing the instantaneous total at event times.
        Between events, the total remains constant (step function). Timestamps are
        rounded to 10ms precision and duplicate timestamps are combined.
    c                     g | ]}||S r   r   )r   rw   s     r!   
<listcomp>z-merge_instantaneous_total.<locals>.<listcomp>  s    HHHHVHHHr    rt   r   r      rg   )rh   	enumerateiterr{   heapqheappushr[   rc   r|   heappoproundr   append)r   active_series
merge_heapcurrent_valuesreplica_idxrw   iteratorfirst_pointmergedrunning_totalr[   rc   	old_value
next_pointrounded_timestamps                  r!   merge_instantaneous_totalr     s(   6 IH*=HHHM 	
=QQ JUS///N  )77 
 
V 		F||H"8nn *K9JHU    !   		 FM
 R27-
2K2K/	;x";/	&+{#**	+/>>JN%{J4DhO     	 	 	D	 I %i 3 3  R&*.2CCC-.?OOr

 ./@-PPQQQ=  R@ Ms$   '2B
B'&B' 2D 
D D timeseries_dictsc                      t          t                    }| D ]7}|                                D ] \  }}||                             |           !8d |                                D             S )zU
    Merge multiple time-series dictionaries using instantaneous merge approach.
    c                 4    i | ]\  }}|t          |          S r   )r   )r   rj   ts_lists      r!   
<dictcomp>z*merge_timeseries_dicts.<locals>.<dictcomp>  s'    WWWWC*733WWWr    )r   rO   r`   r   )r   r   ts_dictrj   r   s        r!   merge_timeseries_dictsr     sz     1<D0A0AF# # #}} 	# 	#GC3Kr""""	# XWWWWWr    )NNr   )r   ))r'   ra   r   loggingr   collectionsr   dataclassesr   	itertoolsr   typingr   r   r   r	   r
   r   r   r   r   r   ray.serve._private.commonr   r   ray.serve._private.constantsr   r   ray.serve.configr   QUEUED_REQUESTS_KEY	getLoggerr@   r   r#   rW   r   r   r   r   r   r   r    r!   <module>r      s           # # # # # # ! ! ! ! ! !                              C B B B B B B B        1 0 0 0 0 0 		,	-	-        
c" c" c" c" c" c" c" c"L^ ^ ^ ^ ^ ^ ^ ^F %)"&	=Q =Q=Q5/=Q =Q 	=Q
 e_=Q =Q =Q =QF R RR-R R e_	R R R R Wj)WW W W WtX"8Z#78X:%&X X X X X Xr    