
    &`iN                     p   d dl Z d dlZd dlZd dlZd dlZd dl mZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZmZmZmZmZmZmZmZ d dlZd d	lmZ d d
lmZ d dlm Z m!Z"m#Z#m$Z$m%Z%m&Z' d dl(m)Z)  ej*        e          Z+ e,ej-        .                    dd                     e,ej-        .                    dd                    fZ/ G d de          Z0e G d d                      Z&eegdf         Z1ee2e0ee0e2f         f         Z3 G d de          Z4 G d d          Z5 G d d          Z6dS )    N)sleep)AbstractEventLoop)defaultdict)Mapping)	dataclass)Enumauto)AnyCallableDefaultDictDictOptionalSetTupleUnion)get_or_create_event_loop)SERVE_LOGGER_NAME)DeploymentTargetInfoEndpointInfoEndpointSetLongPollRequestLongPollResultUpdatedObject)metrics/LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_LOWER_BOUND30/LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_S_UPPER_BOUND60c                   d    e Zd Zd Z e            Z e            Z e            Z e            ZdS )LongPollNamespacec                 .    | j         j         d| j         S )N.)	__class____name__nameselfs    p/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/_private/long_poll.py__repr__zLongPollNamespace.__repr__+   s    .)77DI777    N)	r$   
__module____qualname__r)   r	   DEPLOYMENT_TARGETSROUTE_TABLEGLOBAL_LOGGING_CONFIGDEPLOYMENT_CONFIG r*   r(   r    r    *   sP        8 8 8 $&&K DFFr*   r    c                   $    e Zd ZU eed<   eed<   dS )r   object_snapshotsnapshot_idN)r$   r+   r,   r
   __annotations__intr1   r*   r(   r   r   4   s,          r*   r   c                   "    e Zd Z e            ZdS )LongPollStateN)r$   r+   r,   r	   TIME_OUTr1   r*   r(   r8   r8   C   s        tvvHHHr*   r8   c                       e Zd ZdZdeeef         deddfdZddZ	deeef         ddfdZ
deeef         ddfd	Zd
efdZd Zd Zdeeef         fdZdS )LongPollClientaI  The asynchronous long polling client.

    Args:
        host_actor: handle to actor embedding LongPollHost.
        key_listeners: a dictionary mapping keys to
          callbacks to be called on state update for the corresponding keys.
        call_in_event_loop: an asyncio event loop
          to post the callback into.
    key_listenerscall_in_event_loopreturnNc                     |J || _         || _        || _        d | j                                        D             | _        d| _        | j                            | j        t          j	                               d S )Nc                     i | ]}|d S r1   ).0keys     r(   
<dictcomp>z+LongPollClient.__init__.<locals>.<dictcomp>`   s,     1
 1
 1

  	1
 1
 1
r*   T)context)

host_actorr<   
event_loopkeyssnapshot_ids
is_runningcall_soon_threadsafe
_poll_nextcontextvarsContext)r'   rG   r<   r=   s       r(   __init__zLongPollClient.__init__R   s     "---$*,1
 1

 )..001
 1
 1
 
 	,,O[%8%:%: 	- 	
 	
 	
 	
 	
r*   c                     d| _         dS )z5Stop the long poll client after the next RPC returns.FN)rK   r&   s    r(   stopzLongPollClient.stopp   s    r*   c                 F    | j                             | j        |           dS )a  Add more key listeners to the client.
        The new listeners will only be included in the *next* long poll request;
        the current request will continue with the existing listeners.

        If a key is already in the client, the new listener will replace the old one,
        but the snapshot ID will be preserved, so the new listener will only be called
        on the *next* update to that key.
        N)rH   rL   _add_key_listenersr'   r<   s     r(   add_key_listenersz LongPollClient.add_key_listenerst   s%    " 	,,T-DmTTTTTr*   c                       j                              fd|                                D                         j                            |           dS )z|Inner method that actually adds the key listeners, to be called
        via call_soon_threadsafe for thread safety.
        c                 (    i | ]}|j         v|d S rA   )r<   )rC   rD   r'   s     r(   rE   z5LongPollClient._add_key_listeners.<locals>.<dictcomp>   s(    VVV4CU8U8US"8U8U8Ur*   N)rJ   updaterI   r<   rU   s   ` r(   rT   z!LongPollClient._add_key_listeners   sb     	  VVVV 2 2 4 4VVV	
 	
 	
 	!!-00000r*   
