
    -`i>              	       :   d dl Z d dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
 d dlmZ d dlmZ d dl mZ d dlmZ d dlZd dlZd d	lmZ d d
lmZ d dlmZ  ee          Z G d dej        ddd          Z G d dej        dddd          ZdZ G d de          Z  G d de          Z! G d de          Z" G d de          Z# G d d          Z$ G d d e          Z% G d! d"e          Z& G d# d$e&          Z' G d% d&e&          Z( G d' d(          Z)dS ))    N)ABCabstractmethod)Counterdeque)Callable)asdict)count)Queue)Any)KVEventsConfig)init_logger)ExternalBlockHashc                   D    e Zd ZU eed<   ee         ed<   dZedz  ed<   dS )
EventBatchtseventsNdata_parallel_rank)	__name__
__module____qualname__float__annotations__listr   r   int     n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/vllm/distributed/kv_events.pyr   r      sB          	IIII%)d
)))))r   r   TF)
array_likeomit_defaultsgcc                       e Zd ZdZdS )KVCacheEventz*Base class for all KV cache-related eventsN)r   r   r   __doc__r   r   r   r"   r"   $   s         5444r   r"   )r   r   r    tagGPUc                       e Zd ZU ee         ed<   edz  ed<   ee         ed<   eed<   edz  ed<   	 edz  ed<   edz  ed<   d	efd
ZdS )BlockStoredblock_hashesNparent_block_hash	token_ids
block_sizelora_idmedium	lora_namereturnc                     t          t          | j                  | j        t          | j                  | j        | j        | j        f          S N)hashtupler(   r)   r*   r+   r,   r-   selfs    r   __hash__zBlockStored.__hash__?   sI    d'((&dn%%	
 	
 		
r   )	r   r   r   r   r   r   r   strr6   r   r   r   r'   r'   1   s         ())))(4////CyOOO4Z $JTz

# 

 

 

 

 

 

r   r'   c                   B    e Zd ZU ee         ed<   edz  ed<   defdZdS )BlockRemovedr(   Nr-   r/   c                 R    t          t          | j                  | j        f          S r1   )r2   r3   r(   r-   r4   s    r   r6   zBlockRemoved.__hash__P   s"    U4,--t{;<<<r   )	r   r   r   r   r   r   r7   r   r6   r   r   r   r9   r9   L   sS         ())))$J=# = = = = = =r   r9   c                       e Zd ZdS )AllBlocksClearedN)r   r   r   r   r   r   r<   r<   T   s        Dr   r<   c                   2    e Zd ZU eeez  ez           ed<   dS )KVEventBatchr   N)r   r   r   r   r'   r9   r<   r   r   r   r   r>   r>   X   s.         |+.>>??????r   r>   c                       e Zd ZdZdZdeddfdZdee         ddfdZ	dee         fd	Z
dee         fd
ZddZddeddfdZddZdefdZdefdZdS )KVEventAggregatorz
    Aggregates KV events across multiple workers.
    Tracks how many times each event appears and returns only those
    that were emitted by all workers.
    )_event_counter_num_workersnum_workersr/   Nc                 d    |dk    rt          d          t                      | _        || _        d S )Nr   z&num_workers must be greater than zero.)
ValueErrorr   rA   rB   )r5   rC   s     r   __init__zKVEventAggregator.__init__e   s6    !EFFF5<YY!,r   r   c                     t          |t                    st          d          | j                            |           dS )zg
        Add events from a worker batch.

        :param events: List of KVCacheEvent objects.
        z&events must be a list of KVCacheEvent.N)
isinstancer   	TypeErrorrA   updater5   r   s     r   
add_eventszKVEventAggregator.add_eventsk   sC     &$'' 	FDEEE""6*****r   c                 N      fd j                                         D             S )zv
        Return events that appeared in all workers.

        :return: List of events present in all workers.
        c                 0    g | ]\  }}|j         k    |S r   rB   ).0eventr	   r5   s      r   
<listcomp>z7KVEventAggregator.get_common_events.<locals>.<listcomp>{   s5     
 
 
u))) )))r   )rA   itemsr4   s   `r   get_common_eventsz#KVEventAggregator.get_common_eventsu   s=    
 
 
 
 $ 3 9 9 ; ;
 
 
 	
