
    &`ie.                         d dl mZmZmZmZmZmZmZ erd dlZd dl	Z	d dl
mZmZ d dlmZmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ  G d	 d
          Ze G d de                      ZdS )    )TYPE_CHECKINGAnyDictListOptionalTupleUnionN)ClassMethodNodeDAGNode)COLLECTIVE_OPERATION_KEYIS_CLASS_METHOD_OUTPUT_KEY)ChannelContext)CommunicatorTorchTensorType)AllGatherOpAllReduceOpReduceScatterOp_CollectiveOp)DeveloperAPIc            
           e Zd ZdZ	 ddeee                  dedeee	e
f                  fdZde	fdZeded	         fd
            Zedefd            Zde
fdZdddeded         f         fdZdS )_CollectiveOperationat  
    Represent metadata for a collective communicator collective operation.

    Args:
        inputs: A list of lists of DAGNode. Each nested list inside
            of inputs should contain exactly one object per actor.
            If multiple nested lists are provided, then the order of
            actors should be the same for each nested list.
        op: The collective operation to perform.
        transport: The transport to use for the collective operation.

    Requirements:
    1. Input nodes are unique.
    2. Actor handles are unique.
    3. Actor handles match the custom communicator group if specified.
    Ninputsop	transportc                    g | _         t          |          D ]\  }t                    dk    r-t          |          dk    rd| nd}t          d| d          t	          d D                       s0t          |          dk    rd| nd}t          d	| d
 d          t          t                              t                    k    r=fdD             }t          |          dk    rd| nd}t          d| d|           g D ]Y}|                                }|,t          |          dk    rd| nd}t          d|                               |           Zt          t                              t                    k    r=fdD             }	t          |          dk    rd| nd}t          d| d|	           |dk    r}
t          |d                   t          ||                   k    r?t          dt          |d                    d| dt          ||                    d          t          |
          t                    k    r3t          dt          |
           d| dt                     d          t          t          |
                    D ]-\  }\  }}||k    rt          d| d| d| d| d| d          .ԉ| _         || _	        |t          j        }t          |d          | _        t          |t                    rFt          |                                          t          | j                   k    rt          d          d S d S )Nr      z
 at index  zExpected non-empty input list.c              3   @   K   | ]}t          |t                    V  d S N)
isinstancer   ).0nodes     k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/dag/collective_node.py	<genexpr>z0_CollectiveOperation.__init__.<locals>.<genexpr>9   s,      IITz$00IIIIII    z at list at index z&Expected all input nodes to be DAGNodez
, but got c                 F    g | ]}                     |          d k    |S r   )count)r"   
input_nodeinput_nodess     r$   
<listcomp>z1_CollectiveOperation.__init__.<locals>.<listcomp>D   s<       ""((44q88 888r&   zExpected unique input nodesz, but found duplicates: z,Expected an actor handle from the input nodec                 j    g | ]/}                     |                                          d k    -|0S r(   )r)   _get_actor_handle)r"   r*   current_actor_handless     r$   r,   z1_CollectiveOperation.__init__.<locals>.<listcomp>_   sI     ' ' '",22:3O3O3Q3QRRUVVV VVVr&   zExpected unique actor handlesz6, but found duplicate actor handles from input nodes: zVExpected all input lists to have the same number of nodes. List at index 0 has length z, but list at index z has length z[Expected all input lists to have the same set of actor handles. List at index 0 has actors z has actors z\Expected all input lists to have the same order of actor handles. List at index 0 has actor z at position z has actor T)r   _direct_returnz=Expected actor handles to match the custom communicator group)_actor_handles	enumeratelen
ValueErrorallsetr.   appendzip_opr   ACCELERATOR
_type_hintr!   r   get_actor_handles)selfr   r   r   inested_list_error_msg
duplicatesr*   actor_handleinvalid_input_nodesfirst_actor_handlesjfirstcurrentr/   r+   s                 @@r$   __init__z_CollectiveOperation.__init__)   s'    >@'// Y	 Y	NA{;1$$<?KK!OO(8Q(8(8(8QS% L4ILLL  
 II[IIIII 03Fa,,,,R & !.=R . .*. . .   3{##$$K(8(888   &1  
 14Fa,,,,R & !$2G $ $!$ $  
 %'!) 	; 	;
);;=='47KK!OO0Q000 * %^G\^^   &,,\:::: 3,--..#6K2L2LLL' ' ' '&1' ' '# 14Fa,,,,R & !-4I - -*- -   Avv&;# 6!9~~VAY// >25fQi..> >> >,/q	NN> > >   &''3/D+E+EEE J256I2J2JJ JJ J,/0E,F,FJ J J   (1')>??( (  ##E7 G##$J5:J JIJJ J!"J J/6J JEFJ J J   $ 4'3I)IdSSSi.. 	9..0011S9L5M5MMM S  	 	MMr&   returnc                 8    d| j          d| j         d| j         dS )Nz#CollectiveOperation(_actor_handles=z, _op=z, _type_hint=))r1   r9   r;   r=   s    r$   __str__z_CollectiveOperation.__str__   sA    -"1- -8- - /- - -	
r&   zray.actor.ActorHandlec                     | j         S r    )r1   rK   s    r$   actor_handlesz"_CollectiveOperation.actor_handles       ""r&   c                     | j         S r    )r;   rK   s    r$   	type_hintz_CollectiveOperation.type_hint   s
    r&   c                     | j         j        +t          j                    }|j        | j         j                 }nB| j                                         | j                                         }nt          d          |S )NzExpected a communicator group)r;   communicator_idr   get_currentcommunicatorsget_custom_communicatorr4   )r=   ctxcommunicators      r$   get_communicatorz%_CollectiveOperation.get_communicator   sm    ?*6 ,..C,T_-LMLL_4466B?BBDDLL<===r&   send_buftorch.Tensor)r[   .c                 F   ddl t          fdD                       st          d          |                                 }t	          | j        t                    rt                    dk    sJ d         }t          | j                  } j	        |j
        d         |z  g|j
        dd         R |j        |j                  }|                    ||           nt	          | j        t                    rt                    dk    r;d         } j        |          }|                    ||| j        j                   nUt          fdD                       st          dd	 D                        d
 }j        j                                      }|                    ||| j        j                    ||          }nt	          | j        t*                    rt                    dk    sJ d         }t          | j                  }|j
        d         |z  dk    rt          d|            j	        |j
        d         |z  g|j
        dd         R |j        |j                  }|                    ||| j        j                   |S )a,  
        Call the collective operation on the input tensor(s). Output tensor(s) are
        allocated and returned.

        Args:
            *send_buf: A variable number of torch tensors to send to the collective
                operation. The tensors have the same order as the input nodes.

        Returns:
            A torch tensor or a tuple of torch tensors containing the results of the
            collective operation. The output tensors have the same length and order
            as the input node list of the actor of this operation.
        r   Nc              3   B   K   | ]}t          |j                  V  d S r    )r!   Tensor)r"   ttorchs     r$   r%   z/_CollectiveOperation.execute.<locals>.<genexpr>   s/      AA1:a..AAAAAAr&   z+Expected a torch tensor for each input noder   )dtypedevicec              3   D   K   | ]}|j         d          j         k    V  dS )r   Nra   )r"   r_   rZ   s     r$   r%   z/_CollectiveOperation.execute.<locals>.<genexpr>   s1      JJA17hqk&77JJJJJJr&   z;Expected all input tensors to have the same dtype, but got c                     g | ]	}|j         
S  rd   )r"   r_   s     r$   r,   z0_CollectiveOperation.execute.<locals>.<listcomp>   s    #>#>#>AG#>#>#>r&   c                     g }d}|D ]U}|                                 }| |||z                                |j                  }|                    |           ||z  }Vt	          |          S )Nr   )numelviewshaper7   tuple)flat_bufbufsviewsoffsetr_   rh   s         r$   unflatten_fromz4_CollectiveOperation.execute.<locals>.unflatten_from   sq    EF! ( ( !		$Vfun%<=BB17KKQ% <<'r&   zSExpected the first dimension of the input tensor to be divisible by the world size )r`   r5   r4   rY   r!   r9   r   r3   r1   emptyrj   ra   rb   	allgatherr   
empty_like	allreducereduceOpnnutilsparameters_to_vectorr   reducescatter)	r=   rZ   rX   r_   
world_sizerecv_bufrp   rl   r`   s	    `      @r$   executez_CollectiveOperation.execute   s     	AAAAAAAAA 	LJKKK,,..dh,, 1	Gx==A%%%%AT011J"u{j(717122;77gx  H
 ""1h////+.. '	G8}}!!QK+5+A..&&q(DH4EFFFFJJJJJJJJJ $A#>#>X#>#>#>A A  
