
    &`i                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlm Z m!Z!m"Z" d d	l#m$Z$ d d
l%m&Z&m'Z'm(Z(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0m1Z1m2Z2 d dl3m4Z4  ej5        e*          Z6e7e8gZ9e G d d                      Z:e G d d                      Z;e G d d                      Z<deee                  deee         eeef         f         fdZ= G d d          Z> G d d          Z?d Z@d ZAdeBddfdZCd eeegeBf                  ddfd!ZD ed"d#$          ZE ed%          ZF ed&          ZG G d' d(eeeEeFeGf                   ZH G d) d*eeeEeFeGf                   ZIed+eeeF         geeG         f         deeFgeGf         fd,            ZJed-eeeF         geeeeeG         f         f         deeFgeeeeGf         f         fd.            ZJed/eHeEeFeGf         deeEeFgeGf         fd0            ZJed1eIeEeFeGf         deeEeFgeeeeGf         f         fd2            ZJe	 	 	 	 	 dEd6eBd7eKdeBd eeegeBf                  d8ed         dd9fd:            ZJ G d; d9e          ZL e4d<=          	 	 	 	 	 dEd6eBd7eKdeBd eeegeBf                  d>ee         defd?            ZJd@e jM        dAefdBZNd@e jM        dCefdDZOdS )F    N)deque)	dataclass)wraps)isasyncgenfunctioniscoroutinefunction)AnyAsyncGeneratorCallable	CoroutineDictGenericIterableListLiteralOptionalProtocolSetTupleTypeVaroverload)serve)extract_signatureflatten_argsrecover_args)get_or_create_event_loop)BATCH_EXECUTION_TIME_BUCKETS_MSBATCH_SIZE_BUCKETS!BATCH_UTILIZATION_BUCKETS_PERCENTBATCH_WAIT_TIME_BUCKETS_MSSERVE_LOGGER_NAME)extract_self_if_method_call)RayServeException)CounterGauge	Histogram)	PublicAPIc                   b    e Zd ZU eed<   ee         ed<   ej        ed<   ej	        j
        ed<   dS )_SingleRequestself_argflattened_argsfuturerequest_contextN)__name__
__module____qualname__r   __annotations__r   asyncioFuturer   context_RequestContext     f/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/batching.pyr(   r(   2   sJ         MMMIN]222222r6   r(   c                   .    e Zd ZU eed<   ej        ed<   dS )_GeneratorResultresultnext_futureN)r-   r.   r/   r   r0   r1   r2   r5   r6   r7   r9   r9   :   s)         KKKr6   r9   c                       e Zd ZU ee         ed<   edee         fd            Zedee         fd            Z	edee         fd            Z
edefd            ZdS )_RuntimeSummaryStatisticsstart_timesreturnc                 <    | j         rt          | j                   nd S N)r>   minselfs    r7   min_start_timez(_RuntimeSummaryStatistics.min_start_timeD        (,(8Bs4#$$$dBr6   c                 f    | j         r)t          | j                   t          | j                   z  nd S rA   )r>   sumlenrC   s    r7   mean_start_timez)_RuntimeSummaryStatistics.mean_start_timeH   s6     >B=MWC !!C(8$9$999SW	
r6   c                 <    | j         rt          | j                   nd S rA   )r>   maxrC   s    r7   max_start_timez(_RuntimeSummaryStatistics.max_start_timeN   rF   r6   c                 *    t          | j                  S rA   )rI   r>   rC   s    r7   num_requestsz&_RuntimeSummaryStatistics.num_requestsR   s    4#$$$r6   N)r-   r.   r/   r   floatr0   propertyr   rE   rJ   rM   intrO   r5   r6   r7   r=   r=   @   s         eC C C C XC 
% 
 
 
 X

 C C C C XC %c % % % X% % %r6   r=   list_of_flattened_argsr?   c                 \   d | D             }t          |          dk    s
J d            |                                }g }t          |          D ]Ndz  dk    r"|                    | d                             -|                    fd| D                        Ot	          |          S )z@Batch a list of flatten args and returns regular args and kwargsc                 ,    h | ]}t          |          S r5   )rI   ).0argss     r7   	<setcomp>z%_batch_args_kwargs.<locals>.<setcomp>^   s    @@@3t99@@@r6      z=All batch requests should have the same number of parameters.   r   c                      g | ]
}|         S r5   r5   )rV   itemidxs     r7   
<listcomp>z&_batch_args_kwargs.<locals>.<listcomp>j   s    >>>tc>>>r6   )rI   poprangeappendr   )rS   arg_lengths
arg_lengthbatched_flattened_argsr]   s       @r7   _batch_args_kwargsre   W   s     A@)?@@@KKAF 	""JZ    7a<<"))*@*CC*HIIII"))>>>>'=>>>    .///r6   c                      e Zd Z	 	 d#dedededee         deeegef                  ddfdZd	 Z	d
eddfdZ
deeej        f         ddfdZdee         defdZdeee         ef         fdZdee         deddfdZdedeej                 deddfdZdej        deej                 defdZdeddfdZdedee         deddfdZdej        ddfdZed ee         ddfd!            Zd" ZdS )$_BatchQueueNmax_batch_sizebatch_wait_timeout_smax_concurrent_batcheshandle_batch_funcbatch_size_fnr?   c                 ,   t          j                    | _        || _        || _        || _        || _        t          j        |          | _        t          j	                    | _
        t                      | _        i | _        t          ddt          d          | _        t          ddt"          d          | _        t'          ddd	          | _        t          d
dt*          d          | _        t          ddt.          d          | _        t3          ddd	          | _        ||j        nd| _        d| _        t=                      | _        |2| j                             | !                    |                    | _        | "                                 dS )a  Async queue that accepts individual items and returns batches.

        Respects max_batch_size and batch_wait_timeout_s; a batch will be returned when
        max_batch_size elements are available or the timeout has passed since
        the previous get.

        If handle_batch_func is passed in, a background coroutine will run to
        poll from the queue and call handle_batch_func on the results.

        Cannot be pickled.

        Arguments:
            max_batch_size: max number of elements to return in a batch.
            batch_wait_timeout_s: time to wait before returning an incomplete
                batch.
            max_concurrent_batches: max number of batches to run concurrently.
            handle_batch_func(Optional[Callable]): callback to run in the
                background to handle batches if provided.
            batch_size_fn(Optional[Callable[[List], int]]): optional function to
                compute the effective batch size. If None, uses len(batch).
                The function takes a list of requests and returns an integer
                representing the batch size. This is useful for batching based
                on custom metrics such as total nodes in graphs, total tokens
                in sequences, etc.
        serve_batch_wait_time_msz9Time requests waited for batch to fill (in milliseconds).)function_name)description
boundariestag_keysserve_batch_execution_time_msz5Time to execute the batch function (in milliseconds).serve_batch_queue_lengthz.Number of requests waiting in the batch queue.)rp   rr   serve_batch_utilization_percentzKBatch utilization as percentage (actual_batch_size / max_batch_size * 100).serve_actual_batch_sizez,The actual number of requests in each batch.serve_batches_processedzCounter of batches executed.Nunknown)#r1   Queuequeuerh   ri   rj   rl   	Semaphore	semaphoreEventrequests_available_eventsettaskscurr_iteration_start_timesr%   r   _batch_wait_time_histogramr   _batch_execution_time_histogramr$   _batch_queue_length_gauger   _batch_utilization_histogramr   _batch_size_histogramr#   _batches_processed_counterr-   _function_name_handle_batch_taskr   _loopcreate_task_process_batches4_warn_if_max_batch_size_exceeds_max_ongoing_requestsrD   rh   ri   rj   rk   rl   s         r7   __init__z_BatchQueue.__init__q   s   B 5<MOO
,$8!&<#* *+ABB(/%(+
 FH' +4&S1'	+
 +
 +
' 09+O6'	0
 0
 0
, */&H'*
 *
 *
&
 -6-e8'	-
 -
 -
) &/%F)'	&
 &
 &
" +2%6'+
 +
 +
' +<*G&&Y 	 #'-//
(&*j&<&<%%&788' 'D# 	AACCCCCr6   c           	          t          j                    j        j        }|| j        | j        z  k     r0t                              d| j         d| j         d| d           dS dS )zHelper to check whether the max_batch_size is bounded.

        Log a warning to configure `max_ongoing_requests` if it's bounded.
        z`max_batch_size` (z) * `max_concurrent_batches` (z)) is larger than `max_ongoing_requests` (z). This means the replica will never achieve the configured `max_batch_size` concurrently. Please update `max_ongoing_requests` to be >= `max_batch_size` * `max_concurrent_batches`.N)r   get_replica_context_deployment_configmax_ongoing_requestsrh   rj   loggerwarning)rD   r   s     r7   r   z@_BatchQueue._warn_if_max_batch_size_exceeds_max_ongoing_requests   s     %'':O 	  $"58S"SSSNN_T%8 _ _/_ _(_ _ _     TSr6   new_max_batch_sizec                 <    || _         |                                  dS zUpdates queue's max_batch_size.N)rh   r   rD   r   s     r7   set_max_batch_sizez_BatchQueue.set_max_batch_size   s"    0AACCCCCr6   requestc                 l    | j                             |           | j                                         d S rA   )rz   
put_nowaitr~   r   )rD   r   s     r7   putz_BatchQueue.put   s3    
g&&&%))+++++r6   batchc                     | j         t          |          S g }|D ]4}t          |j                  \  }}|                    |d                    5|                      |          S )z>Compute the effective batch size using batch_size_fn or len().Nr   )rl   rI   r   r*   ra   )rD   r   itemsr   rW   kwargss         r7   _compute_batch_sizez_BatchQueue._compute_batch_size   sp    %u::  	" 	"G'(>??LD& LLa!!!!!!%(((r6   c                   K   g }| j                                          d{V }| j        }| j        }| j        I|                     |g          }||k    r-t          d          }|j                            |           g dfS |	                    |           t          j
                    }	 | j                            | j                                         d| j        i           t          |t          j
                    |z
  z
  d          }	 t!          j        | j                                        |           d{V  n# t           j        $ r Y nw xY w| j        d}	| j                                         sy| j                                         }
|	                    |
           |                     |          }||k    r|                                 |
}	n| j                                         y|	0| j                             |	           |                     |          }nnt3          |          |k     rq| j                                         sX|	                    | j                                                    t3          |          |k     r| j                                         X| j                                         r| j                                         |                     |          }t          j
                    |z
  |k    s||k    rnft          j
                    |z
  dz  }| j                            |d| j        i           ||fS )a  Wait for batch respecting self.max_batch_size and self.timeout_s.

        Returns a tuple of (batch, computed_batch_size) where batch contains
        up to self.max_batch_size items. Waits for up to self.timeout_s after
        receiving the first request that will be in the next batch. After the
        timeout, returns as many items as are ready.

        Always returns a batch with at least one item - will block
        indefinitely until an item comes in.
        NzSize of item is greater than max_batch_size. Please increase the max_batch_size or check the implementation of the batch_size_fn.r   Tro   tags  )rz   getrh   ri   rl   r   RuntimeErrorr+   set_exceptionra   timer   r   qsizer   rL   r1   wait_forr~   waitTimeoutErrorempty
get_nowaitr_   r   rI   clearr   observe)rD   r   
first_itemrh   ri   first_item_sizeexcbatch_start_timeremaining_batch_time_sdeferred_item	next_itemnew_sizecurrent_batch_sizebatch_wait_time_mss                 r7   wait_for_batchz_BatchQueue.wait_for_batch   s      :>>++++++++
 ,#8 )"66
|DDO//";  !//4441uZ     9;;A	*..
  ""/4;N)O /    &)$	6F(FG& &"&16688:P          '    !- !%***,, 
 $
 5 5 7 7ILL+++#77>>H.00		(1 ***,, 
 !, J))-888 *.)A)A%)H)H& - %jj>11$*:J:J:L:L1LL!6!6!8!8999 %jj>11$*:J:J:L:L1 z!! 6-33555!%!9!9%!@!@	..2FFF%77CA	H #ikk,<<D'//ot7J%K 	0 	
 	
 	
 (((s   2E   EEresultsinput_batch_lengthc                 r    t          |          |k    r#t          d| dt          |           d          d S )NzHBatched function doesn't preserve batch size. The input list has length z" but the returned list has length .)rI   r"   )rD   r   r   s      r7   _validate_resultsz_BatchQueue._validate_resultsh  sZ     w<<---#<-?< <,/LL< < <   .-r6   func_generatorinitial_futuresc           	        K   d}	 t          |          }t          |          |k    sJ |2 3 d{V }|                     ||           t          |          D ]}||         |d         }	}|	|u r|                    |           n|t
          v r+t          |	t                     |                    |           nSt                      	                                }
t          |	t          ||
                     |                    |
           |                                 Ȍ6 |D ]}	|	|urt          |	t                     dS # t          $ r$}|D ]}	|	|urt          |	|           Y d}~dS d}~ww xY w)zConsumes batch function generator.

        This function only runs if the function decorated with @serve.batch
        is a generator.
        Nr   )r   rI   r   r`   ra   USER_CODE_STREAMING_SENTINELS_set_exception_if_not_doneStopAsyncIterationr   create_future_set_result_if_not_doner9   popleft	Exception)rD   r   r   r   FINISHED_TOKENfuturesr   r]   r:   r+   r;   es               r7   _consume_func_generatorz#_BatchQueue._consume_func_generatorr  s      #	:O,,Gw<<#55555!/ & & & & & & &g&&w0BCCC !344 & &C%,S\71:FF//~6666#@@@ 36;MNNN~6666&>&@&@&N&N&P&P/"$4V[$I$I    {333
 OO%%%%+& "02 " K K//.v7IJJJK K  	: 	: 	:! : ://.vq999: : : : : :	:s"   &E D!DE 