r   c                 N    t          | j                                                  S )zf
        Return all events for all workers.

        :return: List of events for all workers.
        )r   rA   elementsr4   s    r   get_all_eventsz KVEventAggregator.get_all_events   s!     D'0022333r   c                 8    | j                                          dS )z+
        Clear all tracked events.
        N)rA   clearr4   s    r   clear_eventszKVEventAggregator.clear_events   s     	!!#####r      r	   c                 P    |dk    rt          d          | xj        |z  c_        dS )z
        Increment the number of workers contributing events.

        :param count: Number to increment the workers by.
        r   zcount must be positive.N)rE   rB   r5   r	   s     r   increment_workersz#KVEventAggregator.increment_workers   s6     A::6777U"r   c                     d| _         dS )z3
        Reset the number of workers to 1.
        r[   NrO   r4   s    r   reset_workerszKVEventAggregator.reset_workers   s     r   c                     | j         S )zX
        Return the number of workers.

        :return: int number of workers.
        rO   r4   s    r   get_number_of_workersz'KVEventAggregator.get_number_of_workers   s       r   c                 B    d| j          dt          | j                   dS )Nz<KVEventAggregator workers=z	, events=>)rB   lenrA   r4   s    r   __repr__zKVEventAggregator.__repr__   s8    2$*; 2 2$-..2 2 2	
r   r/   Nr[   )r   r   r   r#   	__slots__r   rF   r   r"   rL   rT   rW   rZ   r^   r`   rb   r7   rf   r   r   r   r@   r@   \   s3         3I-C -D - - - -+l!3 + + + + +

4#5 

 

 

 

4\ 2 4 4 4 4$ $ $ $# #s #4 # # # #   !s ! ! ! !
# 
 
 
 
 
 
r   r@   c                       e Zd ZdZedee         ddfd            Zedd            Zedde	ddfd	            Z
edee         fd
            Zede	fd            Zedd            ZdS )KVConnectorKVEventszf
    Abstract base class for KV events.
    Acts as a container for KV events from the connector.
    r   r/   Nc                     t           r1   NotImplementedErrorrK   s     r   rL   zKVConnectorKVEvents.add_events       !!r   c                     t           r1   rm   r4   s    r   	aggregatezKVConnectorKVEvents.aggregate   ro   r   r[   r	   c                     t           r1   rm   r]   s     r   r^   z%KVConnectorKVEvents.increment_workers   ro   r   c                     t           r1   rm   r4   s    r   rW   z"KVConnectorKVEvents.get_all_events   ro   r   c                     t           r1   rm   r4   s    r   rb   z)KVConnectorKVEvents.get_number_of_workers   ro   r   c                     t           r1   rm   r4   s    r   rZ   z KVConnectorKVEvents.clear_events   ro   r   )r/   rk   rh   rg   )r   r   r   r#   r   r   r"   rL   rq   r   r^   rW   rb   rZ   r   r   r   rk   rk      s        
 "l!3 " " " " ^" " " " ^" " "s "4 " " " ^" "\ 2 " " " ^" "s " " " ^" " " " ^" " "r   rk   c                   \    e Zd ZdZd
deddfdZededdfd            Zedd	            Z	dS )EventPublisheraG  Lightweight publisher for EventBatch batches with data parallelism
    support.

    In data parallel setups, each DP rank runs its own EventPublisher instance
    to avoid duplicate events and ensure proper event attribution:

    - Each DP rank creates a separate publisher
    - Publishers automatically annotate events with their data_parallel_rank
    - This allows consumers to distinguish events from different DP ranks

    The publisher is responsible for adding DP metadata since the scheduler
    operates independently of DP topology and shouldn't need DP awareness.
    r   r   r/   Nc                     || _         d S r1   )_data_parallel_rank)r5   r   s     r   rF   zEventPublisher.__init__   s    #5   r   r   c                     dS )zEmit events in order.

        Implementations should guarantee at-least-once delivery and
        monotonic ordering (e.g., via sequence numbers).
        Nr   rK   s     r   publishzEventPublisher.publish         r   c                     dS )zShutdown the publisher.Nr   r4   s    r   shutdownzEventPublisher.shutdown   r|   r   r   rg   )
