
    Pi                         d dl Z d dlZd dlmZmZmZmZ d dlmZ	 d dl
mZ d dlmZmZ d dlmZmZ ddlmZ ded	ee j        e	j        f         d
ededej        dej        fdZdS )    N)AnyDictOptionalUnion)BaseNode)ExceptionWrapperStartupExceptionWrapper)MonotonicIndexSnapshotStore   )QUEUE_TIMEOUTsourceqsnapshot_storesnapshot_frequency	semaphore
stop_eventc           	         t                      	 	 ddt          dt          t          t          t
          t          f         t          f                  ffd}	 t          |t                    r|dk    sJ d|             
                    |                                            n7# t          $ r* t          d	
          }
                    |           Y dS w xY wd}|                                s|                    dt                    s1	 t!          |           }	|dz  }d}
|dk    r||z  dk    r|                                 }
 ||	d|
           nN# t"          $ r} ||d           Y d}~dS d}~wt          $ r! t%          d
          }	 ||	d           Y dS w xY w|                                dS dS )a  _populate_queue calls `iter(source)` to get an iterator `it`, waits for semaphore.acquire,
    and puts its outputs onto q. It never releases the sempahore. It continues to put items on the
    q as long as it can acquire the sempahore, stop_event is not set, and StopIteration has not
    been thrown by the `it`.

    This function will always put tuples of (x, idx) on the q where idx
    starts from 0 and is monotonically increasing. x may be the output of next(it), StopIteration,
    or an ExceptionWrapper.

    If there is an exception raised during the call to `iter(source)`, this function does not
    wait to acquire sempahore before putting StartupExceptionWrapper on q.

    Note: this is only intended to be used by a single thread at once. Each instance
    creates its own iter for source so if this is called with multiple threads, you may get
    duplicates if source is not sharded properly.
    TNblocksnapshotc                                                      }|r                    ||                               | |f||rdnd            d S )N)r   versiong      ?)r   timeout)getappendput)itemr   r   _idxidxr   r   s       s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchdata/nodes/_populate_queue.py_putz_populate_queue.<locals>._put1   s_    
 wwyy 	C!!8T!BBB	tTl%1G4HHHHH    r   z5snapshot_frequency must be non-negative integer! Got )r   z%in _populate_queue startup for device)where)blockingr   r   F)r   r   )r   zin _populate_queue)TN)r
   boolr   r   r   strr   r	   
isinstanceintappend_initial_snapshot
state_dict	Exceptionis_setacquirer   nextStopIterationr   )r   r   r   r   r   r   r!   eyieldedr   r   r   s    ``        @r    _populate_queuer2      sr   4 

C MQI II 5c3h1H!HIJI I I I I I I I)3//	X4F!4K4K4KWCUWW 5L4KK..8I8I8K8K.LLLL   #*QRRR...:::
 G!!   $ FF 		<<DqLGH!A%%'4F*F!*K*K!,,..DUX66666 	 	 	D%    EEEEE 	 	 	#*>???DDU####EE	 !!     s2   AB, ,0C C AE 
F*)E<<*F*)F*)queue	threadingtypingr   r   r   r   torch.multiprocessingmultiprocessingmptorchdata.nodes.base_noder   !torchdata.nodes.exception_wrapperr   r	   torchdata.nodes.snapshot_storer
   r   	constantsr   Queuer(   BoundedSemaphoreEventr2    r"   r    <module>rA      s        - - - - - - - - - - - - " " " " " " . . . . . . W W W W W W W W H H H H H H H H $ $ $ $ $ $AAU["("#A "A 	A
 )A A A A A A Ar"   