E0E++E0func_futurer   c                    K   	 | d{V }|                      ||           t          ||          D ]\  }}t          ||           dS # t          $ r }|D ]}t	          ||           Y d}~dS d}~ww xY w)z.Assigns func's results to the list of futures.N)r   zipr   r   r   )rD   r   r   r   r   r:   r+   r   s           r7   _assign_func_resultsz _BatchQueue._assign_func_results  s      	6'''''''G""7,>???"%gw"7"7 8 8'77778 8 	6 	6 	6! 6 6*6155556 6 6 6 6 6	6s   AA
 

A4A//A4funcc                   K   t           j                                         | j                                        s|                                  d{V \  }}|                     |||          }t          j        |          }| j	        
                    |           t          j                    | j        |<   |                    | j                   | j                                        dS dS )z6Loops infinitely and processes queued request batches.N)r   r3   _unset_request_contextr   	is_closedr   _process_batchr1   r   r   addr   r   add_done_callback_handle_completed_task)rD   r   r   computed_batch_sizepromisetasks         r7   r   z_BatchQueue._process_batches  s       	,,...*&&(( 	@/3/B/B/D/D)D)D)D)D)D)D&E&))$7JKKG&w//DJNN4   48IKKD+D1""4#>??? *&&(( 	@ 	@ 	@ 	@ 	@r6   r   c                   K   | j         4 d{V  t          |          }d |D             }t          |          dk    r	 ddd          d{V  dS t          |          |k    r|                     |          }|| j        z  dz  }| j                            |d| j        i           | j                            |d| j        i           | j        	                    d| j        i           d |D             }t          j
                    }	 |d         j        }t          d |D                       \  }	}
