
    -`i-m                        d Z ddlmZmZ ddlmZ ddlZddlZddl	m
Z
mZmZmZmZ ddlmZ  ee          Ze G d d                      Zeej        ej        ef         Zd	ej        d
edej        dej        deeeee         f         eeee         f         f         f
dZd
edej        dej        deej                 deej                 dej        j        dz  dedefdZdeej                 deej                 dej        dej        dedej        deddfdZ	 	 	 	 d&dej        dej        deeej                          deej                 dede dedej        j        dz  deeef         dz  defd Z!	 	 d'dej        dej        deeej                          dede deeef         dz  ddfd!Z"dej        deeef         d"edej        fd#Z#dej        deeef         dej        fd$Z$g d%Z%dS )(zh
The actual execution of the rearrangement.

This involves the exchange of expert weights between GPUs.
    )IterableSequence)	dataclassN)P2POpProcessGroup
all_gatherbatch_isend_irecvget_global_rank)init_loggerc                   `    e Zd ZU dZej        ed<   	 eed<   	 ej        ed<   	 ej        ed<   dS )RecvMetadataz<Metadata describing remote receives during EPLB rebalancing.recv_primary_mask
recv_countrecv_expert_idsrecv_dst_rowsN)__name__
__module____qualname____doc__npndarray__annotations__int     {/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/distributed/eplb/rebalance_execute.pyr   r      sT         FFz!!!KOOO3ZD:NNr   r   
expert_idsnum_local_expertsold_indicesnew_indicesreturnc                    i }i }| j         dk    r||fS t          j        |           }t          |          }t          j        |t          j                  }t          j        ||          }	t          j        ||          }
t          j        |	          r0||	         }||	         }||z  }t          j        ||f          }||         }||         }t          j	        dgt          j
        t          j        |          dk              d         dz   t          |          gg          }t          t          |          dz
            D ]~}||         ||dz            }}t          ||                   }|||         }t          j        |d          \  }}|t          j        |                   }|                                ||<   t          j        |
          rO||
         }||
         }||z  }t          j        ||f          }||         }||         }t          j	        dgt          j
        t          j        |          dk              d         dz   t          |          gg          }t          t          |          dz
            D ]}||         ||dz            }}t          ||                   }|||         }t          j        |d          \  }}|t          j        |                   }t!          |                    |g                     fd|D             }|||<   |D ]#}t          |          }||vrg ||<   ||vrg ||<   $||fS )a  
    Get the ranks of the experts that need to be exchanged.

    Args:
        expert_ids: 1D array of expert indices to query.
        num_local_experts: The number of local experts.
        old_indices: The old indices of the experts.
        new_indices: The new indices of the experts.

    Returns:
        A tuple of two dictionaries mapping expert_id to:
        - ranks_to_send: The ranks that have this expert and need to send.
        - ranks_to_recv: The ranks that need to receive this expert.
    r   dtype   Treturn_indexc                 6    g | ]}|vt          |          S r   )r   ).0rsend_ranks_sets     r   
<listcomp>z3get_ep_ranks_with_experts_batch.<locals>.<listcomp>   s0     ! ! !0G0GA0G0G0Gr   )sizer   uniquelenarangeint32isinanylexsortconcatenatewherediffranger   sorttolistsetget)r   r   r   r    ranks_to_send_mapranks_to_recv_mapunique_expertsnum_positionsposition_indicesold_relevant_masknew_relevant_maskold_relevant_positionsold_relevant_expertsold_relevant_ranks
sort_ordersorted_expertssorted_ranksexpert_boundariesistartendexpertexpert_ranks_
unique_idxunique_ranksnew_relevant_positionsnew_relevant_expertsnew_relevant_ranksrecv_ranks_actualr+   s                                 @r   get_ep_ranks_with_experts_batchrW   -   s   ( /1.0 ! "333Yz**N$$Mybh??? ^<<^<< 
v   >!12C!D*+<=37HH Z!79M NOO
-j9)*5 NS"(27>22a788;a?#nBUBUAVW
 

 s,--122 	> 	>A*1-/@Q/G3E.//F'c	2L IlFFFMAz'
