
    PipI                    $   d Z ddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
mZmZmZmZ ddlZddlmZ ddlZddlZddlmZ ddlmZmZmZmZmZmZmZmZ ddlm Z m!Z! ddl"m#Z#m$Z$ dd	l%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+ dd
l,m-Z-m.Z. ddl/m0Z0 ddl1m2Z2m3Z3m4Z4m5Z5 g dZ6ddlm7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=  edd          Z> ej?        e@          ZAdZBdZCdZDdZEdZFdZG G d dee>                   ZH G d de           ZI G d deI          ZJ G d deI          ZKdS ) az  Definition of the StatefulDataLoader and associated iterators.

This file is a stand-in for torch.utils.data.dataloader, and includes a
StatefulDataLoader, which inherits from DataLoader and adds
state_dict/load_state_dict methods, as well as implementations for
single and multi-process iterators which are also stateful.

Where possible, we import the original definitions from torch.utils.data.dataloader,
and use inheritance for base classes only (StatefulDataLoader, _StatefulBaseDataLoaderIter).

For the single and multi-process iterator implementations, we fork the code to avoid a
diamond-shaped multiple-inheritance scheme.
    N)AnyDictIterableListOptionalTypeVarUnion)ExceptionWrapper)_utils
DataLoaderDatasetIterableDatasetIterDataPipeMapDataPipeSamplerSequentialSampler)_BaseDataLoaderIter_InfiniteConstantSampler)!_IterDataPipeSerializationWrapper _MapDataPipeSerializationWrapper   )_DATASET_ITER_STATE_DATASET_STATE_FETCHER_ENDED_FETCHER_STATE_IncrementalWorkerState
_WORKER_ID)BatchSamplerRandomSampler)Stateful)_AckStartup_worker_looptry_to_deserializetry_to_serialize)StatefulDataLoaderget_worker_infodefault_collatedefault_convert)_collate_fn_t_DatasetKind_sharding_worker_init_fn_worker_init_fn_tr'   r(   r&   _T_coT)	covariant_index_sampler_state_sampler_iter_state_sampler_iter_yielded_IterableDataset_len_called_shared_seed_iterator_finishedc            #       v   e Zd ZU dZed         ed<   	 	 	 	 	 	 	 	 	 	 	 	 d#dddd	dd
dee         dee         dee	         de
eedf         de
ee         ee         df         dedee         de	de	dedee         dee         de	dede	dee         f dZd$dZd%dZdeeef         fd Zd!eeef         ddfd"ZdS )&r%   a  
    This is a drop in replacement for ``torch.utils.data.DataLoader``
    that implements state_dict and load_state_dict methods, enabling mid-epoch
    checkpointing.

    All arguments are identical to ``torch.utils.data.DataLoader``, with
    a new kwarg: ``snapshot_every_n_steps``.

    Args:
        dataset (Dataset): dataset from which to load the data.
        batch_size (int, optional): how many samples per batch to load
            (default: ``1``).
        shuffle (bool, optional): set to ``True`` to have the data reshuffled
            at every epoch (default: ``False``).
        sampler (Sampler or Iterable, optional): defines the strategy to draw
            samples from the dataset. Can be any ``Iterable`` with ``__len__``
            implemented. If specified, :attr:`shuffle` must not be specified.
        batch_sampler (Sampler or Iterable, optional): like :attr:`sampler`, but
            returns a batch of indices at a time. Mutually exclusive with
            :attr:`batch_size`, :attr:`shuffle`, :attr:`sampler`,
            and :attr:`drop_last`.
        num_workers (int, optional): how many subprocesses to use for data
            loading. ``0`` means that the data will be loaded in the main process.
            (default: ``0``)
        collate_fn (Callable, optional): merges a list of samples to form a
            mini-batch of Tensor(s).  Used when using batched loading from a
            map-style dataset.
        pin_memory (bool, optional): If ``True``, the data loader will copy Tensors
            into device/CUDA pinned memory before returning them.  If your data elements
            are a custom type, or your :attr:`collate_fn` returns a batch that is a custom type,
            see the example below.
        drop_last (bool, optional): set to ``True`` to drop the last incomplete batch,
            if the dataset size is not divisible by the batch size. If ``False`` and
            the size of dataset is not divisible by the batch size, then the last batch
            will be smaller. (default: ``False``)
        timeout (numeric, optional): if positive, the timeout value for collecting a batch
            from workers. Should always be non-negative. (default: ``0``)
        worker_init_fn (Callable, optional): If not ``None``, this will be called on each
            worker subprocess with the worker id (an int in ``[0, num_workers - 1]``) as
            input, after seeding and before data loading. (default: ``None``)
        multiprocessing_context (str or multiprocessing.context.BaseContext, optional): If
            ``None``, the default `multiprocessing context`_ of your operating system will
            be used. (default: ``None``)
        generator (torch.Generator, optional): If not ``None``, this RNG will be used
            by RandomSampler to generate random indexes and multiprocessing to generate
            ``base_seed`` for workers. (default: ``None``)
        prefetch_factor (int, optional, keyword-only arg): Number of batches loaded
            in advance by each worker. ``2`` means there will be a total of
            2 * num_workers batches prefetched across all workers. (default value depends
            on the set value for num_workers. If value of num_workers=0 default is ``None``.
            Otherwise, if value of ``num_workers > 0`` default is ``2``).
        persistent_workers (bool, optional): If ``True``, the data loader will not shut down
            the worker processes after a dataset has been consumed once. This allows to
            maintain the workers `Dataset` instances alive. (default: ``False``)
        pin_memory_device (str, optional): the device to :attr:`pin_memory` to if ``pin_memory`` is
            ``True``.
        in_order (bool, optional): If ``False``, the data loader will not enforce that batches
            are returned in a first-in, first-out order. Only applies when ``num_workers > 0``. (default: ``True``)
        snapshot_every_n_steps (int, optional): Defines how often the state is
            transferred from the dataloader workers to the dataloader. By default, it is set to ``1``, i.e., state is transferred every step. If the state is large, this value can be increased (and ideally set to the frequency of training checkpointing) to reduce the overhead of transferring state every step.


    .. warning:: If the ``spawn`` start method is used, :attr:`worker_init_fn`
                 cannot be an unpicklable object, e.g., a lambda function. See
                 `multiprocessing-best-practices <https://pytorch.org/docs/stable/notes/multiprocessing.html#multiprocessing-best-practices>`_ on more details related
                 to multiprocessing in PyTorch.

    .. warning:: ``len(dataloader)`` heuristic is based on the length of the sampler used.
                 When :attr:`dataset` is an :class:`~torch.utils.data.IterableDataset`,
                 it instead returns an estimate based on ``len(dataset) / batch_size``, with proper
                 rounding depending on :attr:`drop_last`, regardless of multi-process loading
                 configurations. This represents the best guess PyTorch can make because PyTorch
                 trusts user :attr:`dataset` code in correctly handling multi-process
                 loading to avoid duplicate data.

                 However, if sharding results in multiple workers having incomplete last batches,
                 this estimate can still be inaccurate, because (1) an otherwise complete batch can
                 be broken into multiple ones and (2) more than one batch worth of samples can be
                 dropped when :attr:`drop_last` is set. Unfortunately, PyTorch can not detect such
                 cases in general.

                 See `Dataset Types <https://pytorch.org/docs/stable/data.html>`_ for more details on these two types of datasets and how
                 :class:`~torch.utils.data.IterableDataset` interacts with
                 `Multi-process data loading <https://pytorch.org/docs/stable/data.html#multi-process-data-loading>`_.

    .. warning:: See `Reproducibility <https://pytorch.org/docs/stable/notes/randomness.html#reproducibility>`_, and `Dataloader-workers-random-seed <https://pytorch.org/docs/stable/notes/faq.html#dataloader-workers-random-seed>`_, and
                 `Data-loading-randomness <https://pytorch.org/docs/stable/data.html#data-loading-randomness>`_ notes for random seed related questions.

    .. warning:: Setting `in_order` to `False` can harm reproducibility and may lead to a skewed data distribution being fed to the trainer in cases with imbalanced data.

    .. warning:: Setting `in_order` to `False` currently has no guarantees for state management.

    .. _multiprocessing context:
        https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
    _StatefulBaseDataLoaderIter	_iteratorr   Nr   F T)prefetch_factorpersistent_workerspin_memory_devicein_ordersnapshot_every_n_stepsdataset
