
    &`i                        d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZm Z m!Z! d d
l"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* d dl+m,Z, d dl-m.Z.m/Z/ erd dl0Z1d dl2Z2d dl3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9  ed          Z: G d dee:                   Z;e. G d de j<                              Z=e=Z>dS )    N)TYPE_CHECKINGAnyCallableDictIterableIteratorListLiteralOptionalTupleTypeVarUnionBatchIterator)	RefBundle)LogicalPlan)	InputData)ExecutionPlan)DatasetStats)BlockAccessor	DataBatch_apply_batch_format)ArrowBatchCollateFn	CollateFnDefaultCollateFnNumpyBatchCollateFnPandasBatchCollateFnTensorBatchReturnTypeTensorBatchTypeis_tensor_batch_type)DataContext)	PublicAPIRayDeprecationWarning)CollatedDataMaterializedDatasetSchemaTensorFlowTensorBatchTypeTorchBatchTypeTorchDeviceTypeTc                   <    e Zd Zdeg ee         f         fdZd ZdS )_IterableFromIteratoriterator_genc                     || _         dS )zConstructs an Iterable from an iterator generator.

        Args:
            iterator_gen: A function that returns an iterator each time it
                is called. For example, this can be a generator function.
        Nr-   )selfr-   s     e/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/iterator.py__init__z_IterableFromIterator.__init__;   s     )    c                 *    |                                  S Nr/   r0   s    r1   __iter__z_IterableFromIterator.__iter__D   s      """r3   N)__name__
__module____qualname__r   r   r*   r2   r7    r3   r1   r,   r,   :   sK        )Xb(1+o%> ) ) ) )# # # # #r3   r,   c                      e Zd ZdZej        deee         e	e
         ef         fd            Zeddddddd	d
edede	e         dede	e         de	e         dee         fd            Zdee         defdZdddddddddd
edede	e         dede	e         de	e         de	eegdf                  de	eegef                  dee         fdZdefdZedeeeef                  fd            Zej        edefd                        Zej        dAd            Zej        defd            Zedddddddddd	d
ede	e         d e	ed!eed!f         f                  d"ed#e d         f         d$e	eeeee!j"        f         gdf         e#f                  dede	e         de	e         d%eded&         fd'            Z$ddddddd(d
ede	e         d e	ed)eed)f         f                  dede	e         de	e         ded*         fd+Z%dddddddddd,d,d-d.e	e         d/e	ee&e         e&e&e                  eee&e         f         f                  d0e	d!         d1e	ed!e&d!         eed!f         f                  ded
edede	e         de	e         d2ed3edd4fd5Z'edddddddddd6	d/eee&e         f         d7eee&e         f         d8ee	e         e	e&e                  f         d
ededede	e         de	e         d9ed:eed:f         f         d;ed:eed:f         f         d<ee	d:         e	eed:f                  f         dd=fd>            Z(edBd@            Z)dS )CDataIteratora(  An iterator for reading records from a :class:`~Dataset`.

    For Datasets, each iteration call represents a complete read of all items in the
    Dataset.

    If using Ray Train, each trainer actor should get its own iterator by calling
    :meth:`ray.train.get_dataset_shard("train")
    <ray.train.get_dataset_shard>`.

    Examples:
        >>> import ray
        >>> ds = ray.data.range(5)
        >>> ds
        Dataset(num_rows=5, schema={id: int64})
        >>> ds.iterator()
        DataIterator(Dataset(num_rows=5, schema={id: int64}))
    returnc                     dS )a  Returns the iterator to use for `iter_batches`.

        Returns:
            A tuple. The first item of the tuple is an iterator over RefBundles.
            The second item of the tuple is a DatasetStats object used for recording
            stats during iteration.
            The third item is a boolean indicating if the blocks can be safely cleared
            after use.
        Nr;   r6   s    r1   _to_ref_bundle_iteratorz$DataIterator._to_ref_bundle_iterator\   s	     	r3         defaultFNprefetch_batches
batch_sizebatch_format	drop_lastlocal_shuffle_buffer_sizelocal_shuffle_seedrE   rF   rG   rH   rI   rJ   c                8    |                      ||||||          S )a  Return a batched iterable over the dataset.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range(
            ...     1000000
            ... ).iterator().iter_batches(): # doctest: +SKIP
            ...     print(batch) # doctest: +SKIP

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            batch_format: Specify ``"default"`` to use the default block format
                (NumPy), ``"pandas"`` to select ``pandas.DataFrame``, "pyarrow" to
                select ``pyarrow.Table``, or ``"numpy"`` to select
                ``Dict[str, numpy.ndarray]``, or None to return the underlying block
                exactly as is with no additional formatting.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained.
            local_shuffle_seed: The seed to use for the local random shuffle.

        Returns:
            An iterable over record batches.
        rD   )_iter_batches)r0   rE   rF   rG   rH   rI   rJ   s          r1   iter_batcheszDataIterator.iter_batchesk   s4    ^ !!-!%&?1 " 
 
 	
r3   ref_bundles_iterc                     t          |fi |S r5   r   )r0   rN   kwargss      r1   _create_batch_iteratorz#DataIterator._create_batch_iterator   s     -88888r3   rE   rF   rG   rH   rI   rJ   _collate_fn_finalize_fnrS   r$   rT   c          
           t                    dt          t                   f f	d}	t          |	          S )Nr>   c               3     	K   t          j                    }                                 \  }}}                                }                    ||||	
          }|r.|j                            t          j                    | z
             |E d {V  |r0|j                            t          j                    | z
             d S d S )N)statsdataset_tagclear_block_after_readrF   rG   rH   
collate_fnfinalize_fnshuffle_buffer_min_sizeshuffle_seedrE   )timeperf_counterr@   _get_dataset_tagrQ   iter_initialize_sadditer_total_s)
time_startref_bundles_iteratorrW   blocks_owned_by_consumerrX   batch_iteratorrS   rT   rG   rF   rH   rI   rJ   rE   r0   s         r1   _create_iteratorz4DataIterator._iter_batches.<locals>._create_iterator   s     *,,J ,,..	$( //11K!88$''?%)#&((A/!1 9  N  N'++D,=,?,?*,LMMM%%%%%%%% I"&&t'8':':Z'GHHHHHI Ir3   )r   r   r   r,   )
r0   rE   rF   rG   rH   rI   rJ   rS   rT   rh   s
   ````````` r1   rL   zDataIterator._iter_batches   s     +<88$	I(9"5 $	I $	I $	I $	I $	I $	I $	I $	I $	I $	I $	I $	I $	I $	IL %%5666r3   c                     dS )Nunknown_datasetr;   r6   s    r1   r`   zDataIterator._get_dataset_tag   s      r3   c                 \    |                      ddd          fd}t          |          S )a   Return a local row iterable over the dataset.

        If the dataset is a tabular dataset (Arrow/Pandas blocks), dicts
        are yielded for each row by the iterator. If the dataset is not tabular,
        the raw row is yielded.

        Examples:
            >>> import ray
            >>> dataset = ray.data.range(10)
            >>> next(iter(dataset.iterator().iter_rows()))
            {'id': 0}

        Time complexity: O(1)

        Returns:
            An iterable over rows of the dataset.
        NrA   )rF   rG   rE   c               3      K   D ]E} t          j        t          j        |                     } |                     d          D ]}|V  Fd S )NT)public_row_format)r   	for_blockbatch_to_block	iter_rows)batchrowbatch_iterables     r1   _wrapped_iteratorz1DataIterator.iter_rows.<locals>._wrapped_iterator   sh      '  %/0LU0S0STT ??T?BB  CIIII r3   )rL   r,   )r0   rt   rs   s     @r1   rp   zDataIterator.iter_rows   sP    & ++$ , 
 
	 	 	 	 	 %%6777r3   c                     dS )z9Returns a string containing execution timing information.Nr;   r6   s    r1   rW   zDataIterator.stats   s	     	r3   r&   c                     dS )z/Return the schema of the dataset iterated over.Nr;   r6   s    r1   schemazDataIterator.schema  s	     	r3   c                     d S r5   r;   r6   s    r1   get_contextzDataIterator.get_context  s    r3   auto)	rE   rF   dtypesdevicerZ   rH   rI   rJ   