(;(;<L(4(;(;(=(=f%% 
v   :!12C!D*+<=37HH Z!79M NOO
-j9)*5 NS"(27>22a788;a?#nBUBUAVW
 

 s,--122 	: 	:A*1-/@Q/G3E.//F'c	2L IlFFFMAz'
(;(;<L !!2!6!6vr!B!BCCN! ! ! ! ,! ! ! ):f%% ! + +V***(*f%***(*f%///r   expert_weightsexpert_weights_bufferscuda_streamep_groupc                 0  >?@A |j         |j         k    sJ                                 }t          j        | ft          j                  }t          j        | fdt          j                  }	t          j        | fdt          j                  }
t          j        | fdt          j                  }t          j        | fdt          j                  }|| z  }t          j        | t          j                  }||z   }||         }||         }||k    }|dk    }t          j	        ||d          }t          j
        |t          j        ||                    }d}|dk    }t          j        |          rWt          j        ||         d          \  }}||         }||         }t          |j         d                   }||	d|<   ||
d|<   d}t          j        | |          }t          j        |          r^||         }||         }t          j        |d          \  } }!||!         }"t          | j         d                   }| |d|<   |"|d|<   d||"<   t          j        | |          }#t          |#                                          r|dk    rt          j        |#          d                                         }$t%          t'          |	d|         |
d|                             }%|$D ]_>|>         }&|%                    |&d          }'|'dk    r9t'          ||          D ](\  }(})|)>                             |(|'         d	           )`g }*                                }+fd
t/          |+          D             },|dk    rJ|	d|         }-|
d|         }.t          j        |-d          }/|-|/         }-|.|/         }.t3          |-| ||          \  }0}1t'          |-                                |.                                          D ]\  }&@|0|&         }2|1|&         }3|2r|3st5          |3          t5          |2          z  }4|2                    |          }5|5|4z  }6|6|4z   }7|3|6|7         }8t5          |2          |4z  }9|9|5z   }:|:t5          |3          k     r|8                    |3|:                    |8D ]>|,>         ?|*?@fd|D             z  }*|dk    r|d|         }-|d|         };t          j        |-d          }/|-|/         }-|;|/         };t3          |-| ||          \  }0}1t'          |-                                |;                                          D ]\  }&>|0|&         }2|1|&         }3|2r|3st5          |3          t5          |2          z  }4|3                    |          }:t5          |2          |4z  }9|:|9k     r|2|:|4z           @n|2|:|9z
           @|,@         A|*>Afd|D             z  }*|*rc|at:          j                            |          5  tA          |*          }<|<D ]}=|=!                                 	 ddd           n# 1 swxY w Y   n*|*r(tA          |*          }<|<D ]}=|=!                                 ||tE          ||||          fS )a  
    Rearranges expert weights during EPLB rebalancing.

    Args:
        num_local_experts: Number of local experts.
        old_indices: (num_experts_total,) ndarray of current (old)
            global-to-local expert assignments.
        new_indices: (num_experts_total,) ndarray of desired (new)
            global-to-local assignments after rebalance.
        expert_weights: Original expert weights for the layer.
        expert_weights_buffers: Intermediate buffers (one per tensor).
        cuda_stream: CUDA stream for async copies (can be None for sync mode).
        ep_group: Distributed process group for expert parallel comms.

    Returns:
        is_unchanged (np.ndarray): (num_local_experts,), True where an expert row
            is unchanged after rebalance.
        is_received_locally (np.ndarray): (num_local_experts,), True where a row
            can be updated from local data.
        RecvMetadata: Metadata needed for completing remote weight transfers.
    r#   F)assume_uniquer   Tr&   Nnon_blockingc                 2    i | ]}|t          |          S r   )r
   )r)   rankr[   s     r   
<dictcomp>z"move_to_buffer.<locals>.<dictcomp>   s%    WWWdOHd;;WWWr   stablekindc                 \    g | ](}t          t          j        j        |                   )S r   )r   torchdistributedisend)r)   w
dst_globalsrcs     r   r,   z"move_to_buffer.<locals>.<listcomp>   sH         )/#"   r   c                 \    g | ](}t          t          j        j        |                   )S r   )r   rh   ri   irecv)r)   bdst
src_globals     r   r,   z"move_to_buffer.<locals>.<listcomp>E  sH         %+cF   r   )r   r   r   r   )#shaperb   r   zerosbool_fullint64r1   r0   r2   
logical_orlogical_andr3   r.   r   boolnonzeror:   dictzipr<   copy_r-   r8   argsortrW   r/   indexappendrh   cudastreamr	   waitr   )Br   r   r    rX   rY   rZ   r[   ep_rankr   send_expert_idssend_src_rowsr   r   base
local_rowslocal_globalold_local_expert_idsnew_local_expert_idsis_unchanged	new_validcan_recv_localis_received_locally
send_count	valid_olduniq_experts	first_idxfiltered_rowssrc_rowsr   need_recv_maskdesired_expertsdesired_dstsuniq_recv_expertsuniq_indicesdst_rowseligible_local_buffer_maskdest_indicesexpert_to_src_maprN   	src_localrk   rp   p2p_opsep_sizerank_to_globalexpertssrcsordersend_maprecv_mapranks_to_sendranks_to_recvnum_dst_per_sender
sender_pos
recv_beginrecv_end
recv_ranksremainder_start
recver_posdstsreqsreqrq   rl   rm   rr   sB         `                                                       @@@@r   move_to_bufferr      s   <  11111mmooG"3!5RXFFFg02BbhGGGOG.0"BHEEEMg02BbhGGGOG.0"BHEEEM&&D,BH===J*$L&|4&|4 (+??L %*IW2%  N -bnY?? 
 J$*I	vi ."$) +$#
 #
 #
