
    &`ir                        d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z!  ed          Z"e! G d de                      Z#dS )    N)chain)	AnyCallableDictListLiteralOptionalTupleTypeVarUnion)DAGNodeBase)build_compiled_dag_from_ray_dag)_PyObjScanner)ChannelOutputType)AutoTransportType)Communicator)TorchTensorType)Device)DeveloperAPITc                      e Zd ZdZdee         deeef         deeef         deeef         fdZde	d          fdZ
	 	 	 	 dDdeeeef                  ded         dedefdZedefd            Zej        deddfd            Zdee         fdZdeeef         fdZdeeef         fdZdeeef         fdZdefdZdeeef         fdZd Z	 	 	 	 	 	 	 dEdee         dee         d ed!ee         d"ee         d#ee         d$eeeef                  dd%fd&Zdd'd(edeej         d)f         fd*Z!de	d          fd+Z"de	d          fd,Z#	 	 	 	 dFd/Z$d-d.de%fd0Z&dGd1Z'd2 Z(dHd3Z)d4ed5e*d6e*fd7Z+deej         d)f         fd8Z,d9e	e         d:eeef         d;eeef         d<eeef         dd f
d=Z-d9e	e         d:eeef         d;eeef         d<eeef         dd f
d>Z.d? Z/d@eeef         fdAZ0dBefdCZ1dS )IDAGNodea5  Abstract class for a node in a Ray task graph.

    A node has a type (e.g., FunctionNode), data (e.g., function options and
    body), arguments (Python values, DAGNodes, and DAGNodes nested within Python
    argument values) and options (Ray API .options() used for function, class
    or class method)
    argskwargsoptionsother_args_to_resolvec                 &   |pg | _         |pi | _        |pi | _        |pi | _        g | _        t          j                    j        | _        d| _	        | 
                                | _        i | _        t                      | _        d| _        d| _        dS )a7  
        args:
            args (Tuple[Any]): Bound node arguments.
                ex: func_or_class.bind(1)
            kwargs (Dict[str, Any]): Bound node keyword arguments.
                ex: func_or_class.bind(a=1)
            options (Dict[str, Any]): Bound node options arguments.
                ex: func_or_class.options(num_cpus=2)
            other_args_to_resolve (Dict[str, Any]): Bound kwargs to resolve
                that's specific to subclass implementation without exposing
                as args in base class, example: ClassMethodNode
        FN)_bound_args_bound_kwargs_bound_options_bound_other_args_to_resolve_downstream_nodesuuiduuid4hex_stable_uuid_args_contain_nested_dag_node_collect_upstream_nodes_upstream_nodescache_from_last_executer   
_type_hint_original_type_hintis_cgraph_output_node)selfr   r   r   r   s        d/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/dag/dag_node.py__init__zDAGNode.__init__)   s    & (,zr-3\r.5m!'R 	)
 35 !JLL,
 .3* 150L0L0N0N (*$->-@-@
 AE  &+"""    returnc                 X   g }t          | j        d          sJ | j        D ]}t          |t                    r|                    |           -t                      }|                    |          }|                    |           |                                 t          |          dk    | _
        t                      }|                    | j        | j        g          }|                    |           |                                 |D ]}|j                            |            |S )a  
        Retrieve upstream nodes and update their downstream dependencies.

        Currently, the DAG assumes that all DAGNodes in `args`, `kwargs`, and
        `other_args_to_resolve` are upstream nodes. However, Ray Compiled Graphs
        builds the upstream/downstream relationship based only on args. Be cautious
        when persisting DAGNodes in `other_args_to_resolve` and kwargs in the future.

        TODO (kevin85421): Currently, the upstream nodes and downstream nodes have
        circular references. Therefore, it relies on the garbage collector to clean
        them up instead of reference counting. We should consider using weak references
        to avoid circular references.
        __iter__r   )hasattrr   
isinstancer   appendr   
find_nodesextendclearlenr'   r   r!   r"   )r.   upstream_nodesargscanner	dag_nodesother_upstream_nodesupstream_nodes          r/   r(   zDAGNode._collect_upstream_nodes^   s8    +- t'44444# 	H 	HC#w'' H%%c****'//#..s33	%%i00058^^a5G22//070B0B"11
 1
 	2333+ 	9 	9M+2248888r1   autodefaultF	transportdevice)rC   cpugpucuda_static_shape_direct_returnc                 ,   	 t          |          }nH# t          $ r; d                    d t           D                       }t          d| d| d          w xY w|dk    rt          |||          | _        n|dk    rt          d	|||
          | _        n}|d	k    rt          d	|||
          | _        n^|dk    rt          |||          | _        n@t          |t                    st          d| d          t          ||||
          | _        | S )a  
        Configure the torch tensor transport for this node.

        Args:
            transport: Specifies the tensor transport mechanism.
                - "accelerator": Tensors are communicated using accelerator-specific backends
                (e.g., NCCL, XLA, or vendor-provided transport). This is the recommended option
                for most use cases, as it supports extensibility and future hardware backends.
                - "nccl": Tensors are passed explicitly via NCCL. This option is kept for
                backwards compatibility and may be removed in the future. Use "accelerator"
                instead unless you have legacy requirements.
                - "shm": Tensors are passed via host shared memory and gRPC. Typically used
                when accelerator-based transport is unavailable or not suitable.
                - "auto" (default): The system automatically selects the appropriate transport
                mechanism based on the sender and receiver, usually preferring accelerator-based
                transport when available.
            device: The target device to use for the tensor transport.
                "default": The tensor will maintain its original device placement from the sender
                "cpu": The tensor will be explicitly moved to CPU device in the receiver
                "gpu" or "cuda": The tensor will be explicitly moved to GPU device in the receiver
            _static_shape: A hint indicating whether the shape(s) and dtype(s)
                of tensor(s) contained in this value always remain the same
                across different executions of the DAG. If this is True, the
                transport will be more efficient.
            _direct_return: Whether the tensor is sent directly or inside of
                other data. If a "nccl" transport is used, this allows the
                sender and receiver to eliminate performance overhead from
                an additional data transfer.
        z, c              3   ,   K   | ]}d |j          d V  dS )'N)value).0ds     r/   	<genexpr>z0DAGNode.with_tensor_transport.<locals>.<genexpr>   s,      %E%En!'nnn%E%E%E%E%E%Er1   zInvalid device 'z'. Valid options are: .rB   )rE   rI   rJ   ncclaccelerator)rD   rE   rI   rJ   shmzInvalid transport type: zd. Transport must be one of 'auto', 'nccl', 'shm', 'accelerator' or an instance of Communicator type.)r   
ValueErrorjoinr   r+   r   r6   r   )r.   rD   rE   rI   rJ   valid_devicess         r/   with_tensor_transportzDAGNode.with_tensor_transport   s   H	F^^FF 	 	 	 II%E%Ef%E%E%EEEMQ6QQQQQ  	
 /+-  DOO
 &  -'+-	  DOO -''-'+-	  DOO %-+-  DOO i66  8y 8 8 8  
 .#+-	  DO s
    AAc                     | j         S N)r+   r.   s    r/   	type_hintzDAGNode.type_hint   s
    r1   r]   Nc                 `    t          | j        t                    r| j        | _        || _        d S r[   )r6   r+   r   r,   )r.   r]   s     r/   r]   zDAGNode.type_hint   s,    do'899 	7'+D$#r1   c                     | j         S )z,Return the tuple of arguments for this node.)r   r\   s    r/   get_argszDAGNode.get_args   s     r1   c                 4    | j                                         S )z3Return the dict of keyword arguments for this node.)r   copyr\   s    r/   