trigger_atc                 h    | xj         dz  c_         | j         |k    r|                                  dS dS )a^  Called after a single callback is completed.

        When the total number of callback completed equals to trigger_at,
        _poll_next() will be called. This is designed to make sure we only
        _poll_next() after all the state callbacks completed. This is a
        way to serialize the callback invocations between object versions.
           N)_callbacks_processed_countrM   )r'   rZ   s     r(   _on_callback_completedz%LongPollClient._on_callback_completed   sD     	''1,''*j88OO 98r*   c                       j         sdS d _         j        j                             j                   _         j                             fd           dS )z`Poll the update. The callback is expected to scheduler another
        _poll_next call.
        Nr   c                 .                         |           S N)_process_update)rY   r'   s    r(   <lambda>z+LongPollClient._poll_next.<locals>.<lambda>   s    t7K7KF7S7S r*   )rK   r]   rG   listen_for_changeremoterJ   _current_ref_on_completedr&   s   `r(   rM   zLongPollClient._poll_next   sa      	F*+' O=DDTEVWW''(S(S(S(STTTTTr*   c                     | j                                         r| j                             |           d S t                              d           d| _        d S )Nz9The event loop is closed, shutting down long poll client.F)rH   rK   rL   loggererror)r'   callbacks     r(   _schedule_to_event_loopz&LongPollClient._schedule_to_event_loop   sT     ?%%'' 	$O00:::::LLTUUU#DOOOr*   updatesc           	          t          t          j        j                  r#t                              d           d _        d S t          t                    r#t                              d           d _        d S t          t          j        j	                  r>t          
                    dj        z                                    j                   d S t          j        k    r6t                              d                                 j                   d S t                              d  dt!                                                     dd	di
           s                      j                                                   D ]D\  }}|j         j        |<    j        |         }||j        f fd	}                     |           Ed S )Nz8LongPollClient failed to connect to host. Shutting down.Fz0LongPollClient connection failed, shutting down.zLongPollHost errored
z+LongPollClient polling timed out. Retrying.zLongPollClient z received updates for keys: r"   log_to_stderr)extrac                 d     | |                                t                               d S )N)rZ   )r^   len)rk   argr'   rm   s     r(   chainedz/LongPollClient._process_update.<locals>.chained   s3    ++s7||+DDDDDr*   )
isinstanceray
exceptionsRayActorErrorri   debugrK   ConnectionErrorwarningRayTaskErrorrj   traceback_strrl   rM   r8   r9   listrI   itemsr4   rJ   r<   r3   )r'   rm   rD   rY   rk   rt   s   ``    r(   rb   zLongPollClient._process_update   s   g <>> 	 LLSTTT#DOFg// 	NNMNNN#DOFg ;== 	 LL1G4IIJJJ ((999Fm,,,LLFGGG((999F'd ' 'GLLNN##' ' '"E* 	 	
 	
 	

  	:((999"==?? 
	2 
	2KC%+%7Dc")#.H "*v/E E E E E E E E ((1111
	2 
	2r*   )r>   N)r$   r+   r,   __doc__r   KeyTypeUpdateStateCallabler   rP   rR   rV   rT   r6   r^   rM   rl   strr   rb   r1   r*   r(   r;   r;   G   s4        
 G%889
 .	

 

 
 
 
<       U!'+>">?U	U U U U&
1!'+>">?
1	
1 
1 
1 
1
 
 
 
 
	U 	U 	U$ $ $-2tC,>'? -2 -2 -2 -2 -2 -2r*   r;   c                   H   e Zd ZdZefdeeef         fdZddee	         fdZ
deeee	ef         f         fdZd	ee	ef         d
eeee	ef         f         fdZded
efdZdefdZded
e	fdZde	d
efdZde	ded
efdZdee	ef         d
efdZdee	ef         d
dfdZdS )LongPollHostaa  The server side object that manages long pulling requests.

    The desired use case is to embed this in an Ray actor. Client will be
    expected to call actor.listen_for_change.remote(...). On the host side,
    you can call host.notify_changed({key: object}) to update the state and
    potentially notify whoever is polling for these values.

    Internally, we use snapshot_ids for each object to identify client with
    outdated object and immediately return the result. If the client has the
    up-to-date version, then the listen_for_change call will only return when
    the object is updated.
    #listen_for_change_request_timeout_sc                     i | _         i | _        t          t                    | _        || _        t          j        ddd          | _        d S )N)serve_long_poll_host_transmission_counterz6The number of times the long poll host transmits data.)namespace_or_state)descriptiontag_keys)	rJ   object_snapshotsr   setnotifier_events$_listen_for_change_request_timeout_sr   Countertransmission_counter)r'   r   s     r(   rP   zLongPollHost.__init__   s^     1346ITJ
 J
 5X1$+O7P,%
 %
 %
!!!r*   NrD   c                     |t          | j        |                   S t          d | j                                        D                       S )zUsed for testing.Nc              3   4   K   | ]}t          |          V  d S ra   )rr   )rC   eventss     r(   	<genexpr>z8LongPollHost._get_num_notifier_events.<locals>.<genexpr>  s(      OOvs6{{OOOOOOr*   )rr   r   sumvaluesr'   rD   s     r(   _get_num_notifier_eventsz%LongPollHost._get_num_notifier_events  sI    ?t+C0111OO1E1L1L1N1NOOOOOOr*   timeout_or_datac                     t          |t                    r | j                            dddi           dS |}|                                D ]-}| j                            ddt          |          i           .dS )zHelper method that tracks the data sent by listen_for_change.

        Records number of times long poll host sends data in the
        ray_serve_long_poll_host_send_counter metric.
        r\   r   TIMEOUT)valuetagsN)ru   r8   r   incrI   r   )r'   r   datarD   s       r(   _count_sendzLongPollHost._count_send  s     o}55 	 %))3Y? *      #Dyy{{  )--#7S"B .     r*   keys_to_snapshot_idsr>   c                   K   |s.t          d           d{V  i }|                     |           |S i }|                                D ]H\  }}	 | j        |         }n# t          $ r Y  w xY w||k    rt          | j        |         |          ||<   It          |          dk    r|                     |           |S i }i }|                                D ]r}t          j
                    }| j        |                             |           t                                          |                                          }	|||	<   |||	<   st          j        |                                t          j        t#          j        | j                    d{V \  }
}|D ]U}	|	                                 	 ||	         }| j        ||	                                      |           F# t          $ r Y Rw xY wt          |
          dk    r+|                     t,          j                   t,          j        S i }|
