
    &`it                    ,   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZmZmZmZmZmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z* d d	l+m,Z, d d
l-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZF d dlGmHZH d dlImJZJ d dlKmLZL d dlMmNZN d dlOmPZPmQZQmRZRmSZS d dlTmUZU d dlVmWZW d dlXmYZY d dlZm[Z[m\Z\m]Z]m^Z^m_Z_m`Z` d d lamZbmcZc d d!ldmeZe d d"lfmgZg d d#lhmiZi d d$ljmkZkmlZl d d%lmmnZn d d&lompZp d d'lqmrZr d d(lsmtZtmuZu d d)lvmwZwmxZxmyZy d d*lzm{Z{m|Z|m}Z}m~Z~mZ d d+lmZmZmZmZmZmZmZmZ d d,lmZmZmZmZmZmZmZmZmZ d d-lmZ d d.lmZmZmZmZ d d/lmZmZ d d0lmZ d d1lmZ d d2lmZ d d3lmZ d d4lmZ d d5lmZmZmZ d d6lmZ d d7lmZ d d8lmZ erBd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd d9lmZ d d:lCmZmZ d d;lmZ d d<lmZ d d=lmZmZmZ  ej        eɦ          Zd>Zed?ed?         eed?f         f         Zed@eed@f         f         Z edA          ZeeedBf         ef         ZeedCef         Z	 dDZdEZdFZdGZdHZdIZdJZdKZdLZe G dM dN                      Ze G dO dPeee                               Z edQR           G dS dT                      ZdUedVdWfdXZdUedYee         fdZZdUefd[ZdS )\    N)TYPE_CHECKINGAnyCallableDictGenericIterableIteratorListLiteralMappingOptionalTupleTypeVarUnion)	usage_lib)tabulate)ArrowTensorTypeV2,get_arrow_extension_fixed_shape_tensor_types)ComputeStrategy)BigQueryDatasink)ClickHouseDatasinkClickHouseTableSettingsSinkMode)CSVDatasink)IcebergDatasink)ImageDatasink)JSONDatasink)LanceDatasink)MongoDatasink)NumpyDatasink)ParquetDatasink)SQLDatasink)TFRecordDatasink)WebDatasetDatasink)	_equalize)	RefBundle)(_ref_bundles_iterator_to_block_refs_list)memory_stringDataIteratorImpl)StreamSplitDataIterator)LogicalPlan)RandomizeBlocksRandomShuffleRepartitionSort)Count)	InputData)Join)FilterFlatMap
MapBatchesMapRowsProjectStreamingRepartition)r   Zip)Limit)StreamingSplit)Write)PandasBlockBuilderPandasBlockSchema)ExecutionPlan)SortKey)cached_remote_fn)_get_num_rows_split_at_indices)DatasetStatsDatasetStatsSummary_StatsManager)AllToAllAPIConsumptionAPI_validate_rows_per_file_argsget_compute_strategy"merge_resources_to_ray_remote_args)AggregateFnAggregateFnV2MaxMeanMinStdSumUnique)	VALID_BATCH_FORMATSBlockBlockAccessor	DataBatchDataBatchColumnTUUserDefinedFunction_apply_batch_format)DataContext)
ConnectionDatasinkFilenameProviderSaveMode)WriteResult_gen_datasink_write_result)_FileDatasink)DataType)DataIterator)RandomAccessDataset)	ObjectRef)
DeprecatedDeveloperAPI	PublicAPI)NodeAffinitySchedulingStrategy)Template)repr_with_fallback)
schema_pb2)Executor	NodeIdStrGroupedData)DatasetSummary)ExprStarExprcol!__ray_train_test_split_is_train__tf.TypeSpecz	tf.TensorCollatedDataztorch.Tensorztorch.devicezBasic Transformationsz%Sorting, Shuffling and Repartitioningz$Splitting, Merging, Joining DatasetszGrouped and Global AggregationszConsuming DatazI/O and ConversionzInspecting Metadata	ExecutionExpressionsc            %       x4   e Zd ZdZdedefdZe	 dVdd ded	e	e
         d
d fd            Z ee          ddddddddddd
deeeef         geeef         f         de	e         de	ee                  de	eeef                  de	ee                  de	eeef                  de	e         de	e         de	e         de	eeeeef         eeeef         f                  de	eg eeef         f                  d
d fd            Z edd          de	e         fd            Zde	e         fdZe ed d          d
e	e         fd!                        Zed
e	e         fd"            Zd
efd#Z ee          ddd$dddddddddddd%de e!e!f         d&eede"d$         f         de	e         d'e	e         d(ede	ee                  de	eeef                  de	ee                  de	eeef                  de	e         de	e         de	e         de	eeeeef         eeeef         f                  d)ede	eg eeef         f                  d
d f d*            Z#de e!e!f         d&eede"d$         f         de	e         d'e	e         d(ede	ee                  de	eeef                  de	ee                  de	eeef                  de	e         de	e         de	e         de	eeeeef         eeeef         f                  d)ede	eg eeef         f                  fd+Z$ ee%d,-          d.ed/e&d
d fd0            Z' ed12           ee          d3ddd4d5edee!ge(f         d'e	e         de	e         de	e         d
d fd6                        Z) ee          ddd7d8e*e         de	e         de	e         d
d fd9            Z+ ee          ddd7d8eee*e         f         deeef         de	e         d
d fd:            Z, ee          dd;d<ee*e         eeef         f         de	eeeeef         eeeef         f                  fd=            Z- ee          ddddddddddd
de eeef         ee*eeef                  eeef         f         f         de	e         de	ee                  de	eeef                  de	ee                  de	eeef                  de	e         de	e         de	e         de	eeeeef         eeeef         f                  de	eg eeef         f                  d
d fd>            Z. ee          	 	 dWddddddddddd
de	e eeef         ef                  d/e	eee&f                  deeef         de	ee                  de	eeef                  de	ee                  de	eeef                  de	e         de	e         de	e         de	eeeeef         eeeef         f                  de	eg eeef         f                  d
d fd?            Z/ ee0          	 	 dWdddd@dAe	e         dBe	e         dCedDe	e*e                  dEed
d fdF            Z1e2 ee0          dddGdHe	e         dAe	e         d
d fdI                        Z3e2 ee0          ddJdHe	e         d
d fdK                        Z4 ee          ddJdLedHe	e         d
d fdM            Z5e6 ee7          dddNdOedPedQe	e*dR                  d
e*e8         fdS                        Z9e6 ee7          dddNdOedPedQe	e*e                  d
e*dT         fdU                        Z:e6 ee7          dVe*e         d
e*dT         fdW                        Z;e6 ee7          dXe*e         d
e*dT         fdY                        Z<e6 ee7          ddddZd[eeef         dCedHe	e         d\e	e         d
ed]         f
d^                        Z=dd d[eeef         d\ed
ed]         fd_Z>d[ed
dfd`Z?d[edd d
efdaZ@ ed,e7b          dcddddd[edee"df         dge	e         dHe	e         d
edh         f
di            ZA ee7          dje*d          d
d fdk            ZBe2 ee7          	 	 	 	 dXddddmdd dnedoedpee         dqe	ee                  dre	e         dse	e         dte	e         due	eeef                  dved
d fdw                        ZCe2 eeD          	 dYdxeee*e         df         doe	e         d
dyfdz                        ZEe2e6 eeD          dZd{ed|ed
e*e         fd}                                    ZFe2e6 eeD          d~eGd
eeeeef         f         fd                                    ZHe2e6 eeD          	 d[dpe	eee*e         f                  d|ed
eeeeef         f         fd                                    ZIe2e6 eeD          	 d[dpe	eee*e         f                  d|ed
eeeeef         f         fd                                    ZJe2e6 eeD          	 d[dpe	eee*e         f                  d|ed
eeeeef         f         fd                                    ZKe2e6 eeD          	 d[dpe	eee*e         f                  d|ed
eeeeef         f         fd                                    ZLe2e6 eeD          	 	 	 d\dpe	eee*e         f                  ded|ed
eeeeef         f         fd                                    ZMe2e6 eeDd,-          	 	 dWde	e*e                  de	eeNeege*eO         f         f                  d
dfd                                    ZPe2 ee0          	 	 dVdxeee*e         f         deee*e         f         de*eeef                  d
d fd                        ZQ ee7          dje*d          d
d fd            ZR ee          ded
d fd            ZSe6 eeT          	 d]d$dd&ed'e	e         d
e!fd                        ZUe6 eeT          d]ded
e*eeef                  fd                        ZVe6 eeT          dYde	e         d
e*eeef                  fd                        ZWe6 eeT          d]ded
dfd                        ZX e6ddd           eeY          d
efd                        ZZ e6dddd           eeY          d^ded
e	d         fd                        Z[ e6dddd           eeY          d^ded
e	e*e                  fd                        Z\ eeY          d
efd            Z]e6 eeY          d
efd                        Z^e6 eeY          d
e*e         fd                        Z_e6 ee`          dddddddddddeajb        ddede	e*e                  de	d         dede	eeef                  de	ec         de	eg eeef         f                  de	e         de	e         deeef         de	e         de	e         dead
dfd                        Zde6 ee`          dddddddddeajb        d
dede	d         dede	eeef                  de	ec         de	eg eeef         f                  de	e         deeef         de	e         de	e         dead
dfd                        Zee6 ed,e`b          ddeajb        dddddfdede	eeef                  de	eeef                  ddde	d         de	eeef                  de	eeef                  deeef         de	e         d
dfd                        Zf ed,e`b          e6	 d_ddddddeajb        dded{edede	d         dede	eeef                  de	ec         deeef         de	e         dead
dfdĄ                        Zge6 ee`          dddddddddeajb        dŜ
dede	d         dede	eeef                  de	ec         de	eg eeef         f                  de	e         deeef         de	e         de	e         dead
dfdǄ                        Zhe6 ee`          dddddddddeajb        dȜ
dede	d         de	d         dede	eeef                  de	ec         de	e         deeef         de	e         de	e         dead
dfd˄                        Zie6 ed,e`b          dddddddddeajb        d̜
dede	d         dede	eeef                  de	ec         de	e         deeef         de	eeeejekf                  de	e         de	e         dead
dfd΄                        Zle6 ee`          ddddddddeajb        dϜ	ded{ede	d         dede	eeef                  de	ec         de	e         deeef         de	e         de	e         dead
dfdЄ                        Zme6	 	 dWdedeg enf         de	eeef                  de	e         d
df
dӄ            Zoe6dddԜdededeeef         de	e         fdׄ            Zp ed,e`b          e6	 	 dWdedededeeef         de	e         d
dfdۄ                        Zqe6	 	 	 	 d`dededede	e         deeef         de	e         d
dfd            Zre6esjt        dddddddddededesde	d         de	eeef                  de	eeef                  de	eu         de	e         deeef         de	e         d
dfd            Zve6ddddddddddede	d         de"d         dedede	e         de	eeef                  deeef         de	e         d
dfd            Zw e6d          dddԜdexdeeef         de	e         d
dfd            Zy e6dd           eeT          d
e8fd                        Zze6 eeT          d
eeeef                  fd                        Z{e6 eeT          ddd$dddddded&e	e         d'e	e         dede	e         de	e         d e	ee!ge|f                  d
ee!         fd                        Z}e6 eeT          dddddddddded&e	e         de	edeedf         f                  dee~e"d         f         de	eeeej        f         ge|f                  dede	e         de	e         d
ee         fd                        Ze6eddddddd	ded&e	e         de	ed
eed
f         f                  dede	e         de	e         d
ee         fd                        Ze6 ee`          dddddddddd	deee*e         f         deee*e         f         deee*e         f         ded&edede	e         de	e         dedeedf         f         dedeedf         f         dedeedf         f         d
dfd                        Z e6d           ee`          dad                        Z e6d           ee`          	 	 d[deddeeef         ee         ee         df         ded
dfd                        Z e6d           ee`          dbd                        Z e6d           ee`          dcd!                        Z e6d           ee`          ddd%                        Z e6d           ee`          dYded
dfd&                        Z e6d          ed
e*ed                  fd'                        Zedd(d{e	e         d
e*eej                          fd)            Z e6d          ed
e*ed*                  fd+                        Z e6d,          	 dYdxed-e	e         d
efd.            Z e6d/d0           ee          ded1                        Z eeY          d
efd2            Z eeYd,-          d3             Zd
efd4Z e6d          ed
ee         fd5                        Ze e6d          d
e*ee                  fd6                        Zed
efd7            Zed
efd8            Zeed9ed
d fd:                        Zeed
efd;                        Zd<e
dpe	eee*e         f                  fd=Zdd>d<e
dpe	eee*e         f                  d?e	e*e                  fd@ZdAeeef         d
efdBZ edCdDg          dE             ZdF Zd
efdGZd
efdHZd
efdIZd
efdJZdK Zd
e*e         fdLZd
e	e         fdMZd
efdNZdOed
dfdPZdQ Zd
eee         ef         fdRZdS ZdT ZdU ZdS (f  Dataseta2  A Dataset is a distributed data collection for data loading and processing.

    Datasets are distributed pipelines that produce ``ObjectRef[Block]`` outputs,
    where each block holds data in Arrow format, representing a shard of the overall
    data collection. The block also determines the unit of parallelism. For more
    details, see :ref:`Ray Data Key Concepts <data_key_concepts>`.

    Datasets can be created in multiple ways:

    * from external storage systems such as local disk, S3, HDFS etc. via the ``read_*()`` APIs.
    * from existing memory data via ``from_*()`` APIs
    * from synthetic data via ``range_*()`` APIs

    The (potentially processed) Dataset can be saved back to external storage systems
    via the ``write_*()`` APIs.

    Examples:
        .. testcode::
            :skipif: True

            import ray
            # Create dataset from synthetic data.
            ds = ray.data.range(1000)
            # Create dataset from in-memory data.
            ds = ray.data.from_items(
                [{"col1": i, "col2": i * 2} for i in range(1000)]
            )
            # Create dataset from external storage system.
            ds = ray.data.read_parquet("s3://bucket/path")
            # Save dataset back to external storage system.
            ds.write_csv("s3://bucket/output")

    Dataset has two kinds of operations: transformation, which takes in Dataset
    and outputs a new Dataset (e.g. :py:meth:`.map_batches()`); and consumption,
    which produces values (not a data stream) as output
    (e.g. :meth:`.iter_batches()`).

    Dataset transformations are lazy, with execution of the transformations being
    triggered by downstream consumption.

    Dataset supports parallel processing at scale:

    * transformations such as :py:meth:`.map_batches()`
    * aggregations such as :py:meth:`.min()`/:py:meth:`.max()`/:py:meth:`.mean()`,
    * grouping via :py:meth:`.groupby()`,
    * shuffling operations such as :py:meth:`.sort()`, :py:meth:`.random_shuffle()`, and :py:meth:`.repartition()`
    * joining via :py:meth:`.join()`

    Examples:
        >>> import ray
        >>> ds = ray.data.range(1000)
        >>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
        >>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})  # doctest: +ELLIPSIS
        MapBatches(<lambda>)
        +- Dataset(num_rows=1000, schema={id: int64})
        >>> # Compute the maximum.
        >>> ds.max("id")
        999
        >>> # Shuffle this dataset randomly.
        >>> ds.random_shuffle()  # doctest: +ELLIPSIS
        RandomShuffle
        +- Dataset(num_rows=1000, schema={id: int64})
        >>> # Sort it back in order.
        >>> ds.sort("id")  # doctest: +ELLIPSIS
        Sort
        +- Dataset(num_rows=1000, schema={id: int64})

    Both unexecuted and materialized Datasets can be passed between Ray tasks and
    actors without incurring a copy. Dataset supports conversion to/from several
    more featureful dataframe libraries (e.g., Spark, Dask, Modin, MARS), and are also
    compatible with distributed TensorFlow / PyTorch.
    planlogical_planc                 >   t          |t                    sJ t          |                      t          j        d           || _        || _        | j                            |           d| _        d| _	        | 
                    t          j                               dS )zConstruct a Dataset (internal API).

        The constructor is not part of the Dataset API. Use the ``ray.data.*``
        read methods to construct a dataset.
        datasetN)
isinstancer@   typer   record_library_usage_plan_logical_planlink_logical_plan_current_executor	_write_ds	_set_uuidrG   gen_dataset_id_from_stats_actor)selfr   r   s      d/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/dataset.py__init__zDataset.__init__  s     $..::T

::.&y111
)
$$\222 8<}DFFGGGGG    FNds
_deep_copy_asreturnc                     |st          |           }|r( || j                                        | j                  S  || j                                        | j                  S N)r   r   	deep_copyr   copy)r   r   r   s      r   r   zDataset.copy  sa      	r((C 	:3rx))++R-=>>>3rx}}(8999r   )	api_group)
computefn_args	fn_kwargsfn_constructor_argsfn_constructor_kwargsnum_cpusnum_gpusmemoryconcurrencyray_remote_args_fnfnr   r   r   r   r   r   r   r   r   r   c       
            t          ||||
          }t          |||	|          }| j                                        }t	          | j        j        ||||||||	  	        }t          || j                  }t          ||          S )av  Apply the given function to each row of this dataset.

        Use this method to transform your data. To learn more, see
        :ref:`Transforming rows <transforming_rows>`.

        You can use either a function or a callable class to perform the transformation.
        For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses
        stateful Ray actors. For more information, see
        :ref:`Stateful Transforms <stateful_transforms>`.

        .. tip::

            If your transformation is vectorized like most NumPy or pandas operations,
            :meth:`~Dataset.map_batches` might be faster.

        .. warning::
            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
            and may result in scheduling or stability issues. Please
            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
            to the Ray team.

        Examples:

            .. testcode::

                import os
                from typing import Any, Dict
                import ray

                def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
                    row["filename"] = os.path.basename(row["path"])
                    return row

                ds = (
                    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
                    .map(parse_filename)
                )
                print(ds.schema())

            .. testoutput::

                Column    Type
                ------    ----
                image     ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)
                path      string
                filename  string

        Time complexity: O(dataset size / parallelism)

        Args:
            fn: The function to apply to each row, or a class type
                that can be instantiated to create such a callable.
            compute: The compute strategy to use for the map operation.

                * If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.

                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

                * If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.

                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

            fn_args: Positional arguments to pass to ``fn`` after the first argument.
                These arguments are top-level arguments to the underlying Ray task.
            fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
                top-level arguments to the underlying Ray task.
            fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
                You can only provide this if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
                This can only be provided if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            num_cpus: The number of CPUs to reserve for each parallel map worker.
            num_gpus: The number of GPUs to reserve for each parallel map worker. For
                example, specify `num_gpus=1` to request 1 GPU for each parallel map
                worker.
            memory: The heap memory in bytes to reserve for each parallel map worker.
            concurrency: This argument is deprecated. Use ``compute`` argument.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time prior
                to initializing the worker. Args returned from this dict will always
                override the args in ``ray_remote_args``. Note: this is an advanced,
                experimental feature.
            ray_remote_args: Additional resource requirements to request from
                Ray for each map worker. See :func:`ray.remote` for details.

        .. seealso::

            :meth:`~Dataset.flat_map`
                Call this method to create new rows from existing ones. Unlike
                :meth:`~Dataset.map`, a function passed to
                :meth:`~Dataset.flat_map` can return multiple rows.

            :meth:`~Dataset.map_batches`
                Call this method to transform batches of data.
        r   r   r   )r   r   r   r   r   r   ray_remote_args)
rK   rL   r   r   r7   r   dagr,   contextr   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   map_opr   s                   r   mapzDataset.map$  s    l ' 3#	
 
 
 =	
 
 z  " 3"71+

 

 

 #64<88t\***r   zUse set_name() insteadT)messagewarningnamec                 0    |                      |           d S r   )set_namer   r   s     r   	_set_namezDataset._set_name  s    dr   c                     || j         _        dS )zQSet the name of the dataset.

        Used as a prefix for metrics tags.
        Nr   _dataset_namer   s     r   r   zDataset.set_name  s    
 $(
   r   zUse name() insteadc                     | j         S r   )r   r   s    r   _namezDataset._name  s     yr   c                     | j         j        S )z%Returns the user-defined dataset namer   r   s    r   r   zDataset.name  s     z''r   c                 4    | j                                         S )ziUnique ID of the dataset, including the dataset name,
        UUID, and current execution index.
        )r   get_dataset_idr   s    r   r   zDataset.get_dataset_id  s     z((***r   default
batch_sizer   batch_formatzero_copy_batchr   r   r   r   r   r   r   r   udf_modifying_row_countr   r   r   r   r   c                    |duo|dk    }|r||dk    rt          d          t          |t                    r|dk     rt          d           | j        |f||||||||	|
|||||d|S )a=*  Apply the given function to batches of data.

        This method is useful for preprocessing data and performing inference. To learn
        more, see :ref:`Transforming batches <transforming_batches>`.

        You can use either a function or a callable class to perform the transformation.
        For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses
        stateful Ray actors. For more information, see
        :ref:`Stateful Transforms <stateful_transforms>`.

        .. tip::
            To understand the format of the input to ``fn``, call :meth:`~Dataset.take_batch`
            on the dataset to get a batch in the same format as will be passed to ``fn``.

        .. note::
            ``fn`` should generally avoid modifying data buffers behind its input
            since these could be zero-copy views into the underlying object residing
            inside Ray's Object Store.

            To perform any modifications it's recommended to copy the data you
            want to modify.

            In rare cases when you can't copy inside your UDF, you can instead
            specify ``zero_copy_batch=False`` and then Ray Data will copy the
            *whole* batch for you, providing ``fn`` with a copy rather than
            a zero-copy view.

        .. warning::
            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
            and may result in scheduling or stability issues. Please
            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
            to the Ray team.

        Examples:

            Call :meth:`~Dataset.map_batches` to transform your data.

            .. testcode::

                from typing import Dict
                import numpy as np
                import ray

                def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                    batch["age_in_dog_years"] = 7 * batch["age"]
                    return batch

                ds = (
                    ray.data.from_items([
                        {"name": "Luna", "age": 4},
                        {"name": "Rory", "age": 14},
                        {"name": "Scout", "age": 9},
                    ])
                    .map_batches(add_dog_years)
                )
                ds.show()

            .. testoutput::

                {'name': 'Luna', 'age': 4, 'age_in_dog_years': 28}
                {'name': 'Rory', 'age': 14, 'age_in_dog_years': 98}
                {'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}

            If your function returns large objects, yield outputs in chunks.

            .. testcode::

                from typing import Dict
                import ray
                import numpy as np

                def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                    for i in range(3):
                        yield {"large_output": np.ones((100, 1000))}

                ds = (
                    ray.data.from_items([1])
                    .map_batches(map_fn_with_large_output)
                )

            If you require stateful transformation,
            use Python callable class. Here is an example showing how to use stateful transforms to create model inference workers, without having to reload the model on each call.

            .. testcode::

                from typing import Dict
                import numpy as np
                import torch
                import ray

                class TorchPredictor:

                    def __init__(self):
                        self.model = torch.nn.Identity().cuda()
                        self.model.eval()

                    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
                        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
                        with torch.inference_mode():
                            batch["output"] = self.model(inputs).detach().cpu().numpy()
                        return batch

                ds = (
                    ray.data.from_numpy(np.ones((32, 100)))
                    .map_batches(
                        TorchPredictor,
                        # Two workers with one GPU each
                        compute=ray.data.ActorPoolStrategy(size=2),
                        # Batch size is required if you're using GPUs.
                        batch_size=4,
                        num_gpus=1
                    )
                )

            To learn more, see
            :ref:`End-to-end: Offline Batch Inference <batch_inference_home>`.

        Args:
            fn: The function or generator to apply to a record batch, or a class type
                that can be instantiated to create such a callable. Note ``fn`` must be
                pickle-able.
            batch_size: The desired number of rows in each batch, or ``None`` to use
                entire blocks as batches (blocks may contain different numbers of rows).
                The actual size of the batch provided to ``fn`` may be smaller than
                ``batch_size`` if ``batch_size`` doesn't evenly divide the block(s) sent
                to a given map task. Default ``batch_size`` is ``None``.
            compute: The compute strategy to use for the map operation.

                * If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.

                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

                * If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.

                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

            batch_format: If ``"default"`` or ``"numpy"``, batches are
                ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
                ``pandas.DataFrame``. If ``"pyarrow"``, batches are
                ``pyarrow.Table``. If ``batch_format`` is set to ``None`` input
                block format will be used.
            zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only
                batches. If this is ``True`` and no copy is required for the
                ``batch_format`` conversion, the batch is a zero-copy, read-only
                view on data in Ray's object store, which can decrease memory
                utilization and improve performance. Setting this to ``False``,
                will make a copy of the *whole* batch, therefore allowing UDF to
                modify underlying data buffers (like tensors, binary arrays, etc)
                in place. It's recommended to copy only the data you need to
                modify instead of resorting to copying the whole batch.
            fn_args: Positional arguments to pass to ``fn`` after the first argument.
                These arguments are top-level arguments to the underlying Ray task.
            fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
                top-level arguments to the underlying Ray task.
            fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
                You can only provide this if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
                This can only be provided if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            num_cpus: The number of CPUs to reserve for each parallel map worker.
            num_gpus: The number of GPUs to reserve for each parallel map worker. For
                example, specify `num_gpus=1` to request 1 GPU for each parallel map
                worker.
            memory: The heap memory in bytes to reserve for each parallel map worker.
            concurrency: This argument is deprecated. Use ``compute`` argument.
            udf_modifying_row_count: Set to True if the UDF may modify the number of rows it receives so the limit pushdown optimization will not be applied.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time prior
                to initializing the worker. Args returned from this dict will always
                override the args in ``ray_remote_args``. Note: this is an advanced,
                experimental feature.
            ray_remote_args: Additional resource requirements to request from
                Ray for each map worker. See :func:`ray.remote` for details.

        .. note::

            The size of the batches provided to ``fn`` might be smaller than the
            specified ``batch_size`` if ``batch_size`` doesn't evenly divide the
            block(s) sent to a given map task.

            If ``batch_size`` is set and each input block is smaller than the
            ``batch_size``, Ray Data will bundle up many blocks as the input for one
            task, until their total size is equal to or greater than the given
            ``batch_size``.
            If ``batch_size`` is not set, the bundling will not be performed. Each task
            will receive entire input block as a batch.

        .. seealso::

            :meth:`~Dataset.iter_batches`
                Call this function to iterate over batches of data.

            :meth:`~Dataset.take_batch`
                Call this function to get a batch of data from the dataset
                in the same format as will be passed to the `fn` function of
                :meth:`~Dataset.map_batches`.

            :meth:`~Dataset.flat_map`
                Call this method to create new records from existing ones. Unlike
                :meth:`~Dataset.map`, a function passed to :meth:`~Dataset.flat_map`
                can return multiple records.

            :meth:`~Dataset.map`
                Call this method to transform one record at time.

        Nr   r   a&  You must provide `batch_size` to `map_batches` when requesting GPUs. The optimal batch size depends on the model, data, and GPU used. We recommend using the largest batch size that doesn't result in your GPU device running out of memory. You can view the GPU memory usage via the Ray dashboard.   z!Batch size can't be negative or 0r   )
ValueErrorr   int*_map_batches_without_batch_size_validation)r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   use_gpuss                     r   map_batcheszDataset.map_batches  s    R 4'8HqL 	+zY/F/F/   j#&& 	B:>>@AAA>t>
!%+ 3"7#$;1
 
  !
 
 	
r   c                   |dk    rt          j        dt                     d }t          ||||          }|
|
|d<   |||d<   |||d<   t	          |          }|t
          vrt          dt
           d|           | j                                        }t          | j
        j        |||||||||	||||	          }t          || j                  }t          ||          S )
Nr   z|Passing 'default' to `map_batches` is deprecated and won't be supported after September 2025. Use `batch_size=None` instead.r   r   r   r   z The batch format must be one of , got: )r   r   r   min_rows_per_bundled_inputr   r   r   r   r   r   r   r   )warningswarnDeprecationWarningrK   r]   rU   r   r   r   r6   r   r   r,   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   map_batches_opr   s                       r   r   z2Dataset._map_batches_without_batch_size_validation  sI   4 ""MQ"  
 J& 3#	
 
 
 *2OJ'*2OJ'(.OH%*<88222"3F " "" "  
 z  #"!%+'1 3"7$;1+
 
 
  #>4<@@t\***r   alpha)r   	stabilitycolumn_nameexprc                    ddl m} ddlm} ddlm} | j                                        }t          ||          r6 || j	        j
        |j        g|g|          }t          || j                  }	nN || j	        j
        t                      |                    |          g|          }
t          |
| j                  }	t!          ||	          S )a  
        Add a new column to the dataset via an expression.

        This method allows you to add a new column to a dataset by applying an
        expression. The expression can be composed of existing columns, literals,
        and user-defined functions (UDFs).

        Examples:
            >>> import ray
            >>> from ray.data.expressions import col
            >>> ds = ray.data.range(100)
            >>> # Add a new column 'id_2' by multiplying 'id' by 2.
            >>> ds.with_column("id_2", col("id") * 2).show(2)
            {'id': 0, 'id_2': 0}
            {'id': 1, 'id_2': 2}

            >>> # Using a UDF with with_column
            >>> from ray.data.datatype import DataType
            >>> from ray.data.expressions import udf
            >>> import pyarrow.compute as pc
            >>>
            >>> @udf(return_dtype=DataType.int32())
            ... def add_one(column):
            ...     return pc.add(column, 1)
            >>>
            >>> ds.with_column("id_plus_one", add_one(col("id"))).show(2)
            {'id': 0, 'id_plus_one': 1}
            {'id': 1, 'id_plus_one': 2}

        Args:
            column_name: The name of the new column.
            expr: An expression that defines the new column values.
            **ray_remote_args: Additional resource requirements to request from
                Ray for the map tasks (e.g., `num_gpus=1`).

        Returns:
            A new dataset with the added column evaluated via the expression.
        r   )r8   )Download)DownloadExpr)uri_column_namesoutput_bytes_column_namesr   )exprsr   )1ray.data._internal.logical.operators.map_operatorr8   8ray.data._internal.logical.operators.one_to_one_operatorr   ray.data.expressionsr   r   r   r   r   r   uri_column_namer,   r   rw   aliasr   )r   r   r   r   r8   r   r   r   download_opr   
project_ops              r   with_columnzDataset.with_column(  s   \ 	NMMMMMUUUUUU 	655555z  dL)) 	A"("&"&"6!7+6- /	  K '{DLAALL "&zz4::k#:#:; /  J
 'z4<@@Lt\***r   zUse `with_column` API instead)r   pandas)r   r   r   rx   c                    g d}|vrt          d| d           dt          dt          ffd}t                    s"t          d                                         | j        |f||dd	|S )
am  Add the given column to the dataset.

        A function generating the new column values given the batch in pyarrow or pandas
        format must be specified. This function must operate on batches of
        `batch_format`.

        Examples:


            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.schema()
            Column  Type
            ------  ----
            id      int64

            Add a new column equal to ``id * 2``.

            >>> ds.add_column("new_id", lambda df: df["id"] * 2).schema()
            Column  Type
            ------  ----
            id      int64
            new_id  int64

        Time complexity: O(dataset size / parallelism)

        Args:
            col: Name of the column to add. If the name already exists, the
                column is overwritten.
            fn: Map function generating the column values given a batch of
                records in pandas format.
            batch_format: If ``"default"`` or ``"numpy"``, batches are
                ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
                ``pandas.DataFrame``. If ``"pyarrow"``, batches are
                ``pyarrow.Table``. If ``"numpy"``, batches are
                ``Dict[str, numpy.ndarray]``.
            compute: This argument is deprecated. Use ``concurrency`` argument.
            concurrency: The maximum number of Ray workers to use concurrently.
            ray_remote_args: Additional resource requirements to request from
                Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
                :func:`ray.remote` for details.
        )r   pyarrownumpyz$batch_format argument must be on of r   batchr   c                 :    |           }dk    rBdd l }t          ||j        |j        |j        f          r| j        |_        || j        d d f<   | S dk    rdd l}t          ||j        |j	        f          sJ dt          |                       | j                                      }|dk    r|                     |          S |                     ||          S t          |t          j                  sJ dt          |                       || <   | S )Nr   r   r   zIFor pyarrow batch format, the function must return a pyarrow Array, got: zGFor numpy batch format, the function must return a numpy.ndarray, got: )r   r   	DataFrameIndexSeriesindexlocr   ArrayChunkedArrayr   schemaget_field_indexappend_column
set_columnnpndarray)r   columnpdpa
column_idxr   rx   r   s        r   
add_columnz&Dataset.add_column.<locals>.add_column  sf   RYYFx''#### fr|RXry&IJJ /#(;FL$*	!!!S&!**$$$$!&28R_*EFF  2#'<<2 2 F #\99#>>
## ..sF;;;''
C@@@
 "&"*55  :+/<<: : 5 $c
r   z`fn` must be callable, got {}T)r   r   r   r   )r   rX   callableformatr   )	r   rx   r   r   r   r   r   accepted_batch_formatsr   s	    ```     r   r   zDataset.add_columnn  s    t "@!?!?555'7M ' '$' '  
&	i &	I &	 &	 &	 &	 &	 &	 &	 &	P || 	I<CCBGGHHHt
%# 
 
 
 
 	
r   )r   r   colsc                    t                    t          t                              k    rt          d           fd} | j        |fdd||d|S )a  Drop one or more columns from the dataset.

        Examples:

            >>> import ray
            >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
            >>> ds.schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double
            petal.length  double
            petal.width   double
            variety       string
            >>> ds.drop_columns(["variety"]).schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double
            petal.length  double
            petal.width   double

        Time complexity: O(dataset size / parallelism)

        Args:
            cols: Names of the columns to drop. If any name does not exist,
                an exception is raised. Column names must be unique.
            compute: This argument is deprecated. Use ``concurrency`` argument.
            concurrency: The maximum number of Ray workers to use concurrently.
            ray_remote_args: Additional resource requirements to request from
                Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
                :func:`ray.remote` for details.
        z/drop_columns expects unique column names, got: c                 .    |                                S r   )drop)r   r   s    r   drop_columnsz*Dataset.drop_columns.<locals>.drop_columns  s    ::d###r   r   T)r   r   r   r   )lensetr   r   )r   r   r   r   r   r  s    `    r   r  zDataset.drop_columns  s    V t99CII&&UtUUVVV	$ 	$ 	$ 	$ 	$  t