get_kwargszDAGNode.get_kwargs   s     !&&(((r1   c                 4    | j                                         S )z3Return the dict of options arguments for this node.)r    rb   r\   s    r/   get_optionszDAGNode.get_options   s     "'')))r1   c                 4    | j                                         S )zAReturn the dict of other args to resolve arguments for this node.)r!   rb   r\   s    r/   get_other_args_to_resolvez!DAGNode.get_other_args_to_resolve   s    055777r1   c                     | j         S )zReturn stable uuid for this node.
        1) Generated only once at first instance creation
        2) Stable across pickling, replacement and JSON serialization.
        )r&   r\   s    r/   get_stable_uuidzDAGNode.get_stable_uuid   s    
   r1   c                    K   i }| j                                         D ]0\  }}t          |t          j                  r| d{V ||<   +|||<   1|S )zGets cached object refs from the last call to execute().

        After this DAG is executed through execute(), retrieves a map between node
        UUID to a reference to the return value of the default executor on that node.
        N)r*   itemsr6   asyncioTask)r.   cache	node_uuidrN   s       r/   !get_object_refs_from_last_executez)DAGNode.get_object_refs_from_last_execute  so        $ < B B D D 	) 	)Iu%.. )).;;;;;;i  #(i  r1   c                     i | _         d S r[   )r*   r\   s    r/   clear_cachezDAGNode.clear_cache  s    ')$$$r1   create_submit_timeout_buffer_size_bytesenable_asyncio_max_inflight_executions_max_buffered_results_overlap_gpu_communication_default_communicatorzray.dag.CompiledDAGc           
          ddl m} |                                }	||	j        }| j        rt          d          d| _        t          | |||||||          S )a  Compile an accelerated execution path for this DAG.

        Args:
            _submit_timeout: The maximum time in seconds to wait for execute() calls.
                None means using default timeout, 0 means immediate timeout
                (immediate success or timeout without blocking), -1 means
                infinite timeout (block indefinitely).
            _buffer_size_bytes: The initial buffer size in bytes for messages
                that can be passed between tasks in the DAG. The buffers will
                be automatically resized if larger messages are written to the
                channel.
            enable_asyncio: Whether to enable asyncio for this DAG.
            _max_inflight_executions: The maximum number of in-flight executions that
                can be submitted via `execute` or `execute_async` before consuming
                the output using `ray.get()`. If the caller submits more executions,
                `RayCgraphCapacityExceeded` is raised.
            _max_buffered_results: The maximum number of results that can be
                buffered at the driver. If more than this number of results
                are buffered, `RayCgraphCapacityExceeded` is raised. Note that
                when result corresponding to an execution is retrieved
                (by calling `ray.get()` on a `CompiledDAGRef` or
                `CompiledDAGRef` or await on a `CompiledDAGFuture`), results
                corresponding to earlier executions that have not been retrieved
                yet are buffered.
            _overlap_gpu_communication: (experimental) Whether to overlap GPU
                communication with computation during DAG execution. If True, the
                communication and computation can be overlapped, which can improve
                the performance of the DAG execution. If None, the default value
                will be used.
            _default_communicator: The default communicator to use to transfer
                tensors. Three types of values are valid. (1) Communicator:
                For p2p operations, this is the default communicator
                to use for nodes annotated with `with_tensor_transport()` and when
                shared memory is not the desired option (e.g., when transport="nccl",
                or when transport="auto" for communication between two different GPUs).
                For collective operations, this is the default communicator to use
                when a custom communicator is not specified.
                (2) "create": for each collective operation without a custom communicator
                specified, a communicator is created and initialized on its involved actors,
                or an already created communicator is reused if the set of actors is the same.
                For all p2p operations without a custom communicator specified, it reuses
                an already created collective communicator if the p2p actors are a subset.
                Otherwise, a new communicator is created.
                (3) None: a ValueError will be thrown if a custom communicator is not specified.

        Returns:
            A compiled DAG.
        r   )
