
    %`i$                     .   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlZd dlm	Z
 d dlmc mZ d dlmZ d dlmZmZmZmZ  ej        e          Z G d d          Z G d	 d
e          Z G d de          Z G d de          Z G d de          ZdS )    N)deque)ListTupleaio)get_or_create_event_loop)gcs_pb2gcs_service_pb2gcs_service_pb2_grpc
pubsub_pb2c                   n    e Zd ZddefdZed             Zd Zd Zd Z	e
dej        d	dfd
            ZdS )_SubscriberBaseN	worker_idc                     || _         t          t          d t          d          D                                 | _        d| _        d| _        d| _        d S )Nc              3   >   K   | ]}t          j        d           V  dS )   N)randomgetrandbits).0_s     k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/_private/gcs_pubsub.py	<genexpr>z+_SubscriberBase.__init__.<locals>.<genexpr>   s-      -W-Wf.@.C.C-W-W-W-W-W-W       r   r   )
_worker_idbytes	bytearrayrange_subscriber_id_last_batch_size_max_processed_sequence_id_publisher_id)selfr   s     r   __init__z_SubscriberBase.__init__   sW    # $I-W-WUSUYY-W-W-W$W$WXX !*+' r   c                     | j         S N)r    r#   s    r   last_batch_sizez_SubscriberBase.last_batch_size"   s    $$r   c                 v    t          j        |i           }t          j        | j        | j        |g          }|S )N)channel_typesubscribe_messagesubscriber_id	sender_idcommands)r   Commandr
    GcsSubscriberCommandBatchRequestr   r   )r#   channelcmdreqs       r   _subscribe_requestz"_SubscriberBase._subscribe_request&   sE     gLLL>-TWSX
 
 
 
r   c                 N    t          j        | j        | j        | j                  S )N)r-   max_processed_sequence_idpublisher_id)r
   GcsSubscriberPollRequestr   r!   r"   r'   s    r   _poll_requestz_SubscriberBase._poll_request-   s.    7-&*&E+
 
 
 	
r   c                     t          j        | j        | j        g           }|D ]0}|j                            t          j        |i                      1|S )Nr,   )r*   unsubscribe_message)r
   r1   r   r   r/   appendr   r0   )r#   channelsr4   r2   s       r   _unsubscribe_requestz$_SubscriberBase._unsubscribe_request4   sp    >-SU
 
 
   	 	GL"RPPP    
r   ereturnc                     |                                  t          j        j        k    rdS |                                  t          j        j        k    rdS dS )NTF)codegrpc
StatusCodeDEADLINE_EXCEEDEDUNAVAILABLE)r@   s    r   _should_terminate_pollingz)_SubscriberBase._should_terminate_polling>   sE     6688t8884 6688t2224ur   r&   )__name__
__module____qualname__r   r$   propertyr(   r5   r:   r?   staticmethodrD   RpcErrorrH    r   r   r   r      s        ! !% ! ! ! ! % % X%  
 
 
   T] t    \  r   r   c                   d     e Zd ZdZ	 	 	 ddededej        f fdZddZ	dd	Z
ddd
ZddZ xZS )_AioSubscribera#  Async io subscriber to GCS.

    Usage example common to Aio subscribers:
        subscriber = GcsAioXxxSubscriber(address="...")
        await subscriber.subscribe()
        while running:
            ...... = await subscriber.poll()
            ......
        await subscriber.close()
    Nr   addressr2   c                 B   t                                          |           |r#|
J d            t          j        |d          }n|
J d            t	          j        |          | _        || _        t                      | _	        t          j                    | _        d S )Nz,address and channel cannot both be specifiedTr   z,One of address and channel must be specified)superr$   	gcs_utilscreate_gcs_channelr   InternalPubSubGcsServiceStub_stub_channelr   _queueasyncioEvent_close)r#   pubsub_channel_typer   rR   r2   	__class__s        r   r$   z_AioSubscriber.__init__V   s     	### 	W??$R???27EEEGG&&(V&&&)FwOO
 ,ggmoor   rA   c                    K   | j                                         rdS |                     | j                  }| j                            |d           d{V  dS )zRegisters a subscription for the subscriber's channel type.

        Before the registration, published messages in the channel will not be
        saved for the subscriber.
        N   timeout)r]   is_setr5   rY   rX   GcsSubscriberCommandBatchr#   r4   s     r   	subscribez_AioSubscriber.subscriben   sk       ; 	F%%dm44j2232CCCCCCCCCCCr   c                 J   K   | j                             ||           d {V S )Nrb   )rX   GcsSubscriberPoll)r#   r4   rc   s      r   
_poll_callz_AioSubscriber._poll_cally   s3      Z11#w1GGGGGGGGGr   c                   K   t          | j                  dk    rn|                                 }t                                          |                     ||                    }t                                          | j                                                  }t          j        ||g|t          j	                   d {V \  }}|
                                }|                                s|                                 ||vs||v rd S 	 t          |                                j                  | _        |                                j        | j        k    rm| j        dk    r=t$                              d|                                j         d| j         d           |                                j        | _        d| _        |                                j        D ]V}|j        | j        k    rt$                              d|            0|j        | _        | j                            |           Wn3# t0          j        $ r!}	|                     |	          rY d }	~	d S  d }	~	ww xY wt          | j                  dk    ld S d S )	Nr   rb   )rc   return_when zreplied publisher_id zdifferent from z/, this should only happens during gcs failover.zIgnoring out of order message )lenrZ   r:   r   create_taskrj   r]   waitr[   FIRST_COMPLETEDpopdonecancelresultpub_messagesr    r8   r"   loggerdebugr!   sequence_idwarningr=   rD   rN   rH   )
r#   rc   r4   pollclosers   others
other_taskmsgr@   s
             r   _pollz_AioSubscriber._poll}   s     $+!##$$&&C+--99W55 D -..::4;;K;K;M;MNNE!(uwG<S" " "      LD&  J??$$ $!!###45D==(+DKKMM,F(G(G%;;==-1CCC)R//@DKKMM4N @ @.2.@@ @ @  
 *.)CD&67D3;;==5 , ,C$*III'M'M'MNNN 69oD3K&&s++++, =   11!44 FFFFFC $+!######s   D*H< <I,I'&I''I,c                    K   | j                                         rdS | j                                          |                     | j        g          }	 | j                            |d           d{V  n# t          $ r Y nw xY wd| _        dS )z2Closes the subscriber and its active subscription.N)r>      rb   )r]   rd   setr?   rY   rX   re   	Exceptionrf   s     r   r|   z_AioSubscriber.close   s       ; 	F''$-'AA	*66sA6FFFFFFFFFF 	 	 	D	


s   "A7 7
BBNNN)rA   Nr&   )rI   rJ   rK   __doc__r   straiogrpcChannelr$   rg   rj   r   r|   __classcell__r_   s   @r   rQ   rQ   J   s        	 	  #'& & & 	&
 & & & & & &0	D 	D 	D 	DH H H H% % % % %N       r   rQ   c                   r     e Zd Z	 	 	 d	dededej        f fdZd
deeef         fdZ	e
d             Z xZS )GcsAioResourceUsageSubscriberNr   rR   r2   c                 d    t                                          t          j        |||           d S r&   )rT   r$   r   RAY_NODE_RESOURCE_USAGE_CHANNELr#   r   rR   r2   r_   s       r   r$   z&GcsAioResourceUsageSubscriber.__init__   s7     	6	7G	
 	
 	
 	
 	
r   rA   c                 r   K   |                      |           d{V  |                     | j                  S )zPolls for new resource usage message.

        Returns:
            A tuple of string reporter ID and resource usage json string.
        rb   N)r   _pop_resource_usagerZ   )r#   rc   s     r   r{   z"GcsAioResourceUsageSubscriber.poll   sD       jjj)))))))))''444r   c                     t          |           dk    rdS |                                 }|j                                        |j        j        fS )Nr   )NN)rn   popleftkey_iddecodenode_resource_usage_messagejson)queuer   s     r   r   z1GcsAioResourceUsageSubscriber._pop_resource_usage   sB    u::??:mmooz  ""C$C$HHHr   r   r&   )rI   rJ   rK   r   r   rD   r   r$   r   r{   rM   r   r   r   s   @r   r   r      s           $	
 

 
 	
 
 
 
 
 
5 5%s
*; 5 5 5 5 I I \I I I I Ir   r   c                        e Zd Z	 	 	 d
dededej        f fdZed             Z		 dde
eeej        f                  fdZed	             Z xZS )GcsAioActorSubscriberNr   rR   r2   c                 d    t                                          t          j        |||           d S r&   )rT   r$   r   GCS_ACTOR_CHANNELr   s       r   r$   zGcsAioActorSubscriber.__init__   s,     	5y'7SSSSSr   c                 *    t          | j                  S r&   )rn   rZ   r'   s    r   
queue_sizez GcsAioActorSubscriber.queue_size   s    4;r   rA   c                 v   K   |                      |           d{V  |                     | j        |          S )z}Polls for new actor message.

        Returns:
            A list of tuples of binary actor ID and actor table data.
        rb   N
batch_size)r   _pop_actorsrZ   r#   r   rc   s      r   r{   zGcsAioActorSubscriber.poll   sI       jjj)))))))))
CCCr   c                    t          |           dk    rg S d}g }t          |           dk    rY||k     rS|                                 }|                    |j        |j        f           |dz  }t          |           dk    r||k     S|S Nr      )rn   r   r=   r   actor_messager   r   poppedmsgsr   s        r   r   z!GcsAioActorSubscriber._pop_actors   s    u::??I%jj1nn*!4!4--//CKKS%67888aKF %jj1nn*!4!4 r   r   r&   )rI   rJ   rK   r   r   rD   r   r$   rL   r   r   r   r	   ActorTableDatar{   rM   r   r   r   s   @r   r   r      s           $	T TT T 	T T T T T T     X  #'	D 	D	eE7112	3	D 	D 	D 	D 	 	 \	 	 	 	 	r   r   c                        e Zd Z	 	 	 d	dededej        f fdZ	 d
dee	ee
j        f                  fdZed             Z xZS )GcsAioNodeInfoSubscriberNr   rR   r2   c                 d    t                                          t          j        |||           d S r&   )rT   r$   r   GCS_NODE_INFO_CHANNELr   s       r   r$   z!GcsAioNodeInfoSubscriber.__init__   s,     	99gwWWWWWr   rA   c                 v   K   |                      |           d{V  |                     | j        |          S )zsPolls for new node info message.

        Returns:
            A list of tuples of (node_id, GcsNodeInfo).
        rb   Nr   )r   _pop_node_infosrZ   r   s      r   r{   zGcsAioNodeInfoSubscriber.poll   sI       jjj)))))))))##DKJ#GGGr   c                    t          |           dk    rg S d}g }t          |           dk    rY||k     rS|                                 }|                    |j        |j        f           |dz  }t          |           dk    r||k     S|S r   )rn   r   r=   r   node_info_messager   s        r   r   z(GcsAioNodeInfoSubscriber._pop_node_infos  s    u::??I%jj1nn*!4!4--//CKKS%:;<<<aKF %jj1nn*!4!4 r   r   r&   )rI   rJ   rK   r   r   rD   r   r$   r   r   r	   GcsNodeInfor{   rM   r   r   r   s   @r   r   r      s           $	X XX X 	X X X X X X #'	H 	H	eE7../	0	H 	H 	H 	H 	 	 \	 	 	 	 	r   r   )r[   loggingr   collectionsr   typingr   r   rD   r   r   ray._private.gcs_utils_privaterU   ray._common.utilsr   ray.core.generatedr	   r
   r   r   	getLoggerrI   rw   r   rQ   r   r   r   rO   r   r   <module>r      s                           * * * * * * * * * 6 6 6 6 6 6            
	8	$	$1 1 1 1 1 1 1 1hf f f f f_ f f fRI I I I IN I I I8" " " " "N " " "J    ~     r   