| ||g|	R i |
}n ||	i |
}t          j                            d	 |D                        t!          |          r-|}|                     ||t          |                     d{V  n,|}|                     ||t          |                     d{V  t          j                            g            nF# t&          $ r9}t(                              d
           |D ]}t-          ||           Y d}~nd}~ww xY wt          j
                    |z
  dz  }| j                            |d| j        i           nA# t          j
                    |z
  dz  }| j                            |d| j        i           w xY w	 ddd          d{V  dS # 1 d{V swxY w Y   dS )zProcesses queued request batch.Nc                 D    g | ]}|j                                         |S r5   )r+   	cancelledrV   reqs     r7   r^   z._BatchQueue._process_batch.<locals>.<listcomp>  s+    HHHS1E1E1G1GHSHHHr6   r   d   ro   r   c                     g | ]	}|j         
S r5   )r+   rV   r\   s     r7   r^   z._BatchQueue._process_batch.<locals>.<listcomp>  s    555tt{555r6   c                     g | ]	}|j         
S r5   )r*   r   s     r7   r^   z._BatchQueue._process_batch.<locals>.<listcomp>  s    ;;;TT(;;;r6   c                     g | ]	}|j         
S r5   )r,   r   s     r7   r^   z._BatchQueue._process_batch.<locals>.<listcomp>  s    :::SS(:::r6   z0_process_batch ran into an unexpected exception.r   )r|   rI   r   rh   r   r   r   r   r   incr   r)   re   r   r3   _set_batch_request_contextr   r   r   r   r   	exceptionr   r   )rD   r   r   r   original_batch_lenbatch_utilization_percentr   batch_execution_start_timer)   rW   r   func_future_or_generatorr   r   r   r+   batch_execution_time_mss                    r7   r   z_BatchQueue._process_batch  s     
 > P	 P	 P	 P	 P	 P	 P	 P	 "%UHHEHHHE5zzQP	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 5zz///&*&>&>u&E&E# $d&99)% -55)$BU0V 6   
 &..#?D<O*P /   
 +//%t':; 0    65u555G
 *.&) 8,1;;U;;;   f
 '/3tH/Nt/N/N/Nv/N/N,, 04tT/DV/D/D, 88::E:::   &d++ V%=N66&U          #;K33K#e**UUUUUUUUU 88<<<< : : :  !STTT% : :F.vq9999: : : : :: IKK"<<+' 4<<+?DDW2X =     IKK"<<+' 4<<+?DDW2X =     ]P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	 P	sU   0KB<KC0H J
