
    -`i%                       U d Z ddlZddlZddlZddlZddlmZ ddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlZddlZddlmc mZ ddlZddlmZmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z* ddl+m,Z, ddl-m.Z. e G d d                      Z/ edg d          Z0de1e2ej3        ez  f         de4e5e4e2ef                  e5ej3                 f         fdZ6i Z7e1e2e8f         e9d<   de2de2fdZ:i Z;e1e2eg ed         f         f         e9d<   dd Z<d!ej3        d"e2dej3        fd#Z=d!ej3        d"e2dej3        fd$Z>d!ej3        d%e8d&e8d"e2dej3        f
d'Z?d!ej3        d%e8d&e8d"e2dej3        f
d(Z@d!ej3        d%e8d&e8d"e2dej3        f
d)ZAd!ej3        d%e8d&e8d"e2dej3        f
d*ZB	 	 	 	 dd,ej3        d-ej3        d.ej3        d/ej3        d0e2d1e8d2e8d"e2d3e5e8         d4ej3        dz  d5ej3        dz  d6ejC        dz  d7eDdej3        fd8ZE	 	 	 	 dd,ej3        d-ej3        d.ej3        d/ej3        d0e2d1e8d2e8d"e2d3e5e8         d4ej3        dz  d5ej3        dz  d6ejC        dz  d7eDdej3        fd9ZF e.d:e=e>;            e.d<e?e@;            e.d=eAeB;            e.d>eFeE;            G d? d          ZGdaHeGdz  e9d@<   daIeGdz  e9dA<   daJe8dz  e9dB<   deGfdCZKdeGfdDZLdEe5e8         dFe8dGe2deGfdHZM	 	 	 ddJe5e5e8                  dFe8dGe2dKeDd"e2dz  dLeDdeGfdMZNdaOeGdz  e9dN<   deGfdOZPdaQeGdz  e9dP<   deGfdQZReRZSdaTeGdz  e9dR<   deGfdSZUdaVeGdz  e9dT<   deGfdUZWdaXeGdz  e9dV<   deGfdWZYdaZeGdz  e9dX<   deGfdYZ[e	dZej\        fd[            Z] e&e^          Z_dIa`d\eDfd]Za	 	 	 	 	 	 dd&e8dae8dbe2dFe8dGe2dcedz  fddZb	 	 	 	 	 ddfe8dge8dhe8die8dz  dGe2dz  ddfdjZc	 	 	 ddfe8dge8dhe8die8dz  dGe2dz  ddfdkZddleje        jf        fdmZgdn Zhd+aie	doeGfdp            Zjde8fdqZkde8fdrZlde8fdsZmde8fdtZnde8fduZodv Zpdw ZqddxeDfdyZr	 ddzee$z  d{e8de5eD         fd|ZsdeDfd}ZtdeDfd~Zudzee$z  de8fdZvdS )a  vLLM distributed state.
It takes over the control of the distributed environment from PyTorch.
The typical workflow is:

- call `init_distributed_environment` to initialize the distributed environment.
- call `initialize_model_parallel` or `ensure_model_parallel_initialized` to
 initialize the model parallel groups.

- any code dealing with the distributed stuff

- call `destroy_model_parallel` to destroy the model parallel groups.
- call `destroy_distributed_environment` to destroy the distributed environment.

If you only need to use the distributed environment without model/pipeline
 parallelism, you can skip the model parallel initialization and destruction
 steps.
    N)
namedtuple)Callable)contextmanagernullcontext)	dataclass)	timedelta)shared_memory)AnyOptional)patch)BackendProcessGroup)DeviceCommunicatorBase)StatelessProcessGroup)init_logger)resolve_obj_by_qualname)get_distributed_init_method)suppress_stdout)direct_register_custom_opc                   .    e Zd ZU ej        j        ed<   dS )GraphCaptureContextstreamN)__name__
__module____qualname__torchcudaStream__annotations__     s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/distributed/parallel_state.pyr   r   ;   s#         Jr!   r   TensorMetadata)devicedtypesizetensor_dictreturnc           
      f   g }g }|                                  D ]\  }}t          |t          j                  r_|j        j        }|                    |t          ||j        |	                                          f           |                    |           ~|                    ||f           ||fS )zSplit the tensor dictionary into two parts:
    1. A list of (key, value) pairs. If the value is a tensor, it is replaced
         by its metadata.
    2. A list of tensors.
    )
items
isinstancer   Tensorr$   typeappendr#   r%   r&   )r'   metadata_listtensor_listkeyvaluer$   s         r"   _split_tensor_dictr3   C   s     ,.M&(K!'')) / /
UeU\** 	/
 \&F  nVU[%**,,GGH   u%%%%  #u....+%%r!   _group_name_counternamec                 z    | t           vr
dt           | <   |  dt           |           }t           | xx         dz  cc<   |S )z|Get a unique name for the group.
    Example:
    _get_unique_name("tp") -> "tp:0"
    _get_unique_name("tp") -> "tp:1"
    r   :   )r4   )r5   newnames     r"   _get_unique_namer:   `   sU     &&&$%D!33+D133G"Nr!   GroupCoordinator_groupsgroupc                 H    t          j        |           t          | j        <   d S N)weakrefrefr<   unique_namer=   s    r"   _register_grouprD   p   s    !(U!3!3GEr!   tensor
group_namec                     |t           v sJ d| d            t          |                     }|t          d| d          |                    |           S NzGroup z is not found.z is destroyed.)r<   
ValueError_all_reduce_out_place)rE   rF   r=   s      r"   
all_reducerK   t   sj       "E:"E"E"E   J!!E}<*<<<===&&v...r!   c                 *    t          j        |           S r?   )r   
empty_like)rE   rF   s     r"   all_reduce_fakerN   |   s    F###r!   dim
world_sizec                     |t           v sJ d| d            t          |                     }|t          d| d          |                    | |          S rH   )r<   rI   _reduce_scatter_out_placerE   rO   rP   rF   r=   s        r"   reduce_scatterrT      sn        "E:"E"E"E   J!!E}<*<<<===**63777r!   c                     t          | j                  }| j        |         |z  ||<   t          j        || j        | j                  S Nr%   r$   listshaper   emptyr%   r$   rE   rO   rP   rF   	new_shapes        r"   reduce_scatter_faker^      sC     V\""I\#&*4IcN;yV]KKKKr!   c                     |t           v sJ d| d            t          |                     }|t          d| d          |                    | |          S rH   )r<   rI   _all_gather_out_placerS   s        r"   
all_gatherra      sn        "E:"E"E"E   J!!E}<*<<<===&&vs333r!   c                     t          | j                  }| j        |         |z  ||<   t          j        || j        | j                  S rV   rX   r\   s        r"   all_gather_fakerc      sC     V\""I\#&3IcN;yV]KKKKr!   FABA_scaleB_scale	reduce_oporig_scatter_dimscatter_dim_after_maybe_reshapeoutput_shapebiasresult_scale	out_dtypeuse_fast_accumc           
         |                                 dk    ri|j        d d         | j        d d         k    r t          d| j         d|j         d          |                    dd                                          }n8|                                 dk    r t          d| j         d|j         d          t          j        |                     dd                                          ||||	|
