
    )`is                        d Z ddlZddlZddlZddlmZ ddlmZmZ ddl	m
Z
 ddlZddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ d Z G d de
          ZdZ G d de          Zej        d             Zdej         fdej!        dede"deej!                 dedej!        fdZ#ddddej         fdej!        dej!        dej!        dedee$         deej!                 d eej!                 de"dedeej!        ej!        f         fd!Z% ed"          	 	 d:d#ed$ej&        d%ee         d&ee'         deeej!        e'f         f
d'            Z( ed(          	 d;d)ej!        d*e'd+e'd,e'd-ej!        d.e'd/e'd0e"de"d1eej!                 ddfd2            Z) ed3          d4ej!        d5ej!        d6ej!        d*e'd+e'd7e'd,e'd-ej!        d.e'd/e'dej!        de$d8ej!        de"ddfd9            Z*dS )<zE
MNNVL (Multi-Node NVLink) communication operations for FlashInfer.

    N)SimpleNamespace)OptionalTuple)Enum)
deprecated)Mapping   )gen_trtllm_mnnvl_comm_module)register_custom_op   )McastGPUBufferCommBackend
MPIBackend)AllReduceFusionWorkspacec                  F    ddl m}  	 | j                                         d S )Nr   MPI)mpi4pyr   
COMM_WORLDBarrierr   s    s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/flashinfer/comm/trtllm_mnnvl_ar.pympi_barrierr      s1    IN    c                   P    e Zd ZdZdZdZededededej	        dd f
d	            Z
d
S )MNNVLAllreduceFusionStrategyr   r   c   tp_size
num_tokens
hidden_dimdtypereturnc                     t          j        g |                                          }||z  | z  |z  t          k    rt          j        S t          j        S )Nr    )torchtensorelement_sizeMNNVL_ONE_SHOT_THRESHOLDr   ONESHOTTWOSHOT)r   r   r   r    	elem_sizes        r   select_strategyz,MNNVLAllreduceFusionStrategy.select_strategy$   sQ     L5111>>@@	
"W,y8<TTT/77/77r   N)__name__
__module____qualname__r(   r)   AUTOstaticmethodintr$   r    r+    r   r   r   r      sk        GGD88"%8368?D{8	'8 8 8 \8 8 8r   r   i   c                   f    e Zd ZdZ	 	 	 	 	 ddedee         dee         deej                 dee         dee	         f fd	Z
ej        ej        fd
edededej        dedefd            Zeej        ej        fd
edededej        dedefd                        Zedefd            ZddZ xZS )MNNVLAllReduceFusionWorkspace   Nmappingmax_num_tokensr   r    buffer_size_in_bytescomm_backendc           	      ^   t                                          |j        |j                   ||||
J d            t	          j        g |                                          }t          t          |j	        |z  |z  z  |          }| 
                    |j	        |||t          j                  }	| 
                    |j	        |||t          j                  }
t          |	|
          }nt          j        d| d           |t#                      }|dk    rt%          d| d          || j        z  }|j        | _        |j	        | _	        t          j        d	|j	         d
|j         d| d           t+          ||j	        |j        t	          j        d|j                  |          | _        | j        j        }t5          j        || j        z            dz  dz  | _        | j        | j        z  | _        t          j        d| d| j         d| j         d           | j                            |j        t          j                   t          j         !                                 |"                                 dgdz  }t	          j        dd| j        dg|dt          j#        t	          j        d|j                            | _$        | j        %                                | _&        | j        '                    | j                  | _(        | j        )                                | _*        dS )a  
        Initialize the MNNVL Allreduce Fusion Workspace. The workspace will be allocated and initialized based on the provided problem size. If max_num_tokens is larger than the one-shot threshold, the workspace will be created according to the max of required one-shot size at threshold, or the required two-shot size. Note that the workspace is not bind to the given problem size. It can be reused for different problem size without reinitialization given the allocated size is sufficient.

        If the buffer_size_in_bytes is provided, the workspace will be created according to the provided size. The user is expected to use the utility function get_required_buffer_size_bytes to calculate the required size. The actual allocation size may be larger due to alignment requirements. This covers the advanced used case, for example, the user may want to enforce oneshot strategy and ignore the heuristics.

        Either max_num_tokens or buffer_size_in_bytes must be provided.

        comm_backend will be used for creating the workspace and synchronization. If not provided, MPIBackend will be used which will use COMM_WORLD for synchronization.

        Args:
            mapping: Mapping configuration containing rank info
            max_num_tokens: The maximum number of tokens in the input tensor.
            hidden_dim: The hidden dimension of the tensors to be reduced.
            dtype: The data type of the tensors to be reduced.
            buffer_size_in_bytes: The requested size in bytes for each lamport buffer. The actual allocation size may be larger due to alignment requirements. The actual usable size will be NUM_LAMPORT_BUFFERS * actual_buffer_size_per_lamport_buffer.
        Nz_max_num_tokens, hidden_dim, and dtype must be provided if buffer_size_in_bytes is not provided.r#   z@[MNNVL Allreduce] Using provided buffer size override in bytes:  bytes.l    zThe buffer size in bytes z9 is greater than the maximum supported size (UINT32_MAX).z[MNNVL Allreduce] TP size: z, rank: z+, Allocating workspace with requested size z bytes per buffer.cuda   z)[MNNVL Allreduce] Actual allocated size: z/ bytes, Actual buffer size per lamport buffer: z bytes, total workspace: r      r	   )r    device)+super__init__
world_sizerankr$   r%   r&   minr'   r   get_required_buffer_size_bytesr   r(   r)   maxloggingdebugr   
ValueErrorNUM_LAMPORT_BUFFERStp_rankr   r?   
local_rankmcast_buffer_handlebuf_sizemathfloorbuffer_size_bytesworkspace_size_byteslamport_initializefloat32r<   synchronizebarrieruint32buffer_flagsget_buffer_ptrs_devuc_ptrs_devget_unicast_ptruc_ptr_localget_multicast_ptrmc_ptr)selfr6   r7   r   r    r8   r9   r*   oneshot_max_num_tokensone_shot_size_bytestwo_shot_size_bytesrequested_workspace_sizeallocated_sizenum_bytes_to_clear	__class__s                 r   rA   z&MNNVLAllReduceFusionWorkspace.__init__6   s   2 	+W\:::'**%%q &%& Ru555BBDDI%((W_y-H:-UV& &" #'"E"E&,4# # #'"E"E,4# # $'':<O#P#P  MpSgppp   %<<L9--{,@{{{  
 $8$:R#R O	 h'/  h  h7?  h  h  @T  h  h  h	
 	
 	

 $2$OOL!344$
 $
  1: J~(@@AARG"L 	 %)$:T=U$U! K  K  Kw{  xN  K  K  im  iB  K  K  K	
 	
 	

 	 33GOU]SSS
     S1W!L4)1E/AE1E,<(:;;
 
 
  3GGII 4DDTYOO.@@BBr   r   r   strategyr!   c                 R    |                      |||||          }|| j        k    rdS dS )N
        Calculate the required buffer size for a given problem size.
        FT)rE   rQ   )r_   r   r   r   r    rg   required_buffer_sizes          r   is_buffer_size_sufficientz7MNNVLAllReduceFusionWorkspace.is_buffer_size_sufficient   s?      $BBZUH 
  
  $"88854r   c                 .   t          j        g |                                          }|t          j        k    rt                              | |||          }|t          j        k    r||z  | z  |z  }n#dt          j        || z            z  | z  |z  |z  }|S )ri   r#   r	   )	r$   r%   r&   r   r/   r+   r(   rO   ceil)r   r   r   r    rg   r*   buffer_sizes          r   rE   z<MNNVLAllReduceFusionWorkspace.get_required_buffer_size_bytes   s     L5111>>@@	38883CCZ H 3;;;$z1G;iGKK
 DIj72333g=
JYV  r   c                     dS )Nmnnvlr2   r_   s    r   backendz%MNNVLAllReduceFusionWorkspace.backend   s    wr   c                 N    t          | dd          rdS | `| `| `| `| `d| _        dS )z%Destroy workspace and free resources.
_destroyedFNT)getattrrM   rX   rZ   r\   r^   rt   rq   s    r   destroyz%MNNVLAllReduceFusionWorkspace.destroy   sC    4u-- 	F$Kr   )NNNNN)r!   N)r,   r-   r.   rJ   r   r   r1   r$   r    r   rA   	functoolscacher   r/   boolrk   r0   rE   propertystrrr   rv   __classcell__)rf   s   @r   r4   r4   3   s       
 )-$('+.2.2yC yCyC !yC SM	yC
 $yC 'smyC {+yC yC yC yC yC yCv _ 2N1R   	
 { / 
   _& _ 2N1R   {	
 / 
   _ \6     X
 
 
 
 
 
 
 
r   r4   c            !         t                                                      t          dg d          dt          j        dt
          dt
          dt
          dt          j        d	t
          d
t
          dt          dt          dt          dt          j        dt          t          j                 dt          t          j                 dt          t          j                 dt          t                   dd f fd            } t          |           S )Nz)flashinfer::trtllm_mnnvl_allreduce_fusion)inputmulticast_buffer_ptrbuffer_ptrs_devbuffer_ptr_localbuffer_flags_mnnvlnranksrC   rmsnorm_fusionlaunch_with_pdluse_oneshotoutputresidual_outresidual_ingammaepsilon)mutates_argsr~   r   r   r   r   r   rC   r   r   r   r   r   r   r   r   r!   c                 N                         | |||||||||	|
