
    -`iN                        d dl mZ d dlZd dlmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZmZ dd	lmZmZ  e            rd d
lmZ d dlmZ d dlmZ  ee          Z G d de          Z  G d de          Z! G d de          Z" G d de          Z# G d de#          Z$ G d de#          Z% G d de          Z& G d de          Z'dS )    )AnyN)get_dp_groupget_ep_group)get_forward_context)init_logger)has_flashinfer_all2all)has_deep_ephas_morihas_pplx   )All2AllManagerBaseCache)Mapping)MnnvlConfig)MnnvlMoec                        e Zd ZdZ fdZdej        dej        dedej        fdZ	 	 dd
ej        dej        dede	ej                 d	z  de
ej        ej        f         f
dZ	 dd
ej        dedej        fdZd Z xZS )NaiveAll2AllManagerz
    A naive implementation of all2all communication.
    It uses all-reduce under the hood, which is not
    efficient at all. The main purpose is for testing and
    debugging.
    c                 J    t                                          |           d S Nsuper__init__self	cpu_group	__class__s     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/distributed/device_communicators/all2all.pyr   zNaiveAll2AllManager.__init__#   !    #####    xcu_tokens_across_sp_cpuis_sequence_parallelreturnc                     t          |j                  dk    sJ t          j        |d         |                    d          f|j        |j                  }|r| j        n| j        }|r| j	        n| j
        }|dk    rdn
||dz
           }||         }|||d d f                             |           t          |          D ]K}	|	dk    rdn
||	dz
           }||	         }t                                          |||d d f         |	           L|S )N   r   )devicedtyper   )lenshapetorchemptysizer'   r(   rankdp_rank
world_sizedp_world_sizecopy_ranger   	broadcast)
r   r    r!   r"   bufferr.   r0   startendidxs
             r   naive_multicastz#NaiveAll2AllManager.naive_multicast&   s4    17||q    $R(!&&))4QXQW
 
 
 1Btyydl(<TT__$BT
QYY$;D1H$E%d+uSy!!!|""1%%%$$ 	@ 	@CAA'>sQw'GE)#.CNN$$VE#IqqqL%93????r   FNhidden_statesrouter_logitsextra_tensorsc                     |t          d          |r| j        j        nd}t                      j        }|J |                    |          }|                     |||          }|                     |||          }||fS )Nz6extra_tensors is not supported for NaiveAll2AllManagerr   )NotImplementedErrortp_groupr0   r   dp_metadatacu_tokens_across_spr9   )r   r:   r;   r"   r<   sp_sizer@   r!   s           r   dispatchzNaiveAll2AllManager.dispatch>   s     $%H   /CI$-**)++7&&&"-"A"A'"J"J,,24H
 
 ,,24H
 
 m++r   c                 2   |r| j         n| j        }t                      j        }|J |r| j        j        nd}|                    |          }|dk    rdn
||dz
           }||         }t                                          |          }	|	||d d f         }|S )Nr   r   )	r.   r/   r   r@   r?   r0   rA   r   
all_reduce)
r   r:   r"   ep_rankr@   rB   r!   r6   r7   all_hidden_statess
             r   combinezNaiveAll2AllManager.combineW   s      4E$)))++7&&&.BI$-**"-"A"A'"J"J\\'>w{'K%g.(NN55mDD)%)QQQ,7r   c                     d S r    r   s    r   destroyzNaiveAll2AllManager.destroyh       r   FNF)__name__
__module____qualname____doc__r   r+   Tensorboolr9   listtuplerC   rH   rL   __classcell__r   s   @r   r   r      s<        $ $ $ $ $< "' #	
 
   8 &+37, ,|, |, #	,
 EL)D0, 
u|U\)	*, , , ,4 IN "\AE	   "      r   r   c                       e Zd ZdZ fdZ	 	 ddej        dej        dedeej                 dz  d	e	ej        ej        f         e	ej        ej        eej                 f         z  f
d
Z
	 ddej        ded	ej        fdZd Z xZS )AgRsAll2AllManagerzu
    An implementation of all2all communication based on
    all-gather (dispatch) and reduce-scatter (combine).
    c                 J    t                                          |           d S r   r   r   s     r   r   zAgRsAll2AllManager.__init__r   r   r   FNr:   r;   r"   r<   r#   c                    t                      j        }|J |                                }|J |rt                      nt	                      }||j                 |j        d         k    sJ ||g}||                    |           |                    |d|          }	||	d         |	d         |	dd         fS |	d         |	d         fS )zK
        Gather hidden_states and router_logits from all dp ranks.
        Nr   dimsizesr   r%   )	r   r@   get_chunk_sizes_across_dp_rankr   r   rank_in_groupr*   extendall_gatherv)
r   r:   r;   r"   r<   r@   r`   
dist_grouptensors_to_gathergathered_tensorss
             r   rC   zAgRsAll2AllManager.dispatchu   s     *++7&&&::<<   ';O\^^^