I/H?:J?IJ=K>KK
K#&K#r   c                     | j                             |           | j        |= |                     |                                           d S rA   )r   remover   _log_if_exceptionr   )rD   r   s     r7   r   z"_BatchQueue._handle_completed_task  sG    
$+D1t~~//00000r6   exception_maybec                     | Rt          | t          j                  rt                              d           d S t                              d           d S d S )NzTask was cancelledzTask failed unexpectedly)
isinstancer1   CancelledErrorr   debugr   )r   s    r7   r   z_BatchQueue._log_if_exception  sY    &/7+ABB =122222  !;<<<<<	 '&r6   c                     | j          t                                                      sd S | j                                          d S rA   )r   r   
is_runningcancelrC   s    r7   __del__z_BatchQueue.__del__'  sI    #++--88:: , F
 	&&(((((r6   )NN) r-   r.   r/   rR   rP   r   r
   r   r   r   r   r   r(   r1   r2   r   r   r   r   r   r   r	   r   r   r   r   Taskr   staticmethodBaseExceptionr   r   r5   r6   r7   rg   rg   p   s        159=[D [D[D $[D !$	[D
 $H-[D  $ 56[D 
[D [D [D [Dz  "DS DT D D D D
,5!?@ ,T , , , ,)n)= )# ) ) ) )"m)eD,@#,E&F m) m) m) m)^}:=	   1:&1: gn-1:  	1:
 
1: 1: 1: 1:f6^6 gn%6  	6 6 6 6"@8 @ @ @ @ @UU%).%9UPSU	U U U Un17< 1D 1 1 1 1
 =8M+B =t = = = \=
) 
) 
) 
) 
)r6   rg   c                       e Zd ZdZ	 	 	 	 	 ddededed	ee         d
eeegef                  f
dZ	e
defd            ZdeddfdZdeddfdZdefdZdefdZdefdZdefdZdee         fdZdS )_LazyBatchQueueWrapperzStores a _BatchQueue and updates its settings.

    _BatchQueue cannot be pickled, you must construct it lazily
    at runtime inside a replica. This class initializes a queue only upon
    first access.
    
           rY   Nrh   ri   rj   rk   rl   c                 Z    d | _         || _        || _        || _        || _        || _        d S rA   )_queuerh   ri   rj   rk   rl   r   s         r7   r   z_LazyBatchQueueWrapper.__init__<  s9     .2,$8!&<#!2*r6   r?   c                     | j         1t          | j        | j        | j        | j        | j                  | _         | j         S )zXReturns _BatchQueue.

        Initializes queue when called for the first time.
        )r  rg   rh   ri   rj   rk   rl   rC   s    r7   rz   z_LazyBatchQueueWrapper.queueK  sE     ;%#)+&" DK {r6   r   c                 Z    || _         | j        | j                            |           dS dS r   )rh   r  r   r   s     r7   r   z)_LazyBatchQueueWrapper.set_max_batch_size[  s9     1;"K**+=>>>>> #"r6   new_batch_wait_timeout_sc                 >    || _         | j        || j        _         d S d S rA   )ri   r  )rD   r
  s     r7   set_batch_wait_timeout_sz/_LazyBatchQueueWrapper.set_batch_wait_timeout_sc  s*    $<!;"/GDK,,, #"r6   c                     | j         S rA   rh   rC   s    r7   get_max_batch_sizez)_LazyBatchQueueWrapper.get_max_batch_sizei  s    ""r6   c                     | j         S rA   ri   rC   s    r7   get_batch_wait_timeout_sz/_LazyBatchQueueWrapper.get_batch_wait_timeout_sl  s    ((r6   c                 r    t          t          | j        j                                                            S )z;Gets summary statistics of current iteration's start times.)r=   listrz   r   valuesrC   s    r7   _get_curr_iteration_start_timesz6_LazyBatchQueueWrapper._get_curr_iteration_start_timeso  s0    (6==??@@
 
 	
r6   c                 r   K   t          | j        d          r| j        j                                         S dS )zGets whether default _BatchQueue's background task is alive.

        Returns False if the batch handler doesn't use a default _BatchQueue.
        r   F)hasattrrz   r   donerC   s    r7   _is_batching_task_alivez._LazyBatchQueueWrapper._is_batching_task_aliveu  s<       4:344 	z499;;;;5r6   c                    K   t          | j        d          rGt          j                    }| j        j                            |           |                                S dS )zGets the stack for the default _BatchQueue's background task.

        Returns empty string if the batch handler doesn't use a default _BatchQueue.
        r   )fileN)r  rz   ioStringIOr   print_stackgetvalue)rD   
