
     `i                        d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlZd dlmZ d dlmZ d	d
lmZ d	dlmZ d	dlmZ d	dlmZmZmZ ddlmZ ddlmZmZmZm Z m!Z! ddl"m#Z#m$Z$m%Z% 	 d dej&        dej&        dej&        de'ddf
dZ(e G d d                      Z) e             G d d                      Z* e             G d d                      Z+ G d d          Z,dS )!    N)	dataclass)partial)count)perf_counter)OptionalUnion)nn)tqdm   )PretrainedConfig)GenerationConfig)logging)ContinuousBatchProcessorMetricsattach_tracertraced   )PagedAttentionCache)GenerationOutputRequestStateRequestStatusget_device_and_memory_breakdownlogger)SCHEDULER_MAPPINGFIFOScheduler	Schedulerattention_maskcumulative_seqlens_qcumulative_seqlens_ksliding_windowreturnc                 j   t          j        | j                  j        }t	          t          |          dz
            D ]}||dz            ||         z
  }||dz            ||         z
  }||k     r|dk    r	||z
  dz   }nd}t          ||         ||dz                      }	t          ||         ||dz                      }
t          j        | d|	|
f         j        || j        | j	                  }t          j
        ||          }|dk    r!||z
  |z
  }|t          j        ||          z  }|| d|	|
f<   dS )u  Builds an attention mask inplace using the cumulative seqlens of the query and key. If given a sliding window, it
    will also apply a sliding window mask on top. The attention mask is not boolean, it uses zeroes and -inf (or its
    equivalent) so it's more of an attention score bias tensor.
    The attention mask is a block-diagonal matrix, with each block an attention mask for a single query-key pair.
    Each of those block is built from a causal mask and, if there is a sliding window, a sliding window mask.

    An example is represented below, with seqlen_k = 8, seqlen_q = 4 and sliding_window = 6:

    CAUSAL MASK:

           █ █ █ █ █ ░ ░ ░
           █ █ █ █ █ █ ░ ░
           █ █ █ █ █ █ █ ░
           █ █ █ █ █ █ █ █

    SLIDING WINDOW MASK:
         ┌──────────────────────── seqlen_k - seqlen_q - sliding_window = 8 - 4 - 6 = -2 offset to the right
       <─┴─>
     ░ █ | █ █ █ █ █ █ █ █
     ░ ░ | █ █ █ █ █ █ █ █
     ░ ░ | ░ █ █ █ █ █ █ █
     ░ ░ | ░ ░ █ █ █ █ █ █

    ATTENTION MASK (sum of causal and sliding window masks):

           █ █ █ █ █ ░ ░ ░
           █ █ █ █ █ █ ░ ░
           ░ █ █ █ █ █ █ ░
           ░ ░ █ █ █ █ █ █

    Another example with seqlen_k = 5, seqlen_q = 3 and sliding_window = 2:

    CAUSAL MASK:

           █ █ █ ░ ░
           █ █ █ █ ░
           █ █ █ █ █

    SLIDING WINDOW MASK:
         ┌──────────────────────── seqlen_k - seqlen_q - sliding_window = 5 - 3 - 2 = 0 offset to the right
        <┴>
         | ░ █ █ █ █
         | ░ ░ █ █ █
         | ░ ░ ░ █ █

    ATTENTION MASK (sum of causal and sliding window masks):

           ░ █ █ ░ ░
           ░ ░ █ █ ░
           ░ ░ ░ █ █

    r   .dtypedevice)diagonalN)torchfinfor#   minrangelenslicefullshaper$   triutril)r   r   r   r   	min_valueiseqlen_qseqlen_kcausal_diagonalquery_range	key_range	minus_infmaskedsliding_diagonals                 /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/transformers/generation/continuous_batching/continuous_api.pybuild_attention_maskr;   %   sx   t N0115I3+,,q011 = ='A.1Ea1HH'A.1Ea1HHh8q==&1A5OOO035I!a%5PQQ.q13GA3NOO	J3Y67= &!(	
 
 
	 I@@@A'(2^Cej5EFFFFF6<sK233-= =    c                       e Zd ZU ej        ed<   eej                 ed<   ej        ed<   ej        ed<   ej        ed<   eed<   eed<   eej                 ed<   eej                 ed	<   ej        ed
<   e	ed<   dZ
eed<   dS )PagedAttentionArgs	input_idsr   position_idsr   r   max_seqlen_qmax_seqlen_kwrite_index
read_indexlogits_indicescacheF	use_cacheN)__name__
__module____qualname__r&   Tensor__annotations__r   intlistr   rG   bool r<   r:   r>   r>   y   s         |U\****,,&&&,&&&el####U\""""L   Itr<   r>   c                      e Zd Z	 	 	 d-dedededej        dej        dej	        d	e
j        d
e
j        dededededdfdZ ed          deddfd            ZdefdZe e
j                    d.defd                        ZdefdZd Zed             Zedefd            Zedefd            Zedee         dee         d eee                  d!eee                  d"ee         d#eee         ee ee         f         f         d$ee         ddfd%            Z!ed&             Z"eded'efd(            Z#ed)             Z$edefd*            Z%ed+             Z&ed,             Z'dS )/ContinuousBatchProcessorFTrF   configgeneration_configinput_queueoutput_queue
stop_eventmodel_devicemodel_dtype	scheduler	streamingmanual_evictionslice_inputsr    Nc                    || _         || _        || _        || _        || _        || _        || _        || _        |	| _        |
| _	        || _
        || _        t          |dd          dn|j        | _        g | _        |j        | _        t!          |j                  | _        d| _        d| _        d| _        |                     |j                   dS )a  Initialize the continuous batch processor.

        Args:
            cache: A [`PagedAttentionCache`] object
            config: The model configuration
            generation_config: The generation configuration
            input_queue: Queue for incoming requests
            output_queue: Queue for outgoing results
            stop_event: Event to signal processing should stop
            model_device: Device for model inputs/outputs
            model_dtype: Data type for model inputs/outputs
            scheduler: The [`Scheduler`] to use
            streaming: Whether to stream tokens as they're generated
            manual_eviction: Whether to manually evict blocks from the cache
            slice_inputs: Whether to slice the inputs to the model
        r   Nr   r   )rF   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   getattrr   requests_in_batchmax_batch_tokensr   metricstotal_query_lengthtotal_key_lengthtotal_batch_sizesetup_static_tensors
num_groups)selfrF   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   s                r:   __init__z!ContinuousBatchProcessor.__init__   s    > 
!2&($(&"".( $+63CT#J#J#RaaX^Xm57 !& 66u7MNN #$ ! !!!%"233333r<   )
standalonerg   c                 
     j          j        j         j        j        z  t          j         j        d _        t	          j        dffi  j         _	        t	          j        dffi  j         _
        t	          j        dz   ffi  j         _        d _        t	          j        ffi  j         _        t	          j        dffi  j         _        t           j        dd           }| t           j        dd          }|dv rdgndg}t#          t%          |                    } fd	|D              _        t(                              |d           _                                         r+ddz   f j         j        d
fd|D              _        nd  _         fdt5          |          D              _         fdt5          |          D              _                             d           d S )Nr"   r   r   layer_typesr   )r   Nfull_attentionsliding_attentionc                 F    i | ]}|t          j        d z   fi j        S r   r&   emptytensor_metadata).0
layer_typeTrh   s     r:   
<dictcomp>zAContinuousBatchProcessor.setup_static_tensors.<locals>.<dictcomp>   sA     %
 %
 %
ISJQUDDt/CDD%
 %
 %
r<   )sizer#   r$   c                 4    i | ]}|t          j        d i S )rP   )r&   rr   )rt   ru   attn_mask_kwargss     r:   rw   zAContinuousBatchProcessor.setup_static_tensors.<locals>.<dictcomp>   s-    "m"m"mS]:u{/N/N=M/N/N"m"m"mr<   c                 @    g | ]}t          j        ffi j        S rP   rq   )rt   _rv   rh   s     r:   
<listcomp>zAContinuousBatchProcessor.setup_static_tensors.<locals>.<listcomp>   s1    #i#i#iRSEK$M$M8L$M$M#i#i#ir<   c                 D    g | ]}t          j        z   fi j        S rP   rq   )rt   r|   rv   	num_pagesrh   s     r:   r}   zAContinuousBatchProcessor.setup_static_tensors.<locals>.<listcomp>   s3    "s"s"s\]5;	A#W#W$BV#W#W"s"s"sr<   T)
full_reset)ra   rF   
num_blocks
block_sizer&   int32rX   rs   rr   r?   r@   r   rA   rE   
output_idsr_   rS   rN   setr   dictfromkeysrB   return_attention_maskrY   r   r)   write_index_storageread_index_storagereset_static_tensors)rh   rg   rl   r   rv   rz   r   s   `   @@@r:   rf   z-ContinuousBatchProcessor.setup_static_tensors   sk   !J)DJ,AA	).@QRR aVDDt/CDD!KAGG$2FGG$)KQ$Q$QD<P$Q$Q!#k1$GG$2FGG+q!fEE0DEE dk=$??$T[2BAFFN0>)0K0K+,,ReQfK3{++,,%
 %
 %
 %
 %