batch_sizeshufflesamplerbatch_samplernum_workers
collate_fn
pin_memory	drop_lasttimeoutworker_init_fnr9   r:   r;   r<   r=   c                   t           j                            d           |dk     rt          d          |
dk     rt          d          |dk    r|t          d          |dk    r|d}n||dk     rt          d          |r|dk    rt          d          |dk    r|st                              d	           || _        || _        || _        || _	        || _
        |
| _        || _        || _        || _        t          | j        t                     rt#          | j                  | _        n3t          | j        t$                    rt'          | j                  | _        t          |t(                    rt*          j        | _        t          |t                     r.|+t           j        j        j                            ||
          }n|dvrt          d|           |t          d|           |t          d|           n t9          |          }t*          j        | _        ||rt          d          | |dk    s|s||	rt          d          d }d}	n||	rt          d          |G| j        t*          j        k    rt=                      }n#|rt?          ||          }ntA          |          }||tC          |||	          }|| _"        |	| _#        || _$        || _%        || _&        |*| j'        rtP          j)        j*        }ntP          j)        j+        }|| _,        || _-        d| _.        d | _/        d | _0        | 1                                 || _2        d | _3        d| _4        t          j5        ddd           d S )Nzpython.stateful_data_loaderr   zXnum_workers option should be non-negative; use num_workers=0 to disable multiprocessing.z%timeout option should be non-negativezprefetch_factor option could only be specified in multiprocessing.let num_workers > 0 to enable multiprocessing, otherwise set prefetch_factor to None.   z-prefetch_factor option should be non-negativez/persistent_workers option needs num_workers > 0using in_order=False with multiple workers does not give any guarantees for state management and loading from a checkpoint may not work as expected.)r@   >   FNzVDataLoader with IterableDataset: expected unspecified shuffle option, but got shuffle=zVDataLoader with IterableDataset: expected unspecified sampler option, but got sampler=zbDataLoader with IterableDataset: expected unspecified batch_sampler option, but got batch_sampler=z1sampler option is mutually exclusive with shuffler   z[batch_sampler option is mutually exclusive with batch_size, shuffle, sampler, and drop_lastFzVbatch_size=None option disables auto-batching and is mutually exclusive with drop_last)	generatorT
