
    .`i+              
          d Z ddlZddlZddlmZmZmZmZ ddlmZm	Z	m
Z
 ddlmZmZ ddlmZ ddlmZmZ ddlmZ dd	lmZ  ed
          Z ed          Z G d d          ZdefdZ	 dde
eef         dedz  de
ee	e         f         fdZdede
fdZdedefdZerdeedf         fdZdeedf         dee e!ef         df         fdZ"deedf         de#e         fdZ$dS ) ze
Contains helpers related to asynchronous code.

This is similar in concept to the `asyncio` module.
    N)FIRST_COMPLETEDAbstractEventLoopFutureTask)AsyncGenerator	AwaitableCallable)ExecutorThreadPoolExecutor)partial)TYPE_CHECKINGTypeVar)BatchEncoding)	ParamSpecPTc            
           e Zd ZdZ	 	 ddededdfdZd	 Zd
 Zde	j
        dede	j        eeeef         eee         ef         z           fdZde	j        defdZde	j        fdZdededefdZd ZdS )AsyncMicrobatchTokenizerzAsynchronous tokenizer with micro-batching.

    Pulls pending encode/decode requests from a queue and batches them
    up to reduce overhead. A single-thread ThreadPoolExecutor is used
    so the event loop stays responsive.
        Mb`?max_batch_sizebatch_wait_timeout_sreturnNc                     || _         || _        || _        t          j                    | _        i | _        g | _        t          d          | _	        d S )N   )max_workers)
	tokenizerr   r   asyncioget_running_loop_loop_queues_batcher_tasksr   	_executor)selfr   r   r   s       j/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/utils/async_utils.py__init__z!AsyncMicrobatchTokenizer.__init__    sY     #,$8!-//
  	 +- ,:::    c                    K   | j                                         }|                     d|          }|                     | j         |          }|                    |||f           d {V  | d {V S )Nencoder    create_future
_queue_key
_get_queueput)r$   promptkwargsresult_futurekeyqueues         r%   __call__z!AsyncMicrobatchTokenizer.__call__5   s       $
 8 8 : :ooh//
C00ii7888888888""""""""r'   c                    K   | j                                         }|                     d|          }|                     | j         |          }|                    ||f           d {V  | d {V S )Ndecoder*   )r$   	token_idsr0   r1   r2   r3   s         r%   r6   zAsyncMicrobatchTokenizer.decode<   s       $
 8 8 : :ooh//
C00iiM2333333333""""""""r'   loopr2   c                    | j                             |          }|t          j                    x| j         |<   }|d         dk    r#|d         dk    }|                     ||          }n5|d         dk    sJ d|d          d            |                     |          }| j                            |                    |                     |S )	zkGet the request queue for the given operation key, creating a new
        queue and batcher task if needed.Nr   r)   r   otherr6   zUnknown operation type: .)	r!   getr   Queue_batch_encode_loop_batch_decode_loopr"   appendcreate_task)r$   r8   r2   r3   	can_batchcoros         r%   r-   z#AsyncMicrobatchTokenizer._get_queueD   s    
   %%=(/7DL1v!!Fg-	..ui@@1v)))+Oc!f+O+O+O)))..u55&&t'7'7'='=>>>r'   r3   rB   c                 4   K   	 |                                  d{V \  }}}|g}|g}|g} j                                         j        z   }	t	          |           j        k     r|	 j                                        z
  }
|
dk    rn	 t          j        |                                 |
           d{V \  }}}|                    |           |                    |           |s|                    |           n# t          j	        $ r Y nw xY wt	          |           j        k     	 |rt	          |          dk    rt           j        |fi |} j                             j        |           d{V }t          |          D ][\  }|                                sBfd|                                D             }|                    t%          |                     \nm||f fd	} j                             j        |           d{V }t'          ||          D ].\  }}|                                s|                    |           /nE# t(          $ r8}|D ]+}|                                s|                    |           ,Y d}~nd}~ww xY w)z.Batch incoming encode requests for efficiency.TNr   r   c                 (    i | ]\  }}||         S  rF   ).0kvis      r%   