pin_memoryr{   ztorch.dtyper|   r)   rZ   r}   r(   c       	   
         ddl m}
 ddlm} ||dk    rt	          d          |	r|t	          d          dk    r |            r
 |
            nddd	lm d
t          dt          t          t          f         ffd}|t          ||	          }d}nt          |t                    rd}n{t          |t                    rd}nct          |t                    rd}nKt!          |          rd}t#          j        dt&                     nt	          dt)          |                     |                     ||||||||          S )a  Return a batched iterable of Torch Tensors over the dataset.

        This iterable yields a dictionary of column-tensors. If you are looking for
        more flexibility in the tensor conversion (e.g. casting dtypes) or the batch
        format, try using :meth:`~ray.data.DataIterator.iter_batches` directly.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range(
            ...     12,
            ... ).iterator().iter_torch_batches(batch_size=4):
            ...     print(batch)
            {'id': tensor([0, 1, 2, 3])}
            {'id': tensor([4, 5, 6, 7])}
            {'id': tensor([ 8,  9, 10, 11])}

            Use the ``ArrowBatchCollateFn`` to customize how the tensor batch is created
            from an Arrow batch.

            >>> import pyarrow as pa
            >>> import torch
            >>> import ray
            >>> from ray.data.collate_fn import ArrowBatchCollateFn
            >>> class CustomArrowBatchCollateFn(ArrowBatchCollateFn):
            ...     def __call__(self, batch: pa.Table) -> torch.Tensor:
            ...         return torch.as_tensor(batch["col_1"].to_numpy() + 5)
            >>> iterator = ray.data.from_items([
            ...     {"col_1": 1, "col_2": 2},
            ...     {"col_1": 3, "col_2": 4}]).iterator()
            >>> for batch in iterator.iter_torch_batches(collate_fn=CustomArrowBatchCollateFn()):
            ...     print(batch)
            tensor([6, 8])

            Use the ``NumpyBatchCollateFn`` to customize how the tensor batch is created
            from a Numpy batch.

            >>> from typing import Dict
            >>> import numpy as np
            >>> import torch
            >>> import ray
            >>> from ray.data.collate_fn import NumpyBatchCollateFn
            >>> class CustomNumpyBatchCollateFn(NumpyBatchCollateFn):
            ...     def __call__(self, batch: Dict[str, np.ndarray]) -> torch.Tensor:
            ...         return torch.as_tensor(batch["col_1"] + 5)
            >>> iterator = ray.data.from_items([
            ...     {"col_1": 1, "col_2": 2},
            ...     {"col_1": 3, "col_2": 4}]).iterator()
            >>> for batch in iterator.iter_torch_batches(collate_fn=CustomNumpyBatchCollateFn()):
            ...     print(batch)
            tensor([6, 8])

            Use the ``PandasBatchCollateFn`` to customize how the tensor batch is created
            from a Pandas batch.

            >>> import pandas as pd
            >>> import torch
            >>> import ray
            >>> from ray.data.collate_fn import PandasBatchCollateFn
            >>> class CustomPandasBatchCollateFn(PandasBatchCollateFn):
            ...     def __call__(self, batch: pd.DataFrame) -> torch.Tensor:
            ...         return torch.as_tensor(batch["col_1"].to_numpy() + 5)
            >>> iterator = ray.data.from_items([
            ...     {"col_1": 1, "col_2": 2},
            ...     {"col_1": 3, "col_2": 4}]).iterator()
            >>> for batch in iterator.iter_torch_batches(collate_fn=CustomPandasBatchCollateFn()):
            ...     print(batch)
            tensor([6, 8])

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The Torch dtype(s) for the created tensor(s); if None, the dtype
                will be inferred from the tensor data. You can't use this parameter
                with ``collate_fn``.
            device: The device on which the tensor should be placed. Defaults to
                "auto" which moves the tensors to the appropriate device when the
                Dataset is passed to Ray Train and ``collate_fn`` is not provided.
                Otherwise, defaults to CPU. You can't use this parameter with
                ``collate_fn``.
            collate_fn: [Alpha] A function to customize how data batches are collated
                before being passed to the model. This is useful for last-mile data
                formatting such as padding, masking, or packaging tensors into custom
                data structures. If not provided, `iter_torch_batches` automatically
                converts batches to `torch.Tensor`s and moves them to the device
                assigned to the current worker. The input to `collate_fn` may be:

                1. pyarrow.Table, where you should provide a callable class that
                   subclasses `ArrowBatchCollateFn` (recommended for best performance).
                   Note that you should use util function `arrow_batch_to_tensors` to
                   convert the pyarrow.Table to a dictionary of non-contiguous tensor
                   batches.
                2. Dict[str, np.ndarray], where you should provide a callable class that
                   subclasses `NumpyBatchCollateFn`
                3. pd.DataFrame, where you should provide a callable class that
                   subclasses `PandasBatchCollateFn`

                The output can be any type. If the output is a `TensorBatchType`, it will be
                automatically moved to the current worker's device. For other types,
                you must handle device transfer manually in your training loop.
                Note: This function is called in a multi-threaded context; avoid using
                thread-unsafe code.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            pin_memory: [Alpha] If True, copies the tensor to pinned memory. Note that
                `pin_memory` is only supported when using `DefaultCollateFn`.

        Returns:
            An iterable over Torch Tensor batches.
        r   )