||          } |j        g |d d         |j        d         R  }t          j	        ||||          }t          j
        |          }|S )	Nr8   z]For row-wise scaling, the leading dims of A_scale must match the leading dims of A (A shape: z, A_scale shape: )r   z Invalid A_scale shape (A shape: )numelrZ   rI   flatten
contiguousr   
_scaled_mmviewfuncolreduce_scatter_tensorwait_tensor)rd   re   rf   rg   rh   ri   rj   rF   rk   rl   rm   rn   ro   Cress                  r"   /patched_fused_scaled_matmul_reduce_scatter_faker~      s   " }}=""--HWH H7>}H H H  
 //!R((3355	A		DD D3:=D D D
 
 	

 				!R##%%			 		A 	.SbS!.171:...A

&		 C 
S
!
!CJr!   c                 b    t           j        j                            | |||||||||	|
||          S r?   )r   opssymm_mem"fused_scaled_matmul_reduce_scatter)rd   re   rf   rg   rh   ri   rj   rF   rk   rl   rm   rn   ro   s                r"   *patched_fused_scaled_matmul_reduce_scatterr      sH     9@@		'  r!   rK   )op_nameop_func	fake_implrT   ra   r   c                      e Zd ZU dZeed<   ee         ed<   eed<   eed<   eed<   eed<   eed<   ed	z  ed
<   e	d	z  ed<   	 	 dMdeee                  dede
ez  dedede
d	z  fdZ	 dNdZ	 dOdZed             Zed             Zed             Zed             Zed             Zed             ZedPded	z  fd            Zdej        d ej        fd!Zdej        d ej        fd"ZdQdej        d$ed ej        fd%Zdej        d$ed ej        fd&Z	 	 dRdej        eej                 z  d$ed'ee         d	z  fd(Z dQdej        d$ed ej        fd)Z!	 dSdej        d$ed'ee         d	z  d ej        fd*Z"dej        d$ed ej        fd+Z#	 dTdej        d,ed$ed ej        d	z  fd-Z$dUdej        d.efd/Z%dVd0e	d	z  d.efd1Z&	 dRd2ee	         d.ed3ed	z  fd4Z'd0e	d,ed d	fd5Z(d.ed e	fd6Z)	 	 	 	 dWd7e*e
ej        e	z  f         d	z  d.ed3ed	z  d8ed	z  d e*e
ej        e	z  f         d	z  f
d9Z+	 	 	 dXd7e*e
ej        e	z  f         d,ed	z  d:e,d          d;e*e
ef         d	z  d e*e
ej        e	z  f         d	z  f
d<Z-	 	 	 dXd.ed	z  d:e,d          d;e*e
ef         d	z  d e*e
ej        e	z  f         d	z  fd=Z.d> Z/dPd?ej        d,ed	z  d d	fd@Z0	 dPdAej1        dBej2        d.ed	z  d ej        fdCZ3dD Z4dEej5        j6        fdFZ7	 	 dMdGej        dHej        dIedJeej                 d	z  d e8ej        ej        f         e8ej        ej        eej                 f         z  f
dKZ9	 dYdIed ej        fdLZ:d	S )Zr;   aR  
    PyTorch ProcessGroup wrapper for a group of processes.
    PyTorch ProcessGroup is bound to one specific communication backend,
        e.g. NCCL, Gloo, MPI, etc.
    GroupCoordinator takes charge of all the communication operations among
        the processes in the group. It manages both CPU and device
        communication.
    rankranksrP   
local_rankrank_in_group	cpu_groupdevice_groupNdevice_communicatormq_broadcasterFgroup_rankstorch_distributed_backenduse_device_communicatoruse_message_queue_broadcasterrF   c                    |pd}t          |          | _        t          |            t          j                                        | _        || _        d }d }|D ]}	t          j                            |	|          }
t                      5  t          j                            |	d          }d d d            n# 1 swxY w Y   | j        |	v r>|	| _
        t          |	          | _        |	                    | j                  | _        |
}|}|J |J || _        || _        ddlm} |                                rt          j        d|           | _        n|                                rt          j        d|           | _        nQ|                                r$t          j        |j         d|           | _        nt          j        d	          | _        || _        d | _        |rT| j        d
k    rIt5          |                                          } || j        | j        | j        | j                  | _        ddlm} d | _        |r,| j        d
k    r!|                    | j        dd          | _        ddlm} |                                p|                                 | _!        |"                                otG          t          j$        j%        d          | _&        d S )N	anonymous)backendgloor   current_platformzcuda:zxpu:r7   cpur8   )r   r$   r   rB   MessageQueue  @    init_shm_manager)'r:   rB   rD   r   distributedget_rankr   r   	new_groupr   r   lenrP   indexr   r   r   vllm.platformsr   is_cuda_aliker$   is_xpuis_out_of_treedevice_namer   r   r   get_device_communicator_cls3vllm.distributed.device_communicators.shm_broadcastr   r   create_from_process_groupis_tpuuse_custom_op_callis_cpuhasattrr   _Cuse_cpu_custom_send_recv)selfr   r   r   r   r   rF   self_device_groupself_cpu_groupr   r   r   r   device_comm_clsr   s                  r"   __init__zGroupCoordinator.__init__1  sx     .;
+J77%..00	$   	+ 	+E ,668 7  L
 !"" O O!-77v7NN	O O O O O O O O O O O O O O OyE!!"
"%e**%*[[%;%;"$0!!*))) ,,,'-333333))++ 	.,';z';';<<DKK$$&& 	.,':j':':;;DKK,,.. 	.,*:*F'U'U'U'UVVDKK,u--DK'>$#' " 		t':':5 <<>> O (7.{!. ,	( ( (D$ 	UTTTTT37( 	T_q-@-@"."H"H# #D 	433333 **,,I0@0G0G0I0I 	 )9(?(?(A(A )
gIL,G
 G
