
    &`i1                       d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZ d dlmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZ d dlmZmZmZm Z m!Z!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d d	lm)Z)m*Z*m+Z+m,Z, d d
l-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZG d dlHmIZI d dlJmKZKmLZLmMZM d dlNmOZO d dlPmQZQ  ejR        eS          ZT ejU                    aVd ZWdddeeX         deeX         fdZYeO	 d?deedeXf                  de2deeX         de1fd            ZZeO	 d@d!ed"         d#ee         d$e[ddfd%            Z\eO	 d@d!ed"         d#ee         d$e[ddfd&            Z]eOd!ed"         ddfd'            Z^d( Z_d)e2deeX         fd*Z`d+ ZaeO G d, d-                      Zb G d. d/          ZceO G d0 d"                      Zde G d1 d2                      ZeeO G d3 d4                      ZfeO	 	 	 	 	 	 	 dAd6d7d8eeg         d9eeh         d:e[d;eeh         d<eeh         d$ee[         d=eeeBeXf                  dd4fd>            ZidS )B    N)defaultdict)nullcontext)asdict	dataclass)AnyDictListOptionalSetTupleUnion) RAY_CGRAPH_ENABLE_NVTX_PROFILING!RAY_CGRAPH_ENABLE_TORCH_PROFILINGRAY_CGRAPH_VISUALIZE_SCHEDULE)_build_dag_node_operation_graph_DAGNodeOperation_DAGNodeOperationType_DAGOperationGraphNode_extract_execution_schedule%_generate_actor_to_execution_schedule'_generate_overlapped_execution_schedule_visualize_execution_schedule)DAGOperationFuture	GPUFutureResolvedFuture)RayCgraphCapacityExceededRayChannelErrorRayChannelTimeoutErrorRayTaskError)AwaitableBackgroundReaderAwaitableBackgroundWriterChannelContextChannelInterfaceChannelOutputTypeCompiledDAGArgsCompositeChannelIntraProcessChannelReaderInterfaceSynchronousReaderSynchronousWriterWriterInterfaceAcceleratorContext)AutoTransportTypeTypeHintResolver)CachedChannel)Communicator)SharedMemoryType)_destroy_communicator_init_communicator)TorchTensorType)CompiledDAGFutureCompiledDAGRef_process_return_vals)DeveloperAPI)NodeAffinitySchedulingStrategyc                      t                                           D ]\  } }|                    d           t          j                    a d S )NTkill_actors)_compiled_dagsitemsteardownweakrefWeakValueDictionary)_compiled_dags     m/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/dag/compiled_dag_node.py_shutdown_all_compiled_dagsrF   _   sO    )//11 0 0< 	$////022NNN    output_nodezray.dag.MultiOutputNodeinput_attributesreturnc           	      &   ddl m} t                      }t                      }| g}|rt|                                }||v r|                    |           t          ||          r|                    |j                   |                    |j                   |t||z
  }|rnd	                    d |D                       }d	                    d |D                       }	t          |          dk    rdnd}
t          d	|	 d
| d|
 d          dS )a  
    Helper function to check that all input attributes are used in the DAG.
    For example, if the user creates an input attribute by calling
    InputNode()["x"], we ensure that there is a path from the
    InputAttributeNode corresponding to "x" to the DAG's output. If an
    input attribute is not used, throw an error.

    Args:
        output_node: The starting node for the traversal.
        input_attributes: A set of attributes accessed by the InputNode.
    r   )InputAttributeNode, c              3   4   K   | ]}t          |          V  d S Nstr.0keys     rE   	<genexpr>z5_check_unused_dag_input_attributes.<locals>.<genexpr>   s(      )P)Ps#c(()P)P)P)P)P)PrG   c              3   4   K   | ]}t          |          V  d S rO   rP   rR   s     rE   rU   z5_check_unused_dag_input_attributes.<locals>.<genexpr>   s(      (N(NcS(N(N(N(N(N(NrG      z	is unusedz
are unusedzDCompiled Graph expects input to be accessed using all of attributes z, but  zf. Ensure all input attributes are used and contribute to the computation of the Compiled Graph output.N)ray.dagrL   setpopadd
isinstancerT   extend_upstream_nodesjoinlen
ValueError)rH   rI   rL   used_attributesvisited_nodesstackcurrent_nodeunused_attributesunused_attributes_strinput_attributes_strunused_phrases              rE   "_check_unused_dag_input_attributesrk   h   sq    +*****eeOEEM&1]E
 	3yy{{=((,'''l$677 	2 0111\1222  	3 )?: 
 $		)P)P>O)P)P)P P P#yy(N(N=M(N(N(NNN'*+<'='='B'B?';? ?(? ?+8? ? ?
 
 	

 
rG   reader_and_node_listray.actor.ActorHandletypdriver_actor_idc                     d}	 t          j                    j        }n# t          $ r Y nw xY w|                    |||          }|S )a  Generic actor method to allocate an output channel.

    Args:
        reader_and_node_list: A list of tuples, where each tuple contains a reader
            actor handle and the node ID where the actor is located.
        typ: The output type hint for the channel.
        driver_actor_id: If this channel is read by a driver and that driver is an
            actual actor, this will be the actor ID of that driver actor.

    Returns:
        The allocated channel.
    N)rayget_runtime_contextcurrent_actorRuntimeErrorcreate_channel)selfrl   rn   ro   writeroutput_channels         rE   do_allocate_channelry      si    ( 15F(**8    '' N
 s    
**FtasksExecutableTaskscheduleoverlap_gpu_communicationc                    	 |D ]}|                     |           t          r\t          r
J d            	 ddl}n# t          $ r t	          d          w xY w|                                }|                                 t          rt          r
J d            ddl}|j        	                    |j        j
        j        |j        j
        j        gd|j                            d                    }|                                 t                              d	           d
}		 |	rn1|D ]-}
||
j                                     | |
j        |          }	|	r n.4t          r|                                 t          r0|                                 t                              d           dS dS # t,          $ r t/          j        d            w xY w)a5  A generic actor method to begin executing the operations belonging to an
    actor. This runs an infinite loop to execute each _DAGNodeOperation in the
    order specified by the schedule. It exits only if the actor dies or an
    exception is thrown.

    Args:
        tasks: the executable tasks corresponding to the actor methods.
        schedule: A list of _DAGNodeOperation that should be executed in order.
        overlap_gpu_communication: Whether to overlap GPU communication with
            computation during DAG execution to improve performance.
    r}   z<NVTX and torch profiling cannot be enabled at the same time.r   NzaPlease install nvtx to enable nsight profiling. You can install it by running `pip install nvtx`.Tcompiled_graph_torch_profiles)
activities
with_stackon_trace_readyzTorch profiling startedFzTorch profiling stopped'Compiled DAG task exited with exception)preparer   r   nvtxImportErrorProfileenabletorchprofilerprofileProfilerActivityCPUCUDAtensorboard_trace_handlerstartloggerinfoexec_task_idxexec_operationtypedisablestop	Exceptionlogging	exception)rv   rz   r|   r}   taskr   nvtx_profiler   torch_profiledone	operations              rE   do_exec_tasksr      s[   $9 	N 	NDLL3LLMMMM+ 	"5N NMN N5   !H  
  <<>>L!!!, 	34N NMN N4 LLL!N22N37N38  $~GG3    3 	 	M !!!KK1222	 %  	Y45DD).*C   E	 , 	#  """, 	3   KK122222	3 	3    CDDDs    3F* ; F* AEF* * G
c                    	 |D ]}|                     |           t          | d          sg | _        d}	 |rdS |D ]}t          j                    }||j                 }|                    | |j        |          }t          j                    }| j                            t          | j
        j        t          j                                                    t          j                                                    |j        |j        |j        j        ||                     |r n# t&          $ r t)          j        d            w xY w)a  A generic actor method similar to `do_exec_tasks`, but with profiling enabled.

    Args:
        tasks: the executable tasks corresponding to the actor methods.
        schedule: A list of _DAGNodeOperation that should be executed in order.
        overlap_gpu_communication: Whether to overlap GPU communication with
            computation during DAG execution to improve performance.
    r   __ray_cgraph_eventsFT)actor_classname
actor_nameactor_idmethod_name
bind_indexr   start_tend_tr   N)r   hasattrr   timeperf_counterr   r   r   append_ExecutableTaskRecord	__class____name__rq   rr   get_actor_nameget_actor_idr   r   valuer   r   r   )	rv   rz   r|   r}   r   r   r   r   r   s	            rE   do_profile_tasksr     s   $ 	N 	NDLL3LLMMMMt233 	*')D$	 %  	+--Y45**).*C  )++(//)(,(?#&#:#<#<#K#K#M#M!$!8!:!:!G!G!I!I$($4#'?"+."6 '#	 	 	    E1	4    CDDDs   7D' C,D' ' Ec                 j    |D ]}|                                  |D ]}|                                 d S rO   )destroy_cuda_eventcancel)rv   rz   r   s      rE   do_cancel_executable_tasksr   =  sP      " "!!!!   rG   c           	          t           j        j                            d                    t          j        t          |           | | j                            d          }t          d||           }|S )N T)task_exceptionr   )function_nametraceback_strcause)
rq   _privateutilsformat_error_messager`   	tracebackformat_exceptionr   __traceback__r   )exc	backtracewrappeds      rE   _wrap_exceptionr   F  sq    "77
	*499c3;LMMNN 8  I %  G
 NrG   	type_hintc                 j    |                                  rt          | t                    sJ | j        S dS )a(  
    Get the communicator group ID from the type hint. If the type hint does not
    require communicator, return None.

    Args:
        type_hint: The type hint of the channel.

    Returns:
        The communicator group ID if the type hint requires communicator,
        otherwise None.
    N)requires_acceleratorr]   r5   communicator_idr   s    rE   _get_comm_group_idr   S  s<     %%'' ))_55555((4rG   c                  :   t          j                    j        st                      S ddl} ddlm} t          j                    j        }|j        dk    r'| j	        
                                st                      S  |j                                        |          S )z
    Return a context manager for executing communication operations
    (i.e., READ and WRITE). For accelerator operations, the context manager
    uses the proper cuda device from channel context, otherwise,
    nullcontext will be returned.
    r   Nr,   cuda)r"   get_currenttorch_availabler   r   ,ray.experimental.channel.accelerator_contextr-   torch_devicer   r   is_availablegetget_device_context)r   r-   devices      rE   _device_context_managerr   e  s     %''7 }}LLLOOOOOO'))6F{fUZ%<%<%>%> }}!!##66v>>>rG   c                       e Zd ZdZdeddfdZedee         fd            Z	ede
eef         fd            Zedefd	            Zeded
         fd            ZdefdZdS )CompiledTaskz0Wraps the normal Ray DAGNode with some metadata.idxdag_noderay.dag.DAGNodec                 h    || _         || _        i | _        g | _        g | _        g | _        g | _        dS )z
        Args:
            idx: A unique index into the original DAG.
            dag_node: The original DAG node created by the user.
        N)r   r   downstream_task_idxsoutput_channelsoutput_idxs	arg_nodesoutput_node_idxs)rv   r   r   s      rE   __init__zCompiledTask.__init__  sG       IK! 8:<> 35+-rG   rJ   c                 4    | j                                         S rO   )r   get_argsrv   s    rE   argszCompiledTask.args  s    }%%'''rG   c                 4    | j                                         S rO   )r   
get_kwargsr   s    rE   kwargszCompiledTask.kwargs  s    }'')))rG   c                 *    t          | j                  S rO   )ra   r   r   s    rE   num_readerszCompiledTask.num_readers  s    4,---rG   r$   c                 $    d | j         D             S )Nc                     g | ]	}|j         
S  r   )rS   arg_nodes     rE   
<listcomp>z/CompiledTask.arg_type_hints.<locals>.<listcomp>  s    BBBx"BBBrG   )r   r   s    rE   arg_type_hintszCompiledTask.arg_type_hints  s    BB4>BBBBrG   c                 8    d| j          d| j         d| j         dS )Nz
            Node: z
            Arguments: z
            Output: z
            )r   r   r   r   s    rE   __str__zCompiledTask.__str__  sA    = 	  )   	rG   N)r   