" #
 
 
 
 	
r   c                b  
 ddl m
 t          |t                    r 
|          g}nt          |t                    rvt          d |D                       st          d          t          |          t          t          |                    k    rt          d|           
fd|D             }nt          d          ddl
m}  ||	          }| j                                        }t          | j        j        |||
          }t#          || j                  }	t'          ||	          S )a  Select one or more columns from the dataset.

        Specified columns must be in the dataset schema.

        .. tip::
            If you're reading parquet files with :meth:`ray.data.read_parquet`,
            you might be able to speed it up by using projection pushdown; see
            :ref:`Parquet column pruning <parquet_column_pruning>` for details.

        Examples:

            >>> import ray
            >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
            >>> ds.schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double
            petal.length  double
            petal.width   double
            variety       string
            >>> ds.select_columns(["sepal.length", "sepal.width"]).schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double

        Time complexity: O(dataset size / parallelism)

        Args:
            cols: Names of the columns to select. If a name isn't in the
                dataset schema, an exception is raised. Columns also should be unique.
            compute: This argument is deprecated. Use ``concurrency`` argument.
            concurrency: The maximum number of Ray workers to use concurrently.
            ray_remote_args: Additional resource requirements to request from
                Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
                :func:`ray.remote` for details.
        r   )rx   c              3   @   K   | ]}t          |t                    V  d S r   r   str.0rx   s     r   	<genexpr>z)Dataset.select_columns.<locals>.<genexpr>Q  s,      <<z#s++<<<<<<r   z=select_columns requires all elements of 'cols' to be strings.zIselect_columns expected unique column names, got duplicate column names: c                 &    g | ]} |          S  r  )r  crx   s     r   
<listcomp>z*Dataset.select_columns.<locals>.<listcomp>Z  s!    ***SSVV***r   zCselect_columns requires 'cols' to be a string or a list of strings.TaskPoolStrategysizer   r   r   )r   rx   r   r	  listallr   r  r  	TypeErrorray.data._internal.computer  r   r   r8   r   r   r,   r   r   )r   r   r   r   r   r   r  r   	select_opr   rx   s             @r   select_columnszDataset.select_columns  sx   ^ 	-,,,,,dC   	SYYKEEd## 	<<t<<<<<  S   4yyCD		NN** :37: :   +***T***EEU   	@?????""444z  "+	
 
 
	 #9dl;;t\***r   )r   namesc                B   t          |t                    r|st          d          t          |                                          t          t          |                                                    k    rt          d|           t          d |                                D                       st          d          d |                                D             }nt          |t                    r|st          d          t          |          t          t          |                    k    rt          d|           t          d |D                       st          d          | 	                                j
        }t          |          t          |          k    rt          d	| d
| d          d t          ||          D             }n t          dt          |           d          |(t          |t                    st          d| d          ddlm}  ||          }| j                                        }t'          | j        j        t-                      g|||          }	t/          |	| j                  }
t3          ||
          S )a  Rename columns in the dataset.

        Examples:

            >>> import ray
            >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
            >>> ds.schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double
            petal.length  double
            petal.width   double
            variety       string

            You can pass a dictionary mapping old column names to new column names.

            >>> ds.rename_columns({"variety": "category"}).schema()
            Column        Type
            ------        ----
            sepal.length  double
            sepal.width   double
            petal.length  double
            petal.width   double
            category      string

            Or you can pass a list of new column names.

            >>> ds.rename_columns(
            ...     ["sepal_length", "sepal_width", "petal_length", "petal_width", "variety"]
            ... ).schema()
            Column        Type
            ------        ----
            sepal_length  double
            sepal_width   double
            petal_length  double
            petal_width   double
            variety       string

        Args:
            names: A dictionary that maps old column names to new column names, or a
                list of new column names.
            concurrency: The maximum number of Ray workers to use concurrently.
            ray_remote_args: Additional resource requirements to request from
                Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
                :func:`ray.remote` for details.
        z0rename_columns received 'names' with no entries.z9rename_columns received duplicate values in the 'names': c              3   p   K   | ]1\  }}t          |t                    ot          |t                    V  2d S r   r  )r  kvs      r   r  z)Dataset.rename_columns.<locals>.<genexpr>  sP        >Ba
1c""9z!S'9'9     r   zJrename_columns requires both keys and values in the 'names' to be strings.c                 X    g | ]'\  }}t          |                              |          (S r  rx   _renamer  prevnews      r   r  z*Dataset.rename_columns.<locals>.<listcomp>  s0    KKK	cSYY&&s++KKKr   z>rename_columns requires 'names' with at least one column name.c              3   @   K   | ]}t          |t                    V  d S r   r  r
  s     r   r  z)Dataset.rename_columns.<locals>.<genexpr>  s,      ==z#s++======r   zBrename_columns requires all elements in the 'names' to be strings.z!rename_columns requires 'names': z$ length match current schema names: .c                 X    g | ]'\  }}t          |                              |          (S r  r"  r$  s      r   r  z*Dataset.rename_columns.<locals>.<listcomp>  s0    WWW	cSYY&&s++WWWr   zLrename_columns expected names to be either List[str] or Dict[str, str], got Nz;Expected `concurrency` to be an integer or `None`, but got r   r  r  r  )r   dictr   r  valuesr  r  itemsr  r   r  zipr  r   r   r  r  r   r   r8   r   r   rw   r,   r   r   )r   r  r   r   r   current_namesr  r   r   r  r   s              r   rename_columnszDataset.rename_columnsn  s   p eT"" /	 U !STTT5<<>>""c#ellnn*=*=&>&>>> WPUWW     FKkkmm      !%  
 LKU[[]]KKKEEt$$ 	  T   5zzSU__,, WPUWW   ==u=====  X   !KKMM/M=!!SZZ// 6 6 6%26 6 6  
 XWSPU=V=VWWWEE6'+E{{6 6 6  
 ":k3+G+G"&"& & &   	@?????""444z  "::&&+	
 
 
	 #9dl;;t\***r   c       
            t          ||||
          }t          |||	|          }| j                                        }t	          | j        j        ||||||||	  	        }t          || j                  }t          ||          S )a  Apply the given function to each row and then flatten results.

        Use this method if your transformation returns multiple rows for each input
        row.

        You can use either a function or a callable class to perform the transformation.
        For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses
        stateful Ray actors. For more information, see
        :ref:`Stateful Transforms <stateful_transforms>`.

        .. tip::
            :meth:`~Dataset.map_batches` can also modify the number of rows. If your
            transformation is vectorized like most NumPy and pandas operations,
            it might be faster.

        .. warning::
            Specifying both ``num_cpus`` and ``num_gpus`` for map tasks is experimental,
            and may result in scheduling or stability issues. Please
            `report any issues <https://github.com/ray-project/ray/issues/new/choose>`_
            to the Ray team.

        Examples:

            .. testcode::

                from typing import Any, Dict, List
                import ray

                def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
                    return [row] * 2

                print(
                    ray.data.range(3)
                    .flat_map(duplicate_row)
                    .take_all()
                )

            .. testoutput::

                [{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

        Time complexity: O(dataset size / parallelism)

        Args:
            fn: The function or generator to apply to each record, or a class type
                that can be instantiated to create such a callable.
            compute: The compute strategy to use for the map operation.

                * If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.

                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

                * If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.

                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

            fn_args: Positional arguments to pass to ``fn`` after the first argument.
                These arguments are top-level arguments to the underlying Ray task.
            fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
                top-level arguments to the underlying Ray task.
            fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
                You can only provide this if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
                This can only be provided if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            num_cpus: The number of CPUs to reserve for each parallel map worker.
            num_gpus: The number of GPUs to reserve for each parallel map worker. For
                example, specify `num_gpus=1` to request 1 GPU for each parallel map
                worker.
            memory: The heap memory in bytes to reserve for each parallel map worker.
            concurrency: This argument is deprecated. Use ``compute`` argument.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time
                prior to initializing the worker. Args returned from this dict will
                always override the args in ``ray_remote_args``. Note: this is an
                advanced, experimental feature.
            ray_remote_args: Additional resource requirements to request from
                Ray for each map worker. See :func:`ray.remote` for details.

        .. seealso::

            :meth:`~Dataset.map_batches`
                Call this method to transform batches of data.

            :meth:`~Dataset.map`
                Call this method to transform one row at time.
        r   )	input_opr   r   r   r   r   r   r   r   )
rK   rL   r   r   r5   r   r   r,   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   opr   s                   r   flat_mapzDataset.flat_map  s    ` ' 3#	
 
 
 =	
 
 z  '+ 3"71+

 

 

 #2t|44t\***r   c       
            t          |du|dug          }|dk    rt          d          fd}t          ||	|
|          }| j        j        }d}d}d}d}d}d}d}|m |d           ddlm} t          |t                    r8t          j
        dt          d	
           ddlm} |                    |          }n|} ||          }net          j
        d           t          |          s%t          dt!          |          j         d          |}}}}}t%          |||          }t'          ||||||||||
  
        }| j                                        }t-          || j                  }t1          ||          S )ax  Filter out rows that don't satisfy the given predicate.

        You can use either a function or a callable class or an expression to
        perform the transformation.
        For functions, Ray Data uses stateless Ray tasks. For classes, Ray Data uses
        stateful Ray actors. For more information, see
        :ref:`Stateful Transforms <stateful_transforms>`.

        .. tip::
           If you use the `expr` parameter with a predicate expression, Ray Data
           optimizes your filter with native Arrow interfaces.

        .. deprecated::
           String expressions are deprecated and will be removed in a future version.
           Use predicate expressions from `ray.data.expressions` instead.

        Examples:

            >>> import ray
            >>> from ray.data.expressions import col
            >>> ds = ray.data.range(100)
            >>> # String expressions (deprecated - will warn)
            >>> ds.filter(expr="id <= 4").take_all()
            [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]
            >>> # Using predicate expressions (preferred)
            >>> ds.filter(expr=(col("id") > 10) & (col("id") < 20)).take_all()
            [{'id': 11}, {'id': 12}, {'id': 13}, {'id': 14}, {'id': 15}, {'id': 16}, {'id': 17}, {'id': 18}, {'id': 19}]

        Time complexity: O(dataset size / parallelism)

        Args:
            fn: The predicate to apply to each row, or a class type
                that can be instantiated to create such a callable.
            expr: An expression that represents a predicate (boolean condition) for filtering.
                Can be either a string expression (deprecated) or a predicate expression
                from `ray.data.expressions`.
            fn_args: Positional arguments to pass to ``fn`` after the first argument.
                These arguments are top-level arguments to the underlying Ray task.
            fn_kwargs: Keyword arguments to pass to ``fn``. These arguments are
                top-level arguments to the underlying Ray task.
            fn_constructor_args: Positional arguments to pass to ``fn``'s constructor.
                You can only provide this if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            fn_constructor_kwargs: Keyword arguments to pass to ``fn``'s constructor.
                This can only be provided if ``fn`` is a callable class. These arguments
                are top-level arguments in the underlying Ray actor construction task.
            compute: The compute strategy to use for the map operation.

                * If ``compute`` is not specified for a function, will use ``ray.data.TaskPoolStrategy()`` to launch concurrent tasks based on the available resources and number of input blocks.

                * Use ``ray.data.TaskPoolStrategy(size=n)`` to launch at most ``n`` concurrent Ray tasks.

                * If ``compute`` is not specified for a callable class, will use ``ray.data.ActorPoolStrategy(min_size=1, max_size=None)`` to launch an autoscaling actor pool from 1 to unlimited workers.

                * Use ``ray.data.ActorPoolStrategy(size=n)`` to use a fixed size actor pool of ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n)`` to use an autoscaling actor pool from ``m`` to ``n`` workers.

                * Use ``ray.data.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)`` to use an autoscaling actor pool from ``m`` to ``n`` workers, with an initial size of ``initial``.

            num_cpus: The number of CPUs to reserve for each parallel map worker.
            num_gpus: The number of GPUs to reserve for each parallel map worker. For
                example, specify `num_gpus=1` to request 1 GPU for each parallel map
                worker.
            memory: The heap memory in bytes to reserve for each parallel map worker.
            concurrency: This argument is deprecated. Use ``compute`` argument.
            ray_remote_args_fn: A function that returns a dictionary of remote args
                passed to each map worker. The purpose of this argument is to generate
                dynamic arguments for each actor/task, and will be called each time
                prior to initializing the worker. Args returned from this dict will
                always override the args in ``ray_remote_args``. Note: this is an
                advanced, experimental feature.
            ray_remote_args: Additional resource requirements to request from
                Ray (e.g., num_gpus=1 to request GPUs for the map tasks). See
                :func:`ray.remote` for details.
        Nr   z/Exactly one of 'fn' or 'expr' must be provided.c                 >    t          d|  d          d S )Nzwhen 'z]' is used, 'fn_args/fn_kwargs' or 'fn_constructor_args/fn_constructor_kwargs' cannot be used.r   )
param_typer   r   r   r   s    r   _check_fn_params_incompatiblez5Dataset.filter.<locals>._check_fn_params_incompatible  sO    #(&2(4  GZ  G  G  G   54r   r   r   r  zString expressions are deprecated and will be removed in a future version. Use predicate expressions from ray.data.expressions instead. For example: from ray.data.expressions import col; ds.filter(expr=col('column_name') > 5)   )
stacklevel)ExpressionEvaluatorr  z@Use 'expr' instead of 'fn' when possible for performant filters.z*fn must be a UserDefinedFunction, but got z	 instead.)r   r   r   r   )
r1  predicate_exprr   r   r   r   r   r   r   r   )sumr   rL   r   r   r  r  r   r	  r   r   r   ?ray.data._internal.planner.plan_expression.expression_evaluatorr;  parse_native_expressionr   r   __name__rK   r4   r   r   r,   r   r   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   provided_paramsr8  r1  r<  	filter_fnfilter_fn_argsfilter_fn_kwargsfilter_fn_constructor_argsfilter_fn_constructor_kwargsfilter_computer  r;  	filter_opr   r   s       ````                     r   filterzDataset.filtery  si   ~ r~t4/?@AAaNOOO		 		 		 		 		 		 		 		 =	
 
 %))-37	2659>B"AE$48))&111CCCCCC $$$ &= '          "5!L!LT!R!R "&--;???NNMR   B<<  4Bxx(4 4 4  
 I$N()<&+@(1$7'	  N )"& :">"1+
 
 
	 z  "9dl;;t\***r   )shufflekeyssort
num_blockstarget_num_rows_per_blockrJ  rK  rL  c                   |D|t          j        d           |durt          j        d           |rt          j        d           ||t          d          ||t          d          ||rt          d          | j                                        }|t          | j        j        |	          }nt          | j        j        ||||
          }t          || j
                  }t          ||          S )aD  Repartition the :class:`Dataset` into exactly this number of
        :ref:`blocks <dataset_concept>`.

        This method can be useful to tune the performance of your pipeline. To learn
        more, see :ref:`Advanced: Performance Tips and Tuning <data_performance_tips>`.

        If you're writing data to files, you can also use this method to change the
        number of output files. To learn more, see
        :ref:`Changing the number of output files <changing-number-output-files>`.

        .. note::

            Repartition has three modes:

             * When ``num_blocks`` and ``shuffle=True`` are specified Ray Data performs a full distributed shuffle producing exactly ``num_blocks`` blocks.
             * When ``num_blocks`` and ``shuffle=False`` are specified, Ray Data does NOT perform full shuffle, instead opting in for splitting and combining of the blocks attempting to minimize the necessary data movement (relative to full-blown shuffle). Exactly ``num_blocks`` will be produced.
             * If ``target_num_rows_per_block`` is set (exclusive with ``num_blocks`` and ``shuffle``), streaming repartitioning will be executed, where blocks will be made to carry no more than ``target_num_rows_per_block`` rows. Smaller blocks will be combined into bigger ones up to ``target_num_rows_per_block`` as well.

            .. image:: /data/images/dataset-shuffle.svg
                :align: center

            ..
                https://docs.google.com/drawings/d/132jhE3KXZsf29ho1yUdPrCHB9uheHBWHJhDQMXqIVPA/edit

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100).repartition(10).materialize()
            >>> ds.num_blocks()
            10

        Time complexity: O(dataset size / parallelism)

        Args:
            num_blocks: Number of blocks after repartitioning.
            target_num_rows_per_block: [Experimental] The target number of rows per block to
                repartition. Performs streaming repartitioning of the dataset (no shuffling).
                Note that either `num_blocks` or
                `target_num_rows_per_block` must be set, but not both. When
                `target_num_rows_per_block` is set, it only repartitions
                :class:`Dataset` :ref:`blocks <dataset_concept>` that are larger than
                `target_num_rows_per_block`. Note that the system will internally
                figure out the number of rows per :ref:`blocks <dataset_concept>` for
                optimal execution, based on the `target_num_rows_per_block`. This is
                the current behavior because of the implementation and may change in
                the future.
            shuffle: Whether to perform a distributed shuffle during the
                repartition. When shuffle is enabled, each output block
                contains a subset of data rows from each input block, which
                requires all-to-all data movement. When shuffle is disabled,
                output blocks are created from adjacent input blocks,
                minimizing data movement.
            keys: List of key columns repartitioning will use to determine which
                partition will row belong to after repartitioning (by applying
                hash-partitioning algorithm to the whole dataset). Note that, this
                config is only relevant when `DataContext.use_hash_based_shuffle`
                is set to True.
            sort: Whether the blocks should be sorted after repartitioning. Note,
                that by default blocks will be sorted in the ascending order.

        Note that you must set either `num_blocks` or `target_num_rows_per_block`
        but not both.
        Additionally note that this operation materializes the entire dataset in memory
        when you set shuffle to True.

        Returns:
            The repartitioned :class:`Dataset`.
        Nz:`keys` is ignored when `target_num_rows_per_block` is set.Fz:`sort` is ignored when `target_num_rows_per_block` is set.z=`shuffle` is ignored when `target_num_rows_per_block` is set.z>Either `num_blocks` or `target_num_rows_per_block` must be setzROnly one of `num_blocks` or `target_num_rows_per_block` must be set, but not both.z@`shuffle` must be False when `target_num_rows_per_block` is set.)rN  )num_outputsrJ  rK  rL  )r   r   r   r   r   r9   r   r   r/   r,   r   r   )	r   rM  rN  rJ  rK  rL  r   r2  r   s	            r   repartitionzDataset.repartition@  s`   \ %0P   5  P    S   %>%FP   ")B)N   
 %0W0R   z  $0%"&*C  BB
 "&&  B #2t|44t\***r   )seedrM  rR  c                    |t          d          | j                                        }t          | j        j        ||          }t          || j                  }t          ||          S )aj  Randomly shuffle the rows of this :class:`Dataset`.

        .. tip::

            This method can be slow. For better performance, try
            :ref:`Iterating over batches with shuffling <iterating-over-batches-with-shuffling>`.
            Also, see :ref:`Optimizing shuffles <optimizing_shuffles>`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.random_shuffle().take(3)  # doctest: +SKIP
            {'id': 41}, {'id': 21}, {'id': 92}]
            >>> ds.random_shuffle(seed=42).take(3)  # doctest: +SKIP
            {'id': 77}, {'id': 21}, {'id': 63}]

        Time complexity: O(dataset size / parallelism)

        Args:
            seed: Fix the random seed to use, otherwise one is chosen
                based on system randomness.

        Returns:
            The shuffled :class:`Dataset`.
        Nz`num_blocks` parameter is deprecated in Ray 2.9. random_shuffle() does not support to change the number of output blocks. Use repartition() instead.)rR  r   )	r   r   r   r.   r   r   r,   r   r   )r   rR  rM  r   r   r2  r   s          r   random_shufflezDataset.random_shuffle  s{    F !$)  
 z  "+
 
 

 #2t|44t\***r   rR  c                    | j                                         }t          | j        j        |          }t          || j                  }t          ||          S )a,  Randomly shuffle the :ref:`blocks <dataset_concept>` of this :class:`Dataset`.

        This method is useful if you :meth:`~Dataset.split` your dataset into shards and
        want to randomize the data in each shard without performing a full
        :meth:`~Dataset.random_shuffle`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.take(5)
            [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]
            >>> ds.randomize_block_order().take(5)  # doctest: +SKIP
            {'id': 15}, {'id': 16}, {'id': 17}, {'id': 18}, {'id': 19}]

        Args:
            seed: Fix the random seed to use, otherwise one is chosen
                based on system randomness.

        Returns:
            The block-shuffled :class:`Dataset`.
        rU  )r   r   r-   r   r   r,   r   r   )r   rR  r   r2  r   s        r   randomize_block_orderzDataset.randomize_block_order  sX    : z  "
 
 
 #2t|44t\***r   fractionc                ,   ddl ddl| j                                        dk    rt	          d          dk     sdk    rt	          d          ddlm dt          dt          t                   ffd	}| 
                    ||gdd
          S )am  Returns a new :class:`Dataset` containing a random fraction of the rows.

        .. note::

            This method returns roughly ``fraction * total_rows`` rows. An exact number
            of rows isn't guaranteed.

        Examples:
            >>> import ray
            >>> ds1 = ray.data.range(100)
            >>> ds1.random_sample(0.1).count()  # doctest: +SKIP
            10
            >>> ds2 = ray.data.range(1000)
            >>> ds2.random_sample(0.123, seed=42).take(2)  # doctest: +SKIP
            [{'id': 2}, {'id': 9}]
            >>> ds2.random_sample(0.123, seed=42).take(2)  # doctest: +SKIP
            [{'id': 2}, {'id': 9}]

        Args:
            fraction: The fraction of elements to sample.
            seed: Seeds the python random pRNG generator.

        Returns:
            Returns a :class:`Dataset` containing the sampled rows.
        r   Nz$Cannot sample from an empty Dataset.r   z!Fraction must be between 0 and 1.TaskContextr   rR  c                 j                                    }d|j        v r|j        d         }n[|)t          j                                        }||j        d<   n0t          j                            |j        |g          }||j        d<   t          j        |                    t          |                     k               d         }t          | j	                  r| 
                    |          S t          | j                  r| j        |d d f         S t          dt          |                      )Nrngr   zUnsupported batch type: )get_currentkwargsr   randomdefault_rngtask_idxwherer  r   Tabletaker   ilocr   r   )	r   rR  ctxr]  mask_idxr[  rX  r   r   s	        r   random_samplez,Dataset.random_sample.<locals>.random_sample<  s   ))++C