%%%s   	"B77B;	>B;	r   Tc                 N    ddl m} |                    | j        dd|||          S )Nr   r   r   r   )writer_rankexternal_writer_handleblocking)r   r   r   r   )r   r   r   r   r   s        r"   create_mq_broadcasterz&GroupCoordinator.create_mq_broadcaster  sH     	UTTTTT55N##9 6 
 
 	
r!   c                 b    ddl m} |                    | j        dd| j        |         |          S )Nr   r   r   r   )reader_rankr   )r   r   'create_from_process_group_single_readerr   r   )r   reader_rank_in_groupr   r   s       r"   $create_single_reader_mq_broadcastersz5GroupCoordinator.create_single_reader_mq_broadcasters  sN     	UTTTTTCCN
#78 D 
 
 	
r!   c                     | j         d         S )z8Return the global rank of the first process in the groupr   r   r   s    r"   
first_rankzGroupCoordinator.first_rank  s     z!}r!   c                     | j         d         S )z7Return the global rank of the last process in the grouprq   r   r   s    r"   	last_rankzGroupCoordinator.last_rank  s     z"~r!   c                 "    | j         | j        k    S )z;Return whether the caller is the first process in the group)r   r   r   s    r"   is_first_rankzGroupCoordinator.is_first_rank  s     yDO++r!   c                 "    | j         | j        k    S )z:Return whether the caller is the last process in the group)r   r   r   s    r"   is_last_rankzGroupCoordinator.is_last_rank  s     yDN**r!   c                 D    | j         }| j        }| j        |dz   |z           S )z=Return the global rank of the process that follows the callerr8   r   rP   r   r   r   rP   s      r"   	next_rankzGroupCoordinator.next_rank  *     *_
z=1,
:;;r!   c                 D    | j         }| j        }| j        |dz
  |z           S )z>Return the global rank of the process that precedes the callerr8   r   r   s      r"   	prev_rankzGroupCoordinator.prev_rank  r   r!   graph_capture_contextc              #   >  K   |.t           j                                        }t          |          }n|j        }t                      }ddlm} | j        9t          | j        |          sJ | j        j
        }||                                }t           j                                        }||k    r|                    |           t           j                            |          5  |5  |V  d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S )Nr   )CudaCommunicator)r   r   r   r   r   r   7vllm.distributed.device_communicators.cuda_communicatorr   r   r+   ca_commcapturecurrent_streamwait_stream)r   r   r   maybe_ca_contextr   r   curr_streams          r"   graph_capturezGroupCoordinator.graph_capture  s      (Z&&((F$7$?$?!!*1F '==	
 	
 	
 	
 	
 	
 #/d68HIIIII.6G"#*??#4#4  j//11&  {+++Zv&& 	( 	((8 	( 	(''''	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s6   &D)C:.D:C>	>DC>	DDDinput_r(   c                     | j         dk    r|S | j        r+t          j        j                            || j                  S |                     |          S )a^  
        User-facing all-reduce function before we actually call the
        all-reduce operation.

        We need this because Dynamo does not support passing an arbitrary
        object (`self` in this case) to a custom op. We need to pass the
         group name as a string, and then look up the group coordinator from
         the group name, dispatch the all-reduce operation to the group
         coordinator.

        In addition, PyTorch custom ops do not support mutation or returning
        a new tensor in the same op. So we always make the all-reduce operation
        out-of-place.
        r8   rF   )rP   r   r   r   vllmrK   rB   rJ   r   r   s     r"   rK   zGroupCoordinator.all_reduce  sU      ?aM" 	69>,,V@P,QQQ--f555r!   c                 b    | j         t          d          | j                             |          S NNo device communicator found)r   rI   rK   r   s     r"   rJ   z&GroupCoordinator._all_reduce_out_place  s1    #+;<<<'226:::r!   rq   rO   c                 `   | j         }|dk    r|S |                                 |cxk    r|                                k     s$n J d| d|                                             | j        r-t          j        j                            |||| j                  S | 	                    ||          S Nr8   zInvalid dim (z) for input tensor with shape r   )
rP   rO   r&   r   r   r   r   ra   rB   r`   r   r   rO   rP   s       r"   ra   zGroupCoordinator.all_gather  s    _
??M

}2222fjjll22222NCNNv{{}}NN 322 " 	;9>,,ZD4D -    --fc:::r!   c                 d    | j         t          d          | j                             ||          S r   )r   rI   ra   r   r   rO   s      r"   r`   z&GroupCoordinator._all_gather_out_place  s3    #+;<<<'2263???r!   sizesc                 f    | j         t          d          | j                             |||          S r   )r   rI   all_gathervr   r   rO   r   s       r"   r   zGroupCoordinator.all_gatherv  s7     #+;<<<'33FCGGGr!   c                 `   | j         }|dk    r|S |                                 |cxk    r|                                k     s$n J d| d|                                             | j        r-t          j        j                            |||| j                  S | 	                    ||          S r   )
rP   rO   r&   r   r   r   r   rT   rB   rR   r   s       r"   rT   zGroupCoordinator.reduce_scatter  s    _
??M

}2222fjjll22222NCNNv{{}}NN 322 " 	?9>00ZD4D 1    11&#>>>r!   c                 f    | j         t          d          | j                             |||          S r   )r   rI   reduce_scattervr   s       r"   r   z GroupCoordinator.reduce_scatterv*  s7     #+;<<<'77UKKKr!   c                 d    | j         t          d          | j                             ||          S r   )r   rI   rT   r   s      r"   rR   z*GroupCoordinator._reduce_scatter_out_place1  s3    #+;<<<'66vsCCCr!   dstc                     | j         }|dk    r|S | j        t          d          | j                            |||          S )z
        NOTE: We assume that the input tensor is on the same device across
        all the ranks.
        NOTE: `dst` is the local rank of the destination rank.
        r8   Nr   )rP   r   rI   gather)r   r   r   rO   rP   s        r"   r   zGroupCoordinator.gather6  sL     _
??M#+;<<<'..vsC@@@r!   srcc                     || j         k     sJ d| d            | j         dk    r|S t          j                            || j        |         | j                   |S )z^Broadcast the input tensor.
        NOTE: `src` is the local rank of the source rank.
        Invalid src rank (rr   r8   r   r=   )rP   r   r   	broadcastr   r   )r   r   r   s      r"   r   zGroupCoordinator.broadcastF  sv     T_$$$&A3&A&A&A$$$ ?aM##
3t/@ 	$ 	
 	
 	
 r!   objc                    || j         k     sJ d| d            | j         dk    r|S | j        *|dk    s
J d            | j                            |          S | j        |k    r5t          j                            |g| j        |         | j                   |S dg}t          j                            || j        |         | j                   |d         S )z^Broadcast the input object.
        NOTE: `src` is the local rank of the source rank.
        r   rr   r8   Nr   z-Message queue broadcaster only supports src=0r   )	rP   r   broadcast_objectr   r   r   broadcast_object_listr   r   )r   r   r   recvs       r"   r   z!GroupCoordinator.broadcast_objectU  s     T_$$$&A3&A&A&A$$$ ?aJ*!888L888&77<<<$$334:c?$. 4    J6D33$*S/ 4    7Nr!   obj_listr=   c                     || j         k     sJ d| d            | j         dk    r|S t          j                            || j        |         | j                   |S )zcBroadcast the input object list.
        NOTE: `src` is the local rank of the source rank.
        r   rr   r8   r   )rP   r   r   r   r   r   )r   r   r   r=   s       r"   r   z&GroupCoordinator.broadcast_object_listm  sv     T_$$$&A3&A&A&A$$$ ?aO//$*S/1B 	0 	
 	
 	
 r!   c                    	 || j         k     sJ d| d            || j        k    s