__module____qualname____doc__intr   propertyr   r   r   r   rQ   r   r   r	   r   r   r   rG   rE   r   r     s       ::.C .+< . . . .@ (eCj ( ( ( X( *S#X * * * X* .S . . . X. C%8 9 C C C XC      rG   r   c                   N    e Zd ZdZdeeef         dee         fdZ	dedefdZ
dS )	_ExecutableTaskInputa@  Represents an input to an ExecutableTask.

    Args:
        input_variant: either an unresolved input (when type is ChannelInterface)
            , or a resolved input value (when type is Any)
        channel_idx: if input_variant is an unresolved input, this is the index
            into the input channels list.
    input_variantchannel_idxc                 "    || _         || _        d S rO   )r   r   )rv   r   r   s      rE   r   z_ExecutableTaskInput.__init__  s    
 +&rG   channel_resultsrJ   c                 d    t          | j        t                    r|| j                 }n| j        }|S )z
        Resolve the input value from the channel results.

        Args:
            channel_results: The results from reading the input channels.
        )r]   r   r#   r   )rv   r  r   s      rE   resolvez_ExecutableTaskInput.resolve  s6     d(*:;; 	'#D$45EE&ErG   N)r   r   r   r   r   r#   r   r
   r   r   r  r   rG   rE   r   r     su         '-s23' c]' ' ' 's s      rG   r   c                       e Zd ZdZdddee         deeef         fdZd Z	d Z
dd
efdZdededdfdZdefdZd
edefdZd
edefdZdefdZ	 dded
edefdZdS )r{   z^A task that can be executed in a compiled DAG, and it
    corresponds to an actor method.
    r   r   resolved_argsresolved_kwargsc                    ddl m} |j                                        | _        |j                                        | _        |j        | _        |j        | _        |j	        | _
        |j        j        | _        d| _        t          |j        |          r|j        j        | _        g | _        g | _        || _        |j        | _        i }|D ]}t          |t(                    rV|}||v r	||         }n6| j                            |           t-          | j                  dz
  }|||<   t/          ||          }	nt/          |d          }	| j                            |	           | j                                        D ]}
t          |
t(                    rJ t3          | j                  | _        t7          | j        | j                  | _        d| _        dS )a_  
        Args:
            task: The CompiledTask that this ExecutableTask corresponds to.
            resolved_args: The arguments to the method. Arguments that are
                not Channels will get passed through to the actor method.
                If the argument is a channel, it will be replaced by the
                value read from the channel before the method executes.
            resolved_kwargs: The keyword arguments to the method. Currently, we
                do not support binding kwargs to other DAG nodes, so the values
                of the dictionary cannot be Channels.
        r   CollectiveOutputNodeNrW   )rY   r
  r   get_method_namer   _get_bind_indexr   r   r   r   input_type_hintsr   output_type_hintcollective_opr]   input_channelstask_inputsr  r   task_idxr#   r   ra   r   valuesr)   input_readerr*   output_writer_intermediate_future)rv   r   r  r  r
  input_channel_to_idxargchannelr   
task_inputvals              rE   r   zExecutableTask.__init__  s   " 	100000=88::-7799#3+9=9L37=3J GKdm%9:: 	=!%!<D6879/>  =?  	0 	0C#/00 =222"6w"?KK '..w777"%d&9":":Q">K4?(11#{CC

1#t<<
##J//// '..00 	9 	9C!#'7888888 .?t?R-S-S.? $"2/
 /
 CG!!!rG   c                 j    | j                                          | j                                         dS )z
        Close all the input channels and the output channel. The exact behavior
        depends on the type of channel. Typically, it will release the resources
        used by the channels.
        N)r  closer  r   s    rE   r   zExecutableTask.cancel.  s4     	!!!  """""rG   c                 8    t          j        | j                   dS )z
        If this executable task has created a GPU future that is not yet waited on,
        that future is in the channel context cache. Remove the future from the cache
        and destroy its CUDA event.
        N)r   remove_gpu_futurer  r   s    rE   r   z!ExecutableTask.destroy_cuda_event7  s     	#DM22222rG   Fr}   c                 F   | j         D ]}|                                 | j                                         | j                                         | j                                         t                      | _        t                      | _        |sdS | j        	                                rOt          | j                  }t          j                    j                            |          }|J |j        | _        | j         r| j         D ]}|	                                r~t          |          }t          j                    j                            |          }|J t!          | j        t                    s| j        |j        k    s
J d            |j        | _        dS dS )a3  
        Prepare the task for execution. The `exec_operation` function can only
        be called after `prepare` has been called.

        Args:
            overlap_gpu_communication: Whether to overlap GPU communication with
                computation during DAG execution to improve performance
        NzhCurrently all torch tensor input channels of a Compiled Graph task should use the same recv cuda stream.)r  register_custom_serializerr  r  r   r  r   _send_stream_recv_streamr   r   r"   r   communicatorsr   send_streamr]   recv_stream)rv   r}   typ_hintcomm_group_id
comm_groupr   s         rE   r   zExecutableTask.prepare?  s    - 	2 	2H//111188:::!!!  """ (MM'MM( 	F  5577 	7.t/DEEM'355CGGVVJ))) * 6D  	?!2 ? ?	1133 ?$6y$A$AM!/!;!=!=!K!O!O%" "J &111%d&7EE #0J4JJJJX  KJJ )3(>D%	? 	?? ?rG   r  wrap_in_gpu_futurerJ   Nc                 t    | j         J |rt          || j                  }nt          |          }|| _         dS )a  
        Wrap the value in a `DAGOperationFuture` and store to the intermediate future.
        The value corresponds to result of a READ or COMPUTE operation.

        If wrap_in_gpu_future is True, the value will be wrapped in a GPUFuture,
        Otherwise, the future will be a ResolvedFuture.

        Args:
            val: The value to wrap in a future.
            wrap_in_gpu_future: Whether to wrap the value in a GPUFuture.
        N)r  r   r  r   )rv   r  r*  futures       rE    wrap_and_set_intermediate_futurez/ExecutableTask.wrap_and_set_intermediate_futurek  sI     (000 	)sDM22FF#C((F$*!!!rG   c                 F    | j         }d| _         |                                S )a  
        Reset the intermediate future and wait for the result.

        The wait does not block the CPU because:
        - If the future is a ResolvedFuture, the result is immediately returned.
        - If the future is a GPUFuture, the result is only waited by the current
            CUDA stream, and the CPU is not blocked.

        Returns:
            The result of a READ or COMPUTE operation from the intermediate future.
        N)r  wait)rv   r,  s     rE   "reset_and_wait_intermediate_futurez1ExecutableTask.reset_and_wait_intermediate_future  s"     *$(!{{}}rG   c                     | j         J d}	 | j                                        }|                     ||           n# t          $ r d}Y nw xY w|S )ad  
        Read input data from upstream DAG nodes and cache the intermediate result.

        Args:
            overlap_gpu_communication: Whether to overlap GPU communication with
                computation during DAG execution to improve performance.

        Returns:
            True if system error occurs and exit the loop; otherwise, False.
        NFr*  T)r  r  readr-  r   )rv   r}   exit
input_datas       rE   _readzExecutableTask._read  s     (000	*//11J 11#< 2      	 	 	DDD	 s   0> AAc                    |                                  }	 t          |d           n/# t          $ r"}|                     ||           Y d}~dS d}~ww xY wg }| j        D ]*}|                    |                    |                     +| j        | j        j        }nt          || j
                  }	  ||i | j        }n&# t          $ r}t          |          }Y d}~nd}~ww xY w|                     ||           dS )a  
        Retrieve the intermediate result from the READ operation and perform the
        computation. Then, cache the new intermediate result. The caller must ensure
        that the last operation executed is READ so that the function retrieves the
        correct intermediate result.

        Args:
            overlap_gpu_communication: Whether to overlap GPU communication with
                computation during DAG execution to improve performance.
            class_handle: An instance of the class to which the actor belongs. For
                example, the type of `class_handle` is <class 'xxxx.Worker'> if the
                actor belongs to the `class Worker` class.
        Returns:
            True if system error occurs and exit the loop; otherwise, False.
        F)return_single_outputr2  N)r0  r8   r   r-  r  r   r  r  executegetattrr   r  r   )	rv   r}   class_handler5  r   resolved_inputsr  method
output_vals	            rE   _computezExecutableTask._compute  sm   ( <<>>

	 %HHHHH 	 	 	
 11(A 2    55555	 * 	C 	CJ"":#5#5j#A#ABBBB)'/FF \4+;<<F	.ID4HIIJJ 	. 	. 	.(--JJJJJJ	.
 	--+D 	. 	
 	
 	
 us,   ( 
AAA5C 
C&C!!C&c                     |                                  }d}	 | j                            |           n# t          $ r d}Y nw xY w|S )af  
        Retrieve the intermediate result from the COMPUTE operation and write to its
        downstream DAG nodes. The caller must ensure that the last operation executed
        is COMPUTE so that the function retrieves the correct intermediate result.

        Returns:
            True if system error occurs and exit the loop; otherwise, False.
        FT)r0  r  writer   )rv   r>  r4  s      rE   _writezExecutableTask._write  sd     <<>>
	$$Z0000 	 	 	DDD	 s   3 AAop_typec                 8   |t           j        k    rjt                      5  | j        5  |                     |          cddd           cddd           S # 1 swxY w Y   	 ddd           dS # 1 swxY w Y   dS |t           j        k    r|                     ||          S |t           j        k    rit                      5  | j        5  | 	                                cddd           cddd           S # 1 swxY w Y   	 ddd           dS # 1 swxY w Y   dS dS )a  
        An ExecutableTask corresponds to a DAGNode. It consists of three
        operations: READ, COMPUTE, and WRITE, which should be executed in
        order to ensure that each operation can read the correct intermediate
        result.
        Args:
            class_handle: The handle of the class to which the actor belongs.
            op_type: The type of the operation. Possible types are READ,
                COMPUTE, and WRITE.
            overlap_gpu_communication: Whether to overlap GPU communication with
                computation during DAG execution to improve performance.
        Returns:
            True if the next operation should not be executed; otherwise, False.
        N)
r   READr   r#  r6  COMPUTEr?  WRITEr"  rB  )rv   r;  rC  r}   s       rE   r   zExecutableTask.exec_operation  s   ( +000(** A A& A A::&?@@A A A A A A AA A A A A A A AA A A A A A A A AA A A A A A A A A A A A A A A A A A -555==!:LIII-333(** ) )& ) );;==) ) ) ) ) ) )) ) ) ) ) ) ) )) ) ) ) ) ) ) ) )) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) 43si   A.AA.A	A.A	A..A25A2?DC4D4C8	8D;C8	<DDDF)r   r   r   r   r	   r   r   rQ   r   r   r   boolr   r-  r0  r6  r?  rB  r   r   r   rG   rE   r{   r{     s        IGIG CyIG c3h	IG IG IG IGV# # #3 3 3*? *? *? *? *? *?X++,0+	+ + + +,C     t     65#'5 
	5 5 5 5n    , +0	) ) ') $(	)
 
) ) ) ) ) )rG   c                   f    e Zd ZU eed<   eed<   eed<   eed<   eed<   eed<   eed<   eed<   d	 Zd
S )r   r   r   r   r   r   r   r   r   c                      t          |           S rO   )r   r   s    rE   to_dictz_ExecutableTaskRecord.to_dict   s    d||rG   N)r   r   r   rQ   __annotations__r   floatrL  r   rG   rE   r   r     ss         OOOMMMOOONNNNNNLLL    rG   r   c                      e Zd ZdZ ej        d           G d d                      Z	 	 	 	 	 	 	 dLd	ee         d
ee	         de
dee	         dee	         dee
         deeeef                  fdZede
fd            ZdefdZdefdZdMdZdNdZdNdZ	 dOddded         de
ddfdZded          ddfd!ZdNd"Zed#ddee         fd$            Zd#ed         defd%Z	 	 dNd&Zdedeee                   f         fd'Z!dedee"         f         fd(Z#de
fd)Z$d* Z%d+ Z&d,e	de
fd-Z'd,e	d.e(fd/Z)d,e	d0ee	         dee(         fd1Z*d,e	d0e	fd2Z+d,e	fd3Z,	 dPd4e	d5ee         de
fd6Z-	 dPd4e	d5ee         de
fd7Z.d8 Z/d4e	fd9Z0	 	 dQd,e	d0ee	         d5ee         fd:Z1dee2ee2         f         fd;Z3d<e4e(d=f         d>eee(f         ddfd?Z5dee6ee6         f         fd@Z7defdAZ8dBe9dCedefdDZ:	 	 	 	 dRdefdGZ;dH Z<dOdIe
fdJZ=dK Z>dS )SCompiledDAGzExperimental class for accelerated execution.

    This class should not be called directly. Instead, create
    a ray.dag and call experimental_compile().

    See REP https://github.com/ray-project/enhancements/pull/48 for more
    information.
    r   )num_cpusc                       e Zd ZdZdS )CompiledDAG.DAGDriverProxyActora  
        To support the driver as a reader, the output writer needs to be able to invoke
        remote functions on the driver. This is necessary so that the output writer can
        create a reader ref on the driver node, and later potentially create a larger
        reader ref on the driver node if the channel backing store needs to be resized.
        However, remote functions cannot be invoked on the driver.

        A Compiled Graph creates an actor from this class when the DAG is initialized.
        The actor is on the same node as the driver. This class has an empty
        implementation, though it serves as a way for the output writer to invoke remote
        functions on the driver node.
        N)r   r   r   r   r   rG   rE   DAGDriverProxyActorrS  /  s        	 	 	rG   rT  NFcreatesubmit_timeoutbuffer_size_bytesenable_asynciomax_inflight_executionsmax_buffered_resultsr}   default_communicatorc                    ddl m} |                                }	|| _        t	          j                    | _        || _        | j        |	j        | _        || _	        | j	        |	j
        | _	        t          j                    j        | _        || _        | j        |	j        | _        |	j        | _        || _        | j        |	j        | _        || _        | j        |	j        | _        d| _        t1          |t2                    r"|dk    r
