
    .`iaa                        U d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZmZ d dlmZmZ d dlm Z  d dl!m"Z"m#Z#m$Z$m%Z% d dl&m'Z' e%d dl(m)Z) d dl*m+Z+ ndZ)e
rd dl,m-Z-  ee.          Z/ e            Z0ee'dz           e1d<   e02                    d           e G d d                      Z3 G d de           Z4dS )    N)defaultdict)Callable)Future)	dataclass)TYPE_CHECKINGAny)init_logger)current_platform)get_env_vars_to_copy)get_distributed_init_methodget_ipget_open_port)GrammarOutputSchedulerOutput)ReconfigureDistributedRequestReconfigureRankType)Executor)FutureWrapperRayWorkerWrapperinitialize_ray_clusterray)ModelRunnerOutput)ActorHandle) PlacementGroupSchedulingStrategy)PlacementGroupCOMPLETED_NONE_FUTUREc                   D    e Zd ZU dZeed<   eed<   dZeed<   dZe	ed<   dS )	RayWorkerMetaDataz
    Metadata for a Ray worker.
    The order of ray worker creation can be random,
    and we need to reset the rank after creating all workers.
    workercreated_rankadjusted_rank ipN)
__name__
__module____qualname____doc__r   __annotations__intr"   r$   str     q/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/v1/executor/ray_executor.pyr   r   0   sQ           M3BLLLLLr-   r   c                      e Zd ZU dZh dZddhZdZeed<   dZ	eed<   d'd
Z
edefd            Zd'dZdeeef         fdZd Zd(dZdedd	fdZ	 d)dededed	z  eed	z           z  fdZ	 d)dddeded	z  eed	z           z  fdZ	 d)dedddeded	z  eed	z           z  fdZ	 	 	 	 d*deez  ded	z  ded eeef         d	z  dede e         ee e                  z  fd!Z!d" Z"d#efd$Z#d% Z$d'd&Z%d	S )+RayDistributedExecutorzRay-based distributed executor>   
LOCAL_RANKVLLM_HOST_IPVLLM_HOST_PORTCUDA_VISIBLE_DEVICESHF_TOKENHUGGING_FACE_HUB_TOKENTuses_raysupports_ppreturnNc                    d | _         t          j                    st          j                    rdt          j        d<   | j        sJ t          | j                   | j        j	        }t          j        
                    dd          }|dk    rdt          j        d<   |                     |           | j        j        d u| _        | j        j        j        dk    o| j        j        d u p| j        j        j         | _        d | _        d S )Nshm&VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPERAY_USAGE_STATS_ENABLED01pooling)forward_dagr
   is_tpuis_xpuosenvironr7   r   parallel_configplacement_groupget_init_workers_rayvllm_configkv_transfer_confighas_connectormodel_configrunner_typeec_transfer_configis_ec_produceruses_samplerscheduler_output)selfrG   	ray_usages      r.   _init_executorz%RayDistributedExecutor._init_executorP   s   7; "$$ 	I(8(?(A(A 	ICHBJ?@}}t3444.> JNN#<cBB	47BJ01 	/// "-@L ,9ER 
/47 F#6EE 	
 9=r-   c                 F    | j         j        }|dk    r| j        j        rdn|S )zRay distributed executor supports pipeline parallelism,
        meaning that it allows PP size batches to be executed concurrently.
              )rF   pipeline_parallel_sizescheduler_configasync_scheduling)rS   pp_sizes     r.   max_concurrent_batchesz-RayDistributedExecutor.max_concurrent_batchesm   s,    
 &=qLLT%:%KLqqQXXr-   c                     t           rt                               d           t          | d          rG| j        B| j                                         dd l}| j        D ]} |j        |           d | _        d S d S d S )NzShutting down Ray distributed executor. If you see error log from logging.cc regarding SIGTERM received, please ignore because this is the expected termination process in Ray.rA   r   )loggerinfohasattrrA   teardownr   workerskill)rS   r   r   s      r.   shutdownzRayDistributedExecutor.shutdownu   s     	KKK  
 4'' 	$D,<,H%%'''JJJ, ! !    #D	$ 	$,H,Hr-   c                 h    |                     di           }|                    dddddi           |S )Nruntime_envnsightzcuda,cudnn,cublasz'worker_process_%p'node)tozcuda-graph-trace)
setdefaultupdate)rS   ray_remote_kwargsrg   s      r.   !_configure_ray_workers_use_nsightz8RayDistributedExecutor._configure_ray_workers_use_nsight   sT     (22="EE,.(. 	
 	
 	
 ! r-   c                     | j         S N)_env_vars_for_all_workersrS   s    r.   _get_env_vars_to_be_updatedz2RayDistributedExecutor._get_env_vars_to_be_updated   s    --r-   rG   r   c                 t  %&' t           j        }d | _        g | _        g | _        | j        j        r|                     |          }t           j        rt          t          t          t           j                            d                              }t          |          | j        j        k    sJ d|d| j        j                    t          t          |                    t          |          k    sJ d|            neg }t!          |j                  D ]:\  }}|                    t&          j        d          r|                    |           ;|d | j        j                 }g }t-                      %t!          |          D ]\  }}t/          |d|          }	t&          j        dk    r8 t1          j        d'd||	d	|t4                                        |
          }
nD t1          j        d'ddt&          j        |i|	d|t4                                        |
          }
|                    t7          |
|                     t1          j        d |D                       }t9          ||          D ]\  }}||_        t<                              d|           t<                              d| j                   i &|D ]}&                    |d          dz   &|<   dt6          f%&fd}tA          ||          }t!          |          D ]\  }}||_!        d |D             | _        d |D             }| "                    d|f           g }| j        g| j        z   D ]C}
|
|                    t1          j        |
j#                                                             DtI          t                    }tI          t                    't!          |          D ]J\  }\  }}||                             |           d |D             }'|         %                    |           K'&                                D ]\  }}tA          |          '|<   t          |%gz             }t          |          }t          |          }||k    r;tO          d| dt          |(                                           d| d| d	          'fd|D             }tS          | j*        t          t&          j+                  ,                    | j-                  d           }|D ]*}|D ]%}|t\          j/        v rt\          j/        |         ||<   &+|| _0        | "                    d!| 1                                f           t          '          dk    rd"%te          %tg                                }g }t!          |          D ]j\  }\  }} ||         4                    |          }!tk          | j6        |!||| j         p|| j        j7        z  dk    #          }"|                    |"           k| "                    d$|f           | "                    d%           | "                    d&           tq          | j        j9                  D ]}#| j                            g            tq          | j        j7                  D ]y}$|#| j        j7        z  |$z   }t          | j        |#                   |$k    sJ |#t          | j                  k     sJ | j        |#                             | j        |                    zd S )(N,zZVLLM_RAY_BUNDLE_INDICES must have the same size as the world size, but got bundle_indices=z% and self.parallel_config.world_size=zMVLLM_RAY_BUNDLE_INDICES cannot have duplicate values, but got bundle_indices=r   T)rG   #placement_group_capture_child_tasksplacement_group_bundle_indexGPU)num_cpusnum_gpusscheduling_strategy)rpc_rank)rz   r{   	resourcesr|   )r   r    c                 J    g | ] }|j         j                                        !S r,   )r   get_node_ipremote).0eachs     r.   
<listcomp>z<RayDistributedExecutor._init_workers_ray.<locals>.<listcomp>   s9        '..00  r-   zworkers: %szdriver_dummy_worker: %srW   itemc                 8    | j         }|k    rdnd|         |fS )a  
            Sort the workers based on 3 properties:
            1. If the worker is on the same node as the driver (vllm engine),
                it should be placed first.
            2. Then, if the worker is on a node with fewer workers, it should
                be placed first.
            3. Finally, if the work is on a node with smaller IP address, it
                should be placed first.
            r   rW   )r$   )r   r$   	driver_ip	ip_countss     r.   sort_by_driver_then_worker_ipzORayDistributedExecutor._init_workers_ray.<locals>.sort_by_driver_then_worker_ip   s)     Bi11Q	"rAAr-   )keyc                     g | ]	}|j         
S r,   )r   r   r   s     r.   r   z<RayDistributedExecutor._init_workers_ray.<locals>.<listcomp>  s    GGGGGGr-   c                 (    i | ]}|j         |j        S r,   )r    r"   r   s     r.   
<dictcomp>z<RayDistributedExecutor._init_workers_ray.<locals>.<dictcomp>  s-     
 
 
6:Dt1
 
 
r-   adjust_rankargsc                 ,    g | ]}t          |          S r,   )r*   )r   xs     r.   r   z<RayDistributedExecutor._init_workers_ray.<locals>.<listcomp>  s    ///!s1vv///r-   z0Every node should have a unique IP address. Got z nodes with node ids z and z unique IP addresses z. Please check your network configuration. If you set `VLLM_HOST_IP` environment variable, make sure it is unique for each node.c           
          g | ]?\  }}t           j        d                     t          t          |                             i@S )rv   )r
   device_control_env_varjoinmapr+   )r   node_id_	node_gpuss      r.   r   z<RayDistributedExecutor._init_workers_ray.<locals>.<listcomp>2  sY     4
 4
 4
 !	 !7Yw/00: :4
 4
 4
r-   rc   )exclude_varsadditional_varsdestinationupdate_environment_variablesz	127.0.0.1)rJ   
local_rankrankdistributed_init_methodis_driver_workerinit_workerinit_device
load_modelr,   ):envsVLLM_RAY_PER_WORKER_GPUSdriver_dummy_workerrc   pp_tp_workersrF   ray_workers_use_nsightro   VLLM_RAY_BUNDLE_INDICESlistr   r*   splitlen
world_sizeset	enumeratebundle_specsrH   r
   ray_device_keyappendr   r   r   r   r   r   zipr$   r_   debugsortedr"   collective_rpcget_node_and_gpu_idsr   extenditemsRuntimeErrorkeysr   WORKER_SPECIFIC_ENV_VARSadditional_env_varsunionADDITIONAL_ENV_VARSrD   rE   rr   rt   r   r   indexdictrJ   tensor_parallel_sizerangerY   )(rS   rG   rn   r{   bundle_indices	bundle_idbundleworker_metadatar   r|   r   
worker_ipsr   r$   r   sorted_worker_metadatair   rerank_mappingworker_node_and_gpu_idsnode_workersr   gpu_idsall_ipsn_ipsn_nodes(all_args_to_update_environment_variablesenv_vars_to_copyr   namer   
all_kwargsr   r   kwargspp_ranktp_rankr   r   r   s(                                        @@@r.   rI   z(RayDistributedExecutor._init_workers_ray   s   0 =A /1
 <>6 	 $ F F!! ! ' 	O!#c4+G+M+Mc+R+R"S"STTN~&&$*>*IIII:/=: :+6: : JII
 s>**++s>/B/BBBB.*. . CBBB  N%./K%L%L 5 5!	6::.=qAA 5")))444+,Md.B.M,MNN35HH	(88 	X 	XOD)"B /48-6# # #  .%77 %(;  (	 
 #$ $
 %+FDF$9$9  />I(;	 
 (  #$ $ %+FDF$9$9  ""#4FQU#V#V#VWWWWW +  
 

 OZ88 	 	HD"DGG]O444.0HIII$&	 	5 	5B%MM"a0014IbMM	B0A 	B 	B 	B 	B 	B 	B 	B  "(!>"
 "
 "
 !!788 	# 	#GAt!"DGG0FGGG
 
>T
 
 
 	M0ABBB #%/04<? 	 	F~#**3::<<==    #4((%%	%./F%G%G 	/ 	/!A!!((+++ 0/w///Gg%%g.... ) 1 1 	1 	1GW!'IgjI;.//Gl##e7  (,\->->-@-@(A(A  /6    4
 4
 4
 4
 !84
 4
 4
0 06 0 DEEKK(  "
 
 
 = 	2 	2D( 2 22:%%!#D!1DJ2 *R&*$2R2R2T2T1V 	 	
 	
 	
 y>>Q $I"=}#
 #

 
"+,C"D"D 
	& 
	&D,7A%g.44T::J ,%(?&*&:": "K4/DDI  F f%%%%M>>>M***L)))T1HII 	G 	GG%%b))) !5!JKK G G  $"6"KKwV4-g6777BBBBT%7!8!88888"7+224<3EFFFFG	G 	Gr-   reconfig_requestc                     |                      d|f           |j        t          j        k    r|                                  d S d S )Nreinitialize_distributedr   )r   new_data_parallel_rankr   SHUTDOWN_CURRENT_RANKre   )rS   r   s     r.   r   z/RayDistributedExecutor.reinitialize_distributed{  sU     	6>N=PQQQ3"89 9 MMOOOOO9 9r-   FrR   	non_blockc                     | j         t          d          | j        r|j        s|                     |d |          S || _         |rt
          nd S )NzOState error: sample_tokens() must be called after execute_model() returns None.)rR   r   rQ   total_num_scheduled_tokens_execute_dagr   )rS   rR   r   s      r.   execute_modelz$RayDistributedExecutor.execute_model  sr    
  ,6  
   	H(8(S 	H$$%5tYGGG !1(1;$$t;r-   grammar_outputzGrammarOutput | Nonec                 f    | j         }||rt          ndS d| _         |                     |||          S )as  Execute the model on the Ray workers.

        The scheduler output to use should have been provided in
        a prior call to execute_model().

        Args:
            grammar_output: The structured outputs grammar bitmask, if applicable.
            non_block: If True, the method will return a Future.

        Returns:
            The model runner output.
        N)rR   r   r   )rS   r   r   rR   s       r.   sample_tokensz$RayDistributedExecutor.sample_tokens  sE    "  0#,5?((4? $  !1>9MMMr-   c                    | j         |                     d          | _         | j                             ||f          }| j        s1|s|d                                         S t          |d                   S | j        J |s,| j                            t          j        |                    S t          || j                  S )NF)enable_asyncior   )	rA   _compiled_ray_dagexecuterL   rH   r   kv_output_aggregator	aggregater   )rS   rR   r   r   refss        r.   r   z#RayDistributedExecutor._execute_dag  s     ##55U5KKD'')9>(JKK! 	*  %Aw{{}}$ !a))) (444 	F,66swt}}EEE T4#<===r-   r,   methodtimeoutr   r   c                     t          |t                    r|nt          j        |          ~i fd| j        D             }|rt          |          S t          j        ||          S )z%Runs the given method on all workers.Nc                 <    g | ]} |j         j        gR i S r,   )execute_methodr   )r   r   r   r   sent_methods     r.   r   z9RayDistributedExecutor.collective_rpc.<locals>.<listcomp>  sZ     
 
 
  )F!("  &, 
 
 
r-   )r   )
isinstancer+   cloudpickledumpsrc   r   r   rH   )rS   r   r   r   r   r   ray_worker_outputsr   s      ``  @r.   r   z%RayDistributedExecutor.collective_rpc  s     !+63 7 7Vff[=Nv=V=V>F
 
 
 
 
 
 ,	
 
 
  	5 !3444w)7;;;;r-   c                    dd l }ddlm} |                    d          }|                    |j                            d                    }||k     rt          d| d|           dd l}|j                            d          }|t          d          |j                            d	          }|t          j
        d
k    rt          d          d S d S )Nr   )versionz2.43.0r   zRay version z is required, but found z!ray.experimental.compiled_dag_refzQRay Compiled Graph is not installed. Run `pip install ray[cgraph]` to install it.cupyncclzcupy is not installed but required since VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE is set to 'nccl'. Run `pip install ray[cgraph]` and check cupy installation.)importlib.metadata	packagingr   parsemetadata
ValueErrorimportlib.utilutil	find_specr   r<   )rS   	importlibr   required_versioncurrent_versioncgraph_spec	cupy_specs          r.   _check_ray_cgraph_installationz5RayDistributedExecutor._check_ray_cgraph_installation  s#   !!!!%%%%%%"==22!--	(:(B(B5(I(IJJ---9/ 9 9'69 9  
 	n../RSS?  
 N,,V44	!LPV!V!VM   !V!Vr-   r   c                 F   | j         j        sJ |                                  t          j                            dd           ddlm}m} t          
                    dt          j        d                    t          
                    dt          j                   t          
                    dt          j                   t          j        }|dvrt          d	| d
           |            5 fd| j        d         D             t!          | j                  D ]g\  }}fdt!          |          D             t#          | j                  dz
  }||k     r*t          j        dk    rt          j        fdD             h |          }d d d            n# 1 swxY w Y   t          j        r4ddlm}	 ddlm}
  |	d|
           t          
                    d           nt          
                    d           |                    |t          j                  S )NRAY_CGRAPH_get_timeout300r   )	InputNodeMultiOutputNodez#RAY_CGRAPH_get_timeout is set to %sz+VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE = %sz+VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM = %s)autor   r;   z:Invalid value for VLLM_USE_RAY_COMPILED_DAG_CHANNEL_TYPE: z-. Valid values are: 'auto', 'nccl', or 'shm'.c                     g | ]}S r,   r,   )r   r   
input_datas     r.   r   z<RayDistributedExecutor._compiled_ray_dag.<locals>.<listcomp>4  s    AAAazAAAr-   c                 V    g | ]%\  }}|j                             |                   &S r,   )execute_model_raybind)r   r   r   outputss      r.   r   z<RayDistributedExecutor._compiled_ray_dag.<locals>.<listcomp>8  sB       !6 ,11'!*==  r-   rW   r;   c                 <    g | ]}|                                S ))	transport)with_tensor_transport)r   outputr  s     r.   r   z<RayDistributedExecutor._compiled_ray_dag.<locals>.<listcomp>F  s:       " 44y4II  r-   )register_accelerator_context)RayPPCommunicatorcuda)torch_module_namecommunicator_clszeUsing RayPPCommunicator (which wraps vLLM _PP GroupCoordinator) for Ray Compiled Graph communication.zCUsing Ray's NCCL communicator for Ray Compiled Graph communication.)r   _overlap_gpu_communication)rF   use_rayr
  rD   rE   rl   ray.dagr  r  r_   r`   r   r<   &VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMMr  r   r   r   VLLM_USE_RAY_WRAPPED_PP_COMM,ray.experimental.channel.accelerator_contextr  6vllm.distributed.device_communicators.ray_communicatorr  experimental_compile)rS   r   r  r  channel_typer   tp_grouplast_pp_rankrA   r  r  r  r  r  s              @@@r.   r   z(RayDistributedExecutor._compiled_ray_dag	  s   #++++++--- 	
6>>>666666661J/0	
 	
 	
 	97	
 	
 	
 	97	
 	
 	

 B666OO O O  
 Y[[ !	3J BAAA4+=a+@AAAG%.t/A%B%B  !   %.x%8%8  
  #4#566:l**CuLL
 !% KI   &-  G
 */'22KC!	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3 !	3F , 	           )("(;L    KK8    KKU   //)'+'R 0 
 
 	
s   0B!FF!$F!c                 .    |                                   d S rq   )re   rs   s    r.   __del__zRayDistributedExecutor.__del__h  s    r-   c                     d S rq   r,   rs   s    r.   check_healthz#RayDistributedExecutor.check_healthk  s	     	r-   )r9   N)rG   r   )F)Nr,   NF)&r%   r&   r'   r(   r   r   r7   boolr)   r8   rU   propertyr*   r]   re   r   r+   r   ro   rt   rI   r   r   r   r   r   r   r   r   r   floattupler   r   r
  r   r,  r.  r,   r-   r.   r0   r0   >   s        ((      &'?@HdK= = = =: Y Y Y Y XY$ $ $ $ !d3PS8n ! ! ! !". . .`G `G `G `GD =	     < <)< < 
T	!F+<t+C$D	D	< < < <,  N N.N N 
T	!F+<t+C$D	D	N N N N:  	> >)> /> 	>
 
T	!F+<t+C$D	D> > > >D !%(,< <h< < 	<
 S#X%< < 
cVDI&	&< < < <6  <]
 ]
 ]
 ]
 ]
~       r-   r0   )5rD   collectionsr   collections.abcr   concurrent.futuresr   dataclassesr   typingr   r   r   	vllm.envsr   vllm.loggerr	   vllm.platformsr
   vllm.ray.ray_envr   vllm.utils.network_utilsr   r   r   vllm.v1.core.sched.outputr   r   vllm.v1.enginer   r   vllm.v1.executor.abstractr   vllm.v1.executor.ray_utilsr   r   r   r   vllm.v1.outputsr   	ray.actorr   ray.util.scheduling_strategiesr   ray.util.placement_groupr   r%   r_   r   r)   
set_resultr   r0   r,   r-   r.   <module>rF     s   
				 # # # # # # $ $ $ $ $ $ % % % % % % ! ! ! ! ! ! % % % % % % % %           # # # # # # + + + + + + 1 1 1 1 1 1         
 E D D D D D D D M M M M M M M M . . . . . .            . - - - - -?%%%%%%OOOOOOOK 8777777	X		:@&(( v/$67 B B B      & & & 
 
 
 
 
 
 
 
p p p p pX p p p p pr-   