
    &`i|              
          d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZ ej        e          Z G d de          Z G d d	          Ze G d
 d                      Z	 	 d+dedededefdZde
ee
eef         f         deddfdZde
dee         f         de
ee
eef         f         deddfdZde
dee         f         de
ee
eef         f         deee                  fdZde
edf         de
deee                  f         de
ee
eef         f         fdZd,d!Zded"ed#efd$Zd%e
dee         f         d&ee
dee         f                  de
ee
eef         f         fd'Z de
ee
eef         f         de
dee         f         fd(Z!d%e
dee         f         de
dee         f         fd)Z"d%e
dee         f         de
dee         f         fd*Z#dS )-    N)defaultdict)Enum)total_ordering)DictListOptionalSetTuplec                   $    e Zd ZdZdZdZdZd ZdS )_DAGNodeOperationTypez
    There are three types of operations that a DAG node can perform:
    1. READ: Read from an input channel.
    2. COMPUTE: Execute the method corresponding to the node.
    3. WRITE: Write to an output channel.
    READCOMPUTEWRITEc                     | t           j        k    rdS | t           j        k    rdS | t           j        k    rdS J d|              )z
        A string representation of the operation type to be used in visualization.

        The result string is a single character because conciseness is preferred.
        RCWFzUnknown operation type: )r   r   r   r   selfs    n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/dag/dag_node_operation.pyviz_strz_DAGNodeOperationType.viz_str   sV     (---3*2223*0003777777u    N)__name__
__module____qualname____doc__r   r   r   r    r   r   r   r      s>          DGE8 8 8 8 8r   r   c                   J    e Zd Z	 d
dededee         fdZd Zd Z	d Z
d	 ZdS )_DAGNodeOperationNexec_task_idxoperation_typemethod_namec                 0    || _         || _        || _        dS )a#  
        Args:
            exec_task_idx: The index of the task that this operation belongs to
                in the actor's ExecutableTask list. The index is not the same
                as bind_index because there may be more tasks bound to an actor
                than tasks that appear in the current compiled DAG.
            operation_type: The type of operation to perform.
            method_name: The name of the method that this operation originates
                from. This is only for visualization and debugging purposes.
        Nr    typer"   )r   r    r!   r"   s       r   __init__z_DAGNodeOperation.__init__*   s!      +"	&r   c                 8    d| j          d| j         d| j         dS )Nz!_DAGNodeOperation(exec_task_idx: z, type: z, method_name: )r$   r   s    r   __repr__z_DAGNodeOperation.__repr__>   sB    0"00 0Y0 0 !,0 0 0	
r   c                 Z    d| j          d| j         d| j                                         S )R
        A string representation of the node to be used in visualization.
        [z]  )r    r"   r%   r   r   s    r   r   z_DAGNodeOperation.viz_strF   s7     R4%QQ)9QQDI<M<M<O<OQQQr   c                 8    t          | j        | j        f          S N)hashr    r%   r   s    r   __hash__z_DAGNodeOperation.__hash__L   s    T'3444r   c                 B    | j         |j         k    o| j        |j        k    S r/   )r    r%   r   others     r   __eq__z_DAGNodeOperation.__eq__O   s#     !U%88TTY%*=TTr   r/   )r   r   r   intr   r   strr&   r)   r   r1   r5   r   r   r   r   r   )   s        
 &*	' '' .' c]	' ' ' '(
 
 
R R R5 5 5U U U U Ur   r   c                   "   e Zd ZdededddefdZd Zdd	Zdd
Z	d Z
edefd            Zedefd            Zedefd            Zedefd            Zedefd            Zedefd            Zedefd            Zd Zed             ZdS )_DAGOperationGraphNode	operationtask_idxactor_handleray.actor.ActorHandlerequires_acceleratorc                     || _         || _        || _        || _        i | _        i | _        t                      | _        t                      | _        dS )a  
        _DAGOperationGraphNode represents a node in the DAG operation graph.
        It contains information about the node's in-degree, out-degree, edges,
        and the operation it performs.

        Args:
            operation: The operation that this node performs. The operation
                can be a READ, COMPUTE, or WRITE operation.
            task_idx: A unique index which can be used to index into
                `CompiledDAG.idx_to_task` to get the corresponding task.
            actor_handle: The actor handle to which this operation belongs.
            requires_accelerator: Whether this operation requires accelerator.
        N)	r:   r;   r<   r>   in_edges	out_edgesset	sync_idxspending_sync_idxs)r   r:   r;   r<   r>   s        r   r&   z_DAGOperationGraphNode.__init__W   sY    ( # ($8! TVTV BE
 JMr   c           	      R    d| j          d| j         d| j        j         d| j         d	S )Nz"_DAGOperationGraphNode(operation: z, task_idx: z, actor_id: z, requires_accelerator: r(   )r:   r;   r<   _ray_actor_idr>   r   s    r   r)   z_DAGOperationGraphNode.__repr__   s`    B.B BB B *8B B &*%>	B B B	