D ]3}	||	         }t          | j        |         | j        |                   ||<   4|                     |           |S )zListen for changed objects.

        This method will return a dictionary of updated objects. It returns
        immediately if any of the snapshot_ids are outdated,
        otherwise it will block until there's an update.
        r\   Nr   )return_whentimeout)r   r   r   rJ   KeyErrorr   r   rr   rI   asyncioEventr   addr   create_taskwaitFIRST_COMPLETEDrandomuniformr   cancelremover8   r9   )r'   r   updated_objectsrD   client_snapshot_idexisting_idasync_task_to_eventsasync_task_to_watched_keyseventtaskdonenot_doneupdated_object_keys                r(   rd   zLongPollHost.listen_for_change%  s      $ 	#((NNNNNNN O_---"" ';'A'A'C'C 	 	#C#"/4     000'4)#.( ($ !##_---""  "%'"',,.. 
	3 
	3CMOOE  %))%000+--99%**,,GGD). &/2&t,,&|&++--/ND$MN 
  
  
 
 
 
 
 
 
h  	 	DKKMMM,T2$%?%EFMMeTTTT    
 t99>>]3444 )) O  %?%E"6C)*<=%&897 7 233 _---""s$   A
A*)A*-.G
G)(G)keys_to_snapshot_ids_bytesc                     K   t          j        |          } fd|j                                        D             }                     |           d{V }                     |          S )zListen for changed objects. only call by java proxy/router now.
        Args:
            keys_to_snapshot_ids_bytes (Dict[str, int]): the protobuf bytes of
              keys_to_snapshot_ids (Dict[str, int]).
        c                 B    i | ]\  }}                     |          |S r1   )_parse_xlang_key)rC   	xlang_keyr4   r'   s      r(   rE   z7LongPollHost.listen_for_change_java.<locals>.<dictcomp>  s=      
  
  
&	; !!),,k 
  
  
r*   N)r   
FromStringr   r   rd   _listen_result_to_proto_bytes)r'   r   request_protor   keys_to_updated_objectss   `    r(   listen_for_change_javaz#LongPollHost.listen_for_change_java|  s       (23MNN 
  
  
  
