
    &`i8              
          d dl Z d dlmZmZmZ d dlZd dlmZmZ d dl	m
Z
mZmZmZ d dlmZmZ d dlmZmZmZmZmZ d dlmZ  e j        e          Z	 ddeed	         eed	                  f         d
edeeeef                  fdZ G d d          Z G d d          Z G d d          Z  e            Z! e            Z" e             Z#dS )    N)ListOptionalUnion)CollectiveOutputNode_CollectiveOperation)BIND_INDEX_KEYCOLLECTIVE_OPERATION_KEYIS_CLASS_METHOD_OUTPUT_KEYPARENT_CLASS_NODE_KEY)CommunicatorTorchTensorType)AllGatherOpAllReduceOpReduceOpReduceScatterOp_CollectiveOp)r   inputsray.dag.DAGNodeop	transportc                    t          | d         t                    r$t          |t                    st          d          t          | d         t                    s| g} |t          j        }t          | ||          }g }t          |t                    rd}nRt          |t                    rd|j         }n2t          |t                    rd|j         }nt          d|           t          t          | d                             D ]^fd| D             }|d                                         }|J t          |t          |          t                      t                      t           |t"          |j        t&          |i	          }|xj        d
z  c_        t          |          d
k    rg }	t          t          |                    D ]nt          d |ft                      t                      t"          |                                t*          dt           |i          }
|	                    |
           o|                    |	           I|                    |           `|S )a  
    Bind inputs (input nodes or lists of input nodes) with a collective operation.
    The collective operation is applied to each list of input nodes. The output nodes
    will have the same shape as the input nodes.

    Example of binding a list of input node:
    with InputNode() as inp:
        res_comp1 = [actor.comp1.bind(inp) for actor in actors]
        res_comp2 = [actor.comp2.bind(inp) for actor in actors]
        res_ar = allreduce.bind([res_comp1, res_comp2])

    Requirements:
    1. Each input node returns a torch tensor.
    2. Each input node within a list is from a different actor.
    3. If lists of input nodes are provided, the order of actors should
        be the same for each nested list.
    4. If a custom transport is specified, its actor set matches the actor
        set of the input nodes.
    5. If input nodes are provided, then all tensors have the same shape.
        If lists of input nodes are provided, then all tensors in each
        list have the same shape.

    Requirements 1-3 are checked in the `CollectiveGroup` constructor.
    Requirement 4 is not checked yet.

    Args:
        inputs: A list of DAG nodes or a list of lists of DAG nodes. Each leaf list
            should contain one object per actor.
        op: The collective operation.
        transport: GPU communicator for the collective operation. If not
            specified, the default ACCELERATOR is used.

    Returns:
        A list of collective output nodes or a list of lists of collective output nodes,
        with the same shape as the input nodes. Each output node has the same order and
        belongs to the same actor as the corresponding input node.
    r   zLCurrently binding a nested list of dag nodes is only supported for allreduceN	allgatherz
allreduce.zreducescatter.z)Expected a collective operation, but got c                 $    g | ]}||         S  r   ).0lis     z/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/collective/operations.py
<listcomp>z_bind.<locals>.<listcomp>[   s"    555A151Q4555    )method_namemethod_argsmethod_kwargsmethod_optionsother_args_to_resolve   return_idx_T)
isinstancelistr   
ValueErrorr   ACCELERATORr   r   reduceOpr   rangelen_get_actor_handler   tupledictr   r   _ray_dag_bind_indexr	   _get_bind_indexr
   append)r   r   r   collective_opcollective_output_nodesr!   input_node_listactor_handlecollective_output_nodeoutput_nodesoutput_noder   s              @r   _bindr<      s   T &)T"" 
:b++F+F 
Z
 
 	

 fQi&& #/	(Y??M:<"k"" K!	B	$	$ K02;00	B	(	( K4r{44IRIIJJJ3vay>>"" $C $C5555555:I;




 	 '''!5#o..&&66%| @(-#
"
 
"
 
"
 	((A-((!##79L3//00 1 12%!%%+Q/FFFF&(>(N(N(P(P2D-|
 
 ##K0000#**<8888#**+ABBBB""r    c                   n    e Zd ZdZ	 dded         deeeef                  dee	         fdZ
	 dd	efd
ZdS )AllGatherWrapperzWrapper for NCCL all-gather.Ninput_nodesr   r   returnc                 <    t          |t                      |          S N)r<   r   )selfr?   r   s      r   bindzAllGatherWrapper.bind   s    
 [+--;;;r    default
group_namec                 (    ddl m}  ||||          S )Nr   )r   )ray.util.collective.collectiver   )rC   tensor_listtensorrF   r   s        r   __call__zAllGatherWrapper.__call__   s*     	=<<<<<yfj999r    rB   )rE   )__name__
__module____qualname____doc__r   r   r   strr   r   rD   rK   r   r    r   r>   r>      s        &&
 9=< <+,< E#|"345< 
"	#	< < < < $	: : 	: : : : : :r    r>   c            
           e Zd ZdZej        dfded         dedeee	e
f                  dee         fdZd	ej        fd
e	defdZdS )AllReduceWrapperzWrapper for NCCL all-reduce.Nr?   r   r   r   r@   c                     t          |t                    st          d|           t          |t	          |          |          S NzUnexpected operation: )r,   )r(   r   r*   r<   r   rC   r?   r   r   s       r   rD   zAllReduceWrapper.bind   sJ     "h'' 	<:b::;;;[+r":":":IFFFr    rE   rF   c                 (    ddl m}  ||||          S )Nr   )	allreduce)rH   rW   )rC   rJ   rF   r   rW   s        r   rK   zAllReduceWrapper.__call__   s*     	=<<<<<yR000r    rL   rM   rN   rO   r   SUMr   r   r   rP   r   r   rD   RayReduceOprK   r   r    r   rR   rR      s        &&
  |8<		G 	G+,	G 	G E#|"345		G
 
"	#	G 	G 	G 	G $%/	1 1 1 	1 1 1 1 1 1r    rR   c            
           e Zd ZdZej        dfded         dedeee	e
f                  dee         fdZd	ej        fd
e	defdZdS )ReduceScatterWrapperz Wrapper for NCCL reduce-scatter.Nr?   r   r   r   r@   c                     t          |t                    st          d|           t          |t	          |          |          S rT   )r(   r   r*   r<   r   rU   s       r   rD   zReduceScatterWrapper.bind   sJ     "h'' 	<:b::;;;[/2">">">	JJJr    rE   rF   c                 (    ddl m}  ||||          S )Nr   )reducescatter)rH   r_   )rC   rJ   rF   r   r_   s        r   rK   zReduceScatterWrapper.__call__   s+     	A@@@@@}VZ444r    rX   r   r    r   r\   r\      s        **
  |8<		K 	K+,	K 	K E#|"345		K
 
"	#	K 	K 	K 	K $%/	5 5 5 	5 5 5 5 5 5r    r\   rB   )$loggingtypingr   r   r   rayray.dag.collective_noder   r   ray.dag.constantsr   r	   r
   r   *ray.experimental.channel.torch_tensor_typer   r   ray.experimental.util.typesr   r   r   r   r   ray.util.collective.typesrZ   	getLoggerrL   loggerrP   r<   r>   rR   r\   r   rW   r_   r   r    r   <module>rj      s#    ( ( ( ( ( ( ( ( ( ( 



 N N N N N N N N            U T T T T T T T              > = = = = =		8	$	$ 59f# f#$()45F0G+HHIf#f# c</01f# f# f# f#R: : : : : : : :*1 1 1 1 1 1 1 125 5 5 5 5 5 5 52 		$$&&r    