Wb%
 %
 %
! !MM+q99%%'' 	'Aq)a-0)+   
 #n"m"m"mal"m"m"mD"&D $j#i#i#i#iW\]gWhWh#i#i#i "s"s"s"s"s"safgqarar"s"s"s 	!!T!22222r<   c                 "    | j         j        dk    S )Npaged_attention)rS   _attn_implementationrh   s    r:   r   z.ContinuousBatchProcessor.return_attention_mask   s    {/3DDDr<   r   c                    |s| j         sa| j        d                             d          }| j        d                             d          }| j        d                             d          }n| j        }| j        }| j        }| j        ddd|f                                          | j	        ddd|f                                          | j
        d|dz                                             d| _        | j        d|                             d           | j        ddd|f                             d           | j        D ]}| j        |         d|dz                                             d| j        |<   | j        N| j        |         ddddd|d|f                             t%          j        | j                  j                   t-          | j        j                  D ]U}| j        |         d|                             d           | j        |         d||z                                d           VdS )zReset static tensors for the next batch. In between batches, reset only the parts that were used in the last
        batch, but for initialisation, we can reset everything using the (full_reset) flag.r   Nr   )r]   r   rx   r   rc   rd   re   r?   zero_r@   r   rA   rE   fill_r   r   rB   r   r&   r'   rY   r(   r)   rF   rg   )rh   r   q_lenk_lenb_sizeru   r1   s          r:   r   z-ContinuousBatchProcessor.reset_static_tensors   s[     	+T. 	+,Q/44R88E+A.33B77E-a055a88FF+E)E*F 	qqq&5&y!'')))!!!VeV)$**,,,!,FQJ,/55777FUF#))"---6E6	"((,,, 3 	o 	oJ%j1,FQJ,?EEGGG,-Dj)".#J/111fuffuf0DEKKEKX\XhLiLiLmnnn tz,-- 	B 	BA$Q'/55b999#A&7==bAAAA	B 	Br<   c                 v   | j         r| j        n| j        d                             d          }| j         r| j        n| j                            d          dz
  }| j        ddd|f         | j        ddd|f         | j        d|dz            | j        | j	        d|         i i i | j
        | j        | j        dd}t          | j                                                  }t!          |          dk    r| j                                        D ]\  }}|d|dz            |d         |<   | j        |         |d         |<   | j        R| j         r||         n| j        |                             d          }| j        |         d	d|d|f         |d
         |<   n|d         }| j        |         d|dz            |d<   | j        |         |d<   | j        Y| j        |         |         }| j         r|n| j        |                             d          }| j        |         d	d|d|f         |d
