
    PiT                        d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZmZ ddlmZ dd	l m Z  dd
l!m"Z" dZ# G d de          Z$ ed          Z%dee%         dee%gef         ddfdZ&ee%         Z'ee         Z( G d dee%ef                   Z)dee j*        ej*        f         de j*        dej+        fdZ, G d dee                   Z- G d dee                   Z. G d dee                   Z/ G d  d!ee                   Z0eee j*        ee1ej2        ej+        gdf         Z3 G d" d#ee                   Z4dS )$    N)AnyCallableDictGenericIteratorListLiteralOptionalProtocolSequenceTypeVarUnion)BaseNodeT)Batcher	Unbatcher)ExceptionWrapperStartupExceptionWrapper)QueueSnapshotStoreSnapshotStore   )
_apply_udf)_populate_queue)QUEUE_TIMEOUTi,  c                        e Zd Zd Zd Zd ZdS )_MultiprocessContextc                     d S N selfargskwargss      g/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchdata/nodes/map.pyProcessz_MultiprocessContext.Process           c                     d S r   r   r    s      r$   Eventz_MultiprocessContext.Event    r&   r'   c                     d S r   r   r    s      r$   Queuez_MultiprocessContext.Queue#   r&   r'   N)__name__
__module____qualname__r%   r)   r+   r   r'   r$   r   r      sA                r'   r   Xsourcemap_fnreturnzParallelMapper[T]c                 &    t          | |d          S )a  Returns a :class:`ParallelMapper` node with num_workers=0, which will execute map_fn in the current process/thread.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
    r   )r0   r1   num_workers)ParallelMapper)r0   r1   s     r$   Mapperr6   *   s$        r'   c                   T    e Zd Zdeegef         fdZdee         dee         fdZdS )MapOverBatchr1   c                     || _         d S r   r1   )r!   r1   s     r$   __init__zMapOverBatch.__init__=   s    r'   xlistr2   c                        fd|D             S )Nc                 :    g | ]}                     |          S r   r:   ).0xr!   s     r$   
<listcomp>z)MapOverBatch.__call__.<locals>.<listcomp>A   s#    ...1A...r'   r   )r!   r<   s   ` r$   __call__zMapOverBatch.__call__@   s    ........r'   N)	r,   r-   r.   r   r/   r   r;   r   rB   r   r'   r$   r8   r8   <   sd        xQ/    /hqk /hqk / / / / / /r'   r8   in_qout_q
stop_eventc                 l   i }d}|                                 s	 |                     dt                    \  }}n# t          j        $ r Y Gw xY w||k    r|                    ||fd           |dz  }no||v rf	 t          d|d|                                d	|          # t          $ r t          d
          }Y nw xY w|                    ||fd           d S |||<   ||v r5|                    |
                    |          |fd           |dz  }||v 5|                                 d S d S )Nr   TblocktimeoutF)rH   r   zDuplicate index idx=z, buffer.keys()=z, item=zin _sort_worker)where)is_setgetr   queueEmptyput
ValueErrorkeys	Exceptionr   pop)rC   rD   rE   buffercur_idxitemidxs          r$   _sort_workerrX   D   s   FG!! 	t]CCID##{ 	 	 	H	'>>IItWoUI333qLGGf}}E$%Y%Y%Y%Y%YRV%Y%YZZZ  E E E+2CDDDDDDE		4+U	333F3KIIvzz'**G4EIBBBqLG % !!     s!   ; AA:*B$ $C Cc            
           e Zd ZdZdZ	 ddee         deegef         de	e
eef                  fdZd Zde
eef         fd	Zd
 ZdS )_InlineMapperIterz%Non-Parallel implementation of Mapperr0   Nr1   initial_statec                     || _         || _        |'| j                             || j                            d S | j                                          d S r   )r0   r1   reset
SOURCE_KEY)r!   r0   r1   r[   s       r$   r;   z_InlineMapperIter.__init__c   sU     $KmDO<=====Kr'   c                 P    |                      t          | j                            S r   )r1   nextr0   r!   s    r$   __next__z_InlineMapperIter.__next__p   s    {{4,,---r'   r2   c                 B    | j         | j                                        iS r   )r^   r0   
state_dictra   s    r$   	get_statez_InlineMapperIter.get_states   s    !7!7!9!9::r'   c                     d S r   r   ra   s    r$   	_shutdownz_InlineMapperIter._shutdownv   s    r'   r   )r,   r-   r.   __doc__r^   r   r/   r   r   r
   r   strr   r;   rb   re   rg   r   r'   r$   rZ   rZ   ^   s        //J 37	     !a    S#X/	       . . .;4S> ; ; ; ;    r'   rZ   c                       e Zd ZdZdee         deegef         dede	de