<dictcomp>z?AsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<dictcomp>x   s#    #H#H#H1Aqt#H#H#Hr'   c                 <    fdt          | |          D             S )Nc                 2    g | ]\  }} j         |fi |S rF   )r   )rG   pkwr$   s      r%   
<listcomp>zQAsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<lambda>.<locals>.<listcomp>{   sD     M M M49Arq//B//M M Mr'   )zip)promptsr0   r$   s     r%   <lambda>z=AsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<lambda>{   s<     M M M M=@&=Q=QM M M r'   )r<   r    timer   lenr   r   wait_forr@   TimeoutErrorr   r   run_in_executorr#   	enumeratedoneitems
set_resultr   rQ   	Exceptionset_exception)r$   r3   rB   r/   r0   r1   rR   kwargs_listresult_futuresdeadlinetimeoutbatch_encode_fnresultsfutdata	encode_fnreserJ   s   `                 @r%   r>   z+AsyncMicrobatchTokenizer._batch_encode_loopU   sY     1	-27))++,=,=,=,=,=,=)FFMhG!(K+_Nz((4+DDHg,,!444"TZ__%6%66a<<	:A:J		W; ; 5 5 5 5 5 51FFM NN6***"))-888$ 3#**6222+   E g,,!444-  0W!1!1&-dng&P&P&P&PO$(J$>$>% %      G #,N";"; @ @3"xxzz @#H#H#H#H#H#H#HDNN=+>+>???@
 07{ ! ! ! ! !I %)J$>$>	% %      G %($@$@ 0 0S"xxzz 0NN3/// - - -) - -C88:: -))!,,,- - - - --]1	-s,   
A2C= =DD+D'I 
J.JJc                   K   	 |                                  d{V \  }}|g}|g}| j                                        | j        z   }t	          |          | j        k     r|| j                                        z
  }|dk    rn	 t          j        |                                 |           d{V \  }}|                    |           |                    |           n# t          j	        $ r Y nw xY wt	          |          | j        k     	 | j        
                    | j        | j        j        |           d{V }t          ||          D ].\  }	}
|	                                s|	                    |
           /nE# t"          $ r8}|D ]+}	|	                                s|	                    |           ,Y d}~nd}~ww xY w)z.Batch incoming decode requests for efficiency.TNr   )r<   r    rT   r   rU   r   r   rV   r@   rW   rX   r#   r   batch_decoderQ   rZ   r\   r]   r^   )r$   r3   r7   r1   token_ids_listr`   ra   rb   rd   re   rh   ri   s               r%   r?   z+AsyncMicrobatchTokenizer._batch_decode_loop   s?     	--2YY[['8'8'8'8'8'8$I}'[N+_Nz((4+DDHn%%(;;;"TZ__%6%66a<<5<5E		W6 6 0 0 0 0 0 0,I} #)))444"))-8888+   E n%%(;;;- $
 : :NDN$?! !       !$NG < < , ,HC88:: ,s+++,  - - -) - -C88:: -))!,,,- - - - --7	-s,   AC C10C1A0E> >
G .F;;G opr0   c                     |dk    rdS |                     dd          }|                     dd          }|                     d          }|sd|dd	fS t          | j        d
d	          }||||k    rd|ddfS dS )a  
        Return a normalized key describing operation + kwargs.

        - `add_special_tokens`: {True/False}
        - `truncation`: {True/False}
          - If `truncation` is False (`max_length` is None),
            returns a key for a can_batch queue.
          - If `truncation` is True and `max_length` is None or equals
            `tokenizer.model_max_length`, returns a key for a can_batch queue.
          - Otherwise, returns a key for a cannot_batch queue.

        Examples:
          - Decode: ("decode",)
          - Encode typical:
            ("encode", add_special_tokens, bool_truncation, max_length_label)
          - Fallback: ("encode", "other")
        r6   )r6   add_special_tokensT
truncationF
max_lengthr)   Nmodel_max_length	model_max)r)   r:   )r<   getattrr   )r$   rm   r0   ro   rp   rq   rs   s          r%   r,   z#AsyncMicrobatchTokenizer._queue_key   s    & >>;#ZZ(<dCCZZe44
ZZ--
 	=/<<DN,>EE	)"7J)<S<S/{BB  r'   c                     t          | dd           xrCt          | dd           x}r2|                                s fd}|                    |           d S d S d S d S )Nr"   r    c                  :    D ]} |                                   d S N)cancel)tasktaskss    r%   cancel_tasksz6AsyncMicrobatchTokenizer.__del__.<locals>.cancel_tasks   s+    ! " "DKKMMMM" "r'   )rt   	is_closedcall_soon_threadsafe)r$   r8   r{   rz   s      @r%   __del__z AsyncMicrobatchTokenizer.__del__   s    d$4d;;;U
	4 w555
	4 NN$$
	4" " " " " %%l33333
	4 
	4 
	4 
	4 
	4 
	4r'   )r   r   )__name__
__module____qualname____doc__intfloatr&   r4   r6   r   r   tupler=   strdictr   listr-   boolr>   r?   r,   r~   rF   r'   r%   r   r      sL         !&+	; ; ; $	;
 