""j'i++--$'
5!!i++S\4,@AA$'
5!x

3u:: 6 6 ABB1EH%** /zz(+++E2<00 /z(AAA+..EUEEFFFr   )r   r   r   )r   r   r   initial_num_blocksr   4ray.data._internal.execution.interfaces.task_contextr[  rX   r   r   r   )r   rX  rR  ri  r[  r   r   s    `  @@@r   ri  zDataset.random_sample  s    : 	:((**a//CDDDa<<8a<<@AAATTTTTT	G 	G(3- 	G 	G 	G 	G 	G 	G 	G 	G 	G( F	   
 
 	
r   )equallocality_hintsnrl  rm  rr   c                   | j                                         }t          | j        j        |||          }t          || j                  }t          ||          }|                    | j	                   t          j        |||          S )a  Returns ``n`` :class:`DataIterators <ray.data.DataIterator>` that can
        be used to read disjoint subsets of the dataset in parallel.

        This method is the recommended way to consume :class:`Datasets <Dataset>` for
        distributed training.

        Streaming split works by delegating the execution of this :class:`Dataset` to a
        coordinator actor. The coordinator pulls block references from the executed
        stream, and divides those blocks among ``n`` output iterators. Iterators pull
        blocks from the coordinator actor to return to their caller on ``next``.

        The returned iterators are also repeatable; each iteration will trigger a
        new execution of the Dataset. There is an implicit barrier at the start of
        each iteration, which means that `next` must be called on all iterators before
        the iteration starts.

        .. warning::

            Because iterators are pulling blocks from the same :class:`Dataset`
            execution, if one iterator falls behind, other iterators may be stalled.

        Examples:

            .. testcode::

                import ray

                ds = ray.data.range(100)
                it1, it2 = ds.streaming_split(2, equal=True)

            Consume data from iterators in parallel.

            .. testcode::

                @ray.remote
                def consume(it):
                    for batch in it.iter_batches():
                       pass

                ray.get([consume.remote(it1), consume.remote(it2)])

            You can loop over the iterators multiple times (multiple epochs).

            .. testcode::

                @ray.remote
                def train(it):
                    NUM_EPOCHS = 2
                    for _ in range(NUM_EPOCHS):
                        for batch in it.iter_batches():
                            pass

                ray.get([train.remote(it1), train.remote(it2)])

            The following remote function call blocks waiting for a read on ``it2`` to
            start.

            .. testcode::
                :skipif: True

                ray.get(train.remote(it1))

        Args:
            n: Number of output iterators to return.
            equal: If ``True``, each output iterator sees an exactly equal number
                of rows, dropping data if necessary. If ``False``, some iterators may
                see slightly more or less rows than others, but no data is dropped.
            locality_hints: Specify the node ids corresponding to each iterator
                location. Dataset will try to minimize data movement based on the
                iterator output locations. This list must have length ``n``. You can
                get the current node id of a task or actor by calling
                ``ray.get_runtime_context().get_node_id()``.

        Returns:
            The output iterator splits. These iterators are Ray-serializable and can
            be freely passed to any Ray task or actor.

        .. seealso::

            :meth:`Dataset.split`
                Unlike :meth:`~Dataset.streaming_split`, :meth:`~Dataset.split`
                materializes the dataset in memory.
        )
num_splitsrl  rm  )r   r   r<   r   r   r,   r   r   r   _uuidr+   create)r   rn  rl  rm  r   r2  r   split_datasets           r   streaming_splitzDataset.streaming_splitW  s    x z  ")	
 
 
 #2t|44l33
+++&-mQOOOr   MaterializedDatasetc          
      2
   !"# dk    rt          d d          |rY|W|                                 }|z  ##fdt          ddz             D             }|                     |          }|d         S |r6t	          |          k    r#t          dt	          |           d d	          | j                                         d
"| j                                        }t           j	         \  }}	|t          j        |          }
t          j        |	          }g }t          |
|          D ]\  }} "fdt          ||          D             }t          t          |          | j                  }|                    t!          t#          || j                                                  |                     |S t'          t          ||	                    !dt(          dt*          t,                   dt.          t,          t(          f         ffd}dt*          t0          t2                            dt.          t4          t*          t0          t2                            f         fd}dt*          t,                   dt.          t,          t4          f         fd} |t	          |          |          } ||          } ||          }t7          j        t:                    }|D ]r}||         }||         }||         }g }|rOt	          |          |k     r<|                    |                                           |rt	          |          |k     <|||<   st;          t>          j         !                    |"                                                    }|D ]m}t	          ||                   ||         k     rL||                             |                                           t	          ||                   ||         k     Lnt	          |          dk    sJ t	          |                      g }|D ]_}||         }!fd|D             }	tG          tI          t          ||	                    " j%                   |                                `|rtM          |"          }g }|D ]n t          t           g          | j                  }|                    t!          t#          || j                                                  |                     o|S )a  Materialize and split the dataset into ``n`` disjoint pieces.

        This method returns a list of ``MaterializedDataset`` that can be passed to Ray
        Tasks and Actors and used to read the dataset rows in parallel.

        Examples:

            .. testcode::

                @ray.remote
                class Worker:

                    def train(self, data_iterator):
                        for batch in data_iterator.iter_batches(batch_size=8):
                            pass

                workers = [Worker.remote() for _ in range(4)]
                shards = ray.data.range(100).split(n=4, equal=True)
                ray.get([w.train.remote(s) for w, s in zip(workers, shards)])

        Time complexity: O(1)

        Args:
            n: Number of child datasets to return.
            equal: Whether to guarantee each split has an equal
                number of records. This might drop records if the rows can't be
                divided equally among the splits.
            locality_hints: [Experimental] A list of Ray actor handles of size ``n``.
                The system tries to co-locate the blocks of the i-th dataset
                with the i-th actor to maximize data locality.

        Returns:
            A list of ``n`` disjoint dataset splits.

        .. seealso::

            :meth:`Dataset.split_at_indices`
                Unlike :meth:`~Dataset.split`, which splits a dataset into approximately
                equal splits, :meth:`Dataset.split_proportionately` lets you split a
                dataset into different sizes.

            :meth:`Dataset.split_proportionately`
                This method is equivalent to :meth:`Dataset.split_at_indices` if
                you compute indices manually.

            :meth:`Dataset.streaming_split`.
                Unlike :meth:`~Dataset.split`, :meth:`~Dataset.streaming_split`
                doesn't materialize the dataset in memory.
        r   zThe number of splits z is not positive.Nc                     g | ]}|z  S r  r  )r  isplit_indexs     r   r  z!Dataset.split.<locals>.<listcomp>  s    FFF[1_FFFr   r   zThe length of locality_hints z$ doesn't equal the number of splits r(  Fc                 J    g | ]\  }}t          ||fgj                    S )owns_blocksr   r&   r   )r  bmbundleowned_by_consumers      r   r  z!Dataset.split.<locals>.<listcomp>  sN        1 Q.?    r   
input_datarM  actorsr   c                     t          |          }| |z  }| |z  z
  }i }t          |          D ] \  }}|||<   ||k     r||xx         dz  cc<   !|S )zGiven the total number of blocks and a list of actors, calcuate
            the expected number of blocks to allocate for each actor.
            r   )r  	enumerate)	rM  r  
num_actorsnum_blocks_per_actornum_blocks_leftnum_blocks_by_actorrx  actorrn  s	           r   build_allocation_size_mapz0Dataset.split.<locals>.build_allocation_size_map=  s     VJ#-#; (+?!+CCO"$%f-- 4 45-A#E*&&'...!3...&&r   blocksc                 "   t           j                            |           }t          j        t
                    }| D ]S}|                    |i                               dg           }|r|d         nd}||                             |           T|S )zBuild the reverse index from node_id to block_refs. For
            simplicity, if the block is stored on multiple nodes we
            only pick the first one.
            node_idsr   N)rayexperimentalget_object_locationscollectionsdefaultdictr  getappend)r  block_ref_locationsblock_refs_by_node_id	block_refr  node_ids         r   build_block_refs_by_node_idz2Dataset.split.<locals>.build_block_refs_by_node_idM  s     #&"2"G"G"O"O$/$;D$A$A!# A A	.229bAAEEjRTUU)1;(1++t%g.55i@@@@((r   c                 f    t           j        j                                        fd| D             S )z(Build a map from a actor to its node_id.c                     i | ]W}|                     |j                                        i                                d i                                d          XS )AddressNodeID)r  	_actor_idhex)r  r  actors_states     r   
<dictcomp>zADataset.split.<locals>.build_node_id_by_actor.<locals>.<dictcomp>_  se         |''(;(;(=(=rBBY##X  r   )r  _privatestater  )r  r  s    @r   build_node_id_by_actorz-Dataset.split.<locals>.build_node_id_by_actor\  sF    <-4466L    $	   r   c                      g | ]
}|         S r  r  )r  r~  metadata_mappings     r   r  z!Dataset.split.<locals>.<listcomp>  s    <<<(+<<<r   r{  )'r   countrangesplit_at_indicesr  r   executestatsr-  r  r   array_splitr,   r2   r   r  ru  r@   r   r*  r   r
   r   r   ri   rV   r	  r  r  r  pop	itertoolschainfrom_iterabler+  r&   tupler   r%   )$r   rn  rl  rm  r  split_indicesshardsr  
block_refsmetadatablock_refs_splitsmetadata_splitssplit_datasetsblock_refs_splitmetadata_splitref_bundlesr   r  r  r  expected_block_count_by_actorr  node_id_by_actorallocation_per_actorr  r  matching_blocksexpected_block_count
allocationremaining_block_refsper_split_bundlesr  r  r  r  ry  s$    `                              @@@@r   splitzDataset.split  s   l 66IQIIIJJJ
  	^+JJLLE1*K GFFFeAq1uooFFFM**=99F"1": 	c.11Q66;N0C0C ; ;67; ; ;  
 !J..00!
  """FM2
H! "z1 = = nXq99ON47!?5 5  0 .     !$$4n E E	    +555L    %%'%eT\->->-@-@AA$     "!J 9 9::"	'	'%)#Y	'#s(^	' 	' 	' 	' 	' 	' 	)5)*	)#tIe,--.	) 	) 	) 	)	49 	c3h 	 	 	 	 )B(A
OO^)
 )
% !< ;J G G11.AA*6t<<
 $ 	5 	5E&u-G3G<O#@#G J! 9c*oo8L&L&L!!/"5"5"7"7888 " 9c*oo8L&L&L*4 ''  $O))*?*F*F*H*HII 
  
 $ 	O 	OE(/003PQV3WWW$U+223G3K3K3M3MNNN (/003PQV3WWW '((A---s3G/H/H---# 	- 	-E)%0F<<<<V<<<Hc&(++,,-}  F
 $$V,,,, 	P )*;=N O O' 	 	F&yVH'E'E'Et|TTL!!#!%):):)<)<==      r   indicesc           
          t          |          dk     rt          d          t          |          |k    rt          d          |d         dk     rt          d          t          j                    }| j                                        t          j        |d          \  }}t          j                    |z
  }| j        	                                }g }t          ||          D ]\  }}	t          d|	i|          }
||
_        fd	t          ||	          D             }t          t          |
          | j                  }|                    t#          t%          |
| j                                                  |                     |S )a_  Materialize and split the dataset at the given indices (like ``np.split``).

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> d1, d2, d3 = ds.split_at_indices([2, 5])
            >>> d1.take_batch()
            {'id': array([0, 1])}
            >>> d2.take_batch()
            {'id': array([2, 3, 4])}
            >>> d3.take_batch()
            {'id': array([5, 6, 7, 8, 9])}

        Time complexity: O(num splits)

        Args:
            indices: List of sorted integers which indicate where the dataset
                are split. If an index exceeds the length of the dataset,
                an empty dataset is returned.

        Returns:
            The dataset splits.

        .. seealso::

            :meth:`Dataset.split`
                Unlike :meth:`~Dataset.split_at_indices`, which lets you split a
                dataset into different sizes, :meth:`Dataset.split` splits a dataset
                into approximately equal splits.

            :meth:`Dataset.split_proportionately`
                This method is equivalent to :meth:`Dataset.split_at_indices` if
                you compute indices manually.

            :meth:`Dataset.streaming_split`.
                Unlike :meth:`~Dataset.split`, :meth:`~Dataset.streaming_split`
                doesn't materialize the dataset in memory.
        r   z$indices must be at least of length 1zindices must be sortedr   zindices must be positiveFSplitr  parentc                 J    g | ]\  }}t          ||fgd j                   S )Fr{  r}  )r  r~  r  r  s      r   r  z,Dataset.split_at_indices.<locals>.<listcomp>  sD       Aq Aq6(fmLLL  r   r  )r  r   sortedtimeperf_counterr   r  rD   r  r  r-  rE   time_total_sr,   r2   r   r  ru  r@   r   )r   r  
start_timer  r  split_durationparent_statssplitsbsmsr  r  r   r  s                @r   r  zDataset.split_at_indices  s   T w<<!CDDD'??g%%56661:>>7888&((
 J..00,M
 

 *,,z9z''))&(++ 	 	FB 7B-MMME!/E   BKK  K '[111 L
 MM#!%):):)<)<==      r   proportionsc                    t          |          dk     rt          d          t          |          dk    rt          d          t          d |D                       rt          d          |                                 t          j        |          }fd|D             }d}t          t          |          dz
  d	d	          D ]<}||xx         |z  cc<   ||         ||dz            k    r|dz  }||xx         dz  cc<   =t          d
 |D                       rt          d          |                     |          S )a  Materialize and split the dataset using proportions.

        A common use case for this is splitting the dataset into train
        and test sets (equivalent to eg. scikit-learn's ``train_test_split``).
        For a higher level abstraction, see :meth:`Dataset.train_test_split`.

        This method splits datasets so that all splits
        always contains at least one row. If that isn't possible,
        an exception is raised.

        This is equivalent to caulculating the indices manually and calling
        :meth:`Dataset.split_at_indices`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> d1, d2, d3 = ds.split_proportionately([0.2, 0.5])
            >>> d1.take_batch()
            {'id': array([0, 1])}
            >>> d2.take_batch()
            {'id': array([2, 3, 4, 5, 6])}
            >>> d3.take_batch()
            {'id': array([7, 8, 9])}

        Time complexity: O(num splits)

        Args:
            proportions: List of proportions to split the dataset according to.
                Must sum up to less than 1, and each proportion must be bigger
                than 0.

        Returns:
            The dataset splits.

        .. seealso::

            :meth:`Dataset.split`
                Unlike :meth:`~Dataset.split_proportionately`, which lets you split a
                dataset into different sizes, :meth:`Dataset.split` splits a dataset
                into approximately equal splits.

            :meth:`Dataset.split_at_indices`
                :meth:`Dataset.split_proportionately` uses this method under the hood.

            :meth:`Dataset.streaming_split`.
                Unlike :meth:`~Dataset.split`, :meth:`~Dataset.streaming_split`
                doesn't materialize the dataset in memory.
        r   z(proportions must be at least of length 1z#proportions must sum to less than 1c              3   "   K   | ]
}|d k    V  dS r   Nr  )r  ps     r   r  z0Dataset.split_proportionately.<locals>.<genexpr>-	  s&      ++!qAv++++++r   z!proportions must be bigger than 0c                 4    g | ]}t          |z            S r  )r   )r  
proportiondataset_lengths     r   r  z1Dataset.split_proportionately.<locals>.<listcomp>2	  s3     
 
 
1;C+,,
 
 
r   r   r9  r   c              3   "   K   | ]
}|d k    V  dS r  r  )r  rx  s     r   r  z0Dataset.split_proportionately.<locals>.<genexpr>=	  s&      --!qAv------r   z<Couldn't create non-empty splits with the given proportions.)	r  r   r=  anyr  r   cumsumr  r  )r   r  cumulative_proportionsr  subtractrx  r  s         @r   split_proportionatelyzDataset.split_proportionately  s   l {aGHHH{q  BCCC++{+++++ 	B@AAA!#;!7!7
 
 
 
?U
 
 

 s=))A-r266 	& 	&A!(Q=Q#777Aa   A%   --}----- 	N   $$]333r   )rJ  rR  stratify	test_sizer  )ru  ru  c                   | }|r|                     |          }t          |t          t          f          s t	          dt          |           d          |r|t          d          ||                     |||          S t          |t                    r.|                     |           |	                    d|z
  g          S | 
                    ||           |                                }|                    ||z
  g          S )a  Materialize and split the dataset into train and test subsets.

        Examples:

            >>> import ray
            >>> ds = ray.data.range(8)
            >>> train, test = ds.train_test_split(test_size=0.25)
            >>> train.take_batch()
            {'id': array([0, 1, 2, 3, 4, 5])}
            >>> test.take_batch()
            {'id': array([6, 7])}

        Args:
            test_size: If float, should be between 0.0 and 1.0 and represent the
                proportion of the dataset to include in the test split. If int,
                represents the absolute number of test samples. The train split
                always complements the test split.
            shuffle: Whether or not to globally shuffle the dataset before splitting.
                Defaults to ``False``. This may be a very expensive operation with a
                large dataset.
            seed: Fix the random seed to use for shuffle, otherwise one is chosen
                based on system randomness. Ignored if ``shuffle=False``.
            stratify: Optional column name to use for stratified sampling. If provided,
                the splits will maintain the same proportions of each class in the
                stratify column across both train and test sets.

        Returns:
            Train and test subsets as two ``MaterializedDatasets``.

        .. seealso::

            :meth:`Dataset.split_proportionately`
        rU  z%`test_size` must be int or float got r(  NzCannot specify both 'shuffle=True' and 'stratify' parameters. Stratified splitting maintains class proportions and is incompatible with shuffling.r   )rT  r   r   floatr  r   r   _stratified_train_test_split_validate_test_size_floatr  _validate_test_size_intr  r  )r   r  rJ  rR  r  r   	ds_lengths          r   train_test_splitzDataset.train_test_splitD	  s(   V  	."""--B)c5\22 	XVDOOVVVWWW  	x+g   44RHMMM i'' 	@**9555++Q]O<<<((B777

I&&	I(='>???r   c                    t          t                    r|                     |          }|z  n|                                fd}|                    |                              |                                          }|                    d                               t          g          }|                    d                               t          g          }||fS )a7  Perform stratified train-test split on the dataset.

        Args:
            ds: The dataset to split.
            test_size: Test size as int or float.
            stratify: Column name to use for stratified sampling.

        Returns:
            Train and test subsets as two MaterializedDatasets.
        c                     t          |           }t          |z            }t          j        dg||z
  z  dg|z  z             | t          <   | S )NTF)r  r   r   array_TRAIN_TEST_SPLIT_COLUMN)group_batchrn  
test_countr  s      r   add_train_flagz<Dataset._stratified_train_test_split.<locals>.add_train_flag	  sZ    K  AQ]++J46H!j.)UGj,@@5 5K01 r   c                     | t                    S r   r  rows    r   <lambda>z6Dataset._stratified_train_test_split.<locals>.<lambda>	  s    45 r   c                     | t                     S r   r  r  s    r   r  z6Dataset._stratified_train_test_split.<locals>.<lambda>	  s    C 899 r   )
r   r   r  r  groupby
map_groupsmaterializerI  r  r  )	r   r   r  r  r  r  split_dstrain_dstest_dss	     `      r   r  z$Dataset._stratified_train_test_split	  s     i%% 	644YCCI!I-II**9555	 	 	 	 	 ::h''22>BBNNPP??55
 

,01
2
2 	 //99
 

,01
2
2 	   r   c                 D    |dk    s|dk    rt          d| d          dS )zValidate test_size when it's a float.

        Args:
            test_size: Test size as float between 0 and 1.

        Raises:
            ValueError: If test_size is not in valid range.
        r   r   zLIf `test_size` is a float, it must be bigger than 0 and smaller than 1. Got r(  Nr6  )r   r  s     r   r  z!Dataset._validate_test_size_float	  sE     >>Y!^^,(, , ,   ,^r   c                 r    |                                 }|dk    s||k    rt          d| d| d          |S )a1  Validate test_size when it's an int and return dataset length.

        Args:
            test_size: Test size as int.
            ds: Dataset to validate against.

        Returns:
            Dataset length for reuse.

        Raises:
            ValueError: If test_size is not in valid range.
        r   z]If `test_size` is an int, it must be bigger than 0 and smaller than the size of the dataset (z). Got r(  )r  r   )r   r  r   r  s       r   r  zDataset._validate_test_size_int	  sb     HHJJ	>>Y)33$1:$ $ $ $ $  
 r   )r   r   r`  )
split_typehash_columnrR  r  )hashr`  r  )r   r   c                   ddl ddlddlm dk    sdk    rt	          d          |dk    rt	          d          |dk    rt	          d	          d
j        ffd}d
j        dt          j        j        f         ffd}|dk    r | j        |fddi|}n:|dk    r"t	          d           | j        |fddi|}nt	          d|           |                    t           d          
                    t          g          }	|                    t           d          
                    t          g          }
|	|
fS )a  split the dataset into train and test subsets in a streaming manner.
        This method is recommended for large datasets.

        The split type can be either "hash" or "random".
        - "random": The dataset is split into random train and test subsets.
        - "hash": The dataset is split into train and test subsets based on the hash of the key column.

        .. tip::
            Make sure to set the `preserve_order` flag in the `ExecutionOptions` to True
            to ensure that the split is deterministic across pipeline executions. This is important
            to avoid test rows to end up in the train set and vice versa on multiple executions.
            This can be set with ``ray.data.DataContext.get_current().execution_options.preserve_order = True``.

        Examples:
            Examples with Random split:

            >>> import ray
            >>> ctx = ray.data.DataContext.get_current()
            >>> ctx.execution_options.preserve_order = True
            >>> ds = ray.data.range(8)
            >>> train, test = ds.streaming_train_test_split(test_size=0.25, seed=0)
            >>> train.count()
            6
            >>> test.count()
            2
            >>> ctx.execution_options.preserve_order = False

            Examples with Hash split:

            >>> import ray
            >>> ds = ray.data.range(8)
            >>> train, test = ds.streaming_train_test_split(test_size=0.25, split_type="hash", hash_column="id")
            >>> train.take_batch()
            {'id': array([0, 2, 3, 4, 5, 6])}
            >>> test.take_batch()
            {'id': array([1, 7])}

        Args:
            test_size: The proportion of the dataset to include in the test split.
                Must be between 0.0 and 1.0.
            split_type: The type of split to perform. Can be "hash" or "random".
            hash_column: The column to use for the hash split. Required for hash split and
                ignored for random split.
            seed: The seed to use for the random split. Ignored for hash split.
            **ray_remote_kwargs: Additional kwargs to pass to the Ray remote function.

        Returns:
            Train and test subsets as two ``Dataset``.

        .. seealso::

            :meth:`Dataset.train_test_split`
        r   NrZ  r   z"test_size must be between 0 and 1.r  z$seed is not supported for hash splitr`  z-hash_column is not supported for random splitr   c                                                     }d|j        v r|j        d         }nb0t          j                            |j        g          }||j        d<   n0t          j                            |j        g          }||j        d<   |                    | j                  dz
  k     }|                     t          	                    |
                                                    S )a]  
            Perform a random split on a batch: each row goes to train with probability (1 - test_proportion),
            or to test otherwise.

            This version ensures that the random choices are **stable per Ray task execution** by seeding
            the RNG with a combination of a user-specified seed and the Ray task ID.
            train_test_split_rngNr   r   )r^  r_  r   r`  ra  rb  num_rowsr   r  r  bool_)r   rg  r]  is_trainr[  r   rR  r  s       r   random_splitz8Dataset.streaming_train_test_split.<locals>.random_split%
  s     ))++C%33j!78i++S\N;;58
122i++S\4,@AA58
12 zz%.11Q]CH&&("((8"((**(*M*M  r   r   c                 >   dt           dt          ffd| j        v r|                                          }nt	          d d                              fd|D                                                       }|                     t          |          S )Nkeyr   c                     t                                                   t          |                                           d                                          d          }|dz
  dz  k     rdndS )N   )digest_sizebigr   l            TF)r   
from_bytesblake2br	  encodedigest)r  hhashlibr  s     r   key_to_bucketzMDataset.streaming_train_test_split.<locals>.hash_split.<locals>.key_to_bucket>
  sg    NNOOCHHOO$5$51OEELLNNPU   !A	Mg#>>>ttEIr   zKey column z not found in batchc                 &    g | ]} |          S r  r  )r  r  r  s     r   r  zJDataset.streaming_train_test_split.<locals>.hash_split.<locals>.<listcomp>K
  s#    "F"F"F#==#5#5"F"F"Fr   r	  )	r   r   column_namesto_numpyr   r  r  r   r  )r   rK  
bucket_arrr  r  r  r   r  s      @r   
hash_splitz6Dataset.streaming_train_test_split.<locals>.hash_split=
  s    J3 J3 J J J J J J J e000[)2244 !O{!O!O!OPPP"F"F"F"F"F"F"FRXXZZXXJ&&'?LLLr   r   r   z&hash_column is required for hash splitzInvalid split type: z == True)r   z	 == False)r  r   rk  r[  r   rd  r  r   rI  r  r  )r   r  r  r  rR  ray_remote_kwargsr  r  	buckettedds_trainds_testr[  r  r   s    ` ``      @@@r   streaming_train_test_splitz"Dataset.streaming_train_test_split	  sG   ~ 	TTTTTT>>Y!^^ABBB