( ( ( !8>>>xHH&&x48;LMMM)>(H==/22 	Gx==A%%%%AT011JwqzJ&!++ 6)36 6   #u{z)8AGABBK88gx  H
 &&q(DH4EFFFr&   r    )__name__
__module____qualname____doc__r   r   r   r   r	   strr   rG   rL   propertyrN   r   rQ   rY   r   r|   rf   r&   r$   r   r      s?        * 9=	k kT']#k k E#|"345	k k k kZ
 
 
 
 
 #t$;< # # # X# ?    X,    H'H	~u%899	:H H H H H Hr&   r   c                        e Zd ZdZdedeef         deeef         deeef         deeef         f
 fdZ	de
e         d	eeef         d
eeef         deeef         fdZd Zedefd            Z xZS )CollectiveOutputNodezORepresent an output node from a communicator collective operation in a Ray DAG.method_namemethod_argsmethod_kwargsmethod_optionsother_args_to_resolvec                    || _         |                    t          d           | _        |                    t          d          | _        | j        | j        st          d          t                                          |||||           d S )NFzExpected a collective operation)	_inputsgetr   _collective_opr   _is_class_method_outputr4   superrG   )r=   r   r   r   r   r   	__class__s         r$   rG   zCollectiveOutputNode.__init__   s     #4I4M4M$d5
 5
 .C-F-F&.
 .
