
    .`iK                     6   d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	Z
d dlmZ d dlmZ d dlmZ d dlmZ d d	l	mZ d d
lmZ d dlmZ d dlmZ d dlmZ erd dlmZmZ d dlmZ  ee           Z!dZ"	 d dl#Z#d dl$m%Z% d dl&m'Z' 	 d dl(m)Z) n# e*$ r d dl(m+Z, e,j-        Z)Y nw xY w G d de          Z.dZ/n!# e*$ rZ0dZ# e1e0          Z/dZ.Y dZ0[0ndZ0[0ww xY w G d de          Z2de3fdZ4d Z5dddede1fd Z6d(d"Z7d(d#Z8	 d)ded$e1dz  fd%Z9de:fd&Z;de:fd'Z<dS )*    N)defaultdict)Future)TYPE_CHECKINGUnion)ParallelConfig)get_pp_group)KVOutputAggregator)init_loggercurrent_platform)IntermediateTensorsget_ip)AsyncModelRunnerOutput)WorkerWrapperBase)GrammarOutputSchedulerOutput)ModelRunnerOutputi  )placement_group_table)PlacementGroup)available_resources_per_node)statec                        e Zd ZdZd fdZdefdZdeeee	         f         fdZ
d Zded	         ed
         z  deded
         f         fdZdeeef         fdZdefdZ xZS )RayWorkerWrapperzyRay wrapper for vllm.worker.Worker, allowing Worker to be
        lazily initialized after Ray sets CUDA_VISIBLE_DEVICES.returnNc                 H     t                      j        |i | d| _        d S )NF)super__init__compiled_dag_cuda_device_set)selfargskwargs	__class__s      n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/v1/executor/ray_utils.pyr   zRayWorkerWrapper.__init__-   s/    EGGd-f---
 16D---    c                     t                      S Nr   r    s    r$   get_node_ipzRayWorkerWrapper.get_node_ip5   s    88Or%   c                 "   t          j                                                    }t          j        j        j        }|s$t          dt          j        j        j                  t          j                    	                                |         }||fS )Nz)current platform %s does not support ray.)
rayget_runtime_contextget_node_idvllm	platformsr   ray_device_keyRuntimeErrordevice_nameget_accelerator_ids)r    node_id
device_keygpu_idss       r$   get_node_and_gpu_idsz%RayWorkerWrapper.get_node_and_gpu_ids8   s}    -//;;==G8GJ "?N3?   -//CCEEjQGG##r%   c                     | j         
J d            | j        sIt          j                    rn,| j         j        J t          j        | j         j                   d| _        d S d S )NWorker is not initializedT)workerr   r   is_tpudevice
set_devicer(   s    r$   setup_device_if_necessaryz*RayWorkerWrapper.setup_device_if_necessaryC   sz    
 ;**,G***4 9#*,, D;-999$/0BCCC481119 9r%   execute_model_input)r   r   )r   r   r   r   c                 P   |                                   | j        
J d            t          |          dk    r|\  }}}n|\  }}d }| j        j        J | j        j                            ||          }|                     |          r|||fS t          |t                    r|                                }t                      j
        s|r	|j        rJ ||d f}nJ|H| j        j                            |          }t          |t                    r|                                }|S )Nr9      )r>   r:   lenmodel_runnerexecute_model_is_intermediate_tensors
isinstancer   
get_outputr   is_last_rankreq_idssample_tokens)r    r?   scheduler_outputgrammar_outputintermediate_tensorsoutputs         r$   execute_model_rayz"RayWorkerWrapper.execute_model_rayS   s^    **,,,;**,G***&''1,,' G .2F2F 4G0 .'+$;+777[-;; "6 F ,,V44 @'??&"899 -**,,>>. 1 "7777)>4?1??OO f&<== 1#..00FMr%   varsc                 D    t           j                            |           d S r'   )osenvironupdate)r    rP   s     r$   override_env_varsz"RayWorkerWrapper.override_env_vars}   s    Jd#####r%   c                 ,    t          |t                    S r'   )rF   r   )r    rN   s     r$   rE   z)RayWorkerWrapper._is_intermediate_tensors   s    f&9:::r%   )r   N)__name__
__module____qualname____doc__r   strr)   tuplelistintr7   r>   r   rO   dictrU   boolrE   __classcell__r#   s   @r$   r   r   )   s,       	C 	C	6 	6 	6 	6 	6 	6	 	 	 	 			$%T#Y*? 		$ 		$ 		$ 		$	9 	9 	9 (	!&'I!JMN"O(	 KLN
	(	 (	 (	 (	T	$$sCx. 	$ 	$ 	$ 	$	;d 	; 	; 	; 	; 	; 	; 	; 	;r%   r   c                   8     e Zd ZdZddedz  f fdZddZ xZS )FutureWrapperaR  A wrapper around Ray output reference to meet the interface
    of .execute_model(): The top level (core busy loop) expects .result() api
    to block and return a single output.

    If aggregator is provided, the outputs from all workers are aggregated upon
    the result() call. If not only the first worker's output is returned.
    N
aggregatorc                 d    t                                                       || _        || _        d S r'   )r   r   ref_or_refsre   )r    rg   re   r#   s      r$   r   zFutureWrapper.__init__   s,    &$r%   c                     t          j        | j        |          }| j        |S | j                            |d          S )Ntimeoutr   )output_rank)r+   getrg   re   	aggregate)r    rj   outputss      r$   resultzFutureWrapper.result   sA    '$*G<<<?"N((a(@@@r%   r'   )rW   rX   rY   rZ   r	   r   ro   ra   rb   s   @r$   rd   rd      su         % %0BT0I % % % % % %
A A A A A A A Ar%   rd   r   c                      t           duS )z!Returns True if Ray is available.N)r+    r%   r$   ray_is_availablerr      s    d?r%   c                  D    t           t          dt           d          dS )z+Raise an exception if Ray is not available.NzFailed to import Ray: z+.Please install Ray with `pip install ray`.)r+   
ValueErrorray_import_errrq   r%   r$   assert_ray_availablerv      s6    
{9^ 9 9 9
 
 	
 {r%   placement_groupr   parallel_config
device_strc           
      |   t          j                    s
J d            t          |           }|d         }|d         }t          t                    }|                                D ]&\  }}||                             ||                    't          j                                                    }	|	|vrt          d|	 d| j
         d| d          |                                D ]T\  }}t          |          |j        k     r7t                              d|j        |t          |          |||j                   Ud	S )
zVerify a given placement group has bundles located in the right place.

    There are 2 rules.
    - Warn if all tensor parallel workers cannot fit in a single node.
    - Fail if driver node is not included in a placement group.
    zDRay is not initialized although distributed-executor-backend is ray.bundles_to_node_idbundleszdriver node id z& is not included in a placement group z. Node id -> bundles z. You don't have enough GPUs available in a current node. Check `ray status` and `ray list nodes` to see if you have available GPUs in a node `{driver_node_id}` before starting an vLLM engine.aC  tensor_parallel_size=%d is bigger than a reserved number of %ss (%d %ss) in a node %s. Tensor parallel workers can be spread out to 2+ nodes which can degrade the performance unless you have fast interconnect across nodes, like Infiniband. To resolve this issue, make sure you have more than %d GPUs available at each node.N)r+   is_initializedr   r   r]   itemsappendr,   r-   r1   idrB   tensor_parallel_sizeloggerwarning)
rw   rx   ry   pg_databundle_to_node_idsr|   node_id_to_bundle
bundle_idxr4   driver_node_ids
             r$   _verify_bundlesr      s      N  $O44G !56i G;Ft;L;L17799 ? ?
G'"))'**=>>>>,..::<<N...Pn P P$'P P P P P
 
 	
 .3355  w<</>>>NN7  4G4   r%   current_placement_groupc           
         | j         }t          j                    }|                                 }d}t          j                    |z
  t          k     rt	          j        |g|          \  }}t          |          dk    rna|dz  }t                              dt          t          j                    |z
            |           t          j                    |z
  t          k     	 t	          j
        |d           dS # t          j        j        $ r[ t          d |D                       }|dk    rt          d| d	| d
t           d          dt          d|dt           d          dw xY w)zWait until a placement group is ready.

    It prints the informative log messages if the placement group is
    not created within time.

    
   ri   r      a6  Waiting for creating a placement group of specs for %d seconds. specs=%s. Check `ray status` and `ray list nodes` to see if you have enough resources, and make sure the IP addresses used by ray cluster are the same as VLLM_HOST_IP environment variable specified in each node if you are running on a multi-node.c              3   B   K   | ]}|                     d d          V  dS )GPUr   N)rl   ).0specs     r$   	<genexpr>z'_wait_until_pg_ready.<locals>.<genexpr>  s0       V V%!3!3 V V V V V Vr%      z+Cannot provide a placement group requiring z GPUs (placement_group_specs=z	) within z seconds.
Tensor parallel size may exceed available GPUs in your cluster. Check resources with `ray status` and `ray list nodes`.
If running on K8s with limited GPUs, consider reducing --tensor-parallel-size to match available GPU resources.Nz:Cannot provide a placement group of placement_group_specs=z within z^ seconds. See `ray status` and `ray list nodes` to make sure the cluster has enough resources.)bundle_specstimereadyPG_WAIT_TIMEOUTr+   waitrB   r   infor^   rl   
exceptionsGetTimeoutErrorsumrt   )r   placement_group_specsspg_ready_refwait_intervalr   _total_gpu_requireds           r$   _wait_until_pg_readyr      s    4@	A*0022LM
)++/O
+
+8\NMBBBqu::>> 	J 	a  !		
 		
 		
 )++/O
+
+$a((((((>)     V V@U V V VVV !!L%L L*?L L #L L L
 
 
 ((( ("( ( (  +s    C8 8A/E'c                    t           j                            |            t          j                    }d}t          j                    |z
  t          k     rt           j                                        }|d S |dz  }t                              dt          t          j                    |z
                       t          j	        |           t          j                    |z
  t          k     d S d S )Nr   r   z?Waiting for removing a placement group of specs for %d seconds.)
r+   utilremove_placement_groupr   r   get_current_placement_groupr   r   r^   sleep)r   r   r   pgs       r$   _wait_until_pg_removedr   '  s    H##$;<<<	AM
)++/O
+
+X1133:E 	M	a  	
 	
 	
 	
=!!! )++/O
+
+
+
+
+
+r%   ray_addressc                    t                       ddlm}  |j                    rN| j        dk    rCddlm}  |            }| j        |k    r(t                              d| j        ||| j                   t          j
                    rt                              d           n |j                    s |j                    ra	 t          j        d           nf# t          $ r> t                              d           t          j        || j        | j        	           Y nw xY wt          j        || j        
           |j        st%          d|j         d          | j        r| j        }nt          j                                        }|rt                              d           |j        }d}|D ]8}|                    d          }	|	dk    rt%          d d          |	r|dz  }9| j        |k    r!t%          d d d| j         d| d	          nIt                              d           t          j                                        d          }
| j        |
k    rt                              d           fdt5          | j                  D             }t7                      }t          j                                                    }t=                      |         }|                    d          dk     r"t%          d d|d d d|d|d          d|d         d| <   t          j                            |d           }t?          |           |J tA          ||            || _        d!S )"a  Initialize the distributed cluster with Ray.

    it will connect to the Ray cluster and create a placement group
    for the workers, which includes the specification of the resources
    for each distributed worker.

    Args:
        parallel_config: The configurations for parallel execution.
        ray_address: The address of the Ray cluster. If None, uses
            the default Ray cluster address.
    r   r   r   )cuda_device_count_statelesszTensor parallel size (%d) exceeds available GPUs (%d). This may result in Ray placement group allocation failures. Consider reducing tensor_parallel_size to %d or less, or ensure your Ray cluster has %d GPUs available.z8Ray is already initialized. Skipping Ray initialization.autoz_No existing RAY instance detected. A new instance will be launched with current node resources.)addressnum_gpusruntime_env)r   r   zcurrent platform z does not support ray.z"Using the existing placement groupz/Placement group bundle cannot have more than 1 .zThe number of required z(s exceeds the total number of available z6s in the placement group. Required number of devices: z. Total number of devices: zANo current placement group found. Creating a new placement group.z\The number of required %ss exceeds the total number of available %ss in the placement group.c                     g | ]}d iS )g      ?rq   )r   r   ry   s     r$   
<listcomp>z*initialize_ray_cluster.<locals>.<listcomp>  s+     9
 9
 9
"#Z9
 9
 9
r%   zCurrent node has no z" available. current_node_resource=z#. vLLM engine cannot start without z . Make sure you have at least 1 z% available in a node current_node_id=z current_ip=gMbP?znode:PACK)strategyN)!rv   vllm.platformsr   is_cuda
world_sizevllm.utils.torch_utilsr   r   r   r+   r}   r   is_rocmis_xpuinitConnectionErrorray_runtime_envr0   rt   r2   rw   r   r   r   rl   cluster_resourcesranger   r,   r-   r   r   r   )rx   r   r   r   available_gpusr   r|   device_bundlesbundlebundle_devicesnum_devices_in_clusterr   
current_ipcurrent_node_idcurrent_node_resourcery   s                  @r$   initialize_ray_clusterr   9  s    //////  !! o&@1&D&DFFFFFF4466%66NND  **	 	 	  SNOOOO	!		!	#	# S'>'7'>'@'@ S	HV 		 		 		NNO   H#(3+;     		 	/2QRRRR!0J 
T 0 <TTT
 
 	

 & I"1"A"%("F"F"H"H =68999 *6 	$ 	$F#ZZ
A66N!! SjSSS    $!#%66>* > >'1> >/>/I> > -;> > >   7 	WXXX!$!6!8!8!<!<Z!K!K %(>>>NNB	  9
 9
 9
 9
',_-G'H'H9
 9
 9
 XX
133??AA < > > O $$Z33a77Iz I I(I II I?II I (7I I ;EI I I   :?a !5!5!56 #&(":":!F #; #
 #
 	4555"...+_jIII&=O###s    C ADDc                      ddl m}  t          j                    }t	          |d                   }|                                 }||z  dk    sJ ||z  S )Nr   )TPUAcceleratorManagerTPU)ray._private.acceleratorsr   r+   r   r^   !get_current_node_num_accelerators)r   r   
total_tpustpus_per_nodes       r$   get_num_tpu_nodesr     sh    ??????-//&u-..J)KKMMM%****&&r%   c                     t           j                                        } t           j                                        }d}|rt	                      }|                                 D ]W\  }}||j                                        k    r5|d                                         D ]\  }}|                    |           Xt          |          }|S )Nr   r{   )
r+   r   r   r   setr~   r   hexaddrB   )pg_table
current_pg	num_nodesnodes_in_pgpg_keyr   r   nodes           r$    get_num_nodes_in_placement_groupr     s    x--//H5577JI %ee"..** 	* 	*JFB**,,,,!"67==?? * *GAtOOD))))$$	r%   )r   r   r'   )=rR   r   collectionsr   concurrent.futuresr   typingr   r   r   r.   vllm.configr   vllm.distributedr   /vllm.distributed.kv_transfer.kv_connector.utilsr	   vllm.loggerr
   r   vllm.sequencer   vllm.utils.network_utilsr   vllm.v1.outputsr   vllm.v1.worker.worker_baser   vllm.v1.core.sched.outputr   r   r   rW   r   r   r+   ray.utilr   ray.util.placement_groupr   ray._private.stater   ImportErrorr   _state_available_resources_per_noder   ru   er[   rd   r`   rr   rv   r   r   r   r   r^   r   r   rq   r%   r$   <module>r      s   
			  # # # # # # % % % % % % ' ' ' ' ' ' ' '     & & & & & & ) ) ) ) ) ) N N N N N N # # # # # # + + + + + + - - - - - - + + + + + + 2 2 2 2 2 2 8 8 8 8 8 8 2HHHHHHHH111111	X		nJJJ......777777LCCCCCCC L L L666666'-'K$$$	LX; X; X; X; X;, X; X; X;t NN   
C SVVNA A A A AF A A A,$    

 
 
2%28F2TW2 2 2 2j> > > >B" " " "( #D> D>#D>tD> D> D> D>N'3 ' ' ' '#      s<   5B6 B B6 B"B6 !B""B6 6C;CC