
    &`iY                     z   d Z ddlZddlZddlZddlZddlZddlmZ ddlm	Z	m
Z
mZmZmZmZ ddlZddlmc mc mZ ddlmc mc mZ ddlmZmZmZ ddlmZ e	rddlmZ  ej        e          Z eeej!        e"f         gdf         Z#dZ$d	ej%        fd
Z&d	ej%        fdZ' G d d          Z( G d d          Z)dS )zjThis file implements a threaded stream controller to abstract a data stream
back to the ray clientserver.
    N)OrderedDict)TYPE_CHECKINGAnyCallableDictOptionalUnion)	INT32_MAXOBJECT_TRANSFER_CHUNK_SIZEOBJECT_TRANSFER_WARNING_SIZE)log_once)Worker    reqc           	   #   *  K   | j         j        }t          |          }|dk    s
J d            |t          k    r3t	          d          r$|dz  }t          j        d|ddt                     t          j	        |t          z            }t          d|          D ]z}|t          z  }t          ||dz   t          z            }t          j        | j         j        |||         |||| j         j        	          }t          j        | j        |
          V  {dS )aS  
    Chunks a put request. Doing this lazily is important for large objects,
    since taking slices of bytes objects does a copy. This means if we
    immediately materialized every chunk of a large object and inserted them
    into the result_queue, we would effectively double the memory needed
    on the client to handle the put.
    r   %Cannot chunk object with missing dataclient_object_put_size_warning   @z#Ray Client is attempting to send a .2fa   GiB object over the network, which may be slow. Consider serializing the object and using a remote URI to transfer via S3 or Google Cloud Storage instead. Documentation for doing this can be found here: https://docs.ray.io/en/latest/handling-dependencies.html#remote-uris   )client_ref_iddatachunk_idtotal_chunks
total_sizeowner_id)req_idputN)r   r   lenr   r   warningswarnUserWarningmathceilr   rangeminray_client_pb2
PutRequestr   r   DataRequestr   )	r   request_datar   size_gbr   r   startendchunks	            n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/client/dataclient.py	chunk_putr0   "   sT      7<L\""J>>>B>>>111h(7 71 u$SDS S S 	
 	
 	
 9Z*DDEEL!\** G G55*x!|/IIJJ)'/eCi(%!W%
 
 
 (
FFFFFFFG G    c              #     K   | j         j        }t          |          }|dk    s
J d            t          j        |t
          z            }t          d|          D ]}|t
          z  }t          ||dz   t
          z            }t          j	        | j         j
        | j         j        | j         j        | j         j        | j         j        | j         j        | j         j        |||         ||
  
        }t          j        | j        |          V  dS )aY  
    Chunks a client task. Doing this lazily is important with large arguments,
    since taking slices of bytes objects does a copy. This means if we
    immediately materialized every chunk of a large argument and inserted them
    into the result_queue, we would effectively double the memory needed
    on the client to handle the task.
    r   r   r   )
typename
payload_id	client_idoptionsbaseline_options	namespacer   r   r   )r   taskN)r:   r   r   r#   r$   r   r%   r&   r'   
ClientTaskr3   r4   r5   r6   r7   r8   r9   r)   r   )r   r*   r   r   r   r,   r-   r.   s           r/   
chunk_taskr<   L   s      8=L\""J>>>B>>>9Z*DDEEL!\** H H55*x!|/IIJJ)x*h(H$ X6h(eCi(%
 
 
 (
GGGGGGGH Hr1   c                   V    e Zd ZdZdedej        fdZdeej	        e
f         defdZdS )	ChunkCollectora  
    This object collects chunks from async get requests via __call__, and
    calls the underlying callback when the object is fully received, or if an
    exception while retrieving the object occurs.

    This is not used in synchronous gets (synchronous gets interact with the
    raylet servicer directly, not through the datapath).

    __call__ returns true once the underlying call back has been called.
    callbackrequestc                 V    t                      | _        || _        d| _        || _        d S )N)	bytearrayr   r?   last_seen_chunkr@   )selfr?   r@   s      r/   __init__zChunkCollector.__init__y   s)    KK	 ! r1   responsereturnc                 X   t          |t                    r|                     |           dS |j        }|j        s|                     |           dS |j        t          k    r8t          d          r)|j        dz  }t          j	        d|ddt                     |j        }|j        }|| j        dz   k    r;| j                            |           || _        | j        dz   | j        j        _        n|| j        dz   k    rVd| d	| j        dz    d
|j         }t$                              |           |                     t)          |                     dS t$                              d| d|j         d           |j        |j        dz
  k    r|                     | j                   dS dS )NT#client_object_transfer_size_warningr   z'Ray Client is attempting to retrieve a r   zy GiB object over the network, which may be slow. Consider serializing the object to a file and using rsync or S3 instead.r   zReceived chunk z when we expected z for request zReceived a repeated chunk z from request .F)
isinstance	Exceptionr?   getvalidr   r   r   r    r!   r"   r   r   rD   extendr@   start_chunk_idr   loggerwarningRuntimeErrordebugr   )rE   rG   get_respr+   
chunk_datar   msgs          r/   __call__zChunkCollector.__call__   s   h	** 	MM(###4<~ 	MM(###4!===(1C
 C
= )E1GM-H- - -    ]
$t+a///IZ(((#+D  /3.BQ.FDL++,q000L( L L'!+L L:B/L L  NN3MM,s++,,,4 LL3X 3 3 (3 3 3  
  5 999MM$)$$$4 5r1   N)__name__
__module____qualname____doc__ResponseCallabler'   r)   rF   r	   DataResponserM   boolrY    r1   r/   r>   r>   m   st        	 	
!1 
N<V 
 
 
 
2~'BI'M!N 2SW 2 2 2 2 2 2r1   r>   c                   h   e Zd ZdddedefdZdefdZdej	        fdZ
d	 Zd&dZdedd
fdZdej        defdZd&dZdedd
fdZd&dZed             Zd&dZdej        dej        fdZ	 d'dej        dee         dd
fdZd Z 	 d'dej!        dej"        fdZ#	 d'dej$        dej%        fdZ&d'dej'        fdZ(	 d'dej)        dej*        fdZ+dej)        dedd
fd Z,	 d'dej-        dej.        fd!Z/	 d'dej0        dd
fd"Z1dej2        defd#Z3dej4        dej5        fd$Z6dej7        dej8        fd%Z9d
S )(
DataClientclient_workerr   r6   metadatac                    || _         || _        || _        |                                 | _        t                      | _        t          j                    | _	        t          j
        | j	                  | _        |                                 | _        i | _        i | _        d| _        d| _        d| _        d| _        | j                                         dS )a  Initializes a thread-safe datapath over a Ray Client gRPC channel.

        Args:
            client_worker: The Ray Client worker that manages this client
            client_id: the generated ID representing this client
            metadata: metadata to pass to gRPC requests
        )lockFr   N)rd   
_client_id	_metadata_start_datathreaddata_threadr   outstanding_requests	threadingLockrg   	Conditioncv_create_queuerequest_queue
ready_dataasyncio_waiting_data_in_shutdown_req_id_last_exception_acknowledge_counterr,   )rE   rd   r6   re   s       r/   rF   zDataClient.__init__   s     +#!1133 5@MM! N$$	 %49555!//11*, BD!!#$%!     r1   rH   c                     | j                                         sJ | xj        dz  c_        | j        t          k    rd| _        | j        dk    sJ | j        S )Nr   r   )rg   lockedrv   r
   rE   s    r/   _next_idzDataClient._next_id   s]    y!!!!!<)##DL |q    |r1   c                 <    t          j        | j        ddd          S )Nray_client_streaming_rpcra   T)targetr4   argsdaemon)rm   Thread
_data_mainr{   s    r/   rj   zDataClient._start_datathread   s*    ?+	
 
 
 	
r1   c              #      K   	 | j                                         }|d S |                    d          }|dk    rt          |          E d {V  n |dk    rt	          |          E d {V  n|V  o)NTr3   r   r:   )rr   rN   
WhichOneofr0   r<   )rE   r   req_types      r/   	_requestszDataClient._requests   s      	$((**C{~~f--H5  $S>>))))))))V##%c??********				r1   Nc                    d}	 | j         j        s=t          j        | j         j                  }| j        dt          |          fgz   }|                    |                                 |d          }	 |D ]}| 	                    |           	 t                              d           |                                  d S # t          j        $ rp}|                     |          }|s;|| _        Y d }~t                              d           |                                  d S |                                  Y d }~nd }~ww xY w| j         j        =n# t$          $ r}|| _        Y d }~nd }~ww xY wt                              d           |                                  d S # t                              d           |                                  w xY w)NFreconnectingT)re   wait_for_readyzShutting down data channel.)rd   ru   ray_client_pb2_grpcRayletDataStreamerStubchannelri   strDatapathr   _process_responserR   rU   	_shutdowngrpcRpcError_can_reconnectrw   _reconnect_channelrM   )rE   r   stubre   resp_streamrG   es          r/   r   zDataClient._data_main  s   	(5 .*A&.   >nc,>O>O-P,QQ"mmNN$$%#' ,  
	.$/ 9 9..x8888 LL6777NN } . . .#'#6#6q#9#9L' /0,
 LL6777NN ++--------. (5 .(  	% 	% 	%#$D      	% LL6777NN LL6777NNs`   A/E 4B? ?D>D9,E  D94E 9D>>E F 