f 4 4CDDD"zX'='=LMMM	 	 	 	 	 	 	 	 	 	0	Mbh 	M5281C+D 	M 	M 	M 	M 	M 	M 	M 	M 	M" !!(( & $ II
 6!!" !IJJJ(( & $ II @J@@AAA##,666 $ 
 

,01
2
2 	 "",777 # 
 

,01
2
2 	   r   otherc                    t          j                    }| gt          |          z   }d |D             }t          d |D              }t	          || j                  }t          dg id |D                       }t          j                    |z
  |_        t          t          || j        
                                          |          S )a  Concatenate :class:`Datasets <ray.data.Dataset>` across rows.

        The order of the blocks in the datasets is preserved, as is the
        relative ordering between the datasets passed in the argument list.

        .. caution::
            Unioned datasets aren't lineage-serializable. As a result, they can't be
            used as a tunable hyperparameter in Ray Tune.

        Examples:

            >>> import ray
            >>> ds1 = ray.data.range(2)
            >>> ds2 = ray.data.range(3)
            >>> ds1.union(ds2).take_all()
            [{'id': 0}, {'id': 1}, {'id': 0}, {'id': 1}, {'id': 2}]

        Args:
            other: List of datasets to combine with this one. The datasets
                must have the same schema as this dataset, otherwise the
                behavior is undefined.

        Returns:
            A new dataset holding the rows of the input datasets.
        c                 &    g | ]}|j         j        S r  )r   r   )r  union_dss     r   r  z!Dataset.union.<locals>.<listcomp>
  s    OOO(5OOOr   c                     g | ]	}|j         
S r  )r   )r  r   s     r   r  z!Dataset.union.<locals>.<listcomp>
  s    1114dh111r   r   c                 @    g | ]}|j                                         S r  )r   r  )r  ds     r   r  z!Dataset.union.<locals>.<listcomp>
  s"    666AGMMOO666r   r  )r  r  r  UnionLogicalOperatorr,   r   rE   r  r   r@   r   )r   r%  r  datasetslogical_plansr2  r   r  s           r   unionzDataset.unionh
  s    6 &((
6DKK'OOhOOO!11=111
 #2t|44r]66X666
 
 
 ".00:=%!2!2!4!455
 
 	
r   id)partition_size_hintaggregator_ray_remote_argsvalidate_schemas	join_typenum_partitionsonright_onleft_suffixright_suffixr2  r3  r4  c                f   t          |t          t          f          s%t          dt	          |          j         d          |rAt          |t          t          f          s%t          dt	          |          j         d          |p|}|
r?|                                 }|                                }t          j        ||||           | j	        
                                }t          | j        j        |j        j        ||||||||	
  
        }t          |t          || j                            S )a  Join :class:`Datasets <ray.data.Dataset>` on join keys.

        Args:
            ds: Other dataset to join against
            join_type: The kind of join that should be performed, one of ("inner",
                "left_outer", "right_outer", "full_outer", "left_semi", "right_semi",
                "left_anti", "right_anti").
            num_partitions: Total number of "partitions" input sequences will be split
                into with each partition being joined independently. Increasing number
                of partitions allows to reduce individual partition size, hence reducing
                memory requirements when individual partitions are being joined. Note
                that, consequently, this will also be a total number of blocks that will
                be produced as a result of executing join.
            on: The columns from the left operand that will be used as
                keys for the join operation.
            right_on: The columns from the right operand that will be
                used as keys for the join operation. When none, `on` will
                be assumed to be a list of columns to be used for the right dataset
                as well.
            left_suffix: (Optional) Suffix to be appended for columns of the left
                operand.
            right_suffix: (Optional) Suffix to be appended for columns of the right
                operand.
            partition_size_hint: (Optional) Hint to joining operator about the estimated
                avg expected size of the individual partition (in bytes).
                This is used in estimating the total dataset size and allow to tune
                memory requirement of the individual joining workers to prevent OOMs
                when joining very large datasets.
            aggregator_ray_remote_args: (Optional) Parameter overriding `ray.remote`
                args passed when constructing joining (aggregator) workers.
            validate_schemas: (Optional) Controls whether validation of provided
                configuration against input schemas will be performed (defaults to
                false, since obtaining schemas could be prohibitively expensive).

        Returns:
            A :class:`Dataset` that holds rows of input left Dataset joined with the
            right Dataset based on join type and keys.

        Examples:

        .. testcode::
            :skipif: True

            doubles_ds = ray.data.range(4).map(
                lambda row: {"id": row["id"], "double": int(row["id"]) * 2}
            )

            squares_ds = ray.data.range(4).map(
                lambda row: {"id": row["id"], "square": int(row["id"]) ** 2}
            )

            # Inner join example
            joined_ds = doubles_ds.join(
                squares_ds,
                join_type="inner",
                num_partitions=2,
                on=("id",),
            )

            print(sorted(joined_ds.take_all(), key=lambda item: item["id"]))

        .. testoutput::
            :options: +ELLIPSIS, +NORMALIZE_WHITESPACE

            [
                {'id': 0, 'double': 0, 'square': 0},
                {'id': 1, 'double': 2, 'square': 1},
                {'id': 2, 'double': 4, 'square': 4},
                {'id': 3, 'double': 6, 'square': 9}
            ]

        .. testcode::
            :skipif: True

            # Left anti-join example: find rows in doubles_ds that don't match squares_ds
            partial_squares_ds = ray.data.range(2).map(
                lambda row: {"id": row["id"] + 2, "square": int(row["id"]) ** 2}
            )

            anti_joined_ds = doubles_ds.join(
                partial_squares_ds,
                join_type="left_anti",
                num_partitions=2,
                on=("id",),
            )

            print(sorted(anti_joined_ds.take_all(), key=lambda item: item["id"]))

        .. testoutput::
            :options: +ELLIPSIS, +NORMALIZE_WHITESPACE

            [
                {'id': 0, 'double': 0},
                {'id': 1, 'double': 2}
            ]

        .. testcode::
            :skipif: True

            # Left semi-join example: find rows in doubles_ds that have matches in squares_ds
            # (only returns columns from left dataset)
            semi_joined_ds = doubles_ds.join(
                squares_ds,
                join_type="left_semi",
                num_partitions=2,
                on=("id",),
            )

            print(sorted(semi_joined_ds.take_all(), key=lambda item: item["id"]))

        .. testoutput::
            :options: +ELLIPSIS, +NORMALIZE_WHITESPACE

            [
                {'id': 0, 'double': 0},
                {'id': 1, 'double': 2},
                {'id': 2, 'double': 4},
                {'id': 3, 'double': 6}
            ]
        z$Expected tuple or list as `on` (got )z*Expected tuple or list as `right_on` (got )
left_input_opright_input_opleft_key_columnsright_key_columnsr5  r6  left_columns_suffixright_columns_suffixr2  r3  )r   r  r  r   r   r@  r   r3   _validate_schemasr   r   r   r   r   r,   r   )r   r   r5  r6  r7  r8  r9  r:  r2  r3  r4  left_op_schemaright_op_schemar   r2  s                  r   joinzDataset.join
  s@   R "udm,, 	KtBxx7HKKK    	Jx%?? 	WT(^^=TWWW   >r
  	R15N24))++O">?BQQQz  ,0+/&) +!- 3'A
 
 
 t[T\::;;;r   r  rt   c                     ddl m} |6t          |                              |                     d                     ||dk    rt          d           || ||          S )a  Group rows of a :class:`Dataset` according to a column.

        Use this method to transform data based on a
        categorical variable.

        Examples:

            .. testcode::

                import pandas as pd
                import ray

                def normalize_variety(group: pd.DataFrame) -> pd.DataFrame:
                    for feature in group.drop(columns=["variety"]).columns:
                        group[feature] = group[feature] / group[feature].abs().max()
                    return group

                ds = (
                    ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
                    .groupby("variety")
                    .map_groups(normalize_variety, batch_format="pandas")
                )

        Time complexity: O(dataset size * log(dataset size / parallelism))

        Args:
            key: A column name or list of column names.
                If this is ``None``, place all rows in a single group.

            num_partitions: Number of partitions data will be partitioned into (only
                relevant if hash-shuffling strategy is used). When not set defaults
                to `DataContext.min_parallelism`.

        Returns:
            A lazy :class:`~ray.data.grouped_data.GroupedData`.

        .. seealso::

            :meth:`~ray.data.grouped_data.GroupedData.map_groups`
                Call this method to transform groups of data.
        r   rs   NFfetch_if_missingz+`num_partitions` must be a positive integer)r6  )ray.data.grouped_datart   rA   validate_schemar   r   )r   r  r6  rt   s       r   r  zDataset.groupbyF  s~    ` 	655555 ? CLL((e)L)LMMM%.A*=*=JKKK{4^DDDDr   r   ignore_nullsc                 f    |                      t          ||          }|                     |          S )a  List the unique elements in a given column.

        Examples:

            >>> import ray
            >>> ds = ray.data.from_items([1, 2, 3, 2, 3])
            >>> sorted(ds.unique("item"))
            [1, 2, 3]

            This function is very useful for computing labels
            in a machine learning dataset:

            >>> import ray
            >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
            >>> sorted(ds.unique("target"))
            [0, 1, 2]

            One common use case is to convert the class labels
            into integers for training and inference:

            >>> classes = {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'}
            >>> def preprocessor(df, classes):
            ...     df["variety"] = df["target"].map(classes)
            ...     return df
            >>> train_ds = ds.map_batches(
            ...     preprocessor, fn_kwargs={"classes": classes}, batch_format="pandas")
            >>> train_ds.sort("sepal length (cm)").take(1)  # Sort to make it deterministic
            [{'sepal length (cm)': 4.3, ..., 'variety': 'Setosa'}]

        Time complexity: O(dataset size / parallelism)

        Args:
            column: The column to collect unique elements over.
            ignore_nulls: If ``True``, ignore null values in the column.

        Returns:
            A list with unique elements in the given column.
        rL  )_aggregate_onrT   _aggregate_result)r   r   rL  rets       r   uniquezDataset.unique  s3    T   l KK%%c***r   aggsc                      |                      d          j        |                     d          }t          |          dk    r|d         ndS )ar  Aggregate values using one or more functions.

        Use this method to compute metrics like the product of a column.

        Examples:

            .. testcode::

                import ray
                from ray.data.aggregate import AggregateFn

                ds = ray.data.from_items([{"number": i} for i in range(1, 10)])
                aggregation = AggregateFn(
                    init=lambda column: 1,
                    # Apply this to each row to produce a partial aggregate result
                    accumulate_row=lambda a, row: a * row["number"],
                    # Apply this to merge partial aggregate results into a final result
                    merge=lambda a1, a2: a1 * a2,
                    name="prod"
                )
                print(ds.aggregate(aggregation))

            .. testoutput::

                {'prod': 362880}

        Time complexity: O(dataset size / parallelism)

        Args:
            *aggs: :class:`Aggregations <ray.data.aggregate.AggregateFn>` to perform.

        Returns:
            A ``dict`` where each each value is an aggregation for a given column.
        Nr   r   )r  	aggregatere  r  )r   rS  rQ  s      r   rU  zDataset.aggregate  sJ    L +dll4  *D166q99SAs1vv4/r   c                 f    |                      t          ||          }|                     |          S )au  Compute the sum of one or more columns.

        Examples:
            >>> import ray
            >>> ray.data.range(100).sum("id")
            4950
            >>> ray.data.from_items([
            ...     {"A": i, "B": i**2}
            ...     for i in range(100)
            ... ]).sum(["A", "B"])
            {'sum(A)': 4950, 'sum(B)': 328350}

        Args:
            on: a column name or a list of column names to aggregate.
            ignore_nulls: Whether to ignore null values. If ``True``, null
                values are ignored when computing the sum. If ``False``,
                when a null value is encountered, the output is ``None``.
                Ray Data considers ``np.nan``, ``None``, and ``pd.NaT`` to be null
                values. Default is ``True``.

        Returns:
            The sum result.

            For different values of ``on``, the return varies:

            - ``on=None``: a dict containing the column-wise sum of all
              columns,
            - ``on="col"``: a scalar representing the sum of all items in
              column ``"col"``,
            - ``on=["col_1", ..., "col_n"]``: an n-column ``dict``
              containing the column-wise sum of the provided columns.

            If the dataset is empty, all values are null. If ``ignore_nulls`` is
            ``False`` and any value is null, then the output is ``None``.
        rN  )rO  rS   rP  r   r7  rL  rQ  s       r   r=  zDataset.sum  3    R   b| DD%%c***r   c                 f    |                      t          ||          }|                     |          S )am  Return the minimum of one or more columns.

        Examples:
            >>> import ray
            >>> ray.data.range(100).min("id")
            0
            >>> ray.data.from_items([
            ...     {"A": i, "B": i**2}
            ...     for i in range(100)
            ... ]).min(["A", "B"])
            {'min(A)': 0, 'min(B)': 0}

        Args:
            on: a column name or a list of column names to aggregate.
            ignore_nulls: Whether to ignore null values. If ``True``, null
                values are ignored when computing the min; if ``False``,
                when a null value is encountered, the output is ``None``.
                This method considers ``np.nan``, ``None``, and ``pd.NaT`` to be null
                values. Default is ``True``.

        Returns:
            The min result.

            For different values of ``on``, the return varies:

            - ``on=None``: an dict containing the column-wise min of
              all columns,
            - ``on="col"``: a scalar representing the min of all items in
              column ``"col"``,
            - ``on=["col_1", ..., "col_n"]``: an n-column dict
              containing the column-wise min of the provided columns.

            If the dataset is empty, all values are null. If ``ignore_nulls`` is
            ``False`` and any value is null, then the output is ``None``.
        rN  )rO  rQ   rP  rW  s       r   minzDataset.min  rX  r   c                 f    |                      t          ||          }|                     |          S )ar  Return the maximum of one or more columns.

        Examples:
            >>> import ray
            >>> ray.data.range(100).max("id")
            99
            >>> ray.data.from_items([
            ...     {"A": i, "B": i**2}
            ...     for i in range(100)
            ... ]).max(["A", "B"])
            {'max(A)': 99, 'max(B)': 9801}

        Args:
            on: a column name or a list of column names to aggregate.
            ignore_nulls: Whether to ignore null values. If ``True``, null
                values are ignored when computing the max; if ``False``,
                when a null value is encountered, the output is ``None``.
                This method considers ``np.nan``, ``None``, and ``pd.NaT`` to be null
                values. Default is ``True``.

        Returns:
            The max result.

            For different values of ``on``, the return varies:

            - ``on=None``: an dict containing the column-wise max of
              all columns,
            - ``on="col"``: a scalar representing the max of all items in
              column ``"col"``,
            - ``on=["col_1", ..., "col_n"]``: an n-column dict
              containing the column-wise max of the provided columns.

            If the dataset is empty, all values are null. If ``ignore_nulls`` is
            ``False`` and any value is null, then the output is ``None``.
        rN  )rO  rO   rP  rW  s       r   maxzDataset.max2  rX  r   c                 f    |                      t          ||          }|                     |          S )a  Compute the mean of one or more columns.

        Examples:
            >>> import ray
            >>> ray.data.range(100).mean("id")
            49.5
            >>> ray.data.from_items([
            ...     {"A": i, "B": i**2}
            ...     for i in range(100)
            ... ]).mean(["A", "B"])
            {'mean(A)': 49.5, 'mean(B)': 3283.5}

        Args:
            on: a column name or a list of column names to aggregate.
            ignore_nulls: Whether to ignore null values. If ``True``, null
                values are ignored when computing the mean; if ``False``,
                when a null value is encountered, the output is ``None``.
                This method considers ``np.nan``, ``None``, and ``pd.NaT`` to be null
                values. Default is ``True``.

        Returns:
            The mean result.

            For different values of ``on``, the return varies:

            - ``on=None``: an dict containing the column-wise mean of
              all columns,
            - ``on="col"``: a scalar representing the mean of all items in
              column ``"col"``,
            - ``on=["col_1", ..., "col_n"]``: an n-column dict
              containing the column-wise mean of the provided columns.

            If the dataset is empty, all values are null. If ``ignore_nulls`` is
            ``False`` and any value is null, then the output is ``None``.
        rN  )rO  rP   rP  rW  s       r   meanzDataset.mean^  s3    R   r EE%%c***r   r   ddofc                 h    |                      t          |||          }|                     |          S )a  Compute the standard deviation of one or more columns.

        .. note::
            This method uses Welford's online method for an accumulator-style
            computation of the standard deviation. This method has
            numerical stability, and is computable in a single pass. This may give
            different (but more accurate) results than NumPy, Pandas, and sklearn, which
            use a less numerically stable two-pass algorithm.
            To learn more, see
            `the Wikapedia article <https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm>`_.

        Examples:
            >>> import ray
            >>> round(ray.data.range(100).std("id", ddof=0), 5)
            28.86607
            >>> result = ray.data.from_items([
            ...     {"A": i, "B": i**2}
            ...     for i in range(100)
            ... ]).std(["A", "B"])
            >>> [(key, round(value, 10)) for key, value in result.items()]
            [('std(A)', 29.0114919759), ('std(B)', 2968.1748039269)]

        Args:
            on: a column name or a list of column names to aggregate.
            ddof: Delta Degrees of Freedom. The divisor used in calculations
                is ``N - ddof``, where ``N`` represents the number of elements.
            ignore_nulls: Whether to ignore null values. If ``True``, null
                values are ignored when computing the std; if ``False``,
                when a null value is encountered, the output is ``None``.
                This method considers ``np.nan``, ``None``, and ``pd.NaT`` to be null
                values. Default is ``True``.

        Returns:
            The standard deviation result.

            For different values of ``on``, the return varies:

            - ``on=None``: an dict containing the column-wise std of
              all columns,
            - ``on="col"``: a scalar representing the std of all items in
              column ``"col"``,
            - ``on=["col_1", ..., "col_n"]``: an n-column dict
              containing the column-wise std of the provided columns.

            If the dataset is empty, all values are null. If ``ignore_nulls`` is
            ``False`` and any value is null, then the output is ``None``.
        )rL  r_  )rO  rR   rP  )r   r7  r_  rL  rQ  s        r   stdzDataset.std  s5    p   b|$ OO%%c***r   columnsoverride_dtype_agg_mappingru   c                    ddl m}m}m}m}  ||                                 ||          }|j        st          d||nd d           |                     d          j	        |j         }|
                    d          d         }	|                                 j        }
|                                j        } ||	|
||j                  \  }}} ||||
d	
          } ||||
d
          } ||||
t          |                    S )a  Generate a statistical summary of the dataset, organized by data type.

        This method computes various statistics for different column dtypes:

        - For numerical dtypes (int*, float*, decimal, bool): count, mean, min, max, std, approx_quantile (median), missing%, zero%
        - For string and binary dtypes: count, missing%, approx_top_k (top 10 values)
        - For temporal dtypes (timestamp, date, time, duration): count, min, max, missing%
        - For other dtypes: count, missing%, approx_top_k

        You can customize the aggregations performed for specific data types using the
        `override_dtype_agg_mapping` parameter.

        The summary separates statistics into two tables:
        - Schema-matching stats: Statistics that preserve the original column type (e.g., min/max for integers)
        - Schema-changing stats: Statistics that change the type (e.g., mean converts int to float)

        Examples:
            >>> import ray
            >>> ds = ray.data.from_items([
            ...     {"age": 25, "salary": 50000, "name": "Alice", "city": "NYC"},
            ...     {"age": 30, "salary": 60000, "name": None, "city": "LA"},
            ...     {"age": 0, "salary": None, "name": "Bob", "city": None},
            ... ])
            >>> summary = ds.summary()
            >>> # Get combined pandas DataFrame with all statistics
            >>> summary.to_pandas()  # doctest: +SKIP
                      statistic        age                         city                           name        salary
            0  approx_quantile[0]  25.000000                         None                           None  60000.000000
            1      approx_topk[0]        NaN   {'city': 'LA', 'count': 1}    {'count': 1, 'name': 'Bob'}           NaN
            2      approx_topk[1]        NaN  {'city': 'NYC', 'count': 1}  {'count': 1, 'name': 'Alice'}           NaN
            3               count   3.000000                            3                              3      3.000000
            4                 max  30.000000                          NaN                            NaN  60000.000000
            5                mean  18.333333                         None                           None  55000.000000
            6                 min   0.000000                          NaN                            NaN  50000.000000
            7         missing_pct   0.000000                    33.333333                      33.333333     33.333333
            8                 std  13.123346                         None                           None   5000.000000
            9            zero_pct  33.333333                         None                           None      0.000000

            >>> # Access individual column statistics
            >>> summary.get_column_stats("age")  # doctest: +SKIP
            statistic               value
            0   approx_quantile[0]  25.000000
            1       approx_topk[0]        NaN
            2       approx_topk[1]        NaN
            3                count   3.000000
            4                  max  30.000000
            5                 mean  18.333333
            6                  min   0.000000
            7          missing_pct   0.000000
            8                  std  13.123346
            9            zero_pct  33.333333

            Custom aggregations for specific types:

            >>> from ray.data.datatype import DataType
            >>> from ray.data.aggregate import Sum, Count
            >>> # Override aggregations for int64 columns
            >>> custom_mapping = {
            ...     DataType.int64(): lambda col: [Count(on=col), Sum(on=col)]
            ... }
            >>> summary = ds.summary(override_dtype_agg_mapping=custom_mapping)

        Args:
            columns: Optional list of column names to include in the summary.
                If None, all columns will be included.
            override_dtype_agg_mapping: Optional mapping from DataType to factory
                functions. Each factory function takes a column name and returns a
                list of aggregators for that column. This will be merged with the
                default mapping, with user-provided mappings taking precedence.

        Returns:
            A DatasetSummary object with methods to access statistics and the
            original dataset schema. Use `to_pandas()` to get all statistics
            as a DataFrame, or `get_column_stats(col)` for a specific column
        r   )ru   _build_summary_table_dtype_aggregators_for_dataset_parse_summary_stats)rb  dtype_agg_mappingzPsummary() requires at least one column with a supported type. Columns provided: Nr  z. Check that the specified columns exist and have supported types (numeric, string, binary, or temporal). Columns with None or object types are skipped.r   T)preserve_typesF)_stats_matching_column_dtype_stats_mismatching_column_dtypedataset_schemarb  )ray.data.statsru   re  rf  rg  r   aggregatorsr   r  rU  re  base_schemar  )r   rb  rc  ru   re  rf  rg  
dtype_aggsaggs_dataset
agg_resultoriginal_schema
agg_schemaschema_matching_statsschema_changing_statsall_columnsschema_matching_tableschema_changing_tables                    r   summaryzDataset.summary  s   j	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 	
 43KKMM8
 
 

 % 	,070CWW, , ,   4t||D))3Z5KL!&&q))!,
 ++--3!((**6

 ! Z5K
 
		
!! !5 4!;PT!
 !
 !
 !5 4!;PU!
 !
 !
 ~)>,A*%%	
 
 
 	
r   
descending
boundariesc                     |t          d          t          |||          }| j                                        }t	          | j        j        |          }t          || j                  }t          ||          S )av  Sort the dataset by the specified key column or key function.
        The `key` parameter must be specified (i.e., it cannot be `None`).

        .. note::
            If provided, the `boundaries` parameter can only be used to partition
            the first sort key.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(15)
            >>> ds = ds.sort("id", descending=False, boundaries=[5, 10])
            >>> for df in ray.get(ds.to_pandas_refs()):
            ...     print(df)
               id
            0   0
            1   1
            2   2
            3   3
            4   4
               id
            0   5
            1   6
            2   7
            3   8
            4   9
               id
            0  10
            1  11
            2  12
            3  13
            4  14

        Time complexity: O(dataset size * log(dataset size / parallelism))

        Args:
            key: The column or a list of columns to sort by.
            descending: Whether to sort in descending order. Must be a boolean or a list
                of booleans matching the number of the columns.
            boundaries: The list of values based on which to repartition the dataset.
                For example, if the input boundary is [10,20], rows with values less
                than 10 will be divided into the first block, rows with values greater
                than or equal to 10 and less than 20 will be divided into the
                second block, and rows with values greater than or equal to 20
                will be divided into the third block. If not provided, the
                boundaries will be sampled from the input blocks. This feature
                only supports numeric columns right now.

        Returns:
            A new, sorted :class:`Dataset`.

        Raises:
            ``ValueError``: if the sort key is None.
        Nz/The 'key' parameter cannot be None for sorting.)sort_key)
r   rA   r   r   r0   r   r   r,   r   r   )r   r  r{  r|  r~  r   r2  r   s           r   rL  zDataset.sortN  s~    z ;NOOO3
J77z  "
 
 
 #2t|44t\***r   c                     | j                                         }t          | j        j        gd |D             R  }t          || j                  }t          ||          S )a  Zip the columns of this dataset with the columns of another.

        The datasets must have the same number of rows. Their column sets are
        merged, and any duplicate column names are disambiguated with suffixes like
        ``"_1"``.

        .. note::
            The smaller of the two datasets is repartitioned to align the number
            of rows per block with the larger dataset.

        .. note::
            Zipped datasets aren't lineage-serializable. As a result, they can't be used
            as a tunable hyperparameter in Ray Tune.

        Examples:
            >>> import ray
            >>> ds1 = ray.data.range(5)
            >>> ds2 = ray.data.range(5)
            >>> ds3 = ray.data.range(5)
            >>> ds1.zip(ds2, ds3).take_batch()
            {'id': array([0, 1, 2, 3, 4]), 'id_1': array([0, 1, 2, 3, 4]), 'id_2': array([0, 1, 2, 3, 4])}

        Args:
            *other: List of datasets to combine with this one. The datasets
                must have the same row count as this dataset, otherwise the
                ValueError is raised.

        Returns:
            A :class:`Dataset` containing the columns of the second dataset
            concatenated horizontally with the columns of the first dataset,
            with duplicate column names disambiguated with suffixes like ``"_1"``.

        Raises:
            ValueError: If the datasets have different row counts.
        c                 &    g | ]}|j         j        S r  )r   r   )r  r%  s     r   r  zDataset.zip.<locals>.<listcomp>  s    *V*V*Vu5+>+B*V*V*Vr   )r   r   r:   r   r   r,   r   r   )r   r%  r   r2  r   s        r   r-  zDataset.zip  sb    J z  #'W*V*VPU*V*V*VWWW"2t|44t\***r   limitc                     | j                                         }t          | j        j        |          }t          || j                  }t          ||          S )a5  Truncate the dataset to the first ``limit`` rows.

        Unlike :meth:`~Dataset.take`, this method doesn't move data to the caller's
        machine. Instead, it returns a new :class:`Dataset` pointing to the truncated
        distributed data.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(1000)
            >>> ds.limit(5).count()
            5

        Time complexity: O(limit specified)

        Args:
            limit: The size of the dataset to truncate to.

        Returns:
            The truncated dataset.
        )r  )r   r   r;   r   r   r,   r   r   )r   r  r   r2  r   s        r   r  zDataset.limit  sN    , z  4%)777"2t|44t\***r      )r   c          	      ^   t          |          }|                     |          }	 t          t          |                    |d|                              }n# t
          $ r t          d          w xY w|                                  |j        	                                | j        _
        |S )a  Return up to ``batch_size`` rows from the :class:`Dataset` in a batch.

        Ray Data represents batches as NumPy arrays or pandas DataFrames. You can
        configure the batch type by specifying ``batch_format``.

        This method is useful for inspecting inputs to :meth:`~Dataset.map_batches`.

        .. warning::

            :meth:`~Dataset.take_batch` moves up to ``batch_size`` rows to the caller's
            machine. If ``batch_size`` is large, this method can cause an `
            ``OutOfMemory`` error on the caller.

        Examples:

            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.take_batch(5)
            {'id': array([0, 1, 2, 3, 4])}

        Time complexity: O(batch_size specified)

        Args:
            batch_size: The maximum number of rows to return.
            batch_format: If ``"default"`` or ``"numpy"``, batches are
                ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
                ``pandas.DataFrame``.

        Returns:
            A batch of up to ``batch_size`` rows from the dataset.

        Raises:
            ``ValueError``: if the dataset is empty.
        r   )r   prefetch_batchesr   zThe dataset is empty.)r]   r  nextiteriter_batchesStopIterationr   _synchronize_progress_barr   r  _snapshot_stats)r   r   r   
