
    &`i              
       |   d dl Z d dlmZmZmZmZmZmZmZ d dl	Z	d dl
Z
d dlmZ d dlmZmZmZ  G d de          Z G d d          Ze
j         G d	 d
                      Zdededee
j        j                 dee         ddf
dZdeddfdZdddeeed         ee         f                  ddfdZdddefdZdS )    N)Dict	FrozenSetListOptionalSetTupleType)ChannelContext)CommunicatorReduceOpTorchTensorAllocatorc                      e Zd ZdZdeej        j                 fdZde	ddfdZ
dej        j        de	fd	Zde	fd
Zdee	         fdZded         fdZddde	ddfdZ	 d"dee	         ddde	dee         ddf
dZ	 	 	 	 	 	 d#dZej        fdddddeddfdZej        fdddddeddfdZed             Zed             Zd$dZdefd Zedefd!            Z dS )%AbstractNcclGroupz)
    A dummy NCCL group for testing.
    actor_handlesc                 "    || _         d | _        d S N)_actor_handles_rank)selfr   s     x/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/collective/conftest.py__init__zAbstractNcclGroup.__init__   s    +


    rankreturnNc                     || _         d S r   r   )r   r   s     r   
initializezAbstractNcclGroup.initialize   s    


r   actorc                 6    | j                             |          S r   )r   index)r   r   s     r   get_rankzAbstractNcclGroup.get_rank   s    "((///r   c                 *    t          | j                  S r   )lenr   r   s    r   get_world_sizez AbstractNcclGroup.get_world_size   s    4&'''r   c                     | j         S r   r   r$   s    r   get_self_rankzAbstractNcclGroup.get_self_rank!   s
    zr   ray.actor.ActorHandlec                     | j         S r   )r   r$   s    r   get_actor_handlesz#AbstractNcclGroup.get_actor_handles$   s    ""r   valuetorch.Tensor	peer_rankc                     t           r   NotImplementedError)r   r+   r-   s      r   sendzAbstractNcclGroup.send'   s    !!r   shapedtypeztorch.dtype	allocatorc                     t           r   r/   )r   r2   r3   r-   r4   s        r   recvzAbstractNcclGroup.recv*   s
     "!r   send_bufrecv_bufc                     t           r   r/   )r   r7   r8   s      r   	allgatherzAbstractNcclGroup.allgather3   s
    
 "!r   opc                     t           r   r/   r   r7   r8   r;   s       r   	allreducezAbstractNcclGroup.allreduce:   
     "!r   c                     t           r   r/   r=   s       r   reducescatterzAbstractNcclGroup.reducescatterB   r?   r   c                     d S r    r$   s    r   recv_streamzAbstractNcclGroup.recv_streamJ       tr   c                     d S r   rC   r$   s    r   send_streamzAbstractNcclGroup.send_streamN   rE   r   c                     d S r   rC   r$   s    r   destroyzAbstractNcclGroup.destroyR   s    r   c                     dS )NacceleratorrC   r$   s    r   get_transport_namez$AbstractNcclGroup.get_transport_nameU   s    }r   c                     d S r   rC   )clss    r   generate_communicator_idz*AbstractNcclGroup.generate_communicator_idX   s    r   r   )r7   r,   r8   r,   r   N)r   N)!__name__
__module____qualname____doc__r   rayr   ActorHandler   intr   r!   r%   r   r'   r*   r1   r   r   r6   r:   r   SUMr>   rA   propertyrD   rG   rI   strrL   classmethodrO   rC   r   r   r   r      s        d39+@&A    s t    0ci3 0 0 0 0 0( ( ( ( (x}    #4(?#@ # # # #". "S "T " " " " 59" "Sz" " 	"
 01" 
" " " "" " !" 
	" " " "  |	" " " !" 	"
 
" " " "  |	" " " !" 	"
 
" " " "   X   X   C         [  r   r   c                       e Zd Zd Z	 	 	 	 dded         dee         dedee         d	ee	e                  d
efdZ
ded
dfdZdee         d
dfdZdS )MockNcclGroupSetc                     i | _         d S r   )ids_to_actors_and_custom_commsr$   s    r   r   zMockNcclGroupSet.__init__^   s    
  	+++r   NFactorsr(   custom_nccl_groupuse_communication_streamsaccelerator_module_nameaccelerator_communicator_clsr   c                   	 t          t          j                              	t                    f| j        	<   *t          t          t                                        }nfdD             }	fdt          |          D             }t          j
        |d           t          j                    }|j        	<   nt                    |j        	<   	S )Nc                 :    g | ]}                     |          S rC   )r!   ).0r   r`   s     r   
<listcomp>z-MockNcclGroupSet.__call__.<locals>.<listcomp>v   s(    KKK5&//66KKKr   c           	      \    g | ](\  }}|j                             t          |          )S rC   )__ray_call__remotemock_do_init_nccl_group)rf   r   r   r_   r`   group_ids      r   rg   z-MockNcclGroupSet.__call__.<locals>.<listcomp>w   sQ     	
 	
 	
 e %%'! 	
 	
 	
r      timeout)rY   uuiduuid4	frozensetr^   listranger#   ziprT   getr
   get_currentcommunicatorsr   )