J d            t          j        t	          j        |          t          j                  }t          j        |                                gt          j	        d          }t          j
                            || j        |         | j                   t          j
                            || j        |         | j                   dS )	z3Send the input object list to the destination rank.Invalid dst rank (rr   zKInvalid destination rank. Destination rank is the same as the current rank.)r%   r   rW   r   r=   N)rP   r   r   
frombufferpickledumpsuint8rE   rt   longr   sendr   r   )r   r   r   object_tensorsize_tensors        r"   send_objectzGroupCoordinator.send_object~  s    DT_$$$&A3&A&A&A$$$d((((# )(( (c):):%+NNNl  ""#5:e
 
 
 	{
3t~VVV 	}$*S/XXXtr!   c                 d   	 || j         k     sJ d| d            || j        k    s
J d            t          j        dt          j        d          }t          j                            || j        |         | j                  }t          j        |	                                t          j
        d          }t          j                            || j        |         | j                  }||k    s
J d            t          j        |                                                                          }|S )	z3Receive the input object list from the source rank.r   rr   zAInvalid source rank. Source rank is the same as the current rank.r8   r   rW   r   z@Received object sender rank does not match the size sender rank.)rP   r   r   r[   r	  r   r   r   r   itemr  r  loadsnumpytobytes)r   r   r  	rank_sizer  rank_objectr   s          r"   recv_objectzGroupCoordinator.recv_object  s<   ?T_$$$&A3&A&A&A$$$d((((O )(( k!5:eDDD %**TZ_DN + 
 
	
 +
 
 
 ',,tz#dn - 
 
 i'''N ('' l=..0088::;;
r!   r'   metadata_groupc                    t           j                                        r| j        dk    r|S | j        }| j        }|| j        k     sJ d| d            | j        }||k    rg }t          |t                    sJ dt          |                       t          |          \  }}|                     ||           g }|D ]}	|	                                dk    r|	j        r/t           j                            |	| j        |         |d          }
n.t           j                            |	| j        |         |d          }
|                    |
           |D ]}|                                 n|                     d	|          }i }g }|D ]\  }}t          |t$                    rt          j        |j        |j        |j        
          }	|	                                dk    r|	||<   ^|	j        r/t           j                            |	| j        |         |d          }
n.t           j                            |	| j        |         |d          }
|                    |
           |	||<   |||<   |D ]}|                                 |S )ziBroadcast the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.
        r8   r   rr   Expecting a dictionary, got r   r   T)r   r=   async_opNrW   )r   r   is_initializedrP   r   r   r   r+   dictr-   r3   r   rt   r   r   r   r.   waitr#   r[   r&   r%   r$   )r   r'   r   r=   r  r   r/   r0   async_handlesrE   handleasync_handler1   r2   s                 r"   broadcast_tensor_dictz&GroupCoordinator.broadcast_tensor_dict  s     //11 	T_5I5I!T_$$$&A3&A&A&A$$$*C35Mk400  BtK/@/@BB 0 *<K)H)H&M; !!-S!999M% - -<<>>Q&&= 	".88DJsO>TX 9  FF
 #.88DJsO54 9  F $$V,,,, - $ $!!####$ !11$C1@@MKM+ - -
Ue^44 -"[
%+el  F ||~~**+1C( } !&!2!<!<" $
3"0%)	 "= " " "'!2!<!<"
3ut "= " " "((000'-K$$',K$$ - $ $!!####r!   all_gather_groupall_gather_tensorsc                 v   t           j                                        r| j        dk    r|S |dn|j        }|dn|j        }| j        }| j        }|| j        dz   | j        z  }|| j        k     sJ d| d            | j        r3| j        t          d          | j        
                    ||           dS g }	t          |t                    sJ dt          |                       t          |          \  }	}
|                     |	|           d	 |                                D             }t#          |          t#          |
          k    sJ t%          ||
          D ]\  }}|                                dk    r|duo|                                |z  dk    }|r|                    ||          n|}|r|                    |d
          |         }|j        r.t           j                            || j        |         |           t           j                            || j        |         |           dS )a  Send the input tensor dictionary.
        NOTE: `dst` is the local rank of the source rank.

        all_gather_group: The group for the all-gather operation. If provided,
            an optimization is enabled where each rank in the group sends a
            slice of a tensor and the receiver reconstructs it using an
            all-gather, which can improve performance. This is typically the
            tensor-parallel group.
        all_gather_tensors: A dictionary to specify which tensors should use
            the all-gather optimization, which is only effective when
            `all_gather_group` is provided. By default, this optimization is
            on for any tensor whose size is divisible by the
            `all_gather_group`'s world size. However, it should be disabled
            for tensors that are not fully replicated across the group (e.g.,
            the residual tensor when sequence parallelism is enabled). This
            dictionary allows overriding the default behavior on a per-tensor
            basis.
        r8   Nr   r  rr   r   r  )r   c                 L    g | ]!\  }}t          |t          j                  |"S r    )r+   r   r,   ).0kvs      r"   
<listcomp>z5GroupCoordinator.send_tensor_dict.<locals>.<listcomp>I  s-    XXXTQJq%,<W<WXqXXXr!   rq   r  )r   r   r  rP   r   r   r   r   r   rI   send_tensor_dictr+   r  r-   r3   r  r*   r   ziprt   getreshaper   r
  r   )r   r'   r   r"  r#  all_gather_sizeall_gather_rankr=   r  r/   r0   tensor_keysr1   rE   use_all_gathers                  r"   r*  z!GroupCoordinator.send_tensor_dict  s   4  //11 	T_5I5I/7!!=M=X!)AA/?/M 	 !;%)T_<CT_$$$&A3&A&A&A$$$( 	'/ !?@@@$55S   4/1+t,, 	
 	
>4+<+<>>	
 	
, &8%D%D"{ 	C000XX[%6%6%8%8XXX;3{#3#33333{K88 	Q 	QKC||~~"" !,V/1QUV1V 
 &$"&&sN;;;# 
  N<<_M} Q!&&
3~ '    
 !&&v4:c?%&PPPPtr!   c                 X   t           j                                        r| j        dk    rdS |dn|j        }|dn|j        }| j        }| j        }|| j        dz
  | j        z  }|| j        k     sJ d| d            | j        r0| j        t          d          | j        
                    |          S |                     |          }i }	|D ]Z\  }
}t          |t                    r9t          j        |j        |j        |j                  }|                                dk    r||	|
<   `|duo|                                |z  dk    }|r|                    |
|          n|}|r#|j        }|                    |d	          |         }|j        r.t           j                            || j        |         |
           n-t           j                            || j        |         |
           |r,|                    |d          }|                    |          }||	|
<   U||	|
<   \|	S )a  Recv the input tensor dictionary.
        NOTE: `src` is the local rank of the source rank.

        all_gather_group: The group for the all-gather operation. If provided,
            an optimization is enabled where each rank in the group sends a
            slice of a tensor and the receiver reconstructs it using an
            all-gather, which can improve performance. This is typically the
            tensor-parallel group.
        all_gather_tensors: A dictionary to specify which tensors should use
            the all-gather optimization, which is only effective when
            `all_gather_group` is provided. By default, this optimization is
            on for any tensor whose size is divisible by the
            `all_gather_group`'s world size. However, it should be disabled
            for tensors that are not fully replicated across the group (e.g.,
            the residual tensor when sequence parallelism is enabled). This
            dictionary allows overriding the default behavior on a per-tensor
            basis.
        r8   Nr   r   rr   r   r  rW   rq   r   )rO   )r   r   r  rP   r   r   r   r   r   rI   recv_tensor_dictr  r+   r#   r[   r&   r%   r$   rt   r,  rZ   r-  r   r   r   ra   )r   r   r"  r#  r.  r/  r=   r  recv_metadata_listr'   r1   r2   rE   r1  
orig_shapes                  r"   r3  z!GroupCoordinator.recv_tensor_dictg  s   2  //11 	T_5I5I4/7!!=M=X!)AA/?/M 	 !;%)T_<CT_$$$&A3&A&A&A$$$( 	'/ !?@@@+<<   "--#-66&(, (	) (	)JC%00 ')UZu{5<XXX<<>>Q&&'-K$ %D0 >8A=  *(&**3???'  " R!'J#^^OR@@QF= U%**DJsO> +    
 %**6tz#e*TTT! 8-88A 9  F $^^J77F#)C  #(C  r!   c                 P    t           j                            | j                   dS )a+  Barrier synchronization among the group.
        NOTE: don't use `device_group` here! `barrier` in NCCL is
        terrible because it is internally a broadcast operation with
        secretly created GPU tensors. It is easy to mess up the current
        device. Use the CPU group instead.
        rC   N)r   r   barrierr   r   s    r"   r7  zGroupCoordinator.barrier  s&     	!!!77777r!   rE   c                 j    	 | j         t          d          | j                             ||           dS )z8Sends a tensor to the destination rank in a blocking wayNr   )r   rI   r
  )r   rE   r   s      r"   r
  zGroupCoordinator.send  s<    D#+;<<< %%fc22222r!   r&   r%   c                 h    	 | j         t          d          | j                             |||          S )z'Receives a tensor from the source rank.Nr   )r   rI   r   )r   r&   r%   r   s       r"   r   zGroupCoordinator.recv  s;     	@#+;<<<',,T5#>>>r!   c                 >   t          | d          r&t          j                            | j                   | `t          | d          r&t          j                            | j                   | `| j        | j                                         | j        	d | _        d S d S )Nr   r   )	r   r   r   destroy_process_groupr   r   r   destroyr   r   s    r"   r<  zGroupCoordinator.destroy  s    4(( 	"33D4EFFF!4%% 	33DNCCC#/$,,...*"&D +*r!   modelc                 L    | j         | j                             |           d S d S r?   )r   &prepare_communication_buffer_for_model)r   r=  s     r"   r?  z7GroupCoordinator.prepare_communication_buffer_for_model  s0    #/$KKERRRRR 0/r!   hidden_statesrouter_logitsis_sequence_parallelextra_tensorsc                 R    | j         | j                             ||||          S ||fS r?   )r   dispatch)r   r@  rA  rB  rC  s        r"   rE  zGroupCoordinator.dispatch  sA     #/+44$	   !-//r!   c                 J    | j         | j                             ||          S |S r?   )r   combine)r   r@  rB  s      r"   rG  zGroupCoordinator.combine  s-     #/+33MCWXXX  r!   )FN)r   NT)r   Fr?   )rq   )r   N)rq   N)r   rq   r   )Nr   )Nr   NN)NNNF);r   r   r   __doc__intr   rY   r   r   r
   strr   boolr   r   r   propertyr   r   r   r   r   r   r   r   r   r   r,   rK   rJ   ra   r`   r   rT   r   rR   r   r   r   r   r  r  r  r!  r   r*  r3  r7  r
  Sizer%   r   r<  nnModuler?  tuplerE  rG  r    r!   r"   r;   r;     sF          III9OOO OOO/$6666$J /4!%P
 P
$s)_P
 P
 $'=	P

 "&P
 (,P
 $JP
 P
 P
 P
f DH
 
 
 
 05
 
 
 
   X   X , , X, + + X+ < < X< < < X< ( (3F3M ( ( ( ^(:6 6%, 6 6 6 60;EL ;U\ ; ; ; ;
; ; ;C ; ; ; ; ; @EL @s @u| @ @ @ @ "&	H HtEL11H H Cy4	H H H H? ?U\ ? ?U\ ? ? ? ?" NRL LlL),L:>s)d:JL	L L L LD D3 D5< D D D D >@A AlA),A7:A		A A A A   3     C$J C    2 OS S	(+8Dt8K   "s      6"s "s " " " "L =A%).2P P#u|c112T9P P d"	P
 %t+P 
c5<#%%	&	-P P P Pj 9=59V V#u|c112V 4ZV ##56	V
 !dOd2V 
c5<#%%	&	-V V V Vt 9=59	Y Y4ZY ##56Y !dOd2	Y
 
c5<#%%	&	-Y Y Y Yv8 8 83 35< 3cDj 3D 3 3 3 3 GK? ?J?',{?9<t?	? ? ? ?
' 
' 
'SEHO S S S S &+370 0|0 |0 #	0
 EL)D00 	elEL()
elD,>>
?	@0 0 0 0* ;@! !37!	! ! ! ! ! !r!   _WORLD_INNER_DP_WORLD_NODE_COUNTc                  2    t           
J d            t           S )Nzworld group is not initialized)rS  r    r!   r"   get_world_grouprW    s    ?Mr!   c                  2    t           
J d            t           S )Nz'inner dp world group is not initialized)rT  r    r!   r"   get_inner_dp_world_grouprY    s    &&(Q&&&r!   r   r   r   c                 ,    t          | g||dd          S )NFworld)r   r   r   r   rF   r;   )r   r   r   s      r"   init_world_groupr]    s,     G") %   r!   Tr   r   r   c                 ,    t          | |||||          S )N)r   r   r   r   r   rF   r\  )r   r   r   r   rF   r   s         r"   init_model_parallel_groupr_  #  s-     ") 7&C   r!   _TPc                  2    t           
J d            t           S )Nz.tensor model parallel group is not initialized)r`  r    r!   r"   get_tp_grouprb  8  s    ??L???Jr!   _DCPc                  2    t           
J d            t           S )Nz6decode context model parallel group is not initialized)rc  r    r!   r"   get_dcp_groupre  @  s    UKr!   _PPc                  2    t           
J d            t           S )Nz0pipeline model parallel group is not initialized)rf  r    r!   r"   get_pp_grouprh  K  s    ??N???Jr!   _DPc                  2    t           
J d            t           S )Nz&data parallel group is not initialized)ri  r    r!   r"   get_dp_grouprk  S  s    ??D???Jr!   _EPc                  2    t           
J d            t           S )Nzexpert parallel group is not initialized. EP group is only created for MoE models with num_experts > 0. This function should only be called for MoE models.)rl  r    r!   r"   get_ep_grouprn  [  s    ??	> ??
 Jr!   _PCPc                  2    t           
J d            t           S )Nz1prefill context parallel group is not initialized)ro  r    r!   r"   get_pcp_grouprq  g  s    PKr!   r$   c              #   R  K   t          t          j                            |                     }t	                                          |          5  t                                          |          5  |V  ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS )a=  
    `graph_capture` is a context manager which should surround the code that
    is capturing the CUDA graph. Its main purpose is to ensure that some
    operations will be run after the graph is captured, before the graph
    is replayed. It returns a `GraphCaptureContext` object which contains the
    necessary data for the graph capture. Currently, it only contains the
    stream that the graph capture is running on. This stream is set to the
    current CUDA stream when the context manager is entered and reset to the
    default stream when the context manager is exited. This is to ensure that
    the graph capture is running on a separate stream from the default stream,
    in order to explicitly distinguish the kernels to capture
    from other kernels possibly launched on background in the default stream.
    )r$   N)r   r   r   r   rb  r   rh  )r$   contexts     r"   r   r   l  s      "%*"3"36"3"B"BCCG		%	%g	.	.  0L0LW0U0U                                  s6   "B3B8BB	BB	BB #B enablec                 
    | a d S r?   )_ENABLE_CUSTOM_ALL_REDUCE)rt  s    r"   set_custom_all_reducerw    s     &r!   rq   env://ncclr   distributed_init_methodtimeoutc                 f   t                               d| ||||           ddlm}  |            }||j        j        dk    r|j        j        dk    s|j        j        dk    r|j        }|j        | z  |z   }|j	        } |j        dk    r|j
        }	|j        }
t          |	|
          }nH|j        }	|                                }
t          |	|
          }t                               d| ||           t          j                                        st                               d| ||||           |
J d            t          j                            |          sEt                               d|           t          j                                        s
J d	            d
}t          j                            ||| ||           |dk    r|dk    rt.          j        n|}t2          t5          t7          t          j                                                            }t;          |||          a||j        j        dk    r|j        j        ant?          t2          j                   at                               dt<                     n6t2          j!        t          j                                        k    s
J d            |s|j        j"        dk    re|j        dk    rO|j!        fdt7          |j                  D             }tG          |tI                      j%        |ddd          a&d S t2          a&d S d S d S )NzIworld_size=%d rank=%d local_rank=%d distributed_init_method=%s backend=%sr   get_current_vllm_config_or_noneexternal_launcherr8   zAAdjusting world_size=%d rank=%d distributed_init_method=%s for DPzRdistributed_init_method must be provided when initializing distributed environmentz>Distributed backend %s is not available; falling back to gloo.z'Fallback Gloo backend is not available.r   )r   init_methodrP   r   r{  rq   rx  z0Detected %d nodes in the distributed environmentz;world group already initialized with a different world sizec                 J    g | ]fd t                    D             S )c                      g | ]
}z  |z   S r    r    )r&  idp_rankworld_size_inner_dps     r"   r)  z;init_distributed_environment.<locals>.<listcomp>.<listcomp>  s$    WWWq..2WWWr!   )range)r&  r  r  s    @r"   r)  z0init_distributed_environment.<locals>.<listcomp>  sK        XWWWWEBU<V<VWWW  r!   Tinner_dp_worldF)r   rF   r   )'loggerdebugvllm.configr~  parallel_configdistributed_executor_backendnnodesdata_parallel_sizedata_parallel_rankworld_size_across_dpmaster_addrmaster_portr   data_parallel_master_ipget_next_dp_init_portr   r   r  infois_backend_availablewarningis_gloo_availableinit_process_groupenvs
LOCAL_RANKrS  rY   r  get_world_sizer]  rU  _node_countr   rP   nnodes_within_dpr_  rW  r   rT  )rP   r   rz  r   r   r{  r~  configr  ipportr   r   r  s                @r"   init_distributed_environmentr    s    LLS   <;;;;;,,..F"?CVVV")A--%81<< !0 1J>E$9
 !A%% ,B".D&A"d&K&K## 8B"88::D&A"d&K&K#LLS'	   ++-- 
W#	
 	
 	
 '22& 322  55g>> 	NNP   $6688  9 8 G,,/! 	- 	
 	
 	
 R )@8(K(KT__QU
~U5,;;==>>??!%W==&"8"?!"C"C 07KK%f&677KGUUUU E$5$D$D$F$FFFFI GFF f4EII-11"1"<   $_%GHH  K 8!!,.2+(-  OOO %OOO! IIr!   r8   tensor_model_parallel_sizepipeline_model_parallel_size#prefill_context_model_parallel_size"decode_context_model_parallel_sizec           
      B   t           j                                        sJ t           j                                        }t           j                                        }|p/t           j                            t                      j                  }d}ddlm	}  |            }	|	|	j
        j        }t          j        |                              d||||           }
t          
J d            |
                    d|                               d          }d |D             }t#          |t                      j        |dd	
          at&          
J d            |
                    d|                              d          }d |D             }t#          |t                      j        |dd
          at(          
J d            |
                    dd                              d|                              d          }d |D             }t#          |t                      j        |d          at,          
J d            |
                    dd                              d|                              d          }d |D             }t#          |t                      j        |d          at.          
J d            |
                    dd                              d|                              d          }d |D             }t#          |t                      j        |d          at0          
J d            |	|	j        |	j        j        rs|
                    dd                              d||z  | z                                d          }d |D             }t#          |t                      j        |d          at6                              d||t.          j        t,          j        t(          j        t          j        t0          t0          j        nd           dS ) a  
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.
        backend: name of torch distributed communication backend.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    r8   r   r}  Nrq   z2tensor model parallel group is already initializedc                 6    g | ]}|                                 S r    tolistr&  xs     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>@       333!188::333r!   Ttp)r   rF   z:decode context model parallel group is already initializedc                 6    g | ]}|                                 S r    r  r  s     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>S  r  r!   dcpz5prefill context parallel group is already initialized      c                 6    g | ]}|                                 S r    r  r  s     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>c  r  r!   pcpr   z4pipeline model parallel group is already initialized   c                 6    g | ]}|                                 S r    r  r  s     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>n  r  r!   ppz*data parallel group is already initializedc                 6    g | ]}|                                 S r    r  r  s     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>v  r  r!   dpz,expert parallel group is already initializedc                 6    g | ]}|                                 S r    r  r  s     r"   r)  z-initialize_model_parallel.<locals>.<listcomp>  s     777aqxxzz777r!   epzcrank %s in world size %s is assigned as DP rank %s, PP rank %s, PCP rank %s, TP rank %s, EP rank %szN/A)r   r   r  r  r   get_backendrW  r   r  r~  r  r  aranger-  r`  rx   unbindr_  r   rc  ro  	transposerf  ri  rl  model_configis_moer  	info_oncer   )r  r  r  r  r   rP   r   r  r~  r  	all_ranksr   s               r"   initialize_model_parallelr     sL   > ++-----'6688J%%''DV*667H7H7UVVG;;;;;;,,..F#3F Z((00
$+" I ;;L;;;..%?@@GGJJK33{333K $$&*  C <<U<<<
 ##B(JKKRRSTUUK33{333K$$&*  D <<P<<<Aq!!	8	9	9	 
 43{333K$_&&17u  D ;;N;;;Aq!!))".JKKRRSTUU  43{333K
#_&&17t  C
 ;;D;;;%%a++33B8JKKRRSTUUK33{333K
#_&&17t  C
 ;;F;;;~,48K8R41%%W"56,-  VAYY 	 87;777'**5w4
 
 

 	! 	 _%    r!   c                    |p/t           j                            t                      j                  }t                      st          | ||||           dS t                      | k    sJ dt                      d|             t                      j	        }||k    sJ d|d|            t                      j	        }||k    sJ d|d|            dS )zHelper to initialize model parallel groups if they are not initialized,
    or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
    values if the model parallel groups are initialized.
    Nzotensor parallel group already initialized, but of unexpected size. got: get_tensor_model_parallel_world_size()=z( vs. wanted: tensor_model_parallel_size=zXpipeline parallel group already initialized, but of unexpected size. got: pp_world_size=z* vs. wanted: pipeline_model_parallel_size=z[prefill context parallel group already initialized, but of unexpected size: pcp_world_size=z) vs. prefill_context_model_parallel_size=)r   r   r  rW  r   model_parallel_is_initializedr  $get_tensor_model_parallel_world_sizerh  rP   rq  )r  r  r  r  r   pp_world_sizepcp_world_sizes          r"   !ensure_model_parallel_initializedr    s9    V*667H7H7UVVG(** !&(/.	
 	
 	
 	/115OOOO	1466	1 	1-	1 	1 POO
 !NN-M8888	3	3 	3/	3 	3 988
 #__/N@@@@	2	2 	2.	2 	2 A@@@@r!   r=  c                 T   t           t                               |            t          t                              |            t          t                              |            t          t                              |            t
          t
                              |            dS dS )a1  Prepare the communication buffer for the model.
    Traditional communication libraries like NCCL are almost
    model agnostic. However, emerging new communication libraries like
    MoE all2all (DeepEP) usually allocate the communication buffer
    based on the model shape for optimal performance.
    N)r`  r?  ro  rf  ri  rl  )r=  s    r"   r?  r?    s     22599933E:::
225999
225999
22599999 r!   c                  &    t           duot          duS )z=Check if tensor and pipeline parallel groups are initialized.N)r`  rf  r    r!   r"   r  r    s    d?.s$.r!   tp_groupc              #   t   K   t           r
J d            da t                      }| a	 dV  da |adS # da |aw xY w)a  Patch the tp group temporarily until this function ends.

    This method is for draft workers of speculative decoding to run draft model
    with different tp degree from that of target model workers.

    Args:
        tp_group (GroupCoordinator): the tp group coordinator
    z)Should not call when it's already patchedTNF)_TP_STATE_PATCHEDrb  r`  )r  old_tp_groups     r"   patch_tensor_parallel_groupr    sj       !MM"MMM >>L
C " "s   1 7c                  (    t                      j        S )z6Return world size for the tensor model parallel group.)rb  rP   r    r!   r"   r  r    s    >>$$r!   c                  (    t                      j        S )z3Return my rank for the tensor model parallel group.)rb  r   r    r!   r"   get_tensor_model_parallel_rankr    s    >>''r!   c                  (    t                      j        S )z>Return world size for the decode context model parallel group.)re  rP   r    r!   r"   ,get_decode_context_model_parallel_world_sizer    s    ??%%r!   c                  (    t                      j        S )z;Return my rank for the decode context model parallel group.)re  r   r    r!   r"   &get_decode_context_model_parallel_rankr  	  s    ??((r!   c                  2    t           
J d            t           S )z@Return the total number of nodes in the distributed environment.Nz*distributed environment is not initialized)rU  r    r!   r"   get_node_countr    s    ""$P"""r!   c                     t           rt                                            da t          rt                                           dat          rt                                           dat          rt                                           dat
          rt
                                           dat          rt                                           dadS )z(Set the groups to none and destroy them.N)r`  r<  rc  ro  rf  ri  rl  r    r!   r"   destroy_model_parallelr    s      
C  D  D  
C  
C  
CCCr!   c                      t           rt                                            d a d at          j                                        r t          j                                         d S d S r?   )rS  r<  rU  r   r   r  r;  r    r!   r"   destroy_distributed_environmentr  6  s`     FK'')) 2//111112 2r!   shutdown_rayc                    t          j                     t          j                     t	                       t                       | rdd l}|                                 t          j                     ddl	m
} |j        }|
 |             	 |                                s t          j                                         d S d S # t           $ r t"                              d           Y d S w xY w)Nr   r   z;torch._C._host_emptyCache() only available in Pytorch >=2.5)r  disable_envs_cachegcunfreezer  r  rayshutdowncollectr   r   empty_cacher   r   r   _host_emptyCacheAttributeErrorr  r  )r  r  r   r  s       r"   cleanup_dist_env_and_memoryr  @  s   KMMM#%%% 


JLLL//////".KV&&(( 	(H%%'''''	( 	( V V VTUUUUUUVs   
2C   $C('C(pgsource_rankc                 `   t          | t                    rt          j                            |           t          j        j        j        k    s
J d            t          j                            |           }t          j                            |           }t          j        	                    |           }n*| j
        }| j        }t          t          |                    }t          j        dg|z  t          j        d          }d}d}	 t!          j        t$                    5  ||k    rt'          j        dd	
          }||j        dt-          |          <   t          | t                    r/t          j                            |j        g||         |            n|                     |j        |           d||<   nt          | t                    r4dg}t          j                            |||         |            |d         }	n|                     d|          }	t5          dd           5  t'          j        |	          }ddd           n# 1 swxY w Y   |j        dt-          |                   |k    rd||<   ddd           n# 1 swxY w Y   n2# t6          $ r%}