r   r4   c                     | j         |j         k    r| j         S | j        j        | j        f|j        j        |j        fk     S )z
        This function defines the order of the nodes in the priority queue used in
        `_select_next_nodes`. The priority queue is a min-heap, so the node with
        higher priority is considered "less than" the other node.
        )is_accelerator_opr:   r    r;   r3   s     r   __lt__z_DAGOperationGraphNode.__lt__   sN     !U%<<< )) N0$-@-D  r   c                     | j         |j         k    o3| j        j        |j        j        k    o| j        j        |j        j        k    S )z
        Two operations are equal only when they have the same `exec_task_idx` and `type`
        and belong to the same actor.
        )r<   r:   r    r%   r3   s     r   r5   z_DAGOperationGraphNode.__eq__   sE     !33 <,0MM<#u';;	
r   c                 8    t          | j        | j        f          S )zQ
        An operation is uniquely identified by its `task_idx` and type.
        )r0   r:   r;   r   s    r   r1   z_DAGOperationGraphNode.__hash__   s     T^T]3444r   returnc                 *    t          | j                  S r/   )lenr@   r   s    r   	in_degreez _DAGOperationGraphNode.in_degree   s    4=!!!r   c                 l    | j         dk    o)t          | j                  t          | j                  k    S )z
        If a node is not an accelerator operation, it is ready when it has a zero
        in-degree.
        If it is an accelerator operation, it is ready when all the nodes in the
        operation have zero in-degrees.
        r   )rO   rN   rD   rC   r   s    r   is_readyz_DAGOperationGraphNode.is_ready   s5     ~" 
&''3t~+>+>>	
r   c                 6    | j         j        t          j        k    S r/   )r:   r%   r   r   r   s    r   is_readz_DAGOperationGraphNode.is_read   s    ~"&;&@@@r   c                 D    | j         j        t          j        k    o| j        S )z^
        A node is an accelerator read if it is a read node and requires accelerator.
        )r:   r%   r   r   r>   r   s    r   is_accelerator_readz*_DAGOperationGraphNode.is_accelerator_read   s%     N#8#== *)	
r   c                 D    | j         j        t          j        k    o| j        S )zd
        A node is an accelerator compute if it is a compute node and requires accelerator.
        )r:   r%   r   r   r>   r   s    r   is_accelerator_computez-_DAGOperationGraphNode.is_accelerator_compute   s%     N#8#@@ *)	
