
    &`i]                    p   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZm Z m!Z!m"Z"m#Z# d dl$Z%d dl&m'Z' d d	l(m)Z) d d
l*m+Z+ d dl,m-Z-m.Z.m/Z/m0Z0 d dl1Z1d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? d dl@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZHmIZI d dlJmKZK d dlLmMZMmNZNmOZOmPZPmQZQmRZRmSZSmTZTmUZUmVZVmWZWmXZXmYZYmZZZm[Z[m\Z\m]Z]m^Z^m_Z_m`Z`maZambZb d dlcmdZdmeZe d dlfmgZgmhZhmiZimjZjmkZk d dllmmZmmnZnmoZompZp d dlqmrZrmsZs d dltmuZu d dlvmwZw d dlxmyZy d dlzm{Z{m|Z|m}Z} d dl~mZ d dlmZ d d lmZ d d!lmZ d d"lmZmZmZ d d#lmZ d d$lmZmZmZ  ej        ea          Ze"eKee e         e e         e e         eeee ee                  e eeB                  f
         Zd%ed&efd'Z G d( d)          Zeegdf         Z G d* d+e          Z G d, d-e          Z G d. d/          Ze G d0 d1                      Z G d2 d3          ZdS )4    N)ABCabstractmethod)defaultdictdeque)asynccontextmanagercontextmanager)	dataclass)import_module)
AnyAsyncGeneratorCallableDict	GeneratorListOptionalSetTupleUnion)	to_thread)Request)	Starlette)ASGIAppReceiveScopeSend)cloudpickle)CoreContextFilter)get_or_create_event_loop)
ActorClassActorHandle)_PyObjScanner)RemoteFunction)metrics)	RUNNING_REQUESTS_KEYDeploymentID	ReplicaIDReplicaMetricReportReplicaQueueLengthInfoRequestMetadataServeComponentTypeStreamingHTTPRequestgRPCRequest)DeploymentConfig)GRPC_CONTEXT_ARG_NAMEHEALTH_CHECK_METHOD/RAY_SERVE_COLLECT_AUTOSCALING_METRICS_ON_HANDLE$RAY_SERVE_METRICS_EXPORT_INTERVAL_MS,RAY_SERVE_RECORD_AUTOSCALING_STATS_TIMEOUT_S6RAY_SERVE_REPLICA_AUTOSCALING_METRIC_RECORD_INTERVAL_S&RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE RAY_SERVE_RUN_SYNC_IN_THREADPOOL(RAY_SERVE_RUN_SYNC_IN_THREADPOOL_WARNING*RAY_SERVE_RUN_USER_CODE_IN_SEPARATE_THREADRECONFIGURE_METHODREQUEST_LATENCY_BUCKETS_MSREQUEST_ROUTING_STATS_METHODSERVE_CONTROLLER_NAMESERVE_LOG_APPLICATIONSERVE_LOG_COMPONENTSERVE_LOG_DEPLOYMENTSERVE_LOG_REPLICASERVE_LOG_REQUEST_IDSERVE_LOG_ROUTESERVE_LOGGER_NAMESERVE_NAMESPACE)create_replica_implcreate_replica_metrics_manager)ASGIAppReplicaWrapperASGIArgsASGIReceiveProxyMessageQueueResponse)access_log_msgconfigure_component_logger#configure_component_memory_profilerget_component_logger_file_path)InMemoryMetricsStoreMetricsPusher)TaskConsumerWrapper)extract_route_patterns)ServeUsageTag)	Semaphoreget_component_file_nameparse_import_path)DeploymentVersion)AutoscalingConfig)_get_in_flight_requests)
Deployment)BackPressureErrorDeploymentUnavailableErrorRayServeException)DeploymentHandle)EncodingTypeLoggingConfigReplicaRankimport_pathreturnc                 V   t          |           \  }}t          t          |          |          }t          |t                    r|j        }n\t          |t                    r|j        j        }n:t          |t                    r%t                              d|  d           |j        }|S )NzThe import path "zm" contains a decorated Serve deployment. The decorator's settings are ignored when deploying via import path.)rV   getattrr
   
isinstancer"   	_functionr   __ray_metadata__modified_classrZ   loggerwarningfunc_or_class)rb   module_name	attr_namedeployment_defs       n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/_private/replica.py%_load_deployment_def_from_import_pathrq      s    .{;;K];77CCN ..11 
6'1	NJ	/	/ 6'8G	NJ	/	/ 6: : : :	
 	
 	

 (5    c                   d   e Zd ZdZdZdZdZdedej	        de
e         defd	Zd
 Zd Zd Zd ZdefdZde
e         fdZdedeg e
ej        j                 f         fdZdedefdZdedefdZdefdZdededefdZ de!ee"f         fdZ#de
e!ee$eef         f                  fdZ%d dZ&dS )!ReplicaMetricsManagera"  Manages metrics for the replica.

    A variety of metrics are managed:
        - Fine-grained metrics are set for every request.
        - Autoscaling statistics are periodically pushed to the controller.
        - Queue length metrics are periodically recorded as user-facing gauges.
    push_metrics_to_controllerrecord_metrics set_replica_request_metric_gauge
replica_id
event_loopautoscaling_configingressc                    || _         |j        | _        t                      | _        t                      | _        || _        t          j	        t          t                    | _        d| _        |pt          j                    | _        d| _        d| _        d | _        t(          dk    | _        t(          dz  | _        t/          j        dd          | _        | j                                         t/          j        dd	d
          | _        | j        rt9          t:                    | _        t/          j        ddd
          | _        | j        rt9          t:                    | _         tB          "                    dtF                      t/          j$        ddtF          d
          | _%        | j        rEt9          tL                    | _'        | j        (                    | )                                           t/          j*        dd          | _+        t/          j        ddd          | _,        t/          j$        ddtF                    | _-        | .                    |           d S )N)	namespacer   F  serve_deployment_replica_startszCThe number of times this replica has been restarted due to failure.)description serve_deployment_request_counterz?The number of queries that have been processed in this replica.)route)r   tag_keysserve_deployment_error_counterz<The number of exceptions that have occurred in this replica.zREQUEST_LATENCY_BUCKETS_MS: &serve_deployment_processing_latency_msz(The latency for queries to be processed.)r   
boundariesr    serve_replica_processing_queriesz.The current number of queries being processed.%serve_record_autoscaling_stats_failed)exception_namez;The number of errored record_autoscaling_stats invocations.)r   r   'serve_user_autoscaling_stats_latency_mszRTime taken to execute the user-defined autoscaling stats function in milliseconds.)r   r   )/_replica_iddeployment_id_deployment_idrP   _metrics_pusherrO   _metrics_store_ingressray	get_actorr;   rC   _controller_handle_num_ongoing_requestsasyncioget_event_loop_event_loop_custom_metrics_enabled_checked_custom_metrics_record_autoscaling_stats_fnr1   _cached_metrics_enabled_cached_metrics_interval_sr#   Counter_restart_counterinc_request_counterr   int_cached_request_counter_error_counter_cached_error_counterrj   debugr9   	Histogram_processing_latency_trackerr   _cached_latenciescreate_task_report_cached_metrics_foreverGauge_num_ongoing_requests_gauge'record_autoscaling_stats_failed_counter&user_autoscaling_stats_latency_trackerset_autoscaling_config)selfrx   ry   rz   r{   s        rp   __init__zReplicaMetricsManager.__init__   st    &(6,244"%-!_#
 #
 #
 &'"%A)?)A)A (-$',$,0) (Lq'P$*NQU*U' !(-U!
 !
 !
 	!!### !(.Q!
 !
 !
 ' 	<+6s+;+;D(%o,N
 
 
 ' 	:)4S)9)9D& 	P4NPPQQQ+2+<4B1	,
 ,
 ,
( ' 	P%0%7%7D"(()L)L)N)NOOO+2=.H,
 ,
 ,
(
 8?3(U8
 8
 8
4 7>6G5# 27
 7
 7
3 	##$677777rr   c                 R   | j                                         D ]#\  }}| j                            |d|i           $| j                                          | j                                        D ]#\  }}| j                            |d|i           $| j                                         | j                                        D ](\  }}|D ] }| j        	                    |d|i           !)| j                                         | j
                            | j                   d S )Nr   tags)r   itemsr   r   clearr   r   r   r   observer   setr   )r   r   count	latencies
latency_mss        rp   _report_cached_metricsz,ReplicaMetricsManager._report_cached_metrics  sY    8>>@@ 	D 	DLE5!%%e7E2B%CCCC$**,,, 6<<>> 	B 	BLE5##E%0@#AAAA"((*** $ 6 < < > > 	 	E9'  
088gu%5 9     	$$&&&(,,T-GHHHHHrr   c                 N  K   | j         dk    sJ d}	 	 t          j        | j                    d {V  |                                  d}n\# t          $ rO t
                              d           t          dd|z            }|dz  }t          j        |           d {V  Y nw xY w)Nr   Tz#Unexpected error reporting metrics.
         )r   r   sleepr   	Exceptionrj   	exceptionmin)r   consecutive_errorsbackoff_time_ss      rp   r   z4ReplicaMetricsManager._report_cached_metrics_forever  s      .2222	4
4mD$CDDDDDDDDD++---%&"" 4 4 4  !FGGG "%R,>)>!?!?"a'"mN333333333334	4s   5A
 
AB#"B#c                 H   K   | j                                          d{V  dS )zStop periodic background tasks.N)r   graceful_shutdownr   s    rp   shutdownzReplicaMetricsManager.shutdown+  s5       "4466666666666rr   c                    | j                                          | j                             | j        | j        | j        j                   | j                             | j        | j        t          t          | j        j                             d S N)r   startregister_or_update_task$PUSH_METRICS_TO_CONTROLLER_TASK_NAME_push_autoscaling_metrics_autoscaling_configmetrics_interval_sRECORD_METRICS_TASK_NAME$_add_autoscaling_metrics_point_asyncr   r3   r   s    rp   start_metrics_pusherz*ReplicaMetricsManager.start_metrics_pusher0  s    ""$$$ 	445*$7	
 	
 	
 	44)5F(; 	
 	
 	
 	
 	
rr   rc   c                     t            S )u  Determine if replicas should collect ongoing request metrics.

        ┌────────────────────────────────────────────────────────────────┐
        │  Replica-based metrics collection                              │
        ├────────────────────────────────────────────────────────────────┤
        │                                                                │
        │      Client          Handle            Replicas                │
        │      ┌──────┐      ┌────────┐                                  │
        │      │  App │─────>│ Handle │────┬───>┌─────────┐              │
        │      │      │      │ Tracks │    │    │ Replica │              │
        │      └──────┘      │ Queued │    │    │    1    │              │
        │                    │Requests│    │    │ Tracks  │              │
        │                    └────────┘    │    │ Running │              │
        │                         │        │    └─────────┘              │
        │                         │        │         │                   │
        │                         │        │         │                   │
        │                         │        │    ┌─────────┐              │
        │                         │        └───>│ Replica │              │
        │                         │             │    2    │              │
        │                         │             │ Tracks  │              │
        │                         │             │ Running │              │
        │                         │             └─────────┘              │
        │                         │                  │                   │
        │                         │                  │                   │
        │                         ▼                  ▼                   │
        │                  ┌──────────────────────────────┐              │
        │                  │        Controller            │              │
        │                  │  • Queued metrics (handle)   │              │
        │                  │  • Running metrics (replica1)│              │
        │                  │  • Running metrics (replica2)│              │
        │                  └──────────────────────────────┘              │
        │                                                                │
        └────────────────────────────────────────────────────────────────┘
        )r0   r   s    rp   should_collect_ongoing_requestsz5ReplicaMetricsManager.should_collect_ongoing_requestsC  s    F CBBrr   c                 z    || _         | j         r*|                                 r|                                  dS dS dS )z&Dynamically update autoscaling config.N)r   r   r   )r   rz   s     rp   r   z,ReplicaMetricsManager.set_autoscaling_configh  sV     $6 # 	((L(L(N(N 	(%%'''''	( 	( 	( 	(rr   custom_metrics_enabledrecord_autoscaling_stats_fnc                 R    |r$|| _         || _        |                                  dS dS )z]Runs after the user callable wrapper is initialized to enable autoscaling metrics collection.N)r   r   r   )r   r   r   s      rp   !enable_custom_autoscaling_metricsz7ReplicaMetricsManager.enable_custom_autoscaling_metricsp  s>     " 	(+AD(0KD-%%'''''	( 	(rr   request_metadatac                 v    | xj         dz  c_         | j        s!| j                            | j                    dS dS )zFIncrement the current total queue length of requests for this replica.r   Nr   r   r   r   r   r   s     rp   inc_num_ongoing_requestsz.ReplicaMetricsManager.inc_num_ongoing_requests{  N    ""a'""+ 	M,001KLLLLL	M 	Mrr   c                 v    | xj         dz  c_         | j        s!| j                            | j                    dS dS )zFDecrement the current total queue length of requests for this replica.r   Nr   r   s     rp   dec_num_ongoing_requestsz.ReplicaMetricsManager.dec_num_ongoing_requests  r   rr   c                     | j         S )z<Get current total queue length of requests for this replica.)r   r   s    rp   get_num_ongoing_requestsz.ReplicaMetricsManager.get_num_ongoing_requests      ))rr   r   r   	was_errorc                l   | j         rP| j        |                             |           |r| j        |xx         dz  cc<   dS | j        |xx         dz  cc<   dS | j                            |d|i           |r| j                            d|i           dS | j	                            d|i           dS )zRecords per-request metrics.r   r   r   N)
r   r   appendr   r   r   r   r   r   r   )r   r   r   r   s       rp   record_request_metricsz,ReplicaMetricsManager.record_request_metrics  s    ' 	A"5)00<<< 9*5111Q611111,U333q833333,44ZwPUFV4WWW A#''gu-='>>>>>%))/?)@@@@@rr   c                    | j         j        }| j                            t	          j                    |z
             i }i | j        j        }|                                 rD| j                            t          g          d         pd}|	                    t          |i           t          | j        t	          j                    ||          }| j        j                            |           d S )Nr   g        )rx   	timestampaggregated_metricsr#   )r   look_back_period_sr   prune_keys_and_compact_datatimedatar   aggregate_avgr$   updater'   r   r   'record_autoscaling_metrics_from_replicaremote)r   look_back_periodnew_aggregated_metricsnew_metrics
window_avgreplica_metric_reports         rp   r   z/ReplicaMetricsManager._push_autoscaling_metrics  s    3F77	FV8VWWW!#2,12//11 	N #113G2HII!LSPS  #))+?*LMMM 3'ikk5	!
 !
 !
 	GNN!	
 	
 	
 	
 	
rr   c                   K   	 t          j                     }t          j        |                                 t                     d {V }t          j                     |z
  dz  }| j                            |           | j        st          |t                    s9t                              dt          |          j         d           d| _        d S |                                D ]^\  }}t          |t           t"          f          s=t                              dt          |          j         d| d           d| _         d S _d	| _        |S # t          j        $ rT}t                              d
t           d           | j                            d|j        j        i           Y d }~n^d }~wt,          $ rN}t                              d|            | j                            d|j        j        i           Y d }~nd }~ww xY wd S )N)timeoutr~   z'User autoscaling stats method returned zE, expected Dict[str, Union[int, float]]. Disabling autoscaling stats.Fz:User autoscaling stats method returned invalid value type z
 for key 'z6', expected int or float. Disabling autoscaling stats.Tz*Replica autoscaling stats timed out after zs.r   r   z"Replica autoscaling stats failed. )r   r   wait_forr   r2   r   r   r   rf   dictrj   errortype__name__r   r   r   floatTimeoutErrorr   r   	__class__r   )r   
start_timeresr   keyvaluees          rp   !_fetch_custom_autoscaling_metricsz7ReplicaMetricsManager._fetch_custom_autoscaling_metrics  s     ,	J(1133D        C )++
2d:J7??
KKK / 4!#t,,  LL_$s))BT _ _ _   49D04"%))++ $ $JC%ec5\:: $<#E{{3< <?B< < <  
 8=4#tt$ 04,J# 	 	 	LLm=immm   8<<&(<= =          	 	 	LLAaAABBB8<<&(<= =        	
 ts3   CE A0E 	E HA
F,,H9AHHNc                   K   i }|                                  rt          | j        i}| j        r1|                                  d {V }|r|                    |           | j                            |t          j                               d S r   )	r   r$   r   r   r  r   r   add_metrics_pointr   )r   metrics_dictcustom_metricss      rp   r   z:ReplicaMetricsManager._add_autoscaling_metrics_point_async  s      //11 	N0$2LML ' 	4#'#I#I#K#KKKKKKKN 4##N333--IKK	
 	
 	
 	
 	
rr   rc   N)'r  
__module____qualname____doc__r   r   *SET_REPLICA_REQUEST_METRIC_GAUGE_TASK_NAMEr&   r   BaseEventLoopr   rX   boolr   r   r   r   r   r   r   r   
concurrentfuturesFuturer   r)   r   r   r   r   strr  r   r   r   r   r   r  r    rr   rp   rt   rt      sC         ,H(/1S.[8[8 )[8 %%67	[8
 [8 [8 [8 [8zI I I$4 4 4"7 7 7

 
 
&#C #C #C #C #CJ(BS9T ( ( ( (	( $	( &.b(:;M;T2U.U%V	( 	( 	( 	(M MS M M M MM MS M M M M*# * * * *As A ARV A A A A
4S> 
 
 
 
00	$sE#u*--.	/0 0 0 0d
 
 
 
 
 
rr   rt   c                      e Zd Zdedededededede	de
fd	Zed
efd            Zd
efdZd
efdZd
ee         fdZd
ee         fdZddddedefdZdedeef         fdZded
e	fdZeded
ee ddf         fd            Z!de"e#         de"e
         de$defdZ%dedee&         dee
e&f         fd Z'ded
ee(e&f         fd!Z)ded
e*e&df         fd"Z+defd#Z,e-d$             Z.de"e         de"e         fd%Z/	 d1dedede"e
         fd&Z0e-ded'e1j2        fd(            Z3e-ded'e4fd)            Z5e-eded
ee ddf         fd*                        Z6e7defd+            Z8d, Z9d- Z:d. Z;d/ Z<d
ee
e&f         fd0Z=dS )2ReplicaBaserx   ro   	init_argsinit_kwargsdeployment_configversionr{   route_prefixc	           
      B    | _         | _        |j         _        | _        | _        | _         j        j          _         j        j	        r j        j	         d j        z    _         j        j
         _                              j        j                   t                       _        t!          ||| j        t"          t$          d|           _        t)           fd           _        d _        t/          j                     _        d  _        t7                       _        d _        d _        d  _                              d d            tC          | j         j        j"        |           _#        d  _$        d  _%        d  _&        d  _'        d  _(        d S )N_F)r   run_sync_methods_in_threadpool run_user_code_in_separate_threadlocal_testing_moder   c                       j         S r   )max_ongoing_requestsr   s   rp   <lambda>z&ReplicaBase.__init__.<locals>.<lambda>  s
    D,E rr   servable_objectrank)rx   ry   rz   r{   ))_versionr   r   r   _deployment_configr   _route_prefixname_component_nameapp_name	unique_id_component_id_configure_logger_and_profilerslogging_configr   r   UserCallableWrapperr5   r7   _user_callable_wrapperrT   
_semaphore_user_callable_initializedr   Lock_user_callable_initialized_lock_initialization_latencyr   _dynamically_created_handles_healthy_shutting_down_user_callable_asgi_app_set_internal_replica_contextrE   rz   _metrics_manager_internal_grpc_port
_docs_path
_http_port
_grpc_port_rank)	r   rx   ro   r  r  r   r!  r{   r"  s	   `        rp   r   zReplicaBase.__init__  s     %(6"3)"&"5":<' 	&/222T5II   "-7,,T-D-STTT355&9-+K-W$/	'
 	'
 	'
# $$E$E$E$EFF +0'/6|~~,8<$ @Cuu)  $ ;?$ 	**4d*KKK >!'#6I	!
 !
 !
 37 )-)-)-,0