DataloaderenabledTrue)6torch_C_log_api_usage_once
ValueErrorloggerwarningr>   rC   r9   rE   r;   rG   rH   multiprocessing_contextr<   
isinstancer   r   r   r   r   r*   r   _dataset_kindutilsdatagraph_settingsapply_shuffle_settingsboolMapr   r   r   r   r?   rF   rA   rB   rL   _auto_collationr   collater'   r(   rD   r:   _DataLoader__initializedr2   r7   check_worker_number_rationalityr=   next_iter_state_initial_iter_for_state_dict	set_vital)selfr>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rV   rL   r9   r:   r;   r<   r=   s                      /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchdata/stateful_dataloader/stateful_dataloader.py__init__zStatefulDataLoader.__init__   s)   , 	$$%BCCC??m   Q;;DEEE! ;h   1__!8OO(_q-@-@LMMM 	P+"2"2NOOO??8?NNJ  
 &.$!2,'>$  dlL11 	J<T\JJDLLk22 	J;DLIIDL g// 1	2!-!6D4 '<00 &#k.=TTU\fmTnnG-- vmtvv   " vmtvv   * SCPS S   + 7mmG!-!1D7STTT$Q'W-@I-@ w   JII  o   ?!\%:::244 9+GyIIIGG/88G!m&;(*iHHM$"*"# <#^;

#^;
$"4 )-%+/(,,...&<#9=
 -2)i88888    returnc                     | j         dk    rt          | | j                  }n)|                                  t	          | | j                  }d | _        |S Nr   )rC   $_StatefulSingleProcessDataLoaderIterrc   rb   &_StatefulMultiProcessingDataLoaderIter)rf   its     rg   _get_iteratorz StatefulDataLoader._get_iterator}  sU    q  5dD<PQQBB002227d>RSSB#	ri   r   c                    | j         rd| _         | j        J ng| j        rG| j        dk    r<| j        |                                 | _        n4| j                            |            n|                                 | _        | j        j        r;| j        r| j                            |            n|                                 | _        | j        S )NFr   )rd   r7   r:   rC   rp   _reset	_finishedrf   s    rg   __iter__zStatefulDataLoader.__iter__  s     , 		205D->----$ 	2)9A)=)=~%!%!3!3!5!5%%d++++!//11DN># 	6& 6%%d++++!%!3!3!5!5~ri   c                     | j          |                                 | _         d| _        | j                                         S NT)r7   rp   rd   