E*E% F %E**F 0GrG   c                    |j         dk    rt                              d|            dS |j         | j        v rd}	 | j        |j                  }t	          |t
                    r ||          }n|r ||           |r| j        |j         = n*# t          $ r t                              d           Y nw xY w| j        5  |j         | j	        v r)|r'| j	        |j         = | 
                    |j                    ddd           dS # 1 swxY w Y   dS | j        5  || j        |j         <   | j                                         ddd           dS # 1 swxY w Y   dS )z;
        Process responses from the data servicer.
        r   zGot unawaited response NTzCallback error:)r   rR   rU   rt   rL   r>   rM   	exceptionrg   rl   _acknowledgers   rp   
notify_all)rE   rG   
can_remover?   s       r/   r   zDataClient._process_response   s    ?aLL=8==>>>F?d777J44X_Eh77 '!)(!3!3JJ 'HX&&& C 1(/B 4 4 4  !2333334 7 7?d&???J?1(/B%%ho6667 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7  % %3;0""$$$% % % % % % % % % % % % % % % % % %s6   AB $B32B3=8DD	D)EEEr   c                    | j                             |          s6t                              d           t                              |           dS t                              d           t                              |           dS )z
        Processes RPC errors that occur while reading from data stream.
        Returns True if the error can be recovered from, False otherwise.
        z$Unrecoverable error in data channel.Fz"Recoverable error in data channel.T)rd   r   rR   errorrU   )rE   r   s     r/   r   zDataClient._can_reconnectD  sk    
 !0033 	LL?@@@LLOOO59:::Qtr1   c                 D   | j         5  d| _        | j                                         | j                                        }i | _        ddd           n# 1 swxY w Y   | j        rt          d| j                   }nt          d          }|D ]}|r ||           dS )z+
        Shutdown the data channel
        TNzOFailed during this or a previous request. Exception that broke the connection: zERequest cannot be fulfilled because the data client has disconnected.)rg   ru   rp   r   rt   valuesrw   ConnectionError)rE   	callbackserrr?   s       r/   r   zDataClient._shutdownQ  s    Y 	+ 	+ $DG   188::I(*D%	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+  
	!@)-)=@ @ CC
 "  C " 	 	H 	 	s   AAAAr   c                 "   | j         j        sdS | j                                        sJ | xj        dz  c_        | j        t
          z  dk    rB| j                            t          j	        t          j
        |                               dS dS )z
        Puts an acknowledge request on the request queue periodically.
        Lock should be held before calling this. Used when an async or
        blocking response is received.
        Nr   r   )r   )acknowledge)rd   _reconnect_enabledrg   rz   rx   ACKNOWLEDGE_BATCH_SIZErr   r   r'   r)   AcknowledgeRequest)rE   r   s     r/   r   zDataClient._acknowledgem  s     !4 	Fy!!!!!!!Q&!!$'==BB""* . A P P P       CBr1   c                 F   	 | j                             d          }n# t          j        $ r d}Y nw xY w|szt                              d           	 | j                             d           n)# t          $ r t                              d            w xY wt                              d           | j	        5  | 
                                | _        | j                                        D ]}| j                            |           	 d	d	d	           d	S # 1 swxY w Y   d	S )