d| _        d}nHt5          d|           |4t1          |t6                    st5          dt9          |                     || _        t=          t>                    | _         i | _!        t?                      | _"        t?                      | _#        t?                      | _$        tK          | j        | j        	          | _&        t1          | j        tN                    r| j        dk    rt5          d
| j                   t	          j(                    | _)        i | _*        i | _+        d| _,        d| _-        d| _.        g | _/        d| _0        d| _1        d| _2        d| _3        d| _4        d| _5        d| _6        i | _7        t=          tp                    | _9        i | _:        i | _;        t=          tp                    | _<        i | _=        d| _>        d| _?        t=          t                    | _A        i | _B        dd}
 |
            | _C        d| _D        t                      | _E        t                      | _F        dS )a5  
        Args:
            submit_timeout: The maximum time in seconds to wait for execute() calls.
                None means using default timeout (DAGContext.submit_timeout),
                0 means immediate timeout (immediate success or timeout without
                blocking), -1 means infinite timeout (block indefinitely).
            buffer_size_bytes: The initial buffer size in bytes for messages
                that can be passed between tasks in the DAG. The buffers will
                be automatically resized if larger messages are written to the
                channel.
            enable_asyncio: Whether to enable asyncio. If enabled, caller must
                be running in an event loop and must use `execute_async` to
                invoke the DAG. Otherwise, the caller should use `execute` to
                invoke the DAG.
            max_inflight_executions: The maximum number of in-flight executions that
                can be submitted via `execute` or `execute_async` before consuming
                the output using `ray.get()`. If the caller submits more executions,
                `RayCgraphCapacityExceeded` is raised.
            max_buffered_results: The maximum number of results that can be
                buffered at the driver. If more results are buffered,
                `RayCgraphCapacityExceeded` is raised. Note that
                when result corresponding to an execution is retrieved
                (by calling `ray.get()` on a `CompiledDAGRef` or
                `CompiledDAGRef` or await on a `CompiledDAGFuture), results
                corresponding to earlier executions that have not been retrieved
                yet are buffered.
            overlap_gpu_communication: (experimental) Whether to overlap GPU
                communication with computation during DAG execution. If True, the
                communication and computation can be overlapped, which can improve
                the performance of the DAG execution. If None, the default value
                will be used.
            _default_communicator: The default communicator to use to transfer
                tensors. Three types of values are valid. (1) Communicator:
                For p2p operations, this is the default communicator
                to use for nodes annotated with `with_tensor_transport()` and when
                shared memory is not the desired option (e.g., when transport="accelerator",
                or when transport="auto" for communication between two different GPUs).
                For collective operations, this is the default communicator to use
                when a custom communicator is not specified.
                (2) "create": for each collective operation without a custom communicator
                specified, a communicator is created and initialized on its involved actors,
                or an already created communicator is reused if the set of actors is the same.
                For all p2p operations without a custom communicator specified, it reuses
                an already created collective communicator if the p2p actors are a subset.
                Otherwise, a new communicator is created.
                (3) None: a ValueError will be thrown if a custom communicator is not specified.

        Returns:
            Channel: A wrapper around ray.ObjectRef.
        r   
DAGContextNFrU  TzBThe only allowed string for default_communicator is 'create', got zHThe default_communicator must be None, a string, or a Communicator, got )rW  num_shm_buffersz6`buffer_size_bytes` must be a positive integer, found rJ   rm   c                      t           j                            t          t	          j                                                    d                                                    S )NF)soft)scheduling_strategy)rP  rT  optionsr:   rq   rr   get_node_idremoter   rG   rE   _create_proxy_actorz1CompiledDAG.__init__.<locals>._create_proxy_actor  s[     2::$B+--99;;%% % % ;   fhh	rG   )rJ   rm   )GrY   r^  r   _enable_asyncioasyncioQueue
_fut_queue_max_inflight_executionsrY  _max_buffered_resultsrZ  uuiduuid4hex_dag_id_submit_timeoutrV  get_timeout_get_timeout_buffer_size_bytesrW  _overlap_gpu_communicationr}   _create_default_communicatorr]   rQ   rb   r1   r   _default_communicatorr   rZ   _communicator_to_type_hints"_actors_to_created_communicator_id)_p2p_actors_with_unresolved_communicators,_p2p_dag_nodes_with_unresolved_communicators-_collective_ops_with_unresolved_communicatorsr2   _default_type_hintr   Lock_dag_submission_lockidx_to_taskdag_node_to_idxcounterinput_task_idxoutput_task_idxinput_attr_task_idxs_returns_list_input_num_positional_args_input_kwargsdag_input_channelsdag_output_channels_dag_submitter_dag_output_fetcherworker_task_refslistactor_to_tasksactor_to_gpu_idsactor_to_executable_tasksactor_to_execution_scheduleactor_to_node_id_execution_index_max_finished_execution_indexdict_result_buffer_channel_dict_proxy_actor_is_teardown_destructed_ref_idxs_got_ref_idxs)rv   rV  rW  rX  rY  rZ  r}   r[  r^  ctxrg  s              rE   r   zCompiledDAG.__init__@  s   x 	'&&&&&$$&&%3!-//(?%(0,/,GD)%9"%-),)AD&z||'0>'#&#5D -0_1B"*&)&;D#:S'*2.1.KD+,1)*C00 	#x//481'+$$ 2/2 2   "-j ,7
 7
- 40114 4   >R"  	(  	/ EE 	6 EE 	9 EE 	: 6F"5 !96
 6
 6
 $1377 	4;RVW;W;W-*- -   %,LNN! 79=? .2.2/1!#( :>'.2 EIEI 9=>B  QS  	
 KM  	&  	(
 OQ &( 35*9DT9J9JGI	 	 	 	 0/11!
 DH66!
 =AFFrG   rJ   c                     | j         S rO   )r  r   s    rE   is_teardownzCompiledDAG.is_teardown'  s      rG   c                     | j         S )z8
        Get the unique ID of the compiled DAG.
        rq  r   s    rE   get_idzCompiledDAG.get_id+  s     |rG   c                     d| j          dS )NzCompiledDAG()r  r   s    rE   r   zCompiledDAG.__str__1  s    -dl----rG   noder   c                 x    | j         }t          ||          | j        |<   || j        |<   | xj         dz  c_         d S )NrW   )r  r   r  r  )rv   r  r   s      rE   	_add_nodezCompiledDAG._add_node4  sA    l ,S$ 7 7%(T"rG   c                    ddl m}m}m}m}m}m}m} d\  | _        | _	        t                      }| j                                        D ]\  }	}
t          |
j        |          rL| j        
J d            |	| _        |
j        }|                    |j                                                   ft          |
j        |          r| j                            |	           | j                                        D ]b\  }	}
|	| j        k    st          |
j        |          r&t)          |
j                  dk    r$|
j        j        r| j	        
J d            |	| _	        c| j	        J | j        | j	                 j        }t          ||          s4 ||g          }|                     |           | j        |         | _	        nd| _        | j        t5          d          d}t                      }t                      }t                      }| j                                        D ]U\  }}
|
j        }t          ||          snt          ||          s^t          ||          sNt          ||          s>t          ||          rt5          d	          t7          d
t9          |                     t          ||          r|j        r |                                }|t7          d          || j        vr"t@          !                    |          | j        |<   t          |j"        tF                    r|$                    |
           |j"        %                                r| &                    ||h           t          ||          r@| &                    |t          |j'        j(                  d           | j)        r
J d            nt          ||          st          ||          ra|j"        %                                rt7          d          t          |j"        tF                    rtU          |j"        j+                  |_"        t9          |j"                  tX          u r| j-        |_"        |
j.                                        D ]$\  }}t          ||          rt7          d          %t_          |
j0                  D ]\  }}t          ||          s| j        |         }| j        |         }d}t          ||          r|j        r|                                }|
j1                            |j                   t          |j        |          rt          |j        j2        tf                    r |$                    |j        j2                   nNt          |j        j2        th                    r |$                    |j        j2                   nt7          d          ||rt7          d          d}| j        | j                 }n*t          |j        |          r||st7          d          d}||j        |<   |j        j"        %                                r| &                    |j        |h           Wtk          ||           | 6                                 | 7                    |           | 8                                 |rd| _9        n!|sd| _9        ntu          |          dz   | _9        tw          |          | _<        dS )zBefore compiling, preprocess the DAG to build an index from task to
        upstream and downstream tasks, and to set the input and output node(s)
        of the DAG.

        This function is idempotent.
        r   )ClassMethodNoder
  DAGNodeFunctionNoderL   	InputNodeMultiOutputNodeNNNzMore than one InputNode foundzMore than one output node foundTz5Compiled DAGs currently require exactly one InputNodez7Compiled DAGs currently only support actor method nodeszFound unsupported node of type z[Compiled DAGs can only bind methods to an actor that is already created with Actor.remote())r  zCurrently, the overlap_gpu_communication option is not supported for accelerator collective operations. Please set overlap_gpu_communication=False.zpDAG inputs cannot be transferred via accelerator because the driver cannot participate in the communicator group)r   zLCompiled DAG currently does not support binding to other DAG nodes as kwargszPInputNode() can only be indexed using int for positional args or str for kwargs.z^All tasks must either use InputNode() directly, or they must index to specific args or kwargs.FrW   )=rY   r  r
  r  r  rL   r  r  r  r  rZ   r  r?   r]   r   updateinput_attribute_nodeskeysr  r   ra   r   is_cgraph_output_noder  r  r  NotImplementedErrorrb   r   is_class_method_call_get_actor_handler  rP  _get_gpu_idsr   r.   r\   r   _track_communicator_usage_collective_opactor_handlesrv  r5   r   r$   r~  r   	enumerater   r   rT   r   rQ   rk   _check_leaf_nodes_resolve_auto_transport_init_communicatorsr  maxtupler  )rv   r  r
  r  r  rL   r  r  rI   r   r   
input_noderH   direct_inputinput_positional_argsinput_kwargsauto_transport_tasksr  r   actor_handlerC   r  r  upstream_node_idxupstream_taskdownstream_actor_handles                             rE   _preprocesszCompiledDAG._preprocess:  s>   	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 5?1T1%(UU)//11 		6 		6IC$-33 6*224S222&)# "]
 ''
(H(M(M(O(OPPPPDM+=>> 6)00555 )//11 
	+ 
	+ICd)))Z1. .) D-..!33M7 4 +335V333'*$#///&t';<E+77 	&)/;-88KNN;'''#'#7#DD  !%D &%G   (,*-%%!$ 58EE #.4466 E	 E	NHd}H8Y//Yh(:;;Y h88Y h88	Y h55 Y-Q   %%WtH~~%W%WXXX(O44 ,9V ,'99;;'$F  
  t'<<<:E:R:R$; ;D),7 h02CDD 3(,,T222 %::<< M228l^LLLh(<== 
22 H3ABB&* 3   
  $>  ; >
 Hi00 J,5 5  %::<< $R   h02CDD  *9'18* * *H& H&''+<<< &*%<"+++--  3c7++ $4   $DI.. ; ;3!#w// $($8$=! $ 01B C*.'x99K 5K /7.H.H.J.J+ %%m&<===m46HII  ( "-"8"<cBB -11-2H2LMMMM#M$:$>DD $(()?)CDDDD(E  
 $/L/(&  
 $)L %)$4T5H$IMM 6	BB (#//(M   $(L?V28< )3HHJJ  22%.01  q;| 	+;8HIII   $$%9:::  """ 	M./D++& 	M./D++.12G.H.H1.LD+"<00rG   c                    | j                                         D ]G\  }}t          |                                || j                  }|D ]}|                    |           Ht          j                    j        }t          j                    j	        }| j
        D ]}| j        st          d          t          |j                  }|| j        v r| j        |         }n/t          t!          |          d| j        ||          }|| j        |<   |j                            |           d}	| j        r| j                                        D ]#\  }}| j                            |          r|}	 n$|	*t          t!          | j                  d| j        ||          }	| j        D ]}
|
j                            |	           dS dS )z7
        Initialize communicators for the DAG.
        z?Communicator creation is not allowed for collective operations.N)ry  r?   r4   get_actor_handlesrv  set_communicator_idr-   r   module_namecommunicator_clsr}  rw  rb   r  r  rz  r  r   r{  issubsetr|  )rv   communicator
type_hintsr   r   accelerator_module_nameaccelerator_communicator_clsr  actorsp2p_communicator_idr   s              rE   r  zCompiledDAG._init_communicators  s<    )-(H(N(N(P(P 	? 	?$L*0..00/ O
 ( ? ?	--o>>>>? #5"8":":"F'9'='?'?'P$ "O 	I 	IM4  U   =677F@@@"&"I&"Q"4LL3+0# # CR7?#77HHHH
 #9 	L 8>>@@  AJJ6RR *9'E #*&8GHH3+0' '# !M L L"667JKKKK#	L 	L L LrG   r   r  rm   r  c                    d|v rt          d          |r|j        j        }n|j        }|                                }|^| j        P| j        sI|j        /t          |j        t                    sJ t          d| d          t          d| d          | j        }|Y|r!| j	        
                    |j                   dS | j        
                    |           | j                            |           dS |rNt          |                                          |k    r(t          d|                                 d| d	          n\|                    t          |                                                    s(t          d
|                                 d| d	          | j        |         
                    |           dS )a  
        Track the usage of a communicator.

        This method first determines the communicator to use: if a custom
        communicator is specified, use it; if not and a default communicator
        is available, use it; otherwise, it records necessary information to
        create a new communicator later.

        This method also performs validation checks on the passed-in communicator.

        Args:
            dag_node: The DAG node that uses the communicator, this is the node
                that has the `with_tensor_transport()` type hint for p2p communication,
                or a `CollectiveOutputNode` for collective operations.
            actors: The full or partial set of actors that use the communicator.
                This method should be called one or multiple times so that all actors
                of the communicator are tracked.
            collective_op: Whether the communicator is used for a collective operation.
        Nz4Driver cannot participate in the communicator group.z<with_tensor_transport(transport='auto') is used for DAGNode z|, This requires specifying a default communicator or 'create' for _default_communicator when calling experimental_compile().zDAGNode z has no custom communicator specified. Please specify a custom communicator for the DAGNode using `with_tensor_transport()`, or specify a communicator or 'create' for _default_communicator when calling experimental_compile().zThe passed-in communicator must have the same set of actors as the collective operation. The passed-in communicator has actors z+ while the collective operation has actors .z{The passed-in communicator must include all of the actors used in the P2P operation. The passed-in communicator has actors z$ while the P2P operation has actors )rb   r  r   get_custom_communicatorrx  rw  _original_type_hintr]   r.   r}  r\   r|  r{  r  rZ   r  r  ry  )rv   r   r  r  r   r  s         rE   r  z%CompiledDAG._track_communicator_usage]  sp   2 6>>STTT 	+ /9II *I 88::*29 3 /;%h&BDUVVVVV$UW_ U U U  
 !Qx Q Q Q    5L NBFF+     AEEhOOO>EEfMMMMM |557788FBB$OAMA_A_AaAaO O FLO O O   C s<+I+I+K+K'L'LMM $HAMA_A_AaAaH H ?EH H H   ,\:>>yIIIIIrG   r  r   c                     t           j                  }|D ]}|j                                        }|j                                        }|                     |          f} fd|D             }|                    |j        j        ||          |j        _        |j        j        	                                r< 
                    |j        t          |                              |h                     dS )zC
        Resolve the auto transport type hint for the DAG.
        c                 >    g | ]}|                     |          fS r   )_get_node_id)rS   readerrv   s     rE   r   z7CompiledDAG._resolve_auto_transport.<locals>.<listcomp>  s9     $ $ $8>**6223$ $ $rG   N)r/   r  r   r  r   r  r  r  r   r   r  rZ   union)rv   r  type_hint_resolverr   rw   readerswriter_and_noderl   s   `       rE   r  z#CompiledDAG._resolve_auto_transport  s    .d.CDD ) 	 	D]4466F/6688G%t'8'8'@'@AO$ $ $ $BI$ $ $  '9&@&@'$' 'DM#
 }&;;== ..MLL&&x00  !	 	rG   c                    ddl m}m} g }| j                                        D ]Y\  }}t          |j        |          st          |j                  dk    r&|j        j	        s|
                    |j                   Zt          |          dk    r-t          dt          |           dd |D              d          dS )z[
        Check if there are leaf nodes in the DAG and raise an error if there are.
        r   )r  r  zzCompiled DAG doesn't support leaf nodes, i.e., nodes that don't have downstream nodes and are not output nodes. There are z2 leaf nodes in the DAG. Please add the outputs of c                 6    g | ]}|                                 S r   )r  )rS   	leaf_nodes     rE   r   z1CompiledDAG._check_leaf_nodes.<locals>.<listcomp>  s$    LLLII--//LLLrG   z to the the MultiOutputNode.N)rY   r  r  r  r?   r]   r   ra   r   r  r   rb   )rv   r  r  