limited_dsress        r   
take_batchzDataset.take_batch  s    N +<88ZZ
++
	6++#-)*%1 ,    CC  	6 	6 	64555	6&&((( &0%5%;%;%=%=
"
s   2A A3c                    t           j                            d          rt                              d           g }|                     |          }|                                D ],}|                    |           t          |          |k    r n-| 	                                 |j
                                        | j
        _        |S )a$  Return up to ``limit`` rows from the :class:`Dataset`.

        This method is useful for inspecting data.

        .. warning::

            :meth:`~Dataset.take` moves up to ``limit`` rows to the caller's machine. If
            ``limit`` is large, this method can cause an ``OutOfMemory`` error on the
            caller.

        Examples:

            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.take(3)
            [{'id': 0}, {'id': 1}, {'id': 2}]

        Time complexity: O(limit specified)

        Args:
            limit: The maximum number of rows to return.

        Returns:
            A list of up to ``limit`` rows from the dataset.

        .. seealso::

            :meth:`~Dataset.take_all`
                Call this method to return all rows.
        dataset_takezgTip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.)r  utillog_onceloggerinfor  	iter_rowsr  r  r  r   r  r  )r   r  outputr  r  s        r   re  zDataset.take  s    B 8^,, 	KK;   ZZ&&
'')) 	 	CMM#6{{e## $&&((( &0%5%;%;%=%=
"r   c                     g }|                                  D ]?}|                    |           |&t          |          |k    rt          d| d          @|                                  |S )aT  Return all of the rows in this :class:`Dataset`.

        This method is useful for inspecting small datasets.

        .. warning::

            :meth:`~Dataset.take_all` moves the entire dataset to the caller's
            machine. If the dataset is large, this method can cause an
            ``OutOfMemory`` error on the caller.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(5)
            >>> ds.take_all()
            [{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]

        Time complexity: O(dataset size)

        Args:
            limit: Raise an error if the size exceeds the specified limit.

        Returns:
            A list of all the rows in the dataset.

        .. seealso::

            :meth:`~Dataset.take`
                Call this method to return a specific number of rows.
        Nz-The dataset has more than the given limit of z	 records.)r  r  r  r   r  )r   r  r  r  s       r   take_allzDataset.take_allJ  s    @ >>## 	 	CMM# S[[5%8%8 TETTT   	&&(((r   c                 T    |                      |          D ]}t          |           dS )a)  Print up to the given number of rows from the :class:`Dataset`.

        This method is useful for inspecting data.

        Examples:

            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.show(3)
            {'id': 0}
            {'id': 1}
            {'id': 2}

        Time complexity: O(limit specified)

        Args:
            limit: The maximum number of row to print.

        .. seealso::

            :meth:`~Dataset.take`
                Call this method to get (not print) a given number of rows.
        N)re  print)r   r  r  s      r   showzDataset.showt  s6    4 99U## 	 	C#JJJJ	 	r   z	row countz	Examples:)if_more_than_readdatasource_metadatapatternc                 *   | j                                         dk    rdS |                                 }||S | j                                         }t	          t          | j        j        g                     }t          || j	                  }t          ||          }d}|                    d          D ]O}t          j        |v sJ dt          j         d            ||t          j                                                 z  }Pt          |          S )a  Count the number of rows in the dataset.

        For Datasets which only read Parquet files (created with
        :meth:`~ray.data.read_parquet`), this method reads the file metadata to
        efficiently count the number of rows without reading in the entire data.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> ds.count()
            10

        Returns:
            The number of records in the dataset.
        r   N)r   )r   zIOutputs from the 'Count' logical operator should contain a column named '')r   rj  _meta_countr   r1   r8   r   r   r,   r   r   r  COLUMN_NAMEr=  r   )r   
meta_countr   count_opr   count_dsr  r   s           r   r  zDataset.count  s   . :((**a//1 %%''
!z   !3!7rBBBCC"8T\::4..**d*;; 	4 	4E$---/+/ / / .-- U5,-11333EE 5zzr   r   z-or if ``fetch_if_missing=True`` (the default)zTime complexity:)r  r  extra_conditionr  rI  Schemac                 0   | j         j        }| j                             d          }|t          ||          S |                     d          j                             |          }|+| j                             |           t          ||          S dS )av  Return the schema of the dataset.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> ds.schema()
            Column  Type
            ------  ----
            id      int64

        Time complexity: O(1)

        Args:
            fetch_if_missing: If True, synchronously fetch the schema if it's
                not known. If False, None is returned if the schema is not known.
                Default is True.

        Returns:
            The :class:`ray.data.Schema` class of the records, or None if the
            schema is not known and fetch_if_missing is False.
        FrH  Ndata_contextr   )r   _contextr   r  r  cache_schema)r   rI  r   ro  s       r   r   zDataset.schema  s    < *% j'''??"+G<<<<
 jjmm)00BR0SS"J##K000+G<<<<4r   c                 D    |                      |          }||j        S dS )a  Returns the columns of this Dataset.

        Time complexity: O(1)

        Example:
            >>> import ray
            >>> # Create dataset from synthetic data.
            >>> ds = ray.data.range(1000)
            >>> ds.columns()
            ['id']

        Args:
            fetch_if_missing: If True, synchronously fetch the column names from the
                schema if it's not known. If False, None is returned if the schema is
                not known. Default is True.

        Returns:
            A list of the column names for this Dataset or None if schema is not known
            and `fetch_if_missing` is False.

        rH  N)r   r  )r   rI  r   s      r   rb  zDataset.columns  s+    : .>??<tr   c                      t          d          )a  Return the number of blocks of this :class:`Dataset`.

        This method is only implemented for :class:`~ray.data.MaterializedDataset`,
        since the number of blocks may dynamically change during execution.
        For instance, during read and transform operations, Ray Data may dynamically
        adjust the number of blocks to respect memory limits, increasing the
        number of blocks at runtime.

        Returns:
            The number of blocks of this :class:`Dataset`.
        zNumber of blocks is only available for `MaterializedDataset`,because the number of blocks may dynamically change during execution.Call `ds.materialize()` to get a `MaterializedDataset`.)NotImplementedErrorr   s    r   rM  zDataset.num_blocks  s     "F
 
 	
r   c                    | j         j                                        j        #| j         j                                        j        S | j                                        j        }|r|d         j        dS t          d |D                       S )a9  Return the in-memory size of the dataset.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> ds.size_bytes()
            80

        Returns:
            The in-memory size of the dataset in bytes, or None if the
            in-memory size is not known.
        Nr   c              3   $   K   | ]}|j         V  d S r   )
size_bytes)r  r  s     r   r  z%Dataset.size_bytes.<locals>.<genexpr>=  s$      22A1<222222r   )r   r   infer_metadatar  r   r  r  r=  )r   r  s     r   r  zDataset.size_bytes'  s      !0022=I%)88::EE:%%''0 	8A;19422222222r   c                 h    t          t          | j                                                            S )a  Return the list of input files for the dataset.

        Examples:
            >>> import ray
            >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
            >>> ds.input_files()
            ['ray-example-data/iris.csv']

        Returns:
            The list of input files used to create the dataset, or an empty
            list if the input files is not known.
        )r  r  r   input_filesr   s    r   r  zDataset.input_files?  s(     C
..0011222r   )partition_cols
filesystemtry_create_dirarrow_open_stream_argsfilename_providerarrow_parquet_args_fnmin_rows_per_filemax_rows_per_filer   r   num_rows_per_filemodepathr  r  zpyarrow.fs.FileSystemr  r  r  r  r  r  r   r  r  c                    |d }t          |||	          \  }}t          ||||||||||| j        |          }|                     ||
|           dS )ak  Writes the :class:`~ray.data.Dataset` to parquet files under the provided ``path``.

        The number of files is determined by the number of blocks in the dataset.
        To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.

        If pyarrow can't represent your data, this method errors.

        By default, the format of the output files is ``{uuid}_{block_idx}.parquet``,
        where ``uuid`` is a unique id for the dataset. To modify this behavior,
        implement a custom :class:`~ray.data.datasource.FilenameProvider` and pass it in
        as the ``filename_provider`` argument.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.write_parquet("local:///tmp/data/")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where
                parquet files are written to.
            partition_cols: Column names by which to partition the dataset.
                Files are writted in Hive partition style.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in the
                destination path. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like. The filename is expected to be templatized with `{i}`
                to ensure unique filenames when writing multiple files. If it's not
                templatized, Ray Data will add `{i}` to the filename to ensure
                compatibility with the pyarrow `write_dataset <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_dataset.html>`_.
            arrow_parquet_args_fn: Callable that returns a dictionary of write
                arguments that are provided to `pyarrow.parquet.ParquetWriter() <https:/                    /arrow.apache.org/docs/python/generated/                        pyarrow.parquet.ParquetWriter.html>`_
                when writing each block to a file. Overrides
                any duplicate keys from ``arrow_parquet_args``. If `row_group_size` is
                provided, it will be passed to
                `pyarrow.parquet.ParquetWriter.write_table() <https:/                    /arrow.apache.org/docs/python/generated/pyarrow                        .parquet.ParquetWriter.html                        #pyarrow.parquet.ParquetWriter.write_table>`_. Use this argument
                instead of ``arrow_parquet_args`` if any of your write arguments
                can't pickled, or if you'd like to lazily resolve the write
                arguments for each dataset block.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            max_rows_per_file: [Experimental] The target maximum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is smaller than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file. If both ``min_rows_per_file``
                and ``max_rows_per_file`` are specified, ``max_rows_per_file`` takes
                precedence when they cannot both be satisfied.
            ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
            arrow_parquet_args: Options to pass to
                `pyarrow.parquet.ParquetWriter() <https:/                    /arrow.apache.org/docs/python/generated/                        pyarrow.parquet.ParquetWriter.html>`_, which is used to write
                out each block to a file. See `arrow_parquet_args_fn` for more detail.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        Nc                      i S r   r  r  r   r   r  z'Dataset.write_parquet.<locals>.<lambda>  s    B r   )r  r  r  )r  r  arrow_parquet_argsr  r  r  r  open_stream_argsr  dataset_uuidr  r   r   )rJ   r!   rq  write_datasink)r   r  r  r  r  r  r  r  r  r  r   r   r  r  r  effective_min_rowseffective_max_rowsdatasinks                     r   write_parquetzDataset.write_parquetP  s    ^ !($.J!1M///2
 2
 2
.. #)"7100!)3/
 
 
 	+# 	 	
 	
 	
 	
 	
r   )
r  r  r  r  pandas_json_args_fnr  r   r   r  r  r  c       
             |d }t          |
|          \  }}t          ||||||||| j        |
  
        }|                     |||	           dS )a#  Writes the :class:`~ray.data.Dataset` to JSON and JSONL files.

        The number of files is determined by the number of blocks in the dataset.
        To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.

        This method is only supported for datasets with records that are convertible to
        pandas dataframes.

        By default, the format of the output files is ``{uuid}_{block_idx}.json``,
        where ``uuid`` is a unique id for the dataset. To modify this behavior,
        implement a custom :class:`~ray.data.datasource.FilenameProvider` and pass it in
        as the ``filename_provider`` argument.

        Examples:
            Write the dataset as JSON file to a local directory.

            >>> import ray
            >>> import pandas as pd
            >>> ds = ray.data.from_pandas([pd.DataFrame({"one": [1], "two": ["a"]})])
            >>> ds.write_json("local:///tmp/data")

            Write the dataset as JSONL files to a local directory.

            >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/train.jsonl")
            >>> ds.write_json("local:///tmp/data")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where
                the JSON files are written to.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in the
                destination path. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            pandas_json_args_fn: Callable that returns a dictionary of write
                arguments that are provided to
                `pandas.DataFrame.to_json() <https://pandas.pydata.org/docs/reference/                    api/pandas.DataFrame.to_json.html>`_
                when writing each block to a file. Overrides
                any duplicate keys from ``pandas_json_args``. Use this parameter
                instead of ``pandas_json_args`` if any of your write arguments
                can't be pickled, or if you'd like to lazily resolve the write
                arguments for each dataset block.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: Deprecated. Use ``min_rows_per_file`` instead.
            pandas_json_args: These args are passed to
                `pandas.DataFrame.to_json() <https://pandas.pydata.org/docs/reference/                    api/pandas.DataFrame.to_json.html>`_,
                which is used under the hood to write out each
                :class:`~ray.data.Dataset` block. These
                are dict(orient="records", lines=True) by default.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        Nc                      i S r   r  r  r   r   r  z$Dataset.write_json.<locals>.<lambda>B  s    " r   r  r  )	r  pandas_json_argsr  r  r  r  r  r  r  r  )rJ   r   rq  r  )r   r  r  r  r  r  r  r  r   r   r  r  r  r  _r  s                   r   
write_jsonzDataset.write_json  s    J &",* </CT!
 !
 !
A   3-0!)3/
 
 
 	+# 	 	
 	
 	
 	
 	
r   table_identifiercatalog_kwargssnapshot_propertiesrb   overwrite_filterrv   upsert_kwargsoverwrite_kwargsc
           	      b    t          |||||||          }
|                     |
||	           dS )a  Writes the :class:`~ray.data.Dataset` to an Iceberg table.

        .. tip::
            For more details on PyIceberg, see
            - URI: https://py.iceberg.apache.org/

        Examples:
             .. testcode::
                :skipif: True

                import ray
                import pandas as pd
                from ray.data import SaveMode
                from ray.data.expressions import col

                # Basic append (default behavior)
                docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
                ds = ray.data.from_pandas(pd.DataFrame(docs))
                ds.write_iceberg(
                    table_identifier="db_name.table_name",
                    catalog_kwargs={"name": "default", "type": "sql"}
                )

                # Schema evolution is automatic - new columns are added automatically
                enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)]
                ds_enriched = ray.data.from_pandas(pd.DataFrame(enriched_docs))
                ds_enriched.write_iceberg(
                    table_identifier="db_name.table_name",
                    catalog_kwargs={"name": "default", "type": "sql"}
                )
                 # Upsert mode - update existing rows or insert new ones
                updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
                ds_updates = ray.data.from_pandas(pd.DataFrame(updated_docs))
                ds_updates.write_iceberg(
                    table_identifier="db_name.table_name",
                    catalog_kwargs={"name": "default", "type": "sql"},
                    mode=SaveMode.UPSERT,
                    upsert_kwargs={"join_cols": ["id"]},
                )

                # Partial overwrite with Ray Data expressions
                ds.write_iceberg(
                    table_identifier="events.user_activity",
                    catalog_kwargs={"name": "default", "type": "rest"},
                    mode=SaveMode.OVERWRITE,
                    overwrite_filter=col("date") >= "2024-10-28"
                )

        Args:
            table_identifier: Fully qualified table identifier (``db_name.table_name``)
            catalog_kwargs: Optional arguments to pass to PyIceberg's catalog.load_catalog()
                function (such as name, type, etc.). For the function definition, see
                `pyiceberg catalog
                <https://py.iceberg.apache.org/reference/pyiceberg/catalog/                #pyiceberg.catalog.load_catalog>`_.
            snapshot_properties: Custom properties to write to snapshot when committing
                to an iceberg table.
            mode: Write mode using SaveMode enum. Options:

                * SaveMode.APPEND (default): Add new data to the table without checking for duplicates.
                * SaveMode.UPSERT: Update existing rows that match on the join condition (``join_cols`` in ``upsert_kwargs``),
                  or insert new rows if they don't exist in the table.
                * SaveMode.OVERWRITE: Replace all existing data in the table with new data, or replace
                  data matching overwrite_filter if specified.

            overwrite_filter: Optional filter for OVERWRITE mode to perform partial overwrites.
                Must be a Ray Data expression from `ray.data.expressions`. Only rows matching
                this filter are replaced. If None with OVERWRITE mode, replaces all table data.
                Example: `col("date") >= "2024-01-01"` or `(col("region") == "US") & (col("status") == "active")`
            upsert_kwargs: Optional arguments for upsert operations.
                Supported parameters: join_cols (List[str]), case_sensitive (bool), branch (str).
                Note: Ray Data uses a copy-on-write strategy that always updates all columns
                for matched keys and inserts all new keys for optimal parallelism.
            overwrite_kwargs: Optional arguments to pass through to PyIceberg's table.overwrite() method.
                Supported parameters: case_sensitive (bool), branch (str). See PyIceberg documentation
                for details.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.

        Note:
            Schema evolution is automatically enabled. New columns in the incoming data
            are automatically added to the table schema. The schema is extracted
            automatically from the data being written.
        )r  r  r  r  r  r  r  r  N)r   r  )r   r  r  r  r  r  r  r  r   r   r  s              r   write_icebergzDataset.write_icebergZ  s`    J #-) 3-'-
 
 
 	+# 	 	
 	
 	
 	
 	
r   png)r  r  r  r  r   r   r  file_formatc                p    t          |||||||| j        |
	  	        }|                     |||	           dS )a
  Writes the :class:`~ray.data.Dataset` to images.

        Examples:
            >>> import ray
            >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
            >>> ds.write_images("local:///tmp/images", column="image")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where
                the images are written to.
            column: The column containing the data you want to write to images.
            file_format: The image file format to write with. For available options,
                see `Image file formats <https://pillow.readthedocs.io/en/latest                /handbook/image-file-formats.html>`_.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in the
                destination path. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        )r  r  r  r  r  r  r  N)r   rq  r  )r   r  r   r  r  r  r  r  r   r   r  r  s               r   write_imageszDataset.write_images  sh    z !!)3/

 

 

 	+# 	 	
 	
 	
 	
 	
r   )
r  r  r  r  arrow_csv_args_fnr  r   r   r  r  r  c       
             |d }t          |
|          \  }}t          ||||||||| j        |
  
        }|                     |||	           dS )a3  Writes the :class:`~ray.data.Dataset` to CSV files.

        The number of files is determined by the number of blocks in the dataset.
        To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.

        This method is only supported for datasets with records that are convertible to
        pyarrow tables.

        By default, the format of the output files is ``{uuid}_{block_idx}.csv``,
        where ``uuid`` is a unique id for the dataset. To modify this behavior,
        implement a custom :class:`~ray.data.datasource.FilenameProvider`
        and pass it in as the ``filename_provider`` argument.


        Examples:
            Write the dataset as CSV files to a local directory.

            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.write_csv("local:///tmp/data")

            Write the dataset as CSV files to S3.

            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.write_csv("s3://bucket/folder/)  # doctest: +SKIP

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where
                the CSV files are written to.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in the
                destination path if ``True``. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            arrow_csv_args_fn: Callable that returns a dictionary of write
                arguments that are provided to `pyarrow.write.write_csv <https://                arrow.apache.org/docs/python/generated/                pyarrow.csv.write_csv.html#pyarrow.csv.write_csv>`_ when writing each
                block to a file. Overrides any duplicate keys from ``arrow_csv_args``.
                Use this argument instead of ``arrow_csv_args`` if any of your write
                arguments cannot be pickled, or if you'd like to lazily resolve the
                write arguments for each dataset block.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
            arrow_csv_args: Options to pass to `pyarrow.write.write_csv <https://                arrow.apache.org/docs/python/generated/pyarrow.csv.write_csv.html                    #pyarrow.csv.write_csv>`_
                when writing each block to a file.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        Nc                      i S r   r  r  r   r   r  z#Dataset.write_csv.<locals>.<lambda>  s     r   r  )	r  arrow_csv_argsr  r  r  r  r  r  r  r  )rJ   r   rq  r  )r   r  r  r  r  r  r  r  r   r   r  r  r  r  r  r  s                   r   	write_csvzDataset.write_csv  s    F $ *
 </CT!
 !
 !
A /)0!)3/
 
 
 	+# 	 	
 	
 	
 	
 	
r   )
	tf_schemar  r  r  r  r  r   r   r  r  r  zschema_pb2.Schemac       
             t          |
|          \  }}t          |||||||| j        |	  	        }|                     |||	           dS )a  Write the :class:`~ray.data.Dataset` to TFRecord files.

        The `TFRecord <https://www.tensorflow.org/tutorials/load_data/tfrecord>`_
        files contain
        `tf.train.Example <https://www.tensorflow.org/api_docs/python/tf/train/            Example>`_
        records, with one Example record for each row in the dataset.

        .. warning::
            tf.train.Feature only natively stores ints, floats, and bytes,
            so this function only supports datasets with these data types,
            and will error if the dataset contains unsupported types.

        The number of files is determined by the number of blocks in the dataset.
        To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.

        This method is only supported for datasets with records that are convertible to
        pyarrow tables.

        By default, the format of the output files is ``{uuid}_{block_idx}.tfrecords``,
        where ``uuid`` is a unique id for the dataset. To modify this behavior,
        implement a custom :class:`~ray.data.datasource.FilenameProvider`
        and pass it in as the ``filename_provider`` argument.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.write_tfrecords("local:///tmp/data/")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where tfrecords
                files are written to.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in the
                destination path. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        r  )	r  r  r  r  r  r  r  r  r  r  N)rJ   r#   rq  r  )r   r  r  r  r  r  r  r  r   r   r  r  r  r  r  s                  r   write_tfrecordszDataset.write_tfrecords  s    p !=/CT!
 !
 !
A $0!)3/

 

 

 	+# 	 	
 	
 	
 	
 	
r   )
r  r  r  r  r  r   encoderr   r  r  r  c       
             t          |
|          \  }}t          |||||||| j        |	  	        }|                     |||	           dS )a]  Writes the dataset to `WebDataset <https://github.com/webdataset/webdataset>`_ files.

        The `TFRecord <https://www.tensorflow.org/tutorials/load_data/tfrecord>`_
        files will contain
        `tf.train.Example <https://www.tensorflow.org/api_docs/python/tf/train/Example>`_ # noqa: E501
        records, with one Example record for each row in the dataset.

        .. warning::
            tf.train.Feature only natively stores ints, floats, and bytes,
            so this function only supports datasets with these data types,
            and will error if the dataset contains unsupported types.

        This is only supported for datasets convertible to Arrow records.
        To control the number of files, use :meth:`Dataset.repartition`.

        Unless a custom filename provider is given, the format of the output
        files is ``{uuid}_{block_idx}.tfrecords``, where ``uuid`` is a unique id
        for the dataset.

        Examples:

            .. testcode::
                :skipif: True

                import ray

                ds = ray.data.range(100)
                ds.write_webdataset("s3://bucket/folder/")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where tfrecords
                files are written to.
            filesystem: The filesystem implementation to write to.
            try_create_dir: If ``True``, attempts to create all
                directories in the destination path. Does nothing if all directories
                already exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                ``pyarrow.fs.FileSystem.open_output_stream``
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        r  )r  r  r  r  r  r  r  r  r  N)rJ   r$   rq  r  )r   r  r  r  r  r  r  r   r  r   r  r  r  r  r  s                  r   write_webdatasetzDataset.write_webdataset  s    Z !=/CT!
 !
 !
A &0!)3/

 

 

 	+# 	 	
 	
 	
 	
 	
r   )	r  r  r  r  r  r   r   r  r  c       
             t          |
|          \  }}t          |||||||| j        |	  	        }|                     |||	           dS )a8  Writes a column of the :class:`~ray.data.Dataset` to .npy files.

        This is only supported for columns in the datasets that can be converted to
        NumPy arrays.

        The number of files is determined by the number of blocks in the dataset.
        To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.


        By default, the format of the output files is ``{uuid}_{block_idx}.npy``,
        where ``uuid`` is a unique id for the dataset. To modify this behavior,
        implement a custom :class:`~ray.data.datasource.FilenameProvider`
        and pass it in as the ``filename_provider`` argument.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100)
            >>> ds.write_numpy("local:///tmp/data/", column="id")

        Time complexity: O(dataset size / parallelism)

        Args:
            path: The path to the destination root directory, where
                the npy files are written to.
            column: The name of the column that contains the data to
                be written.
            filesystem: The pyarrow filesystem implementation to write to.
                These filesystems are specified in the
                `pyarrow docs <https://arrow.apache.org/docs                /python/api/filesystems.html#filesystem-implementations>`_.
                Specify this if you need to provide specific configurations to the
                filesystem. By default, the filesystem is automatically selected based
                on the scheme of the paths. For example, if the path begins with
                ``s3://``, the ``S3FileSystem`` is used.
            try_create_dir: If ``True``, attempts to create all directories in
                destination path. Does nothing if all directories already
                exist. Defaults to ``True``.
            arrow_open_stream_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_output_stream <https://arrow.apache.org                /docs/python/generated/pyarrow.fs.FileSystem.html                #pyarrow.fs.FileSystem.open_output_stream>`_, which is used when
                opening the file to write to.
            filename_provider: A :class:`~ray.data.datasource.FilenameProvider`
                implementation. Use this parameter to customize what your filenames
                look like.
            min_rows_per_file: [Experimental] The target minimum number of rows to write
                to each file. If ``None``, Ray Data writes a system-chosen number of
                rows to each file. If the number of rows per block is larger than the
                specified value, Ray Data writes the number of rows per block to each file.
                The specified value is a hint, not a strict limit. Ray Data
                might write more or fewer rows to each file.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
            num_rows_per_file: [Deprecated] Use min_rows_per_file instead.
            mode: Determines how to handle existing files. Valid modes are "overwrite", "error",
                "ignore", "append". Defaults to "append".
                NOTE: This method isn't atomic. "Overwrite" first deletes all the data
                before writing to `path`.
        r  )r  r  r  r  r  r  r  r  N)rJ   r    rq  r  )r   r  r   r  r  r  r  r  r   r   r  r  r  r  r  s                  r   write_numpyzDataset.write_numpyh  s    ` !=/CT!
 !
 !
A !0!)3/

 

 

 	+# 	 	
 	
 	
 	
 	
r   sqlconnection_factoryc                 X    t          ||          }|                     |||           dS )a  Write to a database that provides a
        `Python DB API2-compliant <https://peps.python.org/pep-0249/>`_ connector.

        .. note::

            This method writes data in parallel using the DB API2 ``executemany``
            method. To learn more about this method, see
            `PEP 249 <https://peps.python.org/pep-0249/#executemany>`_.

        Examples:

            .. testcode::

                import sqlite3
                import ray

                connection = sqlite3.connect("example.db")
                connection.cursor().execute("CREATE TABLE movie(title, year, score)")
                dataset = ray.data.from_items([
                    {"title": "Monty Python and the Holy Grail", "year": 1975, "score": 8.2},
                    {"title": "And Now for Something Completely Different", "year": 1971, "score": 7.5}
                ])

                dataset.write_sql(
                    "INSERT INTO movie VALUES(?, ?, ?)", lambda: sqlite3.connect("example.db")
                )

                result = connection.cursor().execute("SELECT * FROM movie ORDER BY year")
                print(result.fetchall())

            .. testoutput::

                [('And Now for Something Completely Different', 1971, 7.5), ('Monty Python and the Holy Grail', 1975, 8.2)]

            .. testcode::
                :hide:

                import os
                os.remove("example.db")

        Arguments:
            sql: An ``INSERT INTO`` statement that specifies the table to write to. The
                number of parameters must match the number of columns in the table.
            connection_factory: A function that takes no arguments and returns a
                Python DB API2
                `Connection object <https://peps.python.org/pep-0249/#connection-objects>`_.
            ray_remote_args: Keyword arguments passed to :func:`ray.remote` in the
                write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        )r  r  r  N)r"   r  )r   r  r  r   r   r  s         r   	write_sqlzDataset.write_sql  sH    z 3;MNNN+# 	 	
 	
 	
 	
 	
r   r  tableconnection_parametersc                $  
 ddl 

fd}|                                 j        }d                    d |D                       }d                    dgt	          |          z            }d| d| d	| d
}	|                     |	|||           dS )aS  Write this ``Dataset`` to a Snowflake table.

        Examples:

            .. testcode::
                :skipif: True

                import ray

                connection_parameters = dict(
                    user=...,
                    account="ABCDEFG-ABC12345",
                    password=...,
                    database="SNOWFLAKE_SAMPLE_DATA",
                    schema="TPCDS_SF100TCL"
                )
                ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
                ds.write_snowflake("MY_DATABASE.MY_SCHEMA.IRIS", connection_parameters)

        Args:
            table: The name of the table to write to.
            connection_parameters: Keyword arguments to pass to
                ``snowflake.connector.connect``. To view supported parameters, read
                https://docs.snowflake.com/developer-guide/python-connector/python-connector-api#functions.
            ray_remote_args: Keyword arguments passed to :func:`ray.remote` in the
                write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        r   Nc                  (     j         j        di  S )Nr  )	connectorconnect)r  	snowflakes   r   snowflake_connection_factoryz=Dataset.write_snowflake.<locals>.snowflake_connection_factory;  s     .9&.GG1FGGGr   z, c              3   "   K   | ]
}d | d V  dS )"Nr  r
  s     r   r  z*Dataset.write_snowflake.<locals>.<genexpr>B  s*      CCs
C


CCCCCCr   z%szINSERT INTO z (z
) VALUES (r<  )r  r   r   )snowflake.connectorr   r  rF  r  r  )r   r  r  r   r   r  r  columns_strplaceholdersr  r  s     `       @r   write_snowflakezDataset.write_snowflake  s    P 	#"""	H 	H 	H 	H 	H 	H {{}}* iiCClCCCCCyy$#l*;*;!;<<LULLkLL\LLL;+#	 	 	
 	
 	
 	
 	
r   uridatabase
collectionc                 Z    t          |||          }|                     |||           dS )a	  Writes the :class:`~ray.data.Dataset` to a MongoDB database.

        This method is only supported for datasets convertible to pyarrow tables.

        The number of parallel writes is determined by the number of blocks in the
        dataset. To control the number of number of blocks, call
        :meth:`~ray.data.Dataset.repartition`.

        .. warning::
            This method supports only a subset of the PyArrow's types, due to the
            limitation of pymongoarrow which is used underneath. Writing unsupported
            types fails on type checking. See all the supported types at:
            https://mongo-arrow.readthedocs.io/en/stable/api/types.html.

        .. note::
            The records are inserted into MongoDB as new documents. If a record has
            the _id field, this _id must be non-existent in MongoDB, otherwise the write
            is rejected and fail (hence preexisting documents are protected from
            being mutated). It's fine to not have _id field in record and MongoDB will
            auto generate one at insertion.

        Examples:

            .. testcode::
                :skipif: True

                import ray

                ds = ray.data.range(100)
                ds.write_mongo(
                    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
                    database="my_db",
                    collection="my_collection"
                )

        Args:
            uri: The URI to the destination MongoDB where the dataset is
                written to. For the URI format, see details in the
                `MongoDB docs <https://www.mongodb.com/docs/manual/reference                    /connection-string/>`_.
            database: The name of the database. This database must exist otherwise
                a ValueError is raised.
            collection: The name of the collection in the database. This collection
                must exist otherwise a ValueError is raised.
            ray_remote_args: kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.

        Raises:
            ValueError: if ``database`` doesn't exist.
            ValueError: if ``collection`` doesn't exist.
        )r  r  r   r  N)r   r  )r   r  r  r   r   r   r  s          r   write_mongozDataset.write_mongoL  sT    @ !!
 
 

 	+# 	 	
 	
 	
 	
 	
r   
   
project_idr   max_retry_cntoverwrite_tablec                     |i }|                     dd          dk    rt          j        d           nd|d<   t          ||||          }|                     |||           dS )a  Write the dataset to a BigQuery dataset table.

        To control the number of parallel write tasks, use ``.repartition()``
        before calling this method.

        Examples:
             .. testcode::
                :skipif: True

                import ray
                import pandas as pd

                docs = [{"title": "BigQuery Datasource test"} for key in range(4)]
                ds = ray.data.from_pandas(pd.DataFrame(docs))
                ds.write_bigquery(
                    project_id="my_project_id",
                    dataset="my_dataset_table",
                    overwrite_table=True
                )

        Args:
            project_id: The name of the associated Google Cloud Project that hosts
                the dataset to read. For more information, see details in
                `Creating and managing projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_.
            dataset: The name of the dataset in the format of ``dataset_id.table_id``.
                The dataset is created if it doesn't already exist.
            max_retry_cnt: The maximum number of retries that an individual block write
                is retried due to BigQuery rate limiting errors. This isn't
                related to Ray fault tolerance retries. The default number of retries
                is 10.
            overwrite_table: Whether the write will overwrite the table if it already
                exists. The default behavior is to overwrite the table.
                ``overwrite_table=False`` will append to the table if it exists.
            ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        Nmax_retriesr   zVThe max_retries of a BigQuery Write Task should be set to 0 to avoid duplicate writes.)r  r   r  r  r  )r  r   r   r   r  )r   r  r   r  r  r   r   r  s           r   write_bigqueryzDataset.write_bigquery  s    b " O }a00A55M.   
 ./OM*#!'+	
 
 
 	+# 	 	
 	
 	
 	
 	
r   )r  r   client_settingsclient_kwargstable_settingsmax_insert_block_rowsr   r   dsnzpyarrow.Schemar
  r  r  r  c          
      d    t          ||||||||          }|                     ||	|
           dS )a%  Write the dataset to a ClickHouse dataset table.

        To control the number of parallel write tasks, use ``.repartition()``
        before calling this method.

        Examples:
             .. testcode::
                :skipif: True

                import ray
                import pyarrow as pa
                import pandas as pd

                docs = [{"title": "ClickHouse Datasink test"} for key in range(4)]
                ds = ray.data.from_pandas(pd.DataFrame(docs))
                user_schema = pa.schema(
                    [
                        ("id", pa.int64()),
                        ("title", pa.string()),
                    ]
                )
                ds.write_clickhouse(
                    table="default.my_table",
                    dsn="clickhouse+http://user:pass@localhost:8123/default",
                    mode=ray.data.SinkMode.OVERWRITE,
                    schema=user_schema,
                    table_settings=ray.data.ClickHouseTableSettings(
                        engine="ReplacingMergeTree()",
                        order_by="id",
                    ),
                )

        Args:
            table: Fully qualified table identifier (e.g., "default.my_table").
                The table is created if it doesn't already exist.
            dsn: A string in DSN (Data Source Name) HTTP format
                (e.g., "clickhouse+http://username:password@host:8123/default").
                For more information, see `ClickHouse Connection String doc
                <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
            mode: One of SinkMode.CREATE, SinkMode.APPEND, or
                SinkMode.OVERWRITE:

                * SinkMode.CREATE: Create a new table; fail if it already exists. If the table
                    does not exist, you must provide a schema (either via the `schema`
                    argument or as part of the dataset's first block).

                * SinkMode.APPEND: If the table exists, append data to it; if not, create
                    the table using the provided or inferred schema. If the table does
                    not exist, you must supply a schema.

                * SinkMode.OVERWRITE: Drop any existing table of this name, then create
                    a new table and write data to it. You **must** provide a schema in
                    this case, as the table is being re-created.

            schema: Optional :class:`pyarrow.Schema` specifying column definitions.
                This is mandatory if you are creating a new table (i.e., table doesn't
                exist in CREATE or APPEND mode) or overwriting an existing table (OVERWRITE).
                When appending to an existing table, a schema is optional, though you can
                provide one to enforce column types or cast data as needed. If omitted
                (and the table already exists), the existing table definition will be used.
                If omitted and the table must be created, the schema is inferred from
                the first block in the dataset.
            client_settings: Optional ClickHouse server settings to be used with the
                session/every request. For more information, see
                `ClickHouse Client Settings doc
                <https://clickhouse.com/docs/en/integrations/python#settings-argument>`_.
            client_kwargs: Optional keyword arguments to pass to the
                ClickHouse client. For more information, see
                `ClickHouse Core Settings doc
                <https://clickhouse.com/docs/en/integrations/python#additional-options>`_.
            table_settings: An optional :class:`ClickHouseTableSettings` dataclass
                that specifies additional table creation instructions, including:

                * engine (default: `"MergeTree()"`):
                    Specifies the engine for the `CREATE TABLE` statement.

                * order_by:
                    Sets the `ORDER BY` clause in the `CREATE TABLE` statement, iff not provided.
                    When overwriting an existing table, its previous `ORDER BY` (if any) is reused.
                    Otherwise, a "best" column is selected automatically (favoring a timestamp column,
                    then a non-string column, and lastly the first column).

                * partition_by:
                    If present, adds a `PARTITION BY <value>` clause to the `CREATE TABLE` statement.

                * primary_key:
                    If present, adds a `PRIMARY KEY (<value>)` clause.

                * settings:
                    Appends a `SETTINGS <value>` clause to the `CREATE TABLE` statement, allowing
                    custom ClickHouse settings.

            max_insert_block_rows: If you have extremely large blocks, specifying
                a limit here will chunk the insert into multiple smaller insert calls.
                Defaults to None (no chunking).
            ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        )r  r  r  r   r
  r  r  r  r  N)r   r  )r   r  r  r  r   r
  r  r  r  r   r   r  s               r   write_clickhousezDataset.write_clickhouse  sc    h &+')"7	
 	
 	
 	+# 	 	
 	
 	
 	
 	
r   rr  i   i   )r   r  r  r  data_storage_versionstorage_optionsr   r   )rr  r  	overwriter  r  c          	      b    t          |||||||          }
|                     |
||	           dS )a\  Write the dataset to a Lance dataset.

        Examples:
             .. testcode::
                import ray
                import pandas as pd

                docs = [{"title": "Lance data sink test"} for key in range(4)]
                ds = ray.data.from_pandas(pd.DataFrame(docs))
                ds.write_lance("/tmp/data/")

        Args:
            path: The path to the destination Lance dataset.
            schema: The schema of the dataset. If not provided, it is inferred from the data.
            mode: The write mode. Can be "create", "append", or "overwrite".
            min_rows_per_file: The minimum number of rows per file.
            max_rows_per_file: The maximum number of rows per file.
            data_storage_version: The version of the data storage format to use. Newer versions are more
                efficient but require newer versions of lance to read.  The default is
                "legacy" which will use the legacy v1 version.  See the user guide
                for more details.
            storage_options: The storage options for the writer. Default is None.
        )r   r  r  r  r  r  r  N)r   r  )r   r  r   r  r  r  r  r  r   r   r  s              r   write_lancezDataset.write_lancef  s`    J !//!5+
 
 
 	+# 	 	
 	
 	
 	
 	
r   )r  r  c                f   |i }|j         snt          j        j        j                                        rt          d          t          t          j                                                    d          |d<   | j	        
                                }t          | j        j        |||          }t          || j                  }	 t!          |t"                    r@|                                 |j        r%t(                              d|j         d           dS t/          ||                                          | _        | j                                        \  }}g }	|D ]=}
t          j        |
j                  }|	                    t=          |                     >t?          j         |	 }t(                              d	|!                                |j"        tG          |j$                             |%                    |           dS # tL          $ r}|'                    |            d}~ww xY w)
a^  Writes the dataset to a custom :class:`~ray.data.Datasink`.

        Time complexity: O(dataset size / parallelism)

        Args:
            datasink: The :class:`~ray.data.Datasink` to write to.
            ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
            concurrency: The maximum number of Ray tasks to run concurrently. Set this
                to control number of tasks to run concurrently. This doesn't change the
                total number of tasks run. By default, concurrency is dynamically
                decided based on the available resources.
        NzUIf you're using Ray Client, Ray Data won't schedule write tasks on the driver's node.F)softscheduling_strategyr  zIgnoring write because z already existsz3Data sink %s finished. %d rows and %s data written.)(supports_distributed_writesr  r  clientis_connectedr   rm   get_runtime_contextget_node_idr   r   r=   r   r   r,   r   r   re   on_write_start_skip_writer  r  r  r   r  r   _execute_to_iteratorr  r  r  rd   rc   combineget_namer
  r(   r  on_write_complete	Exceptionon_write_failed)r   r  r   r   r   write_opr   iter_r  write_resultsr  r  combined_write_resultes                 r   r  zDataset.write_datasink  sM   ( " O3 		x"//11  ,   6T'))55776 6 6O12
 z  "+#	
 
 
 #8T\::$	
 (M22 '')))' KKP(-PPP   F$T<88DDFFDN>>>@@LE5M F Fgf/00$$%?%D%DEEEE$/$7$G!KKE!!##%.3>??	   &&'<===== 	 	 	$$Q'''	s    AH C.H 