$ &t/K&>???!	
 	
 	
 	
 	
r&   new_args
new_kwargsnew_optionsnew_other_args_to_resolvec                 4    t          | j        ||||          S )N)r   )r   _method_name)r=   r   r   r   r   s        r$   
_copy_implzCollectiveOutputNode._copy_impl  s-     $";
 
 
 	
r&   c                      t          d          )NzFCollectiveOutputNode is only supported with dag.experimental_compile())NotImplementedError)r=   argskwargss      r$   _execute_implz"CollectiveOutputNode._execute_impl,  s    !T
 
 	
r&   rH   c                     | j         S r    )r   rK   s    r$   collective_opz"CollectiveOutputNode.collective_op1  rO   r&   )r}   r~   r   r   r   r   r   r   r   rG   r   r   r   r   r   r   __classcell__)r   s   @r$   r   r      s"       YY

 H

 CH~
 S#X
  $CH~
 
 
 
 
 
<
s)
 cN
 #s(^	

 $(S>
 
 
 

 
 

 #3 # # # X# # # # #r&   r   )typingr   r   r   r   r   r   r	   r`   rayray.dagr
   r   ray.dag.constantsr   r   ray.experimental.channelr   *ray.experimental.channel.torch_tensor_typer   r   ray.experimental.util.typesr   r   r   r   ray.util.annotationsr   r   r   rf   r&   r$   <module>r      s   I I I I I I I I I I I I I I I I I I LLL 



        S R R R R R R R 3 3 3 3 3 3 T T T T T T T T            . - - - - -a a a a a a a aH 7# 7# 7# 7# 7#? 7# 7# 7# 7# 7#r&   