state_dictrt   s    rg   rx   zStatefulDataLoader.state_dict  s9    >!!//11DN04D-~((***ri   rx   c                 @    d | _         d| _        |i k    rd S || _        d S NF)r7   rd   rc   rf   rx   s     rg   load_state_dictz"StatefulDataLoader.load_state_dict  s/    ,1)F)ri   )r   NNNr   NFFr   NNN)rj   r6   )rj   r   )__name__
__module____qualname____doc__r   __annotations__r   r-   intr]   r	   r   r   r   r)   floatr,   strrh   rp   ru   r   r   rx   r|    ri   rg   r%   r%   ]   s        ^ ^@ 56666
 %&"&26DH.2 6: ${9  *.#(!#01){9 {9 {9{9 SM{9 $	{9
 w$./{9 WT]HTND@A{9 {9 ]+{9 {9 {9 {9 !!23{9  "#!{9" !#{9$ %{9& '{9( !)){9 {9 {9 {9z      0+DcN + + + +*$sCx. *T * * * * * *ri   r%   c                   N     e Zd Zdeddf fdZd
 fd	Z fdZd Z fd	Z xZ	S )r6   loaderrj   Nc                 f    t                                          |           d| _        d| _        d S Nr   F)superrh   r1   rs   )rf   r   	__class__s     rg   rh   z$_StatefulBaseDataLoaderIter.__init__  s.       %&"ri   Fc                 h    t                                          ||           d| _        d| _        d S r   )r   rr   r1   rs   )rf   r   
first_iterr   s      rg   rr   z"_StatefulBaseDataLoaderIter._reset  s.    vz***%&"ri   c                 h    t                                                      }| xj        dz  c_        |S )Nr   )r   _next_indexr1   )rf   idxr   s     rg   r   z'_StatefulBaseDataLoaderIter._next_index  s1    gg!!##""a'""
ri   c                     d S Nr   rt   s    rg   rx   z&_StatefulBaseDataLoaderIter.state_dict  s    ri   c                 r    	 t                                                      S # t          $ r	 d| _         w xY wrw   )r   __next__StopIterationrs   )rf   r   s    rg   r   z$_StatefulBaseDataLoaderIter.__next__  sC    	77##%%% 	 	 	!DN	s   # 6F)
r}   r~   r   r%   rh   rr   r   rx   r   __classcell__r   s   @rg   r6   r6     s        1 d      
     
    
          ri   r6   c                   :     e Zd ZdZdZd fd	Zd Zd Zd Z xZ	S )	rm   a  We avoid using inheritance here to share code because we quickly run into
    a diamond which becomes difficult to reason about, so instead we fork the
    code from torch.utils.data.dataloader for _SingleProcessDataLoaderIter and
    _MultiProcessDataLoaderIter. This allows us to satisfy the original
    dataloader __iter__'s return type of _BaseDataLoaderIter (since
    _StatefulBaseDataLoader inherits from _BaseDataLoaderIter).
    _num_yieldedNc                    t                                          |           | j        dk    sJ | j        dk    sJ t	          | j        t          t          f          r:t          j	        j
        j                            | j        | j        | j                   ||                     |           d S t!          j        | j        | j        | j        | j        | j                  | _        d S rl   )r   rh   _timeout_num_workersrW   _datasetr   r   rP   rY   rZ   r[   apply_sharding_world_size_rankr|   r*   create_fetcherrX   r_   _collate_fn
_drop_last_dataset_fetcher)rf   r   rc   r   s      rg   rh   z-_StatefulSingleProcessDataLoaderIter.__init__  s       }!!!! A%%%% dmlK%@AA 	hK+::4=$JZ\`\fggg&  11111$0$?"$ % %D!!!ri   c                     |                                  }| j                            |          }| j        r%t          j                            || j                  }|S r   )r   r   fetch_pin_memoryr   rE   _pin_memory_device)rf   indexrZ   s      rg   
_next_dataz/_StatefulSingleProcessDataLoaderIter._next_data  sU      ""$**511 	O$//d6MNNDri   c                 ,   | j         t          j        k    ret          t	          | j        j                  t          | j        j        i}d }| j        j        | j        j	        urt	          | j        j	                  }nd }t	          | j        j	                  }t          t	          | j                  t          t	          | j                  t          | j        | j        | j        t$          | j        t(          | j        t,          |t.          |t0          | j        i	}|S r   )rX   r*   r   r   r$   r   dataset_iterr   endedr>   _INDEX_SAMPLER_STATE_index_sampler_SAMPLER_ITER_STATE_sampler_iter_SAMPLER_ITER_YIELDEDr1   _NUM_YIELDEDr   _ITERABLEDATASET_LEN_CALLEDr2   _SHARED_SEEDr3   r   r   _ITERATOR_FINISHEDrs   )rf   fetcher_statedataset_staterx   s       rg   rx   z/_StatefulSingleProcessDataLoaderIter.state_dict  s    !666#%5d6K6X%Y%Y 5 ;M !M$19N9VVV 01F1N O O M,T-B-JKKM !"243F"G"G!1$2D!E!E!4#=t0')I$+MM


 ri   c                 0   | j         |v sJ d| j          d            |t                   | _        t          | j        t
                    st          | j        t
                    rqt          | j        |t                             | _        t          | j                  | _        |t                   %t          | j        |t                             | _        nSt          | j        t          j        j        j        j                  s%t!          j        | j        | j        d           | _        || j                  | _        |t&                   | _        |t*                   | _        |t.                   ?t          | j        t
                    r%t          | j        |t.                             | _        t3          j        | j        | j        | j        | j        | j                  | _        | j        t2          j         k    rt          | j        t
                    st          | j        j!        t
                    r|tD                   t|tD                   tF                   :t          | j        j!        |tD                   tF                             | j        _!        |tD                   tH                   | j        _%        nT| j        dk    rItL          '                    d| j         d           tQ          | j                  D ]}tS          |            |tT                   | _+        d S )NState doesn't contain key 'z(' expected for single process dataloaderr   wNeither dataset nor iter(dataset) defines state_dict/load_state_dict so we are naively fast-forwarding your dataset by  steps. For more efficient resumes, please implement `state_dict` and `load_state_dict` in your IterableDataset and/or iterator.),r   r   r1   rW   r   r    r   r#   r   iterr   rP   rY   rZ   
dataloaderr   	itertoolsislicer   r   r2   r   r3   r   r   r*   r   rX   r_   r   r   r   r   r   r   r   r   r   rT   rU   rangenextr   rs   )rf   rx   _s      rg   r|   z4_StatefulSingleProcessDataLoaderIter.load_state_dict
  s   +++d):ddd ,++%/0E%F" d)844 	m
4CUW_8`8` 	m"4T5H*UiJj"k"kD!%d&9!:!:D-.:%78JJWjLk%l%l"# +D  m
 &/%5d6I4Kegk%l%l"&t'89+56Q+R(&|4 n%1jPX6Y6Y1.t}j>XYYDM , ;M O!
 !
 !666$-22 #jAVAcem6n6n #n-9!.12EFR=O 1>&~67JK> >-: 3=^2L^2\D)/ $q((NNACGCTA A A  
 #4#455 # #T



#$67ri   r   )
r}   r~   r   r   r   rh   r   rx   r|   r   r   s   @rg   rm   rm     s{          "L     ,    488 88 88 88 88 88 88ri   rm   c                       e Zd ZU dZeed<   dZdZdZdZ	dZ
dZdZd	Z fd
Zd% fd	Zd Zd Zej        fdZd ZdedefdZd Zd Zd Zd Zd Zd Zdedededeeef         deee f         f
d Z!d&d!Z"d" Z#e$d#             Z%d$ Z& xZ'S )'rn   zIIterates once over the DataLoader's dataset, as specified by the sampler._last_yielded_worker_idr   	_snapshot_main_snapshot_worker_snapshots_snapshot_step_steps_since_snapshot
_base_seedc                 2    t                                          |           |j         _        |j         _        |j         _         j        dk    sJ  j        dk    sJ |j	        t          }n|j	        }|j         _        t           j        t          t           f          r0t#          j        t&           j         j         j                   _        |                                 _        d _        d _        |                                 _        g  _        g  _         fdt=           j                  D             }|a j        |v sJ d j         d            | j                                       j!        i           }tE          tG           j$        t=          tK          |                                        tE          |&                                          k    s+J tK          |          |&                                f            |'                                D ]
\  }}|||<   | j                  j(                                       j)         j*                   _*        | j                  j(                                      tV           j,                   _,        t=           j                  D ]}|                                }	|	-                                 |.                    t^           j0         j        |	 j         j         j1         j2         j3         j*         j        | j         j4         j,        | $                    |                   f          }
d|
_5        |
6                                  j        7                    |	            j        7                    |
            j8        rFts          j                     _:        tw          j                     _<         j=        dk    rt|          j?        @                                }n j=        t|          jA        B                                k    rFt          t|          t|          jA        B                                          }|@                                }nt|          jD        @                                }ts          jE        t          jG        jH         j         j<        | j:         j=        f          }d|_5        |6                                 | _I        n j         _<         j4        r5 j8        r.dd lJ} j        D ]"}
|K                    t          jM        |
           #t          jN        O                    t                     t          d	  j        D                                  t          jN        R                                 d _        i t          jT                    c _U         _V        d
 |'                                D              _W         X                    |d|d u            |~ Y                    | j                  j(                            | j                  jZ                  _[         \                    | j                  jZ                 | j                  j]                  j        | j                  j(                  jW                   d} j0        t          j_        k    rA|`                                D ],}||t                   |t                   t                   d} |rt=           j         j        z            D ]} d                                  j[        dk    rIt          f                    d j[         d           t=           j[                  D ]}t                       jh        | j                  j]                 k    rt          d          n~| j                  j]                  _h        t=           jh        dz             D ]}t           jj                   t=           j         j        z            D ]} d                                 t=          | jk                           D ]}t                      |t                    _m        d S d S )Nr   Fc                 <    i | ]}                     |          d S r   )_worker_key).0irf   s     rg   