get_device)_in_ray_train_workerNrz   zcollate_fn cannot be used with dtypes and device.You should manually move the output Torch tensors to thedesired dtype and device outside of collate_fn.z;pin_memory is only supported when using `DefaultCollateFn`.cpu)move_tensors_to_devicerq   r>   c                 @    t          |           r |           S | S )a  Default finalize function for moving PyTorch tensors to device. If
            batch is of type `TensorBatchType`, it will be automatically moved to the
            current worker's device. For other types, you must handle device transfer
            manually in your training loop.

            Args:
                batch: Input batch to move to device.

            Returns:
                Batch with tensors moved to the target device.
                - If input is TensorBatchType, returns tensors moved to device
                - Otherwise returns the same type as input without moving tensors
                to device.
            )r|   )r    )rq   r|   r   s    r1   default_finalize_fnz<DataIterator.iter_torch_batches.<locals>.default_finalize_fn  s0    " $E** --eFCCCCr3   )r{   r|   r}   pyarrownumpypandaszPassing a function to `iter_torch_batches(collate_fn)` is deprecated in Ray 2.47. Please switch to using a callable class that inherits from `ArrowBatchCollateFn`, `NumpyBatchCollateFn`, or `PandasBatchCollateFn`.zUnsupported collate function: rR   )ray.train.torchr   ray.train.utilsr   