a  
        Attempts to reconnect the gRPC channel and resend outstanding
        requests. First, the server is pinged to see if the current channel
        still works. If the ping fails, then the current channel is closed
        and replaced with a new one.

        Once a working channel is available, a new request queue is made
        and filled with any outstanding requests to be resent to the server.
           )timeoutFzKEncountered connection issues in the data channel. Attempting to reconnect.T)r   z$Failed to reconnect the data channelzReconnection succeeded!N)rd   ping_serverr   r   rR   rS   _connect_channelr   rU   rg   rq   rr   rl   r   r   )rE   ping_succeededr@   s      r/   r   zDataClient._reconnect_channel  s   	# "/;;A;FFNN} 	# 	# 	#"NNN	#  	4NN+  "333FFFF"   EFFF LL2333 Y 	0 	0!%!3!3!5!5D4;;== 0 0"&&w////0	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0 	0s+    22A. .&B8ADDDc                  (    t          j                    S N)queueSimpleQueuera   r1   r/   rq   zDataClient._create_queue  s     """r1   c                    d }| j         5  d| _        | j                                         | j        Zt          j        t          j                              }| j                            |           | j                            d            | j	        | j	        }d d d            n# 1 swxY w Y   ||
                                 d S d S )NT)connection_cleanup)rg   ru   rp   r   rr   r'   r)   ConnectionCleanupRequestr   rk   join)rE   threadcleanup_requests      r/   closezDataClient.close  s   Y 	* 	* $DG   !- #1"<'5'N'P'P# # # "&&777"&&t,,,+)	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	* 	*  KKMMMMM s   BB&&B*-B*r   c                      j         5                                                                     |_         j                            |           | j        <    j                             fd                                              j	                 } j	        =  j        =  
                               d d d            n# 1 swxY w Y   |S )Nc                  $     j         v pj        S r   )rs   ru   )r   rE   s   r/   <lambda>z+DataClient._blocking_send.<locals>.<lambda>  s    Vt%>%S$BS r1   )rg   _check_shutdownr|   r   rr   r   rl   rp   wait_forrs   r   )rE   r   r   r   s   `  @r/   _blocking_sendzDataClient._blocking_send  s    Y 	& 	&  """]]__FCJ""3'''03D%f-GSSSSSTTT  """?6*D')&1f%%%	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& s   B8CCCr?   c                     | j         5  |                                  |                                 }||_        || j        |<   || j        |<   | j                            |           d d d            d S # 1 swxY w Y   d S r   )rg   r   r|   r   rt   rl   rr   r   )rE   r   r?   r   s       r/   _async_sendzDataClient._async_send  s    
 Y 	( 	(  """]]__FCJ08D%f-03D%f-""3'''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   AA33A7:A7c                 d   | j                                         sJ | j        sd S | j                                          t	          j                    j        | j        j        k    rd S ddlm	}  |             | j         
                                 | j        d| j         }nd}t          |          )Nr   )
disconnectzlRequest can't be sent because the Ray client has already been disconnected due to an error. Last exception: zKRequest can't be sent because the Ray client has already been disconnected.)rg   rz   ru   releaserm   current_threadidentrk   ray.utilr   acquirerw   r   )rE   r   rX   s      r/   r   zDataClient._check_shutdown  s    y!!!!!  	F	 #%%+t/?/EEEF''''''
	+*'* * C% 
 c"""r1   r@   c                 d    t          j        |          }|                     |          }|j        S )N)init)r'   r)   r   r   rE   r@   contextdatareqresps        r/   InitzDataClient.Init  s;     !,
 
 
 ""7++yr1   c                 d    t          j        |          }|                     |          }|j        S )N)prep_runtime_env)r'   r)   r   r   r   s        r/   PrepRuntimeEnvzDataClient.PrepRuntimeEnv
  s<     !,$
 
 
 ""7++$$r1   c                     t          j        t          j                              }|                     |          }|j        S )N)connection_info)r'   r)   ConnectionInfoRequestr   r   )rE   r   r   r   s       r/   ConnectionInfozDataClient.ConnectionInfo  sC     ,*@BB
 
 
 ""7++##r1   c                 d    t          j        |          }|                     |          }|j        S )NrN   )r'   r)   r   rN   r   s        r/   	GetObjectzDataClient.GetObject  :     !,
 
 
 ""7++xr1   c                     t          |j                  dk    rt          d|           t          j        |          }t          ||          }|                     ||           d S )Nr   z=RegisterGetCallback() must have exactly 1 Object ID. Actual: r   )r?   r@   )r   ids