<dictcomp>zC_StatefulMultiProcessingDataLoaderIter.__init__.<locals>.<dictcomp>  s)    UUUq))!,,dUUUri   r   z&' expected for multiprocess dataloader)targetargsTxpuc              3   $   K   | ]}|j         V  d S r   )pidr   ws     rg   	<genexpr>zB_StatefulMultiProcessingDataLoaderIter.__init__.<locals>.<genexpr>  s$      ?]?]!?]?]?]?]?]?]ri   c                 4    i | ]\  }}|t          |          S r   )r   )r   keystates      rg   r   zC_StatefulMultiProcessingDataLoaderIter.__init__.<locals>.<dictcomp>  s'    !n!n!n*#u#'>u'E'E!n!n!nri   )r   prime_prefetchsnapshot_steplast_yielded_worker_idrC   main_snapshotworker_snapshotsr   r   zClast_yielded_worker_id does not match, the dataset may have changedr   )nr   rh   r=   _snapshot_intervalr9   _prefetch_factorr<   	_in_orderr   rV   multiprocessingrH   _worker_init_fnrW   r   r   r   	functoolspartialr+   r   r   Queue_worker_result_queue_worker_pids_set	_shutdownEvent_workers_done_event_index_queues_workersr   	_SNAPSHOTget_WORKER_SNAPSHOTSsetmapr   lenkeysitems_MAIN_SNAPSHOT
_BASE_SEEDr   r   r3   cancel_join_threadProcessr"   rX   r_   r   r   _persistent_workersdaemonstartappendr   	threading_pin_memory_thread_done_eventqueue_data_queuer   rP   r   current_devicerQ   _get_privateuse1_backend_namegetattrcudaThreadr   rE   _pin_memory_loop_pin_memory_threadatexitregisterrn   _clean_up_workersignal_handling_set_worker_pidsidtuple_set_SIGCHLD_handlercollectionsdequer   _main_snapshotsr   rr   _restore_main_state_SNAPSHOT_STEPr   _update_snapshot_LAST_YIELDED_WORKER_IDr*   r   valuesr   r   r   _try_put_indexrT   rU   r   r   rS   _worker_queue_idx_cycle_STEPS_SINCE_SNAPSHOTr   rs   )rf   r   rc   rV   worker_stateswstates
worker_keysdr   index_queuer   r  custom_device_modpin_memory_threadr  fast_forwardr   r   r   s   `                 rg   rh   z/_StatefulMultiProcessingDataLoaderIter.__init__  sO      "("? & 6 1$$$$$q(((()1&5##&,&D#%4 dmlK%@AA 	#,#4($ 
	$ $D  %<$A$A$C$C! %#:#@#@#B#B UUUUE$BS<T<TUUU&/111cT^ccc 211%dn599$:PRTUUGs4+U3w<<-@-@AABBc',,..FYFYYYYG\YYY #*--// / /
B,.j))-dn=d>QRVVW[WfhlhwxxDO / ?@S T X Xd/! !D t()) $	$ $	$A17799K **,,,'//#&M-,($OO(%,%!$"2"21"5"56 0  A( AH GGIII%%k222M  #### 	91:1B1BD.  %{}}D&%//!&!9!9!;!;(EH,R,R,T,TTT$+E583Y3Y3[3[$\$\!!2!A!A!C!C!&!:!:!<!< ) 0(9-$"6+	! 	! 	! (,$##%%% '8D###8D # 	\(8 	\MMM] \ \ F WYZ[[[[ 	//4%?]?]t}?]?]?]:]:]^^^33555 $/1;3D3F3F,, "o!nXeXkXkXmXm!n!n!nFtOt<STTT &$$_T^%DTEX%YZZZ / ?@S TD!!-dn=d>QR'6t~'FtGc'd --dn=d>QR!%!7 "    !L!\%:::*1133  E} ^,4~9NOb9c9k'+ *t4t7HHII * *A''))))$q((NNACGCTA A A  
 #4#455 # #T



/?4>3RSWSo3ppp$%jkkk q 0?t~/NtOk/l,t;a?@@ 7 7A56666t4t7HHII * *A''))))?4+EFGG  T



,-?@DNNN[ '&ri   FTc                 l   t                                          ||           d| _        d| _        i | _        d| _        d t          | j                  D             | _        d t          | j                  D             | _	        t          j        t          | j                            | _        | j        }|rpt          | j                  D ]0}| j        |                             t          |d                      1|dk    r"|                                 \  }}t#          | j                  st%          d| j                   t'          |t                    rt'          |j        t*                    r|j                                         |j        r>| j        |                     |j                                               |j                   n4t9          |j                  | j        |                     |j                  <   |dz  }nt%          d|           |dk    "nmt          | j                  D ]D}| j        |                             t:          j                            | j                              E| j        }	|	dk    r|                                 \  }
}t#          | j                  st%          d| j                   t'          |
t:          j        j                  rt'          |t                    sJ |
|f            t'          |j        t*                    r|j                                         |j        
J |            t9          |j                  | j        |                     |j                  <   |	dz  }	|	dk    tC          j"                    | _#        | j        dz
  | _$        | %                    d| j        dz
  | j        | &                                | j                   |r3t          | j'        | j        z            D ]}| (                                 d S d S )	Nr   c                     g | ]}d S )Tr   r   r   s     rg   
<listcomp>zA_StatefulMultiProcessingDataLoaderIter._reset.<locals>.<listcomp>[  s    GGGGGGri   c                     g | ]}d S )r   r   r+  s     rg   r,  zA_StatefulMultiProcessingDataLoaderIter._reset.<locals>.<listcomp>`  s    "G"G"G1"G"G"Gri   z$A worker has failed during startup! r   z,Invalid response from worker after startup: z#A worker has failed during Resume! r   ))r   rr   	_send_idx	_rcvd_idx
_task_info_tasks_outstandingr   r   _workers_status_workers_num_tasksr   cycler  r   putr!   	_get_dataallrS   rW   initial_stater
   reraiseis_deltar   r   	worker_idapply_deltar   r   worker_ResumeIterationr3   r  r  r  r   r  _get_main_stater   r  )rf   r   r   r   	remainingr   r   rZ   r   resume_iteration_cnt