d         ded	ee         d
edeeeef                  fdZdee         fdZdefdZdeeef         fdZdefdZd Zd ZdS )_ParallelMapperItera  _ParallelMapperIter will start at least two threads, one running
    _populate_queue, and one for _apply_udf. If in_order == True, a
    third thread will be started to read from _apply_udf's result q
    and block the output_q until the appropriate in_order element is available,
    buffering outputs as needed.

    A BoundedSemaphore with initial value max_concurrent will limit the number
    of items in flight, and in all of the queues.
    r0   r1   r4   in_ordermethodthreadprocess
mp_contextmax_concurrentsnapshot_frequencyr[   c
           	      N   || _         || _        || _        || _        || _        || _        || _        |dk    rt          j                    n|                                | _	        |dk    rt          j                    n|                                | _
        |
d| j        z  n|| _        t          j        | j                  | _        d| _        t          j                    | _        |                                | _        d| _        d}
|	5|	d         | _        |	d         }
| j                             | j                   n d | _        | j                                          t-                      | _        t          j        t2          | j         | j	        | j        | j        | j        | j        fd	          | _        g | _        t9          | j                  D ]}|| j	        | j
        | j        | j        dk    r| j        n| j        f}| j                            | j        dk    rt          j        t<          |d	          n|                    t<          |d	                     t          j                    | _         t          j        tB          | j
        | j         | j        fd	          | _"        | j
        | _#        | j        r| j         | _#        | j        $                                 | j        D ]}|$                                 | j        r| j"        $                                 tK          j&        d
           | j        '                    | j        tP                    | _        t9          |
          D ]6}	 tS          |            # tT          $ r tW          d|
 d| d          w xY wd S )Nro      valueFr   snapshotsteps_since_snapshotTtargetr"   daemong{Gz?ro   rI   Tried to fast-forward / items during init but hit StopIteration after 4 items, this is likely a bug or malformed state_dict),r0   r1   r4   rl   rm   rq   rs   rM   r+   _in_q_intermed_q