Z-.-2Ea2HHHHH*M:$$$]333%11 2 
 
 $$Q')9!)<>Nqrr>RSS"$4Q$777r   c                     t                      j        }|J |                                }|J |rt                      nt	                      }|                    |d|          }|S )zC
        Reduce-scatter hidden_states across all dp ranks.
        Nr   r^   )r   r@   ra   r   r   reduce_scatterv)r   r:   r"   r@   r`   re   s         r   rH   zAgRsAll2AllManager.combine   sq     *++7&&&::<<   ';O\^^^
"22=au2UUr   c                     d S r   rJ   rK   s    r   rL   zAgRsAll2AllManager.destroy   rM   r   rN   rO   )rP   rQ   rR   rS   r   r+   rT   rU   rV   rW   rC   rH   rL   rX   rY   s   @r   r[   r[   l   s        
$ $ $ $ $ &+37 8  8| 8 | 8 #	 8
 EL)D0 8 	elEL()
elD,>>
?	@ 8  8  8  8F IN "\AE	         r   r[   c                        e Zd ZdZ fdZd Z	 	 ddej        dej        ded	e	ej                 dz  d
e
ej        ej        f         f
dZ	 ddej        ded
ej        fdZd Z xZS )PPLXAll2AllManagerz6
    All2All communication based on PPLX kernels.
    c                 "   t                      s
J d            t                                          |           | j        rddlm}m}m} t          	                    d| j
        | j                   | j
        dk    r
 |            n	 |            }t          j        |t          j        | j                  d         | j                   t          	                    d|            ||| j
        | j                   t!                      | _        d S )Nzpplx_kernels not found. Please follow https://github.com/vllm-project/vllm/blob/main/tools/ep_kernels/README.md to install pplx_kernels.r   )nvshmem_alloc_empty_unique_idnvshmem_get_unique_idnvshmem_initz;Initialize NVSHMEM for pplx_kernels: rank=%d, world size=%d)srcgroupzPPLX NVSHMEM UID = %s)r   r   r   	internodepplx_kernels.nvshmemrn   ro   rp   loggerdebugr.   r0   distr4   get_process_group_ranksr   r   handle_cache)r   r   rn   ro   rp   uidr   s         r   r   zPPLXAll2AllManager.__init__   sH   zz 	
 	
(	
 	
z 	###> 	:          LLM	   9>> &%'''2244 
 N0@@Cn   
 LL0#666Ldi999!GGr   c                 z    dd l }| j                            || j        r|j        j        n|j        j                  S Nr   )pplx_kernelsry   get_or_creaters   AllToAll	intranode)r   kwargspplxs      r   
get_handlezPPLXAll2AllManager.get_handle   sE    #### ..'+~RDM##4=;R
 
 	
r   FNr:   r;   r"   r<   r#   c                     t           r   r>   r   r:   r;   r"   r<   s        r   rC   zPPLXAll2AllManager.dispatch   
     "!r   c                     t           r   r   r   r:   r"   s      r   rH   zPPLXAll2AllManager.combine   
     "!r   c                 &   | j         j        5  | j         j                                        D ]\  }}|                                 	 d d d            n# 1 swxY w Y   | j        r,ddlm} t          	                    d            |             d S d S )Nr   )nvshmem_finalizezPPLX NVSHMEM finalize)
ry   _lock_cacheitemsrL   rs   rt   r   ru   rv   )r   _handler   s       r   rL   zPPLXAll2AllManager.destroy   s    $ 	! 	!!.5;;== ! !	6    !	! 	! 	! 	! 	! 	! 	! 	! 	! 	! 	! 	! 	! 	! 	! > 	      LL0111	 	s   9AAArN   rO   rP   rQ   rR   rS   r   r   r+   rT   rU   rV   rW   rC   rH   rL   rX   rY   s   @r   rl   rl      s        "$ "$ "$ "$ "$H
 
 
 &+37" "|" |" #	"
 EL)D0" 