return_idxr   s              rg   rr   z-_StatefulMultiProcessingDataLoaderIter._resetK  sB   vz*** "#  HGeD4E.F.FGGG
 #H"GeD4E.F.F"G"G"G'0uT=N7O7O'P'P$%	 '	.4,-- @ @"1%))+a*>*>????a--..**44/00 \$%bDL`%b%bccck22 \!$"46FGG 5*22444} .t/?/?/O/OP\\]a]oppppSj .T T.t/?/?/O/OP NII$%ZTX%Z%Z[[[! a--& T.// _ _"3'++FM,J,J4K\,],]^^^^#'#4 &**#'>>#3#3 
D4/00 c$%a4K_%a%abbbj&-*HII .%dK88LL:t:LLL8!$"46FGG 5*22444-994999Of*P PD*4+;+;DN+K+KL )A-( '**  +022'+'81'<$#'#4q#8)..00!3 	 	
 	
 	
  	&4043DDEE & &##%%%%	& 	&& &ri   c                 N    |d S | j         |                             |           d S r   )r   r<  )rf   r#  rx   s      rg   _update_worker_snapshotz>_StatefulMultiProcessingDataLoaderIter._update_worker_snapshot  s/    Fz*66zBBBBBri   c                     | j         st                              d           | j        | j        | j                 z
  }| j        | j        | j        |t          | j	        i}|S )NrK   )
r   rT   rU   r   r   r  r   r   r   rs   )rf   steps_since_snapshotrx   s      rg   rx   z1_StatefulMultiProcessingDataLoaderIter.state_dict  sj    ~ 	NNJ    $04>$BU3VVNDN&(<

 ri   c                    	 | j                             |          }d|fS # t          $ r1}g }t          | j                  D ]P\  }}| j        |         r>|                                s*|                    |           |                     |           Qt          |          dk    r3d
                    d |D                       }t          d| d          |t          |t          j                  rY d }~dS dd l}dd l	 d	}	fd
t#          |	          D             }
n7# t$          $ r*}|j        |j        k    rt          d          d Y d }~nd }~ww xY w d }~ww xY w)NrG   Tr   z, c              3   >   K   | ]}t          |j                  V  d S r   )r   r   r   s     rg   r   zG_StatefulMultiProcessingDataLoaderIter._try_get_data.<locals>.<genexpr>  s*      $H$HASZZ$H$H$H$H$H$Hri   zDataLoader worker (pid(s) z) exited unexpectedly)FN
   c                 8    g | ]}                                 S r   )NamedTemporaryFile)r   r   tempfiles     rg   r,  zH_StatefulMultiProcessingDataLoaderIter._try_get_data.<locals>.<listcomp>  s%    UUUh1133UUUri   a  Too many open files. Communication with the workers is no longer possible. Please increase the limit using `ulimit -n` in the shell or change the sharing strategy by calling `torch.multiprocessing.set_sharing_strategy('file_system')` at the beginning of your code)r  r   	Exception	enumerater   r2  is_aliver  _mark_worker_as_unavailabler   joinRuntimeErrorrW   r  EmptyerrnorM  r   OSErrorEMFILE)rf   rG   rZ   efailed_workersr;  r   pids_strrU  fds_limit_marginfsrM  s              @rg   _try_get_dataz4_StatefulMultiProcessingDataLoaderIter._try_get_data  s   %	#'''88D$< "	 "	 "	  N )$- 8 8 @ @	1'	2 @1::<< @"))!,,,44Y???>""Q&&99$H$H$H$H$HHH"#_#_#_#_``fgg!U[)) %$}}}}}LLLOOO 
 $& UUUUUCS=T=TUUU 	  	  	 7el**&9     +****	  E"	sE   " ECE:ED! E!