t8                              d|
           Y d}
~
nd}
~
ww xY w|r|                                 n# |r|                                 w w xY wt          | t                    r!t          j                            |            n|                                  t!          j        t$                    5  ||k    r|r|                                  ddd           n# 1 swxY w Y   t          | t                    r$t          j        !                    ||            |}nBt          j"        |          }t          |          D ]}|                     ||          }||z  }d |#                                D             S )z
    This is a collective operation that returns if each rank is in the same node
    as the source rank. It tests if processes are attached to the same
    memory system (shared access to shared memory).
    z;in_the_same_node_as should be tested with a non-NCCL group.rC   r   r   rW   s   magic_messageNT   )creater&   r   r  r8   z)multiprocessing.resource_tracker.registerc                      d S r?   r    )argskwargss     r"   <lambda>z%in_the_same_node_as.<locals>.<lambda>  s    D r!   )r5   z(Error ignored in is_in_the_same_node: %sc                     g | ]}|d k    	S )r8   r    r  s     r"   r)  z'in_the_same_node_as.<locals>.<listcomp>  s    555qAF555r!   )$r+   r   r   r   r  r   NCCLr   r  get_process_group_ranksr   rP   rY   r  rE   int32
contextlibsuppressOSErrorr	   SharedMemorybufr   r   r5   broadcast_objr   	Exceptionr  errorcloser7  unlinkrK   
zeros_liker  )r  r  r   rP   r   is_in_the_same_nodemagic_messageshmr   r5   eaggregated_datar  	rank_datas                 r"   in_the_same_node_asr	  Y  s(    "l## ( ,,R00E4E4M4RRRRI SRR  )))33&55B5??
 !99"==w]
