
    &`i                     B   d dl Z d dlmZ d dlmZmZ d dlmZ d dlZd dl	Z	d dl
Z	d dlmc mZ d dlmZ d dlmZ d dlmZ  e	j        d            G d	 d
                      Z G d d          Z G d dej                  Zd Z G d dej        j                  ZdS )    N)defaultdict)OptionalTuple)mock)
nccl_group)TorchTensorAllocator)Device)num_cpusc                   (    e Zd ZdZddZddefdZdS )	Barrierz
    Barrier that blocks the given number of actors until all actors have
    reached the barrier. This is used to mock out blocking NCCL ops.
       c                    || _         t          j                    | _        i | _        t          t                    | _        t          j	        dt          j
        t          j                  }|                                 d S )NAray.experimental.channel.torch_tensor_type.TorchTensorType.devicenew_callablereturn_value)
num_actorsasyncio	Condition	conditiondatar   intnum_actors_seenr   patchPropertyMockr	   CPUstart)selfr   device_property_patchers      u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/channel/conftest.py__init__zBarrier.__init__   ss    $ *,, 	*3// #'*O*#
 #
 #

 	 %%'''''    Nidxc                    K    j         4 d{V  |) j        vsJ  j         j        f            | j        <    j        xx         dz  cc<    j                  j        k    r j                                          n$ j                              fd           d{V  | j                 }	 ddd          d{V  n# 1 d{V swxY w Y   |S )z
        Wait at barrier until all actors have sent `idx`. One actor should
        provide `data`, and this value will be returned by this method for all
        other actors.
        N   c                  0    j                   j        k    S N)r   r   )r#   r   s   r    <lambda>zBarrier.wait.<locals>.<lambda>9   s    D05H r"   )r   r   r   r   
notify_allwait_for)r   r#   r   s   `` r    waitzBarrier.wait(   s      > 	& 	& 	& 	& 	& 	& 	& 	&$)+++di9M-N+++!%	# %%%*%%%#C(DO;;))++++n--HHHHH         |y~%	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	&" s   B$C		
CC)r   r'   )__name__
__module____qualname____doc__r!   r   r+    r"   r    r   r      sR         
( ( ( (" c      r"   r   c                       e Zd Zd Zd ZdS )MockCudaStreamc                     d| _         d S )Nr   )cuda_streamr   s    r    r!   zMockCudaStream.__init__C   s    r"   c                     d S r'   r0   r5   s    r    synchronizezMockCudaStream.synchronizeF   s    r"   N)r,   r-   r.   r!   r7   r0   r"   r    r2   r2   B   s2              r"   r2   c            
            e Zd ZdZ fdZdej        defdZ	 dde	e         dej
        ded	ee         fd
ZddZ xZS )MockNcclGroupzl
    Mock the internal _NcclGroup to use a barrier actor instead of a NCCL group
    for communication.
    c                      t                      j        |i | t          t                    | _        t                      | _        d S r'   )superr!   r   r   num_opssetbarriersr   argskwargs	__class__s      r    r!   zMockNcclGroup.__init__P   s?    $)&))) #3''r"   tensor	peer_rankc                 l   t          |                                 |g          }d|d          d|d          }t          j        |          }| j                            |           t          j        |j                            | j	        |         |                     | j	        |xx         dz  cc<   d S )Nbarrier-r   -r%   name
sortedget_self_rankray	get_actorr>   addgetr+   remoter<   )r   rC   rD   barrier_keybarriers        r    sendzMockNcclGroup.sendW   s    d0022I>??BQBB+a.BB-[111'"""##DL$=vFFGGG[!!!Q&!!!!!r"   Nshapedtype	allocatorc                    t          |                                 |g          }d|d          d|d          }t          j        |          }| j                            |           t          j        |j                            | j	        |                             }|
J d             |||          }|d d          |d d <   | j	        |xx         dz  cc<   |S )NrF   r   rG   r%   rH   z4torch tensor allocator is required for MockNcclGrouprJ   )	r   rU   rV   rD   rW   rR   rS   received_tensorbufs	            r    recvzMockNcclGroup.recv`   s     d0022I>??BQBB+a.BB-[111'"""'',"5"5dl;6O"P"PQQ!!A "!!iu%% #AAA[!!!Q&!!!
r"   returnc                 B    | j         D ]}t          j        |           d S r'   )r>   rM   kill)r   rS   s     r    destroyzMockNcclGroup.destroyu   s0    } 	 	GHW	 	r"   r'   )r\   N)r,   r-   r.   r/   r!   torchTensorr   rT   r   rV   r   r   r[   r_   __classcell__rB   s   @r    r9   r9   J   s         
    '5< 'C ' ' ' ' 59 Sz { 	
 01   *       r"   r9   c                     t          j                    } d| j        j        _        t           j                            d| t          j                    t          j                    d          }|                                 t          t          j
        j        j        _        t          j        dd           }|                                 t          j        dd           }|                                 t          j        d	t          j        d
                    }|                                 t          j        dd          }|                                 t          j        dd           }|                                 t          j        dt           j        t"          j                  }|                                 t&          j                                        }|                    t          j        d
                     dS )z*
    Patch methods that require CUDA.
    r   zsys.modules)z	cupy.cudacupyz$ray.util.collective.collective_groupztorch.cuda.current_streamc                      t           S r'   r2   r0   r"   r    r(   z!start_nccl_mock.<locals>.<lambda>   s    . r"   )r   ztorch.cuda.Streamc                      t           S r'   rg   r0   r"   r    r(   z!start_nccl_mock.<locals>.<lambda>   s    . r"   ztorch.Tensor.devicecudaztorch.Tensor.is_cudaTzQray.experimental.channel.torch_tensor_accelerator_channel._torch_tensor_allocatorc                 .    t          j        | |          S )N)rV   )r`   empty)rU   rV   s     r    r(   z!start_nccl_mock.<locals>.<lambda>   s    U[e<<< r"   r   r   N)r   	MagicMockncclget_unique_idr   r   dictr   r9   rM   experimentalchannelr   
_NcclGroupr`   devicer   r	   r   ray_channelChannelContextget_currentset_torch_device)	nccl_mock
cp_patcherstream_patchernew_stream_patchertensor_patchertensor_allocator_patcherr   ctxs           r    start_nccl_mockr   z   s   
   I01IN -"N$$48N4D4D	
 	
 J  6CC'2 Z#2H2H  N *@*@   Z 5u|F7K7KLLNZ 6==N#z[<<    ""$$$ #jK&Z  
 !!###

$
0
0
2
2Cf--.....r"   c                   ,     e Zd ZdZ fdZ fdZ xZS )TracedChannelzA
    Patched Channel that records all write ops for testing.
    c                 H     t                      j        |i | g | _        d S r'   )r;   r!   opsr?   s      r    r!   zTracedChannel.__init__   s*    $)&)))r"   c                 n    | j                             ||f            t                      j        |i |S r'   )r   appendr;   writer?   s      r    r   zTracedChannel.write   s6    v'''uww}d-f---r"   )r,   r-   r.   r/   r!   r   rb   rc   s   @r    r   r      s[             
. . . . . . . . .r"   r   )r   collectionsr   typingr   r   unittestr   r`   rM   ray.dagray.experimental.channelrp   rq   rt   r   %ray.experimental.channel.communicatorr   ray.experimental.util.typesr	   rQ   r   r2   rr   r9   r   shared_memory_channelChannelr   r0   r"   r    <module>r      s    # # # # # # " " " " " " " "        



  . . . . . . . . . / / / / / / F F F F F F . . . . . . Q. . . . . . . .b       - - - - -J) - - -`0/ 0/ 0/f. . . . .K5= . . . . .r"   