leaf_nodesrC   r   s         rE   r  zCompiledDAG._check_leaf_nodes  s   	
 	
 	
 	
 	
 	
 	
 	

 %'
'--// 	1 	1GAtdm_== D-..!33; 4 !!$-000 z??a(z??( ( MLLLL( ( (    rG   r  c                     t          j        | j                            d                     }|                    dg           S )z5
        Get the GPU IDs of an actor handle.
        c                 L    t          j                                                    S rO   )rq   rr   get_accelerator_idsr   s    rE   <lambda>z*CompiledDAG._get_gpu_ids.<locals>.<lambda>  s    S466JJLL rG   GPU)rq   r   __ray_call__rf  )r  accelerator_idss     rE   r  zCompiledDAG._get_gpu_ids  sI    
 '%,,LL 
 

 ""5"---rG   c                 
   || j         v r| j         |         S d}|| j        k    s|&t          j                                                    }n-t          j        |j                            d                     }|| j         |<   |S )a   
        Get the node ID of an actor handle and cache it.

        Args:
            actor_handle: The actor handle, or None if the actor handle is the
                driver.
        Returns:
            The node ID of the actor handle or driver.
        Nc                 L    t          j                                                    S rO   )rq   rr   re  r   s    rE   r  z*CompiledDAG._get_node_id.<locals>.<lambda>  s    !8!:!:!F!F!H!H rG   )r  r  rq   rr   re  r   r  rf  )rv   r  node_ids      rE   r  zCompiledDAG._get_node_id  s     4000(664,,,0D-//;;==GGg)00HH  G
 /6l+rG   c           	          ddl m}m}m}m}m}  j                                           j        J  j         j	        J dS  j        g}t                      }|r|                    d          }||v r|                    |            j        |         }	t          |	j        |          r|	j        j        rt#          |	j                  dk    sJ t'          t(                    }
|	j        D ]h} j        |         }|j        }t          ||          r|j        r fd|j        D             |
|<   D|	|
vrg |
|	<   |
|	                             |           i|	j                            d          }|
                                D ]\  }}g }t                      }d}|D ]}t          |j        |          r8d}|                    d j                              j                  f           O|j                                        }||vrX|j                                        }|                    |                     |          f           |                    |           |r%t=          j                                                     nd}t=          j!        |"                    tF          ||	j        j$        |                    }d}|j        }t          ||          r|j        r|j%        }|	j                            |           |	j&                            |           |	j'                             j(        |                    |	j                                        }|J  j)        |                             |	           nt          |	j        |          r|	j        j        r|	j        }|j*        }|sJ  j         j(        |                  }tW          t#          |j                            D ]b}|j&        |         |j%        k    rJ|	j                            |j        |                    |	j&                            |j&        |                    ct#          |	j                  dk    sJ nt          |	j        |          rt'          t                    }|	j        D ]} j        |         }t          |j        |          sJ |j                                        }                     |          } |j,        D ]?}!t          |!|          st          |!|          r||!                             || f           @g |	_        |D ]}"t)          ||"                   }tG           ||"j$        d          }|	j                            |           |	j&                            t          |"|          rdn|"j-                   t          |"|          rN j(        |"         }# j        |#         }$|$j                            |           t#          |$j                  dk    sJ n,t          |	j        |          st          |	j        |          sJ |	j        D ]}|                    |           | j                                        D ]m\  }%}	|% j        k    s |% j.        k    st          |	j        |          r1|%|vr8d}&|	j,        D ]}!t          |!|          rd}&|&st_          d	|	j         d
          nddl0m1}' |'r# 2                                rt_          d           j         j                 }(|(j         _3         j3        J  j)                                        D ]<\  }})t'          t                    }*|)D ]}	d}&|	j,        D ]t}!t          |!|          rbd}&|*|!                             |	            j(        |!         }+ j        |+         }t#          |j                  dk    sJ |j        d         },|,J u|&st_          d          |*                                D ]\  }!}- j(        |!         }+ j        |+         }t#          |j                  dk    sJ |j        d         },|,J t#          |-          dk    r&ti          t#          |-          |,           j5        |,<   |, j5        |,<   g }.|)D ]}	g }/|	j,        D ]}!t          |!|          rh j(        |!         }+ j        |+         }t#          |j                  dk    sJ |j        d         },|,J  j5        |,         },|/                    |,           z|/                    |!           tm          |	|/|	j7                  }0|.                    |0           |.8                    d            |. j9        |<   >ddl0m:}1 |1rtv          }2ntx          }2 =                                 _>         j9                                        D ]N\  }}.|j?        @                    d          "                    |2|. j>        |          jA                   jB        |<   O j.        J g  _C         j         j.                 j,        D ]m}t          ||          sJ  j(        |         } j        |         }	t#          |	j                  dk    sJ  jC                            |	j        d                    n D                                  j3        sJ  jC        sJ d  jC        D             sJ  jE        st#           jC                  dk    sJ  F                                 _G         j         j                 }( jH        rAt           j3        |(j&        d           _        t           jC         jK                   _	        n:t           j3        |(j&        d           _        t           jC                   _	         j        N                                  j	        N                                 dS )a  Compile an execution path. This allocates channels for adjacent
        tasks to send/receive values. An infinite task is submitted to each
        actor in the DAG that repeatedly receives from input channel(s) and
        sends to output channel(s).

        This function is idempotent and will cache the previously allocated
        channels. After calling this function, _dag_submitter and
        _dag_output_fetcher will be set and can be used to invoke and fetch
        outputs for the DAG.
        r   r  r  rL   r  r  Nc                 *    g | ]}j         |         S r   )r  )rS   r   rv   s     rE   r   z/CompiledDAG._get_or_compile.<locals>.<listcomp>K  s1     > > > # !,S1> > >rG   r  FTrW   zzCompiled DAGs require each task to take a ray.dag.InputNode or at least one other DAGNode as an input. Invalid task node:
z*
Please bind the task to proper DAG nodes.)!RAY_CGRAPH_ENABLE_DETECT_DEADLOCKa)  This DAG cannot be compiled because it will deadlock on accelerator calls. If you believe this is a false positive, please disable the graph verification by setting the environment variable RAY_CGRAPH_ENABLE_DETECT_DEADLOCK to 0 and file an issue at https://github.com/ray-project/ray/issues/new/.zeCompiled DAGs require each task to take a ray.dag.InputNode or at least one other DAGNode as an inputc                     | j         S rO   )r   )r   s    rE   r  z-CompiledDAG._get_or_compile.<locals>.<lambda>?  s    4? rG   rT   )RAY_CGRAPH_ENABLE_PROFILING_ray_system)concurrency_groupc                     g | ]}|d uS rO   r   )rS   rx   s     rE   r   z/CompiledDAG._get_or_compile.<locals>.<listcomp>c  s*     
 
 
+9N$&
 
 
rG   )is_input)OrY   r  r  rL   r  r  r  r  r  r  rZ   r[   r\   r  r]   r   r  ra   r   r   r  r   is_class_method_outputr   _get_remote_methodr?   insertr  r  r  rq   rr   r   r   rf  ry   r   
output_idxr   r   r  r  class_method_callranger   rT   r  rb   ray.dag.constantsr  _detect_deadlockr  r0   r  r{   r   sortr  r  r   r   _build_execution_scheduler  r  rd  rv  r  r  (_register_input_output_custom_serializerr  _monitor_failures_monitorrh  r!   r    rk  r*   r)   r   )3rv   r  r  rL   r  r  frontiervisitedcur_idxr   output_to_readersr   downstream_taskdownstream_nodefnoutputr  rl   reader_handles_setread_by_multi_output_noder  reader_handlero   rx   r  r  	task_nodeupstream_noder  i!input_node_to_reader_and_node_setreader_taskreader_node_idr  input_dag_nodeinput_attr_idxinput_attr_tasknode_idxhas_at_least_one_channel_inputr  
input_taskrz   arg_to_consumersarg_idxarg_channel	consumersexecutable_tasksr  executable_taskr  exec_task_funcs3   `                                                  rE   _get_or_compilezCompiledDAG._get_or_compile  s   	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 &"...*+777F'(%% f	%ll1ooG'!!KK   #G,D4=/::\M6\
 4/00A5555 MXM M!  4 H HC&*&6s&;O&5&>O"?ODDH+BH> > > >'6'K> > >)/::
  '88868-d3)$/66GGGG]55nEE'8'>'>'@'@ 4X 4XOFGVX( *-&05-") F F%foGG F8<5 177 !$($5$($5$5d6G$H$H!"    -3O,M,M,O,OM,4FFF060Q0Q0S0S 4 ; ;%2D4E4Em4T4T$U!" !" !" !3 6 6} E E E
 5"/11>>@@@! $ &)W		/0 M3+	 & &N "&J&,oO"?ODD@+B@ &5%?
(//???$++J777)001Eo1VWWWW#}>>@@#///#L188>>>>4=/::EM8E !M	 ) ;$$$} $ 01Em1T Us=#@AABB N NA$03y7KKK,33M4QRS4TUUU(//0I!0LMMM4/00A55555DM955 8  $$ 2
  4  C"&"23"7K%k&:OLLLLL$/$8$J$J$L$LM%)%6%6}%E%EN*/  %c+=>> *C C  >cBFF!. ?  	 (*$&G I IN+/9.I, ,( &9,&0	& &N (//???$++%ni@@0+/   ".2DEE I)-)=n)M*.*:>*J'7>>~NNN"?#BCCqHHHH1I4 "$-1CDD 
M?I I    0 % %$$$$M  f	%R #.4466 	 	NHdD///t333dm-?@@ 4 w&&16.9 > >C!#w// >9=65 $D  =D D D   	HGGGGG, 	1F1F1H1H 	B   %d&9:
","<&222 $(#6#<#<#>#> D	L D	LL%ALSAQAQ   16.9 7 7C!#w// 79=6(-11$777"&"6s";(,(8(A"=#@AAQFFFF&3&CA&F*6665 $    #3"8"8":": B BY.s3 $ 0 9=899Q>>>>+;A>"...y>>A%%6CI#7 7D&{33
 7BD&{33  " 9 9+-9 2 2C!#w// 
2"&"6s";(,(8(A"=#@AAQFFFF&3&CA&F*666&*&8&E%,,[9999 &,,S1111"0!K# #
 !''8888 !!&B&B!CCC;KD*<88AAAAAA& 	+-NN*N ,0+I+I+K+K(.2.L.R.R.T.T 	 	*L*2>2K2S2S"/ 3T 3 3f 0>/	  !,// #///#% &t';<A 	E 	EFfg.....-f5J#J/Dt+,,1111$++D,@,CDDDD 	55777&&&&''''
 
=A=U
 
 
 	
 	
 
 ! 	6t/00A5555 ..00%d&9:
 	S";'&# # #D
 (A(( (D$$
 #4')?$# # #D (99Q'R'RD$!!### &&(((((rG   c           	         ddl m} | j        sJ | j        sJ t	          t
                    }| j                                        D ]B\  }}t          |          D ]+\  }}|j        }| j        |         j	        }|j
        }	|                                }d}
|j        D ]}|j                                        rd}
 n t          ||          }|j                                        }t!          t#          |t$          j        |	          |||
          }t!          t#          |t$          j        |	          |||          }t!          t#          |t$          j        |	          |||          }||                             |||g           -D|S )a  
        Generate READ, COMPUTE, and WRITE operations for each DAG node.

        Returns:
            A dictionary that maps an actor handle to a list of lists of
            _DAGOperationGraphNode. For the same actor, the index of the
            outer list corresponds to the index of the ExecutableTask in
            the list of `executable_tasks` in `actor_to_executable_tasks`,
            i.e. `exec_task_idx`. In the inner list, the order of operations
            is READ, COMPUTE, and WRITE.

            Example:
            {
                actor1: [
                    [READ COMPUTE WRITE] # exec_task_idx 0
                    [READ COMPUTE WRITE] # exec_task_idx 1
                ]
            }
        r   r	  FT)ray.dag.collective_noder
  r  r  r   r  r?   r  r  r   r   r  r_   r   r   r]   r   r   r   rE  rF  rG  r   )rv   r
  actor_to_operation_nodesr  r$  r   	exec_taskr  r   r   requires_accelerator_readr  requires_accelerator_computerequires_accelerator_write	read_nodecompute_node
write_nodes                    rE   "_generate_dag_operation_graph_nodez.CompiledDAG._generate_dag_operation_graph_node  s   , 	A@@@@@----  	! /3.L.R.R.T.T -	 -	*L*,56F,G,G , ,(y %-+H5>'3'99;;,1)%-%=  M$.CCEE 481 0:20 0, .6-?-T-T-V-V*2%%'<'A;   - 	  6%%'<'Dk   0    4%%'<'BK   . 
 )6==j9   U,\ ('rG   c                    |                                  }t          | j        |          }t          |          }d}| j        rt          |          }t          rt          |||           |t          |          S t          |          S )a  
        Generate an execution schedule for each actor. The schedule is a list of
        _DAGNodeOperation.

        Step 1: Generate a DAG node operation graph. Refer to the functions
        `_generate_dag_operation_graph_node` and `_build_dag_node_operation_graph`
        for more details.

        Step 2: Topological sort

        It is possible to have multiple _DAGOperationGraphNodes with zero in-degree.
        Refer to the function `_select_next_nodes` for the logic of selecting nodes.

        Then, put the selected nodes into the corresponding actors' schedules.

        The schedule should be intuitive to users, meaning that the execution should
        perform operations in ascending order of `bind_index` as much as possible.

        [Example]:

        See `test_execution_schedule` for more examples.

        Returns:
            actor_to_execution_schedule: A dictionary that maps an actor handle to
                the execution schedule which is a list of operations to be executed.
        N)	r2  r   r  r   rv  r   r   r   r   )rv   r*  graphr  actor_to_overlapped_schedules        rE   r  z%CompiledDAG._build_execution_schedule  s    < $(#J#J#L#L /6
 
 'LE&R&R# (,$* 	+R+, ,( ) 	)+-I5   (3./KLLL./JKKKrG   c                 :    t                               d           dS )a  
        TODO (kevin85421): Avoid false negatives.

        Currently, a compiled graph may deadlock if there are accelerator channels,
        and the readers have control dependencies on the same actor. For example:

        actor1.a ---> actor2.f1
                 |
                 ---> actor2.f2

        The control dependency between `actor2.f1` and `actor2.f2` is that `f1` should
        run before `f2`. If `actor1.a` writes to `actor2.f2` before `actor2.f1`, a
        deadlock will occur.

        Currently, the execution schedule is not granular enough to detect this
        deadlock.

        Returns:
            True if a deadlock is detected; otherwise, False.
        z0Deadlock detection has not been implemented yet.F)r   debugr   s    rE   r  zCompiledDAG._detect_deadlock  s    * 	GHHHurG   c                     t          j        |            G fddt          j                  } |            }|                                 |S )Nc                   ^     e Zd Z fdZdeffdZd	deffdZd	deffdZfdZ xZ	S )
.CompiledDAG._monitor_failures.<locals>.Monitorc                     t                                          d           d| _        t          j                    | _        d| _        d S )NT)daemonCompiledGraphMonitorThreadF)superr   name	threadingr  _in_teardown_lock_teardown_done)rv   r   s    rE   r   z7CompiledDAG._monitor_failures.<locals>.Monitor.__init__$  sE       ---8	 *3)9)9&&+###rG   rJ   c                 X                 t                               dd           dS dS )NzCompiledDAG has been destructed before teardown. This should not occur please report an issue at https://github.com/ray-project/ray/issues/new/.T)
stack_infoF)r   error)rv   	get_outers    rE   _outer_ref_alivez?CompiledDAG._monitor_failures.<locals>.Monitor._outer_ref_alive,  sA    9;;&LLJ $(	 !    !5trG   Fr=   c                     
            }|                                  sd S ddlm} |                                }|j        }|j                                        D ]\  }}d}	 t          j        ||           nk# t          j	        j
        $ rI d| d| d}	|r|	dz  }	t          j        |           n|	d	z  }	t                              |	           d
}Y nt          $ r Y nw xY w|s	 t          j        |           # t          $ r Y w xY w|r;|j        D ]5}t                              d|            t          j        |           4d S d S )Nr   r]  FtimeoutzCompiled DAG actor z is still running zs after teardown().z_ Force-killing actor. Increase RAY_CGRAPH_teardown_timeout if you want teardown to wait longer.zQ Teardown may hang. Call teardown with kill_actors=True if force kill is desired.TzKilling actor: )rG  rY   r^  r   teardown_timeoutr  r?   rq   r   
exceptionsGetTimeoutErrorkillr   warningr   r   )rv   r=   outerr^  r  rK  actorrefrJ  msgrF  s             rE   wait_teardownz<CompiledDAG._monitor_failures.<locals>.Monitor.wait_teardown7  s   !	,,.. F...... ,,..#&#7 "'"8">">"@"@ % %JE3#G-=>>>>>>9 ' ' 'E% E E/E E E  ' !;C
  HUOOOO!.C s+++"&$    	 # ! $     ( "'!7 ( ($=e$=$=>>>( (( (s+   $A;;AC#	C#"C#*C??
DDc                    | j         5  | j        r	 d d d            d S              }|                                 s	 d d d            d S t                              d           |j                                         |j                                         |j        	                                D ]}t                              d|             d |j        
                                D             }|D ]N}	 t          j        |d           # t          $ r Y &t          $ r t                              d           Y Kw xY w|j                                        D ]}t%          |           t                              d           |                     |           t                              d	           d
| _        d d d            d S # 1 swxY w Y   d S )NzTearing down compiled DAGz%Cancelling compiled worker on actor: c                 T    g | ]%\  }}|j                             t          |          &S r   )r  rf  r   )rS   rQ  rz   s      rE   r   zKCompiledDAG._monitor_failures.<locals>.Monitor.teardown.<locals>.<listcomp>~  s@     # # #(E5 *112LeTT# # #rG      rI  zError cancelling worker taskz Waiting for worker tasks to exitr<   zTeardown completeT)rA  rB  rG  r   r   r  r  r  r  r  r?   rq   r   r   r   r   rz  r  r3   rT  )rv   r=   rP  rQ  cancel_refs
cancel_refr   rF  s          rE   r@   z7CompiledDAG._monitor_failures.<locals>.Monitor.teardownn  s   + '/ '/* '/ '/ '/ '/ '/ '/ '/ '/ &IKKE0022 '/ '/ '/ '/ '/ '/ '/ '/ KK ;<<<(..000-33555!&!@!E!E!G!G U U$SE$S$STTTT# #,1,K,Q,Q,S,S# # #K '2 	! 	!
!GJ;;;;;. ! ! ! !D( ! ! !",,-KLLL D! AHHJJ? ?'-o>>>>KK BCCC&&;&???KK 3444*.D'O'/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/ '/sN   	GGB,G8DG
EG#E>G EA?GGGc                    	              }|                                  sd S t          j        t          |j                                                             d S # t          $ r4 t                              d           | 	                    d           Y d S t          $ r<}t                              d|            | 	                                 Y d }~d S d }~ww xY w)Nz>Received KeyboardInterrupt, tearing down with kill_actors=TrueTr<   z&Handling exception from worker tasks: )rG  rq   r   r  r  r  KeyboardInterruptr   r   r@   r   r7  )rv   rP  erF  s      rE   runz2CompiledDAG._monitor_failures.<locals>.Monitor.run  s    $%IKKE0022 GD!7!>!>!@!@AABBBBB( 4 4 4KKX   MMdM333333  $ $ $LL!M!!M!MNNNMMOOOOOOOOO$s"   A 8A :C	C#1CCrH  )
r   r   r   r   rI  rG  rT  r@   r]  __classcell__)r   rF  s   @rE   Monitorr:  #  s        , , , , ,	$ 	 	 	 	 	 	5( 5( 5( 5( 5( 5( 5( 5(n(/ (/D (/ (/ (/ (/ (/ (/T$ $ $ $ $ $ $ $ $rG   r_  )rA   rR  r@  Threadr   )rv   r_  monitorrF  s      @rE   r  zCompiledDAG._monitor_failures   ss    K%%	B	$ B	$ B	$ B	$ B	$ B	$ B	$i& B	$ B	$ B	$H '))rG   c                     | j         | j        z
  }|| j        k    r+t          j                            d| j         d| d          d S )N(The compiled graph can't have more than z. in-flight executions, and you currently have z in-flight executions. Retrieve an output using ray.get before submitting more requests or increase `_max_inflight_executions`. `dag.experimental_compile(_max_inflight_executions=...)`)r  r  rl  rq   rL  r   )rv   num_inflight_executionss     rE   &_raise_if_too_many_inflight_executionsz2CompiledDAG._raise_if_too_many_inflight_executions  su    !D$FF 	  #d&CCC.::K0K K"9K K K   DCrG   execution_indexc                     || j         v S )av  Check whether there are results corresponding to the given execution
        index stored in self._result_buffer. This helps avoid fetching and
        caching results again.

        Args:
            execution_index: The execution index corresponding to the result.

        Returns:
            Whether the result for the given index has been fetched and cached.
        )r  )rv   rf  s     rE   _has_execution_resultsz"CompiledDAG._has_execution_results  s     $"555rG   resultc                     |                      |          s=t          |          D ]/\  }}|| j        v r|| j        |         v s|| j        |         |<   .dS dS )a  Cache execution results in self._result_buffer. Results are converted
        to dictionary format to allow efficient element removal and calculation of
        the buffer size. This can only be called once per execution index.

        Args:
            execution_index: The execution index corresponding to the result.
            result: The results from all channels to be cached.
        N)rh  r  r  r  )rv   rf  ri  chan_idxress        rE   _cache_execution_resultsz$CompiledDAG._cache_execution_results  s     **?;; 	I!*6!2!2 I I# $t'@@@ D$=o$NNNEHD'8B	I 	II IrG   channel_indexc                    || j         v sJ |Gd t          | j                             |                                          d           D             }n!| j         |                             |          g}|| j        vrt                      | j        |<   | j        |                             |           |                     |           |S )a  Retrieve execution results from self._result_buffer and return the result.
        Results are converted back to original list format ordered by output channel
        index.

        Args:
            execution_index: The execution index to retrieve results from.
            channel_index: The index of the output channel corresponding to the result.
                Channel indexing is consistent with the order of
                self.dag_output_channels. None means that the result wraps outputs from
                all output channels.

        Returns:
            The execution result corresponding to the given execution index and channel
            index.
        Nc                     g | ]
}|d          S )rW   r   )rS   kvs     rE   r   z6CompiledDAG._get_execution_results.<locals>.<listcomp>  s,        1  rG   c                     | d         S )Nr   r   )rq  s    rE   r  z4CompiledDAG._get_execution_results.<locals>.<lambda>  s
    2a5 rG   r  )r  sortedr[   r?   r  rZ   r\   _clean_up_buffers)rv   rf  rn  ri  s       rE   _get_execution_resultsz"CompiledDAG._get_execution_results  s    , $"55555   '++O<<BBDD((    FF )/:>>}MMNF$"44425%%D/?+//>>>///rG   c                     || j         vrt                      | j         |<   | j         |                             |           |                     |           dS )a1  
        Delete the execution results for the given execution index and channel index.
        This method should be called when a CompiledDAGRef or CompiledDAGFuture is
        destructed.

        Note that this method maintains metadata for the deleted execution results,
        and only actually deletes the buffers lazily when the buffer is not needed
        anymore.

        Args:
            execution_index: The execution index to destruct results from.
            channel_index: The index of the output channel corresponding to the result.
        N)r  rZ   r\   rt  )rv   rf  rn  s      rE   _delete_execution_resultsz%CompiledDAG._delete_execution_results
	  sY     $";;;9<D%o6!/266}EEE/////rG   c                 &   d}| j                             |t                                }d|v r t          |          dk    s
J d            d}nn| j                            |t                                }|                    |          }|t          t          t          | j                                      k    }|sdS | j        	                    |d           | j        	                    |d           | j         	                    |d           dS )zQ
        Try to release the result buffer for the given execution index.
        FNrW   )zgwhen None exists in got_channel_idxs, it means all channels, and it should be the only value in the setT)
r  r   rZ   ra   r  r  r  r  r  r[   )rv   rf  should_releasegot_channel_idxsdestructed_channel_idxsprocessed_channel_idxss         rE   _try_release_result_bufferz&CompiledDAG._try_release_result_buffer	  s$   
 -11/355II###'((A--- 0--- "NN&*&?&C&C' '# &6%;%;<S%T%T"3sc$233448 8 N  	5666!%%ot<<<555trG   idx_to_releaserJ  c                    || j         dz   k    rdS | j                            |t                                }d}d|v r t	          |          dk    s
J d            d}n't	          |          t	          | j                  k    rd}|sdS || j        vsJ || j        vsJ 	 | j        	                    |           n+# t          $ r}t          d| j         d          |d}~ww xY w| j                            |           dS )a9  
        Try to release the native buffer for the given execution index.

        Args:
            idx_to_release: The execution index to release buffers from.
            timeout: The maximum time in seconds to wait for the release.

        Returns:
            Whether the buffers have been released.
        rW   FN)znwhen None exists in destructed_channel_idxs, it means all channels, and it should be the only value in the setTzReleasing native buffers corresponding to a stale CompiledDAGRef is taking a long time. If this is expected, increase RAY_CGRAPH_get_timeout which is currently E seconds. Otherwise, this may indicate that the execution is hanging.)r  r  r   rZ   ra   r  r  r  r  release_channel_buffersr   rt  r[   )rv   r~  rJ  r{  ry  r\  s         rE   _try_release_native_bufferz&CompiledDAG._try_release_native_buffer<	  sc    T?!CCC5"&";"?"?PSPUPU"V"V***.//1444 7444 "NN())S1I-J-JJJ!N 	5 T%88888T%77777		$<<WEEEE% 	 	 	(=A=N    	 	!%%n555ts    B; ;
C#CC#c                 |    |                      ||          r| xj        dz  c_        dS |                     |          S )a  
        Try to release the buffer for the given execution index.
        First try to release the native buffer, then try to release the result buffer.

        Args:
            idx_to_release: The execution index to release buffers from.
            timeout: The maximum time in seconds to wait for the release.

        Returns:
            Whether the native buffer or result buffer has been released.
        rW   T)r  r  r}  )rv   r~  rJ  s      rE   _try_release_bufferzCompiledDAG._try_release_buffern	  sK     **>7CC 	 ..!3..4..~>>>rG   c                     | j         }	 t          j                    }|                     | j        dz   |          sdS |dk    r)|t          j                    |z
  z  }t          |d          }c)a  
        Repeatedly release buffer if possible.

        This method starts from _max_finished_execution_index + 1 and tries to release
        as many buffers as possible. If a native buffer is released,
        _max_finished_execution_index will be incremented.
        TrW   r`  r   N)rt  r   	monotonicr  r  r  )rv   rJ  
