
    )`ic                        d Z ddlmZ ddlmZ ddlmZ ddlZddlZddl	m
Z
 dd	lmZmZ dd
lmZ ddlmZ ddlmZ e G d d                      Zej        d             Ze
dej        dededefd            Ze
dej        dee         dededej        dej        fd            Ze
dej        deej                 dej        dej        dedededed efd!            Ze
	 d1d#ej        d$edej        dej        dedededed%ed&edej        fd'            Ze
d(ej        dej        dej        ded)ef
d*            Z e
deded+ed,efd-            Z! G d. d/          Z"g d0Z#dS )2z
MoE All-to-All Operations (Throughput Backend)

This module provides the throughput-optimized all-to-all backend for MoE expert parallelism,
supporting multiple payloads per collective operation.
    )	dataclass)SimpleNamespace)OptionalN   )flashinfer_api   )MnnvlMemoryMnnvlConfig)Mapping)gen_moe_alltoall_module)register_custom_opc                   V    e Zd ZU dZdZeed<   dZee	         ed<   dZ
ee	         ed<   dS )	_A2AStatez3Internal state tracking for MoeAlltoAll operations.idlephaseNlocal_num_tokenscombine_payload_offset)__name__
__module____qualname____doc__r   str__annotations__r   r   intr        w/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/flashinfer/comm/trtllm_moe_alltoall.pyr   r      sR         ==E3&*hsm***,0HSM00000r   r   c                     t                                                      t          dd          dt          j        dt
          dt
          dt
          ffd            } t          d	d          d
t          j        dt          t          j                 dt          j        dt          j        dt
          dt
          dt
          dt
          dt
          ffd            }t          dd          	 d#dt          j        dt
          dt          j        dt          j        dt
          dt
          dt
          dt
          dt
          dt          dt          j        ffd            }t          dd          dt          j        dt          j        dt          j        dt
          dt
          f
fd            }t          dg           fd            }t          d g           dt
          dt
          ffd!            }t          | |||||"          S )$z$Get or build the MOE A2A JIT module.zflashinfer::moe_a2a_initialize)	workspace)mutates_argsr   ep_rankep_sizemax_num_tokensc                 4                         | |||          S N)moe_a2a_initialize)r   r!   r"   r#   modules       r   r&   z3get_moe_alltoall_module.<locals>.moe_a2a_initialize%   s     ((GWnUUUr   zflashinfer::moe_a2a_dispatchtoken_selected_expertsinput_payloadsmetainforuntime_max_tokens_per_ranktop_knum_expertsc	                 >    	                     | ||||||||	  	        S )aW  
        Dispatch tokens and payloads to expert ranks.

        Args:
            token_selected_experts: [local_num_tokens, top_k] int32 tensor
            input_payloads: List of [local_num_tokens, *] tensors to dispatch
            workspace: [ep_size, size_per_rank] workspace tensor
            metainfo: Metadata tensor from initialize
            runtime_max_tokens_per_rank: Max tokens per rank in this batch
            ep_rank: Current expert parallel rank
            ep_size: Total expert parallel size
            top_k: Number of experts per token
            num_experts: Total number of experts

        Returns:
            recv_offsets: List of offsets for each payload in the workspace
            recv_sizes: List of sizes for each payload in the workspace
            combine_payload_offset: Offset for combine payload region
        )moe_a2a_dispatch)
r(   r)   r   r*   r+   r!   r"   r,   r-   r'   s
            r   r/   z1get_moe_alltoall_module.<locals>.moe_a2a_dispatch1   s9    D &&"'

 

 
	
r   zflashinfer::moe_a2a_combineFpayloadr   r   payload_in_workspacereturnc
                 @    
                     | |||||||||	
  
        S )a  
        Combine expert outputs back to originating tokens.

        Args:
            payload: [ep_size, max_tokens, elements_per_token] tensor
            local_num_tokens: Number of tokens on this rank
            workspace: [ep_size, size_per_rank] workspace tensor
            metainfo: Metadata tensor from initialize
            runtime_max_tokens_per_rank: Max tokens per rank in this batch
            ep_rank: Current expert parallel rank
            ep_size: Total expert parallel size
            top_k: Number of experts per token
            combine_payload_offset: Offset from dispatch
            payload_in_workspace: If True, payload is workspace-backed

        Returns:
            output: [local_num_tokens, elements_per_token] tensor
        )moe_a2a_combine)r0   r   r   r*   r+   r!   r"   r,   r   r1   r'   s             r   r4   z0get_moe_alltoall_module.<locals>.moe_a2a_combine_   s<    D %%'" 
 
 	
r   z'flashinfer::moe_a2a_sanitize_expert_ids)
expert_idsr5   invalid_expert_idc                 6                         | ||||          S r%   )moe_a2a_sanitize_expert_ids)r5   r   r*   r!   r6   r'   s        r   r8   z<get_moe_alltoall_module.<locals>.moe_a2a_sanitize_expert_ids   s)     11	8W6G
 
 	
r   z,flashinfer::moe_a2a_get_metainfo_index_pairsc                  ,                                      S )z
        Get all metainfo index constants from C++.

        Returns:
            Tuple of (names, values) where names is a list of constant names
            and values is a list of their corresponding integer values
        ) moe_a2a_get_metainfo_index_pairs)r'   s   r   r:   zAget_moe_alltoall_module.<locals>.moe_a2a_get_metainfo_index_pairs   s     66888r   z%flashinfer::moe_a2a_get_aux_data_sizec                 0                         | |          S )a1  
        Get the auxilary datasize per rank for the MoeAlltoAll operation.

        Args:
            ep_size: Total expert parallel size
            max_num_tokens: Maximum number of tokens across all ranks

        Returns:
            aux_data_size: Size of the auxilary data per rank in bytes
        )moe_a2a_get_aux_data_size)r"   r#   r'   s     r   r<   z:get_moe_alltoall_module.<locals>.moe_a2a_get_aux_data_size   s    $ //HHHr   )r&   r/   r4   r8   r:   r<   F)	r   build_and_loadr   torchTensorr   listboolr   )r&   r/   r4   r8   r:   r<   r'   s         @r   get_moe_alltoall_modulerC       s)    %&&5577F(#  V<VV V 	V V V V V	 V &#  (
 %(
U\*(
 <(
 ,	(

 &)(
 (
 (
 (
 (
 (
 (
 (
 (
	 (
T %#   &+)
 )
)
)
 <)
 ,	)

 &))
 )
 )
 )
 !$)
 #)
 
)
 )
 )
 )
 )
	 )
V 1$  	
L	
<	
 ,	
 		

 	
 	
 	
 	
 	
	 	
 6  9 9 9 9	 9 /  III I I I I	 I  -)'$?)I";   r   r   r!   r"   r#   c                 J    t                                          | |||          S r%   )rC   r&   )r   r!   r"   r#   s       r   r&   r&      s+     #$$777G^  r   leading_shapeslice_start	slice_enddtyper2   c                    | j         dk    r|                     d          } |                     t          j                  }| j         dk    s
J d            ||z
  |j        d         k    s
J d            ||                    d          z  }||                    d          z  }||z
  }||z   }	|	|j        d         k    s
J d             ||||	f                             |          j        g |dR  }
|
S )	a  
    Wrap an offset in the workspace into a tensor.

    Args:
        workspace: [ep_size, size_per_rank] or [size_per_rank] workspace tensor
        leading_shape: The leading shape to wrap the tensor with
        slice_start: The start of the slice in the workspace
        slice_end: The end of the slice in the workspace
        dtype: Data type for the output tensor

    Returns:
        tensor: [leading_shape, *] workspace-backed tensor
    r   r   rH   r   z0workspace must be shape [ep_size, size_per_rank]z4slice_end - slice_start must belong to a single rankz2slice must fall within the workspace size per rank)ndim	unsqueezeviewr?   uint8shapestride)r   rE   rF   rG   rH   workspace_base
slice_ranklocal_slice_startslice_lengthlocal_slice_endresults              r   (moe_a2a_wrap_payload_tensor_in_workspacerX      s=   * ~''**	^^%+^66N>Q R{"n&:1&====> >==  5 5a 8 88J#n&;&;A&>&>>{*L',6On215555< 655	z#4_#DDE	E				"	" 	" 	" 	" 
 Mr   r(   r)   r*   r+   r,   r-   c	                     t                                          | ||||||||	  	        \  }	}
}g }t          ||	|
d          D ]6\  }}}|                    t	          |||g|||z   |j                             7||fS )a  
    Dispatch tokens and payloads to expert ranks.

    Args:
        token_selected_experts: [local_num_tokens, top_k] int32 tensor
        input_payloads: List of [local_num_tokens, *] tensors to dispatch
        workspace: [ep_size, size_per_rank] workspace tensor
        metainfo: Metadata tensor from initialize
        runtime_max_tokens_per_rank: Max tokens per rank in this batch
        ep_rank: Current expert parallel rank
        ep_size: Total expert parallel size
        top_k: Number of experts per token
        num_experts: Total number of experts

    Returns:
        output_payloads: List of payloads for this rank, backed by data in the workspace
        combine_payload_offset: The offset to place the combine payload in the workspace
    Tstrict)rC   r/   zipappendrX   rH   )r(   r)   r   r*   r+   r!   r"   r,   r-   recv_offsets
recv_sizesr   output_payloadsinput_payloadoffsetsizes                   r   r/   r/      s    > 	 !!22"'
	
 
	
 5L*4 O'*j( ( ( 
 
#vt 	456# 	
 	
 	
 	
 222r   Fr0   r   r   r1   c
                 V    t                                          | |||||||||	
  
        S r%   )rC   r4   )
r0   r   r   r*   r+   r!   r"   r,   r   r1   s
             r   r4   r4   >  s@     #$$44#  r   r5   r6   c                 L    t                                          | ||||          S r%   )rC   r8   )r5   r   r*   r!   r6   s        r   r8   r8   Y  s.     #$$@@Ix2C  r   %total_dispatch_payload_size_per_tokencombine_payload_size_per_tokenc                     t                                          | |          }d } ||d           || |z  |z  d          z    || |z  |z  d          z   S )a  
    Get the workspace size per rank for the MoeAlltoAll operation.

    Args:
        ep_size: Total expert parallel size
        max_num_tokens: Maximum number of tokens across all ranks
        total_dispatch_payload_size_per_token: The size of the payload per token in the dispatch phase. This should be the sum of all payloads.
        combine_payload_size_per_token: The size of the payload per token in the combine phase.

    Returns:
        workspace_size_per_rank: Size of the workspace per rank in bytes
    c                     | |z   dz
  |z  |z  S )Nr   r   )xys     r   pad_upz3moe_a2a_get_workspace_size_per_rank.<locals>.pad_up~  s    Qq A%%r      )rC   r<   )r"   r#   rf   rg   aux_data_sizerl   s         r   #moe_a2a_get_workspace_size_per_rankro   f  s    & ,--GG M
& & &
 	}c""
&>),QQSV
W
W	X
&>),JJC
P
P	Qr   c                   (   e Zd ZU dZi Zeeeeeef         ef         ed<   e	dedededede
defd	            Zee	 d"dedededededefd                        ZdZee         ed<   e	d             Z	 	 	 d#de
dedededededee         fdZd Ze	 	 d$dej        deej                 dedee         dee         deej                 fd            Ze	 d%dej        dededej        fd            Zededed ej        dej        fd!            ZdS )&MoeAlltoAlla  
    Manages MoE All-to-All operations with proper workspace allocation and synchronization.

    This class provides the throughput-optimized backend that supports multiple payloads
    per collective operation, explicit dispatch/combine phases, and workspace-backed tensors.

    Example:
        >>> moe_a2a = MoeAlltoAll(mapping, max_num_tokens=2048, top_k=2, num_experts=8)
        >>> recv = moe_a2a.dispatch(experts, [hidden, ids, scales], batch_size)
        >>> output = moe_a2a.combine(processed, batch_size)
    _WORKSPACE_CACHEworkspace_size_per_rankr!   r"   r#   mappingr2   c                     ||||f}|| j         v r| j         |         S t          ||          }|                    t          j                  }t          ||||          }	|||||||	d| j         |<   | j         |         S )N)rs   r#   r!   r"   	mnnvl_memr   r*   )rr   r	   as_torch_strided_tensorr?   rO   r&   )
clsrs   r!   r"   r#   rt   keyrv   r   r*   s
             r   get_workspacezMoeAlltoAll.get_workspace  s     '.I#&&&',,#G-DEEI!99%+FFI)	 H ,C"0""&&$) )C % ',,r   r   r,   hidden_sizeextra_payload_bytes_per_tokenc                     d}t          ||z            |dz  z   |dz  z   |z   }t          ||z            }t          | |||          S )a  
        Convenience wrapper to calculate the workspace size per rank for the MoeAlltoAll operation. Automatically calculates the size of the dispatch and combine payloads when using default values.
        This allocates space assuming 16-bit float, which may overallocate for quantized models. For a tighter bound, use the base function `moe_a2a_get_workspace_size_per_rank` directly.

        Args:
            ep_size: Total expert parallel size
            top_k: Number of experts per token
            max_num_tokens: Maximum number of tokens across all ranks
            hidden_size: Hidden dimension size
            extra_payload_bytes_per_token: Extra size per token in the payload
        Returns:
            workspace_size_per_rank: Size of the workspace per rank in bytes
        r      )r   ro   )r"   r,   r#   r{   r|   element_sizerf   rg   s           r   get_moe_workspace_size_per_rankz+MoeAlltoAll.get_moe_workspace_size_per_rank  su    .  l*++aiai ,, 	. *-[<-G)H)H&21*	
 
 	
r   N_METAINFO_INDEXc                 (   | j         t                      }|                                \  }}i | _         t          ||d          D ]K\  }}|                    d          r|                    dd          n|}t          |          | j         |<   JdS dS )z2Initialize constants from C++ if not already done.NTrZ   MOE_A2A_ )r   rC   r:   r\   
startswithreplacer   )rx   r'   namesvaluesnamevalue
clean_names          r   _init_constantszMoeAlltoAll._init_constants  s     &,..F"CCEEME6 #%C"5&>>> = =e z22DLLR000 
 36e**#J// '&= =r   r-   mnnvl_configc                    |                                   |)|
J d            |                     |j        |||          }t          j                     |rt          j        ||           || _        || _        |j        | _        |j	        | _
        || _        || _        t          | j        t                    r| j        dk    rt          d          t          | j        t                    r| j        dk    rt          d          |                     || j
        | j        | j        |          | _        | j        d         |k    s
J d            | j        d         | j        k    s
J d	            | j        d
         | j
        k    s
J d            | j        d         | j        k    s
J d            | j        d         | _        | j        d         | _        | j        d         | _        t+                      | _        dS )a  
        Initialize MoeAlltoAll with workspace allocation.

        Args:
            mapping: Mapping object containing rank information
            max_num_tokens: Maximum number of tokens supported
            top_k: Number of experts per token
            num_experts: Total number of experts
            workspace_size_per_rank: Size of workspace per rank in bytes, if None hidden_size must be provided
            hidden_size: Hidden dimension size used when calculating the workspace size, if workspace_size_per_rank is not provided
            mnnvl_config: Used to configure the communication backend for the MNNVL memory object
        NzGhidden_size must be provided if workspace_size_per_rank is not providedr   ztop_k must be a positive intz"num_experts must be a positive intrs   zWorkspace size mismatchr#   zMax tokens mismatchr!   zEP rank mismatchr"   zEP size mismatchrv   r   r*   )r   r   moe_ep_sizer	   
initializeset_comm_from_configrs   r#   r"   moe_ep_rankr!   r,   r-   
isinstancer   
ValueErrorrz   
_WORKSPACErv   r   r*   r   _state)selfrt   r#   r,   r-   rs   r{   r   s           r   __init__zMoeAlltoAll.__init__  s   . 	"***Y +** '+&J&J#UNK' '#
 	    	D,WlCCC'>$,**
&$*c** 	=djAoo;<<<$*C00 	CD4D4I4IABBB ,,#LL
 
 89=TTTT% UTT /0D4GGGG! HGG y)T\999;M999y)T\999;M99955
3kkr   c                     t           j                                         | `| j        | j        | j        | j        | j        f= d| j	        _
        dS )zReset the workspace to free up its state. This is mainly used for testing. Use this with caution. This object is no longer usable after this.deletedN)r?   cudasynchronizer   rr   rs   r!   r"   r#   r   r   )r   s    r   _reset_workspacezMoeAlltoAll._reset_workspaceE  sS    
   O!,#	
 &r   r(   r)   r+   invalid_token_expert_idexpert_id_payload_indexc                    | j         j        dk    s