r   r   r   r#   r   rF   r   r   r{   r~   r   r   r   rw   rw      s         6 63 6t 6 6 6 6 j T    ^ & & & ^& & &r   rw   c                   "    e Zd ZdZddZddZdS )NullEventPublisherz-No-op implementation (default when disabled).r/   Nc                     d S r1   r   rK   s     r   r{   zNullEventPublisher.publish       r   c                     d S r1   r   r4   s    r   r~   zNullEventPublisher.shutdown   r   r   rg   )r   r   r   r#   r{   r~   r   r   r   r   r      sB        77        r   r   c                        e Zd ZU dZdZeed<   d                    ddd          Z	 	 	 	 	 	 dde	de
de
d
z  de	de	de	de
dd
f fdZdedd
fdZddZddZddZddZede
d
z  de	de
d
z  fd            Z xZS ) ZmqEventPublishera  Reliable PUB/ROUTER publisher with an in-memory replay buffer.

    Spawns a separate thread to handle publishing from a queue.

    Parameters
    ----------
    endpoint:
        PUB address. Use `tcp://*:5557` to bind or `tcp://host:5557` to
        connect.
    replay_endpoint:
        Optional ROUTER address for replay requests. When given, subscribers can
        request missed batches by sending the starting sequence number as an
        8-byte big-endian integer.
    buffer_steps:
        Number of past batches to keep for replay.
    hwm:
        ZeroMQ high-water-mark for PUB socket.
    max_queue_size:
        Maximum number of events to buffer in memory.
    topic:
        Topic to publish events to.
    g      ?SHUTDOWN_TIMEOUT   bigT)signedtcp://*:5557N'  順  r   endpointreplay_endpointbuffer_stepshwmmax_queue_sizetopicr/   c                 *   t                                          |           t          t          d z           |          | _        t          t          t          t          f                  |          | _	        t          j                                        | _        d | _        d | _        || _        |                     || j                  | _        |                     || j                  | _        || _        |                                  t-                      | _        |                    d          | _        d| _        t6                              d           t;          j        | j        dd          | _         | j         !                                 d S )N)maxsize)maxlenzutf-8TzStarting ZMQ publisher threadzzmq-publisher)targetdaemonname)"superrF   r
   r   _event_queuer   r3   r   bytes_bufferzmqContextinstance_ctx_pub_replay_dp_rankoffset_endpoint_port	_endpoint_replay_endpoint_hwm_socket_setupr	   _seq_genencode_topic_bytes_runningloggerinfo	threadingThread_publisher_thread_threadstart)	r5   r   r   r   r   r   r   r   	__class__s	           r   rF   zZmqEventPublisher.__init__  sV    	+,,,!*t"34^LLLU3:./|DDD K((**	'+	*.*228T]KK $ 9 9T]!
 !
 	 !LL11 3444 ')$_
 
 
 	r   r   c                     | j         st          d          |j        | j        |_        | j                            |           d S )NzPublisher is closed)r   RuntimeErrorr   ry   r   putrK   s     r   r{   zZmqEventPublisher.publish:  sL    } 	64555$,(,(@F%f%%%%%r   c                    d| _         | j                            d           t          j                    }d}|rpt          j                    |z
  | j        k     rQ| j                                         }|rt          j        d           |rt          j                    |z
  | j        k     Q|r8t                              d| j        	                                | j                   | j
                                        r | j
                            | j                   	 | j        | j                            d           | j        | j                            d           dS # w xY w)	z1Stop the publisher thread and clean up resources.FNT皙?z:Warning: Queue still has %s items after %s seconds timeouttimeoutr   )linger)r   r   
put_nowaittimer   emptysleepr   warningqsizer   is_alivejoinr   closer   )r5   r   pending_itemss      r   r~   zZmqEventPublisher.shutdownA  s_   $$T***	 	 u!4t7L!L!L $ 1 7 7 9 99M  
3  	 u!4t7L!L!L
  	NNL!''))%   <  "" 	=Ld&;<<<	y$	q)))|'""!",,,DDDDDs   AE# #E%c                 l   | j         | j                            t          j                  | _         | j                             | j                   | j        fd| j        v s=d| j        v s4| j                            d          s| j                            d          r | j         	                    | j                   n&| j        | j         
                    | j                   | j        J| j                            t          j                  | _        | j        	                    | j                   dS dS )zoInitialize sockets
        https://pyzmq.readthedocs.io/en/v19.0.0/morethanbindings.html#thread-safety
        N*z::zipc://z	inproc://)r   r   socketr   PUBset_hwmr   r   