*7*L*R*R*T*T 
  
  
 )-(>(>?S(T(T"T"T"T"T"T"T112IJJJr*   r%   c                     |t           j        j        k    rt           j        S |t           j        j        k    rt           j        S |S ra   )r    r.   r%   r-   )r'   r%   s     r(   _parse_poll_namespacez"LongPollHost._parse_poll_namespace  s=    $0555$00&9>>>$77Kr*   r   c                    |t          d          |                    d          r|                    d          r|dd                             d          }t	          |          dk    r^|                     |d                                                   }t          |t                    r||d                                         fS n|                     |          S t          d		                    |                    )
Nz(func _parse_xlang_key: xlang_key is None()r\   rB   ,   r   z(can not parse key type from xlang_key {})

ValueError
startswithendswithsplitrr   r   stripru   r    format)r'   r   fields
enum_fields       r(   r   zLongPollHost._parse_xlang_key  s    GHHH$$ 	9););C)@)@ 	9qt_**3//F6{{a!77q	8I8IJJ
j*;<< 9%vay'8'888--i888CJJ9UUVVVr*   c                     t          |t                    rd|d         j        z   dz   |d         z   dz   S t          |t                    r|j        S |S )Nr   r   r   r\   r   )ru   tupler%   r    r   s     r(   _build_xlang_keyzLongPollHost._build_xlang_key  sW    c5!! 	Q$s*SV3c99.// 	8OJr*   r3   c                    |t           j        k    r@d |                                D             }t          |                                          S t          |t                    rO|d         t           j        k    r9d |j        D             }t          ||j
                                                  S t                              t          |                    S )Nc                 Z    i | ](\  }}t          |          t          |j                   )S ))route)r   EndpointInfoProtor   )rC   endpoint_tagendpoint_infos      r(   rE   z@LongPollHost._object_snapshot_to_proto_bytes.<locals>.<dictcomp>  sE       /L- L!!#4=;N#O#O#O  r*   )	endpointsr   c                 @    g | ]}|j                                         S r1   )
replica_idto_full_id_str)rC   replica_infos     r(   
<listcomp>z@LongPollHost._object_snapshot_to_proto_bytes.<locals>.<listcomp>  s7         '6688  r*   )replica_namesis_available)r    r.   r   r   SerializeToStringru   r   r-   running_replicasr   r   r   encode)r'   rD   r3   xlang_endpointsactor_name_lists        r(   _object_snapshot_to_proto_bytesz,LongPollHost._object_snapshot_to_proto_bytes  s     #///
 3B3H3H3J3J  O 999KKMMMU## 	4A2C2V(V(V $3$D  O (-,9    !!"
 ::c/22333r*   r   c                       fd|                                 D             }d|i}t          di |}|                                S )Nc           
          i | ]H\  }}                     |          t          |j                            ||j                             IS ))r4   r3   )r   UpdatedObjectProtor4   r   r3   )rC   rD   updated_objectr'   s      r(   rE   z>LongPollHost._listen_result_to_proto_bytes.<locals>.<dictcomp>  sp     )
 )
 )
 $^ !!#&&(:*6 $ D D7! !) ) ))
 )
 )
r*   r   r1   )r   r   r   )r'   r   xlang_keys_to_updated_objectsr   protos   `    r(   r   z*LongPollHost._listen_result_to_proto_bytes  sp    )
 )
 )
 )
 (?'D'D'F'F)
 )
 )
% <
 &&&&&&(((r*   rm   c                    |                                 D ]\  }}	 | j        |xx         dz  cc<   n-# t          $ r  t          j        dd          | j        |<   Y nw xY w|| j        |<   t                              d| d           | j        	                    |t                                D ]}|
                                 dS )zg
        Update the current snapshot of some objects
        and notify any long poll clients.
        r\   r   i@B z$LongPollHost: Notify change for key r"   N)r   rJ   r   r   randintr   ri   ry   r   popr   )r'   rm   
object_keyr   r   s        r(   notify_changedzLongPollHost.notify_changed  s    
 +2--// 	 	&JM!*---2---- M M M
 17q)0L0L!*---M 1?D!*-LLM
MMMNNN-11*ceeDD  			 	s   1'AAra   )r$   r+   r,   r   #LISTEN_FOR_CHANGE_REQUEST_TIMEOUT_Sr   r6   rP   r   r   r   r   r8   r   r   r   rd   bytesr   r   r   r   r   r
   r   r   r   r   r1   r*   r(   r   r      s        " 0	
 
-2H.

 
 
 
,P PHW,= P P P P$]D-9O4P%PQ   ,U#"7C<0U# 
}d7M#9::	;U# U# U# U#nK$)K 
K K K K"#    W# W' W W W WG     44-04	4 4 4 44)'+G],B'C)	) ) ) )$ggsl&;       r*   r   )7r   rN   loggingosr   r   asyncio.eventsr   collectionsr   collections.abcr   dataclassesr   enumr   r	   typingr
   r   r   r   r   r   r   r   rv   ray._common.utilsr   ray.serve._private.constantsr   ray.serve.generated.serve_pb2r   r   r   r   r   r   r   r   ray.utilr   	getLoggerri   floatenvirongetr   r    r   r   r   r8   r;   r   r1   r*   r(   <module>r     s         				        , , , , , , # # # # # # # # # # # # ! ! ! ! ! !         P P P P P P P P P P P P P P P P P P P P 



 6 6 6 6 6 6 : : : : : :                     		,	-	- 
E"*..JD
Q
QRR	E"*..JD
Q
QRR' #                ud{+ 
&.?.D(EE
F    D   Z2 Z2 Z2 Z2 Z2 Z2 Z2 Z2zD D D D D D D D D Dr*   