J d            || j        k    s
J d            t          ||| j        | j        || j        | j        | j        | j	        	  	        \  }}|
                    d          | j         _        || j         _        d| j         _        |6|
J d            ||         }t          || j        | j        | j        |           |S )aN  
        Perform MoE all-to-all dispatch operation.

        Args:
            token_selected_experts: [local_num_tokens, top_k] expert indices
            input_payloads: List of [local_num_tokens, *] tensors to dispatch
            runtime_max_tokens_per_rank: Max tokens per rank in this batch
            invalid_token_expert_id: If set, sanitize invalid tokens to this ID
            expert_id_payload_index: Index of expert IDs in input_payloads (required if invalid_token_expert_id is set)

        Returns:
            recv_tensors: List of [ep_size, max_tokens, *] tensors
        r   z%dispatch called twice without combine2runtime_max_tokens_per_rank exceeds max_num_tokensr   
dispatchedNzDexpert_id_payload_index required when invalid_token_expert_id is set)r   r   r#   r/   r   r*   r!   r"   r,   r-   rc   r   r   r8   )	r   r(   r)   r+   r   r   recv_tensorsr   recv_expert_idss	            r   dispatchzMoeAlltoAll.dispatchS  s   , { F***,S****d.AAAA@ BAA 0@"NM'LLJ
0
 
