
    .`iL                     4   d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZmZ d d
lmZmZ d dlmZ d dl m!Z!m"Z"m#Z# d dl$m%Z% d dl&m'Z'  ee(          Z) G d de          Z* G d de*          Z+dS )    N)Callable)FutureThreadPoolExecutor)cached_property)Lock)Any)init_logger)get_distributed_init_methodget_ipget_open_port)GrammarOutputSchedulerOutput)ReconfigureDistributedRequestReconfigureRankType)Executor)AsyncModelRunnerOutputDraftTokenIdsModelRunnerOutput)
run_method)WorkerWrapperBasec                   6   e Zd ZddZdeeeef         fdZedefd            Z		 	 	 	 	 ddee
z  d	edz  d
ededz  dededefdZ	 ddedededz  eedz           z  fdZ	 ddedz  dededz  eedz           z  fdZdedz  fdZddZdeddfdZddZdS )UniProcExecutorreturnNc           	         t          d          | _        |                                 \  }}}t          | j        |||dt                                }d| _        | j        dk    rt          dd          | _        | j        	                    |g	           | j        
                                 | j                                         dS )
)Initialize the worker and load the model.r   )rpc_rankT)vllm_config
local_rankrankdistributed_init_methodis_driver_workershared_worker_lockN   WorkerAsyncOutput)max_workersthread_name_prefix)
all_kwargs)r   driver_worker_distributed_argsdictr   r   async_output_threadmax_concurrent_batchesr   init_workerinit_device
load_model)selfr    r   r   kwargss        u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/v1/executor/uniproc_executor.py_init_executorzUniProcExecutor._init_executor   s    .:::484J4J4L4L1z(!$;!#vv
 
 
 ?C &**'92E( ( (D$ 	&&6(&;;;&&(((%%'''''    c                    t          t                      t                                }| j        j        j                                                            d          }t          |          dk    rt          |d                   nd}|d|fS )z3Return (distributed_init_method, rank, local_rank).:r#   r   )
r
   r   r   r   device_configdevice__str__splitlenint)r0   r    device_infor   s       r2   r)   z!UniProcExecutor._distributed_args2   sw    "=fhh"X"X&4;CCEEKKCPP,/,<,<q,@,@SQ(((a
&:55r4   c                 "    | j         j        rdndS )N   r#   )scheduler_configasync_schedulingr0   s    r2   r,   z&UniProcExecutor.max_concurrent_batches:   s    ):AqqAr4    Fmethodtimeoutargsr1   	non_blocksingle_valuec                 h   |i }|st          | j        |||          |rngS 	 t          | j        |||          t          t                    rf| j        x}I|r|                    j                  S dt          t                   ffd}|                    |          S                                 t          t                               }	|	
                    |rng           nE# t          $ r8}
t          t                               }	|	                    |
           Y d }
~
nd }
~
ww xY w|	S )Nr   c                  .                                      gS N)
get_output)results   r2   get_output_listz7UniProcExecutor.collective_rpc.<locals>.get_output_listU   s     & 1 1 3 344r4   )r   r(   
isinstancer   r+   submitrL   listr   r   
set_result	Exceptionset_exception)r0   rD   rE   rF   r1   rG   rH   async_threadrN   futureerM   s              @r2   collective_rpczUniProcExecutor.collective_rpc>   so    >F 	8 2FD&IIF)766x7	$ 2FD&IIF&"899 	-$($<<LI# F+2263DEEE5T#Y 5 5 5 5 5 5 (..???**,,C[]]FBff6(CCCC 	$ 	$ 	$C[]]F  ########	$ s%   AC- 8,C- %AC- -
D/7.D**D/scheduler_outputc                 6    |                      d|f|d          S )Nexecute_modelTrF   rG   rH   rX   )r0   rY   rG   s      r2   r[   zUniProcExecutor.execute_modela   s0     """$	 # 
 
 	
r4   grammar_outputc                 6    |                      d|f|d          S )Nsample_tokensTr\   r]   )r0   r^   rG   s      r2   r`   zUniProcExecutor.sample_tokensk   s0     "" "	 # 
 
 	
r4   c                 0    |                      dd          S )Ntake_draft_token_idsT)rH   r]   rB   s    r2   rb   z$UniProcExecutor.take_draft_token_idsu   s    ""#9"MMMr4   c                     d S rK   rC   rB   s    r2   check_healthzUniProcExecutor.check_healthx   s	     	r4   reconfig_requestc                     | j                             |           |j        t          j        k    r|                                  d S d S rK   )r(   reinitialize_distributednew_data_parallel_rankr   SHUTDOWN_CURRENT_RANKshutdown)r0   re   s     r2   rg   z(UniProcExecutor.reinitialize_distributed}   sO     	334DEEE3"89 9 MMOOOOO9 9r4   c                 D    | j         x}r|                                 d S d S rK   )r(   rj   )r0   workers     r2   rj   zUniProcExecutor.shutdown   s2    ''6 	OO	 	r4   r   N)NrC   NFF)F)__name__
__module____qualname__r3   tuplestrr<   r)   r   r,   r   floatr*   boolr   rX   r   r   r   r[   r   r`   r   rb   rd   r   rg   rj   rC   r4   r2   r   r      s       ( ( ( (.65c3#7 6 6 6 6 B B B B _B !%""! !h! ! 	!
 t! ! ! 
! ! ! !H DI
 
 /
<@
	T	!F+<t+C$D	D
 
 
 
 GL
 
+d2
?C
	T	!F+<t+C$D	D
 
 
 
Nmd&: N N N N   
 =	        r4   r   c                   ^     e Zd ZdZd fdZdeeeef         fdZde	e         f fdZ
 xZS )ExecutorWithExternalLaunchera  An executor that uses external launchers to launch engines,
    specially designed for torchrun-compatible launchers, for
    offline inference with tensor parallelism.

    see https://github.com/vllm-project/vllm/issues/11400 for
    the motivation, and examples/offline_inference/torchrun_example.py
    for the usage example.

    The key idea: although it is tensor-parallel inference, we only
    create one worker per executor, users will launch multiple
    engines with torchrun-compatible launchers, and all these engines
    work together to process the same prompts. When scheduling is
    deterministic, all the engines will generate the same outputs,
    and they don't need to synchronize the states with each other.
    r   Nc                 t    t           j        r
J d            t                                                       dS )r   zKTo get deterministic execution, please set VLLM_ENABLE_V1_MULTIPROCESSING=0N)envsVLLM_ENABLE_V1_MULTIPROCESSINGsuperr3   )r0   	__class__s    r2   r3   z+ExecutorWithExternalLauncher._init_executor   sC    6 	
 	
:	
 	
6 	     r4   c                     d}t          t          j        d                   }t          t          j        d                   }|||fS )Nzenv://RANK
LOCAL_RANK)r<   osenviron)r0   r    r   r   s       r2   r)   z.ExecutorWithExternalLauncher._distributed_args   s?     #+2:f%&&L122
&j88r4   c                 (   t                                                      }ddlm}  |            j        }t          j        |gdt
          j                  }t          j	        ||t          j
        j                   |                                gS )Nr   )get_world_groupcpu)r8   dtype)groupop)rz   determine_available_memoryvllm.distributed.parallel_stater   	cpu_grouptorchtensorint64dist
all_reduceReduceOpMINitem)r0   memoryr   r   memory_tensorr{   s        r2   r   z7ExecutorWithExternalLauncher.determine_available_memory   s    3355CCCCCC#O%%/	fXe5;OOOY4=;LMMMM""$$%%r4   rm   )rn   ro   rp   __doc__r3   rq   rr   r<   r)   rQ   r   __classcell__)r{   s   @r2   rv   rv      s          ! ! ! ! ! !95c3#7 9 9 9 9&DI & & & & & & & & & &r4   rv   ),r   collections.abcr   concurrent.futuresr   r   	functoolsr   multiprocessingr   typingr   r   torch.distributeddistributedr   	vllm.envsrx   vllm.loggerr	   vllm.utils.network_utilsr
   r   r   vllm.v1.core.sched.outputr   r   vllm.v1.enginer   r   vllm.v1.executor.abstractr   vllm.v1.outputsr   r   r   vllm.v1.serial_utilsr   vllm.v1.worker.worker_baser   rn   loggerr   rv   rC   r4   r2   <module>r      s   
			 $ $ $ $ $ $ 9 9 9 9 9 9 9 9 % % % % % %                                      # # # # # # W W W W W W W W W W D D D D D D D D M M M M M M M M . . . . . . T T T T T T T T T T + + + + + + 8 8 8 8 8 8	X		o o o o oh o o od.& .& .& .& .&? .& .& .& .& .&r4   