<   | j        d|d
<   |S )z2Get model keyword arguments for the current batch.r   r   r   NF)r?   r@   cu_seq_lens_qrA   rE   cu_seq_lens_krB   r   rD   rC   rF   rG   r   rB   .r   )r]   rc   r   rx   re   r   r?   r@   rA   rE   rD   rC   rF   rN   r   keysr*   itemsrB   r   )rh   r   r   kwargsrl   ru   	seqlens_kr   s           r:   get_model_kwargsz)ContinuousBatchProcessor.get_model_kwargs  s    ,0+<f''$BZ[\B]BbBbceBfBf*.*;g&&AZA_A_`bAcAcfgAg 6E6	2 -aaa%i8!6|!|D -"1&5&9 /+Z
 
  4499;;<<{a)-)B)H)H)J)J p p%
I6?&1*6M'
3595Fz5R~&z2&2151BpIf--H[\fHgHlHlmoHpHpE;?;Nz;Z[^`faf`fhninhn[n;oF+,Z8p %QJ&*&?
&KLfWXjL&YF?#%)%6z%BF>"".1*=fE!%!2`8KJ8W8\8\]_8`8`+/+>z+J3PVQVPVX^Y^X^K^+_'(&'+F#$r<   c           	          d| j          d| j         d| j        j         d| j        j         d	|                                                                 z   S )Nz%ContinuousBatchProcessor(input_queue=z, output_queue=z, active_requests=z, waiting_requests=))rU   rV   rZ   active_requestswaiting_requestsr   __repr__r   s    r:   r   z!ContinuousBatchProcessor.__repr__E  s    uD4D u uUYUf u u#~=u uRVR`Rqu u u##%%..001	
r<   c                    | j                                         s	 | j                                         }|6| j                            |           nz# t
          j        $ r Y dS t          $ r\}t          j	        d| d           t                                          d          }||                     ||           Y d}~nd}~ww xY w| j                                         dS dS )z?Pull new requests from the input queue and add to waiting list.NzError processing new request: Texc_infostate)rU   rr   
get_nowaitrZ   add_waiting_requestqueueEmpty	Exceptionr   errorlocalsget_handle_request_error)rh   r   es      r:   _get_new_requestsz*ContinuousBatchProcessor._get_new_requestsL  s    "((** 	99(3355=2259999;    9 9 9AaAADQQQQ&,hhll7&;&;$..q%888	9 "((** 	9 	9 	9 	9 	9s#   A A C	$	C	-ACC	r   c                 ~   t           j        |_        t          |          |_        t          |j        t                    r%| j                            |j                  |_	        ng |_	        | j
                            |j        |j                   | j                            |                                           dS )z(Handle general request processing error.N)r   FAILEDstatusstrr   
isinstance
request_idrZ   !get_active_request_static_outputsstatic_outputsrb   record_request_completioncreated_timerV   putto_generation_output)rh   r   r   s      r:   r   z.ContinuousBatchProcessor._handle_request_error^  s     %+%jj e&,, 	&#'>#S#STYTd#e#eE  #%E ..u/A5CSTTTe88::;;;;;r<   c                 *	   |                                   | j                                         | j                                        sdS | j                            t          | j        j                  t          | j        j                             | j        	                    | j
                  | _        | j        sdS | j                            | j                   |                                  d| _        d| _        d| _        g }g }dg}g }t#          | j        t&                    rd | j        D             ndgd t)          | j        j                  D             }d t)          | j        j                  D             }| j        D ]}|j        }t          |j                  }	| j                            |j        ||	          }
| xj        |	z  c_        | xj        t7          |
                                          z  c_        | xj        dz  c_        |xj        |	z  c_        |                    |j                   |                    t)          |||	z                        |                    |d         |	z              t7          | j        |	          | _        |j         s|                    |d         dz
             |
!                                D ]R\  }}|                             |         d         |z              t7          | j"        |         |          | j"        |<   S| j        #                    |j        ||	|           | j        $                    |j        ||	|           | %                    ||||||           | j        &                    | j                   tO          j(        tR          j*                  rt#          | j        t&                    r!t7          fd| j        D                       }nd         }tO          j+        d	t          | j                   d
t          | j        j                   dt          | j        j                   d|d          d| d| j        ,                                            dS )zPrepare tensors and metadata for the next model forward pass. Returns True if there are requests to process,
        False otherwise.Fr   c                     i | ]}|d gS )r   rP   )rt   ru   s     r:   rw   z?ContinuousBatchProcessor.prepare_next_batch.<locals>.<dictcomp>  s    #`#`#`