DAGContextNzIt is not allowed to call `experimental_compile` on the same DAG object multiple times no matter whether `teardown` is called or not. Please reuse the existing compiled DAG or create a new one.T)ray.dagr|   get_currentbuffer_size_bytesr-   rV   r   )
r.   rt   ru   rv   rw   rx   ry   rz   r|   ctxs
             r/   experimental_compilezDAGNode.experimental_compile  s    t 	'&&&&&$$&&%!$!6 % 	N   &*".$!&!	
 	
 		
r1   )_ray_cache_refsr   zray.actor.ActorHandlec                \    fd}|                      |          }|r|j        | _        |S )a  Execute this DAG using the Ray default executor _execute_impl().

        Args:
            _ray_cache_refs: If true, stores the default executor's return values
                on each node in this DAG in a cache. These should be a mix of:
                - ray.ObjectRefs pointing to the outputs of method and function nodes
                - Serve handles for class nodes
                - resolved values representing user input at runtime
        c                      | j         i S r[   )_execute_impl)noder   r   s    r/   executorz!DAGNode.execute.<locals>.executorz  s    %4%t6v666r1   )apply_recursivern   r*   )r.   r   r   r   r   results     ``  r/   executezDAGNode.executem  sN    	7 	7 	7 	7 	7 	7 %%h// 	:+3>D(r1   c                    g }|                                  D ]0}t          |t                    r||vr|                    |           1|                                                                 D ]0}t          |t                    r||vr|                    |           1|                                                                 D ]0}t          |t                    r||vr|                    |           1|S )aT  Return the list of nodes specified as top-level args.

        For example, in `f.remote(a, [b])`, only `a` is a top-level arg.

        This list of nodes are those that are typically resolved prior to
        task execution in Ray. This does not include nodes nested within args.
        For that, use ``_get_all_child_nodes()``.
        )r`   r6   r   r7   rc   valuesrg   )r.   childrenas      r/   _get_toplevel_child_nodesz!DAGNode._get_toplevel_child_nodes  s     	' 	'A!W%% 'H$$OOA&&&""))++ 	' 	'A!W%% 'H$$OOA&&&//1188:: 	' 	'A!W%% 'H$$OOA&&&r1   c                     t                      }g }|                    | j        | j        | j        g          D ]}||vr|                    |           |                                 |S )a  Return the list of nodes referenced by the args, kwargs, and
        args_to_resolve in current node, even they're deeply nested.

        Examples:
            f.remote(a, [b]) -> [a, b]
            f.remote(a, [b], key={"nested": [c]}) -> [a, b, c]
        )r   r8   r   r   r!   r7   r:   )r.   r>   r   ns       r/   _get_all_child_nodeszDAGNode._get_all_child_nodes  s|      // ## "1
 
 	# 	#A   """r1   fnCallable[[DAGNode], T]c                 H   i }t                      }|                    | j        | j        | j        g          D ]}||vr ||          ||<   |                    |          \  }}}|                                 |                     |||                                 |          S )ap  Apply and replace all immediate child nodes using a given function.

        This is a shallow replacement only. To recursively transform nodes in
        the DAG, use ``apply_recursive()``.

        Args:
            fn: Callable that will be applied once to each child of this node.

        Returns:
            New DAGNode after replacing all child nodes.
        )	r   r8   r   r   r!   replace_nodesr:   _copyre   )r.   r   replace_tabler>   r   new_args
new_kwargsnew_other_args_to_resolves           r/   "_apply_and_replace_all_child_nodesz*DAGNode._apply_and_replace_all_child_nodes  s       // && "1
 
 	/ 	/D =((&(bhhd#:A:O:O;
 ;
7*7 	 zzj$"2"2"4"46O
 
 	
r1   c                     t                    j        dk    s G d d          } |          n | j        j        v rj        | j                 S  |                     fd                    S )a  Apply callable on each node in this DAG in a bottom-up tree walk.

        Args:
            fn: Callable that will be applied once to each node in the
                DAG. It will be applied recursively bottom-up, so nodes can
                assume the fn has been applied to their args already.

        Returns:
            Return type of the fn after application to the tree.
        
_CachingFnc                       e Zd Zd ZddZdS )+DAGNode.apply_recursive.<locals>._CachingFnc                 R    i | _         || _        | j         | j        _         d | _        d S r[   )rn   r   input_node_uuid)r.   r   s     r/   r0   z4DAGNode.apply_recursive.<locals>._CachingFn.__init__  s)    !#DJ DG$(JDGM+/D(((r1   r   r   c                    ddl m} |j        | j        vr"|                     |          | j        |j        <   t          ||          r3| j        s|j        | _        n| j        |j        k    rt          d          | j        |j                 S )Nr   	InputNodez/Each DAG should only have one unique InputNode.)ray.dag.input_noder   r&   rn   r   r6   r   AssertionError)r.   r   r   s      r/   __call__z4DAGNode.apply_recursive.<locals>._CachingFn.__call__  s    <<<<<<(
::8<
4#45!$	22 #3 373DD00!1T5FFF"0 Q# #   :d&788r1   N)r   r   )__name__
__module____qualname__r0   r    r1   r/   r   r     s7        0 0 09 9 9 9 9 9r1   c                 .    |                                S r[   )r   )r   r   s    r/   <lambda>z)DAGNode.apply_recursive.<locals>.<lambda>
  s    T11"55 r1   )typer   r&   rn   r   )r.   r   r   s    ` r/   r   zDAGNode.apply_recursive  s     Bxx L009 9 9 9 9 9 9 9* BBB BH,,x 122r335555 
 
 	