ValueErrorr'   r)   r>   r   )rE   r@   r?   r   	collectors        r/   RegisterGetCallbackzDataClient.RegisterGetCallback#  s     w{q  %"% %   !,
 
 
 #HgFFF	),,,,,r1   c                 d    t          j        |          }|                     |          }|j        S )N)r   )r'   r)   r   r   r   s        r/   	PutObjectzDataClient.PutObject2  r   r1   c                 Z    t          j        |          }|                     |           d S )N)r   r'   r)   r   )rE   r@   r   r   s       r/   ReleaseObjectzDataClient.ReleaseObject;  s:     !,
 
 
 	!!!!!r1   c                 \    t          j        |          }|                     ||           d S )N)r:   r   )rE   r@   r?   r   s       r/   SchedulezDataClient.ScheduleC  s1     ,':::(+++++r1   c                 d    t          j        |          }|                     |          }|j        S )N)	terminate)r'   r)   r   r   rE   r@   r   r   s       r/   	TerminatezDataClient.TerminateG  s;     (
 
 
 ""3''~r1   c                 d    t          j        |          }|                     |          }|j        S )N)list_named_actors)r'   r)   r   r   r   s       r/   ListNamedActorszDataClient.ListNamedActorsP  s<     (%
 
 
 ""3''%%r1   )rH   Nr   ):rZ   r[   r\   r   listrF   intr|   rm   r   rj   r   r   r   r   r   r   r`   r   r   r   r   staticmethodrq   r   r'   r)   r_   r   r   r^   r   r   InitRequestInitResponser   PrepRuntimeEnvRequestPrepRuntimeEnvResponser   ConnectionInfoResponser   
GetRequestGetResponser   r   r(   PutResponser   ReleaseRequestr   r;   r   TerminateRequestTerminateResponser   ClientListNamedActorsRequestClientListNamedActorsResponser   ra   r1   r/   rc   rc      s       #!h #!3 #!$ #! #! #! #!L#    
9#3 
 
 
 
     :"%# "%$ "% "% "% "%H $       83 4    $$0 $0 $0 $0N # # \#   *!-		$   . 04( ('( +,( 
	( ( ( (# # #B <@ %1		$    FJ% %%;%		.% % % %$ $n.S $ $ $ $ ;? %0		#   -%0-<L-	- - - -  ;? %0		#    ?C" "%4"	" " " ", 9 ,EU , , , ,%6		)   &%B&		5& & & & & &r1   rc   )*r]   loggingr#   r   rm   r    collectionsr   typingr   r   r   r   r   r	   r   !ray.core.generated.ray_client_pb2core	generatedr'   &ray.core.generated.ray_client_pb2_grpcr   ray.util.client.commonr
   r   r   ray.util.debugr   ray.util.client.workerr   	getLoggerrZ   rR   r_   rM   r^   r   r)   r0   r<   r>   rc   ra   r1   r/   <module>r
     s#            # # # # # # F F F F F F F F F F F F F F F F  : : : : : : : : : : : : D D D D D D D D D D D D         
 $ # # # # # .------		8	$	$U>#>	#IJKTQR   'G>- 'G 'G 'G 'GTHN. H H H HBJ J J J J J J JZ]& ]& ]& ]& ]& ]& ]& ]& ]& ]&r1   