u|U\)	*" " " " IN" ""\"AE"	" " " "
      r   rl   c                        e Zd ZdZ fdZd Z	 	 ddej        dej        ded	e	ej                 dz  d
e
ej        ej        f         f
dZ	 ddej        ded
ej        fdZd Z xZS )DeepEPAll2AllManagerBaseH
    All2All communication based on DeepEP High-Throughput kernels.
    c                     t                      s
J d            t                                          |           t                      | _        d| _        d S )NzDeepEP kernels not found. Please follow https://github.com/vllm-project/vllm/blob/main/tools/ep_kernels/README.md to install DeepEP kernels.   )r	   r   r   r   ry   num_smsr   s     r   r   z!DeepEPAll2AllManagerBase.__init__   sX    }} 	
 	
*	
 	
} 	###!GG r   c                     t           r   r   r   r   s     r   r   z#DeepEPAll2AllManagerBase.get_handle  s    !!r   FNr:   r;   r"   r<   r#   c                     t           r   r   r   s        r   rC   z!DeepEPAll2AllManagerBase.dispatch  r   r   c                     t           r   r   r   s      r   rH   z DeepEPAll2AllManagerBase.combine  r   r   c                     d S r   rJ   rK   s    r   rL   z DeepEPAll2AllManagerBase.destroy  rM   r   rN   rO   r   rY   s   @r   r   r      s        
 
 
 
 
" " " &+37" "|" |" #	"
 EL)D0" 
u|U\)	*" " " " IN" ""\"AE"	" " " "
      r   r   c                   P     e Zd ZdZ fdZdeeef         fdZd Zde	fdZ
 xZS )DeepEPHTAll2AllManagerr   c                 J    t                                          |           d S r   r   r   s     r   r   zDeepEPHTAll2AllManager.__init__"  r   r   r#   c                     t           j        dz  dz  }d }d }| j        r)t           j        st           j        dz  dz  }| j        dz  }nd}d}|J |J t          | j        ||d|          S )N   r%   r   r   F)rr   num_nvl_bytesnum_rdma_byteslow_latency_modenum_qps_per_rank)envsVLLM_DEEPEP_BUFFER_SIZE_MBrs   ,VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODEr   dictr   )r   r   r   r   s       r   _make_all2all_kwargsz+DeepEPHTAll2AllManager._make_all2all_kwargs%  s    7$>E> 	!$"S 	!!<tCdJN#|q0N )))+++.')"-
 
 
 	
r   c                     t          |          dk    s
J d            dd l}|                                 }t                              d|           | j                            ||j                  }|S )Nr   zfDeepEPHTAll2AllManager expects no arguments. All the required args are computed in the Manager itself.DeepEP all2all args %s)r)   deep_epr   ru   rv   ry   r~   Bufferr   r   r   buffer_kwargsr   s        r   r   z!DeepEPHTAll2AllManager.get_handle<  s}    6{{a7  
 	1133-}===!%!2!@!@7>"
 "
 r   r   c                 f    dd l }|| j        k    r| j        }|j                            |           d S r|   )r   r   r   set_num_sms)r   r   r   s      r   r   z"DeepEPHTAll2AllManager.set_num_smsK  s=    
 T\!!lG""7+++++r   )rP   rQ   rR   rS   r   r   r   r   r   intr   rX   rY   s   @r   r   r     s         $ $ $ $ $
d38n 
 
 
 
.  ,3 , , , , , , , ,r   r   c                   j     e Zd ZdZ fdZdedededededeeef         fd	Zd
 Z	dedz  fdZ
 xZS )DeepEPLLAll2AllManagerzD
    All2All communication based on DeepEP Low-Latency kernels.
    c                 J    t                                          |           d S r   r   r   s     r   r   zDeepEPLLAll2AllManager.__init__[  r   r   max_num_tokens_per_dp_ranktoken_hidden_sizenum_ep_ranksnum_global_expertsnum_local_expertsr#   c           	          ddl }t          j        dz  dz  }|}|j                            ||||          }	|	J t          | j        ||	d|dt          j                  S )a  
        max_num_tokens_per_dp_rank : the maximum number of tokens a DP rank
          can dispatch all the ranks must hold the same value.
        token_hidden_size: the hidden dimension of each token.
        num_ep_ranks: the number of EP group ranks.
        num_global_experts: Number of experts in the model.
        num_local_experts: Number of experts in an EP rank.
        r   Nr   ) num_max_dispatch_tokens_per_rankhidden	num_ranksnum_expertsT)rr   r   r   r   r   !allow_nvlink_for_low_latency_modeallow_mnnvl)r   r   r   r   get_low_latency_rdma_size_hintr   r   !VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL)