str_buffers     r7   _get_handling_task_stackz/_LazyBatchQueueWrapper._get_handling_task_stack  s[       4:344 	JJ)55:5FFF&&(((4r6   )r  r  rY   NN)r-   r.   r/   __doc__rR   rP   r   r
   r   r   rQ   rg   rz   r   r  r  r  r=   r  boolr  strr"  r5   r6   r7   r  r  4  s         !&)&'049=+ ++ $+ !$	+
 $H-+  $ 56+ + + + {    X?S ?T ? ? ? ?H H4 H H H H#C # # # #)% ) ) ) )
1J 
 
 
 
	t 	 	 	 	      r6   r  c                     t          | t                    sKt          | t                    r$|                                 rt          |           } nt	          d|            | dk     rt          d|            d S )Nz)max_batch_size must be integer >= 1, got rY   z,max_batch_size must be an integer >= 1, got )r   rR   rP   
is_integer	TypeError
ValueErrorr  s    r7   _validate_max_batch_sizer*    s    nc** ne,, 	1J1J1L1L 	 00NNLNLL   K>KK
 
 	
 r6   c                     t          | t          t          f          st          d|            | dk     rt	          d|            d S )Nz/batch_wait_timeout_s must be a float >= 0, got r   )r   rP   rR   r(  r)  r  s    r7   _validate_batch_wait_timeout_sr,    sj    *UCL99 