J#`#`#`r<   c                     g | ]}g S rP   rP   rt   r|   s     r:   r}   z?ContinuousBatchProcessor.prepare_next_batch.<locals>.<listcomp>  s    ???Qb???r<   c                     g | ]}g S rP   rP   r   s     r:   r}   z?ContinuousBatchProcessor.prepare_next_batch.<locals>.<listcomp>  s    @@@ar@@@r<   r   r   c              3   4   K   | ]}|         d          V  dS )r   NrP   )rt   ru   r   s     r:   	<genexpr>z>ContinuousBatchProcessor.prepare_next_batch.<locals>.<genexpr>  s.      jj*-j9"=jjjjjjr<   zScheduled: z, Waiting: z
, Active: z	. cum Q: z
. cum KV: z, free blocks: T)-r   rZ   clear_cancelled_requestshas_pending_requestsrb   record_queue_metricsr*   r   r   schedule_batchra   r`   record_batch_metricsr   rc   rd   re   r   r   r   r)   rF   rg   position_offset
prompt_idsget_seqlens_kr   maxvaluesextendappendrA   remaining_prompt_idsr   rB   extend_read_indicesextend_write_indices_build_tensorsrecord_kv_cache_memory_metricsr   isEnabledForr   DEBUGdebugget_num_free_blocks)rh   r?   r@   r   rE   rD   rC   r   past_lengthquery_lengthr   ru   layer_type_seqlen_kckr   s                 @r:   prepare_next_batchz+ContinuousBatchProcessor.prepare_next_batchm  s    	   //111~2244 	5))#dn.L*M*MsSWSaSrOsOsttt "&!>!>t?T!U!U% 	5))$*@AAA 	!!### #$ ! !	 !sd/66 	'#`#`dF_#`#`#`  $%3 ??%
(=">">???
@@5)>#?#?@@@ + 	f 	fE/Ku/00L
001A;P\]]I ##|3##!!S)9)9););%<%<<!!!!Q&!!!!\1!! U-...k;3M N NOOO ''(<R(@<(OPPP #D$5| D DD- D%%&:2&>&BCCC3<??3D3D h h/
/$Z0778LZ8XY[8\_r8rsss03D4Ej4QSf0g0g!*--J**5+;[,XbcccJ++E,<k<Ydeeee 	  	
 	
 	
 	33DJ???w}-- 		$3T:: .jjjjPTPijjjjj)"-LQc$"899 Q Qc$.JiFjFj Q Qt~=>>Q QI]^`IaQ QQ Q.2j.L.L.N.NQ Q  
 tr<   r?   r@   rD   rC   r   r   rE   c                    t          t          j        fi | j        } ||          | j        dddt          |          f<    ||          | j        dddt          |          f<    ||          | j        dt          |          <    ||          | j        dt          |          <   |	                                D ]_\  }	}
 ||
          | j
        |	         dt          |
          <   | j        +t          | j        |	         ||
|	dk    r| j        nd           `g | _        g | _        t!          t#                      ||          D ]\  }}} ||          | j        |         dt          |          <    ||          | j        |         dt          |          <   | j        rt          |          n| j        |                             d          }| j        rt          |          n| j        |                             d          }| j                            | j        |         d|                    | j                            | j        |         d|                    dS )zeBuilds the actual tensors for the current batch, by modifying the already allocated tensors in place.Nrn   r   )r   r   r   r   r   )r   r&   tensorrs   r?   r*   r@   r   rE   r   r   r   r;   r   rD   rC   zipr   r   r   r]   rx   r   )rh   r?   r@   rD   rC   r   r   rE   	to_tensorru   layer_type_seqlens_kr1   group_read_indicesgroup_write_indicesrws                   r:   r   z'ContinuousBatchProcessor._build_tensors  s    ELAAD,@AA	 /8i	.B.Bqqq*C	NN**+4=Il4K4K!!!0s<00001AJK_A`A`!"=C(<$=$="=>5>Y~5N5N1c.1112 1E0J0J0L0L 	 	,J,QZQZ[oQpQpD%j12MC8L4M4M2MN".$#'#6z#B)=)=:DH[:[:[4#6#6ab	    :=eggzS^:_:_ 		E 		E6A!#6DMIN`DaDaD#A&'@-?)@)@'@AFOiPcFdFdD$Q'(B#.A*B*B(BC+/+<e&'''$BYZ[B\BaBabdBeBeA,0,=g'(((4C[\]C^CcCcdfCgCgAO""4#:1#=bqb#ABBB##D$<Q$?$CDDDD		E 		Er<   c                     | j         5	 | j                                         d         }n# t          $ r ddg}Y nw xY wddg}|S )Nr   r   )r   tolistr   )rh   outs     r:   _synczContinuousBatchProcessor._sync  sa    ?&o,,..q1   !f a&C
s   ) ::tokenc                     | j         r.| j                            |                                           dS |j        t
          j        k    r.| j                            |                                           dS dS )zCSend output to the queue based on streaming mode and request state.N)r[   rV   r   r   r   r   FINISHED)rh   r   r   s      r:   _maybe_send_outputz+ContinuousBatchProcessor._maybe_send_output  sy     > 	@!!%"<"<">">?????\]333!!%"<"<">">????? 43r<   c                 
   |                                  }g }t          | j                  D ]*\  }}|j        }t	          |j                  dk    r| j                            |j        |j                   t          j
        |_        || j        |                  }|g|_        |                    |          ra| j                            |j        |j                   | j                            |j        | j                    |                    |           |                     ||           |j        t          j        k    rt          j        |_        ,| j                                        dk    rt3          d          dS )z0Update request states based on generated tokens.r   )evict_from_cachezNo more free blocksN)r   	enumerater`   r   r*   r   rb   record_ttft_metricr   r   DECODINGr   rE   r   update_with_tokenr   rZ   finish_requestr\   r   r   PREFILLING_SPLITSPLIT_PENDING_REMAINDERrF   r   
ValueError)rh   
out_tokensfinished_request_idsr1   r   req_idr   s          r:   update_batchz%ContinuousBatchProcessor.update_batch  sj    ZZ\\
!!$"899 	E 	EHAu%F5-..!33//0BEDTUUU,5"4#6q#9:$)7 **511 8L::5;MuO_```N11%2BZ^ZnVn1ppp(//777''u5555!???,D:))++q002333 10r<   c                 4    | j                                         S )z2Check if there are any active or waiting requests.)rZ   r   r   s    r:   r   z-ContinuousBatchProcessor.has_pending_requests!  s     ~22444r<   c                     | j         }|D ]7}|                     ||           | j                            |j                   8dS )z&Handle errors during batch processing.N)r`   r   rZ   r   r   )rh   r   failed_reqsreqs       r:   handle_batch_errorz+ContinuousBatchProcessor.handle_batch_error&  sV     , 	: 	:C&&uc222N))#.9999	: 	:r<   c                    t          | j        j                                                  }|D ]7}|                     ||           | j                            |j                   8t          | j        j                                                  D ]7}| j        j        	                    |          }|                     ||           8| j        j
                                         dS )zFail all active requests with the given error.

        Args:
            error: The error to report in the failure message
        N)rN   rZ   r   r   r   r   r   r   r   popwaiting_requests_orderclear)rh   r   requestsr   r   s        r:   fail_all_requestsz*ContinuousBatchProcessor.fail_all_requests.  s     6==??@@ 	< 	<E&&ue444N))%*:;;;; 4>:??AABB 	5 	5FN377??E&&ue4444 	-3355555r<   )FFT)F)(rH   rI   rJ   r   r   r   r   Queue	threadingEventr&   r$   r#   r   rO   ri   r   rM   rf   r   no_gradr   r>   r   r   r   r   r   r   rN   r   r   r   r   r   r   r   r   r  r	  rP   r<   r:   rR   rR      s}          %!94 94"94 !94 ,	94
 [94 k94 O94 l94 [94 94 94 94 94 
94 94 94 94v Vt)3s )3t )3 )3 )3 )3VEt E E E E U]__B Bt B B B _ VBB*"4 * * * *X
 
 
 9 9 V9" <, < < < V< [D [ [ [ V[z *E9*E 3i*E cO	*E
 $s)_*E #3i*E $DItCcN/C$CD*E S	*E 