0
,, (>'B'B1'E'E$-C*( #.*66V 766 ++BCO''   r   Fr0   r1   c                    | j         j        dk    s
J d            || j        k    s
J d            t          || j         j        | j        | j        || j        | j        | j	        | j         j
        |
  
        }t                      | _         |S )a  
        Perform MoE all-to-all combine operation.

        Args:
            payload: [ep_size, max_tokens, elements_per_token] tensor
            runtime_max_tokens_per_rank: Max tokens per rank in this batch
            payload_in_workspace: If True, payload is workspace-backed (skip staging)

        Returns:
            output: [local_num_tokens, elements_per_token] tensor
        r   z)combine called before successful dispatchr   )r   r   r#   r4   r   r   r*   r!   r"   r,   r   r   )r   r0   r+   r1   outputs        r   combinezMoeAlltoAll.combine  s    $ { L0007 100 +d.AAAA@ BAA !K(NM'LLJK. 
 
  kkr   rH   c                 6   | j         j        dk    rt          d          t          j        g |                                          }t          | j        | j        ddf         | j	        |g| j         j
        | j         j
        | j	        |z  |z  |z  z   |          S )a  
        Get combine payload tensor backed by workspace (zero-copy).

        This tensor can be written to directly by expert processing, avoiding
        a staging copy in the combine operation.

        Args:
            runtime_max_tokens_per_rank: Max tokens per rank in this batch
            hidden_size: Hidden dimension size
            dtype: Data type for the tensor

        Returns:
            tensor: [ep_size, max_tokens, hidden_size] workspace-backed tensor
        r   zIget_combine_payload_tensor_in_workspace called before successful dispatchrJ   N)r   r   RuntimeErrorr?   tensorr   rX   r   r!   r"   r   )r   r+   r{   rH   r   s        r   'get_combine_payload_tensor_in_workspacez3MoeAlltoAll.get_combine_payload_tensor_in_workspace  s    * ;,,[   |Be444AACC7N4<?+\67K.K.l88;FUV
 
 	