r   r_   r`   ra   rb   rc   ranks
init_tasksctxrl   s
    ``      @r   __call__zMockNcclGroupSet.__call__e   s    tz||$$f9
+H5
 $s6{{++,,EEKKKKFKKKE	
 	
 	
 	
 	
 	
  #5&11	
 	
 	

 	
B''''(**(*;Ch''*;F*C*CCh'r   rl   c                    t          j                    }|j        vrd S | j                 \  }}fd|D             }t	          j        |d           | j        v r| j        = |j                                                  |j        = d S )Nc                 P    g | ]"}|j                             t                    #S rC   )ri   rj   mock_do_destroy_nccl_group)rf   r   rl   s     r   rg   z<MockNcclGroupSet.mock_destroy_nccl_group.<locals>.<listcomp>   sD     
 
 

 	 %%* 
 
 
r   rm   rn   )r
   rw   rx   r^   rT   waitrI   )r   rl   r{   r_   _destroy_taskss    `    r   mock_destroy_nccl_groupz(MockNcclGroupSet.mock_destroy_nccl_group   s    (**3,,,F7A	
 
 
 

  
 
 
 	++++t:::3H=(#++---h'''r   nccl_group_idsc                 b    t          j                    }|D ]}|| j        vsJ ||j        vsJ d S r   )r
   rw   r^   rx   )r   r   r{   nccl_group_ids       r   check_teardownzMockNcclGroupSet.check_teardown   sT    (**+ 	: 	:M (KKKKK (999999	: 	:r   )NFNN)rP   rQ   rR   r   r   r   r   boolrY   r	   r|   r   r   rC   r   r   r\   r\   ]   s           59*/15EI$ $,-$ $L1$ $(	$
 "*#$ '/tL/A&B$ 
$ $ $ $L( ( ( ( ( ((:T#Y :4 : : : : : :r   r\   c                       e Zd Zd Z	 ddedeej                 dej        fdZ	dej        de
eef         fdZde
ej        d	f         fd
ZdS )CPUTorchTensorWorkerc                     d| _         d S )Ncpu)devicer$   s    r   r   zCPUTorchTensorWorker.__init__   s    r   Nsizer3   r   c                 :    t          j        ||| j                  S )N)r3   r   )torchonesr   )r   r   r3   s      r   return_tensorz"CPUTorchTensorWorker.return_tensor   s     z$eDK@@@@r   tensorc                 D    |j         | j         k    sJ |j        |d         fS )Nr   )r   r2   )r   r   s     r   r6   zCPUTorchTensorWorker.recv   s(    }++++|VAY&&r   .c                      t          |          S r   )tuple)r   tensorss     r   recv_tensorsz!CPUTorchTensorWorker.recv_tensors   s    W~~r   r   )rP   rQ   rR   r   rV   r   r   r3   Tensorr   r   r6   r   rC   r   r   r   r      s           9=A AA ( 5A	A A A A
'5< 'E#s(O ' ' ' 'elC.?(@      r   r   rl   r   r_   r`   r   c                     t          j                    }|0t          |          }|                    |           ||j        |<   d S |                    |           ||j        |<   d S r   )r
   rw   r   r   rx   )r   rl   r   r_   r`   r{   
nccl_groups          r   rk   rk      ss     
$
&
&C &v..
d###&0(###$$T***&7(###r   c                     t          j                    }||j        vrd S |j        |                                          |j        |= d S r   )r
   rw   rx   rI   )r   rl   r{   s      r   r   r      sN    

$
&
&Cs(((h'')))(###r   dagzray.dag.DAGNodeactors_and_custom_commsr(   zray.dag.CompiledDAGc                     t                      }|                     d|           |                                }t          |j                                                  |k    sJ ||fS )Nz,ray.dag.compiled_dag_node._init_communicator)r\   setattrexperimental_compilesetr^   values)monkeypatchr   r   mock_nccl_group_setcompiled_dags        r   check_nccl_group_initr      s     +,,6  
 ++--L>EEGGHH"	# 	# 	# 	# ,,,r   r   r   c                     |                      d|j                   |j                                        }|                                 |                    |           d S )Nz/ray.dag.compiled_dag_node._destroy_communicator)r   r   "_actors_to_created_communicator_idr   teardownr   )r   r   r   created_communicator_idss       r   check_nccl_group_teardownr      si    
 93  
  ,NUUWW&&'?@@@@@r   )rp   typingr   r   r   r   r   r   r	   r   rT   ray.experimental.channel.commonr
   %ray.experimental.channel.communicatorr   r   r   r   r\   rj   r   rY   rV   r   rU   rk   r   r   r   rC   r   r   <module>r      s.    D D D D D D D D D D D D D D D D D D  



 : : : : : :         K K K K K K K K\F: F: F: F: F: F: F: F:R        "88 8 &'	8
  -8 
8 8 8 8"$s $t $ $ $ $-	- !i/0(<2HHI- - - - -,A'A *A A A A A Ar   