rr   rc   c                     | j         j        S r   )r/  r)  r   s    rp   r)  z ReplicaBase.max_ongoing_requestsE  s    &;;rr   c                 4    | j                                         S r   )rD  r   r   s    rp   r   z$ReplicaBase.get_num_ongoing_requestsI  s    $==???rr   c                 B   t           j        j                                        j        }d }| j        )t          | j        d          rt          | j                  }| j        j	        | j        | j
        | j        | j        | j        | j        |||                                 f
S )Nroutes)r   servecontext_get_internal_replica_contextr-  rB  hasattrrR   r.  r   r>  rE  rF  rG  rH  list_outbound_deployments)r   current_rankroute_patternss      rp   get_metadatazReplicaBase.get_metadataL  s    y(FFHHM'3 t3X>> V!78T!U!U M+M($OOO**,,
 	
rr   c                     | j         S r   )r?  r   s    rp   get_dynamically_created_handlesz+ReplicaBase.get_dynamically_created_handlesc  s    00rr   c                    t                      }|                                 D ]}|                    |           | j        j        }| j        j        }t          t                    }	 |                    ||f          }|D ]}|j	        }|                    |           	 |
                                 n# |
                                 w xY wt          |          S )al  List all outbound deployment IDs this replica calls into.

        This includes:
        - Handles created via get_deployment_handle()
        - Handles passed as init args/kwargs to the deployment constructor

        This is used to determine which deployments are reachable from this replica.
        The list of DeploymentIDs can change over time as new handles can be created at runtime.
        Also its not guaranteed that the list of DeploymentIDs are identical across replicas
        because it depends on user code.

        Returns:
            A list of DeploymentIDs that this replica calls into.
        )source_type)r   rW  addr9  