r   r   r   r   r   r   r   r   r   r   s
             r   r   z+DeepEPLLAll2AllManager._make_all2all_kwargs^  s      	 7$>E, FF-G$"*	 G 
 
 ))).')!-.2>
 
 
 	
r   c                     ddl } | j        di |}t                              d|           | j                            ||j                  }|S )zd
        The kwargs for DeepEPLLAll2AllManager is dictated by
        _make_all2all_kwargs.
        r   Nr   rJ   )r   r   ru   rv   ry   r~   r   r   s        r   r   z!DeepEPLLAll2AllManager.get_handle  s`    
 	11;;F;;-}===!%!2!@!@7>"
 "
 r   Nc                     dS r|   rJ   rK   s    r   max_sms_usedz#DeepEPLLAll2AllManager.max_sms_used  s    qr   )rP   rQ   rR   rS   r   r   r   r   r   r   r   rX   rY   s   @r   r   r   V  s         $ $ $ $ $%
$'%
 %
 	%

  %
 %
 
c3h%
 %
 %
 %
N  cDj        r   r   c                   ^     e Zd ZU dZeed<   eed<    fdZdededefdZd Zd Z	d	 Z
 xZS )
FlashInferAllToAllManagerz<
    All2All communication based on flashinfer kernels.
    r.   r0   c                     t                      s
J d            t                                          |           t                              d| j        | j                   d| _        d | _        d S )NzDflashinfer all2all module not found. Please install/check flashinferz8Initialize for flashinfer All2All rank=%d, world size=%dF)	r   r   r   ru   rv   r.   r0   initializedalltoall_infor   s     r   r   z"FlashInferAllToAllManager.__init__  s{    %'' 	
 	
R	
 	
' 	###FIO	
 	
 	

 !!r   gpus_per_nodec                    | j         rdS |                                  t                              d||           t	          ||||          | _        ddlm} t           |t                      j
                  dd          }t          j        | j        |          | _        t          j        | j        |          | _        || _        || _        || _        d| _         t                              d	||           dS )
zInitialize workspaceNz"making map: rank=%d, world size=%d)tp_sizer   )CustomCommunicatori    )comm_backendfabric_page_sizeallocation_granularityTz3FlashInfer All2All initialized for rank %s, size %s)r   cleanupru   rv   r   mapping2vllm.distributed.device_communicators.mnnvl_compatr   r   r   r   r   get_moe_workspacesworkspace_tensorget_moe_prepare_workspaceprepare_workspace_tensorr0   r.   r   info)r   r0   r.   r   r   	dp_configs         r   
initializez$FlashInferAllToAllManager.initialize  s%     	F94LLL	
 
 
	
 	
 	
 	
 	
 	
  ++LNN,DEE$#$
 
 
	 !) ;DL) T T(0(JL))
 )
% %	*A4	
 	
 	
 	
 	
r   c                     t                      sdS | j        dk    rdS | j        s1|                     | j        | j        t
          j        j                   | j        S )zEnsure workspace is initializedFr   )r0   r.   r   )r   r0   r   r   r.   r+   cudadevice_countrK   s    r   %ensure_alltoall_workspace_initializedz?FlashInferAllToAllManager.ensure_alltoall_workspace_initialized  sj    %'' 	5?a5 	OO?Y#j5    
 r   c                     | S r   rJ   r   s     r   r   z$FlashInferAllToAllManager.get_handle  s    r   c                 &   | j         r| j        | j        {	 | `| `n2# t          $ r%}t                              d|           Y d}~nd}~ww xY wd| _        d| _        d| _        d| _         dS # d| _        d| _        d| _        d| _         w xY wdS dS dS )zClean up workspaceNz*Failed to cleanup FlashInfer workspace: %sF)r   r   r   	Exceptionru   warningr   )r   es     r   r   z!FlashInferAllToAllManager.cleanup  s     	)%1-9	))11 P P PKQOOOOOOOOP )-%04-##(    )-%04-##( ((((	) 	)1199s+    A, 
AAA, AA, ,B
)rP   rQ   rR   rS   r   __annotations__r   r   r   r   r   rX   rY   s   @r   r   r     s           IIIOOO" " " " ")
)
 )
 	)
 )
 )
 )
V        ) ) ) ) ) ) )r   r   c                   n     e Zd Z fdZdededej        dej        dededed	ed
edefdZd Zd Z	 xZ
S )MoriAll2AllManagerc                 &   t                      s
J d            dd l}t                                          |           t	                      | _        t          j        j        	                    d|           |j
                            d           d S )NzoMoRI kernels not found. Please follow https://github.com/ROCm/mori/blob/main/README.md to install MoRI kernels.r   mori)r
   r   r   r   r   ry   r+   _C_distributed_c10d_register_process_groupshmemshmem_torch_process_group_init)r   r   r   r   s      r   r   zMoriAll2AllManager.__init__  s    zz 	
 	
(	
 	
z 	###!GG"::69MMM
11&99999r   r.   r   input_dtypequant_dtyper   	scale_dimscale_type_sizer   r   num_experts_per_tokenc                 z   dd l }ddlm}m}  |            s |            s
J d            | j        s|j        j        j        }d}d}d}nB|j        j        j        } |            rd}d}d}n  |            rd}d}d}nt          d          t          |||||||j        ||	|
||||t          d|          	          S )
Nr   )	on_gfx942	on_gfx950z2mori currently only support arch gfx942 and gfx950   P          @   )r.   r0   	data_type
hidden_dimr   r   max_token_type_sizemax_num_inp_token_per_ranknum_experts_per_rankr   warp_num_per_block	block_numkernel_typerdma_block_numgpu_per_node)r   vllm.platforms.rocmr   r   rs   opsEpDispatchCombineKernelType	IntraNodeInterNodeV1r>   r   itemsizemin)r   r.   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  s                     r   r   z'MoriAll2AllManager._make_all2all_kwargs  s7    	<<<<<<<<y{{ 	
iikk 	
 	
@	
 	
) ~ 	(>HKN!#II (>JKy{{ %'"	!# %&"	!#)H   #!(+ + 4'A!2"71#)Q--
 
 
 	
r   c                 f    dd l } |j        j        di |}|j                            |          }|S )Nr   rJ   )r   r
  EpDispatchCombineConfigEpDispatchCombineOp)r   r   r   mori_configr   s        r   _make_handlezMoriAll2AllManager._make_handleK  s?    6dh6@@@@--k::r   c                     dd l } | j        di |}t                              d|           | j                            || j                  }|S )Nr   zMoRI all2all args %srJ   )r   r   ru   rv   ry   r~   r  )r   r   r   mori_kwargsr   s        r   r   zMoriAll2AllManager.get_handleR  s_    /d/99&99+[999/3/@/N/N*0
 0
 r   )rP   rQ   rR   r   r   r+   r(   r   r  r   rX   rY   s   @r   r   r      s        : : : : :;
;
 ;
 [	;

 [;
 ;
 ;
 ;
 %(;
 ;
  #;
 ;
 ;
 ;
z        r   r   )(typingr   r+   torch.distributeddistributedrw   	vllm.envsr   vllm.distributedr   r   vllm.forward_contextr   vllm.loggerr   vllm.utils.flashinferr   vllm.utils.import_utilsr	   r
   r   base_device_communicatorr   r   flashinfer.commr   flashinfer.comm.mnnvlr   flashinfer.comm.trtllm_alltoallr   rP   ru   r   r[   rl   r   r   r   r   r   rJ   r   r   <module>r$     s                            7 7 7 7 7 7 7 7 4 4 4 4 4 4 # # # # # # 8 8 8 8 8 8 C C C C C C C C C C ? ? ? ? ? ? ? ? ''''''111111      
X		N N N N N, N N Nb; ; ; ; ;+ ; ; ;|J J J J J+ J J JZ# # # # #1 # # #L6, 6, 6, 6, 6,5 6, 6, 6,r? ? ? ? ?5 ? ? ?De) e) e) e) e) 2 e) e) e)PZ Z Z Z Z+ Z Z Z Z Zr   