
    &`i4                         d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	d dl
mZ d dlmZ d dlmZmZ d dlmZ erd dlZ e j        e          Z G d d	e          ZdS )
    N)
ModuleType)TYPE_CHECKINGCallableListOptionalTuple)RayChannelError)AcceleratorContext)CommunicatorTorchTensorAllocator)ReduceOpc                      e Zd ZdZ	 d*dededee         ded         ded	         d
efdZ	deddfdZ
ded         fdZdej        j        defdZdee         fdZdefdZdddeddfdZee         fdee         dddeddfdZ	 	 	 	 	 	 d+d Z	 	 	 	 d,d!Zej        fddddd"efd#Zej        fddddd"efd$Zed%             Zed&             Zd-d'Zde fd(Z!e"de fd)            Z#dS ).
_NcclGroupz
    Represents an actor's NCCL communicator. This is the default NCCL communicator
    to be used in Compiled Graph if a custom communicator is not provided.

    This class is not thread-safe.
    F
world_sizecomm_idrankactor_handleszray.actor.ActorHandlecuda_streamztorch.cuda.Streamuse_communication_streamsc                    || _         || _        d| _        || _        || _        |t          j                    s
J d            |
J d            |                     t          j                    j	                  }||k    sJ d| d|             ddl
m} || _        | j                            |||          | _        nd| _        d| _        d| _        d| _        ||
J d            || _        |rpddl}	t#          j                                                    d         }
|	j                            |
	          | _        |	j                            |
	          | _        n| j        | _        | j        | _        d
| _        dS )a  
        Initialize a NCCL communicator that can be used to communicate p2p with
        other GPU actors.

        This method blocks until the same call has been made on all other
        actors in the group, with the same arguments for world_size and
        comm_id.

        NOTE: A concurrent NCCL group can coexist with this one but using the
        two groups concurrently on different CUDA streams may cause deadlock.
        See
        https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/usage/communicators.html
        #using-multiple-nccl-communicators-concurrently.

        If the user can guarantee that all involved actors execute the same ops
        in the same order, then the other NCCL group should use the given
        `cuda_stream`, and there will not be a concurrency issue. Otherwise,
        the other stream needs to synchronize with the given `cuda_stream`
        before and after it launches NCCL ops, e.g., at the beginning and end
        of a DAG task.

        Args:
            world_size: The number of participating actors/devices.
            comm_id: A unique communicator ID returned by
                cupy.cuda.nccl.get_unique_id().
            rank: The rank of this actor. If None, then the caller is not a
                participant of the NCCL group.
            actor_handles: A list of actor handles, in rank order.
            cuda_stream: A raw CUDA stream to dispatch NCCL ops to. If rank is
                specified, then this must be specified too.
            use_communication_streams: Whether to use dedicated send and recv
                streams for communication. If True, communication and computation
                can be overlapped to improve performance.
        NzNCCL actor has no GPUs assignedz#NCCL actor must specify cuda_streamzNCCL actor's rank z does not match expected rank r   )	nccl_utilzNCCL actor has no rank assigned)deviceF)_world_size_rankr   _actor_handles_use_communication_streamsrayget_gpu_idsget_rankget_runtime_contextcurrent_actor$ray.util.collective.collective_groupNcclCommunicator_comm_cuda_stream_send_stream_recv_streamtorchr
   getget_accelerator_devicescudaStream_closed)selfr   r   r   r   r   r   expected_rankr   r(   r   s              w/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/channel/nccl_group.py__init__z_NcclGroup.__init__   s   V &$(
/3+*C'?$$GG&GGG$**,Q*** MM#*A*C*C*QRRM%%%WDWWWW &%% GFFFFF&DN88WdSSDJJ DJ;?;?;?"##%F### +D( 
6 ,/11IIKKAN$)J$5$5V$5$D$D!$)J$5$5V$5$D$D!!$($5!$($5!    returnNc                     d S N )r.   r   s     r0   
initializez_NcclGroup.initializet   s    r2   c                     | j         S r5   )r   r.   s    r0   get_actor_handlesz_NcclGroup.get_actor_handlesx   s    ""r2   actorc                     d | j         D             }	 |                    |j                  }n# t          $ r t          d          w xY w|S )z
        Return the given actor's rank in the NCCL communicator.

        Args:
            actor: The actor handle to look up.
        c                     g | ]	}|j         
S r6   )_ray_actor_id).0as     r0   
<listcomp>z'_NcclGroup.get_rank.<locals>.<listcomp>   s    BBBQ_BBBr2   zActor is not in the NCCL group.)r   indexr>   
ValueError)r.   r;   	actor_idsr   s       r0   r   z_NcclGroup.get_rank{   sj     CBd.ABBB		@??5#677DD 	@ 	@ 	@>???	@s	   . Ac                     | j         S )z+
        Return this actor's rank.
        )r   r9   s    r0   get_self_rankz_NcclGroup.get_self_rank   s     zr2   c                     | j         S )zF
        Return the number of ranks in the NCCL communicator.
        )r   r9   s    r0   get_world_sizez_NcclGroup.get_world_size   s     r2   buftorch.Tensor	peer_rankc                 F   | j         rt          d          | j        r| j                                         | j                            | j                            |          |	                                | j        
                    |          || j        j                   dS )a  
        Send a torch.Tensor to a peer.

        This returns when the send kernel has been queued, but the kernel may
        not have completed. Therefore, the caller should ensure that there are
        no concurrent writes to the sent `buf` until the send has finished.
        That is, either all writes should be submitted on the current stream
        (self._cuda_stream) or, if on a different stream, that stream should
        synchronize with the current stream.

        Args:
            buf: The torch.Tensor to send. It should already be on this
                actor's default device.
            peer_rank: The rank of the actor to send to.
        NCCL group has been destroyed.N)r-   r	   r   r&   synchronizer$   sendr   get_tensor_ptrnumelget_nccl_tensor_dtyper   )r.   rI   rK   s      r0   rO   z_NcclGroup.send   s      < 	D!"BCCC* 	, ))+++ 	
N))#..IIKKN0055)	
 	
 	
 	
 	
r2   shapedtypeztorch.dtypec                    | j         rt          d          |
J d             |||          }| j        r| j                                         | j                            | j                            |          |	                                | j        
                    |          || j        j                   n| j                            | j                            |          |	                                | j        
                    |          || j        j                   | j                                         | j         rt          d          |S )a  
        Receive a torch.Tensor from a peer and synchronize the current stream.

        After this call returns, the receive buffer is safe to read from from
        any stream. An RayChannelError will be raised if an error occurred (e.g.,
        remote actor died), and the buffer is not safe to read.

        Args:
            buf: The torch.Tensor to receive into. This buffer is safe to read
            peer_rank: The rank of the actor to receive from.
        rM   Nz&NCCL group requires a tensor allocator)r-   r	   r   r'   rN   r$   recvr   rP   rQ   rR   r   r%   )r.   rS   rT   rK   	allocatorrI   s         r0   rV   z_NcclGroup.recv   sM   $ < 	D!"BCCC$$&N$$$iu%%* 	, ))+++JOO--c22		44S99!-    JOO--c22		44S99!-   ))+++< 	D!"BCCC
r2   send_bufrecv_buf	operationCallable[..., None]c                     | j         rt          d          |j        |j        k    s
J d             ||  | j                                         | j         rt          d          d S )NrM   zRay Compiled Graph derived the dtype of recv_buf from send_buf, so send_buf and recv_buf must have the same dtype. If you see this error, please file an issue at Ray repository.zNCCL group has been destroyed during allreduce operation. There may be a dtype mismatch between input tensors from different ranks.)r-   r	   rT   r%   rN   )r.   rX   rY   rZ   operation_argss        r0   _exec_collectivez_NcclGroup._exec_collective   s     < 	D!"BCCC~///M 0// 		>"" 	%%'''< 	!#  	 	r2   c                    | j                             |          | j                             |          |                                | j                             |          | j        j        g} | j        ||| j        j        g|R   d S r5   )	r   rP   rQ   rR   r%   r   r^   r$   	allGather)r.   rX   rY   r]   s       r0   	allgatherz_NcclGroup.allgather  s     N))(33N))(33NNN00::)
 	J 	
 		
 	
 	
 	
 	
 	
r2   opc                    | j                             |          | j                             |          |                                | j                             |          |j        | j        j        g} | j        ||| j        j	        g|R   d S r5   )
r   rP   rQ   rR   valuer%   r   r^   r$   	allReducer.   rX   rY   rb   r]   s        r0   	allreducez_NcclGroup.allreduce%  s     N))(33N))(33NNN00::H)
 	J 	
 		
 	
 	
 	
 	
 	
r2   c                    | j                             |          | j                             |          |                                | j                             |          |j        | j        j        g} | j        ||| j        j	        g|R   d S r5   )
r   rP   rQ   rR   rd   r%   r   r^   r$   reduceScatterrf   s        r0   reducescatterz_NcclGroup.reducescatter:  s     N))(33N))(33NNN00::H)
 	J$	
 		
 	
 	
 	
 	
 	
r2   c                 H    dd l }|j                            | j                  S Nr   )r(   r+   StreamContextr'   r.   r(   s     r0   recv_streamz_NcclGroup.recv_streamO  $    z''(9:::r2   c                 H    dd l }|j                            | j                  S rl   )r(   r+   rm   r&   rn   s     r0   send_streamz_NcclGroup.send_streamU  rp   r2   c                    | j         rdS d| _         | j        gt                              dt	          j                    j                    | j                                         | j                                         dS dS )z)
        Destroy the NCCL group.
        NTz!Destructing NCCL group on actor: )	r-   r$   loggerinfor   r    r!   abortdestroyr9   s    r0   rw   z_NcclGroup.destroy[  s     < 	F:!KK=*,,:= =   JJ      "!r2   c                     dS )Nacceleratorr6   r9   s    r0   get_transport_namez_NcclGroup.get_transport_nameo  s    }r2   c                 6    ddl m} |                                S )Nr   )nccl)	cupy.cudar|   get_unique_id)clsr|   s     r0   generate_communicator_idz#_NcclGroup.generate_communicator_idr  s&    """"""!!###r2   )F)rX   rJ   rY   rJ   rZ   r[   )rX   rJ   rY   rJ   )r3   N)$__name__
__module____qualname____doc__inttupler   r   boolr1   r7   r:   r   r;   ActorHandler   rF   rH   rO   r   r   rV   r^   ra   r   SUMrg   rj   propertyro   rr   rw   strrz   classmethodr   r6   r2   r0   r   r      s	         +0U UU U sm	U
 34U 12U $(U U U Uns t    #4(?#@ # # # #ci3     x}             #
 #
3 #
4 #
 #
 #
 #
T /07 7Sz7 7 	7 
7 7 7 7r  ! )	   >
 
 !
 
 
 
.  |	
 
 
 !
 	
 
 
 
2  |	
 
 
 !
 	
 
 
 
* ; ; X;
 ; ; X;
! ! ! !(C     $ $ $ $ [$ $ $r2   r   )loggingtypesr   typingr   r   r   r   r   r   ray.exceptionsr	   ,ray.experimental.channel.accelerator_contextr
   %ray.experimental.channel.communicatorr   r   ray.experimental.util.typesr   r(   	getLoggerr   rt   r   r6   r2   r0   <module>r      s          A A A A A A A A A A A A A A 



 * * * * * * K K K K K K T T T T T T T T 0 0 0 0 0 0 LLL 
	8	$	$a$ a$ a$ a$ a$ a$ a$ a$ a$ a$r2   