
    &`i                         d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	d dl
mZmZmZ erd dlZ e	j        d            G d d                      Z G d d	e          ZdS )
    N)defaultdict)TYPE_CHECKINGDictListOptionalTuple)CommunicatorReduceOpTorchTensorAllocator)num_cpusc                   R    e Zd ZdZdefdZdedddefdZded	ed         d
dfdZ	dS )CPUCommBarrierz
    Barrier actor that blocks the given number of actors until all actors have
    reached the Barrier.

    p2p operations are not done here (completed via shared memory channel).
    
num_actorsc                     || _         t          j                    | _        t	          t
                    | _        i | _        t	          t                    | _	        t	          t                    | _
        d S N)r   asyncio	Condition	conditionr   listcollective_datacollective_data_shapeintnum_actors_seennum_actors_read)selfr   s     }/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/channel/cpu_communicator.py__init__zCPUCommBarrier.__init__   sY    $ *,,@KD@Q@QEG"*3//  +3//    op_iddatatorch.Tensoropc                 |   K    j         4 d{V   j                                     |            j        xx         dz  cc<    j                  j        k    rE                     | j                           }| j        <    j                                          n$ j                              fd           d{V   j                 } j        xx         dz  cc<    j                  j        k    r j        =  j        =  j        = |cddd          d{V  S # 1 d{V swxY w Y   dS )z
        Wait at the communicator until all actors have sent `op_id` and `data`.
        Once data from all actors is received, execute the collective `op`
        on the communicator actor and return the result.
        N   c                  0    j                   j        k    S r   )r   r   )r   r   s   r   <lambda>z0CPUCommBarrier.wait_collective.<locals>.<lambda>8   s    D074?J r   )	r   r   appendr   r   	_apply_op
notify_allwait_forr   )r   r   r    r"   s   ``  r   wait_collectivezCPUCommBarrier.wait_collective'   s      > 	 	 	 	 	 	 	 	 '..t444 '''1,'''#E*do==~~b$*>u*EFF.2$U+))++++n--JJJJJ         '.D '''1,'''#E*do==(/(/(/-	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   DD++
D58D5tensorsreturnc                    |d                                          }|t          j        k    r|dd         D ]}||z  }n|t          j        k    r|dd         D ]}||z  }n|t          j        k    r#|dd         D ]}t          j        ||          }nv|t          j        k    r#|dd         D ]}t          j        ||          }nC|t          j	        k    r t          |          t          |          z  }nt          d| d          |S )zAApply the specified reduction operation across a list of tensors.r   r$   Nz
Operation z not supported)cloner
   SUMPRODUCTMAXtorchmaxMINminAVGsumlen
ValueError)r   r"   r,   resulttensors        r   r(   zCPUCommBarrier._apply_opE   s@    !!##!!""+ ! !& !8###!!""+ ! !& !8<!!""+ 3 3662238<!!""+ 3 3662238<\\CLL0FF<"<<<===r   N)
__name__
__module____qualname____doc__r   r   r
   r+   r   r(    r   r   r   r      s         03 0 0 0 03 n (    <H tN/C       r   r   c            
       r   e Zd ZdZdeded         fdZdddefd	Z	 d#dee         dddede	e
         fdZ	 	 	 	 d$dZej        fdddddefdZej        fdddddefdZd%dZdedd
fdZded         fdZdej        j        defdZde	e         fdZdefdZdefdZd  Zd! Zedefd"            Zd
S )&CPUCommunicatorzX
    Uses a CPU-based communicator actor instead of an accelerator group like NCCL.
    
world_sizeactor_handleszray.actor.ActorHandlec                     || _         || _        t          t                    | _        t                      | _        d| _        dS )z]We use the op index to synchronize the sender and receiver at the
        communicator actor.N)_world_size_actor_handlesr   r   num_opssetbarriers_rank)r   rD   rE   s      r   r   zCPUCommunicator.__init__a   s;     &+"3'' 


r   r<   r!   	peer_rankc                     d S r   rA   )r   r<   rM   s      r   sendzCPUCommunicator.sendm   s	     	r   Nshapedtypeztorch.dtype	allocatorc                     d S r   rA   )r   rP   rQ   rM   rR   s        r   recvzCPUCommunicator.recvr   s	     	r   send_bufrecv_bufc                     t           r   NotImplementedError)r   rU   rV   s      r   	allgatherzCPUCommunicator.allgather|   s
    
 "!r   r"   c           	      &     fd                                  D             }dd                    t          t          t	          |                              z   }t
                              |d                               j                  } j	        
                    |           t          j        |j                             j        |         ||                    }|
J d            |d d          |d d <    j        |xx         dz  cc<   d S )Nc                 :    g | ]}                     |          S rA   )get_rank).0actor_handler   s     r   
<listcomp>z-CPUCommunicator.allreduce.<locals>.<listcomp>   s2     
 
 
,8DMM,''
 
 
r   zbarrier-collective--T)nameget_if_existsz-Receiving buffer required for CPUCommunicatorr$   )get_actor_handlesjoinmapstrsortedr   optionsremoterG   rK   addraygetr+   rI   )r   rU   rV   r"   	all_ranksbarrier_keybarrierr;   s   `       r   	allreducezCPUCommunicator.allreduce   s(   
 
 
 
<@<R<R<T<T
 
 
	 ,chhs3y@Q@Q7R7R.S.SS ((k(NNUU
 
 	'"""#**4<+DhPRSS
 
 ##%T###QQQi[!!!Q&!!!!!r   c                     t           r   rX   )r   rU   rV   r"   s       r   reducescatterzCPUCommunicator.reducescatter   s
     "!r   r-   c                 B    | j         D ]}t          j        |           d S r   )rK   rl   kill)r   rp   s     r   destroyzCPUCommunicator.destroy   s0    } 	 	GHW	 	r   rankc                     || _         d S r   rL   )r   rw   s     r   
initializezCPUCommunicator.initialize   s    


r   c                     | j         S r   )rH   r   s    r   rd   z!CPUCommunicator.get_actor_handles   s    ""r   actorc                     d | j         D             }	 |                    |j                  }n# t          $ r t          d          w xY w|S )z
        Return the given actor's rank in the CPU communicator.

        Args:
            actor: The actor handle to look up.
        c                     g | ]	}|j         