r   c                 D    | j         j        t          j        k    o| j        S )z`
        A node is an accelerator write if it is a write node and requires accelerator.
        )r:   r%   r   r   r>   r   s    r   is_accelerator_writez+_DAGOperationGraphNode.is_accelerator_write   s%     N#8#>> *)	
r   c                 ,    | j         p| j        p| j        S r/   )rU   rW   rY   r   s    r   rH   z(_DAGOperationGraphNode.is_accelerator_op   s%     $ )*)(	
r   c                 4    | j                                         S )r+   )r:   r   r   s    r   r   z_DAGOperationGraphNode.viz_str   s     ~%%'''r   c                 >    | j         j                                        S r/   )r<   rF   hexr   s    r   	_actor_idz _DAGOperationGraphNode._actor_id   s     .22444r   N)r4   r9   )r   r   r   r   r6   boolr&   r)   rI   r5   r1   propertyrO   rQ   rS   rU   rW   rY   rH   r   r^   r   r   r   r9   r9   U   s       (O$(O (O .	(O
 #(O (O (O (OT
 
 
   (	
 	
 	
 	
5 5 5 "3 " " " X" 	
$ 	
 	
 	
 X	
 A A A A XA 
T 
 
 
 X
 
 
 
 
 X
 
d 
 
 
 X
 
4 
 
 
 X
( ( ( 5 5 X5 5 5r   r9    F	from_nodeto_nodelabelcontrol_dependencyc                 z    ||f| j         |j        |j        j        f<   ||f|j        | j        | j        j        f<   dS )a8  
    Add an edge from `from_node` to `to_node`.

    Args:
        from_node: The node from which the edge originates.
        to_node: The node to which the edge points.
        label: The label of the edge. This will be used to annotate the edge
            in the visualization of the execution schedule.
    N)rA   r;   r:   r%   r@   )rb   rc   rd   re   s       r   	_add_edgerg      sT      	GI)7+<+ABC
 	HGi()*=*BCDDDr   graphnoderL   c                     |j         |j        j        f}|j        D ]-\  }}| |         |         }|j                            |           .dS )z?
    Update the node as pending for its synchronous nodes.
    N)r;   r:   r%   rC   rD   add)rh   ri   idxr;   op_type	sync_nodes         r   _update_pending_sync_idxsro     s]     =$.-
.C!^ - -'(OG,	#'',,,,- -r   actor_to_candidateszray._raylet.ActorIDc                    |j         dk    s
J d            |j        rw|j        D ]o\  }}||         |         }|j                            |j        |j        j        f           |j        rt          |j                  dk    sJ t          ||           pt          |j                  dk    rt          ||           |j        rt          |j                  dk    r't          j        | |j        j                 |           dS |j        D ]:\  }}||         |         }t          j        | |j        j                 |           9dS dS )z
    Push the node with a zero in-degree to the candidates if its operation is ready.
    If it has synchronous nodes, its accelerator operation is not ready until all
    the nodes are pending, then all the nodes will be pushed to the candidates.
    r   z!Expected to have a zero in-degreeN)rO   rY   rA   r@   popr;   r:   r%   rU   rN   ro   rC   rQ   heapqheappushr<   r^   )rp   rh   ri   r;   rm   	read_nodern   s          r   _push_candidate_node_if_readyrv     s    >Q C   8!% 	8 	8Hgh0I""DM4>3F#GHHH0QS9K5L5LPQ5Q5Q5QQ%eY7777
4>a!%... } t~!##N#D$5$?@    
 &*^  !'!(OG4	'	(>(HI     r   c                 >   d}|                                  D ],}t          |          dk    r||d         |k     r|d         }-|dS |g}t          |j                  dk    r6|j        D ].\  }}||         |         }||k    r|                    |           /|D ]=}| |j        j                 }|                    |           t          j        |           >|D ]A}| |j        j                 }||v r)|                    |           t          j        |           B|S )a  
    This function selects the next nodes for the topological sort to generate
    execution schedule. If there are multiple candidate _DAGOperationGraphNodes,
    select the node with the top priority. The priority is defined in
    `_DAGOperationGraphNode.__lt__`.

    For the implementation details, we maintain a priority queue for each actor,
    where the head of the priority queue is the node with the smallest `exec_task_idx`.
    When a node has a zero in-degree, it is added to the corresponding actor's
    priority queue. For a node other than an accelerator collective node, it is ready to be
    executed if it has a zero in-degree. For an accelerator collective node, it is ready to
    be executed when all the nodes in its collective operation have zero in-degrees.

    If a node is an accelerator collective node, it updates the `ready_collective_nodes` of
    all the nodes in its collective operation. Unless all the nodes in its collective
    group have zero in-degrees, this node is removed from the candidate list.
    Eventually, exactly one accelerator collective node from its collective operation is
    selected from the candidate list.

    If the selected node is an accelerator write node, select all the downstream accelerator
    read nodes. If the selected node is an accelerator collective node, select all the accelerator
    compute nodes in its collective operation.

    Args:
        actor_to_candidates: A dictionary mapping an actor id to a list of
            candidate nodes. The list is maintained as a priority queue, so
            the head of the queue, i.e., `candidates[0]`, is the node with
            the smallest `bind_index`.
        graph: A dictionary mapping the index of a task to a dictionary of its
            _DAGOperationGraphNodes for different operations.

    Returns:
        A list of _DAGOperationGraphNodes to be placed into the corresponding
        execution schedules.
    Nr   )	valuesrN   rC   appendr<   r^   removers   heapify)rp   rh   top_priority_node
candidates
next_nodesr;   rm   ri   s           r   _select_next_nodesr   E  sk   N )0022 . .
z??a$
18I(I(I *1 t#$J &''1,,!2!< 	( 	(Hg?7+D(((!!$'''  " "():)DE
$j!!!!  & &():)DE
:d###M*%%%r   idx_to_taskz&ray.dag.compiled_dag_node.CompiledTaskactor_to_operation_nodesr=   c                 v   | sJ i }|                                 D ]\  }}d}|D ]}|d         j        }|d         |d         |d         }
}	}t          ||	           t          |	|
           |t          ||	dd           |	}||vsJ t          j        |t          j        |	t          j        |
i||<   ddlm}m	}m
} ddlm} |                                  D ]\  }}t          |j        |          st          |j        |          s1t          |j        |          r|j        j        rS|j        D ]S}| |         j        }t          ||          r!||         t          j                 }
t          ||          r|j        r| |         j        }|D ]|}||v rv||         t          j                 }t          |
||
j        rd	nd
           |
j        r=|t          j        f|t          j        fh}|
|fD ]}|j                            |           }||         t          j                 }t          |
||
j        rd	nd
           |
j        r=|t          j        f|t          j        fh}|
|fD ]}|j                            |           Ut)          t*                    }|                                  D ]W\  }}t          |j        |          r=|j        j        s1||j        j                                     |t          j        f           X|                                D ]}|D ]\  }}|||         |         _        |S )ai  
    Generate a DAG node operation graph by adding edges based on the
    following rules:

    #1  Add edges from READ to COMPUTE, and from COMPUTE to WRITE, which
        belong to the same task.
    #2  Add an edge from COMPUTE with bind_index i to COMPUTE with bind_index
        i+1 if they belong to the same actor.
    #3  Add an edge from WRITE of the writer task to READ of the reader task.

    This is the step one of building an execution schedule for each actor.

    Args:
        idx_to_task: A dictionary that maps the `task_idx` to the `CompiledTask`.
            `CompiledTask` contains information about a DAGNode and its downstream
            nodes.

        actor_to_operation_nodes: A dictionary that maps an actor handle to
            a list of lists of _DAGOperationGraphNode. For the same actor, the
            index of the outer list corresponds to the index of the ExecutableTask
            in the list of `executable_tasks` in `actor_to_executable_tasks`. In
            the inner list, the order of operations is READ, COMPUTE, and WRITE.

    Returns:
        A graph where each node is a _DAGOperationGraphNode. The key is `task_idx`,
        the index to retrieve its task from `idx_to_task`, and the value is a
        dictionary that maps the _DAGNodeOperationType (READ, COMPUTE, or WRITE)
        to the corresponding _DAGOperationGraphNode
    Nr         ra   T)ClassMethodNodeCollectiveOutputNodeMultiOutputNode)_CollectiveOperationacceleratorshm)itemsr;   rg   r   r   r   r   ray.dagr   r   r   ray.dag.collective_noder   
isinstancedag_nodeis_class_method_outputdownstream_task_idxsr>   rC   updater   rB   collective_oprk   rx   )r   r   rh   _operation_nodes_listprev_compute_nodeoperation_nodesr;   ru   compute_node
write_noder   r   r   r   taskdownstream_task_idxdownstream_dag_nodeconsumer_idxsconsumer_idxidxsri   collective_op_to_idxsrm   s                           r   _build_dag_node_operation_graphr     sB   F ;LNE#;#A#A#C#C   3 	 	O&q)2H""" &0|I i...lJ/// !,+\2tDDD ,5((((%*I%-|%+ZE(OO#	0 ONNNNNNNNN<<<<<< &++-- 60 60$t}o66	$-)=>>	 t}o66	4	 #'#< &	0 &	0"-.A"B"K-?? x)>)DEJ.@@'> !,,? @ U$1 < <L#u,,$),$78M8R$S	!&%-7-LWMMRW  
 &: <!)+@+F G!-/D/I J$D *4Y(? < < $ 5 5d ; ; ; ;123H3MNI!+!@Ke  
 . 04:;(*?*DE (3 0 0DN))$////M&	0V 	C  &++--  $t}&:;;	M8	 "$-"=>BB089   &,,.. 6 6!% 	6 	6Hg15E(OG$..	6 Lr   actorc                 \    | j         j        }| j                                        }d| d| S )z
    Returns the label of an actor in the visualization of the execution schedule.

    Args:
        actor: The actor to be represented.
    zActor class name: z
Actor ID: )'_ray_actor_creation_function_descriptor
class_namerF   r]   )r   r   actor_ids      r   _actor_viz_labelr     s9     >IJ"&&((HB
BBBBBr   rl   optimized_indexc                 \    |                                  d| d| z   }| j         d| }||fS )a7  
    Returns the visualization id and label of a node. The visualization id is unique
    across all nodes.

    Args:
        node: The node to be represented.
        idx: The index of the node in the execution schedule.
        optimized_index: The index of the node in the optimized execution schedule.
    r-   ,r   )r   r^   )ri   rl   r   node_viz_labelnode_viz_ids        r   _node_viz_id_and_labelr   +  sG     \\^^&A#&A&A&A&AAN^66n66K&&r   actor_to_execution_scheduleactor_to_overlapped_schedulec           
      d   	 ddl }n# t          $ r t          d          w xY w|                    d          }i }|| }|                                 D ]\  }}||         }d t	          |          D             }	|j                                        }
|                    d|
           5 }|                    |
t          |          	           t	          |          D ]U\  }}|	
                    |          }t          |||          \  }}||k    rd
nd}|                    |||           |||<   V	 ddd           n# 1 swxY w Y   |                                 D ]\  }}t	          |          D ]v\  }}||         }|j                                        D ]O\  }}|\  }}|\  }}||         |         }||         }|dk    rdnd}|rdnd}|                    |||||           Pw|                    d          5 }|                    dddd           |                    dd           d}|                    d|d           |                    ddd           ddd           n# 1 swxY w Y   t                              d            |                    d!d"d#$           dS )%a   
    Visualize the execution schedule for each actor.

    The visualization will be saved as a PNG file named `compiled_graph_schedule.png`.
    Details of the visualization: # noqa

        Node description format:
            [<task_index>] <method_name> <operation> <orig_index>, <overlap_index>

        Node description fields:
            operation: is R(READ), C(COMPUTE), or W(WRITE)
            orig_index: the index in the original execution schedule
            overlap_index: the index in the overlap-communication optimized execution schedule
            If this is different from orig_index, the node is highlighted in red color

        Node grouping:
            The nodes belonging to the same actor are grouped in the same rectangle
            The actor class name and the actor id are shown in the rectangle

        Edges:
            black color (without label): data dependency
            black color (annotated with "shm"): shared memory channel
            blue color (annotated with "accelerator): accelerator channel
            dashed edge: control dependency between compute operations

    Args:
        actor_to_execution_schedule: A dictionary that maps an actor handle to
            the execution schedule which is a list of operation nodes.
        actor_to_overlapped_schedule: A dictionary that maps an actor handle to the
            optimized execution schedule which is a list of operation nodes.
        graph: A graph where each node is a _DAGOperationGraphNode. The key is
            `task_idx`, the index to retrieve its task from `idx_to_task`, and
            the value is a dictionary that maps the _DAGNodeOperationType (READ,
            COMPUTE, or WRITE) to the corresponding _DAGOperationGraphNode. It is
            generated by `_build_dag_node_operation_graph`.
    r   NzrPlease install graphviz to visualize the execution schedule. You can install it by running `pip install graphviz`.DAG)commentc                     i | ]\  }}||	S r   r   ).0iri   s      r   
<dictcomp>z1_visualize_execution_schedule.<locals>.<dictcomp>{  s+     #
 #
 #
4D!#
 #
 #
r   cluster_)name)rankrd   redblack)colorr   bluedashedsolid)rd   r   stylecluster_legendLegendt20	lightgrey)rd   labellocfontsizebgcolorexample_nodez[0] bwd C 10,10
a  <<TABLE BORDER="0" CELLBORDER="0" CELLSPACING="0"><TR><TD ALIGN="LEFT"><B>Node description format:</B></TD></TR><TR><TD ALIGN="LEFT">[&lt;task_index&gt;] &lt;method_name&gt; &lt;operation&gt; &lt;orig_index&gt;, &lt;overlap_index&gt;</TD></TR><TR><TD></TD></TR><TR><TD ALIGN="LEFT"><B>Node description fields:</B></TD></TR><TR><TD ALIGN="LEFT">operation: is R(READ), C(COMPUTE), or W(WRITE)</TD></TR><TR><TD ALIGN="LEFT">orig_index: the index in the original execution schedule</TD></TR><TR><TD ALIGN="LEFT">overlap_index: the index in the overlap-communication optimized execution schedule</TD></TR><TR><TD ALIGN="LEFT">If this is different from orig_index, the node is highlighted in <FONT COLOR="red">red color</FONT></TD></TR><TR><TD></TD></TR><TR><TD ALIGN="LEFT"><B>Node grouping:</B></TD></TR><TR><TD ALIGN="LEFT">The nodes belonging to the same actor are grouped in the same rectangle</TD></TR><TR><TD ALIGN="LEFT">The actor class name and the actor id are shown in the rectangle</TD></TR><TR><TD></TD></TR><TR><TD ALIGN="LEFT"><B>Edges:</B></TD></TR><TR><TD ALIGN="LEFT">black color (without label): data dependency</TD></TR><TR><TD ALIGN="LEFT">black color (annotated with "shm"): shared memory channel</TD></TR><TR><TD ALIGN="LEFT"><FONT COLOR="blue">blue color</FONT> (annotated with "accelerator): accelerator channel</TD></TR><TR><TD ALIGN="LEFT">dashed edge: control dependency between compute operations</TD></TR></TABLE>>example_explanation	plaintext)shapeinvis)r   zLWriting compiled graph schedule visualization to compiled_graph_schedule.pngcompiled_graph_schedulepngF)formatview)graphvizImportErrorDigraphr   	enumeraterF   r]   subgraphattrr   getr   ri   rA   edgeloggerinforender)r   r   rh   r   dotnode_to_viz_idr   execution_nodesoverlapped_schedulenode_to_optimized_indexr   r   r   ri   r   r   r   r   out_edgeviz_inford   re   out_task_idxout_op_typeout_nodeout_node_viz_idr   legendexplanations                                r   _visualize_execution_scheduler   <  s   Z
 
 
 
D
 
 	

 

5

)
)C8:N#+ (C$"="C"C"E"E 3 3:5A#
 #
#,-@#A#A#
 #
 #
 &**,,\\4(44\55 		3MMx/?/F/FMGGG$_55 3 34"9"="=d"C"C.D!_/ /+^ "1A!5!57k>GGG'2t$$3		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 		3 #>"C"C"E"E   11 	 	GAt(.K&*n&:&:&<&< 	 	"(,4)),4)k .{;"0":"'="8"8g$6CGURW     		 
+	,	, J(S4UUU 	N$7888 	. 	);kJJJN$9III;J J J J J J J J J J J J J J J> KK	)   JJ(UJCCCCCs.    !,B
EE	
E	A"I33I7:I7c                    t          t                    }t          t                    }|                                 D ];\  }}|                                D ]!\  }}|j        dk    rt	          || |           "<t                      }	 t          ||           }|n|D ]=}||vsJ |                    |           ||j                 	                    |           >|D ]i}|j
        D ]_\  }}	| |         |	         }
|
|v r|
j                            |j        |j        j        f           |
j        dk    rt	          || |
           `jt!          |          t!          |           dz  k    s