start_times      rE   _try_release_buffersz CompiledDAG._try_release_buffers	  s|     #		*))J++2Q6   "}}4>++j88gq//		*rG   c                 X    |                      |           |                                  dS )a  
        Clean up native and result buffers.

        This method:
        1. Tries to release the buffer for the given execution index.
           This index is the specific one that requires a clean up,
           e.g., right after get() is called or a CompiledDAGRef/CompiledDAGFuture
           is destructed.
        2. Tries to release all buffers starting from _max_finished_execution_index + 1.
           This step is to clean up buffers that are no longer needed.

        Args:
            idx_to_release: The execution index that requires a clean up,
                e.g., right after get() is called or a CompiledDAGRef/CompiledDAGFuture
                is destructed.
        N)r  r  )rv   r~  s     rE   rt  zCompiledDAG._clean_up_buffers	  s0    " 	  000!!#####rG   c                    || j         }| j        |k     r-t          | j                  | j        k    r-t          d| j         dt          | j                   d          t          j                    }	 |                     | j        dz   |          s8| j	        
                    |          }|                     | j        dz   |           | xj        dz  c_        n+# t          $ r}t          d| j          d          |d}~ww xY w|dk    r)|t          j                    |z
  z  }t          |d	          }| j        |k     +dS dS )
av  Repeatedly execute this DAG until the given execution index and
        buffer results for all CompiledDagRef's.
        If the DAG has already been executed up to the given index, it will do nothing.

        Note: If this comes across execution indices for which the corresponding
        CompiledDAGRef's have been destructed, it will release the buffer and not
        cache the result.

        Args:
            execution_index: The execution index to execute until.
            channel_index: The index of the output channel to get the result from.
                Channel indexing is consistent with the order of
                self.dag_output_channels. None means wrapping results from all output
                channels into a single list.
            timeout: The maximum time in seconds to wait for the execution.
                None means using default timeout (DAGContext.get_timeout),
                0 means immediate timeout (immediate success or timeout without
                blocking), -1 means infinite timeout (block indefinitely).

        TODO(rui): catch the case that user holds onto the CompiledDAGRefs
        Nrc  z* buffered results, and you currently have z buffered results. Call `ray.get()` on CompiledDAGRef's (or await on CompiledDAGFuture's) to retrieve results, or increase `_max_buffered_results` if buffering is desired, note that this will increase driver memory usage.rW   zeIf the execution is expected to take a long time, increase RAY_CGRAPH_get_timeout which is currently r  r`  r   )rt  r  ra   r  rm  r   r   r  r  r  r3  rm  r   r  )rv   rf  rn  rJ  r  ri  r\  s          rE   _execute_untilzCompiledDAG._execute_until	  s   6 ?'G0?BB4&''4+EEE/>1> >&)$*=&>&>> > >   ))J666:G   "5::7CCF11:Q>   22a7222)   ,AEAR   
  "}}4>++j88gq//Q 0?BBBBBBs   4A&C 
D%C>>Dc                      j         rt          d                                                                 ||           t	          |          dk    rt	          |          dk    r	|d         }nt          ||          }                                                                    	  j        	                    | j
                   n+# t          $ r}t          d j
         d          |d}~ww xY w xj        dz  c_         j        r. fdt          t	           j                            D             }nt!            j                  }|S )	a  Execute this DAG using the compiled execution path.

        Args:
            args: Args to the InputNode.
            kwargs: Kwargs to the InputNode

        Returns:
            A list of Channels that can be used to read the DAG result.

        Raises:
            RayChannelTimeoutError: If the execution does not complete within
                self._submit_timeout seconds.

        NOTE: Not thread-safe due to _execution_index etc.
        z(Use execute_async if enable_asyncio=TruerW   r   r   r   zhIf the execution is expected to take a long time, increase RAY_CGRAPH_submit_timeout which is currently zA seconds. Otherwise, this may indicate that execution is hanging.Nc                 <    g | ]}t          j        |          S r   )r7   r  )rS   rn  rv   s     rE   r   z'CompiledDAG.execute.<locals>.<listcomp>'
  s8       ! tT%:MJJ  rG   )rh  rb   r'  _check_inputsra   r%   r  re  r  rA  rr  r   r  r  r  r  r7   )rv   r   r   inpr\  rR  s   `     rE   r9  zCompiledDAG.execute	  s   (  	IGHHH4(((t99>>c&kkQ.. q'CC!tF;;;C
 	!!###33555	%%c4+?@@@@% 	 	 	(S@D@TS S S  		 	" 	>   %*3t/G+H+H%I%I  CC
 !t'<==C
s   + C 
C4C//C4r   .r   c                     t          |          | j        k    r't          d| j         dt          |                     | j        D ]}||vrt          d| d          dS )z
        Helper method to check that the DAG args provided by the user during
        execution are valid according to the defined DAG.
        z9dag.execute() or dag.execute_async() must be called with z positional args, got z@dag.execute() or dag.execute_async() must be called with kwarg ``N)ra   r  rb   r  )rv   r   r   kwargs       rE   r  zCompiledDAG._check_inputs0
  s    
 t99777#> t99    ' 	 	EF"" ;27; ; ;   #	 	rG   c                    K    j         st          d                                                                 ||            j        4 d{V  t          |          dk    rt          |          dk    r	|d         }nt          ||          }                                   j        	                    |           d{V  t          j                     j                                       d{V  ddd          d{V  n# 1 d{V swxY w Y    xj        dz  c_         j        r/ fdt!          t           j                            D             nt%            j                  S )a  Execute this DAG using the compiled execution path.

        NOTE: Not thread-safe.

        Args:
            args: Args to the InputNode.
            kwargs: Kwargs to the InputNode.

        Returns:
            A list of Channels that can be used to read the DAG result.
        z#Use execute if enable_asyncio=FalseNrW   r   r  c                 >    g | ]}t          j        |          S r   )r6   r  )rS   rn  futrv   s     rE   r   z-CompiledDAG.execute_async.<locals>.<listcomp>m
  s:       ! "$(=sMRR  rG   )rh  rb   r'  r  r  ra   r%   re  r  rA  ri  Futurerk  putr  r  r  r  r6   )rv   r   r   r  r  s   `   @rE   execute_asynczCompiledDAG.execute_asyncC
  s2       # 	DBCCC4(((, 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+4yyA~~#f++"2"2 1g%4???77999%++C000000000.""C/%%c*********!	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+$ 	" 	F    %*3t/G+H+H%I%I  CC
 $D$*?EEC
s   B(D
DDc           	      
  >? ddl m}m}m}m}m} t          | d          r| j        st          d          | j        	                                D ]=\  }}t          |d          rt          |j        |          st          d| d          >ddlm}m}	  |t                    }
 |t                     > |t"                    } |t                     }d	}i }g }| j        	                                D ]\  }}|j        }d