U:&&''  ,	
jE   %M
C% )) 	2 	2{""#03GGG0=,#m,,,-b,// @%;;
k(:" <     $$SX;$???,-#D)) b,// C 6D%;;%"4B <     7DD++Dk+BBD ?00  @ @ (4$???C	@ @ @ @ @ @ @ @ @ @ @ @ @ @ @
 7/S///0MAA01'-?	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2@  D D D?CCCCCCCCD  	IIKKK  	IIKKKK	 "l## !!!++++


 
	W	%	%  ;3JJLLL               "l## )$$%8$CCC-*+>??z"" 	) 	)A(()<!(DDIy(OO55O22445555s   J $DJ1IJI	JI	(JJ JJ JJ K$ 
K
%K K$ K

K$ $K=#NNNc                      	 t           t           j        S t          j                                        sdS t          j                                        dk    S # t          $ r Y dS w xY w)a  
    Check if the current process is the first rank globally across all
    parallelism strategies (PP, TP, DP, EP, etc.).

    Unlike group-specific checks like `get_tensor_model_parallel_rank() == 0`
    or `get_pp_group().is_first_rank`, this function checks the global rank
    across all parallelism dimensions.

    Returns:
        bool: True if this is the global first rank (rank 0), False otherwise.
              Returns True if distributed is not initialized (single process).
    NTr   )rS  r   r   r   r  r   r  r    r!   r"   is_global_first_rankr    st     ''  //11 	4  ))++q00   tts   A A !A 
A%$A%c                  4   	 t           t           j        dk    S t          j                                        sdS 	 t          t          j                  dk    S # t          $ r% t          j        	                                dk    cY S w xY w# t          $ r Y dS w xY w)zT
    Check if the current process is the first local rank (rank 0 on its node).
    Nr   T)
rS  r   r   r   r  rK  r  r  r  r   r    r!   r"   is_local_first_rankr    s     $)) //11 	4	5t''1,, 	5 	5 	5$--//14444	5   tts3   B	 B	 A ,BB	 BB	 	
BBc                 b   t          | t                    r!t          j                            |           }n| j        }|dk    rdS dg|z  }d}t          |          D ]Q}||         dk    r|dz  }|||<   t          | |          }t          |          D ]\  }}|r||         dk    r|||<   R|S )z
    Returns the total number of nodes in the process group.

    Args:
        pg: The process group to analyze

    Returns:
        int: The total number of nodes
    rC   r8   r   )	r+   r   r   r   r  rP   r  r	  	enumerate)r  rP   node_assignmentnext_node_idcurrent_ranksame_node_flags