; ; ; ;*# # ## # #-49	uS$./%S	68I2JJ	K   "3-gm 3- 3- 3- 3- 3-j -gm  -  -  -  -D!!S !!$ !!5 !! !! !! !!F4 4 4 4 4r'   r   ry   c                     | r=|                                  s+t          |                                 | j                   d S d S d S rw   )rZ   run_in_loopget_looprx   )ry   s    r%   cancel_task_threadsafer      sM     2DIIKK 2DMMOOT[111112 2 2 2r'   funcexecutorr   c                 l     dt           j        dt           j        dt          t                   f fd}|S )z
    Take a blocking function, and run it on in an executor thread.

    This function prevents the blocking function from blocking the
    asyncio event loop.
    The code in this function needs to be thread safe.
    argsr0   r   c                  x    t          j                    }t          g| R i |}|                    |          S )N)r   r   )r   get_event_loopr   rX   )r   r0   r8   p_funcr   r   s       r%   _async_wrapperz"make_async.<locals>._async_wrapper   sG    %''//////##XF#CCCr'   )r   r   r0   r   r   )r   r   r   s   `` r%   
make_asyncr      sV    Daf D DVAY D D D D D D D
 r'   r8   functionc                 z    t          |           r ||  d S |                                 s | j        |g|R   d S d S rw   )in_loopr|   r}   )r8   r   r   s      r%   r   r      s\    t}} 3$^^ 3!!(2T2222223 3r'   
event_loopc                 T    	 t          j                    | k    S # t          $ r Y dS w xY w)NF)r   r   RuntimeError)r   s    r%   r   r      s=    '))Z77   uus    
''itc                 *    |                                  S rw   )	__anext__)r   s    r%   anextr     s    ||~~r'   	iteratorsc            	       
K   t          |           dk    r| d         2 3 d{V }d|fW V  6 dS t          j                    

fdt          |           D             }	 |rt          j        |                                t                     d{V \  }}|D ]a}|                    |          }	 | d{V }|\  }}||
                    t          |                    <   ||fW V  R# t          $ r Y ^w xY w||                                D ]g\  }	\  }}t          j        t                    5  |	                                 |                                 d{V  ddd           n# 1 swxY w Y   hdS # |                                D ]g\  }	\  }}t          j        t                    5  |	                                 |                                 d{V  ddd           n# 1 swxY w Y   hw xY w)zMerge multiple asynchronous iterators into a single iterator.

    This method handle the case where some iterators finish before others.
    When it yields, it yields a tuple (i, item) where i is the index of the
    iterator that yields the item.
    r   r   Nc                 `    i | ]*\  }}                     t          |                    ||f+S rF   )rA   r   )rG   rJ   r   r8   s      r%   rK   z)merge_async_iterators.<locals>.<dictcomp>  s7    UUUuq"duRyy))Ar7UUUr'   )return_when)rU   r   r   rY   waitkeysr   poprA   r   StopAsyncIterationr[   
contextlibsuppressBaseExceptionrx   aclose)r   itemawaitsrZ   _dpairrJ   r   fr8   s             @r%   merge_async_iteratorsr     s      9~~#A, 	 	 	 	 	 	 	$T'MMMMM '#%%DUUUU	)@T@TUUUF" 
	#LOTTTTTTTTTGD!  zz!}}!"777777D EAr:>F4++E"II667T'MMMMM)   D  
	 !,,.. 	" 	"JAw2$]33 " "


iikk!!!!!!!" " " " " " " " " " " " " " "	" 	"&,,.. 	" 	"JAw2$]33 " "


iikk!!!!!!!" " " " " " " " " " " " " " "	"sk   . AE< 29C,+E< ,
C96E< 8C99E< 3/E..E2	5E2	<6G:2/G-	!G:-G11G:4G15G:iteratorc                 N   K   g }| 2 3 d{V }|                     |           6 |S )z6Collect all items from an async generator into a list.N)r@   )r   r[   r   s      r%   collect_from_async_generatorr   1  sR      E       dT Ls   $rw   )%r   r   r   r   r   r   r   collections.abcr   r   r	   concurrent.futuresr
   r   	functoolsr   typingr   r   $transformers.tokenization_utils_baser   typing_extensionsr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rF   r'   r%   <module>r      ss         D D D D D D D D D D D D ? ? ? ? ? ? ? ? ? ? ; ; ; ; ; ; ; ;       ) ) ) ) ) ) ) ) > > > > > > ' ' ' ' ' 'IcNNGCLLB4 B4 B4 B4 B4 B4 B4 B4J2 2 2 2 2 !% 
1a4.o a1o   (3' 38 3 3 3 3) d      .D)    #"q$w'#"E#q&M4'(#" #" #" #"L41H TRSW      r'   