_init_args_init_kwargsr!   r^   
find_nodesr   r   list)r   seen_deployment_idsr   r  r  scannerhandleshandles           rp   rR  z%ReplicaBase.list_outbound_deploymentsf  s     25 "AACC 	3 	3M##M2222 /:	1>  ,<===	(()[)ABBG! 7 7 & 4#''66667 MMOOOOGMMOOOO'(((s   )8B7 7CNr+  r,  r-  c                      j         j        }dt          dd f fd}t          j        j                             j        | j         |||           d S )Nr   rc   c                 <    j                             |            d S r   )r?  rZ  )r   r   s    rp   register_handle_callbackzKReplicaBase._set_internal_replica_context.<locals>.register_handle_callback  s     -11-@@@@@rr   )rx   r,  r/  r-  
world_sizehandle_registration_callback)r/  num_replicasr%   r   rN  rO  rC  r   )r   r,  r-  rf  re  s   `    rp   rC  z)ReplicaBase._set_internal_replica_context  s     ,9
	AL 	AT 	A 	A 	A 	A 	A 	A 		77'+#6!)A 	8 	
 	
 	
 	
 	
rr   r7  c                 :   |i }t          |t                    rt          di |}t          t          j        | j        | j        |t                     t          t          j        | j        | j                   |j
        t          j        k    rt          j                    }t          j        D ]}|                    |d            i |t"          | j        t$          | j        t&          t          j        t(          | j        j        ddddi| _        d S ddd| _        d S )N)component_typecomponent_namecomponent_idr7  buffer_size)rj  rk  rl  skip_context_filterTserve_access_log)rn  ro  r  )rf   r   r`   rL   r*   REPLICAr2  r5  r4   rM   encodingr_   JSONr   get_ray_core_logging_contextTASK_LEVEL_LOG_KEYSpopr>   r?   r=   r<   r   r3  _access_log_context)r   r7  ray_core_logging_contextr  s       rp   r6  z+ReplicaBase._configure_logger_and_profilers  sL    !Nnd++ 	=*<<^<<N"-5/+)>	
 	
 	
 	
 	,-5/+	
 	
 	
 	
 "l&777
 (9'U'W'W$
 )< 8 8(,,S$7777(*($d&:!4#5#%7%?%t':'C%t"D( (D$$$ (,$(( (D$$$rr   r   c                 6    | j                                          S r   )r:  lockedr   s     rp   _can_accept_requestzReplicaBase._can_accept_request  s    
 ?))++++rr   c              #     K   t          j                     }d }d dt          ffd}	 |V  nx# t          j        $ r"}|}|                     ||           Y d }~nLd }~wt
          $ r<}|}t                              d           |                     ||           Y d }~nd }~ww xY wt          j                     |z
  dz  }| 	                    |||           ||d d S )Nsc                     | d S r   r  )r|  status_codes    rp   _status_code_callbackzEReplicaBase._handle_errors_and_metrics.<locals>._status_code_callback  s    KKKrr   zRequest failed.r~   )