_max_tasks	threadingBoundedSemaphore_sem_doner)   _stop_mp_stop_steps_since_snapshot	_snapshotr]   r   _snapshot_storeThreadr   _read_thread_workersrangeappendr   r%   _sort_qrX   _sort_thread_out_qstarttimesleepget_initial_snapshotACK_TIMEOUTr`   StopIterationrP   )r!   r0   r1   r4   rl   rm   rq   rr   rs   r[   fast_forward	worker_idr"   tis                  r$   r;   z_ParallelMapperIter.__init__   s    & $"4DJhDVDV5;===\f\l\l\n\n
JPT\J\J\blbrbrbtbt2@2H!d...n.T_EEE	
_&&
"((**%&"$*:6DN()?@LKdn----!DNK133%,"
$'	
 
 
 
 DFt/00 	 	I
 "kX55

4=D M  ;(**  
dKKKK''zT'RR   
 %*KMM%,"DL$*=
 
 
 &= 	',DK!!! 	 	AGGIIII= 	&##%%%
4-BB$J[epBqq|$$ 	 	AT



     g\ g g/0g g g  	 	s   1N!N"r2   c                     | S r   r   ra   s    r$   __iter__z_ParallelMapperIter.__iter__       r'   c                    	 | j                                         rt                      | j        rU| j        j        | j        k    r@| j                                          | j                                         t                      	 | j	        
                    dt                    \  }}n# t          j        $ r Y w xY wt          |t                    r!d| _        | j                                         t          |t                     rBt          |t"                    s| j                                         |                                 | xj        dz  c_        | j                                         |                     |           |S NTrG   r   )r   rK   r   r   r   _valuer   setr   r   rL   r   rM   rN   
isinstancereleaser   r   reraiser   _maybe_update_snapshotr!   rV   rW   s      r$   rb   z_ParallelMapperIter.__next__   sp   	z  "" &#oo% &	 0DO C C
   !!####oo% KOO$ONN	cc;    $.. !
	!!###D"233 !$(?@@ (I%%'''&&!+&&I'',,,Ks   $B+ +B=<B=c                      | j         | j        dS N)rx   ry   r   r   ra   s    r$   re   z_ParallelMapperIter.get_state      $($>
 
 	
r'   rW   c                 ^    | j                             |          x}|| _        d| _        d S d S Nr   r   pop_versionr   r   r!   rW   rx   s      r$   r   z*_ParallelMapperIter._maybe_update_snapshot  <    ,88===HJ%DN)*D&&& KJr'   c                 .    |                                   d S r   rg   ra   s    r$   __del__z_ParallelMapperIter.__del__      r'   c                 6   | j                                          | j                                         t          | d          r<| j                                        r#| j                            t          dz             t          | d          r<| j                                        r#| j                            t          dz             t          | d          r<| j	        D ]6}|                                r|                    t          dz             5d S d S )Nr      rI   r   r   )
r   r   r   hasattrr   is_alivejoinr   r   r   )r!   r   s     r$   rg   z_ParallelMapperIter._shutdown  s   
4(( 	>T->-G-G-I-I 	>""=1+<"===4(( 	>T->-G-G-I-I 	>""=1+<"===4$$ 	6] 6 6::<< 6FF=1#4F555	6 	66 6r'   N)r,   r-   r.   rh   r   r/   r   r   intboolr	   r   r
   r   ri   r   r;   r   r   rb   re   r   r   rg   r   r'   r$   rk   rk   z   sT        ^^ !a ^ 	^
 ^ +,^ )^ !^  ^  S#X/^ ^ ^ ^@(1+    !    :
4S> 
 
 
 
+# + + + +
  
6 
6 
6 
6 
6r'   rk   c                   8    e Zd ZdZ	 	 	 	 	 ddee         deegef         ded	e	d
e
d         dee         dee         def fdZddeeeef                  f fdZdeeeef                  fdZdeeeef                  fdZdefdZdeeef         fdZ xZS )_ParallelMapperImplaL  This class implements _ParallelMapperIter and _InlineMapperIter as a BaseNode,
    allowing them to be composed with other BaseNodes.

    TODO: In the future, this class may go away once we implement reset() on
    _ParallelMapperIter and _InlineMapperIter themselves so we don't need this
    additional level of abstraction.
    Tro   Nr   r0   r1   r4   rl   rm   rn   multiprocessing_contextrr   rs   c	                    t                                                       |dv sJ || _        || _        || _        || _        || _        || _        t          | _	        | j        dk    r%| j        t          j
        | j                  | _	        |7|dk    r1t          |t                    r||k    rt          d|d|d          || _        || _        d | _        d S )Nrn   rp   r   max_concurrent= should be <= num_workers=!)superr;   r0   r1   r4   rl   rm   r   mp_mp_contextget_contextr   r   rP   rr   rs   _it)
r!   r0   r1   r4   rl   rm   r   rr   rs   	__class__s
            r$   r;   z_ParallelMapperImpl.__init__)  s     	.....& '>$ ";)##(D(P!~d.JKKD%+//.#.. T>K3O3O !RN!R!R;!R!R!RSSS,"4RVr'   r[   c                     t                                          |           | j        | `| j        dk    r|                     |          | _        d S |                     |          | _        d S r   )r   r]   r   r4   _parallel_reset_inline_resetr!   r[   r   s     r$   r]   z_ParallelMapperImpl.resetG  sf    m$$$8a++M::DHHH))-88DHHHr'   c                 :    t          | j        | j        |          S )N)r0   r1   r[   )rZ   r0   r1   r!   r[   s     r$   r   z!_ParallelMapperImpl._inline_resetQ  s     DKWdeeeer'   c                     t          | j        | j        | j        | j        | j        | j        | j        | j        |	  	        S )N)	r0   r1   r4   rl   rm   rq   rr   rs   r[   )	rk   r0   r1   r4   rl   rm   r   rr   rs   r   s     r$   r   z#_ParallelMapperImpl._parallel_resetT  sH    ";;(];'.#6'

 

 

 
	