i #9- ++A.//
'3$%-kzk" J^%8$8)DDN	vn +.~>!.1*,)ORV*W*W*W'<-*0344
'8$%-kzk"&*(#!#?R!S!S &**,,-- 
B*q..z"<==a@GGII ,mKZK.HII
 
   	B 	BC)#.F)--fb99IB0FGG B BDAqcFLL9DLAAAAG mmooGWWWWgWWWN A~~!+:+.[j[)
7222%.E{<	
 
( w~~//?? 	 	KFC$V,M$V,M   !$]!3!3s=7I7I!I&,,W55J#&88J!$66H&z(':;J!-003EEO(:5JC....!!-
";<<<! 	 	+C0
      ,   	 A~~!+:+.[j[)
7222%.E{<	
 
( w~~//?? 	 	KFC$V,M$V,M   !$]!3!3s=7I7I!I&,,W55J!-003EEOO++#J2D$DE#J$@A',J      0   GG  ;*Z{++ 	 	$W--D  



	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 
  )) 	 	CHHJJJJ 	/!+'		
 	
 	
	 	s   )YYYr   r   recv_metadatar   c           	         |j         }|j        }|j        }	|j        }
|j        d         }t          j        ||          }t          j        | |          }t          |	                                          rjt          j
        |          d                                         }|D ];}t          | |          D ](\  }}||                             ||         d           )<|dk    rdS ||z  }||t          j        |t
          j                  z            }t          j        t          j        | |           t          j        | |dk                        }t          |	                                          sdS t          j
        |          d         }||         }|	d|         }|
d|         }t          j        |d          }||         }||         }t          j        ||          }t          j        ||j        d         k     |t          j        ||j        d         d	z
                     |k              }t          |	                                          sdS ||         }|||                  }t          |                                |                                          D ]-\  }} | D ]%}||                             ||          d           &.dS )