ValueErrorray.air._internal.torch_utilsr   r   r   r   r   r   
isinstancer   r   r   callablewarningswarnr#   typerL   )r0   rE   rF   r{   r|   rZ   rH   rI   rJ   r}   r   r   r   rG   r   s       `         @r1   iter_torch_batcheszDataIterator.iter_torch_batches  s   \ 	/.....888888!v'9Vv=M=MB    	*0M   V &:%9%;%;FZZ\\\F	
 	
 	
 	
 	
 	
	"	(#-.	 	 	 	 	 	 	,  *%  J
 %LL
$788 	R %LL
$788 	R"LL
$899 	R#LLj!! 
	R"LM* &    Pd:>N>NPPQQQ!!-!%&?1", " 	
 	
 		
r3   )rE   rF   r{   rH   rI   rJ   ztf.dtypes.DTyper'   c                r   	 ddl m	 |                     |||||          }t          	fd|          }|S )a\	  Return a batched iterable of TensorFlow Tensors over the dataset.

        This iterable will yield single-tensor batches of the underlying dataset
        consists of a single column; otherwise, it will yield a dictionary of
        column-tensors.

        .. tip::
            If you don't need the additional flexibility provided by this method,
            consider using :meth:`~ray.data.Dataset.to_tf` instead. It's easier
            to use.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range( # doctest: +SKIP
            ...     12,
            ... ).iter_tf_batches(batch_size=4):
            ...     print(batch.shape) # doctest: +SKIP
            (4, 1)
            (4, 1)
            (4, 1)

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: The number of rows in each batch, or None to use entire blocks
                as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The TensorFlow dtype(s) for the created tensor(s); if None, the
                dtype will be inferred from the tensor data.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.

        Returns:
            An iterator over TensorFlow Tensor batches.
        r   )(convert_ndarray_batch_to_tf_tensor_batchrE   rF   rH   rI   rJ   c                      |           S )N)r{   r;   )rq   r   r{   s    r1   <lambda>z.DataIterator.iter_tf_batches.<locals>.<lambda>>  s    BBf   r3   )"ray.air._internal.tensorflow_utilsr   rL   map)