T>RTT
 
 	
 aT>RTT
 
 	
  r6   rj   c                 `    t          | t                    r| dk     rt          d|            d S )NrY   z4max_concurrent_batches must be an integer >= 1, got )r   rR   r(  )rj   s    r7    _validate_max_concurrent_batchesr.    sG    ,c22 
6Lq6P6P[CY[[
 
 	
 7Q6Pr6   rl   c                 j    | .t          |           s!t          dt          |                      d S d S )Nz.batch_size_fn must be a callable or None, got )callabler(  type)rl   s    r7   _validate_batch_size_fnr2    sH     -)@)@ RT-=P=PRR
 
 	
 !   r6   SelfTypeT)contravariantTRc                   :    e Zd Zdedee         dee         fdZdS )_SyncBatchingMethodself__SyncBatchingMethod__batchr?   c                    d S rA   r5   )rD   r9  r:  s      r7   __call__z_SyncBatchingMethod.__call__  s    r6   Nr-   r.   r/   r3  r   r5  r6  r<  r5   r6   r7   r8  r8    sE        h a Q      r6   r8  c                   :    e Zd Zdedee         dee         fdZdS )_AsyncBatchingMethodr9  _AsyncBatchingMethod__batchr?   c                
   K   d S rA   r5   )rD   r9  r@  s      r7   r<  z_AsyncBatchingMethod.__call__  s      r6   Nr=  r5   r6   r7   r?  r?    sE        H tAw d1g      r6   r?  
_sync_funcc                    d S rA   r5   )rB  s    r7   r   r     s    Cr6   _async_funcc                    d S rA   r5   )rD  s    r7   r   r     	     Cr6   
_sync_methc                    d S rA   r5   )rG  s    r7   r   r     rF  r6   _async_methc                    d S rA   r5   )rI  s    r7   r   r     rF  r6   r  {Gz?rY   rh   ri   __BatchDecoratorc                    d S rA   r5   )rL  rh   ri   rj   rl   s        r7   r   r     s	     Cr6   c            
          e Zd ZdZedeee         gee         f         deegef         fd            Z	edeee         ge
eeee         f         f         deege
eeef         f         fd            Z	edeeeef         deeegef         fd            Z	ed	eeeef         deeege
eeef         f         fd
            Z	dS )rM  zJDescibes behaviour of decorator produced by calling `batch` with argumentsrB  r?   c                    d S rA   r5   )rD   rB  s     r7   r<  z_BatchDecorator.__call__  s    r6   rD  c                    d S rA   r5   )rD   rD  s     r7   r<  z_BatchDecorator.__call__  	     	r6   rG  c                    d S rA   r5   )rD   rG  s     r7   r<  z_BatchDecorator.__call__  rR  r6   rI  c                    d S rA   r5   )rD   rI  s     r7   r<  z_BatchDecorator.__call__  rR  r6   N)r-   r.   r/   r#  r   r
   r   r5  r6  r<  r   r   r8  r3  r?  r5   r6   r7   rM  rM    sm       TT8T!WItAw,>#? xQRPSUVPVGW    X #T!WIyc479J/K$KL	1#yc1--	.   X
 -h1n=	8Q-"	#   X
 /!Q?	8Q-3Q;!77	8   X  r6   stable)	stability_funcc                F   | <t          |           st          d          t          |           st          d          t                     t	                     t                     t                     fd}t          |           r ||           n|S )a
  Converts a function to asynchronously handle batches.

    The function can be a standalone function or a class method. In both
    cases, the function must be `async def` and take a list of objects as
    its sole argument and return a list of the same length as a result.

    When invoked, the caller passes a single object. These will be batched
    and executed asynchronously once there is a batch of `max_batch_size`
    or `batch_wait_timeout_s` has elapsed, whichever occurs first.

    `max_batch_size` and `batch_wait_timeout_s` can be updated using setter
    methods from the batch_handler (`set_max_batch_size` and
    `set_batch_wait_timeout_s`).

    Example:

    .. code-block:: python

            from ray import serve
            from starlette.requests import Request

            @serve.deployment
            class BatchedDeployment:
                @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
                async def batch_handler(self, requests: List[Request]) -> List[str]:
                    response_batch = []
                    for r in requests:
                        name = (await requests.json())["name"]
                        response_batch.append(f"Hello {name}!")

                    return response_batch

                def update_batch_params(self, max_batch_size, batch_wait_timeout_s):
                    self.batch_handler.set_max_batch_size(max_batch_size)
                    self.batch_handler.set_batch_wait_timeout_s(batch_wait_timeout_s)

                async def __call__(self, request: Request):
                    return await self.batch_handler(request)

            app = BatchedDeployment.bind()

    Arguments:
        max_batch_size: the maximum batch size that will be executed in
            one call to the underlying function.
        batch_wait_timeout_s: the maximum duration to wait for
            `max_batch_size` elements before running the current batch.
        max_concurrent_batches: the maximum number of batches that can be
            executed concurrently. If the number of concurrent batches exceeds
            this limit, the batch handler will wait for a batch to complete
            before sending the next batch to the underlying function.
        batch_size_fn: optional function to compute the effective batch size.
            If provided, this function takes a list of items and returns an
            integer representing the batch size. This is useful for batching
            based on custom metrics such as total nodes in graphs, total tokens
            in sequences, or other domain-specific measures. If None, the batch
            size is computed as len(batch).
    Nz?@serve.batch can only be used to decorate functions or methods.z9Functions decorated with @serve.batch must be 'async def'c                     t          	
           dt          j        dt          fddt          j        f fdt	                     fd            }t	                     fd            }t                     r|}n|}j        |_        j        |_	        j
        |_
        j        |_        j        |_        j        |_        j        |_        |S )Nfirst_futurer?   c                f   K   | }	 	 | d{V }|j         }|j        W V  n# t          $ r Y dS w xY w-)z1Generator that handles generator batch functions.TN)r;   r:   r   )rZ  r+   async_responses      r7   batch_handler_generatorz@batch.<locals>._batch_decorator.<locals>.batch_handler_generatora  sl      
 "F=C||||||N+7F(//////)   EEs   ! 
//c                 P   t          t                    | |          }t          |           }|
|dd          }j        }t	                                                      }t          j                                        }|	                    t          ||||                     |S )NrZ   )r   r   r!   rz   r   r   r   r3   _get_serve_request_contextr   r(   )	rW   r   r*   rD   batch_queuer+   r,   rW  lazy_batch_queue_wrappers	          r7   enqueue_requestz8batch.<locals>._batch_decorator.<locals>.enqueue_requesto  s    #/0A%0H0H$PV#W#WN /tU;;D!/!328K-//==??F#mFFHHOOOt^V_MM   Mr6   c                  2     | |          } |          S rA   r5   )rW   r   rZ  r]  rb  s      r7   generator_batch_wrapperz@batch.<locals>._batch_decorator.<locals>.generator_batch_wrapper  s%    *?488L**<888r6   c                  ,   K    | |           d {V S rA   r5   )rW   r   rb  s     r7   batch_wrapperz6batch.<locals>._batch_decorator.<locals>.batch_wrapper  s-       )v666666666r6   )r  r1   r2   r	   r   r   r  _get_max_batch_sizer  _get_batch_wait_timeout_sr   r  r  r  r"  )rW  rd  rf  wrapperr]  rb  ra  rl   ri   rh   rj   s   `   @@@r7   _batch_decoratorzbatch.<locals>._batch_decoratorX  s^   #9 "$
 $
 	!.		 	 	 		W^ 	 	 	 	 	 	 	" 