J d            |S )a
  
    Generate an execution schedule for each actor. The schedule is a list of
    operation nodes to be executed. The function uses a topological sort
    algorithm to generate the schedule.

    Args:
        graph: A graph where each node is a _DAGOperationGraphNode. The key is
            `task_idx`, the index to retrieve its task from `idx_to_task`, and
            the value is a dictionary that maps the _DAGNodeOperationType (READ,
            COMPUTE, or WRITE) to the corresponding _DAGOperationGraphNode. It is
            generated by `_build_dag_node_operation_graph`.

    Returns:
        actor_to_execution_schedule: A dictionary that maps an actor handle to
            the execution schedule which is a list of operation nodes to be
            executed.
    r   TN   z Expected all nodes to be visited)r   listr   rO   rv   rB   r   rk   r<   ry   rA   r@   rr   r;   r:   r%   rN   )rh   r   rp   r   	node_dictri   visited_nodesnodesout_node_task_idxout_node_typer   s              r   %_generate_actor_to_execution_scheduler     s   2 	D   	D   P P9 (( 	P 	PGAt ~""-.A5$OOO	P EEMX ##6>>= 	H 	HD},,,,d###'(9:AA$GGGG 
	X 
	XD48N 	X 	X0!= !23MB},, !%%t}dn6I&JKKK%**12EuhWWW	X#X6 }Ua///1S///&&r   c                 L   t          j        |           }|                                D ]}t          t	          |                    D ]}||         j        j        t          j        k    r||         j	        rt          |dz
  dd          D ]}||         j        j        t          j
        k    r&||         }|||         }|||dz   |dz   <   |||<    nP||         j        j        t          j        k    s ||         j        j        t          j        k    r||         j	        r n܌|S )a  
    From an existing execution schedule, generate a new schedule by overlapping
    computation and communication.

    Currently, the algorithm generates a new schedule for each actor as follows:
    For each accelerator read operation (i.e., recv), scan backwards to find the nearest
    compute node to swap with so that the accelerator read operation can be overlapped
    with computation.

    Collective operations are not yet supported.

    Args:
        actor_to_execution_schedule: A dictionary that maps an actor handle to
            the existing execution schedule for the actor. The schedule is a list
            is a list of operations to be executed.

    Returns:
        A dictionary that maps an actor handle to the overlapped execution schedule
        for the actor.
    r   )copydeepcopyrx   rangerN   r:   r%   r   r   r>   r   r   )r   r   r   r   jaccelerator_read_opprev_opss          r   '_generate_overlapped_execution_scheduler     sd   8 	122 !  <BBDD  s.//00 	 	A#A&059N9SSS'*? T q1ub"--  A+A.8=089 9 /B!.D+#6qs#;=E+AEAEM:1D+A.+A.8=056 6.q1;@067 7-a0E7
 7	8 ('r   c                 >    d |                                  D             S )zx
    Extract _DAGNodeOperation from _DAGOperationGraphNode in the schedule
    and discard unnecessary information.
    c                 .    i | ]\  }}|d  |D             S )c                     g | ]	}|j         