r1   c                    t                      }| g}d}|r|                    d          }|j        r|                     |j                   ||vr}|j        r|t          d| d|           |} ||           |                    |           	 t          j	        |j
        |j        g          D ]}||vr|                    |           |dS dS )z
        Traverse all nodes in the connected component of the DAG that contains
        the `self` node, and apply the given function to each node.
        Nr   z^The DAG was compiled more than once. The following two nodes call `experimental_compile`: (1) z, (2) )setpopr'   _raise_nested_dag_node_errorr   r-   rV   addr   from_iterabler"   r)   r7   )r.   r   visitedqueuecgraph_output_noder   neighbors          r/   traverse_and_applyzDAGNode.traverse_and_apply  s?   
 %%04 '	/99Q<<D1 D11$2BCCC7""- 	. *5(D#5D D=AD D  
 *.&4D!!!" !& 3+T-AB! ! / /H  w..X...O  '	/ '	/ '	/ '	/ '	/r1   c           
      8   |D ]}t          |t                    rt                      }|                    |g          }|                                 t          |          dk    r&t          dt          |           d| d|  d          t          d          )z
        Raise an error for nested DAGNodes in Ray Compiled Graphs.

        Args:
            args: The arguments of the DAGNode.
        r   zFound z DAGNodes from the arg z in z. Please ensure that the argument is a single DAGNode and that a DAGNode is not allowed to be placed inside any type of container.zA DAGNode's args should contain nested DAGNodes as args, but none were found during the compilation process. This is a Ray internal error. Please report this issue to the Ray team.)r6   r   r   r8   r:   r;   rV   r   )r.   r   r=   r>   r?   s        r/   r   z$DAGNode._raise_nested_dag_node_error@  s      	 	C#w'' '//#..u55	y>>A%%$BY B B B B"B B B   & L
 
 	