H0H++H0zGCalling any of the consumption methods on the returned ``DataIterator``zReturns:)delegater  c                      t          |           S )zReturn a :class:`~ray.data.DataIterator` over this dataset.

        Don't call this method directly. Use it internally.

        Returns:
            A :class:`~ray.data.DataIterator` over this dataset.
        r)   r   s    r   iteratorzDataset.iterator  s      %%%r   c                 N    |                                                                  S )ak  Return an iterable over the rows in this dataset.

        Examples:
            >>> import ray
            >>> for row in ray.data.range(3).iter_rows():
            ...     print(row)
            {'id': 0}
            {'id': 1}
            {'id': 2}

        Time complexity: O(1)

        Returns:
            An iterable over the rows in this dataset.
        )r-  r  r   s    r   r  zDataset.iter_rows  s    $ }}((***r      r  r   r   	drop_lastlocal_shuffle_buffer_sizelocal_shuffle_seed_collate_fnr  r1  r2  r3  r4  c          	      |    t          |          }|                                                     |||||||          S )a  Return an iterable over batches of data.

        This method is useful for model training.

        Examples:

            .. testcode::

                import ray

                ds = ray.data.read_images("example://image-datasets/simple")

                for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
                    print(batch)

            .. testoutput::
                :options: +MOCK

                {'image': array([[[[...]]]], dtype=uint8)}
                ...
                {'image': array([[[[...]]]], dtype=uint8)}

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool is used
                to fetch the objects to the local node and format the batches. Defaults
                to 1.
            batch_size: The number of rows in each batch, or ``None`` to use entire
                blocks as batches (blocks may contain different numbers of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            batch_format: If ``"default"`` or ``"numpy"``, batches are
                ``Dict[str, numpy.ndarray]``. If ``"pandas"``, batches are
                ``pandas.DataFrame``.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled
                using a local in-memory shuffle buffer, and this value serves as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer are drained.
            local_shuffle_seed: The seed to use for the local random shuffle.

        Returns:
            An iterable over batches of data.
        r0  )r]   r-  _iter_batches)r   r  r   r   r1  r2  r3  r4  s           r   r  zDataset.iter_batches  sL    x +<88}},,-!%&?1# - 
 
 	
r   autor  r   dtypesdevice
collate_fnr1  r2  r3  r9  ztorch.dtyper:  r;  c          
      `    |                                                      ||||||||          S )a  Return an iterable over batches of data represented as Torch tensors.

        This iterable yields batches of type ``Dict[str, torch.Tensor]``.
        For more flexibility, call :meth:`~Dataset.iter_batches` and manually convert
        your data to Torch tensors.

        Examples:
            >>> import ray
            >>> for batch in ray.data.range(
            ...     12,
            ... ).iter_torch_batches(batch_size=4):
            ...     print(batch)
            {'id': tensor([0, 1, 2, 3])}
            {'id': tensor([4, 5, 6, 7])}
            {'id': tensor([ 8,  9, 10, 11])}

            Use the ``collate_fn`` to customize how the tensor batch is created.

            >>> from typing import Any, Dict
            >>> import torch
            >>> import numpy as np
            >>> import ray
            >>> def collate_fn(batch: Dict[str, np.ndarray]) -> Any:
            ...     return torch.stack(
            ...         [torch.as_tensor(array) for array in batch.values()],
            ...         axis=1
            ...     )
            >>> dataset = ray.data.from_items([
            ...     {"col_1": 1, "col_2": 2},
            ...     {"col_1": 3, "col_2": 4}])
            >>> for batch in dataset.iter_torch_batches(collate_fn=collate_fn):
            ...     print(batch)
            tensor([[1, 2],
                    [3, 4]])


        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool is used
                to fetch the objects to the local node, format the batches, and apply
                the ``collate_fn``. Defaults to 1.
            batch_size: The number of rows in each batch, or ``None`` to use entire
                blocks as batches (blocks may contain different number of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The Torch dtype(s) for the created tensor(s); if ``None``, the dtype
                is inferred from the tensor data. You can't use this parameter with
                ``collate_fn``.
            device: The device on which the tensor should be placed. Defaults to
                "auto" which moves the tensors to the appropriate device when the
                Dataset is passed to Ray Train and ``collate_fn`` is not provided.
                Otherwise, defaults to CPU. You can't use this parameter with
                ``collate_fn``.
            collate_fn: A function to convert a Numpy batch to a PyTorch tensor batch.
                When this parameter is specified, the user should manually handle the
                host to device data transfer outside of collate_fn.
                This is useful for further processing the data after it has been
                batched. Potential use cases include collating along a dimension other
                than the first, padding sequences of various lengths, or generally
                handling batches of different length tensors. If not provided, the
                default collate function is used which simply converts the batch of
                numpy arrays to a batch of PyTorch tensors. This API is still
                experimental and is subject to change. You can't use this parameter in
                conjunction with ``dtypes`` or ``device``.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled
                using a local in-memory shuffle buffer, and this value serves as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer are drained.
                ``batch_size`` must also be specified when using local shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.

        Returns:
            An iterable over Torch Tensor batches.

        .. seealso::
            :meth:`Dataset.iter_batches`
                Call this method to manually convert your data to Torch tensors.
        r8  )r-  iter_torch_batches)	r   r  r   r9  r:  r;  r1  r2  r3  s	            r   r=  zDataset.iter_torch_batchesX  sB    @ }}11-!!&?1 2 	
 	
 		
r   r  r   r9  r1  r2  r3  ztf.dtypes.DTypec                    t          j        dt                     |                                                     ||||||          S )a1  Return an iterable over batches of data represented as TensorFlow tensors.

        This iterable yields batches of type ``Dict[str, tf.Tensor]``.
        For more flexibility, call :meth:`~Dataset.iter_batches` and manually convert
        your data to TensorFlow tensors.

        .. tip::
            If you don't need the additional flexibility provided by this method,
            consider using :meth:`~ray.data.Dataset.to_tf` instead. It's easier
            to use.

        Examples:

            .. testcode::

                import ray

                ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")

                tf_dataset = ds.to_tf(
                    feature_columns="sepal length (cm)",
                    label_columns="target",
                    batch_size=2
                )
                for features, labels in tf_dataset:
                    print(features, labels)

            .. testoutput::

                tf.Tensor([5.1 4.9], shape=(2,), dtype=float64) tf.Tensor([0 0], shape=(2,), dtype=int64)
                ...
                tf.Tensor([6.2 5.9], shape=(2,), dtype=float64) tf.Tensor([2 2], shape=(2,), dtype=int64)

        Time complexity: O(1)

        Args:
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool is used
                to fetch the objects to the local node, format the batches, and apply
                the ``collate_fn``. Defaults to 1.
            batch_size: The number of rows in each batch, or ``None`` to use entire
                blocks as batches (blocks may contain different numbers of rows).
                The final batch may include fewer than ``batch_size`` rows if
                ``drop_last`` is ``False``. Defaults to 256.
            dtypes: The TensorFlow dtype(s) for the created tensor(s); if ``None``, the
                dtype is inferred from the tensor data.
            drop_last: Whether to drop the last batch if it's incomplete.
            local_shuffle_buffer_size: If not ``None``, the data is randomly shuffled
                using a local in-memory shuffle buffer, and this value serves as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer are drained.
                ``batch_size`` must also be specified when using local shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.

        Returns:
            An iterable over TensorFlow Tensor batches.

        .. seealso::
            :meth:`Dataset.iter_batches`
                Call this method to manually convert your data to TensorFlow tensors.
        zX`iter_tf_batches` is deprecated and will be removed after May 2025. Use `to_tf` instead.r>  )r   r   r   r-  iter_tf_batches)r   r  r   r9  r1  r2  r3  s          r   r@  zDataset.iter_tf_batches  sZ    T 		
 	
 	

 }}..-!&?1 / 
 
 	
r   )	additional_columnsr  r   r1  r2  r3  feature_type_speclabel_type_specadditional_type_specfeature_columnslabel_columnsrA  rB  rz   rC  rD  ztf.data.Datasetc       	         f    |                                                      |||||||||	|
|          S )aZ  Return a `TensorFlow Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset/>`_
        over this :class:`~ray.data.Dataset`.

        .. warning::
            If your :class:`~ray.data.Dataset` contains ragged tensors, this method errors.
            To prevent errors, :ref:`resize your tensors <transforming_tensors>`.

        Examples:
            >>> import ray
            >>> ds = ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
            >>> ds
            Dataset(num_rows=?, schema=...)

            If your model accepts a single tensor as input, specify a single feature column.

            >>> ds.to_tf(feature_columns="sepal length (cm)", label_columns="target")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your model accepts a dictionary as input, specify a list of feature columns.

            >>> ds.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
            <_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your dataset contains multiple features but your model accepts a single
            tensor as input, combine features with
            :class:`~ray.data.preprocessors.Concatenator`.

            >>> from ray.data.preprocessors import Concatenator
            >>> columns_to_concat = ["sepal length (cm)", "sepal width (cm)", "petal length (cm)", "petal width (cm)"]
            >>> preprocessor = Concatenator(columns=columns_to_concat, output_column_name="features")
            >>> ds = preprocessor.transform(ds)
            >>> ds
            Concatenator
            +- Dataset(num_rows=?, schema=...)
            >>> ds.to_tf("features", "target")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>

            If your model accepts different types, shapes, or names of tensors as input, specify the type spec.
            If type specs are not specified, they are automatically inferred from the schema of the dataset.

            >>> import tensorflow as tf
            >>> ds.to_tf(
            ...     feature_columns="features",
            ...     label_columns="target",
            ...     feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
            ...     label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
            ... )
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>

            If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
            A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.

            >>> import pandas as pd
            >>> ds = ds.add_column("sample weights", lambda df: pd.Series([1] * len(df)))
            >>> ds.to_tf(feature_columns="features", label_columns="target", additional_columns="sample weights")
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>

            If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.

            >>> ds.to_tf(
            ...     feature_columns="features",
            ...     label_columns="target",
            ...     additional_columns="sample weights",
            ...     additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
            ... )
            <_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>

        Args:
            feature_columns: Columns that correspond to model inputs. If this is a
                string, the input data is a tensor. If this is a list, the input data
                is a ``dict`` that maps column names to their tensor representation.
            label_columns: Columns that correspond to model targets. If this is a
                string, the target data is a tensor. If this is a list, the target data
                is a ``dict`` that maps column names to their tensor representation.
            additional_columns: Columns that correspond to sample weights or other metadata.
                If this is a string, the weight data is a tensor. If this is a list, the
                weight data is a ``dict`` that maps column names to their tensor representation.
            prefetch_batches: The number of batches to fetch ahead of the current batch
                to fetch. If set to greater than 0, a separate threadpool is used
                to fetch the objects to the local node, format the batches, and apply
                the collate_fn. Defaults to 1.
            batch_size: Record batch size. Defaults to 1.
            drop_last: Set to True to drop the last incomplete batch,
                if the dataset size is not divisible by the batch size. If
                False and the size of the stream is not divisible by the batch
                size, then the last batch is smaller. Defaults to False.
            local_shuffle_buffer_size: If non-None, the data is randomly shuffled
                using a local in-memory shuffle buffer, and this value will serve as the
                minimum number of rows that must be in the local in-memory shuffle
                buffer in order to yield a batch. When there are no more rows to add to
                the buffer, the remaining rows in the buffer is drained. This
                buffer size must be greater than or equal to ``batch_size``, and
                therefore ``batch_size`` must also be specified when using local
                shuffling.
            local_shuffle_seed: The seed to use for the local random shuffle.
            feature_type_spec: The `tf.TypeSpec` of `feature_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            label_type_spec: The `tf.TypeSpec` of `label_columns`. If there is
                only one column, specify a `tf.TypeSpec`. If there are multiple columns,
                specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.
            additional_type_spec: The `tf.TypeSpec` of `additional_columns`. If there
                is only one column, specify a `tf.TypeSpec`. If there are multiple
                columns, specify a ``dict`` that maps column names to their `tf.TypeSpec`.
                Default is `None` to automatically infer the type of each column.

        Returns:
            A `TensorFlow Dataset`_ that yields inputs and targets.

        .. seealso::

            :meth:`~ray.data.Dataset.iter_tf_batches`
                Call this method if you need more flexibility.
        )rE  rF  rA  r  r1  r   r2  r3  rB  rC  rD  )r-  to_tf)r   rE  rF  rA  r  r   r1  r2  r3  rB  rC  rD  s               r   rH  zDataset.to_tf  sK    L }}$$+'1-!&?1/+!5 % 
 
 	
