
    &`i                        d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZmZmZmZmZmZ d dlZd d	l m!Z! d d
l"m#Z#m$Z$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5m6Z6 d dl7m8Z8m9Z9 d dl:m;Z;m<Z<m=Z=m>Z> d dl?m@Z@ d dlAmBZBmCZC d dlDmEZE d dlFmGZG d dlHmIZI d dlJmKZKmLZL d dlMmNZN d dlOmPZPmQZQ d dlRmSZS  ejT        e6          ZU G d d          ZV G d de          ZWde jX        fdZY G d d           ZZ G d! d"eW          Z[ G d# d$          Z\ G d% d&eW          Z]dS )'    N)ABCabstractmethod)AbstractEventLoopensure_futurefutures)defaultdict)MutableMapping)contextmanager)	lru_cachepartial)AnyCallable	CoroutineDefaultDictDictListOptionalUnion)ActorHandle)ActorDiedErrorActorUnavailableErrorRayError)RUNNING_REQUESTS_KEYDeploymentHandleSourceDeploymentIDDeploymentTargetInfoHandleMetricReport	ReplicaIDRequestMetadataRunningReplicaInfo)DeploymentConfig)/RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE5RAY_SERVE_HANDLE_AUTOSCALING_METRIC_RECORD_INTERVAL_S$RAY_SERVE_METRICS_EXPORT_INTERVAL_MS'RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTINGSERVE_LOGGER_NAME)LongPollClientLongPollNamespace)QUEUED_REQUESTS_KEYInMemoryMetricsStoreMetricsPusherTimeStampedValue)ReplicaResult)PendingRequestRequestRouter)PowerOfTwoChoicesRequestRouterRunningReplica)ServeUsageTag)generate_request_idresolve_deployment_response)AutoscalingConfig)BackPressureErrorDeploymentUnavailableError)metricsc                   d   e Zd ZdZdZdZdedededede	d	e
j        d
e
j        de
j        dej        fdZedefd            Zededefd            Zdee         fdZedee         fd            ZdedefdZd Zd Z defdZ!d Z"d Z#d e$fd!Z%d e$fd"Z&dedefd#Z'd$ Z(d% Z)de*fd&Z+d' Z,d(S ))RouterMetricsManagerzManages metrics for the router.push_metrics_to_controllerrecord_metricsdeployment_id	handle_idself_actor_idhandle_sourcecontroller_handlerouter_requests_counterqueued_requests_gaugerunning_requests_gauge
event_loopc
                 p   	 | _         | _        | _        | _        | _        | _         j                            |j        |j         j          j        d           d _	        | _
         j
                            |j        |j         j          j        d            j
                            d           t          t                     _        | _         j                            |j        |j         j          j        d           t!          j                     _        t'                       _        t+                       _        d  _        d _        t2          dk     _        t2          dz   _         j        r6t          t                     _        	 fd}
	                    |
           d S d S )N
deploymentapplicationhandleactor_idr   Fi  c                  V                                                                     d S N)create_task_report_cached_metrics_forever)rF   selfs   m/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/_private/router.pycreate_metrics_taskz:RouterMetricsManager.__init__.<locals>.create_metrics_task   s)    &&t'J'J'L'LMMMMM    )
_handle_id_deployment_id_self_actor_id_handle_source_controller_handlenum_router_requestsset_default_tagsnameapp_namenum_queued_requestsnum_queued_requests_gaugesetr   intnum_requests_sent_to_replicasnum_running_requests_gauge	threadingLock_queries_lockr+   metrics_pusherr*   metrics_store_deployment_config	_shutdownr$   _cached_metrics_enabled_cached_metrics_interval_s_cached_num_router_requestscall_soon_threadsafe)rQ   r>   r?   r@   rA   rB   rC   rD   rE   rF   rS   s   `        ` rR   __init__zRouterMetricsManager.__init__L   s    $+++"3 $;  11+0,5/ /	 	
 	
 	
 $% )>&&77+0,5/ /	 	
 	
 	
 	&**1--- KVK
 K
* +A''88+0,5/ /	 	
 	
 	
 '^--+oo133 ?C$ (Lq'P$*NQU*U'' 		A/:3/?/?D,N N N N N N ++,?@@@@@		A 		ArT   request_metac              #      K   | j         | j         j        nd}|dk    rB| j        |k    r7t          | j        |          }t                              |j                   ||                     |j                   d V  d S )N)r^   max_queued_requests)	ri   rs   r^   r7   loggerwarningmessageinc_num_total_requestsroute)rQ   rp   rs   es       rR   wrap_request_assignmentz,RouterMetricsManager.wrap_request_assignment   s       &2 #77 	  2%%(,???$ "$($<$7  A NN19%%%G##L$6777rT   is_retrynum_curr_replicasc              #      K   	 |                                   |s*|                     |          r|                                  dV  |                                  dS # |                                  w xY w)zQIncrement queued requests gauge and maybe push autoscaling metrics to controller.curr_num_replicasN)inc_num_queued_requests)should_send_scaled_to_zero_optimized_push&push_autoscaling_metrics_to_controllerdec_num_queued_requests)rQ   r{   r|   s      rR   wrap_queued_requestz(RouterMetricsManager.wrap_queued_request   s      	+((***  > N N"3 !O ! ! > ;;===EEE ((*****D((****s   AA A4running_replicasc                      d |D              j         5  t          t           fd j                                        D                        _        ddd           dS # 1 swxY w Y   dS )zPrune list of replica ids in self.num_queries_sent_to_replicas.

        We want to avoid self.num_queries_sent_to_replicas from growing
        in memory as the deployment upscales and downscales over time.
        c                     h | ]	}|j         
S  )
replica_id).0replicas     rR   	<setcomp>z@RouterMetricsManager._update_running_replicas.<locals>.<setcomp>   s    RRRgw1RRRrT   c                 >    i | ]\  }}|s|v |j         |         S r   )rb   )r   idnum_queriesrunning_replica_setrQ   s      rR   
<dictcomp>zARouterMetricsManager._update_running_replicas.<locals>.<dictcomp>   sG       'K" '),?&?&? :2>&?&?&?rT   N)rf   r   ra   rb   items)rQ   r   r   s   ` @rR   _update_running_replicasz-RouterMetricsManager._update_running_replicas   s     SRAQRRR 	 	1<    +/+M+S+S+U+U  2 2D.	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   ?A""A&)A&returnc                 ,    | j         d S | j         j        S rN   )ri   autoscaling_configrQ   s    rR   r   z'RouterMetricsManager.autoscaling_config   s    "*4&99rT   deployment_configr   c                    | j         rdS || _        | j        }|r| j                                         |                     |          r|                                  | j                            | j        | j	        t          t          |j                             | j                            | j        | j        |j                   dS | j        r| j                                         dS dS )zCUpdate the config for the deployment this router sends requests to.N)rj   ri   r   rg   startr   r   register_or_update_taskRECORD_METRICS_TASK_NAME_add_autoscaling_metrics_pointminr#   metrics_interval_s$PUSH_METRICS_TO_CONTROLLER_TASK_NAME
stop_tasks)rQ   r   r   r   s       rR   update_deployment_configz-RouterMetricsManager.update_deployment_config   s   
 > 	F"3 "4 	1%%'''
 ==>OPP >;;=== 77-3I&9    779;"5     " 1#..000001 1rT   c                 l   | j                                         D ]#\  }}| j                            |d|i           $| j                                          | j                            | j                   | j                            t          | j
                                                             d S )Nrx   tags)rm   r   rZ   incclearr_   r`   r^   rc   sumrb   values)rQ   rx   counts      rR   _report_cached_metricsz+RouterMetricsManager._report_cached_metrics  s     <BBDD 	G 	GLE5$((gu5E(FFFF(..000&**4+CDDD'++299;;<<	
 	
 	
 	
 	
rT   c                 N  K   | j         dk    sJ d}	 	 t          j        | j                    d {V  |                                  d}n\# t          $ rO t
                              d           t          dd|z            }|dz  }t          j        |           d {V  Y nw xY w)Nr   Tz#Unexpected error reporting metrics.
         )rl   asynciosleepr   	Exceptionrt   	exceptionr   )rQ   consecutive_errorsbackoff_time_ss      rR   rP   z3RouterMetricsManager._report_cached_metrics_forever)  s      .2222	4
4mD$CDDDDDDDDD++---%&"" 4 4 4  !FGGG "%R,>)>!?!?"a'"mN333333333334	4s   5A
 
AB#"B#rx   c                 |    | j         r| j        |xx         dz  cc<   d S | j                            d|i           d S )Nr   rx   r   )rk   rm   rZ   r   )rQ   rx   s     rR   rw   z+RouterMetricsManager.inc_num_total_requests:  sX    ' 	@,U333q833333$((w.>(?????rT   c                 v    | xj         dz  c_         | j        s!| j                            | j                    d S d S Nr   r^   rk   r_   r`   r   s    rR   r   z,RouterMetricsManager.inc_num_queued_requests@  N      A%  + 	I*..t/GHHHHH	I 	IrT   c                 v    | xj         dz  c_         | j        s!| j                            | j                    d S d S r   r   r   s    rR   r   z,RouterMetricsManager.dec_num_queued_requestsE  r   rT   r   c                     | j         5  | j        |xx         dz  cc<   | j        s>| j                            t          | j                                                             d d d            d S # 1 swxY w Y   d S r   rf   rb   rk   rc   r`   r   r   rQ   r   s     rR   $inc_num_running_requests_for_replicaz9RouterMetricsManager.inc_num_running_requests_for_replicaJ       	 	.z:::a?:::/ /33:AACCDD  	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	   AA00A47A4c                     | j         5  | j        |xx         dz  cc<   | j        s>| j                            t          | j                                                             d d d            d S # 1 swxY w Y   d S r   r   r   s     rR   $dec_num_running_requests_for_replicaz9RouterMetricsManager.dec_num_running_requests_for_replicaR  r   r   c                 6    | j         d uo|dk    o
| j        dk    S )Nr   )r   r^   )rQ   r   s     rR   r   z>RouterMetricsManager.should_send_scaled_to_zero_optimized_pushZ  s/    #4/ -!Q&-(1,	
rT   c                 h    | j         j                            |                                            dS )zPushes queued and running request metrics to the controller.

        These metrics are used by the controller for autoscaling.
        N)rY   &record_autoscaling_metrics_from_handleremote_get_metrics_reportr   s    rR   r   z;RouterMetricsManager.push_autoscaling_metrics_to_controllera  s;    
 	FMM$$&&	
 	
 	
 	
 	
rT   c                 <   t          j                     }| j                            t          | j        i|           t
          r | j                            | j        |           t          j                     | j        j        z
  }| j        	                    |           dS )ud
  Adds metrics point for queued and running requests at replicas.

        Also prunes keys in the in memory metrics store with outdated datapoints.

        ┌─────────────────────────────────────────────────────────────────┐
        │  Handle-based metrics collection                                │
        ├─────────────────────────────────────────────────────────────────┤
        │                                                                 │
        │  Client                Handle              Replicas             │
        │  ┌──────┐            ┌────────┐          ┌─────────┐           │
        │  │  App │───────────>│ Handle │─────────>│ Replica │           │
        │  │      │  Requests  │        │ Forwards │    1    │           │
        │  └──────┘            │ Tracks │          └─────────┘           │
        │                      │ Queued │                                │
        │                      │   +    │          ┌─────────┐           │
        │                      │Running │─────────>│ Replica │           │
        │                      │Requests│ Forwards │    2    │           │
        │                      └────────┘          └─────────┘           │
        │                          │                                      │
        │                          │ Push metrics                         │
        │                          └─────────────────> Controller         │
        │                                                                 │
        └─────────────────────────────────────────────────────────────────┘

        :::{note}
        The long-term plan is to deprecate handle-based metrics collection in favor of
        replica-based collection. Replica-based collection will become the default in a
        future release. Queued requests will be continues to be tracked at the handle.
        :::
        N)
timerh   add_metrics_pointr)   r^   r"   rb   r   look_back_period_sprune_keys_and_compact_data)rQ   	timestampstart_timestamps      rR   r   z3RouterMetricsManager._add_autoscaling_metrics_pointj  s    @ IKK	,, $":;Y	
 	
 	
 ; 	002I  
 )++(?(RR66GGGGGrT   c                 v   t          j                     }t                      }t                      }| j        j        }| j                            t          j                     |z
             | j                            t          g          d         }|| j        }| j        	                    t                    pd}| j        j
                            t          t          || j                  g          }t          r| j        r~| j                                        D ]d\  }}	| j                            |g          d         }
|
|	}
|
|z  ||<   | j        j
                            |t          ||	          g          ||<   et#          | j        | j        | j        | j        ||t,          |it,          |i|	  	        }|S )Nr   r   )	r>   r?   rL   rA   aggregated_queued_requestsqueued_requestsaggregated_metricsr9   r   )r   dictr   r   rh   r   aggregate_avgr)   r^   timeseries_countdatagetr,   r"   rb   r   aggregate_sumr   rV   rU   rW   rX   r   )rQ   r   running_requestsavg_running_requestslook_back_periodavg_queued_requestsnum_data_pointsr   r   num_requestsrunning_requests_sumhandle_metric_reports               rR   r   z(RouterMetricsManager._get_metrics_report  s   IKK	66#vv2E66ty{{EU7UVVV"0>>@S?TUUVWX& #'": ,==>QRRWVW,155"29d>V"W"W!X
 
 ; 	t?V 	,0,N,T,T,V,V  (
L (,'9'G'G'U'UVW'X$'/ ,8((?: %Z0 04/A/F/J/J!1)\!J!J K0 0 ,,  2-o(-':+$&:  %&6   
  
  
  $#rT   c                 d   K   | j         r| j                                          d{V  d| _        dS )z$Shutdown metrics manager gracefully.NT)rg   graceful_shutdownrj   r   s    rR   shutdownzRouterMetricsManager.shutdown  sE        	:%77999999999rT   N)-__name__
__module____qualname____doc__r   r   r   strr   r   r9   CounterGauger   BaseEventLoopro   r
   r   rz   boolra   r   r   r    r   propertyr   r6   r   r!   r   r   rP   rw   r   r   r   r   r   r   r   r   r   r   r   r   rT   rR   r;   r;   F   s       ))+G(/SA#SA SA 	SA
 .SA 'SA ")SA  '}SA !(SA )SA SA SA SAj "O " " " ^"H +D +S + + + ^+(>P9Q    $ :H->$? : : : X:(1!1(1FI(1 (1 (1 (1T	
 	
 	
4 4 4"@C @ @ @ @I I I
I I I
y    y    
3 
SW 
 
 
 

 
 
+H +H +HZ6$%7 6$ 6$ 6$ 6$p    rT   r;   c                       e Zd Zedefd            Zededej        j	        e
         fd            Zedej        j	        fd            ZdS )Routerr   c                     d S rN   r   r   s    rR   running_replicas_populatedz!Router.running_replicas_populated      rT   rp   c                     d S rN   r   rQ   rp   request_argsrequest_kwargss       rR   assign_requestzRouter.assign_request  s	     	rT   c                     d S rN   r   r   s    rR   r   zRouter.shutdown  r   rT   N)r   r   r   r   r   r   r   
concurrentr   Futurer-   r   r   r   rT   rR   r   r     s        D    ^ %
 
		"=	1   ^ *,3    ^  rT   r   r   c                  ,   K   t          j                    S )z<Helper to create an asyncio event in the current event loop.)r   Eventr   rT   rR   create_eventr     s      =??rT   c                       e Zd Zeddddfdedededededej	        de
d	ed
ee         de
dedee         deeeef                  dee         deej                 fdZedee         fd            Zde
fdZdefdZdefdZdeddfdZdedededeeef         fdZdeded e
dee          fd!Z!dedede fd"Z"d#e#de fd$Z$d% Z%dS )&AsyncioRouterNrB   r>   r?   r@   rA   rF   "enable_strict_max_ongoing_requestsnode_idavailability_zoneprefer_local_node_routingresolve_request_arg_funcrequest_router_classrequest_router_kwargsrequest_router!_request_router_initialized_eventc                 d   || _         || _        || _        || _        || _        || _        |r|ni | _        || _        || _        |	| _	        |
| _
        d| _        || _        |r|| _        n?t          j        t!                      | j                  }|                                | _        | j        r| j                                         || _        d| _        d| _        t-          |||||t/          j        ddd          t/          j        dd	d
          t/          j        ddd
          |	  	        | _        t7          |t8          j        |f| j        t8          j        |f| j         i| j                  | _!        tD          #                    || j                  }|$                    |            dS )zUsed to assign requests to downstream replicas for a deployment.

        The routing behavior is delegated to a RequestRouter; this is a thin
        wrapper that adds metrics and logging.
        TNFserve_num_router_requestsz/The number of requests processed by the router.)rI   rx   rJ   rK   rL   )descriptiontag_keysserve_deployment_queued_querieszUThe current number of queries to this deployment waiting to be assigned to a replica.rH   &serve_num_ongoing_requests_at_replicaszXThe current number of requests to this deployment that have been submitted to a replica.)call_in_event_loop)%rY   r>   rW   rX   _event_loop_request_router_class_request_router_kwargs#_enable_strict_max_ongoing_requests_node_id_availability_zone_prefer_local_node_routing_deployment_available_request_router_request_router_initializedr   run_coroutine_threadsafer   resultr`   _resolve_request_arg_func_running_replicas_running_replicas_populatedr;   r9   r   r   _metrics_managerr'   r(   DEPLOYMENT_TARGETSupdate_deployment_targetsDEPLOYMENT_CONFIGr   long_poll_clientSharedRouterLongPollClientget_or_createregister)rQ   rB   r>   r?   r@   rA   rF   r  r  r  r  r  r  r  r  r	  futureshareds                     rR   ro   zAsyncioRouter.__init__  s   . #4*++%%9"%:B!! 	# 4V0"3*C' &*" 9G, 	?/PD,,5lnndFVWWF/5}}D, 	3,00222)A&EI 27(
 !5O+MU  
 M14 M   M88 M   7!
 !
L !/ &8! 1%7! 0	  $/!
 !
 !
 ,99t/
 
 	rT   r   c                 R   | j         s| j        r|                     | j        | j        | j        | j        t          j                                                    rt          j                    j	        nd| j
        d | j        t          | j        
  
        } |j        di | j         | j        |                    | j                   || _         | j                                         | j        j        t*          j        k    rt,          j                            d           | j         S )a  Get and lazy loading request router.

        If the request_router_class not provided, and the request router is not
        yet initialized, then it will return None. Otherwise, if request router
        is not yet initialized, it will be initialized and returned. Also,
        setting `self._request_router_initialized` to signal that the request
        router is initialized.
        Nc                      t          |           S rN   r1   )rs    rR   <lambda>z.AsyncioRouter.request_router.<locals>.<lambda>}  s    nQ6G6G rT   )
r>   rA   self_node_idr@   self_actor_handleuse_replica_queue_len_cachecreate_replica_wrapper_funcr  prefer_local_az_routingself_availability_zone1r   )r  r  r>   rX   r  rW   rayget_runtime_contextget_actor_idcurrent_actorr  r  r%   r  initialize_stater  r  r   r  r`   r   r0   r3   CUSTOM_REQUEST_ROUTER_USEDrecord)rQ   r  s     rR   r  zAsyncioRouter.request_routerh  s2    # 	E(B 	E!77"0"1!]"1*,,99;;##"9";";"I"I,0,T,G,G*.*I(O'+'> 8  N ,N+LLt/JLLL %1778NOOO#1D ,00222
 *31:; ; 8??DDD##rT   c                     | j         S rN   )r  r   s    rR   r   z(AsyncioRouter.running_replicas_populated  s    //rT   deployment_target_infoc                     |j         | _        |j        }| j        r| j                            |           n|| _        | j                            |           |r	d| _        d S d S )NT)is_availabler  r   r  r   r  r   r  )rQ   r=  r   s      rR   r"  z'AsyncioRouter.update_deployment_targets  s~    %;%H"1B 	6889IJJJJ
 &6D"667GHHH 	4/3D,,,	4 	4rT   r   c                     |j                                         | _        |j         j        | _        | j                            |t          | j        j	                             d S )Nr~   )
request_router_configget_request_router_classr  r  r  r   r   lenr  curr_replicas)rQ   r   s     rR   r   z&AsyncioRouter.update_deployment_config  so    3LLNN 	" 3I 	# 	66!$"5"CDD 	7 	
 	
 	
 	
 	
rT   prc                 @  K   |j         rdS t          |j                  }|j                                        }i }t          |j                  D ]-\  }}|                     ||j                   d{V }||||<   .i }|j                                        D ]-\  }	}|                     ||j                   d{V }||||	<   .|s|r]t          |	                                          t          |	                                          z   }
t          j        |
           d{V  |                                D ]\  }}|                                ||<   |                                D ]\  }}|                                ||<   ||_        ||_        d|_         dS )zEAsynchronously resolve and replace top-level request args and kwargs.NT)resolvedlistargskwargscopy	enumerater  metadatar   r   r   waitr  )rQ   rE  new_args
new_kwargsresolve_arg_tasksiobjtaskresolve_kwarg_tasksk	all_tasksindexkeys                rR   _resolve_request_argumentsz(AsyncioRouter._resolve_request_arguments  s     
 ; 	F==Y^^%%
 (( 	, 	,FAs77R[IIIIIIIID'+!!$ !ioo'' 	. 	.FAs77R[IIIIIIIID)-#A&  	* 3 	*.5577884#**,,< < I ,y))))))))) -2244 	, 	,KE4"kkmmHUOO,2244 	, 	,IC"kkmmJsOO	rT   r   parent_request_idresponse_idr  c                    | j                             |           t          |t                    r@| j        r| j                            |           t                              | d           d S t          |t                    rA| j        r| j        	                    |           t                              d| d           d S d S )N@ will not be considered for future requests because it has died.zRequest failed because  is temporarily unavailable.)
r   r   
isinstancer   r  on_replica_actor_diedrt   ru   r   on_replica_actor_unavailable)rQ   r   r[  r\  r  s        rR   _process_finished_requestz'AsyncioRouter._process_finished_request  s     	BB:NNNfn-- 	 " F#99*EEENN 0 0 0      566 
	 " M#@@LLLNNR*RRR    
	 
	rT   r{   c                   K   d }d }	 t          | j        j                  }| j                            ||          5  |j        s|                     |           d {V  | j                            ||           d {V }| j        o|j	         }|
                    ||          }| j                            |j                   d d d            n# 1 swxY w Y   t          rzt          j        j                                        }|j        }	| j                            |j                   t)          | j        |j        |	|          }
|                    |
           |s|S |                                 d {V }| j                            |j        |           |j        r#| j                            ||j        |           |S n# t6          j        $ r ||                                  t<          $ rF |A| j                            |j                   t@          !                    |j         d           Y nRtD          $ rF |A| j        #                    |j                   t@          !                    |j         d           Y nw xY wd S )N)r{   )with_rejectionr^  r_  )$rC  r  rD  r   r   rG  rZ  _choose_replica_for_requestr  is_cross_languagetry_send_requeston_send_requestr   r"   r5  servecontext_get_serve_request_context
request_idr   r   rc  add_done_callbackget_rejection_responseon_new_queue_len_infoacceptedon_request_routedr   CancelledErrorcancelr   ra  rt   ru   r   rb  )rQ   rE  r\  r{   r  r   r|   re  _request_contextrm  callback
queue_infos               rR   _route_and_send_request_oncez*AsyncioRouter._route_and_send_request_once  sm      +/,0L	T #D$7$E F F&::8EVWW H H
 { >99"========= $ 3 O O !P ! !       < 6#55  !11"^1TT #33G4FGGG-H H H H H H H H H H H H H H H2 ? 3#&9#4#O#O#Q#Q "2"=
%JJ&   #2&	  ((222! %<<>>>>>>>>J55g6H*UUU" #55b':LfUUU % 	 	 	 ! 		 		 		 "#99':LMMM) 4 4 4   % 	T 	T 	T "#@@ASTTT'"4RRRSSS	T tsE   4G B
CG CG CBG "A#G A2J
;AJ
	J
c                    K   | j                                          d{V  d}	 |                     |||           d{V }||S d}$)zChoose a replica for the request and send it.

        This will block indefinitely if no replicas are available to handle the
        request, so it's up to the caller to time out or cancel the request.
        NFT)r  rN  rx  )rQ   rE  r\  r{   r  s        rR   route_and_send_requestz$AsyncioRouter.route_and_send_requestS  s       .33555555555	<<       F
 ! H	rT   rp   c                   K   | j         st          | j                  t                      t	          j                    }t          j        j        	                    j
        |           |                    fd           | j                                         d{V  | j                                      5  d}	 |                     t#          t%          |          |                     d{V }|cddd           S # t          j        $ r ||                                  w xY w# 1 swxY w Y   dS )zBAssign a request to a replica and return the resulting object_ref.c                 X    t           j        j                            j                  S rN   )r5  rj  rk  "_remove_request_pending_assignmentinternal_request_id)_rp   r\  s    rR   r-  z.AsyncioRouter.assign_request.<locals>.<lambda>  s$    ci'JJ0+  rT   N)rI  rJ  rM  )r  r8   r>   r4   r   current_taskr5  rj  rk  _add_request_pending_assignmentr~  rn  r  rN  r   rz   rz  r.   rH  rs  rt  )rQ   rp   r   r   assign_request_taskreplica_resultr\  s    `    @rR   r   zAsyncioRouter.assign_requestp  s      ) 	A,T-?@@@)++%244	99,k;N	
 	
 	
 	--    	
 	
 	
 .33555555555"::<HH 	 	!N'+'B'B"!,//-!-  
  ( ( " " " " " " &	 	 	 	 	 	 	 	 )    "-"))+++	 	 	 	 	 	 	 	 	 	s$   >D3:D		'D00D33D7:D7c                 H   K   | j                                          d {V  d S rN   )r   r   r   s    rR   r   zAsyncioRouter.shutdown  s3      #,,...........rT   )&r   r   r   r5   r   r   r   r   r   r   r   r   r   r   r   r   r/   r   ro   r   r  r   r   r"  r!   r   r.   rZ  r   r   r   rc  r-   rx  rz  r   r   r   r   rT   rR   r   r     s        /J37:>26EI!u u&u $u 	u
 u .u )u -1u u $C=u $(u #,u 'x0u  (S#X7u !/u  ,4GM+B!u u u un )$ 7 )$ )$ )$ X)$V0D 0 0 0 04@T 4 4 4 4 

:J 

 

 

 

(( 
( ( ( (T  	
 c8m$   <VV V 	V
 
-	 V V V Vp  
	   :,%,
 
, , , ,\/ / / / /rT   r   c                       e Zd ZU dZdZeej                 ed<    e	j
                    Zd Zedej        fd            ZdefdZdedej        j        e         fd	Zdej        j        fd
ZdS )SingletonThreadRoutera-  Wrapper class that runs an AsyncioRouter on a separate thread.

    The motivation for this is to avoid user code blocking the event loop and
    preventing the router from making progress.

    Maintains a singleton event loop running in a daemon thread that is shared by
    all AsyncioRouters.
    N_asyncio_loopc                 l    d|vs
J d            t          dd|                                 i|| _        d S )NrF   z4SingletonThreadRouter manages the router event loop.r   )r   _get_singleton_asyncio_loop_asyncio_routerrQ   passthrough_kwargss     rR   ro   zSingletonThreadRouter.__init__  sZ     2222A 322  -  
  
7799 
=O 
  
rT   r   c                     | j         5  | j        Lt          j                    | _        t	          j        d| j        j                  }|                                 ddd           n# 1 swxY w Y   | j        S )zdGet singleton asyncio loop running in a daemon thread.

        This method is thread safe.
        NT)daemontarget)_asyncio_loop_creation_lockr  r   new_event_looprd   Threadrun_foreverr   )clsthreads     rR   r  z1SingletonThreadRouter._get_singleton_asyncio_loop  s     , 	 	 ($+$:$<$<!"),8   	 	 	 	 	 	 	 	 	 	 	 	 	 	 	   s   AA((A,/A,c                 4    | j                                         S rN   r  r   r   s    rR   r   z0SingletonThreadRouter.running_replicas_populated      #>>@@@rT   rp   c                      dt           j        dt          j        j        fdt          j                                         fd} j                            |           S )a  Routes assign_request call on the internal asyncio loop.

        This method uses `run_coroutine_threadsafe` to execute the actual request
        assignment logic (`_asyncio_router.assign_request`) on the dedicated
        asyncio event loop thread. It returns a `concurrent.futures.Future` that
        can be awaited or queried from the calling thread.

        Returns:
            A concurrent.futures.Future resolving to the ReplicaResult representing
            the assigned request.
        asyncio_futureconcurrent_futurec                    |                                 rl|                                  sZ|                                 H|                                 }t                              d           |                                 dS dS dS dS )a  Callback attached to the asyncio Task running assign_request.

            This runs when the asyncio Task finishes (completes, fails, or is cancelled).
            Its primary goal is to propagate cancellation initiated via the
            `concurrent_future` back to the `ReplicaResult` in situations where
            asyncio_future didn't see the cancellation event in time. Think of it
            like a second line of defense for cancellation of replica results.
            NzuAsyncio task completed despite cancellation attempt. Attempting to cancel the request that was assigned to a replica.)	cancelledr   r  rt   infort  )r  r  r  s      rR   asyncio_future_callbackzESingletonThreadRouter.assign_request.<locals>.asyncio_future_callback  s     "++--
 &0022
  #,,..6(6(=(=(?(?W   
  
  
  
  76rT   c                     j                              j        j        gR i           } |                     fd           	 t          j        t          | j                              d S # t          t          f$ r  t          $ r/}                                r                    |            d }~ww xY w)Nc                      |           S rN   r   )r  r  r  s    rR   r-  zUSingletonThreadRouter.assign_request.<locals>.create_task_and_setup.<locals>.<lambda>  s    11!5FGG rT   loop)r  rO   r  r   rn  r   _chain_futurer   
SystemExitKeyboardInterruptBaseExceptionset_running_or_notify_cancelset_exception)rT  excr  r  r   r   rp   rQ   s     rR   create_task_and_setupzCSingletonThreadRouter.assign_request.<locals>.create_task_and_setup  s   %113$3 #/  3A  D ""GGGGG  
%!$T-?@@@BS      12       $AACC 9%33C888s   
)A5 5B?*B::B?)r   r   r   r   r  rn   )rQ   rp   r   r   r  r  r  s   ```` @@rR   r   z$SingletonThreadRouter.assign_request  s    $	 #N	 ?I?Q?X	  	  	  	 0 '.5577	 	 	 	 	 	 	 	 	 	2 	//0EFFF  rT   c                 f    t          j        | j                                        | j                  S )Nr  )r   r  r  r   r  r   s    rR   r   zSingletonThreadRouter.shutdown  s3    / ))++$2D
 
 
 	
rT   )r   r   r   r   r  r   r   r   __annotations__rd   re   r  ro   classmethodr  r   r   r   r   r   r   r-   r   r   r   rT   rR   r  r    s           :>M8G56==="0)."2"2
 
 
 !G,E ! ! ! [! AD A A A AF!%F!
 
		"=	1F! F! F! F!P
*,3 
 
 
 
 
 
rT   r  c                       e Zd ZdedefdZe ed          dededd fd                        Zde	d	e
ddfd
Zded	e
ddfdZdeddfdZdeddfdZdS )r%  rB   rF   c                     || _         || _        t          t          j                  | _        t          |i | j                  | _        d S )N)key_listenersr  )controller_handlerrF   r   weakrefWeakSetroutersr'   r$  )rQ   rB   rF   s      rR   ro   z#SharedRouterLongPollClient.__init__  sQ    "3$ (( 	
 !/#!
 !
 !
rT   N)maxsizer   c                 \     | ||          }t                               d| d           |S )N)rB   rF   zStarted .)rt   r  )r  rB   rF   r)  s       rR   r&  z(SharedRouterLongPollClient.get_or_create,  s<    
 '8ZPPP(v((()))rT   r=  r>   c                     | j         |         D ]0}|                    |           |j                                         1d S rN   )r  r"  r$  stop)rQ   r=  r>   routers       rR   r"  z4SharedRouterLongPollClient.update_deployment_targets5  sQ    
 l=1 	+ 	+F,,-CDDD#((****	+ 	+rT   r   c                     | j         |         D ]0}|                    |           |j                                         1d S rN   )r  r   r$  r  )rQ   r   r>   r  s       rR   r   z3SharedRouterLongPollClient.update_deployment_config>  sQ     l=1 	+ 	+F++,=>>>#((****	+ 	+rT   r  c                 F    | j                             | j        |           d S rN   )rF   rn   	_register)rQ   r  s     rR   r'  z#SharedRouterLongPollClient.registerE  s$     	,,T^VDDDDDrT   c                      j         |j                                     |           t           j                                                   D ]!\  }}|s j                             |           " fd j                                         D              fd j                                         D             z  } j                            |           d S )Nc                 V    i | ]%}t           j        |ft          j        |           &S )r>   )r(   r!  r   r"  r   r>   rQ   s     rR   r   z8SharedRouterLongPollClient._register.<locals>.<dictcomp>[  sO     
 
 
  1=A7.mD D D
 
 
rT   c                 V    i | ]%}t           j        |ft          j        |           &S r  )r(   r#  r   r   r  s     rR   r   z8SharedRouterLongPollClient._register.<locals>.<dictcomp>`  sO     
 
 
  0-@'-]C C C
 
 
rT   )	r  r>   addrH  r   popkeysr$  add_key_listeners)rQ   r  r>   r  r  s   `    rR   r  z$SharedRouterLongPollClient._registerN  s    V)*..v666 '+4<+=+=+?+?&@&@ 	0 	0"M7 0  ///
 
 
 
 "&!2!2!4!4	
 
 


 
 
 
 "&!2!2!4!4	
 
 


 	//>>>>>rT   )r   r   r   r   r   ro   r  r   r&  r   r   r"  r!   r   r   r'  r  r   rT   rR   r%  r%    s-       
+ 
CT 
 
 
 
" Yt +9J	%    [+ 4+ $+ 
	+ + + ++!1+BN+	+ + + +E} E E E E E? ?$ ? ? ? ? ? ?rT   r%  c                   `    e Zd ZdZd ZdefdZdedej	        e
         fdZdej	        fdZdS )	CurrentLoopRouterzWrapper class that runs an AsyncioRouter on the current asyncio loop.
    Note that this class is NOT THREAD-SAFE, and all methods are expected to be
    invoked from a single asyncio event loop.
    c                     d|vs
J d            t          j                    | _        t          d| j        t          j                    d|| _        d S )NrF   z.CurrentLoopRouter uses the current event loop.)rF   r	  r   )r   get_running_loopr  r   r   r  r  s     rR   ro   zCurrentLoopRouter.__init__o  sm     2222; 322 %577,  
).5moo 
  
 ! 
  
rT   r   c                 4    | j                                         S rN   r  r   s    rR   r   z,CurrentLoopRouter.running_replicas_populated{  r  rT   rp   c                 ^    | j                              | j        j        |g|R i |          S rN   )r  rO   r  r   r   s       rR   r   z CurrentLoopRouter.assign_request~  sQ     !--/D /+  /= 
 
 	
rT   c                 d    | j                             | j                                                  S rN   )r  rO   r  r   r   s    rR   r   zCurrentLoopRouter.shutdown  s(    !--d.B.K.K.M.MNNNrT   N)r   r   r   r   ro   r   r   r   r   r   r-   r   r   r   rT   rR   r  r  i  s         


 

 

AD A A A A

%


 
	&

 

 

 

O'. O O O O O OrT   r  )^r   concurrent.futuresr   loggingrd   r   r  abcr   r   r   r   r   collectionsr   collections.abcr	   
contextlibr
   	functoolsr   r   typingr   r   r   r   r   r   r   r   r5  	ray.actorr   ray.exceptionsr   r   r   ray.serve._private.commonr   r   r   r   r   r   r   r    ray.serve._private.configr!   ray.serve._private.constantsr"   r#   r$   r%   r&   ray.serve._private.long_pollr'   r(    ray.serve._private.metrics_utilsr)   r*   r+   r,   !ray.serve._private.replica_resultr-   !ray.serve._private.request_routerr.   r/   .ray.serve._private.request_router.pow_2_routerr0   1ray.serve._private.request_router.replica_wrapperr2   ray.serve._private.usager3   ray.serve._private.utilsr4   r5   ray.serve.configr6   ray.serve.exceptionsr7   r8   ray.utilr9   	getLoggerrt   r;   r   r   r   r   r  r%  r  r   rT   rR   <module>r     s               # # # # # # # # = = = = = = = = = = # # # # # # * * * * * * % % % % % % ( ( ( ( ( ( ( (	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 


 ! ! ! ! ! ! J J J J J J J J J J	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 7 6 6 6 6 6              K J J J J J J J            < ; ; ; ; ; K K K K K K K K      M L L L L L 2 2 2 2 2 2        / . . . . . N N N N N N N N      		,	-	-O O O O O O O Od    S   &GM    
o/ o/ o/ o/ o/ o/ o/ o/du
 u
 u
 u
 u
F u
 u
 u
pL? L? L? L? L? L? L? L?^"O "O "O "O "O "O "O "O "O "OrT   