*E *E *E V*EX   V @ @S @ @ @ V@ 4 4 V4* 5d 5 5 5 V5 : : V: 6 6 V6 6 6r<   rR   c            	          e Zd ZdZ	 	 	 	 d/dedededefd	Zed
             Zd Z	d0dede
e         fdZd1de
e         fdZ	 d2dee         de
e         de
e         defdZdeee                  fdZdefdZd2de
e         fdZd Zd Zedee         fd            Zedefd            Zed             Zedefd             Z ed!"          d#             Z ed$"          d%             Z  ed&"          defd'            Z!d( Z" ed)"          defd*            Z# ed+"          d,             Z$ede
e         fd-            Z%edefd.            Z&dS )3ContinuousBatchingManagerzManager for handling continuous batching of generation requests.

    This class provides the user interface for submitting generation requests,
    retrieving results, and managing the background generation thread.
    Fr   TrT   r\   r[   r]   c                    |                                 | _        ||j        n|}|| _        t          j        |          | _        t          j                    | _        t          j                    | _	        || _
        t          |dd          | _        d| _        d| _        t          j                    | _        d| j        j        _        t          |dd          | _        | j                            |          | _        t          |dd          | _        t          |d	d          | _        || _        d| _        || _        | j        rt5          d
          dS )aS  Initialize the continuous batching manager.

        Args:
            model: The language model for generation
            generation_config: Configuration for generation parameters
            max_queue_size: Maximum size of the request queue (0 = unlimited)
            streaming: Whether to stream tokens as they are generated
        N)maxsizelog_prob_generationFr   	do_sampleTuse_cuda_graphprofilez!Cuda graphs are not supported yet)evalmodelrT   r   r
  rU   rV   r  r  rW   r[   r_   r  _generation_thread_request_counterLock_request_locktop_pr  _get_logits_processorlogit_processorr  r  r\   batch_processorr]   NotImplementedError)rh   r  rT   r\   max_queue_sizer[   r]   s          r:   ri   z"ContinuousBatchingManager.__init__M  sB   " ZZ\\
7H7PE33Vg!2 ;~>>>!KMM#/++"#*+<>SUZ#[#[ "& !&^---1
$* !2KFF#z??@QRR%&79I5QQ0)UCC.CG( 	K%&IJJJ	K 	Kr<   c                    | j         /| j                                         rt          j        d           dS t	          j                    | _        t          j        | j	                  | _         | j         
                                 dS )z'Start the background generation thread.Nz"Manager thread is already running.)target)r  is_aliver   warningr   r
  _result_queuer  Thread_run_generation_loopstartr   s    r:   r)  zContinuousBatchingManager.startu  sy     ".43J3S3S3U3U.N?@@@F"[]]"+"2$:S"T"T"T%%'''''r<   c                 F    | j         duo| j                                         S )z5Check if the background generation thread is running.N)r  r$  r   s    r:   