other_rankis_same_nodes           r"   r  r    s     "l## #&55B5??

]
Qq cJ&OLj)) ; ;<(A-- 	(4% .b,??(1/(B(B 	; 	;$J ;
 ;q @ @.:
+	; r!   )r=   r;   r(   N)NNNF)FNT)rq   rq   rx  rq   ry  N)r8   r8   r8   r8   N)r8   r8   NrI  rH  )wrJ  r  r  r  r@   collectionsr   collections.abcr   r   r   dataclassesr   datetimer   multiprocessingr	   typingr
   r   unittest.mockr   r   torch.distributed)torch.distributed._functional_collectivesr   _functional_collectivesry   #torch.distributed._symmetric_memoryr   r   	vllm.envsr  >vllm.distributed.device_communicators.base_device_communicatorr   vllm.distributed.utilsr   vllm.loggerr   vllm.utils.import_utilsr   vllm.utils.network_utilsr   vllm.utils.system_utilsr   vllm.utils.torch_utilsr   r   r#   r  rL  r,   rR  rY   r3   r4   rK  r   r:   r<   rD   rK   rN   rT   r^   ra   rc   r%   rM  r~   r   r;   rS  rT  rU  rW  rY  r]  r_  r`  rb  rc  re   get_context_model_parallel_grouprf  rh  ri  rk  rl  rn  ro  rq  r$   r   r   r  rv  rw  r  r  r  rP  rQ  r?  r  r  r  r  r  r  r  r  r  r  r  r	  r  r  r  r    r!   r"   <module>r*     s    $     				   " " " " " " $ $ $ $ $ $ 2 2 2 2 2 2 2 2 ! ! ! ! ! !       ) ) ) ) ) )                            : : : : : : : : : * * * * 3 3 3 3 3 3 3 3            9 8 8 8 8 8 # # # # # # ; ; ; ; ; ; @ @ @ @ @ @ 3 3 3 3 3 3     
         ,.I.I.IJJ&c5<#--.&