S r   )r:   )r   ri   s     r   
<listcomp>z:_extract_execution_schedule.<locals>.<dictcomp>.<listcomp>S  s    1114111r   r   )r   r   r   s      r   r   z/_extract_execution_schedule.<locals>.<dictcomp>R  s=       E5 	115111  r   )r   )r   s    r   _extract_execution_scheduler  I  s/     7==??   r   )ra   F)r   r=   )$r   rs   loggingcollectionsr   enumr   	functoolsr   typingr   r   r   r	   r
   ray	getLoggerr   r   r   r   r9   r7   r_   rg   r6   ro   rv   r   r   r   r   r   r   r   r  r   r   r   <module>r	     s      # # # # # #       $ $ $ $ $ $ 3 3 3 3 3 3 3 3 3 3 3 3 3 3 



		8	$	$8 8 8 8 8D 8 8 86)U )U )U )U )U )U )U )UX [5 [5 [5 [5 [5 [5 [5 [5B $	 %#  	   2
-T/1GGHHI
-
 
- 

- 
- 
- 
-(3T:P5QQR(T/1GGHHI( !( 
	( ( ( (VG3T:P5QQRGT/1GGHHIG d)*+G G G GTMcCCDM"d+A&B!CCM
 
#t)+AAB
BCM M M M`	C 	C 	C 	C'
 ''*'=@' ' ' '"AD!%&<!=="AD #+$d+A&BBC#	AD T/1GGHHIAD AD AD ADHI'T/1GGHHII'	
!4(>#?
?@I' I' I' I'X:(!%&<!==":( 

!4(>#?
?@	:( :( :( :(z!%&<!==" 

!4(9#:
:;	     r   