is_runningz$ContinuousBatchingManager.is_running  s$    &d2Yt7N7W7W7Y7YYr<   Nblocktimeoutc                     | j         t          j        d           dS | j                                        s-| j                                         t          j        d           |r|                     |           dS dS )zSignal the background thread to stop.

        Args:
            block: Whether to wait for the thread to stop
            timeout: Maximum time to wait for the thread to stop
        NzManager not started.z'Stopping continuous batching manager...)r  r   r%  rW   is_setr   infojoin)rh   r,  r-  s      r:   stopzContinuousBatchingManager.stop  s     "*N1222F%%'' 	CO!!!KABBB 	IIg	 	r<   c                     | j         g| j                             |           | j                                         rt          j        d           dS t          j        d           d| _         dS dS )zWait for the background thread to finish.

        Args:
            timeout: Maximum time to wait for the thread to stop
        Nr-  z2Generation thread did not exit after join timeout.z$Continuous Batching Manager stopped.)r  r1  r$  r   r%  r0  )rh   r-  s     r:   r1  zContinuousBatchingManager.join  s|     ".#(((999&//11 /STTTTTBCCC*.''' /.r<   r?   r   max_new_tokensr    c                 v   |9| j         5  d| j         }| xj        dz  c_        ddd           n# 1 swxY w Y   || j        j        n|}t	          |t          |          t          |          || j        j                  }| j                            |dd           t          j
        d| d	           |S )
a/  Add a new generation request to the queue.

        Args:
            input_ids: Input token IDs to use as prompt
            request_id: Optional custom request ID (auto-generated if None)
            **kwargs: Additional generation parameters

        Returns:
            str: The request ID
        Nreq_r   )r   r   full_prompt_idsr5  eos_token_idT
   r,  r-  zAdded request z
 to queue.)r  r  rT   r5  r   rN   r9  rU   r   r   r   )rh   r?   r   r5  r   s        r:   add_requestz%ContinuousBatchingManager.add_request  s    # + +;D$9;;
%%*%%+ + + + + + + + + + + + + + + CQBX/>>^l !I OO)/<
 
 
 	U$;;;<j<<<===s   155inputsc                 ,    |D ]} | j         |fi | d S N)r<  )rh   r=  r   r?   s       r:   add_requestsz&ContinuousBatchingManager.add_requests  s9     	2 	2IDY11&1111	2 	2r<   c                 V    | j         !| j         j                            |           dS dS )zkCancel a request by its ID.

        Args:
            request_id: The ID of the request to cancel
        N)r  rZ   set_request_cancellationrh   r   s     r:   cancel_requestz(ContinuousBatchingManager.cancel_request  s5     + *CCJOOOOO ,+r<   c                 :   | j         | j                                        rdS 	 | j                            d|          }|'|j        |k    r| j                            |           dS t          j        d|j                    |S # t          j	        $ r Y dS w xY w)zRetrieve one result from the output queue.

        Args:
            timeout: Maximum time to wait for a result

        Returns:
            Optional[GenerationOutput]: The result data or None if timeout
        NTr;  zRetrieved result for request )
r  rV   rr   r   r   r   r   r   r   r   )rh   r   r-  results       r:   
get_resultz$ContinuousBatchingManager.get_result  s     "*t/@/F/F/H/H*4	&**w*GGF%&*;z*I*I!%%f---tLL9JLLMMMM{ 	 	 	44	s   AB )B BBc              #      K   | j         W| j                                         r@|                     d          }||V  | j         | j                                         <dS dS dS dS )z.Iterate over results as they become available.N皙?r4  )r  r$  rG  )rh   rF  s     r:   __iter__z"ContinuousBatchingManager.__iter__  s|      %1d6M6V6V6X6X1__S_11F! %1d6M6V6V6X6X111111111r<   c              #   0  K   d}| j         | j                                         rk|sk|                     |d          }||V  | j        | j        j                            |          }| j         #| j                                         r|edS dS dS dS dS dS )zMIterate over results matching a specific request id as they become available.FNrI  )r   r-  )r  r$  rG  r  rZ   request_is_cancelled)rh   r   request_cancelledrF  s       r:   request_id_iterz)ContinuousBatchingManager.request_id_iter  s      !%1d6M6V6V6X6X1ar1__
C_HHF!#/$($8$B$W$WXb$c$c! %1d6M6V6V6X6X1ar1111111111111r<   c                  
    h dS )N>   
sdpa_pagedeager_pagedflash_attention_2rP   rP   r<   r:   #supported_attention_implementationsz=ContinuousBatchingManager.supported_attention_implementations  s    AAAAr<   c                      dS )NrP  rP   rP   r<   r:    default_attention_implementationz:ContinuousBatchingManager.default_attention_implementation  s    |r<   c                    t           j                            | j        j                  }|                    t           j                                                   t           j                            |          5  |                     |           d d d            n# 1 swxY w Y   t           j                                                            |           t           j        	                                | _
        t           j        
                    | j
        |          5  |                     |           d d d            d S # 1 swxY w Y   d S )N)r$   )stream)r&   cudaStreamr  r$   wait_streamcurrent_streamrW  _generation_step	CUDAGraphgraph)rh   r  rW  s      r:   warmupz ContinuousBatchingManager.warmup  s   ""$**;"<<5:4466777Zv&& 	3 	3!!/222	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	