r1   c                     ddl m} | }t          ||          sHt          |j                  dk    rt          d| d          |j        d         }t          ||          H|S )zV
        Return the root node of the DAG. The root node must be an InputNode.
        r   r   zWNo InputNode found in the DAG: when traversing upwards, no upstream node was found for rR   )r   r   r6   r;   r)   rV   )r.   r   r   s      r/   
_find_rootzDAGNode._find_root[  s     	100000T9-- 	+4'((A-- >6:> > >   '*D T9-- 	+ r1   source_input_listpredicate_fnapply_fnc                     i }t                      }|                    |          D ]} ||          r||vr ||          ||<    |                    |          }|                                 |S )aa  
        Apply a given function to DAGNodes in source_input_list, and return
        the replaced inputs without mutating or coping any DAGNode.

        Args:
            source_input_list: Source inputs to extract and apply function on
                all children DAGNode instances.
            predicate_fn: Applied on each DAGNode instance found and determine
                if we should apply function to it. Can be used to filter node
                types.
            apply_fn: Function to apply on the node on bound attributes. Example::

                apply_fn = lambda node: node._get_serve_deployment_handle(
                    node._deployment, node._bound_other_args_to_resolve
                )

        Returns:
            replaced_inputs: Outputs of apply_fn on DAGNodes in
                source_input_list that passes predicate_fn.
        )r   r8   r   r:   )r.   r   r   r   r   r>   r   replaced_inputss           r/   apply_functionalzDAGNode.apply_functionalk  s    4 //&&'899 	5 	5D|D!! 5d-&?&?&.htnnd#!//>>r1   c                     t           )z?Execute this node, assuming args have been transformed already.NotImplementedError)r.   r   r   s      r/   r   zDAGNode._execute_impl  s
     "!r1   r   r   new_optionsr   c                     t           z3Return a copy of this node with the given new args.r   )r.   r   r   r   r   s        r/   