r   )r   )NNN)NNr=   )r   r   r   r   rr   dicttupler   r   classmethodr   rz   staticmethodr   r   r   r   r   r
   r   r   r?   r@   rA   r   rB   r   rH   r   r   r   r   rq   rq     s        
 
 ?Ad5c3!34d:;@@@-!$- - 	-
 - - 
- - - [->  ./'
 '
'
'
 '
 	'

 (+'
 
'
 '
 '
 ^ \'
V '+OXd^***= = [=0 (,.2G" G"G" G" 	G"
 G" "%G" G" {+G" G" G" G"R& & &  26159 9 %9 U\*9 &)	9
 "*#9 "*#9 
el	9 9 9 ^9v 
 &+	( (( &)( #	(
 
( ( ( ^(T !
%(!
 !
 {	!

 
!
 !
 !
 ^!
 !
 !
r   rq   )rq   r4   r/   ro   r&   r8   rX   r=   )$r   dataclassesr   typesr   typingr   r?   	functoolsapi_loggingr   mnnvlr	   r
   rt   r   jit.commr   utilsr   r   cacherC   r@   r   r&   rA   rH   rX   r/   rB   r4   r8   ro   rq   __all__r   r   r   <module>r      s    " ! ! ! ! ! ! ! ! ! ! !            ( ( ( ( ( ( + + + + + + + +       . . . . . . & & & & & & 1 1 1 1 1 1 1 1 e e eP |  	    '|'9' ' 	'
 ;' \' ' ' 'T :3!L:3&:3 |:3 l	:3
 "%:3 :3 :3 :3 :3 :3 :3 :3z  "' \ | l	
 "%       \   4 		|	 l	 		
 	 	 	 	  ,/ %(	   DS
 S
 S
 S
 S
 S
 S
 S
l
  r   