
    &`i!                         d dl Z d dlmZmZmZ d dlZd dlmZmZm	Z	m
Z
 d dlmZ dee         defdZ ed	           G d
 d                      Z ed	           G d d                      ZdS )    N)AnyListOptional)GetTimeoutErrorRayChannelErrorRayChannelTimeoutErrorRayTaskError)	PublicAPIreturn_valsreturn_single_outputc                     t          | t                    r| | D ]+}t          |t                    r|                                ,|rt	          |           dk    sJ | d         S | S )a!  
    Process return values for return to the DAG caller. Any exceptions found in
    return_vals will be raised. If return_single_output=True, it indicates that
    the original DAG did not have a MultiOutputNode, so the DAG caller expects
    a single return value instead of a list.
       r   )
isinstance	Exceptionr	   as_instanceof_causelen)r   r   vals      u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/compiled_dag_ref.py_process_return_valsr      s     +y))  , ,c<(( 	,))+++	,  ;1$$$$1~    alpha)	stabilityc                   n    e Zd ZdZ	 ddddedee         fdZd Zd	 Zd
 Z	d Z
d Zddee         fdZdS )CompiledDAGRefa  
    A reference to a compiled DAG execution result.

    This is a subclass of ObjectRef and resembles ObjectRef.  For example,
    similar to ObjectRef, ray.get() can be called on it to retrieve the result.
    However, there are several major differences:
    1. ray.get() can only be called once per CompiledDAGRef.
    2. ray.wait() is not supported.
    3. CompiledDAGRef cannot be copied, deep copied, or pickled.
    4. CompiledDAGRef cannot be passed as an argument to another task.
    Ndagray.experimental.CompiledDAGexecution_indexchannel_indexc                 V    || _         || _        || _        d| _        |j        | _        dS )a  
        Args:
            dag: The compiled DAG that generated this CompiledDAGRef.
            execution_index: The index of the execution for the DAG.
                A DAG can be executed multiple times, and execution index
                indicates which execution this CompiledDAGRef corresponds to.
            actor_execution_loop_refs: The actor execution loop refs that
                are used to execute the DAG. This can be used internally to
                check the task execution errors in case of exceptions.
            channel_index: The index of the DAG's output channel to fetch
                the result from. A DAG can have multiple output channels, and
                channel index indicates which channel this CompiledDAGRef
                corresponds to. If channel index is not provided, this CompiledDAGRef
                wraps the results from all output channels.

        FN)_dag_execution_index_channel_index_ray_get_calleddag_output_channels_dag_output_channels)selfr   r   r   s       r   __init__zCompiledDAGRef.__init__2   s4    , 	 /+$$'$;!!!r   c                 \    d| j                                          d| j         d| j         dS )NzCompiledDAGRef(, execution_index=, channel_index=)r    get_idr!   r"   r&   s    r   __str__zCompiledDAGRef.__str__O   sK    4di..00 4 4#44 4!04 4 4	
r   c                      t          d          )Nz CompiledDAGRef cannot be copied.
ValueErrorr.   s    r   __copy__zCompiledDAGRef.__copy__V   s    ;<<<r   c                      t          d          )Nz%CompiledDAGRef cannot be deep copied.r1   r&   memos     r   __deepcopy__zCompiledDAGRef.__deepcopy__Y   s    @AAAr   c                      t          d          )Nz!CompiledDAGRef cannot be pickled.r1   r.   s    r   
__reduce__zCompiledDAGRef.__reduce__\   s    <===r   c                 ~    | j         j        rd S | j        rd S | j                             | j        | j                   d S N)r    is_teardownr#   _delete_execution_resultsr!   r"   r.   s    r   __del__zCompiledDAGRef.__del___   sH    9  	F 	F	++D,A4CVWWWWWr   timeoutc                 .   | j         rt          d          d| _         	 | j                            | j        | j        |           | j                            | j        | j                  }n# t          $ r  t          $ r{}t          | j        j
                                                  }	 t          j        |d           |# t          $ r}t          d          |d }~wt          $ r}|d d }~ww xY wd }~wt          $ r  w xY wt!          |d          S )NzQray.get() can only be called once on a CompiledDAGRef, and it was already called.T
   )r?   znTimed out when getting the actor execution loop exception. This should not happen, please file a GitHub issue.)r#   r2   r    _execute_untilr!   r"   _get_execution_resultsr   r   listworker_task_refsvaluesraygetr   r   r   )r&   r?   r   channel_erroractor_execution_loop_refstimeout_errorexecution_errors          r   rH   zCompiledDAGRef.getj   s    	B  
  $#	I$$%t':G   )::%t': KK & 	 	 	 	$ 	$ 	$ )-TY-G-N-N-P-P(Q(Q%$12>>>> $# # % % %J  %%  0 0 0 &4/0  	 	 		#K666sH   AA+ +D?+C5+CC5
C2CC2*C--C22C55Dr;   )__name__
__module____qualname____doc__intr   r'   r/   r3   r7   r9   r>   floatrH    r   r   r   r   $   s        
 
  (,	< <+< <  }	< < < <:
 
 
= = =B B B> > >	X 	X 	X,7 ,78E? ,7 ,7 ,7 ,7 ,7 ,7r   r   c            
       ^    e Zd ZdZ	 ddddedddee         fd	Zd
 Zd Zd Z	d Z
d Zd ZdS )CompiledDAGFuturea  
    A reference to a compiled DAG execution result, when executed with asyncio.
    This differs from CompiledDAGRef in that `await` must be called on the
    future to get the result, instead of `ray.get()`.

    This resembles async usage of ObjectRefs. For example, similar to
    ObjectRef, `await` can be called directly on the CompiledDAGFuture to
    retrieve the result.  However, there are several major differences:
    1. `await` can only be called once per CompiledDAGFuture.
    2. ray.wait() is not supported.
    3. CompiledDAGFuture cannot be copied, deep copied, or pickled.
    4. CompiledDAGFuture cannot be passed as an argument to another task.
    Nr   r   r   futzasyncio.Futurer   c                 >    || _         || _        || _        || _        d S r;   )r    r!   _futr"   )r&   r   r   rV   r   s        r   r'   zCompiledDAGFuture.__init__   s(     	 /	+r   c                 \    d| j                                          d| j         d| j         dS )NzCompiledDAGFuture(r)   r*   r+   r,   r.   s    r   r/   zCompiledDAGFuture.__str__   sK    4!1!1!3!3 4 4#44 4!04 4 4	
r   c                      t          d          )Nz#CompiledDAGFuture cannot be copied.r1   r.   s    r   r3   zCompiledDAGFuture.__copy__   s    >???r   c                      t          d          )Nz(CompiledDAGFuture cannot be deep copied.r1   r5   s     r   r7   zCompiledDAGFuture.__deepcopy__   s    CDDDr   c                      t          d          )Nz$CompiledDAGFuture cannot be pickled.r1   r.   s    r   r9   zCompiledDAGFuture.__reduce__   s    ?@@@r   c              #     K   | j         t          d          | j         }d | _         | j                            | j                  sO|                                E d {V }| j        xj        dz  c_        | j                            | j        |           | j                            | j        | j	                  }t          |d          S )NzVCompiledDAGFuture can only be awaited upon once, and it has already been awaited upon.r   T)rX   r2   r    _has_execution_resultsr!   	__await___max_finished_execution_index_cache_execution_resultsrC   r"   r   )r&   rV   resultr   s       r   r_   zCompiledDAGFuture.__await__   s      9-   i	y//0EFF 	N #//////FI33q833I..t/DfMMMi66!4#6
 
 $K666r   c                 ~    | j         j        rd S | j        d S | j                             | j        | j                   d S r;   )r    r<   rX   r=   r!   r"   r.   s    r   r>   zCompiledDAGFuture.__del__   sE    9  	F9F	++D,A4CVWWWWWr   r;   )rM   rN   rO   rP   rQ   r   r'   r/   r3   r7   r9   r_   r>   rS   r   r   rU   rU      s         & (,
, 
,+
, 
, 	
,
  }
, 
, 
, 
,
 
 
@ @ @E E EA A A7 7 7.X X X X Xr   rU   )asynciotypingr   r   r   rG   ray.exceptionsr   r   r   r	   ray.util.annotationsr
   boolr   r   rU   rS   r   r   <module>ri      sV    & & & & & & & & & & 



            + * * * * *d3i t    , Wq7 q7 q7 q7 q7 q7 q7 q7h WJX JX JX JX JX JX JX JX JX JXr   