
    .`i"                        d dl Zd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZ  ee          ZdefdZded	ed
ededededej        fdZdej        dedefdZdej        d	edej        fdZdej        defdZdededededededeeej        dz  ef         fdZ	 	 	 	 ddedededededz  dedz  dej        dz  dedeeej        dz  ef         fdZdS )    N)ParallelConfig)get_dp_group)init_logger)check_ubatch_thresholdsis_last_ubatch_emptyparallel_configc                     t                      j        }t                      j        }| j        r/t                              d           d}t                      j        }||fS )Nz=Using CPU all reduce to synchronize DP padding between ranks.cpu)r   devicedevice_group#disable_nccl_for_dp_synchronizationlogger	info_once	cpu_group)r   r   groups      k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/v1/worker/dp_utils.py_get_device_and_groupr      s_    ^^"FNN'E : )K	
 	
 	
 (5=    should_ubatchshould_dp_padorig_num_tokens_per_ubatchpadded_num_tokens_per_ubatchcudagraph_modereturnc                 4   |j         }|j        }t          |          \  }}	t          j        d||t          j                  }
||
d         |<   ||
d         |<   | rdnd|
d         |<   |rdnd|
d         |<   ||
d         |<   t          j        |
|	           |
S )	N   r   dtyper               )r   )data_parallel_sizedata_parallel_rankr   torchzerosint32dist
all_reduce)r   r   r   r   r   r   dp_sizedp_rankr   r   tensors              r   _run_arr-   &   s     0G0G)/::MFE[GF%+FFFF3F1Ig5F1Ig+2F1Ig+2F1Ig'F1IgOF%((((Mr   r,   num_ubatchesc                    | dd d f         }| dd d f         }t          t          j        | d         dk                                                        }|sdS t	          |                                                                          }t	          |                                                                          }t          |||          rt          	                    d||           d}|S )Nr   r   r    FzAborting ubatching %s %s)
boolr%   allitemintminmaxr   r   debug)r,   r.   orig_num_tokens_tensorpadded_num_tokens_tensorr   orig_min_num_tokenspadded_max_num_tokenss          r   _post_process_ubatchr;   ;   s    #AqqqD\%ad| uya88==??@@M u 488::??AABB 8 < < > > C C E EFF/1FUU &(;=R	
 	
 	
 r   c                    | dd d f         }|ret          |                                                                          }t          j        |gt          |          z  dt          j                  S |                                S )Nr   r
   r   )r3   r5   r2   r%   r,   lenr'   r
   )r,   r   num_tokens_across_dpmax_num_tokenss       r   _post_process_dp_paddingr@   O   s    !!QQQ$< 
* 15577<<>>??|s#7888+
 
 
 	
 $'')))r   c                 |    t          | dddf                                                                                   S )z
    Synchronize cudagraph_mode across DP ranks by taking the minimum.
    If any rank has NONE (0), all ranks use NONE.
    This ensures all ranks send consistent values (all padded or all unpadded).
    r"   N)r3   r4   r2   )r,   s    r   _post_process_cudagraph_moderB   ^   s6     vad|!!&&(()))r   num_tokens_unpaddednum_tokens_paddedshould_attempt_ubatchingshould_attempt_dp_paddingc                 |   || k    sJ t          ||| |||          }t          t          j        |d         dk                                                        }||k    sJ t          ||j                  }|r |st                              dd           d}t          ||          }	t          |          }
||	|
fS )a  
    1. Decides if each DP rank is going to microbatch. Either all ranks
    run with microbatching or none of them do.

    2. Determines the total number of tokens that each rank will run.
    When running microbatched or if should_attempt_dp_padding is True, all
    ranks will be padded out so that the run with the same number of tokens

    3. Synchronizes cudagraph_mode across ranks by taking the minimum.

    Returns: tuple[
        should_ubatch: Are all DP ranks going to microbatch
        num_tokens_after_padding: A tensor containing the total number of
        tokens per-microbatch for each DP rank including any DP padding.
        synced_cudagraph_mode: The synchronized cudagraph mode (min across ranks)
    ]

    )r   r   r   r   r   r   r!   r   zzMicrobatching has been triggered and requires DP padding. Enabling DP padding even though it has been explicitly disabled.global)scopeT)r-   r0   r%   r1   r2   r;   r.   r   
debug_oncer@   rB   )rC   rD   rE   rF   r   r   r,   r   r   num_tokens_after_paddingsynced_cudagraph_modes              r   _synchronize_dp_ranksrM   g   s   4  33333
 ./#6%6%'  F 6!9>227799::M %5555 )1MNNM ]  	 	 	
 	
 	
   8    9@@24IIIr   allow_microbatchingallow_dp_paddinguniform_decode num_scheduled_tokens_per_requestc                     |j         dk    rdd|fS d}|r|J t          || |          }|| }t          | |||||          \  }	}
}|	|
|fS )a  
    Coordinates amongst all DP ranks to determine if and how the full batch
    should be split into microbatches.

    Args:
        num_tokens_unpadded: Number of tokens without accounting for padding
        allow_microbatching: If microbatching should be attempted
        allow_dp_padding: If all DP ranks should be padded up to the same value
        parallel_config: The parallel config
        num_tokens_padded: Number of tokens including any non-DP padding (CUDA graphs,
            TP, etc)
        uniform_decode: Only used if allow_microbatching is True. True if the batch
            only contains single token decodes
        num_scheduled_tokens_per_request: Only used if allow_microbatching is True. The
            number of tokens per request.
        cudagraph_mode: The cudagraph mode for this rank (0=NONE, 1=PIECEWISE, 2=FULL)

    Returns: tuple[
        ubatch_slices: if this is set then all DP ranks have agreed to
        microbatch
        num_tokens_after_padding: A tensor containing the total number of
        tokens per-microbatch for each DP rank including padding. Will be
        padded up to the max value across all DP ranks when allow_dp_padding
        is True.
        synced_cudagraph_mode: The synchronized cudagraph mode (min across ranks)
    ]

    r   FN)rP   )r#   r   rM   )rC   rN   rO   r   rD   rP   rQ   r   rE   r   rK   rL   s               r   coordinate_batch_across_dprS      s    L )Q..dN**  % 
)))#:)$
 $
 $
   / 	$	
 	
 E],.C 35JKKr   )NNNr   )numpynpr%   torch.distributeddistributedr(   vllm.configr   vllm.distributed.parallel_stater   vllm.loggerr   vllm.v1.worker.ubatch_utilsr   r   __name__r   r   r0   r3   Tensorr-   r;   r@   rB   tuplerM   ndarrayrS    r   r   <module>ra      s  
                  & & & & & & 8 8 8 8 8 8 # # # # # #       
 
X		>    $ !$ #&	
  $ \   * S T    (*U\ *$ *5< * * * ** *# * * * *CJCJCJ #CJ  $	CJ
 CJ $CJ 4$c)*CJ CJ CJ CJV %)"&:>CL CLCLCL CL $	CL
 TzCL 4KCL ')j4&7CL CL 4$c)*CL CL CL CL CL CLr   