_copy_implzDAGNode._copy_impl  s
     "!r1   c                     |                      ||||          }| j        |_        t          j        | j                  |_        t          j        | j                  |_        |S r   )r   r&   rb   deepcopyr+   r,   )r.   r   r   r   r   instances         r/   r   zDAGNode._copy  s\     ??j+/H
 
 !% 1"mDO<<'+}T5M'N'N$r1   c                     | j         S )=Required due to overriding `__getattr__` else pickling fails.)__dict__r\   s    r/   __getstate__zDAGNode.__getstate__  s
    }r1   rP   c                 :    | j                             |           dS )r   N)r   update)r.   rP   s     r/   __setstate__zDAGNode.__setstate__  s    Qr1   attrc                     |dk    r t          dt          |            d          |dk    r t          dt          |            d          |                     |          S )Nbindz .bind() cannot be used again on  remotez.remote() cannot be used on z:. To execute the task graph for this node, use .execute().)AttributeErrorr   __getattribute__)r.   r   s     r/   __getattr__zDAGNode.__getattr__  sz    6>> !QDJJ!Q!Q!QRRRX 7tDzz 7 7 7  
 ((...r1   )rB   rC   FF)NNFNNNrs   )r   r   r2   r   )r   r   )r2   r   )2r   r   r   __doc__r
   r   r   strr0   r   r(   r	   r   r   r   boolrY   propertyr   r]   setterr`   rc   re   rg   ri   rp   rr   floatintr   ray	ObjectRefr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r1   r/   r   r      sF        3+Cj3+ S#X3+ c3h	3+
  $CH~3+ 3+ 3+ 3+j-i - - - -b 9?;D#$R RE#|"345R 78R 	R
 R R R Rh ,    X $#4 $ $ $ $ $
 %*        
)DcN ) ) ) )
*T#s(^ * * * *
84S> 8 8 8 8! ! ! ! !c3h    * * *
 ,0,0$26/359DLT
 T
!%T
 %SMT
 	T

 #+3-T
  (}T
 %-TNT
  (lC.?(@AT
 
T
 T
 T
 T
n .3  &*	s}55	6   *4	?    :d9o    2&
*&
	&
 &
 &
 &
P,
": ,
q ,
 ,
 ,
 ,
\0/ 0/ 0/ 0/d
 
 
6    ## # 	# # # #J"	s}55	6" " " ""s)" cN" #s(^	"
 $(S>" 
" " " "s) cN #s(^	
 $(S> 
       d38n        	/ 	/ 	/ 	/ 	/ 	/ 	/r1   r   )$rl   rb   r#   	itertoolsr   typingr   r   r   r   r   r	   r
   r   r   r   ray.dag.baser   ray.dag.compiled_dag_noder   ray.dag.py_obj_scannerr   ray.experimental.channelr   ,ray.experimental.channel.auto_transport_typer   %ray.experimental.channel.communicatorr   *ray.experimental.channel.torch_tensor_typer   ray.experimental.util.typesr   ray.util.annotationsr   r   r   r   r1   r/   <module>r      s           
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


 $ $ $ $ $ $ E E E E E E 0 0 0 0 0 0 6 6 6 6 6 6 J J J J J J > > > > > > F F F F F F . . . . . . - - - - - -GCLL a
/ a
/ a
/ a
/ a
/k a
/ a
/ a
/ a
/ a
/r1   