||||           dS )a  
        Perform a multi-node NVLink all-reduce operation with fusion.
        Args:
            input: Input tensor
            multicast_buffer_ptr: Pointer to the multicast buffer as an integer
            buffer_ptrs_dev: Pointer to the device array of buffer pointers as an integer
            buffer_ptr_local: Pointer to local buffer as an integer
            buffer_flags_mnnvl: Buffer flags tensor for synchronization
            nranks: Total number of ranks participating in the all-reduce
            rank: Current process rank
            rmsnorm_fusion: Whether to perform RMSNorm fusion
            launch_with_pdl: Whether to launch with PDL
            use_oneshot: Whether to use one-shot (true) or two-shot (false)
            output: Output tensor
            residual_out: Residual output tensor (if rmsnorm)
            gamma: Gamma tensor (if rmsnorm)
            epsilon: Epsilon value (if rmsnorm)
        Ntrtllm_mnnvl_allreduce_fusion)r~   r   r   r   r   r   rC   r   r   r   r   r   r   r   r   modules                  r   r   zCget_trtllm_mnnvl_comm_module.<locals>.trtllm_mnnvl_allreduce_fusion   sU    n 	,, 	
 	
 	
 	
 	
r   r   )
r
   build_and_loadr   r$   Tensorr1   ry   r   floatr   )r   r   s    @r   get_trtllm_mnnvl_comm_moduler      s>   )++::<<F3
 
 
  (3
|3
!3
 3
 	3

 "L3
 3
 3
 3
 3
 3
 3
 u|,3
 el+3
 %3
 %3
  
!3
 3
 3
 3
 3
) (3
j &C   r   r~   	workspacer   r   rg   r!   c                    t          | j                  dk    r-t          dt          | j                   d| j         d          |t          j        |           }nEt          |j                  dk    r-t          dt          |j                   d|j         d          t                      }|t          j        k    r=t                              |j	        | j        d         | j        d         | j
                  }|                    |j	        | j        d         | j        d         | j
        |          sRt          d	|j         d