a  
    Copies expert weights from communication buffers back to the target weight tensors
    after EPLB rebalancing.

    Args:
        expert_weights: List of the actual MoE layer weights used in the execution.
        expert_weights_buffers: Intermediate buffers containing the experts weights
            after the transfer is completed.
        is_unchanged: (num_local_experts,), True where an expert row is unchanged.
        is_received_locally: (num_local_experts,), True where a row is updated locally.
        recv_metadata: RecvMetadata containing remote receive metadata.
        new_indices: (num_experts_total,) mapping from local rows to desired
            (possibly global) expert id, after rebalance.
        ep_rank: Rank of the process in the expert parallel group.
    r   Tr_   Nr#   r]   rd   re   r%   )r   r   r   r   rs   r   rx   ry   rz   r3   r{   r:   r}   r~   r0   r1   r   searchsortedminimum)!rX   rY   r   r   r   r    r   r   r   r   r   r   	copy_maskdest_mask_npr   rq   rk   rp   r   local_expertsduplicate_maskdup_dst_rowsdup_expertsprim_experts	prim_dstsr   prim_experts_sortedprim_dsts_sortedposvalidmatched_dst_rowsmatched_src_rowsrm   s!                                    r   move_from_bufferr   e  s   0 &7)J#3O!/M$*1- 13DEEI><-;;LL 8z,//299;; 	8 	8CN,BCC 8 81#QsV$77778 Q &&Dry1B"('S'S'S STM^
}':&:;;
))=B+>?? N
 ""$$%% :n--a0L-K";J;/Lkzk*IJ|(333E&u- '
/-{
;
;CN!'**BJs,?,Ea,H1,LMMN	 E
 		 #E*'E
3(//113C3J3J3L3LMM 4 4S 	4 	4AcFLL3dL3333	44 4r   Fold_global_expert_indicesnew_global_expert_indicesexpert_weights_buffer
is_profilelayerrank_mappingc	           	        K   |                                 }	|Yt          |          |                                 k    rt          ||          }n#t          | ||                                           } | j        d         |j        d         k    sJ | j        \  }
}t          |          |
k    sJ t          t          |d                             j        d         }|j        |
|fk    sJ ||	|z  k    sJ |                                                                 }|                                                                }t          |||         ||         ||         |||          \  }}}|||fS )a  
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.

    Returns:
        is_unchanged (np.ndarray): (1, num_local_experts), True where expert
            is left unchanged.
        is_received_locally (np.ndarray): (1, num_local_experts), True where expert
            can be received locally.
        RecvMetadata: Metadata needed for completing remote weight transfers.
    Nr%   r   r   r   r    rX   rY   rZ   r[   )
r-   r/   )_map_new_expert_indices_with_rank_mapping)_map_old_expert_indices_with_rank_mappingrs   nextitercpunumpyr   )r   r   rX   r   r[   r   r   rZ   r   r   num_moe_layersnum_physical_expertsnum_local_physical_expertsold_global_expert_indices_npnew_global_expert_indices_npr   r   r   s                     r   transfer_layerr     s     H mmooG|//(Q)) )%% )R)) )% %*1-1J1PQR1SSSSS+D+J(N(~.0000!%d>!+<&=&=!>!>!DQ!G$*~?S.TTTTT7-G#GGGGG#<#@#@#B#B#H#H#J#J #<#@#@#B#B#H#H#J#J 7E40707%e,48 8 84L%} ,m;;r   c                    |Yt          |          |                                k    rt          ||          }n#t          | ||                                          } | j        d         |j        d         k    sJ | j        \  }}t          |          |k    sJ t          t          |d                             j        d         }|j        ||fk    sJ |                                }	||	|z  k    sJ t          |d                   }
d |
D             }|rit          |d         |          D ]P\  }fdt          |	          D             }t          j                                         t          |||           QdS t          j                                         |                                                                 }|                                                                }t          |          D ]b}t%          |||         ||         ||         |d|          \  }}}t'          ||         ||||||         |                                           cdS )	a  
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
        rank_mapping: A dictionary mapping old rank to new rank.
    Nr%   r   c                 6    g | ]}t          j        |          S r   )rh   
empty_like)r)   rk   s     r   r,   z4rearrange_expert_weights_inplace.<locals>.<listcomp>7  s0     * * * !* * *r   c                     g | ]}S r   r   )r)   rP   buffers     r   r,   z4rearrange_expert_weights_inplace.<locals>.<listcomp>=  s     @ @ @A @ @ @r   )groupr   )rX   rY   r   r   r   r    r   )r/   r-   r   r   rs   r   r   listr}   r8   rh   ri   barrierr   r   synchronizer   r   r   r   rb   )r   r   rX   r[   r   r   r   r   r   r   first_layer_weightsweights_bufferweightdummy_recv_bufferold_global_expert_indices_cpunew_global_expert_indices_cpu	layer_idxr   r   r   r   s                       @r    rearrange_expert_weights_inplacer     s   6 |//(Q)) )%% )R)) )% %*1-1J1PQR1SSSSS+D+J(N(~.0000!%d>!+<&=&=!>!>!DQ!G$*~?S.TTTTTmmooG7-G#GGGGG~a011* *%8* * *N  
!."3^DD 	 	NFF @ @ @ @w @ @ @%%'''!    
 	 
J$=$A$A$C$C$I$I$K$K!$=$A$A$C$C$I$I$K$K!>** 
 
	;I85i@5i@))4#1<
 <
 <