r0   rE   rF   r{   rH   rI   rJ   rs   mapped_iterabler   s
      `     @r1   iter_tf_batcheszDataIterator.iter_tf_batches  s    t	
 	
 	
 	
 	
 	
 ++-!&?1 , 
 
      	
 
 r3   T)label_columnfeature_columnslabel_column_dtypefeature_column_dtypesrF   rE   rH   rI   rJ   unsqueeze_label_tensorunsqueeze_feature_tensorsr   r   r   r   r   r   z torch.utils.data.IterableDatasetc                   	
 ddl }ddlm ddlm} sdrt          |j                  sit          t                    rt          t                    s t          dt                     d          t                    t                    k    rt          d          t          d                                 D                       rt          d	          nt          d         t          t          f          rt          t          t          f          s t          d
t                     d          t!                    t!                    k    rt          d          t          d D                       rt          d	          	 
fd} ||          S )ah  Return a Torch IterableDataset over this dataset.

        This is only supported for datasets convertible to Arrow records.

        It is recommended to use the returned ``IterableDataset`` directly
        instead of passing it into a torch ``DataLoader``.

        Each element in IterableDataset will be a tuple consisting of 2
        elements. The first item contains the feature tensor(s), and the
        second item is the label tensor. Those can take on different
        forms, depending on the specified arguments.

        For the features tensor (N is the ``batch_size`` and n, m, k
        are the number of features per tensor):

        * If ``feature_columns`` is a ``List[str]``, the features will be
          a tensor of shape (N, n), with columns corresponding to
          ``feature_columns``

        * If ``feature_columns`` is a ``List[List[str]]``, the features will be
          a list of tensors of shape [(N, m),...,(N, k)], with columns of each
          tensor corresponding to the elements of ``feature_columns``

        * If ``feature_columns`` is a ``Dict[str, List[str]]``, the features
          will be a dict of key-tensor pairs of shape
          {key1: (N, m),..., keyN: (N, k)}, with columns of each
          tensor corresponding to the value of ``feature_columns`` under the
          key.

        If ``unsqueeze_label_tensor=True`` (default), the label tensor will be
        of shape (N, 1). Otherwise, it will be of shape (N,).
        If ``label_column`` is specified as ``None``, then no column from the
        ``Dataset`` will be treated as the label, and the output label tensor
        will be ``None``.

        Note that you probably want to call ``.split()`` on this dataset if
        there are to be multiple Torch workers consuming the data.

        Time complexity: O(1)

        Args:
            label_column: The name of the column used as the
                label (second element of the output list). Can be None for
                prediction, in which case the second element of returned
                tuple will also be None.
            feature_columns: The names of the columns
                to use as the features. Can be a list of lists or
                a dict of string-list pairs for multi-tensor output.
                If None, then use all columns except the label column as
                the features.
            label_column_dtype: The torch dtype to
                use for the label column. If None, then automatically infer
                the dtype.
            feature_column_dtypes: The dtypes to use for the feature
                tensors. This should match the format of ``feature_columns``,
                or be a single dtype, in which case it will be applied to
                all tensors. If None, then automatically infer the dtype.
            batch_size: How many samples per batch to yield at a time.
                Defaults to 1.
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            drop_last: Set to True to drop the last incomplete batch,
                if the dataset size is not divisible by the batch size. If
                False and the size of dataset is not divisible by the batch
                size, then the last batch will be smaller. Defaults to False.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            unsqueeze_label_tensor: If set to True, the label tensor
                will be unsqueezed (reshaped to (N, 1)). Otherwise, it will
                be left as is, that is (N, ). In general, regression loss
                functions expect an unsqueezed tensor, while classification
                loss functions expect a squeezed one. Defaults to True.
            unsqueeze_feature_tensors: If set to True, the features tensors
                will be unsqueezed (reshaped to (N, 1)) before being concatenated into
                the final features tensor. Otherwise, they will be left as is, that is
                (N, ). Defaults to True.

        Returns:
            A torch IterableDataset.
        r   N)convert_pandas_to_torch_tensor)TorchIterableDatasetzbIf `feature_columns` is a dict, `feature_column_dtypes` must be None, `torch.dtype`, or dict, got .zF`feature_columns` and `feature_column_dtypes` must have the same keys.c              3      K   | ]}| V  d S r5   r;   .0
subcolumnss     r1   	<genexpr>z(DataIterator.to_torch.<locals>.<genexpr>  s$      QQ*:~QQQQQQr3   zcolumn list may not be emptyzqIf `feature_columns` is a list of lists, `feature_column_dtypes` must be None, `torch.dtype`, or a sequence, got zH`feature_columns` and `feature_column_dtypes` must have the same length.c              3      K   | ]}| V  d S r5   r;   r   s     r1   r   z(DataIterator.to_torch.<locals>.<genexpr>  s$      HH*:~HHHHHHr3   c               3     K                        d
          D ]ir& g	          }                                nd } t          t                    rfdD             }n           }|| fV  jd S )Nr   )rF   rG   rE   rH   rI   rJ   	unsqueezec                 v    i | ]5}| |         t          t                    r|         n           6S )r   )r   dict)r   keyrq   r   r   r   r   s     r1   
<dictcomp>zADataIterator.to_torch.<locals>.make_generator.<locals>.<dictcomp>  sq     ' ' '   ;;!+C0 $..CT#J#J!; 5c : :%:&?	 	 	' ' 'r3   )columnscolumn_dtypesr   )rL   popr   r   )label_tensorfeatures_tensorrq   rF   r   rH   r   r   r   r   rI   rJ   rE   r0   r   r   s     @r1   make_generatorz-DataIterator.to_torch.<locals>.make_generator  s"     ++%%!1#*C#5 ,   )6 )6   	(#A#A%*"8	$ $ $L IIl++++#'Lot44 ' ' ' ' ' ' ' ' $3' ' 'OO 'E&D /&;";	' ' 'O '55555S)6 )6r3   )torchr   r   )ray.data._internal.torch_iterable_datasetr   r   dtyper   	TypeErrorr   setr   anyvalueslisttuplelen)r0   r   r   r   r   rF   rE   rH   rI   rJ   r   r   r   r   r   r   s   ````````````   @r1   to_torchzDataIterator.to_torchF  sx   X 	PPPPPPRRRRRR  	#"O  	E4I5;)W)W 	E/400 E!"7>> #H)-.C)D)DH H H  
 ''3/D+E+EEE$3   QQ8N8N8P8PQQQQQ E$%CDDDEOA.u>> E!"7$GG #N/34I/J/JN N N  
 ''3/D+E+EEE$5   HHHHHHH E$%CDDD*	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6 *	6X $#N333r3   )	additional_columnsrE   rF   rH   rI   rJ   feature_type_speclabel_type_specadditional_type_speclabel_columnsr   r   ztf.TypeSpecr   r   ztf.data.Datasetc       	            	
 ddl mm} 	 ddl}n# t          $ r t          d          w xY wdt          ddffddt          t          t          f         ddffd	}d
t          t          t          j        f         dt          t          t          t                   f         dt          |j        t          t          |j        f         f         dt          |j        t          t          |j        f         f         ffd	
 fd}	
X                                 }t          |j                   |            |            ||          	 ||          
B@                                 }t          |j                   |            ||          %|j        j                            |	
f          }n#|j        j                            |	
f          }|j                                        }|j        j        j        j        |j        _        |                    |          S )a  Return a TF Dataset over this dataset.

        .. warning::
            If your dataset contains ragged tensors, this method errors. To prevent
            errors, :ref:`resize your tensors <transforming_tensors>`.

        Examples:
            >>> import ray
            >>> ds = ray.data.read_csv(
            ...     "s3://anonymous@air-example-data/iris.csv"
            ... )
            >>> it = ds.iterator(); it
            DataIterator(Dataset(num_rows=?, schema=...))

            If your model accepts a single tensor as input, specify a single feature column.

            >>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your model accepts a dictionary as input, specify a list of feature columns.

            >>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
            <_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your dataset contains multiple features but your model accepts a single
            tensor as input, combine features with
            :class:`~ray.data.preprocessors.Concatenator`.

            >>> from ray.data.preprocessors import Concatenator
            >>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
            >>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features")
            >>> it = preprocessor.transform(ds).iterator()
            >>> it
            DataIterator(Concatenator
            +- Dataset(num_rows=?, schema=...))
            >>> it.to_tf("features", "target")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your model accepts different types, shapes, or names of tensors as input, specify the type spec.
            If type specs are not specified, they are automatically inferred from the schema of the iterator.

            >>> import tensorflow as tf
            >>> it.to_tf(
            ...     feature_columns="features",
            ...     label_columns="target",
            ...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
            ...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
            ... )
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>

            If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
            A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.

            >>> import pandas as pd
            >>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
            >>> it = ds.iterator()
            >>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

            If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.

            >>> it.to_tf(
            ...     feature_columns="sepal length (cm)",
            ...     label_columns="target",
            ...     additional_columns="sample weights",
            ...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
            ... )
            <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>

        Args:
            feature_columns: Columns that correspond to model inputs. If this is a
                string, the input data is a tensor. If this is a list, the input data
                is a ``dict`` that maps column names to their tensor representation.
            label_columns: Columns that correspond to model targets. If this is a
                string, the target data is a tensor. If this is a list, the target data
                is a ``dict`` that maps column names to their tensor representation.
            additional_columns: Columns that correspond to sample weights or other metadata.
                If this is a string, the weight data is a tensor. If this is a list, the
                weight data is a ``dict`` that maps column names to their tensor representation.
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool will be used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: Record batch size. Defaults to 1.
            drop_last: Set to True to drop the last incomplete batch,
                if the dataset size is not divisible by the batch size. If
                False and the size of dataset is not divisible by the batch
                size, then the last batch will be smaller. Defaults to False.
            local_shuffle_buffer_size: If non-None, the data will be randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer will be drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            additional_type_spec: The `tf.TypeSpec` of `additional_columns`. If there
                is only one column, specify a `tf.TypeSpec`. If there are multiple
                columns, specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.

        Returns:
            A ``tf.data.Dataset`` that yields inputs and targets.
        r   )convert_ndarray_to_tf_tensorget_type_specNztensorflow must be installed!columnr>   c           	      B    | vrt          d|  d|  d d          d S )NzYou specified 'z_' in `feature_columns`, `label_columns`, or `additional_columns`, but there's no column named 'z*' in the dataset. Valid column names are: r   )r   )r   valid_columnss    r1   validate_columnz+DataIterator.to_tf.<locals>.validate_column  sb    ]** @f @ @%+@ @ 0=@ @ @   +*r3   r   c                 l    t          | t                    r| D ]} |           d S  |            d S r5   )r   r   )r   r   r   s     r1   validate_columnsz,DataIterator.to_tf.<locals>.validate_columns  sY    '4(( )% , ,F#OF++++, ,  (((((r3   rq   	type_specc                x     t          |t                    r  |                   S  fd|D             S )Nr   c                 D    i | ]}| |         |                    S )r   r;   )r   r   rq   r   r   s     r1   r   zHDataIterator.to_tf.<locals>.convert_batch_to_tensors.<locals>.<dictcomp>  sN         44&MYv->    r3   )r   str)rq   r   r   r   s   ` `r1   convert_batch_to_tensorsz4DataIterator.to_tf.<locals>.convert_batch_to_tensors  sk     '3'' Y33E'NiXXXX      &	   r3   c               3      K                                  D ]S} t          | t                    sJ  | 	
          } |           }||fV  > |           }|||fV  Td S )Nr   )r   r   )rL   r   r   )rq   featureslabelsadditional_metadatar   r   rF   r   rH   r   r   r   r   rI   rJ   rE   r0   s       r1   	generatorz%DataIterator.to_tf.<locals>.generator  s      ++!1%#*C#5 ,   @ @ "%.....33?>O   21=O   &-"F******B*B 2"6+ + +'
 #F,??????/@ @r3   )r   )output_signature)r   r   r   
tensorflowImportErrorr   r   r   r	   r   npndarrayTypeSpecTensorrw   r   namesdataDatasetfrom_generatorOptionsexperimentalAutoShardPolicyOFFexperimental_distributeauto_shard_policywith_options)r0   r   r   r   rE   rF   rH   rI   rJ   r   r   r   r   tfr   r   rw   datasetoptionsr   r   r   r   s   ````````````       @@@@r1   to_tfzDataIterator.to_tf  s3   H	
 	
 	
 	
 	
 	
 	
 	

	>##### 	> 	> 	><===	>	C 	D 	 	 	 	 	 		)eCI&6 	)4 	) 	) 	) 	) 	) 	)	RZ(	 3S	>*	 R[$sBK/?*@@A		
 29d3	>223	 	 	 	 	 		@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@4 $(?[[]]F--M_---]+++ -fo N N N+mFMJJJO).B.J[[]]F--M/000#0=AS#T#T#T )go44%#(" 5  GG go44->,P 5  G '//##G 04 	'9 ##G,,,s    9r%   c                     ddl m} |                                 \  }}}t          |          }t	          ||                                           }t          t          |          |j                  } |||          S )a7  Execute and materialize this data iterator into object store memory.

        .. note::
            This method triggers the execution and materializes all blocks
            of the iterator, returning its contents as a
            :class:`~ray.data.dataset.MaterializedDataset` for further processing.
        r   )r%   )
input_data)	ray.data.datasetr%   r@   r   r   ry   r   r   _context)r0   r%   rN   rW   _ref_bundlesexecution_planlogical_plans           r1   materializezDataIterator.materialize  s     	988888%)%A%A%C%C"%+,,&ud.>.>.@.@AA"---#
 
 #"
 
 	
r3   )r>   r&   )r>   r%   )*r8   r9   r:   __doc__abcabstractmethodr   r   r   r   r   boolr@   r"   intr   r   r   rM   r   rQ   r   r   rL   r`   r   rp   rW   rw   r!   ry   r   r
   r   r   r   r   r   r	   r   r   r   r;   r3   r1   r=   r=   H   s3        $ 		x	"H\$:D@	A     !"&/37,05
 5
 5
 5
 	5

 sm5
 5
 $,C=5
 %SM5
 
)	5
 5
 5
 Y5