u	9 	9 	9 	9 	9 
	9 
u	7 	7 	7 	7 
	7 e$$ 	$-GG#G '?&Q#$= 	) &>%P"$= 	( %D 	/ %< 	' %= 	( r6   )r0  r(  r   r*  r,  r.  r2  )rW  rh   ri   rj   rl   rj  s    ```` r7   r   r     s    H  	Q   #5)) 	YWXXX^,,,"#7888$%;<<<M***M M M M M M M Ml '/uooKE""";KKr6   r+   r:   c                 \    |                                  s|                     |           dS dS )z3Sets the future's result if the future is not done.N)r  
set_result)r+   r:   s     r7   r   r     s8     ;;== "&!!!!!" "r6   r   c                 \    |                                  s|                     |           dS dS )z6Sets the future's exception if the future is not done.N)r  r   )r+   r   s     r7   r   r     s8     ;;== (Y'''''( (r6   )Nr  rK  rY   N)Pr1   r  loggingr   collectionsr   dataclassesr   	functoolsr   inspectr   r   typingr   r	   r
   r   r   r   r   r   r   r   r   r   r   r   r   rayr   ray._common.signaturer   r   r   ray._common.utilsr   ray.serve._private.constantsr   r   r   r   r    ray.serve._private.utilsr!   ray.serve.exceptionsr"   ray.serve.metricsr#   r$   r%   ray.util.annotationsr&   	getLoggerr   StopIterationr   r   r(   r9   r=   re   rg   r  r*  r,  rR   r.  r2  r3  r5  r6  r8  r?  r   rP   rM  r2   r   r   r5   r6   r7   <module>r~     sj    				         ! ! ! ! ! !       ; ; ; ; ; ; ; ;                                 $       O O O O O O O O O O 6 6 6 6 6 6              A @ @ @ @ @ 2 2 2 2 2 2 7 7 7 7 7 7 7 7 7 7 * * * * * *		,	-	-
 "/0B C  3 3 3 3 3 3 3 3                
 % % % % % % % %,0 cO0
5:tCH~%&0 0 0 02A) A) A) A) A) A) A) A)HW W W W W W W Wt
 
 
	
 	
 	

S 
T 
 
 
 

8HdVS[4I+J 
t 
 
 
 
 7:T222GCLLGCLL    (GHaN$;   
    8WXq!^%<   
 
hQy$q'12 (A36:J    
 
47)YsCa/@%AABqc9S#q[))*   
 
#HaN3xmQ   
 
%h1n5xmYsC{334   
 
"&"#59    	
   HdVS[12t}    
    h   4 X $"&"#59gL gL gL  	gL
  gL HdVS[12gLHgL gL gL gL gLT"GN "C " " " "(w~ (# ( ( ( ( ( (r6   