r   daft.DataFramec                 *    ddl } |j        |           S )a  Convert this :class:`~ray.data.Dataset` into a
        `Daft DataFrame <https://docs.getdaft.io/en/stable/api/dataframe/>`_.

        This will convert all the data inside the Ray Dataset into a Daft DataFrame in a zero-copy way
        (using Arrow as the intermediate data format).

        Time complexity: O(dataset size / parallelism)

        Returns:
            A `Daft DataFrame`_ created from this dataset.
        r   N)daftfrom_ray_dataset)r   rK  s     r   to_daftzDataset.to_daft  s      	$t$T***r   metapandas.DataFramezpandas.Seriesverify_metazdask.dataframe.DataFramec                    ddl }ddlm} ddl	 ddl}n# t
          $ r d}Y nw xY wddlm} ddlm	 ddl
m} |j                            |           |j        dt          t                    dj        ffd	            }|dd
lm |                     d          }	t+          |	|          r;                    fdt-          |	j        |	j                  D                       }n|t+          |	|j                  rt5                      t7          fd|	j        D                       r;                    fdt-          |	j        |	j                  D                       }n&|	                                                                }g }
|                                 D ]*}|j        D ] }|
                      ||                     !+|!                    |
||          }|S )a  Convert this :class:`~ray.data.Dataset` into a
        `Dask DataFrame <https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.html#dask.dataframe.DataFrame>`_.

        This is only supported for datasets convertible to Arrow records.

        Note that this function will set the Dask scheduler to Dask-on-Ray
        globally, via the config.

        Time complexity: O(dataset size / parallelism)

        Args:
            meta: An empty `pandas DataFrame`_ or `Series`_ that matches the dtypes and column
                names of the stream. This metadata is necessary for many algorithms in
                dask dataframe to work. For ease of use, some alternative inputs are
                also available. Instead of a DataFrame, a dict of ``{name: dtype}`` or
                iterable of ``(name, dtype)`` can be provided (note that the order of
                the names should match the order of the columns). Instead of a series, a
                tuple of ``(name, dtype)`` can be used.
                By default, this is inferred from the underlying Dataset schema,
                with this argument supplying an optional override.
            verify_meta: If True, Dask will check that the partitions have consistent
                metadata. Defaults to True.

        Returns:
            A `Dask DataFrame`_ created from this dataset.

        .. _pandas DataFrame: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html
        .. _Series: https://pandas.pydata.org/docs/reference/api/pandas.Series.html
        r   Nr?   )ClientObjectRef)ray_dask_get)	schedulerr  r   c                 x    t          | t          j        f          rt          d          t	          |           S )NzzDataset.to_dask() must be used with Dask-on-Ray, please set the Dask scheduler to ray_dask_get (located in ray.util.dask).)r   r  ri   r   _block_to_df)r  rS  s    r   block_to_dfz$Dataset.to_dask.<locals>.block_to_df  sC    )cm_%EFF  &  
  	***r   )TensorDtypeTrH  c           	      |    i | ]8\  }}|                     t          |          s|nt          j                   9S )dtype)r   r   r   object_)r  rx   r\  rY  r   s      r   r  z#Dataset.to_dask.<locals>.<dictcomp>  sa     	 	 	 'C RYY (2%'E'E!0%'Z	 '  	 	 	r   c              3   8   K   | ]}t          |          V  d S r   )r   )r  type_arrow_tensor_ext_typess     r   r  z"Dataset.to_dask.<locals>.<genexpr>  s?        BGJu&<==     r   c           	          i | ]J\  }}|                     t          |          s|                                nt          j                   KS r[  )r   r   to_pandas_dtyper   r]  )r  rx   r\  r`  r   s      r   r  z#Dataset.to_dask.<locals>.<dictcomp>  so     	 	 	 !+U   ,6e=S+T+T%4E$9$9$;$;$;)+	 "+ " "	 	 	r   )rN  rP  )"daskdask.dataframe	dataframer   r   r$  ray.data._internal.pandas_blockr?   ray.util.client.commonrS  ray.util.daskrT  configr  delayedri   rV   r   ray.data.extensionsrY  r   r   r-  r  typesr  r   r  empty_table	to_pandasiter_internal_ref_bundlesr  r  from_delayed)r   rN  rP  rc  ddr   r?   rT  rX  r   dfs
ref_bundler  ddfrS  rY  r`  r   s                 @@@@r   to_daskzDataset.to_dask  s   V 	######	      	 	 	BBB	 	FEEEEE::::::......,///		+9U#3 	+ 	+ 	+ 	+ 	+ 	+ 
	+ <777777 [[$[77F&"344  <||	 	 	 	 	 +.flFL*I*I	 	 	  Jvry$A$A)U)W)W&    KQ<     < <<	 	 	 	 	 /2&,.M.M	 	 	 DD "--//99;;D88:: 	3 	3J'2 3 3	

;;y1122223 oo#  
 

 
s    ((mars.dataframe.DataFramec                 \   ddl }ddl}ddlm} ddlm} ddlm} |                                 }| 	                                }t          |t                    r|j        }t          ||          r"|                    |j        |j                  }nSt          ||j                  r,|                                                                j        }nt'          d|            ||                    d                    }	 ||j        d	
          }
 ||          } ||	|
|          S )a&  Convert this :class:`~ray.data.Dataset` into a
        `Mars DataFrame <https://mars-project.readthedocs.io/en/latest/reference/dataframe/index.html>`_.

        Time complexity: O(dataset size / parallelism)

        Returns:
            A `Mars DataFrame`_ created from this dataset.
        r   N)DataFrameReadRayDataset)parse_indexrR  )r   zUnsupported format of schema r   T)
store_data)refs)index_valuecolumns_valuer9  )r   r   )mars.dataframe.datasource.read_raydatasetrx  mars.dataframe.utilsry  rf  r?   to_pandas_refsr   r   r  ro  r   rl  r  rm  rn  r9  r  
RangeIndexr   )r   r   r   rx  ry  r?   r{  r   r9  r|  r}  r2  s               r   to_marszDataset.to_mars8  sb    	UUUUUU444444EEEEEE""$$ff%% 	('Ff/00 	PYYv|6<Y@@FF	** 	P''))3355<FF%&Nf&N&NOOO!k"--"3"344#FLTBBB$$$///rkvVVVVr    modin.pandas.dataframe.DataFramec                 P    ddl m} |                                 } ||d          S )aw  Convert this :class:`~ray.data.Dataset` into a
        `Modin DataFrame <https://modin.readthedocs.io/en/stable/flow/modin/pandas/dataframe.html>`_.

        This works by first converting this dataset into a distributed set of
        Pandas DataFrames (using :meth:`Dataset.to_pandas_refs`).
        See caveats there. Then the individual DataFrames are used to
        create the Modin DataFrame using
        ``modin.distributed.dataframe.pandas.partitions.from_partitions()``.

        This is only supported for datasets convertible to Arrow records.
        This function induces a copy of the data. For zero-copy access to the
        underlying data, consider using :meth:`.to_arrow_refs` or
        :meth:`.iter_internal_ref_bundles`.

        Time complexity: O(dataset size / parallelism)

        Returns:
            A `Modin DataFrame`_ created from this dataset.
        r   )from_partitions)axis)-modin.distributed.dataframe.pandas.partitionsr  r  )r   r  pd_objss      r   to_modinzDataset.to_modinZ  s=    . 	RQQQQQ%%''wQ////r   sparkpyspark.sql.SparkSessionpyspark.sql.DataFramec                     ddl }|                                 }t          |t                    r|j        }|                                 }t          |          }|j                            |||          S )a'  Convert this :class:`~ray.data.Dataset` into a
        `Spark DataFrame <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html>`_.

        Time complexity: O(dataset size / parallelism)

        Args:
            spark: A `SparkSession`_, which must be created by RayDP (Spark-on-Ray).

        Returns:
            A `Spark DataFrame`_ created from this dataset.

        .. _SparkSession: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html
        r   N)	raydpr   r   r  ro  ro  r'   r  ray_dataset_to_spark_dataframe)r   r  r  r   r  r  s         r   to_sparkzDataset.to_sparkv  sl      	ff%% 	('F4466=kJJ
{99%TTTr   c           	      Z   |3|                                  }||k    rt          d| d| d| d          t                      }|                     dd          D ]}|                    |           |                                }t          j        |                                          S )a  Convert this :class:`~ray.data.Dataset` to a single pandas DataFrame.

        This method errors if the number of rows exceeds the provided ``limit``.
        To truncate the dataset beforehand, call :meth:`.limit`.

        Examples:
            >>> import ray
            >>> ds = ray.data.from_items([{"a": i} for i in range(3)])
            >>> ds.to_pandas()
               a
            0  0
            1  1
            2  2

        Time complexity: O(dataset size)

        Args:
            limit: The maximum number of rows to return. An error is
                raised if the dataset has more rows than this limit. Defaults to
                ``None``, which means no limit.

        Returns:
            A pandas DataFrame created from this dataset, containing a limited
            number of rows.

        Raises:
            ValueError: if the number of rows in the :class:`~ray.data.Dataset` exceeds
                ``limit``.
        Nz-the dataset has more than the given limit of z rows: z(. If you are sure that a DataFrame with zO rows will fit in local memory, set ds.to_pandas(limit=None) to disable limits.r   )r   r   )	r  r   r>   r  	add_blockbuildrW   	for_blockrn  )r   r  r  builderr   blocks         r   rn  zDataset.to_pandas  s    @ JJLLEu}} BE B B"B BB B B   %&&&&H&NN 	% 	%Ee$$$$
 &u--77999r   c                     t          t                    }g }|                                 D ]4}|j        D ]*}|                    |                    |                     +5|S )a  Converts this :class:`~ray.data.Dataset` into a distributed set of Pandas
        dataframes.

        One DataFrame is created for each block in this Dataset.

        This function induces a copy of the data. For zero-copy access to the
        underlying data, consider using :meth:`Dataset.to_arrow_refs` or
        :meth:`Dataset.iter_internal_ref_bundles`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10, override_num_blocks=2)
            >>> refs = ds.to_pandas_refs()
            >>> len(refs)
            2

        Time complexity: O(dataset size / parallelism)

        Returns:
            A list of remote pandas DataFrames created from this dataset.
        )rB   rW  ro  r  r  remote)r   rX  pandas_refsr  r  s        r   r  zDataset.to_pandas_refs  sx    2 '|444466 	B 	BF#. B B	"";#5#5i#@#@AAAABr   r   c                    t          t                    }g }|                                 D ]6}|j        D ],}|                    |                    ||                     -7|S )a  Converts this :class:`~ray.data.Dataset` into a distributed set of NumPy
        ndarrays or dictionary of NumPy ndarrays.

        This is only supported for datasets convertible to NumPy ndarrays.
        This function induces a copy of the data. For zero-copy access to the
        underlying data, consider using :meth:`Dataset.to_arrow_refs` or
        :meth:`Dataset.iter_internal_ref_bundles`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10, override_num_blocks=2)
            >>> refs = ds.to_numpy_refs()
            >>> len(refs)
            2

        Time complexity: O(dataset size / parallelism)

        Args:
            column: The name of the column to convert to numpy. If ``None``, all columns
                are used. If multiple columns are specified, each returned
                future represents a dict of ndarrays. Defaults to None.

        Returns:
            A list of remote NumPy ndarrays created from this dataset.
        r  )rB   _block_to_ndarrayro  r  r  r  )r   r   block_to_ndarray
numpy_refsr  r  s         r   to_numpy_refszDataset.to_numpy_refs  s    : ,,=>>
4466 	U 	UF#. U U	!!"2"9"9)F"9"S"STTTTUr   zpyarrow.Tablec                 4   ddl }| j                                        }t          |g          }|                     d          }t          |t                    r|j        }t          ||j                  r|S t          t                    fd|D             S )a  Convert this :class:`~ray.data.Dataset` into a distributed set of PyArrow
        tables.

        One PyArrow table is created for each block in this Dataset.

        This method is only supported for datasets convertible to PyArrow tables.
        This function is zero-copy if the existing data is already in PyArrow
        format. Otherwise, the data is converted to PyArrow format.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10, override_num_blocks=2)
            >>> refs = ds.to_arrow_refs()
            >>> len(refs)
            2

        Time complexity: O(1) unless conversion is required.

        Returns:
            A list of remote PyArrow tables created from this dataset.
        r   NTrH  c                 :    g | ]}                     |          S r  )r  )r  r  block_to_arrows     r   r  z)Dataset.to_arrow_refs.<locals>.<listcomp>0  s'    EEE%%e,,EEEr   )
r   r   r  r'   r   r   r  ro  rB   _block_to_arrow)r   r   rs  r  r   r  s        @r   to_arrow_refszDataset.to_arrow_refs  s    0 	 $
 2 2 4 4
 5j\BB 	
 d33ff%% 	('Ffbi(( 	)/::EEEE*EEEEr   zArgs:num_workersc                 p    |#dt          t          j                              z  }t          | ||          S )a  Convert this dataset into a distributed RandomAccessDataset (EXPERIMENTAL).

        RandomAccessDataset partitions the dataset across the cluster by the given
        sort key, providing efficient random access to records via binary search. A
        number of worker actors are created, each of which has zero-copy access to the
        underlying sorted data blocks of the dataset.

        Note that the key must be unique in the dataset. If there are duplicate keys,
        an arbitrary value is returned.

        This is only supported for Arrow-format datasets.

        Args:
            key: The key column over which records can be queried.
            num_workers: The number of actors to use to serve random access queries.
                By default, this is determined by multiplying the number of Ray nodes
                in the cluster by four. As a rule of thumb, you can expect each worker
                to provide ~3000 records / second via ``get_async()``, and
                ~10000 records / second via ``multiget()``.
        N   )r  )r  r  nodesrh   )r   r  r  s      r   to_random_access_datasetz Dataset.to_random_access_dataset2  s7    4 c#)++...K"4+FFFFr   zstore memory.)r  insert_afterc                    t                               | dt                    }|j                                        j        }fd|D             }t          t          |          | j                  }t          t          |j        
                                |j                  |          }|                    |j                   |                     |j                               |j                                         |S )a  Execute and materialize this dataset into object store memory.

        This can be used to read all blocks into memory. By default, Dataset
        doesn't read blocks from the datasource until the first transform.

        Note that this does not mutate the original Dataset. Only the blocks of the
        returned MaterializedDataset class are pinned in memory.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(10)
            >>> materialized_ds = ds.materialize()
            >>> materialized_ds
            MaterializedDataset(num_blocks=..., num_rows=10, schema={id: int64})

        Returns:
            A MaterializedDataset holding the materialized data blocks.
        T)r   r   c                 @    g | ]}t          |gd j                  S )F)r  r|  r   r}  )r  block_with_metadatar  s     r   r  z'Dataset.materialize.<locals>.<listcomp>o  sH     
 
 
 $ +,!}  
 
 
r   r  r  )r   r   ru  r   r  r  r,   r2   r   r@   r  r   r   r   	_get_uuid)r   r   blocks_with_metadatar  r   r  r  s         @r   r  zDataset.materializeP  s    * ||DT7J|KK J..00%}
 
 
 
 (<
 
 
 #9#D#D#DdlSS$$***,,4<HHH
 
 		"""))***r   c                 R   | j         r=| j                                                                                                         S | j        7| j        j                                        r| j                                        S |                                                                 S )a  Returns a string containing execution timing information.

        Note that this does not trigger execution, so if the dataset has not yet
        executed, an empty string is returned.

        Examples:

        .. testcode::

            import ray

            ds = ray.data.range(10)
            assert ds.stats() == ""

            ds = ds.materialize()
            print(ds.stats())

        .. testoutput::
            :options: +MOCK

            Operator 0 Read: 1 tasks executed, 5 blocks produced in 0s
            * Remote wall time: 16.29us min, 7.29ms max, 1.21ms mean, 24.17ms total
            * Remote cpu time: 16.0us min, 2.54ms max, 810.45us mean, 16.21ms total
            * Peak heap memory usage (MiB): 137968.75 min, 142734.38 max, 139846 mean
            * Output num rows: 0 min, 1 max, 0 mean, 10 total
            * Output size bytes: 0 min, 8 max, 4 mean, 80 total
            * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

        )	r   	get_stats
to_summary	to_stringr   r   has_computed_outputr  _get_stats_summaryr   s    r   r  zDataset.stats  s    > ! 	*)3355@@BBLLNNN^'DN,@,T,T,V,V'>'')))&&((22444r   c                 R    t          | j                                                   dS )a  Show the logical plan and physical plan of the dataset.

        Examples:

        .. testcode::

            import ray
            from ray.data import Dataset
            ds: Dataset = ray.data.range(10,  override_num_blocks=10)
            ds = ds.map(lambda x: x + 1)
            ds.explain()

        .. testoutput::

            <BLANKLINE>
            -------- Logical Plan --------
            MapRows[Map(<lambda>)]
            +- Read[ReadRange]
            <BLANKLINE>
            -------- Logical Plan (Optimized) --------
            MapRows[Map(<lambda>)]
            +- Read[ReadRange]
            <BLANKLINE>
            -------- Physical Plan --------
            TaskPoolMapOperator[Map(<lambda>)]
            +- TaskPoolMapOperator[ReadRange]
               +- InputDataBuffer[Input]
            <BLANKLINE>
            -------- Physical Plan (Optimized) --------
            TaskPoolMapOperator[ReadRange->Map(<lambda>)]
            +- InputDataBuffer[Input]
            <BLANKLINE>
        N)r  r   explainr   s    r   r  zDataset.explain  s'    F 	dj  ""#####r   c                 X    | j                                                                         S r   )r   r  r  r   s    r   r  zDataset._get_stats_summary  s"    z!!,,...r   c                 h    | j                                         \  }}}|                                  |S )a  Get an iterator over ``RefBundles``
        belonging to this Dataset. Calling this function doesn't keep
        the data materialized in-memory.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(1)
            >>> for ref_bundle in ds.iter_internal_ref_bundles():
            ...     for block_ref, block_md in ref_bundle.blocks:
            ...         block = ray.get(block_ref)

        Returns:
            An iterator over this Dataset's ``RefBundles``.
        )r   execute_to_iteratorr  )r   iter_ref_bundlesr  s      r   ro  z!Dataset.iter_internal_ref_bundles  s7    " "&!?!?!A!A!Q&&(((r   c                     t                               d           | j                                        j        }|                                  |S )a  Get a list of references to the underlying blocks of this dataset.

        This function can be used for zero-copy access to the data. It blocks
        until the underlying blocks are computed.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(1)
            >>> ds.get_internal_block_refs()
            [ObjectRef(...)]

        Returns:
            A list of references to this dataset's blocks.
        ze`Dataset.get_internal_block_refs()` is deprecated. Use `Dataset.iter_internal_ref_bundles()` instead.)r  r   r   r  r  r  )r   r  s     r   get_internal_block_refszDataset.get_internal_block_refs  sO    " 	=	
 	
 	
 Z''))4
&&(((r   c                 l    t          d | j        j                                        D                       S )a  Whether this dataset's lineage is able to be serialized for storage and
        later deserialized, possibly on a different cluster.

        Only datasets that are created from data that we know will still exist at
        deserialization time, e.g. data external to this Ray cluster such as persistent
        cloud object stores, support lineage-based serialization. All of the
        ray.data.read_*() APIs support lineage-based serialization.

        Examples:

            >>> import ray
            >>> ray.data.from_items(list(range(10))).has_serializable_lineage()
            False
            >>> ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv").has_serializable_lineage()
            True
        c              3   >   K   | ]}|                                 V  d S r   )is_lineage_serializable)r  r2  s     r   r  z3Dataset.has_serializable_lineage.<locals>.<genexpr>  sB       
 
 &&((
 
 
 
 
 
r   )r  r   r   post_order_iterr   s    r   has_serializable_lineagez Dataset.has_serializable_lineage  sD    $  
 
(,<<>>
 
 
 
 
 	
r   c                    |                                  st          d          | j                                        }t	          j        | j        j                  }t          ||          }|j                                         |                    | 	                                           dt          j        j        fd}t          j        j        j                                        }	 |                    t          j        j        |           t%          j        |          }|                    t          j        j                   n)# |                    t          j        j                   w xY w|S )ak  
        Serialize this dataset's lineage, not the actual data or the existing data
        futures, to bytes that can be stored and later deserialized, possibly on a
        different cluster.

        Note that this uses pickle and will drop all computed data, and that everything
        is recomputed from scratch after deserialization.

        Use :py:meth:`Dataset.deserialize_lineage` to deserialize the serialized
        bytes returned from this method into a Dataset.

        .. note::
            Unioned and zipped datasets, produced by :py:meth`Dataset.union` and
            :py:meth:`Dataset.zip`, are not lineage-serializable.

        Examples:

            .. testcode::

                import ray

                ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
                serialized_ds = ds.serialize_lineage()
                ds = ray.data.Dataset.deserialize_lineage(serialized_ds)
                print(ds)

            .. testoutput::

                Dataset(num_rows=?, schema=...)


        Returns:
            Serialized bytes containing the lineage of this dataset.
        aK  Lineage-based serialization is not supported for this stream, which means that it cannot be used as a tunable hyperparameter. Lineage-based serialization is explicitly NOT supported for unioned or zipped datasets (see docstrings for those methods), and is only supported for datasets created from data that we know will still exist at deserialization time, e.g. external data in persistent cloud object stores or in-memory data from long-lived clusters. Concretely, all ray.data.read_*() APIs should support lineage-based serialization, while all of the ray.data.from_*() APIs do not. To allow this stream to be serialized to storage, write the data to an external store (such as AWS S3, GCS, or Azure Blob Storage) using the Dataset.write_*() APIs, and serialize a new dataset reading from the external store using the ray.data.read_*() APIs.rfc                 F    |                                  \  }}}d |d<   |||fS )N_last_export_session_and_job)
__reduce__)r  reconstructorargsr  s       r   _reduce_remote_fnz4Dataset.serialize_lineage.<locals>._reduce_remote_fnR  s0     *,&M448E01 $--r   )r  r   r   r   r   r   r   clear_snapshotr   r  r  remote_functionRemoteFunctionr  workerglobal_workerget_serialization_context_register_cloudpickle_reducerpickledumps_unregister_cloudpickle_reducer)r   	plan_copylogical_plan_copyr   r  r   
serializeds          r   serialize_lineagezDataset.serialize_lineage  sU   H ,,.. 	L  " J((**	 Idj&>??Y 122
!!!
T^^%%&&&		.#"5"D 		. 		. 		. 		. ,%3MMOO	X11#24E    b))J33C4G4VWWWWG33C4G4VWWWWs   )9E &E-serialized_dsc                 *    t          j        |           S )a  
        Deserialize the provided lineage-serialized Dataset.

        This uses pickle, and assumes that the provided serialized bytes were
        serialized using :py:meth:`Dataset.serialize_lineage`.

        Examples:

            .. testcode::

                import ray

                ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
                serialized_ds = ds.serialize_lineage()
                ds = ray.data.Dataset.deserialize_lineage(serialized_ds)
                print(ds)

            .. testoutput::

                Dataset(num_rows=?, schema=...)

        Args:
            serialized_ds: The serialized Dataset that we wish to deserialize.

        Returns:
            A deserialized ``Dataset`` instance.
        )r  loads)r  s    r   deserialize_lineagezDataset.deserialize_lineageg  s    < |M***r   c                     | j         j        S )z3Return the DataContext used to create this Dataset.)r   r  r   s    r   r   zDataset.context  s     z""r   agg_clsc                 :     | j         ||g|R i |} | j        | S )aa  Helper for aggregating on a particular subset of the dataset.

        This validates the `on` argument, and converts a list of column names
        or lambdas to a multi-aggregation. A null `on` results in a
        multi-aggregation on all columns for an Arrow Dataset, and a single
        aggregation on the entire row for a simple Dataset.
        )_build_multicolumn_aggsrU  )r   r  r7  r  r_  rS  s         r   rO  zDataset._aggregate_on  s7     ,t+GRI$III&IIt~t$$r   )	skip_colsr  c                Z   |\|                      d          }|Dt          |t                    s/sg t          |j                  dk    rfd|j        D             }t          |t
                    s|g}t          |          dk    rt          d          fd|D             S )zaBuild set of aggregations for applying a single aggregation to
        multiple columns.
        NTrH  r   c                     g | ]}|v|	S r  r  )r  rx   r  s     r   r  z3Dataset._build_multicolumn_aggs.<locals>.<listcomp>  s#    NNN#I9M9M#9M9M9Mr   z4At least 1 column to aggregate on has to be providedc                 (    g | ]} |gR i S r  r  )r  on_r  r  r_  s     r   r  z3Dataset._build_multicolumn_aggs.<locals>.<listcomp>  s3    <<<#-d---f--<<<r   )r   r   r   r  r  r  r   )r   r  r7  r  r  r_  r   s    ` ``` r   r  zDataset._build_multicolumn_aggs  s     :[[$[77F!*VT*B*B!  # "Iv|$$q((NNNNNNNB"d## 	Br77a<<STTT<<<<<<<<<<r   resultc                     |Wt          |          dk    rDt          |t                    r|d         S t          |                                          d         S |S )Nr   r   )r  r   r  r  r+  )r   r  s     r   rP  zDataset._aggregate_result  sV    #f++"2"2&%(( 0ay 
 FMMOO,,Q//Mr   
ipywidgets8c                 8   ddl }|                    d| j        j         d          }|                                 }|                    ||g|                    d                    } |j        d	i |}|                    dt          |           i           |S )
a  Return a mimebundle with an ipywidget repr and a simple text repr.

        Depending on the frontend where the data is being displayed,
        different mimetypes are used from this bundle.
        See https://ipython.readthedocs.io/en/stable/config/integrating.html
        for information about this method, and
        https://ipywidgets.readthedocs.io/en/latest/embedding.html
        for more information about the jupyter widget mimetype.

        Args:
            **kwargs: Additional arguments passed to the widget's _repr_mimebundle_ method.

        Returns:
            A mimebundle containing an ipywidget repr and a simple text repr.
        r   Nz<h2>z</h2>z100%)width)layoutz
text/plainr  )
r  HTML	__class__r@  
_tab_repr_VBoxLayout_repr_mimebundle_updaterepr)r   r_  r  titletabwidgetr  s          r   r  zDataset._repr_mimebundle_  s    " 	 Et~'> E E EFFoo%j6G6Gf6G6U6UVV *)33F33d4jj	
 	
 	

 r   c                    ddl m}m} | j                                        |                                 d}|                     d          }|$t          d                              d          }nt          |t                    rGt          d                              d	t          j        t          |                     d
          }ni }t          |j        |j                  D ]&\  }}t#          |dt          |                    ||<   't          d                              t%          |                                ddddg          d          }g }	|	                     |t          d                              t%          |                                ddddg          d                               |	                     ||                      ||	ddg          S )Nr   )r  Tab)rM  r
  FrH  zrendered_html_common.html.j2z<h5>Unknown schema</h5>)contentz<h5>Data type: <code>z</code></h5>r@  zscrollableTable.html.j2htmlNameType)tabular_datatablefmt	showindexheaders300px)r  
max_heightFieldValueMetadatar  )titles)r  r  r  r   rj  r  r   rn   renderr   r   r  escaper	  r-  r  rl  getattrr   r,  r  )
r   r  r  r  r   schema_reprschema_datasnamestypechildrens
             r   r  zDataset._tab_repr_  s;   (((((((( *7799((**
 

 e44>"#ABBII1 J  KK %% 	"#ABBIIVCKK0H0HVVV J  KK K #FL&, ? ? L Lu%,UJE

%K%KE"""#<==DD!,!2!2!4!4###V,	   # E  K D233::"%-^^%5%5!'"'!(' 2	    ' ;  
 
	
 	
 	
 	[))***s8Z$:;;;;r   c                 @    | j                             | j                  S r   )r   get_plan_as_stringr  r   s    r   __repr__zDataset.__repr__  s    z,,T^<<<r   c                      t          |           S r   )r  r   s    r   __str__zDataset.__str__  s    Dzzr   c                     dS NTr  r   s    r   __bool__zDataset.__bool__  s	     tr   c                      t          d          )NzdUse `ds.count()` to compute the length of a distributed Dataset. This may be an expensive operation.)AttributeErrorr   s    r   __len__zDataset.__len__  s    2
 
 	