!!##//777Z))++
Zdj88 	3 	3!!/222	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3 	3s$   ;BB!$B!"EE	E	r  c                 J   |                                 }t          j                    5  |                     |          }| j        r|j                            |           |                     ||          }|                     ||           ddd           dS # 1 swxY w Y   dS )z6Perform a single generation step. This is cuda graphedN)	r   r&   r  _model_forwardr  output_probscopy__process_logit_sample)rh   r  
batch_datalogitsprobss        r:   r\  z*ContinuousBatchingManager._generation_step  s     %5577
]__ 	1 	1((44F' ;,226:::''
F;;ELL%000	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1 	1s   A#BBBmodel_forward)	span_namec                 &     | j         di |j        S )NrP   )r  rg  )rh   rf  s     r:   ra  z(ContinuousBatchingManager._model_forward  s    tz''J''..r<   logit_processingc                 X   t          | j        d          r'| j                            |d         |d                    |j        \  }}}|                    ||z  |          }|d                             ||z            }|                     ||          }|                    |||          S )Nset_continuous_batching_contextrE   r   r?   )hasattrr  rn  r-   view)	rh   rf  rg  
batch_sizeseq_len
vocab_size	logits_2dinput_ids_2dprocessed_logits_2ds	            r:   rd  z(ContinuousBatchingManager._process_logit  s     4')JKK 	 @@+,j.I   +1,'
GZKK
W 4jAA	!+.33J4HII #22<KK #''
GZHHHr<   samplingc                    | j         rft          j                            |d          }t	          j        |d         d                              d          }|                    d          }nt	          j        |d          }|	                    d          }|j
        d d d |f                             |           d S )Nr   )dimr   r   )num_samples)r  r	   
functionalsoftmaxr&   multinomialsqueeze	unsqueezeargmaxrx   r   rc  )rh   r  rh  next_tokenstokenss        r:   re  z!ContinuousBatchingManager._sample2  s    > 	6M))%R)88E+E!H!DDDLLRPPK%//22KK,u"555K!!!$$"111gvg:.44[AAAAAr<   c                    d}	 t                      }t          | j        j        | j        | j        j        | j        j        t          | j        dd                    }t          j	        dt                      |z
   d           d}t          | j        d          rAt          j        | j        j        d          }|t          j        d| d           t          }nt          }t                      }t!          || j        j        | j        | j        | j        | j        | j        j        | j        j         ||| j                  | j        | j        | j        	          }|| _        d
| _        t          j	        dt                      |z
   d           | j                                        r|                                rR|                     |           | xj        dz  c_        | j                                        >|                                RnF# t8          $ r9}t          j        d| d           |                     ||           Y d}~nd}~ww xY wt          j        d           dS # t          j        d           w xY w)z6Main processing loop running in the background thread.N_tp_size)tp_sizezPagedAttentionCache created in z secondsrZ   zScheduler 'z ' not found. Defaulting to FIFO.)r]   r   zbatch_processor created in r   zError in generation loop: Tr   zGeneration loop finished.) r   r   r  rS   rT   r$   r#   r_   r   r   ro  r   r   rZ   r%  r   rR   rU   rV   rW   r\   r[   r]   r  current_batchr/  r   _inner_generation_loopr   r   _handle_critical_errorr0  )rh   r  ref_timepaged_attention_cacherZ   r   s         r:   r(  z.ContinuousBatchingManager._run_generation_loop@  s   /	5#~~H$7
!&
!
 
J==% % %! L^<>>H;T^^^___It-{;; *-1$2H2RTXYY	$N#\#\#\#\]]] -I *	#~~H6%
!& !
!
 	/1EFF$!.  O $3D !"DLZ|~~7PZZZ[[[--// (O4X4X4Z4Z (++O<<<""a'"" --// (O4X4X4Z4Z (  	< 	< 	<L9a99DIIII''?;;;;;;;;	< K344444FK34444s0   HH I- 
I/I
I- II- -Jgeneration_loopc           
         t           j                                        rt           j                                         |                                sd S t
          j        t          j        k    r3t                      \  }}}}t          j
        d| d| d| d|            t           j                                        r| j        r| j        dk    r|                     |           nt          | d          r\	 |                                  nq# t           $ r9}t          j        d| d	           |                    |           Y d }~d S d }~ww xY w|                     |           n|                     |           t           j                                        rt           j                                         |                                 d S )
Nz[Memory] Device: z	, Total: z, Reserved: z, Allocated: r   r^  zModel forward pass failed: Tr   )r&   rX  is_availablesynchronizer   r   levelr   r   r   r   r  r  r_  ro  _graph_replayr   r   r  r\  r   )rh   r  r$   totalreserved	allocatedr   s          r:   r  z0ContinuousBatchingManager._inner_generation_loopt  s   :""$$ 	%J""$$$1133 	F<7=((1P1R1R.FE8YLsVssessQYsshqssttt:""$$ 	3)< 	3!Q&&O,,,,w'' 7&&((((    L!Bq!B!BTRRRR#66q999FFFFF
 %%o6666!!/222:""$$ 	%J""$$$$$&&&&&s   7D 
E.E

Egraph_replayc                 8    | j                                          d S r?  )r^  replayr   s    r:   r  z'ContinuousBatchingManager._graph_replay  s    
r<   c                     | j                                          	 	 | j                                        }||                    ||           2# t
          j        $ r Y nw xY w||                    |           dS dS )z:Handle critical errors that terminate the generation loop.TN)rW   r   rU   r   r   r   r   r	  )rh   r   r  req_datas       r:   r  z0ContinuousBatchingManager._handle_critical_error  s     		K+6688".#99%JJJK { 	 	 	D	 &--e44444 '&s   3A A A c                     | j         st          d          | j        !| j        j                            |           dS dS )zSEvict a request from the cache. It is assumed that the request is already finished.z0Manual eviction is not enabled for this manager.N)r\   RuntimeErrorr  rZ   r   rC  s     r:   evict_request_from_cachez2ContinuousBatchingManager.evict_request_from_cache  sP     # 	SQRRR+ *99*EEEEE ,+r<   )Fr   TT)FNr?  )NN)'rH   rI   rJ   __doc__r   rO   ri   r   r)  r+  r   floatr2  r1  rN   rM   r   r<  r@  rD  r   rG  rJ  rN  staticmethodr   rS  rU  r_  rR   r\  ra  rd  re  r(  r  r  r  r  rP   r<   r:   r  r  E  s         !&!&K &K ,&K 	&K &K &K &K &K &KP ( ( V(Z Z Z $ %    $/ /HUO / / / / gk   c 08 V^_bVc 	       D24S	? 2 2 2 2P P P P P 8DT;U    *  d d d BS B B B \B c    \ 