r   r  r   CancelledError_on_request_cancelledr   rj   r   _on_request_failed_record_errors_and_metrics)r   r   r  user_exceptionr  r
  r   r~  s          @rp   _handle_errors_and_metricsz&ReplicaBase._handle_errors_and_metrics  sK      Y[[
	S 	 	 	 	 	 		9'''''% 	< 	< 	<N&&'7;;;;;;;; 	9 	9 	9N.///##$4a88888888	9
 ikkJ.$6
''K5E	
 	
 	
 % d* &%s    . B#AB#'2BB#r  r~  r   c                 ~   |j         }|j        }|j        }|d}nt          |t          j                  rd}nd}|| j        t          <   |j        | j        t          <   t                              t          |pd| j        r|r|n||p||          | j                   | j                            |||d u           d S )NOK	CANCELLEDERRORCALL)methodr   statusr   extra)r   r   r   )_http_methodr   call_methodrf   r   r  rv  rA   
request_idr@   rj   inforK   r   rD  r   )	r   r  r~  r   r   http_method
http_router  
status_strs	            rp   r  z&ReplicaBase._record_errors_and_metrics  s     '3%+
&2!JJ(>?? 	!$JJ J
 5? 19I9T !56",f$(MQjQjjk"0j%   * 	 		
 		
 		
 	44!$D0 	5 	
 	
 	
 	
 	
rr   request_argsrequest_kwargsc                    |j         rut          |          dk    rt          |d         t                    sJ |d         }|j        }t          |||j                  }|                    dd          |_        ||f}n}|j	        rvt          |          dk    rt          |d         t                    sJ |d         }| j                            |j                  }|j        f}|j        rt           |j        ini }||fS )Nr   r   r  WS)is_http_requestlenrf   r+   
asgi_scoperH   receive_asgi_messagesgetr  is_grpc_requestr,   r9  get_user_method_infor  user_request_prototakes_grpc_context_kwargr.   grpc_context)r   r   r  r  requestscopereceivemethod_infos           rp   _unpack_proxy_argszReplicaBase._unpack_proxy_args  s1    + 	|$$))jQ!5/ /))  -9OG&E&')F G -2IIh,E,E)!7+LL- 	|$$))ja+.V.V))V#/?G5JJ , K $68L 7&(8(EFF  ^++rr   c                 v  K   |                      |||          \  }}|                     |          5  |                     |          4 d {V  | j                            |||           d {V cd d d           d {V  cd d d            S # 1 d {V swxY w Y   	 d d d            d S # 1 swxY w Y   d S r   )r  _wrap_request_start_requestr9  call_user_method)r   r   r  r  s       rp   handle_requestzReplicaBase.handle_request@  s      (,'>'>lN(
 (
$n  011 	 	**+;<<        !8II$lN                   	 	 	 	 	 	 	 	              	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s5   B."B0B.
B	B.B	B..B25B2c                  K   |                      |||          \  }}|                     |          5 }|                     |          4 d{V  |j        rC|\  }}| j                            ||||          2 3 d{V }t          j        |          W V  6 n*| j                            |||          2 3 d{V }|W V  6 	 ddd          d{V  n# 1 d{V swxY w Y   ddd           dS # 1 swxY w Y   dS zDGenerator that is the entrypoint for all `stream=True` handle calls.N)	r  r  r  r  r9  call_http_entrypointpickledumpscall_user_generator)	r   r   r  r  status_code_callbackr  r  msgsresults	            rp   handle_request_streamingz$ReplicaBase.handle_request_streamingL  sH      (,'>'>lN(
 (
$n  011 	%5I**+;<< % % % % % % % %#3 %%1NE7&*&A&V&V(,	' ' 1 1 1 1 1 1 1d %l40000000' ' )-(C(W(W($&) ) % % % % % % %f
 %) )% % % % % % % % % % % % % % % % % % % % % % % % % % %	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	%sM   C4*C8B>7C5C;CC4
C 	 C4#C 	$C44C8;C8c                  K   |                      |          sX| j        }t                              d| d|j         dddi           t          d|                                           W V  d S |                     |||          \  }}|                     |          5 }| 	                    |          4 d {V  t          d|                                           W V  |j
        rC|\  }}| j                            ||||          2 3 d {V }t          j        |          W V  6 nW|j        r+| j                            |||          2 3 d {V }	|	W V  6 n%| j                            |||           d {V W V  d d d           d {V  n# 1 d {V swxY w Y   d d d            d S # 1 swxY w Y   d S )	Nz,Replica at capacity of max_ongoing_requests=z, rejecting request .log_to_stderrFr  T)acceptednum_ongoing_requests)rz  r)  rj   rk   r  r(   r   r  r  r  r  r9  r  r  r  is_streamingr  r  )
r   r   r  r  limitr  r  r  r  r  s
             rp   handle_request_with_rejectionz)ReplicaBase.handle_request_with_rejectionf  sz      ''(899 	-ENNDu D D%5%@D D D&.    
 )0M0M0O0OPPPPPPF'+'>'>lN(
 (
$n  011 	5I**+;<<        ,! *.)F)F)H)H	       $3 %1NE7&*&A&V&V(,	' ' 1 1 1 1 1 1 1d %l40000000' ' &2 
(,(C(W(W($&) ) % % % % % % %f
 %) ) !% ; L L(,! !          3                          	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	sO   F3;AFD*>FE.FF3
F	F3"F	#F33F7:F7c                    K   t           r   )NotImplementedErrorr   s    rp   _on_initializedzReplicaBase._on_initialized  s      !!rr   c                   K   |(|| _         |                     | j        j        |           	 | j        4 d {V  t          j                    | _        | j        s| j                                         d {V | _	        | j	        r| j        j
        j        | _        |                                  d {V  d| _        | j        It          | j        d          o| j        j        d u}| j                            || j        j                   |t| j                            |j                   d {V  t*          j        j                                        j        }| j                            |j        |           d {V  d d d           d {V  n# 1 d {V swxY w Y   |                                  d {V  d S # t:          $ r" t=          t?          j                               d w xY w)Nr+  T_user_autoscaling_stats)r   r   r-  )!rI  rC  r9  user_callabler=  r   _initialization_start_timer;  initialize_callablerB  	_callable	docs_pathrF  r  rQ  r  rD  r   call_record_autoscaling_stats set_sync_method_threadpool_limitr)  r   rN  rO  rP  r-  call_reconfigureuser_configcheck_healthr   RuntimeError	traceback
format_exc)r   r   r-  initializeds       rp   
initializezReplicaBase.initialize  s      DJ.. $ ; IPT /   -	A ; # # # # # # # #26)++/6 "9MMOOOOOOOO 0 3  7AK  ..0000000006:D32># $ ;=V  ( !% ; S#'!(	 $ -OO3>8<8S8q P   
 %05VV)>         9,JJLLQD5FF)5! G         A# # # # # # # # # # # # # # # # # # # # # # # # # # #P ##%%%%%%%%%%% 	A 	A 	Ay35566D@	As/   G EFG 
FG "F#G ,G.c                   K   	 |j         | j        j         k    }|| j        k    }|| _        |j        | j        j        k    }|| _        t	          j        | j        ||          | _        | j                            |j	                   |r| 
                    |j                   | j                            |j                   d {V  |s|r'| j                            |j         |           d {V  |                     | j        j        |           | j        j        | _        d S # t&          $ r" t)          t+          j                              d w xY w)Nr  r+  )r  r/  rI  r7  rW   from_deployment_versionr.  rD  r   rz   r6  r9  r  r)  r  rC  r  r"  r0  r   r  r  r  )r   r   r-  r"  user_config_changedrank_changedlogging_config_changeds          rp   reconfigurezReplicaBase.reconfigure  s     (	A!-1H1TT    4:-LDJ!0*9: # '8D#-E0, DM !88!4   & W445F5UVVV-NN!6         # l 1BB%1 C          .. $ ; I /   
 "&!;D 	A 	A 	Ay35566D@	As   D D& &,Er
  c                     d S r   r  r   r   r
  s      rp   r  z!ReplicaBase._on_request_cancelled   s	     	rr   c                     d S r   r  r  s      rp   r  zReplicaBase._on_request_failed  s    rr   c                     d S r   r  r   s     rp   r  zReplicaBase._wrap_request
  s	    
 	rr   c                $  K   | j         4 d {V  	 | j                            |           d W V  | j                            |           n# | j                            |           w xY w	 d d d           d {V  d S # 1 d {V swxY w Y   d S r   )r:  rD  r   r   r   s     rp   r  zReplicaBase._start_request  sW     ? 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	QQ%>>?OPPP%>>?OPPPP%>>?OPPPPP	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Q 	Qs'   A?AA?A((A??
B	B	c                   K   | j         j        }	 t          j        |           d{V  | j                                        }|dk    r"t                              d| d| d           n t                              ddd	i
           dS |)zWait for any ongoing requests to finish.

        Sleep for a grace period before the first time we check the number of ongoing
        requests to allow the notification to remove this replica to propagate to
        callers first.
        TNr   zWaiting for an additional z!s to shut down because there are z ongoing requests.z,Graceful shutdown complete; replica exiting.r  Fr  )r/  graceful_shutdown_wait_loop_sr   r   rD  r   rj   r  )r   wait_loop_period_sr  s      rp   _drain_ongoing_requestsz#ReplicaBase._drain_ongoing_requests  s       "4R	- 2333333333#'#8#Q#Q#S#S #a''R1C R R)=R R R   
 B*E2     	rr   c                   K   	 | j                                          d {V  nC#  | j        rt                              d           nt                              d           Y nxY w| j                                         d {V  d S )NzJ__del__ ran before replica finished initializing, and raised an exception.z__del__ raised an exception.)r9  call_destructorr;  rj   r   rD  r   r   s    rp   r   zReplicaBase.shutdown2  s      	A-==??????????		A . A  +   
   !?@@@#,,...........s	   $ >A$c                    K   d| _         | j        r|                                  d {V  |                                  d {V  d S )NT)rA  r;  r  r   r   s    rp   perform_graceful_shutdownz%ReplicaBase.perform_graceful_shutdownB  se      " * 	1..000000000mmoorr   c                    K   	 | j                                         }|| d {V  d| _        d S # t          $ r)}t                              d           d| _        |d d }~ww xY w)NTzReplica health check failed.F)r9  call_user_health_checkr@  r   rj   rk   r   fr
  s      rp   r  zReplicaBase.check_healthL  s      
	 +BBDDA} DMMM 	 	 	NN9:::!DM	s   *0 
A#$AA#c                    K   	 | j                                         }|| d {V S i S # t          $ r"}t                              d           |d d }~ww xY w)Nz$Replica record routing stats failed.)r9  call_user_record_routing_statsr   rj   rk   r  s      rp   record_routing_statsz ReplicaBase.record_routing_statsY  st      	+JJLLA}wwwwwwI 	 	 	NNABBB	s   ") ) 
AAAr   )>r  r  r  r&   r   r   r   r-   rW   r  r  r   propertyr   r)  r   ReplicaMetadatarU  r   r%   rW  r   rR  ra   rC  r   r`   r6  r)   rz  r   r   StatusCodeCallbackr  r   BaseExceptionr  r  r   r  bytesr  r   r  r  r   r  r  r  r   r  r  r   r  r  r   r  r  r   r  r  r  r  rr   rp   r  r    s       J1J1 !J1 	J1
 J1 ,J1 #J1 J1 J1 J1 J1 J1X <c < < < X<@# @ @ @ @
o 
 
 
 
.1\1B 1 1 1 1%)4+= %) %) %) %)P .2t
 
 
"*
9D
 
 
 
&/#D$$=>/ / / /b,O , , , , , + /+	%tT1	2+ + + ^+<$
 /$
 c]$
 	$

 *$
 $
 $
 $
L!,)!, Cj!, S#X	!, !, !, !,F
 /
	ucz	
 
 
 
% /%	T		"% % % %4- /- - - -^ " " ^"5A!)*:!;5ACKKCX5A 5A 5A 5Av '+	.A .A+.A .A sm	.A .A .A .A`  /4;4J   ^
 ? y    ^  /	%tT1	2   ^ ^
 Q_ Q Q Q Q  0/ / /     DcN      rr   r  c                   p    e Zd Zd Zdedej        fdZdedefdZ	e
dedeeddf         fd	            ZdS )
Replicac                    K   t           j        j                                        j        }|                     | j        j        |           | j        "t          j	                    | j
        z
  | _        d S d S )Nr+  )r   rN  rO  rP  r-  rC  r9  r  r>  r   r  )r   rS  s     rp   r  zReplica._on_initializede  sw      y(FFHHM** 7E 	+ 	
 	
 	
 '/+/9;;9X+XD((( 0/rr   metadatar
  c                 ,   t           j        j                            |j                  }|                                D ]}|                                 t          |j                  }|                                D ]}|                                 dS )z#Recursively cancels child requests.N)r   rN  rO   _get_requests_pending_assignmentinternal_request_idvaluescancelrY   )r   r  r
  requests_pending_assignmenttaskin_flight_requestsreplica_results          rp   r  zReplica._on_request_cancelledr  s    
 I>>,  	$
 06688 	 	DKKMMMM 5X5QRR07799 	$ 	$N!!####	$ 	$rr   r   c                     t           j        j                                        r%t           j        j                                         d S d S r   )r   utilpdb$_is_ray_debugger_post_mortem_enabled_post_mortemr  s      rp   r  zReplica._on_request_failed  s@    8<<<>> 	(HL%%'''''	( 	(rr   rc   Nc           
   #   V  K   t           j        j        j                            t           j        j                            |j        |j        |j        | j	        j
        |j        |j                             |                     |          5 }|V  ddd           dS # 1 swxY w Y   dS )zContext manager that wraps user method calls.

        1) Sets the request context var with appropriate metadata.
        2) Records the access log message (if not disabled).
        3) Records per-request metrics via the metrics manager.
        )r   r  _internal_request_idr3  multiplexed_model_idr  N)r   rN  rO  _serve_request_contextr   _RequestContextr   r  r  r   r3  r  r  r  )r   r   r  s      rp   r  zReplica._wrap_request  s       		044I--&,+6%5%I,5%5%J-: .  		
 		
 		
 ,,-=>> 	'BV&&&&	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	' 	's   BB"%B")r  r  r  r  r)   r   r  r  r   r  r   r   r  r  r  rr   rp   r  r  d  s        Y Y Y$'$,3,B$ $ $ $"(? (y ( ( ( ( ' /'	%tT1	2' ' ' ^' ' 'rr   r  c                      e Zd ZdZdededededededed	efd