| d}t          ||          r|dz  }nt          ||          r|d|j         dz  }nt          ||          r|dz  }nt          ||          r||j        rY|                                }|                                }|r!|j                                        dd         nd}|d| d| z  }n3|j        r|d|j         dz  }n|dz  }n|t5          |          j        z  }|||<   t9          |                                          D ]\  }}t          ||          r| j        |         }|t?          |j                   k     rB|j         |         !                                rd}n"t5          |j         |                   j        }nd}|
|         "                    |           >|xx         dz  cc<   |"                    |||f           d}|
	                                D ]D\  }}t?          |          dk    r,|D ]}d||<   |||<   tG          |t?          |                    }E |t                    } |	>fd| j        D                       }d}|r |	            }|rm|$                                } ||         "                    |            |
|          D ]3}!>|!xx         dz  cc<   >|!         dk    r|"                    |!           4|m|}|dz  }||dz  }|	                                D ]\  }}"|| d|" dz  }|dz  }|D ]\  }#}$}|dk    rd }%nd!}%||# d"|% d#|$ d$z  } |d%z  }|d&z  }|d'z  }tG          d( |%                                D                       |z   ?t?          |          }&?fd)tM          |&d*z  dz
            D             }'i }(|	                                D ]r\  })}*|)d*z  }+t9          |*          D ]V\  },} | j        |          }|  d+}-t          |j        |          r|j        j        rc|j                                        }|j                                        }|r!|j                                        dd         nd}|-d,| d+| z  }-nB|j        j        r|-d-|j        j         dz  }-n"|-d.z  }-n|-t5          |j                  j        z  }-d}.| |v r$||)dz
           '                    ||                    }.|,|.z   d/z  }/t9          |-          D ]/\  }0}1|/|0z   t?          |'d                   k     r|1|'|+         |/|0z   <   0|+|/f|(| <   Xt|
	                                D ]\  }#}2|(|#         \  }3}4|2D ]}$|(|$         \  }5}6tM          |3dz   |5          D ]}7|'|7         |4         d"k    rd0|'|7         |4<    |4|6k    rtM          tQ          |4|6          dz   tG          |4|6                    D ]6}8|'|5dz
           |8         d"k    rd1n|'|5dz
           |8         |'|5dz
           |8<   7|6|4k    rd2|'|5dz
           |6dz
  <   nd3|'|5dz
           |6dz   <   d0|'|5dz
           |6<   | j        	                                D ]\  }}t          |j        |          rh|j                                        }9t9          |9          D ]?\  }0}:t          |:|          r*| j        |:         };|;|(v r|(|;         \  }<}=d0|'|<dz
           |=<   @|d4z  }|d$)                    d5 |'D                       z  }|S )6a  
        Visualize the compiled graph in
        ASCII format with directional markers.

        This function generates an ASCII visualization of a Compiled Graph,
        where each task node is labeled,
        and edges use `<` and `>` markers to show data flow direction.

        This method is called by:
            - `compiled_dag.visualize(format="ascii")`



        High-Level Algorithm:
        - Topological Sorting: Sort nodes topologically to organize
            them into layers based on dependencies.
        - Grid Initialization: Set up a 2D grid canvas with dimensions based
            on the number of layers and the maximum number of nodes per layer.
        - Node Placement: Position each node on the grid according to its
            layer and relative position within that layer.
            Spacing is added for readability, and directional markers (`<` and `>`)
            are added to edges to show input/output flow clearly.

        This method should be called
          **after** compiling the graph with `experimental_compile()`.

        Returns:
            ASCII representation of the CG with Nodes Information,
            Edges Information and Graph Built.

        Limitations:
        - Note: This is only used for quick visualization for small graphs.
            For complex graph (i.e. more than 20 tasks), please use graphviz.
        - Scale: Works best for smaller CGs (typically fewer than 20 tasks).
            Larger CGs may result in dense, less readable ASCII
            outputs due to limited space for node and edge rendering.
        - Shape: Ideal for relatively shallow CGs with clear dependency paths.
            For deep, highly branched or densely connected CGs,
            readability may suffer.
        - Edge Overlap: In cases with high fan-out (i.e., nodes with many children)
            or fan-in (nodes with many parents), edge lines may intersect or overlap
            in the ASCII visualization, potentially obscuring some connections.
        - Multi-output Tasks: Multi-output tasks can be visualized, but positioning
            may cause line breaks or overlap when a task has multiple outputs that
            feed into nodes at varying depths.

        Example:
            Basic Visualization:
            ```python
            # Print the CG structure in ASCII format
            print(compiled_dag.visualize(format="ascii"))
            ```

            Example of Ordered Visualization (task is build in order
                to reduce line intersection):
            ```python
            with InputNode() as i:
                o1, o2, o3 = a.return_three.bind(i)
                o4 = b.echo.bind(o1)
                o5 = b.echo.bind(o2)
                o6, o7 = b.return_two.bind(o3)
                dag = MultiOutputNode([o4, o5, o6, o7])

            compiled_dag = dag.experimental_compile()
            compiled_dag.visualize(format="ascii",view=True)


            # Output:
            # 0:InputNode
            # |
            # 1:Actor_54777d:return_three
            # |---------------------------->|---------------------------->|                                                  # noqa
            # 2:Output[0]                   3:Output[1]                   4:Output[2]                                        # noqa
            # |                             |                             |                                                  # noqa
            # 5:Actor_c927c9:echo           6:Actor_c927c9:echo           7:Actor_c927c9:return_two                          # noqa
            # |                             |                             |---------------------------->|                    # noqa
            # |                             |                             9:Output[0]                   10:Output[1]         # noqa
            # |<----------------------------|-----------------------------|-----------------------------|                    # noqa
            # 8:MultiOutputNode
            ```

            Example of Anti-pattern Visualization (There are intersections):
            # We can swtich the nodes ordering to reduce intersections, i.e. swap o2 and o3
            ```python
            with InputNode() as i:
                o1, o2, o3 = a.return_three.bind(i)
                o4 = b.echo.bind(o1)
                o5 = b.echo.bind(o3)
                o6, o7 = b.return_two.bind(o2)
                dag = MultiOutputNode([o4, o5, o6, o7])
            compiled_dag = dag.experimental_compile()
            compiled_dag.visualize(format="ascii",view=True)

            # Output (Nodes 5, 7, 9, 10 should connect to Node 8):
            # 0:InputNode
            # |
            # 1:Actor_84835a:return_three
            # |---------------------------->|---------------------------->|                            # noqa
            # 2:Output[0]                   3:Output[1]                   4:Output[2]                  # noqa
            # |                             |                             |                            # noqa
            # 5:Actor_02a6a1:echo           6:Actor_02a6a1:return_two     7:Actor_02a6a1:echo          # noqa
            # |                             |---------------------------->|                            # noqa
            # |                             9:Output[0]                   10:Output[1]                 # noqa
            # |<----------------------------------------------------------|                            # noqa
            # 8:MultiOutputNode
            ```
        r   r  r  bThe DAG must be compiled before calling 'visualize()'. Please call 'experimental_compile()' first.r   Task at index _ does not have a valid 'dag_node'. Ensure that 'experimental_compile()' completed successfully.)r   dequer   Task z  r  InputAttributeNode[]r  N   unknownActor: z... Method: ClassMethodOutputNode[r  AcceleratorUnknownTyperW   Tc                 ,    g | ]}|         d k    |S )r   r   )rS   r   indegrees     rE   r   z0CompiledDAG._visualize_ascii.<locals>.<listcomp>D  s(    UUUs(3-STBTBTsBTBTBTrG   zNodes Information:
z	 [label="z"] 
z
Edges Information:
z+++z---rX   z> 
z	
Legend:
z1+++> : Represents Accelerator-type data channels
z.---> : Represents Shared Memory data channels
c              3   4   K   | ]}t          |          V  d S rO   ra   )rS   layers     rE   rU   z/CompiledDAG._visualize_ascii.<locals>.<genexpr>i  s(      @@uE

@@@@@@rG   c                 H    g | ]}d  t          dz            D             S )c                     g | ]}d S )rX   r   )rS   rC   s     rE   r   z;CompiledDAG._visualize_ascii.<locals>.<listcomp>.<listcomp>m  s    444444rG      )r  )rS   rC   	max_widths     rE   r   z0CompiledDAG._visualize_ascii.<locals>.<listcomp>m  s4    TTT44eIN33444TTTrG      :Actor_zOutput[UnknownMethodrW  |-><z
Graph Built:
c              3   @   K   | ]}d                      |          V  dS )r   N)r`   )rS   rows     rE   rU   z/CompiledDAG._visualize_ascii.<locals>.<genexpr>  s,      (F(F#(F(F(F(F(F(FrG   )*rY   r  r  rL   r  r  r   r  rb   r?   r]   r   collectionsr   r  r  r   rI  rT   r  r  r  	_actor_idrp  r  r  r   r   r  r   r  ra   r   r   r   r  popleftr  r  indexminr`   )@rv   r  r  rL   r  r  r   r   r   r  adj_listis_multi_outputchild2parentascii_visualization	node_info	edge_infor   labelr   r  r   	arg_indexr  upstream_task_idxr   width_adjustchild_idx_listchildlayerszero_indegreelayer_index
next_layerr  
downstreamr   r  r  edgs_channelheightgridtask_to_pos	layer_numlayer_taskslayer_ycol_num	task_infoadjust_col_numcol_xr  chardownstream_tasks
upstream_y
upstream_xdownstream_ydownstream_xyxoutput_tasksoutput_taskr  output_youtput_xr  r  s@                                                                 @@rE   _visualize_asciizCompiledDAG._visualize_asciiv
  s
   Z	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 t]++ 	43C 	>   )//11 	 	IC4,, Jt}g4V4V  SS S S S   	32222222 *5T):):#.;s#3#3 ,7;t+<+<'2{3'7'7 $&	02	)//11 +	J +	JIC}H#COOOE (I.. 1$H&899 1>x|>>>>Ho66 1**Ho66 10 