n9 ( 39	9 9 9 9 !"&/37,0GK7;47 47 47 47 	47
 sm47 47 $,C=47 %SM47 h	{N'BCD47 xs
3447 
)	47 47 47 47l!# ! ! ! ! 88DcN3 8 8 8 Y8< 	s    Y  	    	[      !"$'KO<B 37,0 f
 f
 f
 f
 SM	f

 }d33E.FFGHf
 '89f
 (Dbj12NBCYNO
f
 f
 $,C=f
 %SMf
 f
 
"	#f
 f
 f
 Yf
V !"$'SW37,0L L L L SM	L
 0$s<M7M2NNOPL L $,C=L %SML 
-	.L L L Lb '+ 6:  !37,0'+*.#4 4 4 sm4 "$s)T$s)_d3S	>.BBC
	4 %]34  (-m!4d3;M6NNO 
4 4 4 4 $,C=4 %SM4  !%!4" $(#4$ 
,%4 4 4 4B  IM !37,0LPJN f- f- f-sDI~.f- S$s)^,f-
 "(3-$s)1D"DEf- f- f- f- $,C=f- %SMf- !S-5G0H!HIf- }d33E.FFGf- $]#Xd33E.F%GG
f-  
!f- f- f- Yf-P 
 
 
 Y
 
 
r3   r=   )?r   r^   r   typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   .ray.data._internal.block_batching.iter_batchesr   'ray.data._internal.execution.interfacesr   %ray.data._internal.logical.interfacesr   8ray.data._internal.logical.operators.input_data_operatorr   ray.data._internal.planr   ray.data._internal.statsr   ray.data.blockr   r   r   ray.data.collate_fnr   r   r   r   r   r   r   r    ray.data.contextr!   ray.util.annotationsr"   r#   r   r   r   r   r$   r%   r&   r'   r(   r)   r*   r,   ABCr=   DatasetIteratorr;   r3   r1   <module>r     s   



                                  H H H H H H = = = = = = = = = = = = N N N N N N 1 1 1 1 1 1 1 1 1 1 1 1 H H H H H H H H H H	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 ) ( ( ( ( ( A A A A A A A A LLL                GCLL# # # # #HQK # # # }
 }
 }
 }
 }
37 }
 }
 }
B r3   