Z	de
fdZdefdZdefdZdeee                  fdZ	 d!dededefdZd Zdeeef         fdZ	 d"ded	ee         defdZdedee         deeee         f         fdZdedeeef         fdZdede edf         fdZ!dede edf         fdZ"dedefdZ#d  Z$dS )#ReplicaActora?  Actor definition for replicas of Ray Serve deployments.

    This class defines the interface that the controller and deployment handles
    (i.e., from proxies and other replicas) use to interact with a replica.

    All interaction with the user-provided callable is done via the
    `UserCallableWrapper` class.
    rx   serialized_deployment_defserialized_init_argsserialized_init_kwargsdeployment_config_proto_bytesr!  r{   r"  c	           
      "  K   t          j        |          }	t          j        |          }
t	          |
t
                    rt          |
          }
t          ||
t          j        |          t          j        |          |	|||          | _        d S )N)rx   ro   r  r  r   r!  r{   r"  )	r-   from_proto_bytesr   loadsrf   r  rq   rD   _replica_impl)r   rx   r  r  r  r  r!  r{   r"  r   ro   s              rp   r   zReplicaActor.__init__  s       -=)
 
 %*+DEEnc** 	SB>RRN*=!)!'(<==#)*@AA/%	+
 	+
 	+
rr   rb  c                 8    |j                                          d S r   )pongr   )r   rb  s     rp   push_proxy_handlezReplicaActor.push_proxy_handle  s     	rr   rc   c                 4    | j                                         S )zFetch the number of ongoing requests at this replica (queue length).

        This runs on a separate thread (using a Ray concurrency group) so it will
        not be blocked by user code.
        )r  r   r   s    rp   r   z%ReplicaActor.get_num_ongoing_requests  s     !::<<<rr   c                   K   t          j                    t          j                                                    t          j                                                    t          j                                                    t          j                                        t          j        	                                t                      fS )a  poke the replica to check whether it's alive.

        When calling this method on an ActorHandle, it will complete as
        soon as the actor has started running. We use this mechanism to
        detect when a replica has been allocated a worker slot.
        At this time, the replica can transition from PENDING_ALLOCATION
        to PENDING_INITIALIZATION startup state.

        Returns:
            The PID, actor ID, node ID, node IP, and log filepath id of the replica.
        )osgetpidr   get_runtime_contextget_actor_idget_worker_idget_node_idr  get_node_ip_addressget_node_instance_idrN   r   s    rp   is_allocatedzReplicaActor.is_allocated  s       IKK#%%2244#%%3355#%%1133H((**H))++*,,
 	
rr   c                 4    | j                                         S r   )r  rR  r   s    rp   rR  z&ReplicaActor.list_outbound_deployments  s    !;;===rr   Nr   r-  c                 z   K   | j                             ||           d{V  | j                                         S )ab  Handles initializing the replica.

        Returns: 5-tuple containing
            1. DeploymentConfig of the replica
            2. DeploymentVersion of the replica
            3. Initialization duration in seconds
            4. Port
            5. FastAPI `docs_path`, if relevant (i.e. this is an ingress deployment integrated with FastAPI).
        N)r  r  rU  )r   r   r-  s      rp   initialize_and_get_metadataz(ReplicaActor.initialize_and_get_metadata  sI        ++,=tDDDDDDDDD!..000rr   c                 H   K   | j                                          d {V  d S r   )r  r  r   s    rp   r  zReplicaActor.check_health  s3       --///////////rr   c                 D   K   | j                                          d {V S r   )r  r  r   s    rp   r  z!ReplicaActor.record_routing_stats  s-      '<<>>>>>>>>>rr   c                 |   K   | j                             |||           d {V  | j                                         S r   )r  r  rU  )r   r   r-  r"  s       rp   r  zReplicaActor.reconfigure  sK        ,,->lSSSSSSSSS!..000rr   pickled_request_metadatar  c                     t          j        |          }|j        s|j        rt          j        |d                   f}||fS )Nr   )r  r  r  r  )r   r)  r  r   s       rp   _preprocess_request_argsz%ReplicaActor._preprocess_request_args
  sL    
 "<(@AA+ 	</?/O 	<"La99;L--rr   c                    K   |                      ||          \  }} | j        j        |g|R i | d{V }|j        r|j        |                                f}|S )z$Entrypoint for `stream=False` calls.N)r+  r  r  r  r  SerializeToStringr   r)  r  r  r   r  s         rp   r  zReplicaActor.handle_request  s       *.)F)F$l*
 *
&, 9t)8
+
 
 
/=
 
 
 
 
 
 
 
 + 	Q&3V5M5M5O5OPFrr   c                   K   |                      ||          \  }} | j        j        |g|R i |2 3 d{V }|j        r|j        |                                f}|W V  /6 dS r  )r+  r  r  r  r  r-  r.  s         rp   r  z%ReplicaActor.handle_request_streaming'  s       *.)F)F$l*
 *
&, HD.G
+
 
 
/=
 
 	 	 	 	 	 	 	&  / U*79Q9Q9S9STLLLLL
 
 
s   A!c                "  K   |                      ||          \  }} | j        j        |g|R i |2 3 d{V }t          |t                    rt          j        |          W V  4|j        r|j        |	                                f}|W V  \6 dS )aK  Entrypoint for all requests with strict max_ongoing_requests enforcement.

        The first response from this generator is always a system message indicating
        if the request was accepted (the replica has capacity for the request) or
        rejected (the replica is already at max_ongoing_requests).

        For non-streaming requests, there will only be one more message, the unary
        result of the user request handler.

        For streaming requests, the subsequent messages will be the results of the
        user request handler (which must be a generator).
        N)
r+  r  r  rf   r(   r  r  r  r  r-  r.  s         rp   r  z*ReplicaActor.handle_request_with_rejection9  s      $ *.)F)F$l*
 *
&, MD.L
+
 
 
/=
 
 		 		 		 		 		 		 		& &"899 l6*******#3 Y.;V=U=U=W=WXF
 
 
s   Bproto_request_metadatac                    K   ddl m} |                    |          }t          |j        |j        |j        |j        |j                  } | j        j	        |g|R i | d {V S )Nr   )r)   )r  r  r  r  r   )
ray.serve.generated.serve_pb2r)   
FromStringr  r  r  r  r   r  r  )r   r1  r  r  RequestMetadataProtoprotor   s          rp   handle_request_from_javaz%ReplicaActor.handle_request_from_javaY  s      	
 	
 	
 	
 	
 	
 %//0FGG,;' % 9)!&!;+-
 -
 -
 7T'6