/"*":":"<"<K#+#=#=#?#?L<HW.2244RaR88i  JxJJ[JJJEE4 /Lh6ILLLLEE..EEh00"IcN"+H,=,=,?,?"@"@ J J	3c7++ J(,(<S(A% !3t':#;#;;;.y9NNPP V(5II(,T-@-K(L(L(UII$1	./66s;;;SMMMQ&MMM$$&7i%HIIIJ" 191A1A 	F 	F-~>""Q&&+ < <E-1OE**;L''"<^1D1DEE T""UUUUd.>UUUVV 
	J 6(0022{#**8444"*8"4 6 6JZ(((A-(((
+q00"))*555   6 'M1K  
	 	55"** 	@ 	@ICc#?#?D#?#?#?? 	779B 	 	5M?IM))$$ JJ<JJJJJ
 	},SSPP @@@@@@@<O	V UTTTeFQJQRN>S>STTT &,llnn !	9 !	9"I{!mG%.{%;%; 9 9!'1'NNN	 dm_== >}9 5&*m&C&C&E&E'+}'F'F'H'H  ,+L26688!<<!* !
 "%Fh%F%F%F%FF		= 5!%Jt}/G%J%J%JJ		!_4		dm!4!4!==I!"..%+IM%:%@%@hAW%X%XN >1R7(33 8 8GAtqy3tAw<<//37Weai0)0%(8H%%?9D 08~~/?/? 	; 	;+M+%0%?"J
#3 ; ;-8-I*l zA~|<< 2 2AAwz*c11.1Q
+ --"J559J55     $L1$45a8C??  C!%lQ&6!7!: \A-.q11 $j00CF\A-.|a/?@@CF\A-.|a/?@ 8;\A%&|447;< )//11 	? 	?IC$-99 ?#}5577&/&=&= ? ?NA{!+w77 ?*.*>{*K*k991<_1M.Hh;>DA.x8 	11tyy(F(F(F(F(FFFF""rG   r  downstream_actor_idc                    t          |          j        }|| j        v rd| j        |         |k    rS| j        |         }|dt          |          j         z  }t          |          t          u r|d|j        dd          dz  }t          |          t
          u r\||j        v rS|j        |         }|dt          |          j         z  }t          |          t          u r|d|j        dd          dz  }|S )a  
        Get details about outer and inner channel types and channel ids
        based on the channel and the downstream actor ID.
        Used for graph visualization.
        Args:
            channel: The channel to get details for.
            downstream_actor_id: The downstream actor ID.
        Returns:
            A string with details about the channel based on its connection
            to the actor provided.
        r  rM   Nr  z...)r   r   r  r0   _channel_idr&   r'   )rv   r  r  channel_detailsinner_channels        rE   get_channel_detailszCompiledDAG.get_channel_details  s    w--0d(((T-?-HG-S-S(1G<DMM$:<<<OG}}--#D(;BQB(?#D#D#DD MM---#w'<<<#12EFMBD$7$7$@BBBOM""&999#J(A"1"(E#J#J#JJrG   compiled_graphpngc           	      

   |dk    r8|rt          d          |                                 }|rt          |           |S 	 ddl}n# t          $ r t	          d          w xY wddlm}m}m}	m	}
m
} t          | d          r| j        st          d          | j                                        D ]=\  }}t          |d	          rt          |j        |          st          d
| d          >|                    d|          }t#          fd          | j                                        D ]\  }}|j        }d| d}d}d}d}t          ||
          r|dz  }d}d}nt          ||	          r|d|j         dz  }d}d}nt          ||          r
|dz  }d}d}nt          ||          r|j        r|                                }|                                }|rP|j        j        }|j                                        }|d| dz  }|d|dd          dz  }|d| z  }|         }n
|d| z  }d }d}n?|j        r|d!|j         dz  }d}d"}n%|d#z  }d$}d%}n|t9          |          j        z  }d$}d%}|                    t?          |          ||||&           |r%|j         rt9          |j                   j        nd'dz   nd}tC          |j"                  d(k    r|j        j#        D ]}| j$        |         }d}|r~|}|| %                    |j"        d         t9          |          |u r+|                                j                                        n| j&        j                                                  z  }|'                    t?          |          t?          |          |)           ntC          |j"                  d(k    rtC          |j(                  tC          |j"                  k    sJ tS          |j"        |j*                  D ]\  }}d}|rI|}|| %                    ||j                                        j                                                  z  }|'                    t?          |          t?          |          |)           t9          |j                  |	u r5|'                    t?          | j+                  t?          |                     |,                    ||*           |j-        S )+a  
        Visualize the compiled graph by showing tasks and their dependencies.
        This method should be called **after** the graph has been compiled using
        `experimental_compile()`.

        Args:
            filename: For non-ASCII formats, the output file name (without extension).
                For ASCII format, the visualization will be printed to the console,
                and this argument is ignored.
            format: The format of the output file (e.g., 'png', 'pdf', 'ascii').
            view: For non-ASCII formats: Whether to open the file with the default
                viewer. For ASCII format: Whether to print the visualization and return
                None or return the ascii visualization string directly.
            channel_details: If True, adds channel details to edges.

        Returns:
            The string representation of the compiled graph. For Graphviz-based formats
            (e.g., 'png', 'pdf', 'jpeg'), returns the Graphviz DOT string representation
            of the compiled graph. For ASCII format, returns the ASCII string
            representation of the compiled graph.

        Raises:
            ValueError: If the graph is empty or not properly compiled.
            ImportError: If the `graphviz` package is not installed.

        asciizDParameters 'channel_details' are not compatible with 'ascii' format.r   NznPlease install graphviz to visualize the compiled graph. You can install it by running `pip install graphviz`.r  r  r  r   r  r  r  )r?  formatc                  <    dt                     dz  dz   dz  dS )N#i  i]  i 06Xr  )actor_id_to_colors   rE   r  z'CompiledDAG.visualize.<locals>.<lambda>3  s+    R#/0047%?8KRRR rG   r  r  ovalfilledr   r  	rectangle	lightbluer  r  r  yellowr  zID: r  z...
zMethod: 
lightgreenr  oranger  diamondred)shapestyle	fillcolorr  rW   )r  )view).rb   r  printgraphvizr   rY   r  r  rL   r  r  r   r  r?   r]   r   Digraphr   rT   r  r  r  '_ray_actor_creation_function_descriptor
class_namer  rp  r  r  r   r   r  rQ   r   ra   r   _downstream_nodesr  r  r  edger   zipr   r  rendersource)rv   filenamer  r  r  ascii_visualiztion_strr  r  r  rL   r  r  r   r   dotr   r  r  r  r  r   rQ  r
  r   channel_type_strr  downstream_idx
edge_labelrx   r  s                                @rE   	visualizezCompiledDAG.visualize  s   B W  ;   &*%:%:%<%<" .,---))	OOOO 	 	 	H  	
	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 t]++ 	43C 	>   )//11 	 	IC4,, Jt}g4V4V  SS S S S   $4VDD (RRRR
 
 )//11 f	= f	=IC}H#COOOEEEI (I.. ,"$#'		H&899 (">x|>>>>#'		Ho66 $"**#$		Ho66  "0 &"*":":"<"<K$6688E 1!IT # $)?#6#6#8#8!9:!9!9!99!;!!;!;!;;!9K!9!99$5h$?		!9K!9!99$0	"EE4 	&Lh6ILLLLE'E (II ..E%E %II h00!!	 HHSXXuE)HTTT #)'D+,,55&    4'((A--'+}'F N NO%)%9/%JN!%J& 	%5
"d&>&> 03 $(#8#8O#K#K !0 A A C C M Q Q S S S%)%6%@%D%D%F%F' ' 
 HHSXXs>':':*HMMMMN T)**Q..4+,,D4H0I0IIIII69($*?7 7 
N 
N2NN "&J& %5
"d&>&>* M;;==GKKMM' ' 
 HHSXXs>':':*HMMMMDM""&888T0113s88<<<

8$
'''zs   A A c                 Z   | j         J | j        J | j        | j                  }|j        j                                         | j        D ]-}| j        |         }|j        j                                         .| j        | j                 j        D ]}|j                                         dS )z[
        Register custom serializers for input, input attribute, and output nodes.
        N)r  r  r  r   r   r!  r  r   )rv   r  input_attr_task_idxr  r  s        rE   r  z4CompiledDAG._register_input_output_custom_serializer  s     "...#/// %d&9:
%@@BBB $(#< 	L 	L"./BCO$.IIKKKK &t';<A 	: 	:F779999	: 	:rG   r=   c                     | j         rdS t          | dd          }|Kddlm} |                                }|                    |           |                    |j                   d| _         dS )a  
        Teardown and cancel all actor tasks for this DAG. After this
        function returns, the actors should be available to execute new tasks
        or compile a new DAG.

        Note: This method is automatically called when the CompiledDAG is destructed
        or the script exits. However, this should be explicitly called before compiling
        another graph on the same actors. Python may not garbage collect the
        CompiledDAG object immediately when you may expect.
        Nr  r   r]  r<   rI  T)r  r:  rY   r^  r   r@   r`   rK  )rv   r=   ra  r^  r  s        rE   r@   zCompiledDAG.teardown  s      	F$
D11******((**C555LL!5L666 !rG   c                 .    |                                   d S rO   )r@   r   s    rE   __del__zCompiledDAG.__del__  s    rG   NNFNNNrU  )r  r   rJ   N)rJ   NrH  rO   r  )r  r  FF)?r   r   r   r   rq   rf  rT  r
   rN  r   rI  r   r1   rQ   r   r   r  r  r   r  r  r  r   r  r  r  staticmethodr	   r  r  r'  r   r   r2  r   r  r  r  re  rh  r   rm  ru  rw  r}  r  r  r  rt  r  r7   r9  r   r  r6   r  r  r#   r  r  r  r@   r  r   rG   rE   rP  rP  $  s         SZ       $ +/+/$15.248CKeC eC eC $C=eC 	eC
 "*#eC 'smeC $,D>eC 'u\3->'?@eC eC eC eCN !T ! ! ! X!    . . . . .   a1 a1 a1 a1F>L >L >L >LH $	NJ NJ#NJ +,NJ 	NJ
 
NJ NJ NJ NJ`!.1 
   @   : 	.#: 	.tCy 	. 	. 	. \	.2I)J s    0r)	r) r) r) r)hN(	%tD1G,H'II	JN( N( N( N(`4L	%t,='>>	?4L 4L 4L 4Ll$    0I I IV  66 
6 6 6 6 II I I I I,)")3;C=)	c) ) ) )V0 0S 0 0 0 0&#    @ ?C0 0!0,4UO0	0 0 0 0f ?C? ?!?,4UO?	? ? ? ?** * *($ $ $ $ $. (,#'	E* E*E*  }E* %	E* E* E* E*N< 
~tN33	4	< < < <|%S/ 4S> d    &1 
 $'8"99	:	1 1 1 1fM## M# M# M# M#^
'>A	   D "y y 
y y y yv: : :(! !D ! ! ! !4    rG   rP  rU  dagr   rV  rW  rX  rY  rZ  r[  c           	         
 t          |||||||          

fd}|                                 }	|	                    |           
                                 
t          
                                <   
S )Nc                 2                         |            | S rO   )r  )r  rD   s    rE   _build_compiled_dagz<build_compiled_dag_from_ray_dag.<locals>._build_compiled_dag  s    t$$$rG   )rP  
_find_roottraverse_and_applyr'  r>   r  )r  rV  rW  rX  rY  rZ  r}   r[  r!  rootrD   s             @rE   build_compiled_dag_from_ray_dagr%    s     ! L     >>D/000  """,8N<&&(()rG   rO   rH  r  )jri  r   r@  r   r   rn  rA   r  r   
contextlibr   dataclassesr   r   typingr   r   r	   r
   r   r   r   rq   ray.exceptionsr  r   r   r   ray.dag.dag_node_operationr   r   r   r   r   r   r   r   ray.dag.dag_operation_futurer   r   r   r   r   r   r   ray.experimental.channelr    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r   r-   ,ray.experimental.channel.auto_transport_typer.   r/   'ray.experimental.channel.cached_channelr0   %ray.experimental.channel.communicatorr1   .ray.experimental.channel.shared_memory_channelr2   9ray.experimental.channel.torch_tensor_accelerator_channelr3   r4   *ray.experimental.channel.torch_tensor_typer5   !ray.experimental.compiled_dag_refr6   r7   r8   ray.util.annotationsr9   ray.util.scheduling_strategiesr:   	getLoggerr   r   rB   r>   rF   rQ   rk   ry   rI  r   r   r   r   r   r   r   r   r{   r   rP  rN  r   r%  r   rG   rE   <module>r7     sb                # # # # # # " " " " " " ) ) ) ) ) ) ) )                  


             
	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 W V V V V V V V V V                                       L K K K K K        B A A A A A > > > > > >             G F F F F F         
 . - - - - - I I I I I I		8	$	$ -,..3 3 3+
*+
>A#h+
X+
 +
 +
 +
\ 
 &*	 u%<c%ABC 
 c]	
    D 
 ',	J J !J $%J  $	J
 
J J J JZ 
 ',	2 2 !2 $%2  $	2
 
2 2 2 2j D1A,B t    
 
 
"3     $? ? ?6 8 8 8 8 8 8 8 8v       B t) t) t) t) t) t) t) t)n	         j& j& j& j& j& j& j& j&ZM  '+'+ -1*.04?G 	UO  } 	
 &c] #3-  (~ #5s):#;<      rG   