startswithbindconnectr   ROUTERr   r4   s    r   r   zZmqEventPublisher._socket_setup`  s    9	((11DIIdi((( ~)t~%%4>))>,,X66 *>,,[99 * 	t~....+	!!$.111  ,9++CJ77DLLd344444 -,r   c                    t           j                                        | _        | j        J | j        s| j                                        dk    r| j        b| j        	                    d          rH	 | 
                                 n2# t          $ r%}t                              d|           Y d}~nd}~ww xY w	 | j                            d          }|dS n# t          j        $ r Y w xY w	 t#          | j                  }| j                            |          }|                    dd          }| j                            | j        ||f           | j                            ||f           | j                                         nF# t          $ r9}t                              d|           t5          j        d           Y d}~nd}~ww xY w| j        | j                                        dk    dS dS )	z1Background thread that processes the event queue.Nr   zError in replay: %sr   r   r   r   zError in publisher thread: %s)msgspecmsgpackEncoder_packr   r   r   r   r   poll_service_replay	Exceptionr   	exceptiongetqueueEmptynextr   r   to_bytessend_multipartr   r   append	task_doner   r   )r5   erQ   seqpayload	seq_bytess         r   r   z#ZmqEventPublisher._publisher_thread{  s   _,,..
y$$$m 	 t066881<<|'DL,=,=a,@,@'?((****  ? ? ?$$%:A>>>>>>>>?)--c-::=E !;    4=))*++E22LLE22		(($*;Y)PQQQ##S'N333!++----        !@!DDD
3 5 m 	 t066881<<<<<<sC   4B	 	
B8B33B8<C C.-C.2BF 
G/GGc           	         | j         J | j                                         }t          |          dk    rt                              d|           dS |\  }}}t
                              |d          }| j        D ]=\  }}||k    r2| j                             |d|	                    dd          |f           >| j                             |d| j
        df           dS )z6If a replay request is waiting, send buffered batches.N   zInvalid replay request: %sr   r   r   )r   recv_multipartre   r   r   r   
from_bytesr   r   r   END_SEQ)r5   frame	client_id_start_seq_bytes	start_seqr   bufs           r   r   z!ZmqEventPublisher._service_replay  s    |'''++--u::??NN7???F(-%	1oNN?E::	 	 	HCi ++S\\!U%;%;SA  
 	##YT\3$GHHHHHr   c                     | r|dk    r| S d| v r|  d| S d| v rM| rId| v rE|                      d          }| d|         }t          | |dz   d                   }||z   }| d| S | S t          d          )	a  Helper function to offset the port in an endpoint by
            the data parallel rank.

        Args:
            endpoint: The endpoint string
                (e.g., "tcp://*:5557" or "inproc://cache")
            data_parallel_rank: The data parallel rank to offset by

        Returns:
            The endpoint with the port offset by data_parallel_rank
                or suffix appended
        r   inproc_dptcp:Nr[   z0Invalid endpoint: must contain 'inproc' or 'tcp')rfindr   rE   )r   r   last_colon_idx	base_addr	base_portnew_ports         r   r   z&ZmqEventPublisher.offset_endpoint_port  s    "  	-22Ox77#5777H 1C8OO!)!4!4$_n_5	!);)=)= >??	$'99#00h000OKLLLr   )r   Nr   r   r   r   rg   )r   r   r   r#   r   r   r   r   r   r   r7   rF   r   r{   r~   r   r   r   staticmethodr   __classcell__)r   s   @r   r   r      s         . "e!!!mmAuTm22G
 '&*"%' '' ' t	'
 ' ' ' ' 
' ' ' ' ' 'R&j &T & & & &   >5 5 5 56#  #  #  # JI I I I. M*M25M	tM M M \M M M M Mr   r   c                       e Zd ZU eedZeeede	f         f         e
d<   ededede	f         ddfd            Ze	 dd
edz  dede	fd            ZdS )EventPublisherFactory)nullr   .	_registryr   ctorr/   Nc                 R    || j         v rt          d| d          || j         |<   d S )Nzpublisher 'z' already registered)r  KeyError)clsr   r  s      r   register_publisherz(EventPublisherFactory.register_publisher  s;    3=  CCCCDDD"dr   r   configr   c                 6   ||j         r|j        dk    rt                      S t          |          }|                    d          }|                    d           	 | j        |         }n&# t          $ r}t          d| d          |d}~ww xY w |dd|i|S )	z'Create publisher from a config mapping.Nr  	publisherenable_kv_cache_eventszUnknown event publisher ''r   r   )r  r
  r   r   popr  r  rE   )r  r  r   config_dictkindconstructorexcs          r   createzEventPublisherFactory.create  s     N0 6))%'''Vnn{++0111	K--KK 	K 	K 	K@@@@AAsJ	K{PP.@PKPPPs   A+ +
B5B		Br   )r   r   r   r   r   r  dictr7   r   rw   r   classmethodr  r   r   r  r   r   r   r   r     s         " ; ;ItC#~"5667   
 #c ##~:M1N #SW # # # [#
 FGQ Q#d*Q@CQ	Q Q Q [Q Q Qr   r   )*r   r   r   abcr   r   collectionsr   r   collections.abcr   dataclassesr   	itertoolsr	   r
   typingr   r   r   vllm.config.kv_eventsr   vllm.loggerr   vllm.v1.core.kv_cache_utilsr   r   r   Structr   r"   
MEDIUM_GPUr'   r9   r<   r>   r@   rk   rw   r   r   r   r   r   r   <module>r      s\         # # # # # # # # & & & & & & & & $ $ $ $ $ $                          



 0 0 0 0 0 0 # # # # # # 9 9 9 9 9 9	X		* * * * *N	* * * *5 5 5 5 5N5 5 5 5 

 
 
 
 
, 
 
 
6= = = = =< = = =	 	 	 	 	| 	 	 	@ @ @ @ @: @ @ @O
 O
 O
 O
 O
 O
 O
 O
d" " " " "# " " ">& & & & &S & & &>       `M `M `M `M `M `M `M `MF Q  Q  Q  Q  Q  Q  Q  Q  Q  Qr   