+
 
 
/=
 
 
 
 
 
 
 
 	
rr   c                 H   K   | j                                          d {V  d S r   )r  r  r   s    rp   r  z&ReplicaActor.perform_graceful_shutdowno  s3       ::<<<<<<<<<<<rr   )NNr   )%r  r  r  r  r&   r  rW   r  r  r   r    r  r   r   r"  r   r   r%   rR  r-   ra   r  r%  r  r   r   r  r  r   r)   r+  r  r   r  r  r7  r  r  rr   rp   r  r    s        

 $)
 $	

 !&
 (-
 #
 
 
 
 
 
8    
=# = = = =
C 
 
 
 
.>8D4F+G > > > > OS1 1!11@K1	1 1 1 1"0 0 0?DcN ? ? ? ? SW1 1'21BJ3-1	1 1 1 1	."'	. Cj	. 
c
*	+		. 	. 	. 	."'
 
ucz	   $"'
 
T		"   $"'
 
T		"   @
 %

 

 
 
 
,= = = = =rr   r  c                   j    e Zd ZU dZeed<   eed<   eed<   eed<   eed<   edededd fd	            Z	d
S )UserMethodInfoz4Wrapper for a user method and its relevant metadata.callabler1  is_asgi_apptakes_any_argsr  crc   c                    t          j        |          j        } | ||j        |t	          |          dk    t
          |v           S )Nr   )r;  r1  r<  r=  r  )inspect	signature
parametersr  r  r.   )clsr>  r<  paramss       rp   from_callablezUserMethodInfo.from_callable}  sN    "1%%0s#v;;?%:f%D
 
 
 	