E+ EEEEEc                    | j         dk    r9|                     | j                   \  }}|r|S t          d| j          d          | j        r\| j                                        r4|                                 \  }}|r|S | j                                        4t          d          	 |                                 \  }}|r|S )Nr   zDataLoader timed out after z secondsz%Pin memory thread exited unexpectedly)r   r]  rS  r   r  rP  )rf   successrZ   s      rg   r6  z0_StatefulMultiProcessingDataLoaderIter._get_dataF  s     =1 ..t}==MGT Z"#X#X#X#XYYY 	 )2244 L $ 2 2 4 4  K )2244 L ##JKKK  $ 2 2 4 4  K ri   r;  rj   c                     d| S )Nworker_r   )rf   r;  s     rg   r   z2_StatefulMultiProcessingDataLoaderIter._worker_keyg  s    $$$$ri   c                    	 | j         | j        k     rx| j                            | j         d           }|r6|d         }t	          |          dk    s| j        |         rnO| j        | j         = | xj         dz  c_         | j         | j        k     x| j        s|                                  t          t	          | j        | j                            dk    r| j        	                    | j                   d         \  }}}t          |t          j        j                  r@|                     |                     |j                  |           | xj         dz  c_         V| xj         dz  c_         |                     |||          S | j        s| j        dk    sJ |                                 \  }\  }}}| xj        dz  c_        | j        t,          j        k    rpt          |t          j        j                  rQ| j        rd| j        |j        <   n|                     |j                   |
J d            |                                  || j         k    r| j        snt          |t          j        j                  r0|                     |                     |j                  |           | j        |= |                     |||          S | j        |xx         |||ffz  cc<   n| j        |= t          |t          j        j                  r@|                     |                     |j                  |           | xj         dz  c_         B| xj         dz  c_         |                     |||          S j)NTr   rJ   r   Fz:StopIteration should always be accompanied by a state_dict)r/  r.  r0  r   r   r2  r   _shutdown_workersr   poprW   r   r=  _IterableDatasetStopIterationrD  r   r;  _process_datar   r1  r6  rX   r*   r   rQ  r  r   )rf   infor;  rZ   rx   r   s         rg   r   z1_StatefulMultiProcessingDataLoaderIter._next_dataj  sq   E	K .4>11**4>4@@ 8 $QI4yyA~~)=i)H~7!# .4>11 / -**,,,##
 4?4>233q88.2o.A.A$..Q.QRS.T+idFM$OPP K001A1A$.1Q1QS]^^^NNa'NNNNa'NN--dIzJJJ~E$*AA*E*E*EE151A1A.C.$	:##q(##!\%:::dFM$OPP */ I?D,T^<<88HHH%113o111'')))
 dn$$~ K!$(STT !44T5E5Edn5U5UWabbb ,--dIzJJJ$$$$	:)F(HH$$$$OC(dFM$OPP K001A1A$.1Q1QS]^^^NNa'NNNNa'NN--dIzJJJKE	Kri   c                     | j         | j        t          t          | j                  t
          t          | j                  t          | j        t          | j
        t          | j        | j        | j        iS r   )_NUM_WORKERSr   r   r$   r   r   r   r   r1   r   r2   r   r3   r   r   rt   s    rg   r?  z6_StatefulMultiProcessingDataLoaderIter._get_main_state  sZ    t0!1$2D!E!E "243F"G"G!4#=')I$+OT_
 	
ri   c                    | j         || j                 k    sJ |t                   | _        t	          | j        t                    st	          | j        t                    rqt          | j        |t                             | _        t          | j                  | _        |t                   %t          | j        |t                             | _        nSt	          | j        t          j        j        j        j                  s%t#          j        | j        | j        d           | _        |t&                   | _        |t*                   | _        || j                 | _        d S r   )r   ri  r   r1   rW   r   r    r   r#   r   r   r   rP   rY   rZ   r   r   r   r   r   r2   r   r3   r   r   r{   s     rg   r  z:_StatefulMultiProcessingDataLoaderIter._restore_main_state  s"    Jt/@$AAAAA%/0E%F"d)844 	m
4CUW_8`8` 	m"4T5H*UiJj"k"kD!%d&9!:!:D-.:%78JJWjLk%l%l"# +D  m
 &/%5d6I4Kegk%l%l"+56Q+R(&|4$T_5ri   c                    | j         | j        z  }| j        |k     sJ 	 |                                 }d}d}| j        sn| j        t          j        k    rG| j        | j        z  }|dz   | j        | j         z  z   }|| j        k    rd}|| j        z   | j        k    rd}n:| j	        | j        z  dk    rd}| j	        dz
  | j        z  | j        z   | j        k    rd}n# t          $ r Y d S w xY wt          | j                  D ]T}t          | j                  }| j        |         r1| j        r n+| j        |         |t#          | j                  z  k     r nUd S |r7|sJ | j                            | j        |                                 f           | j        |                             | j        ||ff           |f| j        | j        <   | j        |xx         dz  cc<   | xj        dz  c_        | xj        dz  c_        d S )NFr   Tr   )r   r   r1  r   r   rX   r*   r   r   r1   r   r   r   r  r2  r   r3  sumr  r  r.  r?  r   r5  r0  )	rf   	max_tasksr   snapshot_mainsnapshotxhir   worker_queue_idxs	            rg   r  z5_StatefulMultiProcessingDataLoaderIter._try_put_index  so   )D,==	&2222	$$&&E!MH* $#|'<<<%(??UT.1FFF000$(M))T-DDD#H-0GG1LL$(M/!3t7NN%&)-)@A A  $H 	 	 	FF	t()) 	 	A#D$@AA#$45 > E,-=>cRVRfNgNgAggg E F 	ROO8 ''9M9M9O9O(PQQQ+,00$.5(BS1TUUU+;*=' 0111Q61111$!s   B6C 