|                    |j	        | j        d         | j        d         | j
        |           d          |                    | |j        |j        |j        |j        |j	        |j        d||t          j        k    |dddd           |S )a1  Perform a multi-node NVLink all-reduce operation across multiple GPUs.

    This function performs an all-reduce (sum) operation using NVIDIA's multi-node NVLink (MNNVL)
    technology to efficiently combine tensors across multiple GPUs and nodes.

    There are 2 variants: One-shot and Two-shot:
     - One-shot: Each rank stores local shard to all other ranks. Each ranks will receive all shards at the end of the communication round and perfom local reduction. Suitable for small data size and is optimized for low latency.
     - Two-shot: There will be 3 steps:
        1. Scatter each GPU's input shard to other ranks. Each rank will received all shards of a slice of tokens.
        2. Each rank perform reduction on the local tokens.
        3. Each rank broadcast the result to all ranks.
        Suitable for large data size and is optimized for balancing throughput and latency.

    Args:
        input: Local Input Shard [num_tokens, hidden_dim]
        workspace: MNNVLAllReduceFusionWorkspace
        launch_with_pdl: Whether to launch with PDL
        output: Output tensor to store the result, empty tensor will be created if not provided.
        strategy: MNNVLAllreduceFusionStrategy. Internal heuristics will be used if not provided.
    Returns:
        output: Reduced tensor [num_tokens, hidden_dim]
    r	   !The input tensor must be 2D, got D. The shape is .N"The output tensor must be 2D, got r   r   [The buffer size in the given workspace is insufficient for the given problem size. Buffer:  bytes, Required: r;   F)lenshaperI   r$   