rr   N)
r  r  r  r  r   __annotations__r  r  classmethodrE  r  rr   rp   r:  r:  s  s         >>
III""""
h 
 
AQ 
 
 
 [
 
 
rr   r:  c                   |   e Zd ZdZeefZdedede	de
dededed	efd
Zedej        fd            ZdedefdZedefd            ZdedefdZdedefdZdddddddedeee                  dee	eef                  dedee         dee         deeef         fdZedee         fd             ZdBd!Zedee         fd"            Z defd#Z!dee"j#        j$                 fd$Z%dee"j#        j$                 fd%Z&dee"j#        j$                 fd&Z'ed'             Z(ede	eef         fd(            Z)ede	ee*ee+f         f         fd)            Z,ed*ee         d+e-fd,            Z.ded-eded.ed/edee         dee         defd0Z/d1e0d2e1d3e2d4e3def
d5Z4ed-ed3e2d4e3d6e5def
d7            Z6d1e0d8ee         d9e	eef         de7edf         fd:Z8edd;d1e0d8ee         d9e	eef         d<ee         dee7edf                  f
d=            Z9ed1e0d8ee         d9e	eef         defd>            Z:d?e;fd@Z<edA             Z=dS )Cr8  zLWraps a user-provided callable that is used to handle requests to a replica.ro   r  r  r   r%  r&  r'  r   c                    t          j        |          s4t          j        |          s t          dt	          |           d          | _        | _        | _        t          j        |           _        | _	        | _
        d _        | _        | _        d _        i  _        t                               t$          j                   _        d  _        | _         j        rSt/          j                     _         fd}	t5          j        d|	           _         j                                         d S t/          j                     _        d S )NzBdeployment_def must be a function or class. Instead, its type was r  Fc                  l    t          j         j                    j                                         d S r   )r   set_event_loop_user_code_event_looprun_foreverr   s   rp   _run_user_code_event_loopz?UserCallableWrapper.__init__.<locals>._run_user_code_event_loop  s4     &t'ABBB*6688888rr   T)daemontarget)r@  
isfunctionisclass	TypeErrorr  _deployment_defr[  r\  _is_functionr   _local_testing_mode_destructor_called_run_sync_methods_in_threadpool!_run_user_code_in_separate_thread _warned_about_sync_method_change_cached_user_method_inforj   isEnabledForloggingDEBUG_is_enabled_for_debugr  r/  r   new_event_looprL  	threadingThread_user_code_event_loop_threadr   get_running_loop)
r   ro   r  r  r   r%  r&  r'  r   rN  s
   `         rp   r   zUserCallableWrapper.__init__  sv    ">22 	gon6U6U 	+''+ + +  
  .#'#.~>>+#5 "'/M,1Q.05-CE%%+%8%8%G%G""31 	D &(( &9 9 9 9 9 1:0@01 1 1D- -3355555)0)A)C)CD&&&rr   rc   c                     | j         S r   )rL  r   s    rp   ry   zUserCallableWrapper.event_loop  r   rr   r  c                      t          j                   s
J d            t          j                   dt          f fd            }|S )a  Decorator to run a coroutine method on the user code event loop.

        The method will be modified to be a sync function that returns a
        `asyncio.Future` if user code is running in a separate event loop.
        Otherwise, it will return the coroutine directly.
        z7_run_user_code can only be used on coroutine functions.rc   c                      | g|R i |}| j         r7t          j        || j                  }| j        r|S t          j        |          S |S r   )rY  r   run_coroutine_threadsaferL  rV  wrap_future)r   argskwargscorofutr  s        rp   wrapperz3UserCallableWrapper._run_user_code.<locals>.wrapper  sh    1T+D+++F++D5 6tT=WXX+ J*3///rr   )r@  iscoroutinefunction	functoolswrapsr   )r  rn  s   ` rp   _run_user_codez"UserCallableWrapper._run_user_code  s{     *
 
 	E 	ED	E 	E 
 
				c 		 		 		 		 		 
			 rr   r  c                 :   K   |t          j                    _        d S r   )r   current_default_thread_limitertotal_tokens)r   r  s     rp   r  z4UserCallableWrapper.set_sync_method_threadpool_limit  s!       CH	022???rr   method_namec                     | j         v r j         |         S  j        r j        }nut           j        |          rt	           j        |          }nJ fd}t          t          |t           j                                      }t          d| d| d          t          
                    |t           j        t                              }| j         |<   |S )zGet UserMethodInfo for the provided call method name.

        This method is cached to avoid repeated expensive calls to `inspect.signature`.
        c                 ~    |                      d          rdS t          t          j        |                     sdS dS )N__FT)
startswithr;  re   r  )attrr   s    rp   callable_method_filterzHUserCallableWrapper.get_user_method_info.<locals>.callable_method_filter  sB    ??4(( ! 5!'$.$"?"?@@ ! 5trr   zTried to call a method 'z*' that does not exist. Available methods: r  )r<  )r[  rU  r  rQ  re   r^  filterdirr]   r:  rE  rf   rF   )r   rv  user_methodr|  methodsr  s   `     rp   r  z(UserCallableWrapper.get_user_method_info  s   
 $7770== 	.KKT^[11 	!$.+>>KK     6"8#dn:M:MNNOOG#;       ++"4>3HII , 
 
 6:%k2rr   r  	asgi_argsc                    K   |                                 \  }}}t          |t          j        j                  r ||||           d{V  dS t	          |                              |||           d{V  dS )a  Handle the result from user code and send it over the ASGI interface.

        If the result is already a Response type, it is sent directly. Otherwise, it
        is converted to a custom Response type that handles serialization for
        common Python objects.
        N)to_args_tuplerf   	starlette	responsesrJ   send)r   r  r  r  r  r  s         rp   _send_user_result_over_asgiz/UserCallableWrapper._send_user_result_over_asgi  s        )6688wfi1:;; 	>&...........6""''w===========rr   NF)rj  rk  r  generator_result_callback'run_sync_methods_in_threadpool_overrider;  rj  rk  r  r  r  c                  K   d}nt                      nt                      || j        n|}t          j                  pt          j                  o(t          j                  pt          j                   }	|	rX|rVt          j                  rd}|st          dj
         d          fd}
t          j        |
           d{V }nb|	r<| j        s5|3d| _        t          j        t!          j        j
                              i }t          j        |          r| d{V }||fS )a7  Call the callable with the provided arguments.

        This is a convenience wrapper that will work for `def`, `async def`,
        generator, and async generator functions.

        Returns the result and a boolean indicating if the result was a sync generator
        that has already been consumed.
        FNTMethod 'f' returned a generator. You must use `handle.options(stream=True)` to call generators on a deployment.c                  @     i } r| D ]} |           d } | S r   r  )r  rrj  r;  r  is_generatorrk  s     rp   run_callablez;UserCallableWrapper._call_func_or_gen.<locals>.run_callableL  sL    !42622 "# 5 511!4444!Frr   )rv  )tupler   rX  r@  rQ  ismethodro  isasyncgenfunctionisgeneratorfunctionrS  r  r   run_syncrZ  warningswarnr6   formatiscoroutine)r   r;  rj  rk  r  r  r  sync_gen_consumedrun_sync_in_threadpoolis_sync_methodr  r  r  s    ``` `      @rp   _call_func_or_genz%UserCallableWrapper._call_func_or_gen  s     $ "'ttUWW!-466 7> 008 	 x((FG,<X,F,F
 '11 4)(33
 	  +	&4 +	&"6x@@L 	$(!#  $68#4 6 6 6           %-l;;;;;;;;FF 
=
 <C8<5<C$,$5     Xt.v..F"6** &%(((rr   c                     | j         S r   )r  r   s    rp   r  z!UserCallableWrapper.user_callablen  s
    ~rr   c                     K      j         j        }dt          dt          f fd} j        D ]}|                    ||            j                                          d {V  d S )Nr$  excc                 .                         |          S r   )handle_exception)r$  r  r   s     rp   r  zGUserCallableWrapper._initialize_asgi_callable.<locals>.handle_exception{  s    ((---rr   )r  appr   r   service_unavailable_exceptionsadd_exception_handler_run_asgi_lifespan_startup)r   r  r  r  s   `   rp   _initialize_asgi_callablez-UserCallableWrapper._initialize_asgi_callabler  s      -+	. 	.i 	. 	. 	. 	. 	. 	. 6 	= 	=C%%c+;<<<<n7799999999999rr   c                   K   | j         t          d          t                              dddi           | j        r| j        | _         n| j                            | j                  | _         |                     | j         j        | j	        | j
        d           d{V  t          | j         t                    r|                                  d{V  t          | j         t                    rC| j                             | j        j                   t$          j                            d           t+          | j         t,          d          | _        t+          | j         t0          d          | _        t+          | j         d	d          | _        t                              d
ddi           t          | j         t                    r| j         j        ndS )zInitialize the user callable.

        If the callable is an ASGI app wrapper (e.g., using @serve.ingress), returns
        the ASGI app object, which may be used *read only* by the caller.
        Nz/initialize_callable should only be called once.zStarted initializing replica.r  Fr  )rj  rk  r  1record_autoscaling_statszFinished initializing replica.)r  r  rj   r  rU  rT  __new__r  r   r[  r\  rf   rF   r  rQ   r  r/  r)  rS   )NUM_REPLICAS_USING_ASYNCHRONOUS_INFERENCErecordre   r/   _user_health_checkr:   _user_record_routing_statsr  r  r   s    rp   r  z'UserCallableWrapper.initialize_callable  s      >%PQQQ 	+"E* 	 	
 	
 	

  	T!1DNN "199$:NOODN(('_(8= )          $.*?@@ 744666666666$.*=>> T22+@   GNNsSSS")$.:Mt"T"T*1N8$+
 +
' (/N6(
 (
$ 	,"E* 	 	
 	
 	
 $.*?@@DN	
rr   c                 :    | j         t          d| d          d S )Nz-`initialize_callable` must be called before `z`.)r  r  )r   rv  s     rp   _raise_if_not_initializedz-UserCallableWrapper._raise_if_not_initialized  s2    >!OOOO   "!rr   c                 f    |                      d           | j        |                                 S d S )Nr  )r  r  _call_user_health_checkr   s    rp   r  z*UserCallableWrapper.call_user_health_check  s9    &&'?@@@ ".//111trr   c                 f    |                      d           | j        |                                 S d S )Nr  )r  r  _call_user_record_routing_statsr   s    rp   r  z2UserCallableWrapper.call_user_record_routing_stats  s7    &&'GHHH*677999trr   c                 f    |                      d           | j        |                                 S d S )Nr  )r  r  _call_user_autoscaling_statsr   s    rp   r  z1UserCallableWrapper.call_record_autoscaling_stats  s7    &&'FGGG'344666trr   c                 J   K   |                      | j                   d {V  d S r   )r  r  r   s    rp   r  z+UserCallableWrapper._call_user_health_check  s5      $$T%<===========rr   c                 P   K   |                      | j                   d {V \  }}|S r   )r  r  r   r  r$  s      rp   r  z3UserCallableWrapper._call_user_record_routing_stats  s7      001PQQQQQQQQ	rr   c                 P   K   |                      | j                   d {V \  }}|S r   )r  r  r  s      rp   r  z0UserCallableWrapper._call_user_autoscaling_stats  s7      001MNNNNNNNN	rr   r  r-  c                   K   |                      d           d}| j        sQt          | j        t                    r7t          | j        t                    }t          j        |          j        }d|v }||r| j        rt          d          t          | j        t                    s%t          d| j        z   dz   t          z   dz             i }|r||d<   |                     t          | j        t                    |f|           d {V  d S d S )	Nr  Fr-  z9deployment_def must be a class to use user_config or rankz-user_config or rank specified but deployment z	 missing z method)rj  rk  )r  rU  rQ  r  r8   re   r@  rA  rB  
ValueErrorr]   r   r  )r   r  r-  user_subscribed_to_rankreconfigure_methodrD  rk  s          rp   r  z$UserCallableWrapper.call_reconfigure  sp     &&'9::: #(  	7WT^=O%P%P 	7!(9K!L!L&'9::EF&,&6#"&="   O   T^-?@@ 'C)*!" ))  	    F& &!%v(((:;;!^ )           # #"rr   user_method_infor  r  c                  K   t          j        |          }t          j        |          }	|ro|r|D ]}
 ||
           n|	r|2 3 d{V }
 ||
           6 nk|r$|j        s|                     ||           d{V  nE|s|st          d|j         d          n(|r
J d            |s|	rt          d|j         d          |S )ai  Postprocess the result of a user method.

        User methods can be regular unary functions or return a sync or async generator.
        This method will raise an exception if the result is not of the expected type
        (e.g., non-generator for streaming requests or generator for unary requests).

        Generator outputs will be written to the `generator_result_callback`.

        Note that HTTP requests are an exception: they are *always* streaming requests,
        but for ASGI apps (like FastAPI), the actual method will be a regular function
        implementing the ASGI `__call__` protocol.
        NCalled method 'G' with `handle.options(stream=True)` but it did not return a generator.z4All HTTP requests go through the streaming codepath.r  r  )r@  isgenerator
isasyncgenr<  r  rS  r1  )r   r  r  r  r  r  r  r  result_is_genresult_is_async_genr  s              rp   _handle_user_method_resultz.UserCallableWrapper._handle_user_method_result  s     .  +F33%088  	  1 1A--a00001$ % 1 1 1 1 1 1 1!--a0000  &v  )9)E  66vyIIIIIIIIII$ 
-> 
  !&6&; ! ! !   $F FEF F#   3 2/4 2 2 2   s   Ar   r  r  r  c                  K   t                      |                     |j                  }| j        r:t	          j                    dt          ffd}|                     ||||          }n7dt          ffd}t	          j        |                     ||||                    }d}	                    |          2 3 d {V }	|s4|	d         }
d}|
d         dk    r |t          |
d	                              |	W V  C6 d S )
Nitemc                 B   K                        j        |            d S r   call_soon_threadsafe
put_nowaitr  result_queuesystem_event_loops    rp   enqueuez9UserCallableWrapper.call_http_entrypoint.<locals>.enqueueh  s'      !66|7NPTUUUUUrr   c                 6   K                        |            d S r   )r  )r  r  s    rp   r  z9UserCallableWrapper.call_http_entrypoint.<locals>.enqueuep  s!      ''-----rr   Fr   Tr  zhttp.response.startr  )rI   r  r  rY  r   rd  r   _call_http_entrypointr   fetch_messages_from_queuer  )r   r   r  r  r  r  r  call_futurefirst_message_peekedmessagesmsgr  r  s              @@rp   r  z(UserCallableWrapper.call_http_entrypointY  s      $~~445E5QRR1 	 !( 8 : :VC V V V V V V V 44 %' KK
.C . . . . . . "-**+;UGWUU K  %*DD[QQ 	 	 	 	 	 	 	(
 ( =qk'+$v;"777 )(S]););<<<NNNNN RQQs   <C?r  c                 8  K   |                      d           | j        r(t                              d|j         dddd           |j        r|||f}n8|j        st                      }n"t          j	        
                    |||          f}d}	 t          |d	          r&t          j        |                                          }|                     |j        |i d|
           d{V \  }}|                     ||dd||t%          |||                     d{V }	|(|                                s|                                 |	S # t*          $ rw}
|j        s@|                     |
          }|                     |t%          |||                     d{V  |(|                                s|                                  d}
~
wt          j        $ rA |=|                                s)t          |j        d          s|                                  w xY w)zCall an HTTP entrypoint.

        `send` is used to communicate the results of streaming responses.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r  %Started executing request to method ''.FTr  ro  r  Nfetch_until_disconnect)rj  rk  r  r  )r  r  r  r  r  set_max_batch_size)r  r_  rj   r   r1  r<  r=  r  r  requestsr   rQ  r   r   r  r  r;  r  rG   doner  r   r  r  r  )r   r  r  r  r  r  receive_taskr  r  final_resultr
  responses               rp   r  z)UserCallableWrapper._call_http_entrypoint  s      	&&'>???% 	LLQ8H8MQQQ(-4HH    
 ' 	O!7D1LL!0 	O !77LL &.66ugtLLNL.	w 899 U&273Q3Q3S3STT.2.D.D )!!*. /E / / ) ) ) ) ) )%F% "&!@!@ ! $"3*."5'488 "A " "      L '0A0A0C0C'##%%% 
	 
	 
	#/ 003366hugt<<         '0A0A0C0C'##%%%% 
	 
	 
	'0A0A0C0C' /8:NOO * '')))
	s   B:E
 

HA2GAHr  r  c                Z  
K   | j         s/|                     |||           d{V }|2 3 d{V }|W V  6 dS t                      
t          j                    dt
          f
fd}|                     ||||          }
                    |          2 3 d{V }|D ]}	|	W V  6 dS )zCalls a user method for a streaming call and yields its results.

        The user method is called in an asyncio `Task` and places its results on a
        `result_queue`. This method pulls and yields from the `result_queue`.
        Nr  c                 >                         j        |            d S r   r  r  s    rp   _enqueue_thread_safezEUserCallableWrapper.call_user_generator.<locals>._enqueue_thread_safe  s#    !66|7NPTUUUUUrr   r  )rY  _call_user_generatorrI   r   rd  r   r  )r   r   r  r  genr  r  r  r  r  r  r  s             @@rp   r  z'UserCallableWrapper.call_user_generator  sj      5 	11 ,       C !$       f !$ (>>L !( 8 : :V3 V V V V V V V 33 ,	 4  K #/"H"H"U"U       h#  CIIIII #V"U"Us
   8B*r  r  c                  	
K   |                      d           nt                      nt                      |                     |j                  

j        	t          j        	          pt          j        	          o(t          j	        	          pt          j
        	           }| j        r(t                              d
j         dddd           d	t          t           df         f	
fd
	
fd}r%|r#| j        rt%          j        |           d{V  dS rfd} |             d{V  dS              S )zCall a user generator.

        The `generator_result_callback` is used to communicate the results of generator
        methods.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r  Nr  r  FTr  r  rc   c                   K    i } t          j        |           r|  d {V } t          j        |           r| D ]}|W V  d S t          j        |           r| 2 3 d {V }|W V  6 d S t	          dj         d          Nr  r  )r@  r  r  r  rS  r1  )r  r  r;  r  r  r  s     rp   _call_generator_asynczGUserCallableWrapper._call_user_generator.<locals>._call_generator_async%  s      (L;N;;C"3''  iiiiii"3'' 
! ! !F LLLLL! !#C(( $' ! ! ! ! ! ! !& LLLLL %(CC  W&6&; W W W  s   A+c                       i } t          j        |           r| D ]} |           d S t          dj         d          r  )r@  r  rS  r1  )r  r  r;  r  r  r  r  s     rp   _call_generator_synczFUserCallableWrapper._call_user_generator.<locals>._call_generator_sync6  s    (L;N;;C"3'' ! $ $FGFOOOO$ $  W&6&; W W W  rr   c                  H   K                2 3 d {V }  |            6 d S r   r  )r  r  r  s    rp   gen_coro_wrapperzBUserCallableWrapper._call_user_generator.<locals>.gen_coro_wrapperE  sR      $9$9$;$; $ $ $ $ $ $ $&GFOOOO %<$;$;s   !)r  r  r   r  r  r;  r@  rQ  r  ro  r  r_  rj   r   r1  r   r   rX  r   r  )r   r   r  r  r  r  r  r  r  r;  r  s     ```   @@@rp   r  z(UserCallableWrapper._call_user_generator   s	     " 	&&'=>>>'3'?||UWW+9+E466445E5QRR#,x((FG,<X,F,F
 '11 4)(33
 	 % 	LLQ8H8MQQQ(-4HH    
	^CI-F 	 	 	 	 	 	 	 	 	"		 		 		 		 		 		 		 		 		  
	+~ 
	+$*N 
	+$%9::::::::::: 	+$ $ $ $ $ $ #"$$$$$$$$$$$((***rr   c                   K   |                      d           | j        r(t                              d|j         dddd           |                     |j                  }|                     |j        ||d           d	{V \  }}t          j	        |          st          j
        |          rt          d
|j         d          |S )zCall a (unary) user method.

        Raises any exception raised by the user code so it can be propagated as a
        `RayTaskError`.
        r  r  r  FTr  r  )rj  rk  r  Nr  r  )r  r_  rj   r   r  r  r  r;  r@  r  r  rS  r1  )r   r   r  r  r  r  r$  s          rp   r  z$UserCallableWrapper.call_user_methodM  s      	&&'9:::% 	LLX8H8TXXX(-4HH    
  445E5QRR00%!	 1 
 
 
 
 
 
 
 
	 v&& 	'*<V*D*D 	.+0 . . .  
 rr   r  c                     t          || j                  r&t          j                            |j        d          S t          j                            dd          S )Ni  )r~  zInternal Server Errori  )rf   r  r  r  rJ   message)r   r  s     rp   r  z$UserCallableWrapper.handle_exceptionp  sZ    c4>?? 	&///MMM&//'S 0   rr   c                   K   | j         t                              d           dS | j        rdS d| _        	 t	          | j         d          r'|                     | j         j        d           d{V  t	          | j         d          r/t          | j         d                                           d{V  dS dS # t          $ r(}t          
                    d|            Y d}~dS d}~ww xY w)	zExplicitly call the `__del__` method of the user callable.

        Calling this multiple times has no effect; only the first call will
        actually call the destructor.
        NzEThis replica has not yet started running user code. Skipping __del__.T__del__F)r  __serve_multiplex_wrapperz/Exception during graceful shutdown of replica: )r  rj   r   rW  rQ  r  r  re   r   r   r   )r   r
  s     rp   r  z#UserCallableWrapper.call_destructorx  s[      >!LL$   F " 	F"&	Tt~y11 ,,N*<A -          t~'BCC Vdn.IJJSSUUUUUUUUUUUV V  	T 	T 	TRqRRSSSSSSSSS	Ts   A>B9 9
C+C&&C+r  )>r  r  r  r  r[   r\   r  r   r   r   r%   r  r-   r   r  r   AbstractEventLoopry   rr  r   r  r  r:  r  r   rG   r  r   r  r  r  r   r  r  r  r  r  r  r  r  r  r  r   r  r  ra   r  r  r)   r  r   r   r  r   r  r   r  r  r  r   r  r  r  rr   rp   r8  r8    s$       VV&79S%T"6D 6D 6D 	6D $6D )-6D +/6D !6D ,6D 6D 6D 6Dp *G5 * * * X*( x    2 HC H H H ^H
" " " " " "H>> > > > >* &*+/"8<BFN) N) N)N) uSz"	N)
 c3h(N) N) $,H#5N) 2:$N) 
sDy	N) N) N) N)` x1    X: : : :" >
8G+< >
 >
 >
 ^>
@S    1C1J(K    9K9R0S    x
8J8Q/R     > > ^> tCH~    ^ DeCJ>O9O4P    ^ $(3- ${ $ $ $ ^$L;; );
 ; ;  ; $,H#5; H%; 
; ; ; ;z-)- 1- 	-
 - 
- - - -^ O(O O 	O
 O 
O O O ^Ob$)$ Cj$ S#X	$
 
T		"$ $ $ $L  '+J+ J+ J+)J+ CjJ+ S#X	J+ (#J+ 
.d+	,J+ J+ J+ ^J+X  )  Cj  S#X	 
 
      ^ DI      T  T ^ T  T  Trr   r8  )r   concurrent.futuresr  rp  r@  r]  r  r  ra  r   r  r  abcr   r   collectionsr   r   
contextlibr   r   dataclassesr	   	importlibr
   typingr   r   r   r   r   r   r   r   r   r   starlette.responsesr  anyior   fastapir   starlette.applicationsr   starlette.typesr   r   r   r   r   r   ray._common.filtersr   ray._common.utilsr   	ray.actorr   r    ray.dag.py_obj_scannerr!   ray.remote_functionr"   	ray.server#   ray.serve._private.commonr$   r%   r&   r'   r(   r)   r*   r+   r,   ray.serve._private.configr-   ray.serve._private.constantsr.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   ray.serve._private.default_implrD   rE   ray.serve._private.http_utilrF   rG   rH   rI   rJ    ray.serve._private.logging_utilsrK   rL   rM   rN    ray.serve._private.metrics_utilsrO   rP    ray.serve._private.task_consumerrQ   1ray.serve._private.thirdparty.get_asgi_route_namerR   ray.serve._private.usagerS   ray.serve._private.utilsrT   rU   rV   ray.serve._private.versionrW   ray.serve.configrX   ray.serve.contextrY   ray.serve.deploymentrZ   ray.serve.exceptionsr[   r\   r]   ray.serve.handler^   ray.serve.schemar_   r`   ra   	getLoggerrj   r  r   r  r  rq   rt   r  r  r  r  r:  r8  r  rr   rp   <module>r     sx              				            # # # # # # # # * * * * * * * * : : : : : : : : ! ! ! ! ! ! # # # # # #                                        , , , , , , 9 9 9 9 9 9 9 9 9 9 9 9 



       1 1 1 1 1 1 6 6 6 6 6 6 - - - - - - - - 0 0 0 0 0 0 . . . . . .      
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 7 6 6 6 6 6                                               0                                Q P P P P P P P @ @ @ @ @ @      3 2 2 2 2 2         
 9 8 8 8 8 8 . . . . . . 5 5 5 5 5 5 + + + + + +         
 . - - - - - E E E E E E E E E E		,	-	- UOSMSMT#YT, 	"s x    *T
 T
 T
 T
 T
 T
 T
 T
n
 seTk* i	 i	 i	 i	 i	# i	 i	 i	X9' 9' 9' 9' 9'k 9' 9' 9'xP= P= P= P= P= P= P= P=f 
 
 
 
 
 
 
 
*PT PT PT PT PT PT PT PT PT PTrr   