3 
3 V
3 10H 1 1 1 V1 Vo&&&/ / '&/ V()))I I *)I& Vj!!!B'? B B B "!B25 25 25h V'((('6N ' ' ' )('4 Vn%%%  &% 5XF^=_ 5 5 5 V5$ F3 F F F VF F Fr<   r  c                       e Zd ZdZ	 	 	 	 	 ddee         deded	ed
edefdZ	e
 ej                    	 	 	 ddeee                  dee         ded
edeee                  f
d                        ZdS )ContinuousMixinz?Mixin class for models to add continuous batching capabilities.NFr   TrT   r\   r!  r[   r]   r    c                 &   t          | d          r t          | d          rt          | d          st          d          ||n| j        }|t          d          |j        t          j        d           d|_        t          | |||||	          S )
a  Initialize a manager for continuous batching inference.

        Args:
            generation_config: Custom generation configuration
            max_queue_size: Maximum size of the input request queue
            streaming: Whether to stream tokens as they are generated

        Returns:
            `ContinuousBatchingManager`: The manager instance to add requests and retrieve results.
        rS   r$   r#   z;Model must have 'config', 'device', and 'dtype' attributes.Nz8A GenerationConfig must be provided or set in the model.zE`eos_token_id` not set in GenerationConfig. Setting to -1 (disabled).r   )r  rT   r\   r!  r[   r]   )ro  AttributeErrorrT   r   r9  r   r%  r  )rh   rT   r\   r!  r[   r]   
gen_configs          r:   init_continuous_batchingz(ContinuousMixin.init_continuous_batching  s    $ tX&& 	`gdH.E.E 	`WUY[bMcMc 	` !^___*;*G&&TMc
WXXX"*Nbccc&(J# )(+)%
 
 
 	
r<   r=  progress_barc                    |sg S t          j                    t          j        k    rt          j        d           d}|                     ||          }|                                 i }t          |          }	 ddlm	}	  |	t           g          5  t          || d| dd	          5 }
 |j        |fi | d}||k     r|                    d
          }|r<|j        }|j        t          j        k    r|||<   |d
z  }|
                    d
           n)|                                st          j        d           n||k     ddd           n# 1 swxY w Y   ddd           n# 1 swxY w Y   n0# t(          $ r#}t          j        d| d           Y d}~nd}~ww xY w|                    dd           n# |                    dd           w xY w|S )a=  Generate sequences for a batch of prompts using continuous batching.

        Args:
            inputs: List of input token sequences (prompts)
            generation_config: Optional generation configuration
            **kwargs: Additional generation parameters

        Returns:
            `list[list[int]]`: A list containing the generated sequences (including prompt tokens
                                if not handled otherwise) for each input prompt, in the same order.
                                Returns an empty list `[]` for requests that failed.
        z=Progress bar is disabled when logger level is less than DEBUGF)rT   r]   r   )logging_redirect_tqdmzSolving z	 requestsrequest)r  disabledescunitr   r4  z*Generation thread terminated unexpectedly.NzError during batch generation: Tr   g      @r;  )r   getEffectiveLevelr   r   r%  r  r)  r*   tqdm.contrib.loggingr  r
   r@  rG  r   r   r   r   updater+  r   r   r2  )rh   r=  rT   r  r]   r   managerresultsnum_requestsr  pbarfinished_countrF  r   r   s                  r:   generate_batchzContinuousMixin.generate_batch  s   ,  	I#%%66NZ[[[ L //BSbn/oo6{{	2BBBBBB&&x00 & &&!--;L;;;"	   &
 (G(::6:::%&N(<77!(!3!3A!3!>!>! 	&%+%6F%}0FFF28 .! 3 $A#*#5#5#7#7 & &-Y Z Z Z % )<77& & & & & & & & & & & & & & && & & & & & & & & & & & & & &,  	O 	O 	OL>1>>NNNNNNNNN	O LLtSL1111GLLtSL1111sy   9E2 E&)BEE&E	E&E	E&E2 &E**E2 -E*.E2 1F: 2
F<FF: FF: :G)NFr   FT)NTT)rH   rI   rJ   r  r   r   rO   rM   r  r  r   r&   inference_moderN   r  rP   r<   r:   r  r    s       II 9= %!%
 %
#$45%
 %
 	%

 %
 %
 
#%
 %
 %
 %
N U 9=!!< <T#Y< $$45< 	<
 < 
d3i< < <  V< < <r<   r  rp   )-r   r  dataclassesr   	functoolsr   	itertoolsr   timer   typingr   r   r&   r	   r
   configuration_utilsr   generation.configuration_utilsr   utils.loggingr   utils.metricsr   r   r   rF   r   r  r   r   r   r   r   rZ   r   r   r   rK   rM   r;   r>   rR   r  r  rP   r<   r:   <module>r     s         ! ! ! ! ! !                   " " " " " " " "              3 3 3 3 3 3 > > > > > > $ $ $ $ $ $ S S S S S S S S S S & & & & & & l l l l l l l l l l l l l l B B B B B B B B B B 	Q= Q=LQ=,Q=  ,Q= 	Q=
 
Q= Q= Q= Q=h          v6 v6 v6 v6 v6 v6 v6 v6t fF fF fF fF fF fF fF fFRh h h h h h h h h hr<   