4c3h $u|"445& & & &4 ') T#s(^ ( ( (
3 
3 
 
 
 
 BDc8B); <<==	> C C C4 4 4 4/u| / / / / / /$EL $c $el $ $ $ $8L8"8038AD8
\8 8 8 8LLL"L03LADL
\L L L L4L4"4034AD4
\4 4 4 4LLL"L03LADL
\L L L L$ !%(,$( 1 1|1|1 \1 \	1
 1 1 &)1 1 s)1 ,
1 ,%1 {T!1 1 \1 1 1 1| !%(,$(  || \ \	
   &)  s) ,
 ,% {T!  \   @       !          86=   q! q! q! q! q! q! q! q!h #'4 & & &+/!D( / / /S4Z   )    
"2    
	9	"%	03		 	 	 	  +0!$( d3i  $(	
 d
 "    $  $ # # #&    
 !% $ $ $'     $1  # # # #&    
  $ # # #&    
  $ # # #&     !% $ $ $'    
 %,    & 
X		  '$ ' ' ' ' #+ $s% s%s%
s% !s% 	s%
 s% s% s% s% s%n '(()/056Z Z #Z"%Z *-Z ),d
	Z
 4ZZ 
Z Z Z Z@ 0156& & #&"%& *-& ),d
	&
 4Z& 
& & & &R:%(/ : : : :&/ / /
   *:    0%c % % % %
( ( ( ( (
&c & & & &
) ) ) ) )
      D2 2 2V Vd V V V V4 BCY6 Y6,,Y6;>Y6	$ZY6 Y6 Y6 Y6xd    >T    .$L#88 $S $ $ $ $ $ $r!   