8)= 	))4#1% 3'5i@MMOO	
 	
 	
 	
 	

 
r   new_ep_sizec                 x   | j         \  }}|s
J d            t          |          }||z  }||z  }t          j        ||fd| j        | j                  }t          |          D ]V}	|                    |	          }
|
=|
dk    r7|
|k     r1|	|z  }|	dz   |z  }|
|z  }|
dz   |z  }| dd||f         |dd||f<   W|S )a  
    Map the old global expert indices to the new global expert indices.

    Args:
        old_global_expert_indices:
            Shape (num_layers, old_ep_size * num_local_physical_experts).
        rank_mapping: Mapping from old rank to new rank.
        new_ep_size: New expert parallelism size.

    Returns:
        Mapped expert indices with shape
        (num_layers, new_ep_size * num_local_physical_experts).
    Rank mapping is requiredr]   
fill_valuer$   deviceNr   r%   )rs   r/   rh   rv   r$   r   r8   r<   )r   r   r   
num_layersold_num_physical_expertsold_ep_sizer   new_num_physical_expertsmapped_expert_indicesold_ranknew_rankold_start_idxold_end_idxnew_start_idxnew_end_idxs                  r   r   r   c  s/   $ ,E+J(J(33333< l##K!9[!H*-GG "J	-.'-(/	   +&&  ##H--HMMh6L6L$'AAM#a<+EEK$'AAM#a<+EEK *!!!];-F*FG "!!!];%>">? ! r   c                    | j         \  }}|s
J d            t          |          }t          d |                                D                       }||z  }||z  }t	          j        ||fd| j        | j                  }t          |          D ]G}	||	         }
|
dk    r7|
|k     r1|	|z  }|	dz   |z  }|
|z  }|
dz   |z  }| d d ||f         |d d ||f<   H|S )Nr   c              3   "   K   | ]
}|d k    V  dS )r]   Nr   )r)   r   s     r   	<genexpr>z<_map_new_expert_indices_with_rank_mapping.<locals>.<genexpr>  s&      KKh"nKKKKKKr   r]   r   r   r%   )	rs   r/   sumvaluesrh   rv   r$   r   r8   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   s                  r   r   r     sG    ,E+J(J(33333< l##KKK\5H5H5J5JKKKKKK!9[!H*-GG!J	-.'-(/	   +&& 
 
)q==X33$'AAM#a<+EEK$'AAM#a<+EEK *!!!];-F*FG "!!!];%>">? ! r   )r   r   r   )Fr   NN)FN)&r   collections.abcr   r   dataclassesr   r   r   rh   torch.distributedr   r   r   r	   r
   vllm.loggerr   r   loggerr   tupler   MoveToBufferResultr   r|   r   rW   Tensorr   Streamr   r   rz   r   r   r   r   __all__r   r   r   <module>r     sq    / . . . . . . . ! ! ! ! ! !                   $ # # # # #	X		 
O 
O 
O 
O 
O 
O 
O 
O 2:rz<?@ h0
h0h0 h0 	h0
 4T#Yc49n!556h0 h0 h0 h0VJJJ J U\*	J
 %U\2J "T)J J J J J JZL4U\*L4 .L4 *L4 	L4
  L4 L4 L4 
L4 L4 L4 L4j ,0*.G< G<$|G<$|G< Xel34G< $EL1	G<
 G< G< G< "T)G< sCx.4'G< G< G< G< G<^ *.b
 b
$|b
$|b
 Xel34b
 	b

 b
 sCx.4'b
 
b
 b
 b
 b
J2!$|2!sCx.2! 2! \	2! 2! 2! 2!j !$| !sCx. ! \ !  !  !  !F A
@
@r   