r   c                      t          d          )Nz`Dataset` objects aren't iterable. To iterate records, call `ds.iter_rows()` or `ds.iter_batches()`. For more information, read https://docs.ray.io/en/latest/data/iterating-over-data.html.)r  r   s    r   __iter__zDataset.__iter__%  s    K
 
 	
r   c                     t          t                    }g }|                                 D ]4}|j        D ]*}|                    |                    |                     +5t          j        |          S r   )rB   rC   ro  r  r  r  r  r  )r   get_num_rowsr
  rs  r  s        r   _block_num_rowszDataset._block_num_rows,  s~    '6688:: 	@ 	@J'2 @ @	 3 3I > >????@wx   r   c                 4    | j                                         S r   )r   r  r   s    r   r  zDataset._meta_count4  s    z$$&&&r   c                     | j         S r   )rq  r   s    r   r  zDataset._get_uuid7  s
    zr   uuidc                 N    || _         || j        _        || j        j        _        d S r   )rq  r   _dataset_uuid	_in_statsr  )r   r"  s     r   r   zDataset._set_uuid:  s&    
#'
 ,0
)))r   c                 \    | j         r$| j                             d           d| _         dS dS )a  Flush progress bar output by shutting down the current executor.

        This should be called at the end of all blocking APIs (e.g., `take`), but not
        async APIs (e.g., `iter_batches`).

        The streaming executor runs in a separate generator / thread, so it is
        possible the shutdown logic runs even after a call to retrieve rows from the
        stream has finished. Explicit shutdown avoids this, which can clobber console
        output (https://github.com/ray-project/ray/issues/32414).
        TforceN)r   shutdownr   s    r   r  z!Dataset._synchronize_progress_bar?  sC     ! 	* "++$+777%)D"""		* 	*r   c                 R    | j                                         \  }}}|| _        ||fS r   )r   r  r   )r   bundle_iterr  executors       r   r   zDataset._execute_to_iteratorP  s1    '+z'E'E'G'G$UH "*E!!r   c                 ,    | j         | j        | j        dS )N)r   r"  r   )r   rq  r   r   s    r   __getstate__zDataset.__getstate__X  s#     JJ .
 
 	
r   c                 b    |d         | _         |d         | _        |d         | _        d | _        d S )Nr   r"  r   )r   rq  r   r   )r   r  s     r   __setstate__zDataset.__setstate__`  s3    6]
6]
">2!%r   c                     | j         sd S 	 t          0t          j                    r| j                             d           d S d S d S # t          $ r Y d S w xY w)NFr'  )r   r  is_initializedr)  r  r   s    r   __del__zDataset.__del__f  sz    % 	F
	3#5#7#7 &//e/<<<<<	 
  	 	 	DD	s   5A 
AA)FN)NN)r0  NNNr   )Fr  )Nr   T)r  )T)r  )r  TNN)r   rI  )r   rv  )r   r  )r  r  r   r  )r   ru  )r@  
__module____qualname____doc__r@   r,   r   staticmethodboolr   r   r   rl   BT_API_GROUPr   r   r	  r   r   r   r  r   r   r   r   rj   r   r   propertyr   r   r   r\   rX   r   r   r   EXPRESSION_API_GROUPrv   r   rY   r   r
   r  r  r/  r3  rI  SSR_API_GROUPrQ  rH   rT  rW  ri  rI   SMJ_API_GROUPrg   rt  r  r  r  r  r  r  r  r$  r/  rF  GGA_API_GROUPr  rR  rM   rU  r=  rZ  r\  r^  ra  rf   rN   rz  rL  r-  r  CD_API_GROUPr  re  r  r  IM_API_GROUPr  r   rb  rM  r  r  IOC_API_GROUPrb   APPENDra   r  r  r  r  r  r  r   r  r  r  r_   r  r  r  r	  r   CREATEr   r  r  r`   r  r-  r  r{   r  TorchDeviceTyper   r   TorchBatchTyper=  TensorFlowTensorBatchTyper@  rH  rM  ru  r  r  r  rn  rk   ri   r  r  r  rh   r  E_API_GROUPr  r  r  rF   r  r	   r&   ro  rV   r  r  bytesr  r  r^   r   rO  r  r   r[   rP  ro   r  r  r  r  r  r  r  r  r  r  r   r  rE   r   r.  r0  r3  r  r   r   r   r      s5       G GRHH "H H H H. GK: ::#':6>tn:	: : : \: Y&&&
 .2+/.27;:>$($("&SWEIP+ P+ P+d38n%tCH~56P+ /*	P+
 (3-(P+ DcN+P+ &hsm4P+  (S#X7P+ 5/P+ 5/P+ P+ eCsCx%S#:N$NOPP+ %Xb$sCx..@%ABP+ 
P+ P+ P+ '&P+d Z0$???hsm    @?(Xc] ( ( ( ( Z,d;;;x}    <; X (hsm ( ( ( X(+ + + + + Y&&&
 <@-1&/ $+/.27;:>$($("&SW(-EI#F
 F
 F
	9 45F
 #tWY%778	F

 /*F
 smF
 F
 (3-(F
 DcN+F
 &hsm4F
  (S#X7F
 5/F
 5/F
 F
 eCsCx%S#:N$NOPF
  "&!F
" %Xb$sCx..@%AB#F
& 
'F
 F
 F
 '&F
PK+	9 45K+ #tWY%778	K+
 /*K+ smK+ K+ (3-(K+ DcN+K+ &hsm4K+  (S#X7K+ 5/K+ 5/K+ K+ eCsCx%S#:N$NOPK+  "&!K+" %Xb$sCx..@%AB#K+ K+ K+ K+Z Y-AAAC+C+ C+
 
C+ C+ C+ BAC+J Z7888Y&&& '/!%%)q
 q
 q
q
 K
q
 smq
 #q
 c]q
 
q
 q
 q
 '& 98q
f Y&&&
 "&%)7
 7
 7
3i7
 #	7

 c]7
 
7
 7
 7
 '&7
r Y&&&
 04%)N+ N+ N+CcN#N+ sO+,	N+
 c]N+ 
N+ N+ N+ '&N+` Y&&&
 TX	{+ {+ {+T#YS#X./{+ eCsCx%S#:N$NOP	{+ {+ {+ '&{+z Y&&& .2+/.27;:>$($("&SWEIJ+ J+ J+cNE$tCH~"6S#X"FGG
J+ /*J+ (3-(J+ DcN+J+ &hsm4J+  (S#X7J+ 5/J+ 5/J+ J+ eCsCx%S#:N$NOPJ+ %Xb$sCx..@%ABJ+" 
#J+ J+ J+ '&J+X Y&&& CG+/D+
 04+/.27;:>$($("&SWEID+ D+ D+(c3h)=>?D+ uS$Y'(D+
 sO+,D+ (3-(D+ DcN+D+ &hsm4D+  (S#X7D+ 5/D+ 5/D+ D+ eCsCx%S#:N$NOPD+ %Xb$sCx..@%ABD+  
!D+ D+ D+ '&D+L Y''' %)37z+
 $(z+ z+ z+SMz+ $,C=z+
 z+ tCy!z+ z+ 
z+ z+ z+ ('z+x Y''' #$(	.+ .+ .+ sm.+ SM	.+ 
.+ .+ .+ (' [.+` Y''' #!+ !+ !+ sm!+ 
	!+ !+ !+ (' [!+F Y&&&8<@
 @
 @
@
(0@
	@
 @
 @
 '&@
D Y'''
 6:eP eP ePeP 	eP
 !k!23eP 
l	eP eP eP (' ^ePN Y'''',TX` ` `` $`>FtCy>Q`	#	$` ` ` (' ^`D Y'''KS	 Kd;P6Q K K K (' ^KZ Y'''M4;M4	#	$M4 M4 M4 (' ^M4^ Y'''
 ""&C@ C@ C@e$C@ 	C@
 smC@ 3-C@ 
;	<C@ C@ C@ (' ^C@J%!%!(-c5j(9%!EH%!	;	<%! %! %! %!N5 T     )     , YM:::
 19%)"N! N! N!N! ,-	N!
 c]N! smN! 
#	$N! N! N! ;:N!` Y'''+
DO +
	 +
 +
 +
 ('+
Z Y''' !)-%)&*l< .2?C!&l< l< l<l< l< 	l<
 #Jl< 5:&l< c]l< sml< &c]l< %-T#s(^$<l< l< 
l< l< l< (' [l<\ Y''' )-:E :E3S	4'(:E !:E 
	:E :E :E (' [:Ex Y'''(+ (+S (+ (+c (+ (+ (+ (' ^ [(+T Y'''$0{ $0uS$sCx.5H/I $0 $0 $0 (' ^ [$0L Y'''OS'+ '+5d3i01'+HL'+	sDcN"	#'+ '+ '+ (' ^ ['+R Y'''OS'+ '+5d3i01'+HL'+	sDcN"	#'+ '+ '+ (' ^ ['+R Y'''OS'+ '+5d3i01'+HL'+	sDcN"	#'+ '+ '+ (' ^ ['+R Y'''OS'+ '+5d3i01'+HL'+	sDcN"	#'+ '+ '+ (' ^ ['+R Y''' /3!	6+ 6+U3S	>*+6+ 6+ 	6+
 
sDcN"	#6+ 6+ 6+ (' ^ [6+p Y'::: (, D
 D
$s)$D
 %-8SE4+>$>??@%
D
 
D
 D
 D
 ;: ^ [D
L Y''' /4.2	D+ D+3S	>"D+ $T
*+D+ sEz*+	D+
 
D+ D+ D+ (' [D+L Y''''+$y/ '+i '+ '+ '+ (''+R Y&&&+3 +9 + + + '&+4 Y&&& "8EN8 8 885=c]8	8 8 8 '& ^8t Y&&&/ /# /tDcN'; / / / '& ^/b Y&&&& &hsm &tDcN7K & & & '& ^&P Y&&& # t    '& ^6 ^'  
 Y&&&*s * * * '& *X ^$G"	   Y&&&& &t &x7I & & & '& &P ^$G"	   Y&&&  c8K    '& 6 Y&&&
C 
 
 
 '&
$ Y&&&3C 3 3 3 '& ^3, Y&&&3T#Y 3 3 3 '& ^3 Y'''
 /38<#;?8<HL+/+/*.%)+/!H
 H
 H
H
 !c+	H

 45H
 H
 !)c3h 8H
 $$45H
  (T#s(^1C(DEH
 $C=H
 $C=H
 c3hH
 c]H
 $C=H
 H
" 
#H
 H
 H
 (' ^H
T Y'''
 9=#;?8<FJ+/*.%)+/!z
 z
 z
z
 45	z

 z
 !)c3h 8z
 $$45z
 &hr4S>/A&BCz
 $C=z
 c3hz
 c]z
 $C=z
 z
 
z
 z
 z
 (' ^z
x YM::: 488<#?-12659*.%)q
 q
q
 !c3h0q
 &d38n5	q

 q
 #6*q
  S#X/q
 #4S>2q
 c3hq
 c]q
 
q
 q
 q
 ;: ^q
f YM:::
 !	J
 9=#;?8<*.%)!J
 J
 J
J
 J
 	J
 45J
 J
 !)c3h 8J
 $$45J
 c3hJ
 c]J
 J
 
J
 J
 J
 ^ ;:J
X Y'''
 9=#;?8<DH+/*.%)+/!x
 x
 x
x
 45	x

 x
 !)c3h 8x
 $$45x
 $HRc3h-?$@Ax
 $C=x
 c3hx
 c]x
 $C=x
 x
 
x
 x
 x
 (' ^x
t Y'''
 488<#;?8<+/*.%)+/!i
 i
 i
i
 /0	i

 45i
 i
 !)c3h 8i
 $$45i
 $C=i
 c3hi
 c]i
 $C=i
 i
 
i
 i
 i
 (' ^i
V YM:::
 9=#;?8<+/*.>B%)+/!^
 ^
 ^
^
 45	^

 ^
 !)c3h 8^
 $$45^
 $C=^
 c3h^
 %c8T 9:;^
 c]^
 $C=^
 ^
 
^
 ^
 ^
 ;: ^^
@ Y''' 9=#;?8<+/*.%)+/!a
 a
 a
a
 	a

 45a
 a
 !)c3h 8a
 $$45a
 $C=a
 c3ha
 c]a
 $C=a
 a
 
a
 a
 a
 (' ^a
F 
 59%)A
 A
A
 %R^4A
 "$sCx.1	A

 c]A
 
A
 A
 A
 ^A
F  +/%)8
 8
 8
8
  #8

 c3h8
 c]8
 8
 8
 ^8
t YM::: +/%)G
 G
G
 G
 	G

 c3hG
 c]G
 
G
 G
 G
 ^ ;:G
R 
  *.*.%)G
 G
G
 G
 	G

 "$G
 c3hG
 c]G
 
G
 G
 G
 ^G
R  "-14826<@/3*.%)B
 B
 B
B
 B

 B
 )*B
 "$sCx.1B
  S#X/B
 !!89B
  (}B
 c3hB
 c]B
 
B
 B
 B
 ^B
H 
 .29A!,!1.248*.%)2
 2
 2
2
 )*	2

 562
 2
 2
 'sm2
 "$sCx.12
 c3h2
 c]2
 
2
 2
 2
 ^2
h ^.///
 +/%)N N NN c3h	N
 c]N 
N N N 0/N` ^U	   Y&&&&, & & & '& & Y&&&+8DcN3 + + + '& ^+$ Y&&& !"$'&/37,0EIC
 C
 C
 C
 SM	C

 smC
 C
 $,C=C
 %SMC
 h	{L'@ABC
 
)	C
 C
 C
 '& ^C
J Y&&& !"$'KO:@PT37,0g
 g
 g
 g
 SM	g

 }d33E.FFGHg
 owv67g
 XtCO'<&=|&KLMg
 g
 $,C=g
 %SMg
 
.	!g
 g
 g
 '& ^g
R  !"$'SW37,0T
 T
 T
 T
 SM	T

 0$s<M7M2NNOPT
 T
 $,C=T
 %SMT
 
+	,T
 T
 T
 Z ^T
l Y''' 59 !37,0LPJNOSP
 P
 P
sDI~.P
 S$s)^,P

 "#tCy.1P
 P
 P
 P
 $,C=P
 %SMP
 !S-5G0H!HIP
 }d33E.FFGP
 $M4]8J3K$KLP
 
P
 P
 P
 (' ^P
d ^.///Y'''+ + + (' 0/+  ^.///Y'''  s scNSM#J
s s 
$s s s (' 0/sj ^.///Y'''W W W (' 0/W@ ^.///Y'''0 0 0 (' 0/04 ^.///Y'''U U U (' 0/U0 ^.///Y'''0: 0:s 0:.@ 0: 0: 0: (' 0/0:d ^.///Y/A%B C    \ 0/< )-! ! !!#!	i
#	$! ! ! \!F ^.///&FtIo$>? &F &F &F \ 0/&FP ^G$$$ &*G GG c]G 
	G G G %$G: ^O$???Y%%%/ / / &% @?/b Y&&&"5s "5 "5 "5 '&"5H Y999"$ "$ :9"$H/$7 / / / / ^K((( 8I+>       \ )( ( ^K(((i.>)?    )( Z. 
$ 
 
 
 \
, N5 N N N \N` +5 +Y + + + \ \+< # # # # \ X#%%!)%T#Y*?!@% % % %$ *.= = == U3S	>*+=
 DI&= = = =8
eWn(= 
! 
 
 
 
 s+,,  -,@/< /< /<b=# = = = =    $    

 
 
 
 

 
 
!c ! ! ! !'Xc] ' ' ' '3    1c 1d 1 1 1 1
* * *""eHY,?,M&N " " " "
 
 
& & &    r   r   c                       e Zd ZdZdefdZdS )ru  aI  A Dataset materialized in Ray memory, e.g., via `.materialize()`.

    The blocks of a MaterializedDataset object are materialized into Ray object store
    memory, which means that this class can be shared or iterated over by multiple Ray
    tasks without re-executing the underlying computations for producing the stream.
    r   c                 4    | j                                         S )aZ  Return the number of blocks of this :class:`MaterializedDataset`.

        Examples:
            >>> import ray
            >>> ds = ray.data.range(100).repartition(10).materialize()
            >>> ds.num_blocks()
            10

        Time complexity: O(1)

        Returns:
            The number of blocks of this :class:`Dataset`.
        )r   rj  r   s    r   rM  zMaterializedDataset.num_blocks  s     z,,...r   N)r@  r4  r5  r6  r   rM  r  r   r   ru  ru  w  s9         /C / / / / / /r   ru  beta)r   c                       e Zd ZdZddded         dee         fdZede	e
         fd	            Zede	eee         d
f                  fd            Zd Zd ZdS )r  zaDataset schema.

    Attributes:
        base_schema: The underlying Arrow or Pandas schema.
    Nr  ro  )zpyarrow.lib.Schemar?   r  c                l    || _         |p$t          j        t          j                              | _        dS )z
        Initialize a :class:`Schema` wrapper around an Arrow or Pandas schema.

        Args:
            base_schema: The underlying Arrow or Pandas schema.
            data_context: The data context to use for this schema.
        N)ro  r   deepcopyr^   r^  r  )r   ro  r  s      r   r   zSchema.__init__  s1     ' %Pk6M6O6O(P(Pr   r   c                     | j         j        S )z"Lists the columns of this Dataset.)ro  r  r   s    r   r  zSchema.names  s     %%r   zpyarrow.lib.DataTypec           	        	 ddl 	ddlddlm ddlm}m} dt          t          j	        	j
        f         dj        f	fd}t          | j        j        j                  rt!          | j        j                  S g }| j        j        D ]}t          ||          rJ| j        j        rt(          }n|}|                     ||j         ||j                                       \	 |                     ||                     |# j        $ r |                    t2                     Y t4          $ r6 t6                              d	| d
           |                    d           Y w xY w|S )zuLists the types of this Dataset in Arrow format

        For non-Arrow compatible types, we return "object".
        r   N)BaseMaskedDtype)ArrowTensorTyperY  r\  r   c                     t          | j                  r| j        S t          | j                  r                                S t          |           r| j        }                     |           S r   )r   
ArrowDtypepyarrow_dtypeStringDtypestringnumpy_dtypefrom_numpy_dtype)r\  rQ  r   r   s    r   _convert_to_pa_typez)Schema.types.<locals>._convert_to_pa_type  sr     %// ***E2>22 *yy{{"E?33 *)&&u---r   )shaper\  zError converting dtype z
 to Arrow.)r   r   pandas.core.dtypes.dtypesrQ  rk  rR  rY  r   r   r\  rT  rf   r   ro  libr  r  rl  r  use_arrow_tensor_v2r   r  _shape_dtypeArrowNotImplementedErrorobjectr$  r  	exception)
r   rR  rY  rZ  arrow_typesr\  pa_tensor_type_classrQ  r   r   s
          @@@r   rl  zSchema.types  s    	======DDDDDDDD
	.2=/AB
	.[
	. 
	. 
	. 
	. 
	. 
	. 
	. 
	. d&66 	0(.///%+ 	- 	-E%-- -=4 ;+<((+:( ""((#l11%,??     -&&':':5'A'ABBBB2 / / /&&v.....  - - -$$%Pu%P%P%PQQQ&&t,,,,,- s   +D

$E/0<E/.E/c                 l    t          |t                    o|j        | j        k    o|j        | j        k    S r   )r   r  rl  r  )r   r%  s     r   __eq__zSchema.__eq__  s6    uf%% *tz)*tz)	
r   c                    t          d | j        D             t          d          gz             }d}d}|d||z   t          d          z
  z  z  }|dz  }|dt          d          z  z  }|d||z   t          d          z
  z  z  }|dt          d          z  dz   z  }t          | j        | j                  D ]-\  }}||z  }|d||z   t          |          z
  z  z  }|| dz  }.|                                }|S )	Nc                 ,    g | ]}t          |          S r  )r  )r  r   s     r   r  z#Schema.__repr__.<locals>.<listcomp>  s    ===$CII===r   Columnr9   zType
-r  
)r\  r  r  r-  rl  rstrip)r   column_widthpaddingr  r   r   s         r   r  zSchema.__repr__  s   ==$*===XOPP#,0CMMABB(#H%%#,0CMMABB#F#d**dj$*55 	" 	"JD$dNFclW4D		ABBFkkk!FFr   )r@  r4  r5  r6  r   r   r^   r   r:  r
   r	  r  r   rb  rl  rg  r  r  r   r   r  r    s          /3	Q Q QDEQ {+	Q Q Q Q& &tCy & & & X& 2tE$v,0F"FGH 2 2 2 X2h
 
 
    r   r  r  r   rO  c                 R    t          j        |           } |                                 S r   )rW   r  rn  r  s    r   rW  rW    s"    #E**E??r   r   c                 T    t          j        |           } |                     |          S r   )rW   r  r  )r  r   s     r   r  r    s$    #E**E>>&!!!r   c                 R    t          j        |           } |                                 S r   )rW   r  to_arrowrr  s    r   r  r    s"    #E**E>>r   )r  r   r  r  loggingr  r   typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   r   r   r  ray.cloudpicklecloudpickler  ray._common.usager   )ray._private.thirdparty.tabulate.tabulater   $ray.air.util.tensor_extensions.arrowr   r   r  r   /ray.data._internal.datasource.bigquery_datasinkr   1ray.data._internal.datasource.clickhouse_datasinkr   r   r   *ray.data._internal.datasource.csv_datasinkr   .ray.data._internal.datasource.iceberg_datasinkr   ,ray.data._internal.datasource.image_datasinkr   +ray.data._internal.datasource.json_datasinkr   ,ray.data._internal.datasource.lance_datasinkr   ,ray.data._internal.datasource.mongo_datasinkr   ,ray.data._internal.datasource.numpy_datasinkr    .ray.data._internal.datasource.parquet_datasinkr!   *ray.data._internal.datasource.sql_datasinkr"   0ray.data._internal.datasource.tfrecords_datasinkr#   1ray.data._internal.datasource.webdataset_datasinkr$   ray.data._internal.equalizer%   'ray.data._internal.execution.interfacesr&   2ray.data._internal.execution.interfaces.ref_bundler'   !ray.data._internal.execution.utilr(   )ray.data._internal.iterator.iterator_implr*   1ray.data._internal.iterator.stream_split_iteratorr+   %ray.data._internal.logical.interfacesr,   8ray.data._internal.logical.operators.all_to_all_operatorr-   r.   r/   r0   3ray.data._internal.logical.operators.count_operatorr1   8ray.data._internal.logical.operators.input_data_operatorr2   2ray.data._internal.logical.operators.join_operatorr3   r   r4   r5   r6   r7   r8   r9   3ray.data._internal.logical.operators.n_ary_operatorr,  r:   r   r;   =ray.data._internal.logical.operators.streaming_split_operatorr<   3ray.data._internal.logical.operators.write_operatorr=   rf  r>   r?   ray.data._internal.planr@   2ray.data._internal.planner.exchange.sort_task_specrA   ray.data._internal.remote_fnrB   ray.data._internal.splitrC   rD   ray.data._internal.statsrE   rF   rG   ray.data._internal.utilrH   rI   rJ   rK   rL   ray.data.aggregaterM   rN   rO   rP   rQ   rR   rS   rT   ray.data.blockrU   rV   rW   rX   rY   rZ   r[   r\   r]   ray.data.contextr^   ray.data.datasourcer_   r`   ra   rb   ray.data.datasource.datasinkrc   rd   !ray.data.datasource.file_datasinkre   ray.data.datatyperf   ray.data.iteratorrg   ray.data.random_access_datasetrh   	ray.typesri   ray.util.annotationsrj   rk   rl   ray.util.scheduling_strategiesrm   ray.widgetsrn   ray.widgets.utilro   rK  rc  marsmodinr   r   pyspark
tensorflowtftorchtorch.utils.datatensorflow_metadata.proto.v0rp   rq   rr   rJ  rt   rm  ru   r   rv   rw   rx   	getLoggerr@  r  r  r	  TensorflowFeatureTypeSpecrF  r{   rE  r   rD  r9  r<  r=  r>  r?  rA  r@  rG  r;  r   ru  r  rW  r  r  r  r   r   <module>r     s	                                              "     



             ' ' ' ' ' ' > > > > > >        7 6 6 6 6 6 L L L L L L         
 C B B B B B J J J J J J F F F F F F D D D D D D F F F F F F F F F F F F F F F F F F J J J J J J B B B B B B M M M M M M P P P P P P 1 1 1 1 1 1 = = = = = =      < ; ; ; ; ; F F F F F F U U U U U U = = = = = =            F E E E E E N N N N N N C C C C C C                       K J J J J J X X X X X X E E E E E E Q Q Q Q Q Q Q Q 1 1 1 1 1 1 F F F F F F 9 9 9 9 9 9 E E E E E E E E U U U U U U U U U U             	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 ) ( ( ( ( ( P P P P P P P P P P P P P P P P P P P P ; ; ; ; ; ; & & & & & & * * * * * * > > > > > >       D D D D D D D D D D I I I I I I             / / / / / / .KKKKKKKKKLLLMMMNNNNNNLLL777777KKKKKKKK111111------ 4 4 4 4 4 4 4 4 4 4		8	$	$ ? !4&S--?(@@  "+tC4D/E"EF w~&&tC/0,>?^S01
 '761$$$  |f |f |f |f |f |f |f |f~M / / / / /'71: / / /2 Vm m m m m m m m` "4    
"U "HSM " " " "
5      r   