r'   r2   c                 *    t          | j                  S r   r`   r   ra   s    r$   r`   z_ParallelMapperImpl.nexta      DH~~r'   c                 4    | j                                         S r   )r   re   ra   s    r$   re   z_ParallelMapperImpl.get_stated  s    x!!###r'   )Tro   NNr   r   )r,   r-   r.   rh   r   r/   r   r   r   r   r	   r
   ri   r;   r   r   r]   r   r   r`   re   __classcell__r   s   @r$   r   r      s         /715(,"#W WW !a W 	W
 W +,W "*#W !W  W W W W W W<9 98DcN#; 9 9 9 9 9 9f8DcN+C f f f f
Xd38n-E 
 
 
 
a    $4S> $ $ $ $ $ $ $ $r'   r   c                        e Zd ZdZdZ	 	 	 	 	 	 ddee         deegef         d	e	d
e
ded         dee         dee	         de	dee	         f fdZddeeeef                  f fdZdefdZdeeef         fdZ xZS )r5   a  ParallelMapper executes map_fn in parallel either in num_workers threads or
    processes. For processes, multiprocessing_context can be spawn, forkserver, fork,
    or None (chooses OS default). At most max_concurrent items will be either processed
    or in the iterator's output queue, to limit CPU and Memory utilization. If None
    (default) the value will be 2 * num_workers.

    At most one iter() is created from source, and at most one thread will call
    next() on it at once.

    If in_order is true, the iterator will return items in the order from which they arrive
    from source's iterator, potentially blocking even if other items are available.

    Args:
        source (BaseNode[X]): The source node to map over.
        map_fn (Callable[[X], T]): The function to apply to each item from the source node.
        num_workers (int): The number of workers to use for parallel processing.
        in_order (bool): Whether to return items in the order from which they arrive from. Default is True.
        method (Literal["thread", "process"]): The method to use for parallel processing. Default is "thread".
        multiprocessing_context (Optional[str]): The multiprocessing context to use for parallel processing. Default is None.
        max_concurrent (Optional[int]): The maximum number of items to process at once. Default is None.
        snapshot_frequency (int): The frequency at which to snapshot the state of the source node. Default is 1.
        prebatch (Optional[int]): Optionally perform pre-batching of items from source before mapping.
          For small items, this may improve throughput at the expense of peak memory.
    it_stateTro   Nr   r0   r1   r4   rl   rm   rn   r   rr   rs   prebatchc
           
         t                                                       |dv sJ || _        || _        || _        || _        |7|dk    r1t          |t                    r||k    rt          d|d|d          || _	        || _
        |	| _        |	|| _        || _        nE|	dk    rt          d|	d          t          |          | _        t          ||	d	
          | _        t!          | j        | j        | j        | j        | j        | j        | j	        | j
                  }
| j        	|
| _        d S t%          |
          | _        d S )Nrn   r   r   r   r   z	prebatch=z must be a positive integer!r:   F)
batch_size	drop_last)r0   r1   r4   rl   rm   r   rr   rs   )r   r;   r4   rl   rm   r   r   r   rP   rr   rs   r   r1   r0   r8   r   r   r   r   )r!   r0   r1   r4   rl   rm   r   rr   rs   r   r   r   s              r$   r;   zParallelMapper.__init__  sv    	.....& '>$%+//.#.. T>K3O3O !RN!R!R;!R!R!RSSS,"4  DK DKK1}} !KH!K!K!KLLL&f555DK!&XOOODK!;;(];$($@.#6	
 	
 	
 = DHHH ~~DHHHr'   r[   c                     t                                          |           |'| j                            || j                            d S | j                                         d S r   )r   r]   r   IT_STATE_KEYr   s     r$   r]   zParallelMapper.reset  sY    m$$$$HNN=):;<<<<<HNNr'   r2   c                 *    t          | j                  S r   r   ra   s    r$   r`   zParallelMapper.next  r   r'   c                 B    | j         | j                                        iS r   )r   r   rd   ra   s    r$   re   zParallelMapper.get_state  s    !48#6#6#8#899r'   )Tro   NNr   Nr   )r,   r-   r.   rh   r   r   r/   r   r   r   r   r	   r
   ri   r;   r   r   r]   r`   re   r   r   s   @r$   r5   r5   h  sV        2 L /715(,"#"&/& /&/& !a /& 	/&
 /& +,/& "*#/& !/&  /& 3-/& /& /& /& /& /&b 8DcN#;      a    :4S> : : : : : : : :r'   r5   c                       e Zd ZdZdee         dedededee	e
ef                  f
dZdee         fd	Zdefd
Zde	e
ef         fdZdefdZd Zd ZdS )_SingleThreadedMappera  Utility Iterator for performing mapping with a single thread.
    Because only a single thread is used, we don't need an input queue to guard
    against multiple threads reading from the same iterator. This is used for
    Prefetcher and PinMemory.

    A thread is started on __init__ and stopped on __del__/_shutdown.
    The thread runs _populate_queue, which acquires a BoundedSemaphore with initial value
    of `prefetch_factor`.

    When next() is called on this iterator, it will block until an item is available on _q.
    Next will perform the following depending on what is pulled from the q:
    - StopIteration: raise StopIteration. Any subsequent next() calls will also raise StopIteration
    - ExceptionWrapper: call reraise() on the exception wraper
    - any other item: return the item

    A Bounded semaphore is used to limit concurrency and memory utilization.
    If N items have been pulled from the source, and M items have been yielded by this iterator,
    we maintain the invariant that semaphore.value + (N - M) == prefetch_factor (modulo
    non-atomicness of operations).

    _populate_queue calls semaphore.acquire. When we pull an item from the queue, we
    call semaphore.release (unless it's a StartupExceptionWrapper, because _populate_queue
    does not acquire sempahores in this case). All outstanding items are either being
    processed in _populate_queue, in the _q, or about to be returned by an in-flight next() call.
    r0   prefetch_factorworkerrs   r[   c           	         || _         || _        || _        || _        t	          j                    | _        t          j        |          | _	        t          j
                    | _        d| _        d| _        |:|d         | _        |d         | _        | j                             | j                   n d | _        | j                                          t!                      | _        t          j        | j        | j         | j        | j        | j        | j	        | j        fd          | _        | j                                         | j                            | j        t,                    | _        t/          | j                  D ];}	 t1          |            # t2          $ r t5          d| j         d	| d
          w xY wd| _        d S )Nrv   r   rx   ry   Trz   r}   r~   r   r   )r0   r   r   rs   rM   r+   _qr   r   r   r)   _stop_eventr   _fast_forwardr   r]   r   r   r   _threadr   r   r   r   r`   r   rP   )r!   r0   r   r   rs   r[   r   s          r$   r;   z_SingleThreadedMapper.__init__  s    ."4${}}._EEE	$?,,%&"$*:6DN!./E!FDKdn----!DNK133 ';$'	  
 
 
 	 -BB$,`kBllt)** 	 	AT



     gT-? g g/0g g g  
 s   F&F:r2   c                     | S r   r   ra   s    r$   r   z_SingleThreadedMapper.__iter__!  r   r'   c                    	 | j                                         rt                      	 | j                            dt
                    \  }}n# t          j        $ r Y _w xY wt          |t                    r4| j	        
                                 | j                                          |t          |t                    r\t          |t                    s| j	        
                                 | j                                          |                                 n@| j	        
                                 | xj        dz  c_        |                     |           |S ^r   )r   rK   r   r   rL   r   rM   rN   r   r   r   r   r   r   r   r   r   r   s      r$   rb   z_SingleThreadedMapper.__next__$  sR   	&&(( &#oo% GKKdMKJJ	cc;    $.. 	!!### $$&&&
D"233 
!$(?@@ (I%%''' $$&&&	!!###**a/**++C000-	s   $A A! A!c                      | j         | j        dS r   r   ra   s    r$   re   z_SingleThreadedMapper.get_state=  r   r'   rW   c                 ^    | j                             |          x}|| _        d| _        d S d S r   r   r   s      r$   r   z,_SingleThreadedMapper._maybe_update_snapshotC  r   r'   c                 .    |                                   d S r   r   ra   s    r$   r   z_SingleThreadedMapper.__del__H  r   r'   c                     | j                                          t          | d          r>| j                                        r'| j                            t          dz             d S d S d S )Nr   r   r   )r   r   r   r   r   r   r   ra   s    r$   rg   z_SingleThreadedMapper._shutdownK  sv    4## 	9(=(=(?(? 	9Lma&788888	9 	9 	9 	9r'   N)r,   r-   r.   rh   r   r   r   _WorkerTyper
   r   ri   r   r;   r   r   rb   re   r   r   rg   r   r'   r$   r   r     s
        444 4 	4
  4  S#X/4 4 4 4l(1+    !    2
4S> 
 
 
 
+# + + + +
  9 9 9 9 9r'   r   )5rM   r   r   typingr   r   r   r   r   r   r	   r
   r   r   r   r   torch.multiprocessingmultiprocessingr   torchdata.nodes.base_noder   r   torchdata.nodes.batchr   r   !torchdata.nodes.exception_wrapperr   r   torchdata.nodes.snapshot_storer   r   r   r   	constantsr   r   r   r/   r6   XseqTseqr8   r+   r)   rX   rZ   rk   r   r5   r   r   r   r   r   r'   r$   <module>r      sd         v v v v v v v v v v v v v v v v v v v v v v v v v v v v " " " " " " 1 1 1 1 1 1 1 1 4 4 4 4 4 4 4 4 W W W W W W W W L L L L L L L L " " " " " " , , , , , , $ $ $ $ $ $    8    GCLL8A; !a(8 =P     {{/ / / / /71a4= / / /uU["(23 EK U^Ud    4       8c6 c6 c6 c6 c6(1+ c6 c6 c6LE$ E$ E$ E$ E$(1+ E$ E$ E$PX: X: X: X: X:Xa[ X: X: X:v " 	

~9 ~9 ~9 ~9 ~9HQK ~9 ~9 ~9 ~9 ~9r'   