empty_liker   r   r/   r+   r   r    rk   rQ   rE   r   r^   rZ   r\   rX   rC   r(   )r~   r   r   r   rg   r   s         r   trtllm_mnnvl_allreducer   E  s\   > 5;1`EK0@0@``RWR]```
 
 	
 ~!%((	V\		a		cV\1B1BccTZT`ccc
 
 	
 *++F/444/??u{1~u{1~u{
 
 ..5;q>5;q>5;  
  Ujs  kF  U  U  Zc  ZB  ZB  CL  CT  V[  Va  bc  Vd  fk  fq  rs  ft  v{  vA  CK  ZL  ZL  U  U  U
 
 	
 ((088  $ Mr   Fr   r   r   r   c	                    |t          j        | j                  j        }t	          | j                  dk    r-t          dt	          | j                   d| j         d          t	          |j                  dk    r-t          dt	          |j                   d|j         d          |                                | j        d         k    r3t          d|                                 d	| j        d          d
          |t          j        |           }nEt	          |j                  dk    r-t          dt	          |j                   d|j         d          |t          j        |          }nEt	          |j                  dk    r-t          dt	          |j                   d|j         d          t                      }	|t          j        k    r=t                              |j        | j        d         | j        d         | j                  }|                    |j        | j        d         | j        d         | j        |          sRt          d|j         d|                    |j        | j        d         | j        d         | j        |           d          |	                    | |j        |j        |j        |j        |j        |j        d||t          j        k    |||||           ||fS )a  Performs MNNVL Allreduce + Residual + RMSNorm.

    This function performs a multi-node all-reduce (sum) operation by first calling trtllm_mnnvl_allreduce on the shard_input.
    After this, it performs residual addition and RMSNorm on the all-reduced result, reading it directly from the multicast buffer.
    Note: multicast buffer is the same as the unicast buffer for the current rank.

    Args:
        input: Input tensor [num_tokens, hidden_dim]
        residual_in: Residual input tensor [num_tokens, hidden_dim]
        gamma: Gamma tensor [hidden_dim]
        workspace: MNNVLAllReduceFusionWorkspace
        epsilon: The epsilon parameter for RMSNorm, torch.finfo.eps will be used if not provided.
        output: Output tensor for normalized results [num_tokens, hidden_dim], empty tensor will be created if not provided.
        residual_out: Residual output tensor [num_tokens, hidden_dim], empty tensor will be created if not provided.
        launch_with_pdl: Whether to launch with PDL
        strategy: MNNVLAllreduceFusionStrategy. Internal heuristics will be used if not provided.

    Returns:
        output: Add-residual and normalized tensor [num_tokens, hidden_dim]
        residual_out: Add-residual tensor [num_tokens, hidden_dim]
    Nr	   r   r   r   *The residual input tensor must be 2D, got r   TThe gamma tensor must have the same number of elements as the hidden dimension, got  elements but expected 
 elements.r   z+The residual output tensor must be 2D, got r   r   r   r;   T)r$   finfor    epsr   r   rI   numelr   r   r   r/   r+   r   rk   rQ   rE   r   r^   rZ   r\   rX   rC   r(   )
r~   r   r   r   r   r   r   r   rg   r   s
             r   (trtllm_mnnvl_fused_allreduce_add_rmsnormr     s   B +ek**.
5;1`EK0@0@``RWR]```
 
 	
 ;""u[=N9O9Ouualaruuu
 
 	
 {{}}A&& dchcncncpcp  d  d  JO  JU  VW  JX  d  d  d
 
 	
 ~!%((	V\		a		cV\1B1BccTZT`ccc
 
 	
 '44	\	 	 A	%	%x#l>P:Q:Qxxcocuxxx
 
 	
 *++F/444/??u{1~u{1~u{
 
 ..5;q>5;q>5;  
  Ujs  kF  U  U  Zc  ZB  ZB  CL  CT  V[  Va  bc  Vd  fk  fq  rs  ft  v{  vA  CK  ZL  ZL  U  U  U
 
 	
 ((088  " <r   ztget_allreduce_mnnvl_workspace is deprecated, use MNNVLAllReduceFusionWorkspace class to manage the workspace insteadr6   r     comm_backend_for_handle_transferr8   c                     d|j         z  }d}||nd}t          j        |||z  z            ||z  z  }t          | ||          }|j        }|j        }	|j        |z  }
||	|
fS )a  Get workspace buffers needed for multi-node NVLink all-reduce operation.

    This function allocates and initializes the workspace buffers required for performing
    multi-node NVLink all-reduce operations. It creates:
    1. A multicast GPU buffer for communication between nodes
    2. A flags tensor to track buffer state
    3. Maximum number of elements that can fit in the buffer

    The buffer size is calculated to efficiently handle common hidden dimensions
    (2048, 4096, 5120, 7168, 8192) by using their LCM of 286720.

    Args:
        mapping: Tensor parallel mapping configuration containing rank info
        dtype: Data type of the tensors being reduced
        buffer_size_in_bytes: Optional buffer size. Practically, assign this to 3 * 2 * dtype.itemsize * hidden_dim * max_tokens

    Returns:
        Tuple containing:
        - McastGPUBuffer: Multicast buffer for inter-node communication
        - torch.Tensor: Buffer flags tensor tracking state
        - int: Maximum number of elements that can fit in buffer
       i ` Ni  )r8   r9   )itemsizerO   rm   r4   rM   rX   rQ   )r6   r    r   r8   stridelcm_hidden_dimTARGET_WORKSPACE_SIZE_BYTESr   mcast_bufferrX   max_num_elementss              r   get_allreduce_mnnvl_workspacer     s    @ U^#F N 4 @j    9#~'>? 	&	 "
 .15  I 0L)L 2f< 	 r   zwtrtllm_mnnvl_all_reduce is deprecated, use trtllm_mnnvl_allreduce instead. This function will be removed in the future.inpr   r   buffer_Mr   r   rC   wait_for_resultsoutc
                 x   t          | j                  dk    r-t          dt          | j                   d| j         d          | j        d         |k    r$t          d| j        d          d| d| d          |r|	
J d
            t                      }
|
                    | ||d|||d|d|	d	d	d	d	           d	S )a2  Perform a multi-node NVLink all-reduce operation across multiple GPUs.

    This function performs an all-reduce (sum) operation using NVIDIA's multi-node NVLink (MNNVL)
    technology to efficiently combine tensors across multiple GPUs and nodes.

    There are 3 steps:
    1. scatter each GPU's input shard to the right unicast buffer
    2. perform all-reduce on each GPU
    3. broadcast the result to all GPUs

    Args:
        inp: Local Input Shard
        multicast_buffer_ptr: Pointer to the multicast buffer as an integer
        buffer_ptrs_dev: Pointer to device buffer pointers as an integer
        buffer_M: Maximum number of elements // hidden_dim
        buffer_flags_mnnvl: Tensor containing buffer state flags
        nranks: Total number of ranks participating in the all-reduce
        rank: Current process rank
        wait_for_results: If True, store the result to out
        launch_with_pdl: If True, launch using Programmatic Dependent Launch
        [Optional] out: Output tensor to store the result (required if wait_for_results is True)

    r	   r   r   r   r   )The number of tokens in the input tensor  is greater than the buffer_M i. This is not supported. Please increase the workspace size, or decrease the amount of tokens to at most NzCalling the legacy trtllm_mnnvl_all_reduce with wait_for_results=False is not supported. Please use trtllm_mnnvl_allreduce instead.F)r   r   rI   r   r   )r   r   r   r   r   r   rC   r   r   r   r   s              r   trtllm_mnnvl_all_reducer   0  s/   N 39~~\CI\\PSPY\\\
 
 	

 y|h c	!  c  cdl  c  c  X`  c  c  c
 
 	

   	N "11 *++F
((	    r   ztrtllm_mnnvl_fused_allreduce_rmsnorm is deprecated, use trtllm_mnnvl_fused_allreduce_add_rmsnorm instead. This function will be removed in the future.prenorm_outputnormed_outputshard_inputunicast_ptrresidualc                    t          |j                  dk    r-t          dt          |j                   d|j         d          |j        d         |k    r$t          d|j        d          d| d| d          t          |j                  dk    r-t          d	t          |j                   d|j         d          |
                                |j        d
         k    r3t          d|
                                 d|j        d
          d          t          |j                  dk    r-t          dt          |j                   d|j         d          t          | j                  dk    r-t          dt          | j                   d| j         d          t	                      }|                    |||||||	d|d|| ||
|           dS )a  Performs MNNVL TwoShot Allreduce + RMSNorm.

    This function performs a multi-node all-reduce (sum) operation by first calling trtllm_mnnvl_all_reduce on the shard_input.
    After this, it performs RMSNorm on the all-reduced result, reading it directly from the multicast buffer.
    Note: multicast buffer is the same as the unicast buffer for the current rank.

    Args:
        prenorm_output: Output tensor for prenorm results
        normed_output: Output tensor for normalized results
        shard_input: Input tensor shard
        multicast_buffer_ptr: Pointer address as integer for multicast buffer
        buffer_ptrs_dev: Pointer address as integer for device buffer pointers
        unicast_ptr: Pointer address as integer for unicast buffer
        buffer_M: Maximum number of elements // hidden_dim
        buffer_flags_mnnvl: Buffer flags for synchronization
        nranks: Number of ranks in the tensor parallel group
        rank: Current rank in the tensor parallel group
        gamma: The gamma (norm weight) parameter for RMSNorm
        epsilon: The epsilon parameter for RMSNorm
        residual: The residual tensor to add
        launch_with_pdl: Whether to launch with PDL

    r	   r   r   r   r   r   r   r   r   r   r   r   r   r   z*The prenorm output tensor must be 2D, got TFN)r   r   rI   r   r   r   )r   r   r   r   r   r   r   r   r   rC   r   r   r   r   r   s                  r   $trtllm_mnnvl_fused_allreduce_rmsnormr   z  s   T ;""lK4E0F0FllXcXilll
 
 	

 h&& k8I!8L  k  klt  k  k  `h  k  k  k
 
 	
 8>aoX^9L9Loo^f^looo
 
 	
 {{}})!,,, jchcncncpcp  j  j  JU  J[  \]  J^  j  j  j
 
 	
 =1$$q]5H1I1Iqq[h[nqqq
 
 	
 >  A%%{^=Q9R9R{{drdx{{{
 
 	
 *++F
((    r   )NN)N)+__doc__rw   rO   rG   typesr   typingr   r   enumr   r$   typing_extensionsr   flashinfer.comm.mappingr   jitr
   utilsr   rp   r   r   r   workspace_baser   r   r   r'   r4   rx   r   r/   r   ry   r   r   r   r    r1   r   r   r   r2   r   r   <module>r      s   
       ! ! ! ! ! ! " " " " " " " "        ( ( ( ( ( ( + + + + + + . . . . . . & & & & & & : : : : : : : : : : 4 4 4 4 4 4  8 8 8 8 84 8 8 8" - } } } } }$< } } }@ N N Nj &*-I-NK K<K,K K U\"	K
 +K \K K K Kf  $%)+/!-I-N[  [ <[ [  <[  -	[ 
 e_[  U\"[  5<([  [  +[  5<%&[  [  [  [ ~ z  ?C*.	8 88;8 '/{&;8 #3-	8
 >5<,-8 8 8 8v }  #'D D	DD D 	D
 D D D D D 
%,	D 
D D D DN  ] WLW<W W 	W
 W W W W W W <W W lW W 
W W W W W Wr   