S rA   )_ray_actor_id)r^   as     r   r`   z,CPUCommunicator.get_rank.<locals>.<listcomp>   s    BBBQ_BBBr   z*Actor is not in the CPUCommunicator group.)rH   indexr   r:   )r   r}   	actor_idsrw   s       r   r]   zCPUCommunicator.get_rank   sj     CBd.ABBB		K??5#677DD 	K 	K 	KIJJJ	Ks	   . Ac                     | j         S r   ry   r|   s    r   get_self_rankzCPUCommunicator.get_self_rank   s
    zr   c                     | j         S )zE
        Return the number of ranks in the CPU communicator.
        )rG   r|   s    r   get_world_sizezCPUCommunicator.get_world_size   s     r   c                     dS )NcpurA   r|   s    r   get_transport_namez"CPUCommunicator.get_transport_name   s    ur   c                     t           r   rX   r|   s    r   recv_streamzCPUCommunicator.recv_stream       !!r   c                     t           r   rX   r|   s    r   send_streamzCPUCommunicator.send_stream   r   r   c                 L    dd l }t          |                                          S )Nr   )uuidrg   uuid4)clsr   s     r   generate_communicator_idz(CPUCommunicator.generate_communicator_id   s!    4::<<   r   r   )rU   r!   rV   r!   )r-   N) r=   r>   r?   r@   r   r   r   rO   r   r   r   rT   rZ   r
   r0   rq   rs   rv   rz   rd   rl   r}   ActorHandler]   r   r   rg   r   r   r   classmethodr   rA   r   r   rC   rC   \   sl        
3 
t<S7T 
 
 
 
> c     59 Sz  	
 01   " " !" " " "  |	' ' ' !' 	' ' ' '4  |	" " " !" 	" " " "   s t    #4(?#@ # # # #ci3     x}             C    " " "" " " ! ! ! ! [! ! !r   rC   )r   collectionsr   typingr   r   r   r   r   rl   %ray.experimental.channel.communicatorr	   r
   r   r3   rj   r   rC   rA   r   r   <module>r      s#    # # # # # # = = = = = = = = = = = = = = 



           LLL QH H H H H H H HVs! s! s! s! s!l s! s! s! s! s!r   