C#"C#c                    | j         |xx         dz  cc<   |                                  t          |t                    r|                                 || _        |4|                     |                     |t                             |           | j	        r*| j
        dz   | j	        z  dk    r|                                  |S )Nr   r   )r3  r  rW   r
   r9  r   rD  r   r   r   r   _take_snapshot)rf   rZ   r;  rx   s       rg   rf  z4_StatefulMultiProcessingDataLoaderIter._process_data  s    	***a/***d,-- 	LLNNN'0$!(()9)9*Z:P)Q)QS]^^^" 	"):Q)>$BY(Y]^(^(^!!!ri   c                    d }t          | j                  rn| j        d         d         | j        dz
  k    rO| j                                        \  }}t          | j                  r| j        d         d         | j        dz
  k    O| j        s|d S || j        dz
  k    sJ || j        dz
  f            |                     | j        dz   | j        | j        || j	                   d S )Nr   r   )
r   r  r/  popleftr   r  r   r   r   r   )rf   main_snapshot_idxr   s      rg   rt  z5_StatefulMultiProcessingDataLoaderIter._take_snapshot  s    $&'' 	NT-A!-DQ-G4>\]K]-]-]/3/C/K/K/M/M,} $&'' 	NT-A!-DQ-G4>\]K]-]-]~ 	"3"; F DNQ$6666NQ9
666 	!("	
 	
 	
 	
 	
ri   r   r   rC   r   r   c           
          | j         || j        || j        || j        d |                                D             i| _        d S )Nc                 >    i | ]\  }}||                                 S r   )	get_state)r   r   worker_states      rg   r   zK_StatefulMultiProcessingDataLoaderIter._update_snapshot.<locals>.<dictcomp>0  s+    $u$u$uGXsLS,*@*@*B*B$u$u$uri   )r  r  r   r   r   r   )rf   r   r   rC   r   r   s         rg   r  z7_StatefulMultiProcessingDataLoaderIter._update_snapshot$  sN     (*@"$u$u\l\r\r\t\t$u$u$u	
ri   c                     | j         |         s| j        s|sJ | j        |         }|                    d            d| j         |<   | j                                        |k    sJ d S rz   )r2  r   r   r5  r   is_set)rf   r;  shutdownqs       rg   rQ  zB_StatefulMultiProcessingDataLoaderIter._mark_worker_as_unavailable3  sz    
 #I.V$2JVhVVV y) 	
d +0Y''..00H<<<<<<ri   c                    t           t           j        du st           j        d S | j        s:d| _        	 t          | d          r~| j                                         | j                            d           | j        	                                 | j        
                                 | j                                         | j                                         t          t          | j                            D ]-}| j        s| j        |         r|                     |d           .| j        D ]"}|	                    t           j                   #| j        D ]*}|
                                 |                                 +	 | j        r3t           j                            t1          |                      d| _        | j        D ]*}|                                r|                                 +d S # | j        r3t           j                            t1          |                      d| _        | j        D ]*}|                                r|                                 +w xY wd S )NTr  )NN)r~  rH  F)r   python_exit_statusr   hasattrr  r   r   r5  r  rR  r   closer   r   r   r   r   r2  rQ  MP_STATUS_CHECK_INTERVALr   r   r  _remove_worker_pidsr  rP  	terminate)rf   r;  r   r  s       rg   rc  z8_StatefulMultiProcessingDataLoaderIter._shutdown_workersM  sz    >V6$>>&B[BcF ~ 9	&!DN7& 4!566 66::<<< -11,???+00222-@@BBB-33555 (,,...!&s4='9'9!:!: S SI / S43G	3R S88T8RRR D DA FF6#BFCCCC+  A((***GGIIII ( 2*>>r$xxHHH,1D) & &Azz|| &
 & & ( 2*>>r$xxHHH,1D) & &Azz|| &
 &g9	& 9	&s   EG5 5A.I#c                     	 |                      t          j                   |                                 r|                                  d S d S # |                                 r|                                  w w xY w)NrH  )rR  r   r  rP  r  )r   s    rg   r  z7_StatefulMultiProcessingDataLoaderIter._clean_up_worker  sp    	FF6:F;;;zz||  qzz|| s    A +A9c                 .    |                                   d S r   )rc  rt   s    rg   __del__z._StatefulMultiProcessingDataLoaderIter.__del__  s         ri   )FTr   )(r}   r~   r   r   r   r   ri  r   r   r   r  r   r  r   rh   rr   rD  rx   r   r  r]  r6  r   r   r   r?  r  r  rf  rt  r   r   r   r  rQ  rc  staticmethodr  r  r   r   s   @rg   rn   rn   E  s$        TTj	 !   !LI%N+%N37JDA DA DA DA DALP& P& P& P& P& P&dC C C
    %+$C 1 1 1 1h     B%S %S % % % %FK FK FKP	
 	
 	
6 6 6(0 0 0d  
 
 
(

 !$
 	

 CH~
 s$;;<
 
 
 
= = = =4B& B& B&J   \! ! ! ! ! ! !ri   rn   )Lr   r  r   r   loggingr  r  typingr   r   r   r   r   r   r	   rP   torch.multiprocessingr   torch.utils.data._utils.workertorch.utils.data.graph_settingstorch._utilsr
   torch.utils.datar   r   r   r   r   r   r   r   torch.utils.data.dataloaderr   r   #torch.utils.data.datapipes.datapiper   r   incremental_stater   r   r   r   r   r   rA   r   r   statefulr    r=  r!   r"   r#   r$   __all__r)   r*   r+   r,   r'   r(   r&   r-   	getLoggerr}   rT   r   r   r   r   r   r   r%   r6   rm   rn   r   ri   rg   <module>r     s                      F F F F F F F F F F F F F F F F F F  / / / / / / % % % % & & & & ) ) ) ) ) )	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 V U U U U U U U s s s s s s s s                1 0 0 0 0 0 0 0       S S S S S S S S S S S S                    	4(((		8	$	$- + / ; ) M* M* M* M* M*E* M* M* M*`
    "5   6z8 z8 z8 z8 z8+F z8 z8 z8zV! V! V! V! V!-H V! V! V! V! V!ri   