
    &`i	             ,       F,   d dl Z d dlZd dlZd dlZd dlmZmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-m.Z. d dl/m0Z0m1Z1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZF d dlGmHZH d dlImJZJ d dlKmLZL d dlMmNZN d dlOmPZP d dlQmRZR d d lSmTZT d d!lUmVZVmWZWmXZXmYZYmZZZ d d"l[m\Z\ d d#l]m^Z^ d d$l_m`Z` d d%lambZb d d&lcmdZdmeZemfZfmgZgmhZh d d'limjZjmkZkmlZl d d(lmmnZn d d)lompZpmqZq d d*lrmsZsmtZtmuZumvZv d d+lwmxZx d d,lymzZzm{Z{ d d-l|m}Z}m~Z~ d d.lmZ d d/lmZ d d0lmZmZ d d1lmZ er>d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd d2lmZ d d3lmZ d d4lGmZ  ed5          Z ej        e          Zed6e	ej         fd7            Zed8dd9d:e	e         d;ed<ee         d=eqfd>            Zed8ddd?d@ed;edAee         d<ee         d=epf
dB            ZedCd8dddDd@edEed;edAee         d<ee         d=epfdF            Zeed8dddddddGdHeud;edIee         dJee         dKee         dLeeef         dAee         d<ee         d=epfdM                        Z edNO          dddddPdPej        ddddddddQdReee	e         f         dSedT         dUeeeef                  dVeev         dWee         dXedYedZee	e                  d[ee
d\         df         dAee         d<ee         dIee         dJee         dKee         dLeeeef                  fd]            Z edNO          dddddPdPdPeNj        dddddddd^dReee	e         f         dSedT         dUeeeef                  dVeev         dWee         dXed_edYedZee	e                  d[ee
d\         df         dAee         d<ee         dIee         dJee         dKee         dLeeeef                  f d`            Z edNO          ddd8ddddddda	dbedceddedeee	e                  dfedg         d;edIee         dJee         dKee         dLeeef         dAee         d<ee         d=epfdh            Z edNO          	 	 d
d8dddddddGdiedjee         dkee         d;edIee         dJee         dKee         dLeeef         dAee         d<ee         d=epfdl            Zeddd8ddddddd edm          ddPe@j        dddndReee	e         f         dSedT         doee	e                  d;edIee         dJee         dKee         dLeeef         dpeeeeej        eedqf         f         f                  dree~         dVeev         dWee         d[eee
d\         ezf                  dXedZee	e                  dAee         d<ee         d=epf$ds            Z edtO          dd8dddddddddddPdPde-j        dddudReee	e         f         dSedT         d;edIee         dJee         dKee         drees         dLeeef         dveeeef                  dVeev         dWedweeeef                  dxee         dXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf(dy            ZedPdd8ddddddd edm          dPdPde0dddzdReee	e         f         d{edSedT         d;edIee         dJee         dKee         dLeeef         dUeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf&d|            Zedd8ddddddd edm          dPdPddddd}dReee	e         f         dSedT         d;edIee         dJee         dKee         dLeeef         dUeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf$d~            Zedddd8dddddddddPdPddddddReee	e         f         dededSedT         d;edIee         dJee         dKee         dLeeeef                  dUeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf(d            Zedd8dddddddddPdPddddd}dReee	e         f         dSedT         d;edIee         dJee         dKee         dLeeeef                  dUeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf$d            Zedd8dddddPdPde>j        ddddReee	e         f         dSedT         d;edUeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epfd            Z edNO          dd8ddddddddPdPddddddddReee	e         f         dSedT         d;edIee         dJee         dKee         dLeeef         dUeeeef                  drees         dVeev         dXedYeded         d[eee
d\         ezf                  dZee	e                  dAee         d<ee         ded         d=epf&d            Z edNO          dddddd8ddddddddPdPddddddReee	e         f         deee	e         ee         f                  deeeeef         e:f                  deee	e         ee         f                  dedSedT         d;edIee         dJee         dKee         dLeeeef                  drees         dVeev         dWedXedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf*d            Z edNO          dd8ddddddddPddPddddPddReee	e         f         dSedT         d;edUeeeef                  drees         dVeev         deeeeeef                  deeeef                  deeeef                  deeeef                  ded[eee
d\         ezf                  dXedZee	e                  dAee         d<ee         ded=epf$d            ZedPdd8dddddddddPddddddReee	e         f         dXedSedT         d;edIee         dJee         dKee         dLeeef         dUeeeef                  drees         dVeev         dWedYed[eee
d\         ezf                  dZee	e                  dAee         d<ee         d=epf$d            Z edNO          ddd8ddddddd	dedeg etf         deee                  ded;edIee         dJee         dKee         dLeeeef                  dAee         d<ee         d=epfd            Z edNO          dd8ddddddddedeeef         deee                  d;edIee         dJee         dKee         dLeeef         dAee         d<ee         d=epfd            Z edNO          ddddd8ddddddddedee         dkee         dee         dfee         d;edIee         dJee         dKee         dLeeeef                  dAee         d<ee         d=epfd            Z edNO          ddddddddddd
dededee	eeeef                           deeeef                  deeeef                  dIee         dJee         dKee         dLeeeef                  dAee         d<ee         d=epfd            Zeddd=epfd            Zeddd=eqfd            Zeddd=eqfd            Zeddd=eqfd            Ze	 ddede	d         f         d<ee         d=eqfd            Zedeed         e	ed                  f         d=eqfd            Zedeej        e	ej                 f         d=eqfdÄ            Zedeeej                 e	eej                          f         d=eqfdĄ            ZeddŜdedee	edef                  f         d<ee         d=eqfdȄ            Zedeeedef                  e	eedef                           f         d=eqfdɄ            Z edNO          dddddddddddʜ
dedee         dee         dee         dee         dLeeeef                  dIee         dJee         dKee         dAee         d<ee         d=epfdЄ            Zeddd9ddd;ee         d<ee         d=eqfd҄            Ze	 	 	 ddjed         d;edAee         d<ee         d=eeqepf         f
dԄ            Zedjdd=eqfdք            Ze	 ddjdded=epfdل            Zedd8ddddddddddۜdedeedf         d;edeedqf         dee         deeeef                  deeeef                  dLeeeef                  dIee         dJee         dKee         d<ee         d=epfd            Zedddddddddddddbedeeeef                  doee	e                  dee         deeeef                  deeeef                  dLeeeef                  dIee         dJee         dKee         dAee         d<ee         d=epfd            Z edNO          dddddddddddddededoee	e                  dee         deee	e         ef                  deeeef                  deeeef                  dLeeeef                  dIee         dJee         dKee         dAee         d<ee         d=epfd            Z edNO          dddddedededee         dee         dee         d=epfd            Z edNO          	 dddd8dddddd edm          ddPddddeee	e         f         dee         dSedT         doee	e                  d;edIee         dJee         dKee         dLeeeef                  dree~         dVeev         dWee         d[ee
d\         df         dXedAee         d<ee         f d            Z edNO          ddddddddddd
deee	e         f         deee	e         f         de
d         deee
d         f         deee
d         f         d ee4         dIee         dJee         dKee         dLeeeef                  d<ee         ded=epfd            ZՐdeudended=eeuexf         fdZ	 ddpeeeeej        eedqf         f         f                  d=eeef         fdZ	 	 dd;ed<ee         d=efdZdrees         d=dfd	ZdS (      N)TYPE_CHECKINGAnyCallableDictListLiteralOptionalSetTupleTypeVarUnion)parse)get_pyarrow_version)wrap_auto_init)_create_possibly_ragged_ndarray)AudioDatasource)AvroDatasource)BigQueryDatasource)BinaryDatasource)ClickHouseDatasource)CSVDatasource)DeltaSharingDatasource)HudiDatasource)ImageDatasourceImageFileMetadataProvider)JSON_FILE_EXTENSIONSArrowJSONDatasourcePandasJSONDatasource)KafkaAuthConfigKafkaDatasource)LanceDatasource)MCAPDatasource	TimeRange)MongoDatasource)NumpyDatasource)ParquetDatasource)RangeDatasource)SQLDatasource)TextDatasource)TFRecordDatasource)TorchDatasource)UnityCatalogConnector)VideoDatasource)WebDatasetDatasource)DelegatingBlockBuilder)LogicalPlan)	FromArrow
FromBlocks	FromItems	FromNumpy
FromPandas)Read)ExecutionPlan)cached_remote_fn)DatasetStats)_autodetect_parallelismget_table_block_metadata_schema"merge_resources_to_ray_remote_argsndarray_to_blockpandas_df_to_arrow_block)BlockBlockExecStatsBlockMetadataWithSchema)DataContext)DatasetMaterializedDataset)BaseFileMetadataProvider
Connection
DatasourcePathPartitionFilter)Reader)FileShuffleConfig_validate_shuffle_arg)DefaultFileMetadataProviderFileMetadataProvider)Partitioning)	ObjectRef)DeveloperAPI	PublicAPI)NodeAffinitySchedulingStrategy)BooleanExpression)
schema_pb2)TFXReadOptionsTblocksc                 $   d | D             }d | D             }t          ||          }t          t          d|id          t          j                                                              }t          ||j                  }t          ||          S )a  Create a :class:`~ray.data.Dataset` from a list of blocks.

    This method is primarily used for testing. Unlike other methods like
    :func:`~ray.data.from_pandas` and :func:`~ray.data.from_arrow`, this method
    gaurentees that it won't modify the number of blocks.

    Args:
        blocks: List of blocks to create the dataset from.

    Returns:
        A :class:`~ray.data.Dataset` holding the blocks.
    c                 6    g | ]}t          j        |          S  rayput.0blocks     e/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/read_api.py
<listcomp>zfrom_blocks.<locals>.<listcomp>   s     555U#'%..555    c                 6    g | ]}t          j        |          S rZ   )rA   
from_blockr^   s     ra   rb   zfrom_blocks.<locals>.<listcomp>   s$    VVVe/:5AAVVVrc   r2   Nmetadataparent)	r2   r7   r9   rB   get_currentcopyr0   _contextrD   )rW   
block_refsmeta_with_schemafrom_blocks_opexecution_planlogical_plans         ra   from_blocksrq      s     65f555JVVvVVV
,<==N"|-=>tLLL!!&&(( N ~~/FGGL  rc   parallelismoverride_num_blocksitemsrt   ru   returnc                   ddl }t          ||          }|dk    rt          d|           t          |t          j                                        t          j                              \  }}}t          t          |           |          }|dk    r!t          t          |           |          \  }}nd\  }}g }g }	|                    |          D ]%}
t          j                    }t                      }|
|z  t          |
|          z   }|
dz   |z  t          |
dz   |          z   }|                    ||          D ]B}| |         }t!          |t"          j        j                  sd|i}|                    |           C|                                }|                    t	          j        |                     |	                    t1          j        ||                                                     't5          ||	          }t7          t9          d|	id	          t          j                                                              }t=          ||j                  }tA          ||          S )
a  Create a :class:`~ray.data.Dataset` from a list of local Python objects.

    Use this method to create small datasets from data that fits in memory. The column
    name defaults to "item".

    Examples:

        >>> import ray
        >>> ds = ray.data.from_items([1, 2, 3, 4, 5])
        >>> ds
        MaterializedDataset(num_blocks=..., num_rows=5, schema={item: int64})
        >>> ds.schema()
        Column  Type
        ------  ----
        item    int64

    Args:
        items: List of local Python objects.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` holding the items.
    r   Nz$parallelism must be -1 or > 0, got: )r   r      item)statsr3   rf   )!builtins_get_num_output_blocks
ValueErrorr:   r\   utilget_current_placement_grouprB   ri   minlendivmodranger@   builderr/   
isinstancecollectionsabcMappingaddbuildappendr]   rA   re   r3   r7   r9   rj   r0   rk   rD   )rv   rt   ru   r|   detected_parallelism_
block_size	remainderrW   rm   ir{   r   block_start	block_endjrz   r`   from_items_opro   rp   s                        ra   
from_itemsr      sr   D OOO(6IJJKaMMMNNN!8,,..!!" "!Q s5zz+?@@a &s5zz3G H H
II $
I &(F68^^011 
 
&(((***ns1i'8'88Uj(3q1ui+@+@@	Y77 	 	A8DdKO$;<< &~KKcgenn%%%#.uEKKMMJJJ	
 	
 	
 	
 f&677M"{,<=dKKK!!&&(( N }n.EFFL  rc   rt   concurrencyru   nr   c                L    t          | dd          }t          ||||          S )a  Creates a :class:`~ray.data.Dataset` from a range of integers [0..n).

    This function allows for easy creation of synthetic datasets for testing or
    benchmarking :ref:`Ray Data <data>`. The column name defaults to "id".

    Examples:

        >>> import ray
        >>> ds = ray.data.range(10000)
        >>> ds
        Dataset(num_rows=10000, schema={id: int64})
        >>> ds.map(lambda row: {"id": row["id"] * 2}).take(4)
        [{'id': 0}, {'id': 2}, {'id': 4}, {'id': 6}]

    Args:
        n: The upper bound of the range of integers.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing the integers from the range 0 to n.

    .. seealso::

        :meth:`~ray.data.range_tensor`
                    Call this method for creating synthetic datasets of tensor data.

    arrowid)r   block_formatcolumn_namer   )r'   read_datasource)r   rt   r   ru   
datasources        ra   r   r      s<    V !17MMMJ/	   rc   )ry   )shapert   r   ru   r   c                h    t          | ddt          |                    }t          ||||          S )a  Creates a :class:`~ray.data.Dataset` tensors of the provided shape from range
    [0...n].

    This function allows for easy creation of synthetic tensor datasets for testing or
    benchmarking :ref:`Ray Data <data>`. The column name defaults to "data".

    Examples:

        >>> import ray
        >>> ds = ray.data.range_tensor(1000, shape=(2, 2))
        >>> ds
        Dataset(
           num_rows=1000,
           schema={data: ArrowTensorTypeV2(shape=(2, 2), dtype=int64)}
        )
        >>> ds.map_batches(lambda row: {"data": row["data"] * 2}).take(2)
        [{'data': array([[0, 0],
               [0, 0]])}, {'data': array([[2, 2],
               [2, 2]])}]

    Args:
        n: The upper bound of the range of tensor records.
        shape: The shape of each tensor in the dataset.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing the tensor data from range 0 to n.

    .. seealso::

        :meth:`~ray.data.range`
                    Call this method to create synthetic datasets of integer data.

    tensordata)r   r   r   tensor_shaper   )r'   tupler   )r   r   rt   r   ru   r   s         ra   range_tensorr   )  sM    f !
(U5\\  J /	   rc   rt   num_cpusnum_gpusmemoryray_remote_argsr   ru   r   r   r   r   r   c                (   t          ||          }t          j                    }	|i }| j        s7t	          t          j                                                    d          |d<   d|vr
|	j        |d<   t          ||||          }t          | |	|          }
t
          j                                        }t          ||	j        t          j                    |
|          \  }}}|
                    |          }t!          dd |D             id          }t#          | |
||rt%          |          nd	||
          }t'          |t          j                                                              }t+          ||j                  }t/          ||          S )a  Read a stream from a custom :class:`~ray.data.Datasource`.

    Args:
        datasource: The :class:`~ray.data.Datasource` to read data from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        read_args: Additional kwargs to pass to the :class:`~ray.data.Datasource`
            implementation.

    Returns:
        :class:`~ray.data.Dataset` that reads data from the :class:`~ray.data.Datasource`.
    NFsoftscheduling_strategy)placement_groupr6   c                     g | ]	}|j         
S rZ   )rg   )r_   	read_tasks     ra   rb   z#read_datasource.<locals>.<listcomp>  s    JJJ)9-JJJrc   rf   r   )rt   num_outputsr   r   )planrp   )r}   rB   ri   supports_distributed_readsrR   r\   get_runtime_contextget_node_idr   r<    _get_datasource_or_legacy_readerr   r   r:   target_max_block_sizeget_read_tasksr9   r6   r   r7   rj   r0   rk   rC   )r   rt   r   r   r   r   r   ru   	read_argsctxdatasource_or_legacy_readercur_pgrequested_parallelismr   
read_tasksr{   read_opro   rp   s                      ra   r   r   g  s   N )6IJJK

!
#
#C0 
1O#%%11332
 2
 2
-.
 O33141H-.8	 O #C# # X1133F"9!!!## # #1a -;;<QRRJJJzJJJK  E #'18C
OOOq'  G #!!&&(( N w(?@@L!   rc   alpha)	stabilityF)
filesystemarrow_open_stream_argspartition_filterpartitioninginclude_pathsignore_missing_pathsfile_extensionsshuffler   ru   r   r   r   r   pathsr   zpyarrow.fs.FileSystemr   r   r   r   r   r   r   filesc                x    t          | ||t                      ||||||
  
        }t          ||||||	|
          S )a2  Creates a :class:`~ray.data.Dataset` from audio files.

    The column names default to "amplitude" and "sample_rate".

    Examples:
        >>> import ray
        >>> path = "s3://anonymous@air-example-data-2/6G-audio-data-LibriSpeech-train-clean-100-flac/train-clean-100/5022/29411/5022-29411-0000.flac"
        >>> ds = ray.data.read_audio(path)
        >>> ds.schema()
        Column       Type
        ------       ----
        amplitude    ArrowTensorTypeV2(shape=(1, 191760), dtype=float)
        sample_rate  int64

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The pyarrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_file>`_.
            when opening input files to read.
        partition_filter:  A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each image. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
            that are not found. Defaults to False.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.

    Returns:
        A :class:`~ray.data.Dataset` containing audio amplitudes and associated
        metadata.
    	r   open_stream_argsmeta_providerr   r   r   r   r   r   r   r   r   r   r   ru   )r   rL   r   )r   r   r   r   r   r   r   r   r   r   ru   r   r   r   r   r   s                   ra   
read_audior     sk    Z !/133)!1#'  J '/   rc   )r   r   r   r   r   include_timestampsr   r   r   r   ru   r   r   r   r   r   c                z    t          | ||t                      ||||	|||          }t          ||||||
|          S )a  Creates a :class:`~ray.data.Dataset` from video files.

    Each row in the resulting dataset represents a video frame. The column names default
    to "frame", "frame_index" and "frame_timestamp".

    Examples:
        >>> import ray
        >>> path = "s3://anonymous@ray-example-data/basketball.mp4"
        >>> ds = ray.data.read_videos(path)
        >>> ds.schema()
        Column       Type
        ------       ----
        frame        ArrowTensorTypeV2(shape=(720, 1280, 3), dtype=uint8)
        frame_index  int64

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The pyarrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_file>`_.
            when opening input files to read.
        partition_filter:  A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each image. File paths are
            stored in the ``'path'`` column.
        include_timestmaps: If ``True``, include the frame timestamps from the video
            as a ``'frame_timestamp'`` column.
        ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
            that are not found. Defaults to False.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.

    Returns:
        A :class:`~ray.data.Dataset` containing video frames from the video files.
    )
r   r   r   r   r   r   r   r   r   r   r   )r-   rL   r   )r   r   r   r   r   r   r   r   r   r   r   ru   r   r   r   r   r   s                    ra   read_videosr   3  sn    X !/133)!1#-'  J '/   rc   )	pipelineschemart   r   r   r   r   r   ru   uridatabase
collectionr   r   zpymongoarrow.api.Schemac       	   
      T    t          d| ||||d|}t          ||||||	|
|          S )a  Create a :class:`~ray.data.Dataset` from a MongoDB database.

    The data to read from is specified via the ``uri``, ``database`` and ``collection``
    of the MongoDB. The dataset is created from the results of executing
    ``pipeline`` against the ``collection``. If ``pipeline`` is None, the entire
    ``collection`` is read.

    .. tip::

        For more details about these MongoDB concepts, see the following:
        - URI: https://www.mongodb.com/docs/manual/reference/connection-string/
        - Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/
        - Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/

    To read the MongoDB in parallel, the execution of the pipeline is run on partitions
    of the collection, with a Ray read task to handle a partition. Partitions are
    created in an attempt to evenly distribute the documents into the specified number
    of partitions. The number of partitions is determined by ``parallelism`` which can
    be requested from this interface or automatically chosen if unspecified (see the
    ``parallelism`` arg below).

    Examples:
        >>> import ray
        >>> from pymongoarrow.api import Schema # doctest: +SKIP
        >>> ds = ray.data.read_mongo( # doctest: +SKIP
        ...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin", # noqa: E501
        ...     database="my_db",
        ...     collection="my_collection",
        ...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501
        ...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
        ...     override_num_blocks=10,
        ... )

    Args:
        uri: The URI of the source MongoDB where the dataset is
            read from. For the URI format, see details in the `MongoDB docs <https:/                /www.mongodb.com/docs/manual/reference/connection-string/>`_.
        database: The name of the database hosted in the MongoDB. This database
            must exist otherwise ValueError is raised.
        collection: The name of the collection in the database. This collection
            must exist otherwise ValueError is raised.
        pipeline: A `MongoDB pipeline <https://www.mongodb.com/docs/manual/core            /aggregation-pipeline/>`_, which is executed on the given collection
            with results used to create Dataset. If None, the entire collection will
            be read.
        schema: The schema used to read the collection. If None, it'll be inferred from
            the results of pipeline.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        mongo_args: kwargs passed to `aggregate_arrow_all() <https://mongo-arrow            .readthedocs.io/en/latest/api/api.html#pymongoarrow.api            aggregate_arrow_all>`_ in pymongoarrow in producing
            Arrow-formatted results.

    Returns:
        :class:`~ray.data.Dataset` producing rows from the results of executing the pipeline on the specified MongoDB collection.

    Raises:
        ValueError: if ``database`` doesn't exist.
        ValueError: if ``collection`` doesn't exist.
    )r   r   r   r   r   r   r   r   rt   r   r   ru   rZ   )r$   r   )r   r   r   r   r   rt   r   r   r   r   r   ru   
mongo_argsr   s                 ra   
read_mongor     sh    v !    J '/	 	 	 	rc   
project_iddatasetqueryc          
      T    t          | ||          }
t          |
|||||||	          S )a{  Create a dataset from BigQuery.

    The data to read from is specified via the ``project_id``, ``dataset``
    and/or ``query`` parameters. The dataset is created from the results of
    executing ``query`` if a query is provided. Otherwise, the entire
    ``dataset`` is read.

    For more information about BigQuery, see the following concepts:

    - Project id: `Creating and Managing Projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_

    - Dataset: `Datasets Intro <https://cloud.google.com/bigquery/docs/datasets-intro>`_

    - Query: `Query Syntax <https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax>`_

    This method uses the BigQuery Storage Read API which reads in parallel,
    with a Ray read task to handle each stream. The number of streams is
    determined by ``parallelism`` which can be requested from this interface
    or automatically chosen if unspecified (see the ``parallelism`` arg below).

    .. warning::
        The maximum query response size is 10GB.

    Examples:
        .. testcode::
            :skipif: True

            import ray
            # Users will need to authenticate beforehand (https://cloud.google.com/sdk/gcloud/reference/auth/login)
            ds = ray.data.read_bigquery(
                project_id="my_project",
                query="SELECT * FROM `bigquery-public-data.samples.gsod` LIMIT 1000",
            )

    Args:
        project_id: The name of the associated Google Cloud Project that hosts the dataset to read.
            For more information, see `Creating and Managing Projects <https://cloud.google.com/resource-manager/docs/creating-managing-projects>`_.
        dataset: The name of the dataset hosted in BigQuery in the format of ``dataset_id.table_id``.
            Both the dataset_id and table_id must exist otherwise an exception will be raised.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        Dataset producing rows from the results of executing the query (or reading the entire dataset)
        on the specified BigQuery dataset.
    )r   r   r   r   )r   r   )r   r   r   rt   r   r   r   r   r   ru   r   s              ra   read_bigqueryr     sI    R $z7RWXXXJ'/	 	 	 	rc   hive)r   columnsrt   r   r   r   r   tensor_column_schemar   r   r   r   r   r   r   ru   r   r   .r   c                   t          |	           t          |           d|v rt          j        dt          d           t          |fi |}|                    dd          }|                    dd          }|                    dd          }t          | |||||||	|
||||	          }t          ||||||||
          S )a  Creates a :class:`~ray.data.Dataset` from parquet files.


    Examples:
        Read a file in remote storage.

        >>> import ray
        >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
        >>> ds.schema()
        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

        Read a directory in remote storage.

        >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris-parquet/")

        Read multiple local files.

        >>> ray.data.read_parquet(
        ...    ["local:///path/to/file1", "local:///path/to/file2"]) # doctest: +SKIP

        Specify a schema for the parquet file.

        >>> import pyarrow as pa
        >>> fields = [("sepal.length", pa.float32()),
        ...           ("sepal.width", pa.float32()),
        ...           ("petal.length", pa.float32()),
        ...           ("petal.width", pa.float32()),
        ...           ("variety", pa.string())]
        >>> ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet",
        ...     schema=pa.schema(fields))
        >>> ds.schema()
        Column        Type
        ------        ----
        sepal.length  float
        sepal.width   float
        petal.length  float
        petal.width   float
        variety       string

        The Parquet reader also supports projection and filter pushdown, allowing column
        selection and row filtering to be pushed down to the file scan.

        .. testcode::

            import pyarrow as pa

            # Create a Dataset by reading a Parquet file, pushing column selection and
            # row filtering down to the file scan.
            ds = ray.data.read_parquet(
                "s3://anonymous@ray-example-data/iris.parquet",
                columns=["sepal.length", "variety"],
                filter=pa.dataset.field("sepal.length") > 5.0,
            )

            ds.show(2)

        .. testoutput::

            {'sepal.length': 5.1, 'variety': 'Setosa'}
            {'sepal.length': 5.4, 'variety': 'Setosa'}

        For further arguments you can pass to PyArrow as a keyword argument, see the
        `PyArrow API reference <https://arrow.apache.org/docs/python/generated/        pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.

    Args:
        paths: A single file path or directory, or a list of file paths. Multiple
            directories are not supported.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the ``S3FileSystem`` is
            used. If ``None``, this function uses a system-chosen implementation.
        columns: A list of column names to read. Only the specified columns are
            read during the file scan.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        tensor_column_schema: A dict of column name to PyArrow dtype and shape
            mappings for converting a Parquet column containing serialized
            tensors (ndarrays) as their elements to PyArrow tensors. This function
            assumes that the tensors are serialized in the raw
            NumPy array format in C-contiguous order (e.g., via
            `arr.tobytes()`).
        meta_provider: A :ref:`file metadata provider <metadata_provider>`. Custom
            metadata providers may be able to resolve file metadata more quickly and/or
            accurately. In most cases you do not need to set this parameter.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to HIVE partitioning.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
            set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/                python/generated/pyarrow.dataset.Scanner.html                    #pyarrow.dataset.Scanner.from_fragment>`_
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified parquet
        files.
    filterzThe `filter` argument is deprecated and will not supported in a future release. Use `dataset.filter(expr=expr)` instead to filter rows.   
stackleveldataset_kwargsN
_block_udfr   )r   r   to_batch_kwargsr   r   r   r   r   r   r   r   r   r   )	'_emit_meta_provider_deprecation_warningrK   warningswarnDeprecationWarning_resolve_parquet_argspopr&   r   )r   r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   ru   arrow_parquet_argsr   r   r   r   s                         ra   read_parquetr   \  s$   l ,M:::'""" %%%F		
 	
 	
 	
 / 
 
 (++,<dCCN#''d;;J##Hd33F"%*#)!#'  J '/	 	 	 	rc   beta)r   rt   r   r   r   r   r   arrow_open_file_argsr   r   sizemoder   r   r   r   r   ru   r   r   r   c                    t          |           |t                      }t          | |||||||	|
|||          }t          ||||||||          S )a7  Creates a :class:`~ray.data.Dataset` from image files.

    The column name defaults to "image".

    Examples:
        >>> import ray
        >>> path = "s3://anonymous@ray-example-data/batoidea/JPEGImages/"
        >>> ds = ray.data.read_images(path)
        >>> ds.schema()
        Column  Type
        ------  ----
        image   ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)

        If you need image file paths, set ``include_paths=True``.

        >>> ds = ray.data.read_images(path, include_paths=True)
        >>> ds.schema()
        Column  Type
        ------  ----
        image   ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)
        path    string
        >>> ds.take(1)[0]["path"]
        'ray-example-data/batoidea/JPEGImages/1.jpeg'

        If your images are arranged like:

        .. code::

            root/dog/xxx.png
            root/dog/xxy.png

            root/cat/123.png
            root/cat/nsdf3.png

        Then you can include the labels by specifying a
        :class:`~ray.data.datasource.partitioning.Partitioning`.

        >>> import ray
        >>> from ray.data.datasource.partitioning import Partitioning
        >>> root = "s3://anonymous@ray-example-data/image-datasets/dir-partitioned"
        >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
        >>> ds = ray.data.read_images(root, size=(224, 224), partitioning=partitioning)
        >>> ds.schema()
        Column  Type
        ------  ----
        image   ArrowTensorTypeV2(shape=(224, 224, 3), dtype=uint8)
        class   string

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The pyarrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        arrow_open_file_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_file>`_.
            when opening input files to read.
        partition_filter:  A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
            By default, this filters out any file paths whose file extension does not
            match ``*.png``, ``*.jpg``, ``*.jpeg``, ``*.tiff``, ``*.bmp``, or ``*.gif``.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        size: The desired height and width of loaded images. If unspecified, images
            retain their original shape.
        mode: A `Pillow mode <https://pillow.readthedocs.io/en/stable/handbook/concepts            .html#modes>`_
            describing the desired type and depth of pixels. If unspecified, image
            modes are inferred by
            `Pillow <https://pillow.readthedocs.io/en/stable/index.html>`_.
        include_paths: If ``True``, include the path to each image. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
            that are not found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing tensors that represent the images at
        the specified paths. For information on working with tensors, read the
        :ref:`tensor data guide <working_with_tensors>`.

    Raises:
        ValueError: if ``size`` contains non-positive numbers.
        ValueError: if ``mode`` is unsupported.
    N)r   r   r   r   r   r   r   r   r   r   r   r   )r   r   r   r   )r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   s                       ra   read_imagesr   !  s    R ,M:::133 ##-)!1'  J '/	 	 	 	rc   )linesr   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   c                   t          |	           |r5|||d}|                                D ]\  }}|rt          d| d          |	t                      }	t	          |||	|
|||||	  	        }|r=t
          j        j        j        	                                j
        }t          | fd|i|}nt          | fd|i|}t          ||||||||          S )	aW  Creates a :class:`~ray.data.Dataset` from JSON and JSONL files.

    For JSON file, the whole file is read as one row.
    For JSONL file, each line of file is read as separate row.

    Examples:
        Read a JSON file in remote storage.

        >>> import ray
        >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/log.json")
        >>> ds.schema()
        Column     Type
        ------     ----
        timestamp  timestamp[...]
        size       int64

        Read a JSONL file in remote storage.

        >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/train.jsonl", lines=True)
        >>> ds.schema()
        Column  Type
        ------  ----
        input   <class 'object'>

        Read multiple local files.

        >>> ray.data.read_json( # doctest: +SKIP
        ...    ["local:///path/to/file1", "local:///path/to/file2"])

        Read multiple directories.

        >>> ray.data.read_json( # doctest: +SKIP
        ...     ["s3://bucket/path1", "s3://bucket/path2"])

        By default, :meth:`~ray.data.read_json` parses
        `Hive-style partitions <https://athena.guide/articles/        hive-style-partitioning/>`_
        from file paths. If your data adheres to a different partitioning scheme, set
        the ``partitioning`` parameter.

        >>> ds = ray.data.read_json("s3://anonymous@ray-example-data/year=2022/month=09/sales.json")
        >>> ds.take(1)
        [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        lines: [Experimental] If ``True``, read files assuming line-delimited JSON.
            If set, will ignore the ``filesystem``, ``arrow_open_stream_args``, and
            ``arrow_json_args`` parameters.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `PyArrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
            when opening input files to read.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset.
            By default, this filters out any file paths whose file extension does not
            match "*.json" or "*.jsonl".
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. By default, this function parses
            `Hive-style partitions <https://athena.guide/articles/                hive-style-partitioning/>`_.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to ``FileShuffleConfig``, you can pass a random seed to shuffle
            the input files, e.g. ``FileShuffleConfig(seed=42)``.
            Defaults to not shuffle with ``None``.
        arrow_json_args: JSON read options to pass to `pyarrow.json.read_json <https://            arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html#pyarrow.            json.read_json>`_.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified paths.
    )r   r   arrow_json_args`z&` is not supported when `lines=True`. Nr   target_output_size_bytesr  r   )r   rv   r~   rL   dictr\   r   contextrB   ri   r   r   r   r   )r   r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r  incompatible_paramsparamvaluefile_based_datasource_kwargsr  r   s                            ra   	read_jsonr
    ss   D ,M::: T$&<.
 

 05577 	T 	TLE5 T !RU!R!R!RSSST 355#'/#)!1#'
$ 
$ 
$   
H(4466L 	! *
 
%=
 +
 


 )
 
+
 +
 

 '/	 	 	 	rc   )r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   c                    t          |           |t                      }t          | |||||	|
||||          }t          ||||||||          S )a  Creates a :class:`~ray.data.Dataset` from CSV files.

    Examples:
        Read a file in remote storage.

        >>> import ray
        >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        >>> ds.schema()
        Column             Type
        ------             ----
        sepal length (cm)  double
        sepal width (cm)   double
        petal length (cm)  double
        petal width (cm)   double
        target             int64

        Read multiple local files.

        >>> ray.data.read_csv( # doctest: +SKIP
        ...    ["local:///path/to/file1", "local:///path/to/file2"])

        Read a directory from remote storage.

        >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris-csv/")

        Read files that use a different delimiter. For more uses of ParseOptions see
        https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html  # noqa: #501

        >>> from pyarrow import csv
        >>> parse_options = csv.ParseOptions(delimiter="\t")
        >>> ds = ray.data.read_csv(
        ...     "s3://anonymous@ray-example-data/iris.tsv",
        ...     parse_options=parse_options)
        >>> ds.schema()
        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

        Convert a date column with a custom format from a CSV file. For more uses of ConvertOptions see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html  # noqa: #501

        >>> from pyarrow import csv
        >>> convert_options = csv.ConvertOptions(
        ...     timestamp_parsers=["%m/%d/%Y"])
        >>> ds = ray.data.read_csv(
        ...     "s3://anonymous@ray-example-data/dow_jones.csv",
        ...     convert_options=convert_options)

        By default, :meth:`~ray.data.read_csv` parses
        `Hive-style partitions <https://athena.guide/        articles/hive-style-partitioning/>`_
        from file paths. If your data adheres to a different partitioning scheme, set
        the ``partitioning`` parameter.

        >>> ds = ray.data.read_csv("s3://anonymous@ray-example-data/year=2022/month=09/sales.csv")
        >>> ds.take(1)
        [{'order_number': 10107, 'quantity': 30, 'year': '2022', 'month': '09'}]

        By default, :meth:`~ray.data.read_csv` reads all files from file paths. If you want to filter
        files by file extensions, set the ``file_extensions`` parameter.

        Read only ``*.csv`` files from a directory.

        >>> ray.data.read_csv("s3://anonymous@ray-example-data/different-extensions/",
        ...     file_extensions=["csv"])
        Dataset(num_rows=?, schema=...)

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
            when opening input files to read.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset. By default, no files are filtered.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. By default, this function parses
            `Hive-style partitions <https://athena.guide/articles/                hive-style-partitioning/>`_.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        arrow_csv_args: CSV read options to pass to
            `pyarrow.csv.open_csv <https://arrow.apache.org/docs/python/generated/            pyarrow.csv.open_csv.html#pyarrow.csv.open_csv>`_
            when opening CSV files.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified paths.
    N)
arrow_csv_argsr   r   r   r   r   r   r   r   r   r   )r   rL   r   r   )r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r  r   s                      ra   read_csvr    s    l ,M:::355%/#)!1#'  J '/	 	 	 	rc   zutf-8T)encodingdrop_empty_linesr   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r  r  c                    t          |
           |
t                      }
t          | ||||	|
||||||          }t          ||||||||          S )ae  Create a :class:`~ray.data.Dataset` from lines stored in text files.

    The column name default to "text".

    Examples:
        Read a file in remote storage.

        >>> import ray
        >>> ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")
        >>> ds.schema()
        Column  Type
        ------  ----
        text    string

        Read multiple local files.

        >>> ray.data.read_text( # doctest: +SKIP
        ...    ["local:///path/to/file1", "local:///path/to/file2"])

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        encoding: The encoding of the files (e.g., "utf-8" or "ascii").
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `PyArrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks and
            in the subsequent text decoding map task.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
            when opening input files to read.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset. By default, no files are filtered.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing lines of text read from the specified
        paths.
    N)r  r  r   r   r   r   r   r   r   r   r   r   )r   rL   r)   r   )r   r  r  r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   s                       ra   	read_textr  5  s    B ,M:::355)/#)!1#'  J '/	 	 	 	rc   c                    t          |           |t                      }t          | ||||	|
||||
  
        }t          ||||||||          S )a  Create a :class:`~ray.data.Dataset` from records stored in Avro files.

    Examples:
        Read an Avro file in remote storage or local storage.

        >>> import ray
        >>> ds = ray.data.read_avro("s3://anonymous@ray-example-data/mnist.avro")
        >>> ds.schema()
        Column    Type
        ------    ----
        features  list<item: int64>
        label     int64
        dataType  string

        >>> ray.data.read_avro( # doctest: +SKIP
        ...    ["local:///path/to/file1", "local:///path/to/file2"])

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `PyArrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks and
            in the subsequent text decoding map task.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
            when opening input files to read.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset. By default, no files are filtered.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` holding records from the Avro files.
    Nr   r   )r   rL   r   r   )r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   s                     ra   	read_avror    s    v ,M:::355/#)!1#'  J '/	 	 	 	rc   )r   rt   r   r   r   r   r   r   r   r   r   ru   c                    t          |           |t                      }t          | ||||||||	||
          }t          ||||          S )a  Create an Arrow dataset from numpy files.

    The column name defaults to "data".

    Examples:
        Read a directory of files in remote storage.

        >>> import ray
        >>> ray.data.read_numpy("s3://bucket/path") # doctest: +SKIP

        Read multiple local files.

        >>> ray.data.read_numpy(["/path/to/file1", "/path/to/file2"]) # doctest: +SKIP

        Read multiple directories.

        >>> ray.data.read_numpy( # doctest: +SKIP
        ...     ["s3://bucket/path1", "s3://bucket/path2"])

    Args:
        paths: A single file/directory path or a list of file/directory paths.
            A list of paths can contain both files and directories.
        filesystem: The filesystem implementation to read from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`_.
        numpy_load_args: Other options to pass to np.load.
        meta_provider: File metadata provider. Custom metadata providers may
            be able to resolve file metadata more quickly and/or accurately. If
            ``None``, this function uses a system-chosen implementation.
        partition_filter: Path-based partition filter, if any. Can be used
            with a custom callback to read only selected partitions of a dataset.
            By default, this filters out any file paths whose file extension does not
            match "*.npy*".
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            if setting to ``FileShuffleConfig``, the random seed can be passed toshuffle the
            input files, i.e. ``FileShuffleConfig(seed = 42)``.
            Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        Dataset holding Tensor records read from the specified paths.
    N)
numpy_load_argsr   r   r   r   r   r   r   r   r   r   )r   rL   r%   r   )r   r   rt   r   r   r   r   r   r   r   r   r   ru   r  r   s                  ra   
read_numpyr  -  s~    V ,M:::355 '/#)!1#'  J /	   rc   )r   rt   r   r   r   r   r   r   r   r   r   	tf_schemar   r   r   ru   tfx_read_optionsr  zschema_pb2.Schemar  rU   c                   ddl }t          |           d}|rL|                                dk    r4	 ddl}d}n,# t          $ r d}t
                              d           Y nw xY w|t                      }t          | |||||	|||
||          }t          ||||||||          }|r|j
        r|r|sdd	lm}  ||          S |S )
a  Create a :class:`~ray.data.Dataset` from TFRecord files that contain
    `tf.train.Example <https://www.tensorflow.org/api_docs/python/tf/train/Example>`_
    messages.

    .. tip::
        Using the ``tfx-bsl`` library is more performant when reading large
        datasets (for example, in production use cases). To use this
        implementation, you must first install ``tfx-bsl``:

        1. `pip install tfx_bsl --no-dependencies`
        2. Pass tfx_read_options to read_tfrecords, for example:
           `ds = read_tfrecords(path, ..., tfx_read_options=TFXReadOptions())`

    .. warning::
        This function exclusively supports ``tf.train.Example`` messages. If a file
        contains a message that isn't of type ``tf.train.Example``, then this function
        fails.

    Examples:
        >>> import ray
        >>> ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")
        Dataset(num_rows=?, schema=...)

        We can also read compressed TFRecord files, which use one of the
        `compression types supported by Arrow <https://arrow.apache.org/docs/python/            generated/pyarrow.CompressedInputStream.html>`_:

        >>> ray.data.read_tfrecords(
        ...     "s3://anonymous@ray-example-data/iris.tfrecords.gz",
        ...     arrow_open_stream_args={"compression": "gzip"},
        ... )
        Dataset(num_rows=?, schema=...)

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `PyArrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
            when opening input files to read. To read a compressed TFRecord file,
            pass the corresponding compression type (e.g., for ``GZIP`` or ``ZLIB``),
            use ``arrow_open_stream_args={'compression': 'gzip'}``).
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths:  If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        tf_schema: Optional TensorFlow Schema which is used to explicitly set the schema
            of the underlying Dataset.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        tfx_read_options: Specifies read options when reading TFRecord files with TFX.
            When no options are provided, the default version without tfx-bsl will
            be used to read the tfrecords.
    Returns:
        A :class:`~ray.data.Dataset` that contains the example features.

    Raises:
        ValueError: If a file contains a message that isn't a ``tf.train.Example``.
    r   NFarmTzPlease install tfx-bsl package with `pip install tfx_bsl --no-dependencies`. This can help speed up the reading of large TFRecord files.)
r  r   r   r   r   r   r   r   r   r  )rt   r   r   r   r   r   ru   )_infer_schema_and_transform)platformr   	processortfx_bslModuleNotFoundErrorloggerwarningrL   r*   r   auto_infer_schema2ray.data._internal.datasource.tfrecords_datasourcer  )r   r   rt   r   r   r   r   r   r   r   r   r   r  r   r   r   ru   r  r  tfx_readr  r   dsr  s                           ra   read_tfrecordsr&    sp   d OOO+M:::H H..00E99	NNNHH" 	 	 	#NNO    	 355#/#)1#')  J 
'/	
 	
 	
B 	
/.
/ 
/ 	
/	
 	
 	
 	
 	
 	
 +*2...Is   8 &A! A!)topics
time_rangemessage_typesinclude_metadatar   rt   r   r   r   r   r   r   r   r   r   r   r   r   ru   r'  r(  r)  r*  c                   t          |           t          |           |t                      }|dg}|Wt          |t                    rBt          |          dk    rt          d|           t          |d         |d                   }t          | ||||||||||||          }t          |||||	|
||	          S )
a  Create a :class:`~ray.data.Dataset` from MCAP (Message Capture) files.

    MCAP is a format commonly used in robotics and autonomous systems for storing
    ROS2 messages and other time-series data. This reader provides predicate pushdown
    optimization for efficient filtering by topics, time ranges, and message types.

    Examples:
        :noindex:

        Read all MCAP files in a directory.

        >>> import ray
        >>> ds = ray.data.read_mcap("s3://bucket/mcap-data/") # doctest: +SKIP
        >>> ds.schema() # doctest: +SKIP

        Read with filtering for specific topics and time range.

        >>> from ray.data.datasource import TimeRange  # doctest: +SKIP
        >>> ds = ray.data.read_mcap( # doctest: +SKIP
        ...     "s3://bucket/mcap-data/", # doctest: +SKIP
        ...     topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
        ...     time_range=TimeRange(start_time=1000000000, end_time=5000000000), # doctest: +SKIP
        ...     message_types={"sensor_msgs/Image", "sensor_msgs/PointCloud2"} # doctest: +SKIP
        ... ) # doctest: +SKIP

        Alternatively, use a tuple for time range (backwards compatible).

        >>> ds = ray.data.read_mcap( # doctest: +SKIP
        ...     "s3://bucket/mcap-data/", # doctest: +SKIP
        ...     topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
        ...     time_range=(1000000000, 5000000000), # doctest: +SKIP
        ... ) # doctest: +SKIP

        Read multiple local files with include_paths.

        >>> ray.data.read_mcap( # doctest: +SKIP
        ...     ["local:///path/to/file1.mcap", "local:///path/to/file2.mcap"], # doctest: +SKIP
        ...     include_paths=True # doctest: +SKIP
        ... ) # doctest: +SKIP

        Read with topic filtering and metadata inclusion.

        >>> ds = ray.data.read_mcap( # doctest: +SKIP
        ...     "data.mcap", # doctest: +SKIP
        ...     topics={"/camera/image_raw", "/lidar/points"}, # doctest: +SKIP
        ...     include_metadata=True, # doctest: +SKIP
        ...     include_paths=True # doctest: +SKIP
        ... ) # doctest: +SKIP

    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        topics: Optional list or set of topic names to include. If specified, only
            messages from these topics will be read.
        time_range: Optional time range for filtering messages by timestamp. Can be either
            a tuple of (start_time, end_time) in nanoseconds (for backwards compatibility)
            or a TimeRange object. Both values must be non-negative and start_time < end_time.
        message_types: Optional list or set of message type names (schema names) to
            include. Only messages with matching schema names will be read.
        include_metadata: Whether to include MCAP metadata fields in the output.
            Defaults to True. When True, includes schema, channel, and message metadata.
        filesystem: The PyArrow filesystem implementation to read from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        meta_provider: A :ref:`file metadata provider <metadata_provider>`. Custom
            metadata providers may be able to resolve file metadata more quickly and/or
            accurately. In most cases you do not need to set this parameter.
        partition_filter: A :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
            Defaults to ``["mcap"]``.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified MCAP files.
    Nmcapr   z:Time range must be a tuple of (start_time, end_time): got r   ry   )
start_timeend_time)r'  r(  r)  r*  r   r   r   r   r   r   r   r   r   )
r   rK   rL   r   r   r   r~   r#   r"   r   )r   r'  r(  r)  r*  r   rt   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   s                        ra   	read_mcapr/  A  s   p ,M:::'"""355!( *Z"?"?z??a       *Q-*Q-PPP
#)#)!1#'  J '/	 	 	 	rc   )r   rt   r   r   r   decoder
fileselect
filerenamesuffixesverbose_openr   r   r   r   ru   expand_jsonr0  r1  r2  r3  r4  r5  c                    t          |           |t                      }t          | ||||	|
||||||||          }t          ||||          S )a  Create a :class:`~ray.data.Dataset` from
    `WebDataset <https://github.com/webdataset/webdataset>`_ files.

    Args:
        paths: A single file/directory path or a list of file/directory paths.
            A list of paths can contain both files and directories.
        filesystem: The filesystem implementation to read from.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        arrow_open_stream_args: Key-word arguments passed to
            `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html>`_.
            To read a compressed TFRecord file,
            pass the corresponding compression type (e.g. for ``GZIP`` or ``ZLIB``, use
            ``arrow_open_stream_args={'compression': 'gzip'}``).
        meta_provider: File metadata provider. Custom metadata providers may
            be able to resolve file metadata more quickly and/or accurately. If
            ``None``, this function uses a system-chosen implementation.
        partition_filter: Path-based partition filter, if any. Can be used
            with a custom callback to read only selected partitions of a dataset.
        decoder: A function or list of functions to decode the data.
        fileselect: A callable or list of glob patterns to select files.
        filerename: A function or list of tuples to rename files prior to grouping.
        suffixes: A function or list of suffixes to select for creating samples.
        verbose_open: Whether to print the file names as they are opened.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            if setting to ``FileShuffleConfig``, the random seed can be passed toshuffle the
            input files, i.e. ``FileShuffleConfig(seed = 42)``.
            Defaults to not shuffle with ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        expand_json: If ``True``, expand JSON objects into individual samples.
            Defaults to ``False``.

    Returns:
        A :class:`~ray.data.Dataset` that contains the example features.

    Raises:
        ValueError: If a file contains a message that isn't a `tf.train.Example`_.

    .. _tf.train.Example: https://www.tensorflow.org/api_docs/python/tf/train/Example
    N)r0  r1  r2  r3  r4  r   r   r   r   r   r   r   r5  r   )r   rL   r.   r   )r   r   rt   r   r   r   r0  r1  r2  r3  r4  r   r   r   r   ru   r5  r   s                     ra   read_webdatasetr7    s    L ,M:::355%!/#)#'  J  /	   rc   )r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   ru   c                    t          |	           |	t                      }	t          | ||||	|
||||
  
        }t          ||||||||          S )a  Create a :class:`~ray.data.Dataset` from binary files of arbitrary contents.

    Examples:
        Read a file in remote storage.

        >>> import ray
        >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf"
        >>> ds = ray.data.read_binary_files(path)
        >>> ds.schema()
        Column  Type
        ------  ----
        bytes   binary

        Read multiple local files.

        >>> ray.data.read_binary_files( # doctest: +SKIP
        ...     ["local:///path/to/file1", "local:///path/to/file2"])

        Read a file with the filepaths included as a column in the dataset.

        >>> path = "s3://anonymous@ray-example-data/pdf-sample_0.pdf"
        >>> ds = ray.data.read_binary_files(path, include_paths=True)
        >>> ds.take(1)[0]["path"]
        'ray-example-data/pdf-sample_0.pdf'


    Args:
        paths: A single file or directory, or a list of file or directory paths.
            A list of paths can contain both files and directories.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `PyArrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the `S3FileSystem` is used.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        arrow_open_stream_args: kwargs passed to
            `pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/                python/generated/pyarrow.fs.FileSystem.html                    #pyarrow.fs.FileSystem.open_input_stream>`_.
        meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
            Custom metadata providers may be able to resolve file metadata more quickly
            and/or accurately. In most cases, you do not need to set this. If ``None``,
            this function uses a system-chosen implementation.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`.
            Use with a custom callback to read only selected partitions of a
            dataset. By default, no files are filtered.
            By default, this does not filter out any files.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to ``None``.
        ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
            found. Defaults to False.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` producing rows read from the specified paths.
    N)	r   r   r   r   r   r   r   r   r   r   )r   rL   r   r   )r   r   r   rt   r   r   r   r   r   r   r   r   r   r   r   r   ru   r   s                     ra   read_binary_filesr9  I	  s    F ,M:::355!#/#)!1'  J '/	 	 	 	rc   MD5)	
shard_keysshard_hash_fnrt   r   r   r   r   r   ru   sqlconnection_factoryr;  r<  c       	   
          t          | |||          }|
r;|
dk    r5|t          d          |                    |
          st          d          t          |||||||	|
          S )ag  Read from a database that provides a
    `Python DB API2-compliant <https://peps.python.org/pep-0249/>`_ connector.

    .. note::

        Parallelism is supported by databases that support sharding. This means
        that the database needs to support all of the following operations:
        ``MOD``, ``ABS``, and ``CONCAT``.

        You can use ``shard_hash_fn`` to specify the hash function to use for sharding.
        The default is ``MD5``, but other common alternatives include ``hash``,
        ``unicode``, and ``SHA``.

        If the database does not support sharding, the read operation will be
        executed in a single task.

    Examples:

        For examples of reading from larger databases like MySQL and PostgreSQL, see
        :ref:`Reading from SQL Databases <reading_sql>`.

        .. testcode::

            import sqlite3

            import ray

            # Create a simple database
            connection = sqlite3.connect("example.db")
            connection.execute("CREATE TABLE movie(title, year, score)")
            connection.execute(
                """
                INSERT INTO movie VALUES
                    ('Monty Python and the Holy Grail', 1975, 8.2),
                    ("Monty Python Live at the Hollywood Bowl", 1982, 7.9),
                    ("Monty Python's Life of Brian", 1979, 8.0),
                    ("Rocky II", 1979, 7.3)
                """
            )
            connection.commit()
            connection.close()

            def create_connection():
                return sqlite3.connect("example.db")

            # Get all movies
            ds = ray.data.read_sql("SELECT * FROM movie", create_connection)
            # Get movies after the year 1980
            ds = ray.data.read_sql(
                "SELECT title, score FROM movie WHERE year >= 1980", create_connection
            )
            # Get the number of movies per year
            ds = ray.data.read_sql(
                "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
            )

        .. testcode::
            :hide:

            import os
            os.remove("example.db")

    Args:
        sql: The SQL query to execute.
        connection_factory: A function that takes no arguments and returns a
            Python DB API2
            `Connection object <https://peps.python.org/pep-0249/#connection-objects>`_.
        shard_keys: The keys to shard the data by.
        shard_hash_fn: The hash function string to use for sharding. Defaults to "MD5".
            For other databases, common alternatives include "hash" and "SHA".
            This is applied to the shard keys.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            This is used for sharding when shard_keys is provided.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`Dataset` containing the queried data.
    )r=  r;  r<  r>  ry   Nz8shard_keys must be provided when override_num_blocks > 1zHDatabase does not support sharding. Please set override_num_blocks to 1.r   )r(   r~   supports_shardingr   )r=  r>  r;  r<  rt   r   r   r   r   r   ru   r   s               ra   read_sqlrA  	  s    T #-	  J  2Q66WXXX++,?@@ 	Z   '/	 	 	 	rc   )r;  rt   r   r   r   r   r   ru   connection_parametersc                n    ddl fd}
t          j                            | |
|d|||||||	          S )a7  Read data from a Snowflake data set.

    Example:

        .. testcode::
            :skipif: True

            import ray

            connection_parameters = dict(
                user=...,
                account="ABCDEFG-ABC12345",
                password=...,
                database="SNOWFLAKE_SAMPLE_DATA",
                schema="TPCDS_SF100TCL"
            )
            ds = ray.data.read_snowflake("SELECT * FROM CUSTOMERS", connection_parameters)

    Args:
        sql: The SQL query to execute.
        connection_parameters: Keyword arguments to pass to
            ``snowflake.connector.connect``. To view supported parameters, read
            https://docs.snowflake.com/developer-guide/python-connector/python-connector-api#functions.
        shard_keys: The keys to shard the data by.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            This is used for sharding when shard_keys is provided.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A ``Dataset`` containing the data from the Snowflake data set.
    r   Nc                  (     j         j        di  S )NrZ   )	connectorconnect)rB  	snowflakes   ra   snowflake_connection_factoryz4read_snowflake.<locals>.snowflake_connection_factory
  s     *y"*CC-BCCCrc   hash)
r>  r;  r<  rt   r   r   r   r   r   ru   )snowflake.connectorr\   r   rA  )r=  rB  r;  rt   r   r   r   r   r   ru   rH  rG  s    `         @ra   read_snowflakerK  N
  sy    t D D D D D D 87'/    rc   )tabler   catalogr   rt   r   r   r   r   r   ru   warehouse_idrL  rM  c           
         ddl m} d }t          j                            d          }|st          d          t          j                            d          }|sddlm}  |            r |            j        j	        
                                                                                                }|                                                    d                                          }nt          d	          |sAdd
lm}  |                                d                                          d         d         }|sAdd
lm}  |                                d                                          d         d         }||t          d          |rd| }|t          d           |||| |||          }t!          ||||||	|
|          S )a
  Read a Databricks unity catalog table or Databricks SQL execution result.

    Before calling this API, set the ``DATABRICKS_TOKEN`` environment
    variable to your Databricks warehouse access token.

    .. code-block:: console

        export DATABRICKS_TOKEN=...

    If you're not running your program on the Databricks runtime, also set the
    ``DATABRICKS_HOST`` environment variable.

    .. code-block:: console

        export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

    .. note::

        This function is built on the
        `Databricks statement execution API <https://docs.databricks.com/api/workspace/statementexecution>`_.

    Examples:

        .. testcode::
            :skipif: True

            import ray

            ds = ray.data.read_databricks_tables(
                warehouse_id='...',
                catalog='catalog_1',
                schema='db_1',
                query='select id from table_1 limit 750000',
            )

    Args:
        warehouse_id: The ID of the Databricks warehouse. The query statement is
            executed on this warehouse.
        table: The name of UC table you want to read. If this argument is set,
            you can't set ``query`` argument, and the reader generates query
            of ``select * from {table_name}`` under the hood.
        query: The query you want to execute. If this argument is set,
            you can't set ``table_name`` argument.
        catalog: (Optional) The default catalog name used by the query.
        schema: (Optional) The default schema used by the query.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`Dataset` containing the queried data.
    r   )DatabricksUCDatasourcec                      t          d          } 	 dd l}|                                }|| |j        d         d         S # t          $ r | t
          $ r | w xY w)NzNo dbutils module found.r   user_globaldbutils)RuntimeErrorIPythonget_ipythonns_tableImportErrorKeyError)no_dbutils_errorrU  ip_shells      ra   get_dbutilsz+read_databricks_tables.<locals>.get_dbutils
  s    '(BCC
	#NNN**,,H&&$]3I>> 	# 	# 	#"" 	# 	# 	#""	#s   .A   ADATABRICKS_TOKENzXPlease set environment variable 'DATABRICKS_TOKEN' to databricks workspace access token.DATABRICKS_HOST)is_in_databricks_runtimebrowserHostNamezYou are not in databricks runtime, please set environment variable 'DATABRICKS_HOST' to databricks workspace URL(e.g. "adb-<workspace-id>.<random-number>.azuredatabricks.net").)get_spark_sessionzSELECT CURRENT_CATALOG()zSELECT CURRENT_DATABASE()Nz5Only one of 'query' and 'table' arguments can be set.zselect * from z3One of 'query' and 'table' arguments should be set.)hosttokenrN  rM  r   r   )r   rt   r   r   r   r   r   ru   )6ray.data._internal.datasource.databricks_uc_datasourcerP  osenvirongetr~   ray.util.spark.utilsr_  notebookentry_point
getDbutils
getContexttagsra  r=  collectr   )rN  rL  r   rM  r   rt   r   r   r   r   r   ru   rP  r\  rc  rb  r_  r   ra  r   s                       ra   read_databricks_tablesro  
  s_   `     # # # JNN-..E 
1
 
 	

 :>>+,,D AAAAAA##%% 
	&2==??HHJJUUWW  88::>>"34488::DDS    V::::::##%%))*DEEMMOOPQRSTU V::::::""$$(()DEEMMOOPQRSTUU.PQQQ )(((}NOOO''!  J '/	 	 	 	rc   snapshot)

query_typefiltershudi_optionsstorage_optionsr   r   r   r   r   ru   	table_urirq  rr  rs  rt  c       
   	      V    t          | ||||          }t          ||||||	|
          S )a"  
    Create a :class:`~ray.data.Dataset` from an
    `Apache Hudi table <https://hudi.apache.org>`_.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_hudi( # doctest: +SKIP
        ...     table_uri="/hudi/trips",
        ...     query_type="snapshot",
        ...     filters=[("city", "=", "san_francisco")],
        ... )

        >>> ds = ray.data.read_hudi( # doctest: +SKIP
        ...     table_uri="/hudi/trips",
        ...     query_type="incremental",
        ...     hudi_options={
        ...         "hoodie.read.file_group.start_timestamp": "20230101123456789",
        ...         "hoodie.read.file_group.end_timestamp": "20230201123456789",
        ...     },
        ... )

    Args:
        table_uri: The URI of the Hudi table to read from. Local file paths, S3, and GCS are supported.
        query_type: The Hudi query type to use. Supported values are ``snapshot`` and ``incremental``.
        filters: Optional list of filters to apply to the Hudi table when the
            ``query_type`` is ``snapshot``. Each filter is a tuple of the form
            ``(column_name, operator, value)``. The operator can be
            one of ``"="``, ``"!="``, ``"<"``, ``"<="``, ``">"``, ``">="``.
            Currently, only filters on partition columns will be effective.
        hudi_options: A dictionary of Hudi options to pass to the Hudi reader.
        storage_options: Extra options that make sense for a particular storage
            connection. This is used to store connection parameters like credentials,
            endpoint, etc. See more explanation
            `here <https://github.com/apache/hudi-rs?tab=readme-ov-file#work-with-cloud-storage>`_.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing records read from the Hudi table.
    )ru  rq  rr  rs  rt  r   r   r   r   r   r   ru   )r   r   )ru  rq  rr  rs  rt  r   r   r   r   r   ru   r   s               ra   	read_hudirx  =  sV    F  !'  J '/   rc   dfzdaft.DataFramec                     t                      }|J |t          d          k    rt          d          |                                 S )a  Create a :class:`~ray.data.Dataset` from a `Daft DataFrame <https://docs.getdaft.io/en/stable/api/dataframe/>`_.

    .. warning::

        This function only works with PyArrow 13 or lower. For more details, see
        https://github.com/ray-project/ray/issues/53278.

    Args:
        df: A Daft DataFrame

    Returns:
        A :class:`~ray.data.Dataset` holding rows read from the DataFrame.
    Nz14.0.0zw`from_daft` only works with PyArrow 13 or lower. For more details, see https://github.com/ray-project/ray/issues/53278.)r   parse_versionrT  to_ray_dataset)ry  pyarrow_versions     ra   	from_daftr~    sY     *++O&&&-1111?
 
 	
 rc   zdask.dataframe.DataFramec                     ddl }ddlm} |                                 } |j        |d|i}ddlfdt          fd|D                       }|S )a:  Create a :class:`~ray.data.Dataset` from a
    `Dask DataFrame <https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.html#dask.dataframe.DataFrame>`_.

    Args:
        df: A `Dask DataFrame`_.

    Returns:
        A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
    r   N)ray_dask_get	schedulerc                     t          | j                  rt          j        |           S t          | t          j                  r| S t          dt          |                      )Nz5Expected a Ray object ref or a Pandas DataFrame, got )r   	DataFramer\   r]   rO   r~   type)ry  pandass    ra   to_refzfrom_dask.<locals>.to_ref  sb    b&*++ 	72;;CM** 	IRRRR  rc   c           
          g | ]>} t          t          |j                                                                      ?S rZ   )nextiterdaskvalues)r_   partr  s     ra   rb   zfrom_dask.<locals>.<listcomp>  sA    QQQDT$)**,,--..	/	/QQQrc   )r  ray.util.daskr  
to_delayedpersistr  from_pandas_refs)ry  r  r  
partitionspersisted_partitionsr%  r  r  s         @@ra   	from_daskr    s     KKK******J'4<L|LLMMM     
QQQQ<PQQQ
 
B Irc   zmars.dataframe.DataFramec                 <    ddl m} |                    |           }|S )aM  Create a :class:`~ray.data.Dataset` from a
    `Mars DataFrame <https://mars-project.readthedocs.io/en/latest/reference/dataframe/index.html>`_.

    Args:
        df: A `Mars DataFrame`_, which must be executed by Mars-on-Ray.

    Returns:
        A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
    r   N)mars.dataframe	dataframer|  )ry  mdr%  s      ra   	from_marsr    s.      ##B''BIrc   z modin.pandas.dataframe.DataFramec                 J    ddl m}  || d          }t          |          }|S )a@  Create a :class:`~ray.data.Dataset` from a
    `Modin DataFrame <https://modin.readthedocs.io/en/stable/flow/modin/pandas/dataframe.html>`_.

    Args:
        df: A `Modin DataFrame`_, which must be using the Ray backend.

    Returns:
        A :class:`~ray.data.MaterializedDataset` rows read from the DataFrame.
    r   )unwrap_partitionsaxis)-modin.distributed.dataframe.pandas.partitionsr  r  )ry  r  partsr%  s       ra   
from_modinr    s>     POOOOObq)))E	%	 	 BIrc   dfszpandas.DataFramec                 ^   ddl }t          | |j                  r| g} |Ht          |           dk    r|                    | d          }n| d         }t          j        ||          } ddlm t          j
                    }|j        rfd| D             } t          d | D                       S )a  Create a :class:`~ray.data.Dataset` from a list of pandas dataframes.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
        >>> ray.data.from_pandas(df)
        MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64})

       Create a Ray Dataset from a list of Pandas DataFrames.

        >>> ray.data.from_pandas([df, df])
        MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64})

    Args:
        dfs: A pandas dataframe or a list of pandas dataframes.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` holding data read from the dataframes.
    r   Nry   r  ))_cast_ndarray_columns_to_tensor_extensionc                 J    g | ]} |                                            S rZ   )rj   )r_   ry  r  s     ra   rb   zfrom_pandas.<locals>.<listcomp>*  s-    RRR88CCRRRrc   c                 6    g | ]}t          j        |          S rZ   r[   )r_   ry  s     ra   rb   zfrom_pandas.<locals>.<listcomp>,  s     777RSWR[[777rc   )r  r   r  r   concatnparray_split"ray.air.util.data_batch_conversionr  rB   ri   enable_tensor_extension_castingr  )r  ru   pdaryr  r  s        @ra   from_pandasr    s    : #r|$$ e&s88a<< ))Ca)((CCa&CnS"566      %''G. SRRRRcRRR773777888rc   c                 N  	 t          | t          j                  r| g} nst          | t                    r?| D ];}t          |t          j                  st	          dt          |                     <nt	          dt          |                     t          j                    }|j        rt          t                    	t          j        	fd| D                       }t          t          d|id          t          j                                                              }t          t!          | |          |j                  }t%          ||          S t          t&          d          fd	| D             }t)          t          t+          |           \  }}t          j        |          }t          t          d|id          t          j                                                              }t          t!          ||          |j                  }t%          ||          S )
a6  Create a :class:`~ray.data.Dataset` from a list of Ray object references to
    pandas dataframes.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> df_ref = ray.put(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}))
        >>> ray.data.from_pandas_refs(df_ref)
        MaterializedDataset(num_blocks=1, num_rows=3, schema={a: int64, b: int64})

        Create a Ray Dataset from a list of Pandas Dataframes references.

        >>> ray.data.from_pandas_refs([df_ref, df_ref])
        MaterializedDataset(num_blocks=2, num_rows=6, schema={a: int64, b: int64})

    Args:
        dfs: A Ray object reference to a pandas dataframe, or a list of
             Ray object references to pandas dataframes.

    Returns:
        :class:`~ray.data.Dataset` holding data read from the dataframes.
    6Expected list of Ray object refs, got list containing 8Expected Ray object ref or list of Ray object refs, got c                 :    g | ]}                     |          S rZ   remote)r_   ry  get_metadata_schemas     ra   rb   z$from_pandas_refs.<locals>.<listcomp>Y  s(    "P"P"Pb#6#=#=b#A#A"P"P"Prc   r5   Nrf   r   num_returnsc                 :    g | ]}                     |          S rZ   r  )r_   ry  df_to_blocks     ra   rb   z$from_pandas_refs.<locals>.<listcomp>h  s'    
0
0
0b;b!!
0
0
0rc   )r   r\   rO   listr~   r  rB   ri   enable_pandas_blockr8   r;   rg  r7   r9   rj   r0   r5   rk   rD   r>   mapzip)
r  ry  r  metadata_schemaro   rp   resrW   r  r  s
           @@ra   r  r  /  s@   4 #s}%% 
e	C		 	
 	 	Bb#-00  WTRTXXWW  	 QtBxxQQ
 
 	
 %''G" 
./NOO'"P"P"P"PC"P"P"PQQ&<"A$OOO#%%**,,
 
 #sO,,n.E
 
 #
 
 	

 ##;KKKK
0
0
0
0C
0
0
0C!$S	22FOgo..O"|_=dKKK!!&&(( N 6?++^-D L   rc   ndarraysc                 n    t          | t          j                  r| g} t          d | D                       S )a  Creates a :class:`~ray.data.Dataset` from a list of NumPy ndarrays.

    The column name defaults to "data".

    Examples:
        >>> import numpy as np
        >>> import ray
        >>> arr = np.array([1])
        >>> ray.data.from_numpy(arr)
        MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64})

        Create a Ray Dataset from a list of NumPy arrays.

        >>> ray.data.from_numpy([arr, arr])
        MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64})

    Args:
        ndarrays: A NumPy ndarray or a list of NumPy ndarrays.

    Returns:
        :class:`~ray.data.Dataset` holding data from the given ndarrays.
    c                 6    g | ]}t          j        |          S rZ   r[   )r_   ndarrays     ra   rb   zfrom_numpy.<locals>.<listcomp>  s"    EEECGG,,EEErc   )r   r  r  from_numpy_refs)r  s    ra   
from_numpyr  x  s<    0 (BJ'' :EEHEEEFFFrc   c                    t          | t          j                  r| g} nst          | t                    r?| D ];}t          |t          j                  st	          dt          |                     <nt	          dt          |                     t          j                    t          t          d          fd| D             }t          t          t          |           \  }}t          j        |          }t          t          d|id          t          j                                                              }t!          t#          ||          |j                  }t'          ||          S )	a%  Creates a :class:`~ray.data.Dataset` from a list of Ray object references to
    NumPy ndarrays.

    The column name defaults to "data".

    Examples:
        >>> import numpy as np
        >>> import ray
        >>> arr_ref = ray.put(np.array([1]))
        >>> ray.data.from_numpy_refs(arr_ref)
        MaterializedDataset(num_blocks=1, num_rows=1, schema={data: int64})

        Create a Ray Dataset from a list of NumPy array references.

        >>> ray.data.from_numpy_refs([arr_ref, arr_ref])
        MaterializedDataset(num_blocks=2, num_rows=2, schema={data: int64})

    Args:
        ndarrays: A Ray object reference to a NumPy ndarray or a list of Ray object
            references to NumPy ndarrays.

    Returns:
        :class:`~ray.data.Dataset` holding data from the given ndarrays.
    r  r  r   r  c                 <    g | ]}                     |          S rZ   r  )r_   r  r   ndarray_to_block_remotes     ra   rb   z#from_numpy_refs.<locals>.<listcomp>  s*    
P
P
PG"))'377
P
P
Prc   r4   Nrf   )r   r\   rO   r  r~   r  rB   ri   r8   r=   r  r  rg  r7   r9   rj   r0   r4   rk   rD   )	r  r  r  rW   r  ro   rp   r   r  s	          @@ra   r  r    s   8 (CM** 
:	Hd	#	# 

 	 	Ggs}55  ;+/==; ;  	 VtG}}VV
 
 	
 
!
#
#C./?QOOO
P
P
P
P
Px
P
P
PC!$S	22FOgo..O"{O<TJJJ!!&&(( N
 &/**N,C L   rc   ru   tablespyarrow.Tablec                   ddl }ddl}t          | |j        t          f          r| g} |>|dk    rt          d          t          |           dk    r|                    |           n| d         t                    }|dk    r"fd|                    |          D             } n||z   dz
  |z  }g }|                    |          D ]K}||z  }||k    r n=t          |||z
            }	|
                                        ||	                     Lt          |          |k     r?                    dd          }
|                    |
g|t          |          z
  z             |} t          d | D                       S )a  Create a :class:`~ray.data.Dataset` from a list of PyArrow tables.

    Examples:
        >>> import pyarrow as pa
        >>> import ray
        >>> table = pa.table({"x": [1]})
        >>> ray.data.from_arrow(table)
        MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64})

        Create a Ray Dataset from a list of PyArrow tables.

        >>> ray.data.from_arrow([table, table])
        MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64})


    Args:
        tables: A PyArrow table, or a list of PyArrow tables,
                or its streaming format in bytes.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        :class:`~ray.data.Dataset` holding data from the PyArrow tables.
    r   Nzoverride_num_blocks must be > 0ry   c                 <    g | ]}                     d d           S )r   )slice)r_   r   combined_tables     ra   rb   zfrom_arrow.<locals>.<listcomp>  s6       /0$$Q**  rc   c                 6    g | ]}t          j        |          S rZ   r[   )r_   ts     ra   rb   zfrom_arrow.<locals>.<listcomp>  s     7771CGAJJ777rc   )r|   pyarrowr   Tablebytesr~   r   concat_tablesr   r   r   r  extendfrom_arrow_refs)r  ru   r|   pa
total_rows
batch_sizeslicesr   startlengthempty_tabler  s              @ra   
from_arrowr    s   @ OOO&28U+,, &!##>???58[[1__))&111&QR)((
??   4<NNCV4W4W  FF %'::Q>CVVJF^^$788 C CJJ&&EZe);<<n225&AABBBB 6{{000,221a88{m/BS[[/PQRRRF77777888rc   c                    t          | t          j                  r| g} t          t                    t          j        fd| D                       }t          t          d|id          t          j	                    
                                          }t          t          | |          |j                  }t          ||          S )a#  Create a :class:`~ray.data.Dataset` from a list of Ray object references to
    PyArrow tables.

    Examples:
        >>> import pyarrow as pa
        >>> import ray
        >>> table_ref = ray.put(pa.table({"x": [1]}))
        >>> ray.data.from_arrow_refs(table_ref)
        MaterializedDataset(num_blocks=1, num_rows=1, schema={x: int64})

        Create a Ray Dataset from a list of PyArrow table references

        >>> ray.data.from_arrow_refs([table_ref, table_ref])
        MaterializedDataset(num_blocks=2, num_rows=2, schema={x: int64})


    Args:
        tables: A Ray object reference to Arrow table, or list of Ray object
                references to Arrow tables, or its streaming format in bytes.

    Returns:
         :class:`~ray.data.Dataset` holding data read from the tables.
    c                 :    g | ]}                     |          S rZ   r  )r_   r  r  s     ra   rb   z#from_arrow_refs.<locals>.<listcomp>?  s(    MMM299!<<MMMrc   r1   Nrf   )r   r\   rO   r8   r;   rg  r7   r9   rB   ri   rj   r0   r1   rk   rD   )r  r  ro   rp   r  s       @ra   r  r    s    < &#-(( *+JKKgMMMMfMMMNNO"{O<TJJJ!!&&(( N &/**N,C L   rc   )
limitversion	timestampjson_predicate_hintsr   r   r   r   r   ru   urlr  r  r  r  c       
   	      v    t          | ||||          }t          j                            ||||||	|
          S )a  
    Read data from a Delta Sharing table.
    Delta Sharing projct https://github.com/delta-io/delta-sharing/tree/main

    This function reads data from a Delta Sharing table specified by the URL.
    It supports various options such as limiting the number of rows, specifying
    a version or timestamp, and configuring concurrency.

    Before calling this function, ensure that the URL is correctly formatted
    to point to the Delta Sharing table you want to access. Make sure you have
    a valid delta_share profile in the working directory.

    Examples:

        .. testcode::
            :skipif: True

            import ray

            ds = ray.data.read_delta_sharing_tables(
                url=f"your-profile.json#your-share-name.your-schema-name.your-table-name",
                limit=100000,
                version=1,
            )

    Args:
        url: A URL under the format
            "<profile-file-path>#<share-name>.<schema-name>.<table-name>".
            Example can be found at
            https://github.com/delta-io/delta-sharing/blob/main/README.md#quick-start
        limit: A non-negative integer. Load only the ``limit`` rows if the
            parameter is specified. Use this optional parameter to explore the
            shared table without loading the entire table into memory.
        version: A non-negative integer. Load the snapshot of the table at
            the specified version.
        timestamp: A timestamp to specify the version of the table to read.
        json_predicate_hints: Predicate hints to be applied to the table. For more
            details, see:
            https://github.com/delta-io/delta-sharing/blob/main/PROTOCOL.md#json-predicates-for-filtering.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control the number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`Dataset` containing the queried data.

    Raises:
        ValueError: If the URL is not properly formatted or if there is an issue
            with the Delta Sharing table connection.
    )r  r  r  r  r  rw  )r   r\   r   r   )r  r  r  r  r  r   r   r   r   r   ru   r   s               ra   read_delta_sharing_tablesr  N  s^    Z (1  J 8##'/ $   rc   zpyspark.sql.DataFramec                `    ddl }t          ||          }|j                            | |          S )a  Create a :class:`~ray.data.Dataset` from a
    `Spark DataFrame <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html>`_.

    Args:
        df: A `Spark DataFrame`_, which must be created by RayDP (Spark-on-Ray).
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.MaterializedDataset` holding rows read from the DataFrame.
    r   N)raydpr}   sparkspark_dataframe_to_ray_dataset)ry  rt   ru   r  s       ra   
from_sparkr    s5    * LLL(6IJJK;55b+FFFrc   )zdatasets.Datasetzdatasets.IterableDatasetc                    ddl }ddlm} ddlm} t          | |j        |j        f          rP	 |                    |           }t          |          dk    rddl
}g }	|D ]}
	 |                    |
dd          }|j        dk    r|	                    |j                   n&t                              d	|j         d
|
 d           h# |j        $ r+}t                              d|
 d| d           Y d}~d}~ww xY w|	st%          d          ddl}|j        j                                        }t/          |	||||ddt$          |gi          S n,# t$          |f$ r t                              d           Y nw xY wt          | |j                  rt1           ||           |||          S t          | |j                  r0|                     d          }t5          |dd         |          }|S t          | |j        |j        f          r=t;          |                                           }t?          d|d          d| d          tA          dtC          |                      )ai  Read a Hugging Face Dataset into a Ray Dataset.

    Creates a :class:`~ray.data.MaterializedDataset` from a
    `Hugging Face Datasets Dataset <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.Dataset/>`_
    or a :class:`~ray.data.Dataset` from a `Hugging Face Datasets IterableDataset <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.IterableDataset/>`_.

    It is recommended to use :func:`~ray.data.read_parquet` with the ``HfFileSystem``
    filesystem to read Hugging Face datasets rather than ``from_huggingface``.

    See :ref:`Loading Hugging Face datasets <loading_huggingface_datasets>` for more details.

    Args:
        dataset: A `Hugging Face Datasets Dataset`_ or `Hugging Face Datasets IterableDataset`_.
            `DatasetDict <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.DatasetDict/>`_
            and `IterableDatasetDict <https://huggingface.co/docs/datasets/package_reference/main_classes#datasets.IterableDatasetDict/>`_
            are not supported.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` holding rows from the `Hugging Face Datasets Dataset`_.
    r   N)ClientResponseError)HuggingFaceDatasourceT   )allow_redirectstimeout   zUnexpected status z resolving z$ from Hugging Face Hub parquet fileszFailed to resolve z: zDNo resolvable Parquet URLs found from Hugging Face Hub parquet filesretry_exceptions)rt   r   r   ru   r   r   z]Distributed read via Hugging Face Hub parquet files failed, falling back on single node read.r   r   r   r  a9  You provided a Hugging Face DatasetDict or IterableDatasetDict, which contains multiple datasets, but `from_huggingface` now only accepts a single Hugging Face Dataset. To convert just a single Hugging Face Dataset to a Ray Dataset, specify a split. For example, `ray.data.from_huggingface(my_dataset_dictionary['z'])`. Available splits are .z0`dataset` must be a `datasets.Dataset`, but got )"datasetsaiohttp.client_exceptionsr  4ray.data._internal.datasource.huggingface_datasourcer  r   IterableDatasetrC   list_parquet_urls_from_datasetr   requestsheadstatus_coder   r  r   r!  RequestExceptionFileNotFoundErrorfsspec.implementations.httpimplementationshttpHTTPFileSystemr   r   with_formatr  DatasetDictIterableDatasetDictr  keysr   	TypeErrorr  )r   rt   r   ru   r  r  r  	file_urlsr  resolved_urlsr  respefsspecr  hf_ds_arrowray_dsavailable_keyss                     ra   from_huggingfacer    s   H OOO======      'H4h6FGHH 87	 .LLWUUI9~~!! "$  C'}}S$PQ}RR+s22)00::::"NN!BT5E !B !BRU !B !B !B   $4   ______       
 % +^   3222-2AACC#! +# +(; %)*->@S,T%   A "\ "#67 	 	 	NN4    	 '8344 
!!'222## 3	
 
 
 	
 '8+,, 
 ))'22KN@STTT'H0(2NOPP 
gllnn-- 6
  "6 6 %36 6 6
 
 	
 NtG}}NN
 
 	
s=   2E "A$CE 
C<!C72E 7C<<AE &E;:E;ztf.data.Datasetc                 ^    t          t          |                                                     S )a  Create a :class:`~ray.data.Dataset` from a
    `TensorFlow Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset/>`_.

    This function is inefficient. Use it to read small datasets or prototype.

    .. warning::
        If your dataset is large, this function may execute slowly or raise an
        out-of-memory error. To avoid issues, read the underyling data with a function
        like :meth:`~ray.data.read_images`.

    .. note::
        This function isn't parallelized. It loads the entire dataset into the local
        node's memory before moving the data to the distributed object store.

    Examples:
        >>> import ray
        >>> import tensorflow_datasets as tfds
        >>> dataset, _ = tfds.load('cifar10', split=["train", "test"])  # doctest: +SKIP
        >>> ds = ray.data.from_tf(dataset)  # doctest: +SKIP
        >>> ds  # doctest: +SKIP
        MaterializedDataset(
            num_blocks=...,
            num_rows=50000,
            schema={
                id: binary,
                image: ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8),
                label: int64
            }
        )
        >>> ds.take(1)  # doctest: +SKIP
        [{'id': b'train_16399', 'image': array([[[143,  96,  70],
        [141,  96,  72],
        [135,  93,  72],
        ...,
        [ 96,  37,  19],
        [105,  42,  18],
        [104,  38,  20]],
        ...,
        [[195, 161, 126],
        [187, 153, 123],
        [186, 151, 128],
        ...,
        [212, 177, 147],
        [219, 185, 155],
        [221, 187, 157]]], dtype=uint8), 'label': 7}]

    Args:
        dataset: A `TensorFlow Dataset`_.

    Returns:
        A :class:`MaterializedDataset` that contains the samples stored in the `TensorFlow Dataset`_.
    )r   r  as_numpy_iteratorr  s    ra   from_tfr  P  s'    r d7446677888rc   ztorch.utils.data.Dataset
local_readc                     i }|r7t          t          j                                                    d          dd}t	          t          |           |d          S )a  Create a :class:`~ray.data.Dataset` from a
    `Torch Dataset <https://pytorch.org/docs/stable/data.html#torch.utils.data.Dataset/>`_.

    The column name defaults to "data".

    .. note::
        The input dataset can either be map-style or iterable-style, and can have arbitrarily large amount of data.
        The data will be sequentially streamed with one single read task.

    Examples:
        >>> import ray
        >>> from torchvision import datasets
        >>> dataset = datasets.MNIST("data", download=True)  # doctest: +SKIP
        >>> ds = ray.data.from_torch(dataset)  # doctest: +SKIP
        >>> ds  # doctest: +SKIP
        MaterializedDataset(num_blocks=..., num_rows=60000, schema={item: object})
        >>> ds.take(1)  # doctest: +SKIP
        {"item": (<PIL.Image.Image image mode=L size=28x28 at 0x...>, 5)}

    Args:
        dataset: A `Torch Dataset`_.
        local_read: If ``True``, perform the read as a local read.


    Returns:
        A :class:`~ray.data.Dataset` containing the Torch dataset samples.
    Fr   r   )r   r   r  ry   )r   ru   )rR   r\   r   r   r   r+   )r   r  r   s      ra   
from_torchr    s}    D O 
#A'))5577$ $ $ 

 

 ((('	   rc   *)
row_filterrt   selected_fieldssnapshot_idscan_kwargscatalog_kwargsr   r   r   r   ru   table_identifierr  rS   r  r  r  r  c           	          ddl m} |t          j        dt          d           |dk    rt          j        dt          d            || |||||	          }t          ||||	|
||
          }|S )a  Create a :class:`~ray.data.Dataset` from an Iceberg table.

    The table to read from is specified using a fully qualified ``table_identifier``.
    Using PyIceberg, any intended row filters, selection of specific fields and
    picking of a particular snapshot ID are applied, and the files that satisfy
    the query are distributed across Ray read tasks.
    The number of output blocks is determined by ``override_num_blocks``
    which can be requested from this interface or automatically chosen if
    unspecified.

    .. tip::

        For more details on PyIceberg, see
        - URI: https://py.iceberg.apache.org/

    Examples:
        >>> import ray
        >>> from ray.data.expressions import col  #doctest: +SKIP
        >>> # Read the table and apply filters using Ray Data expressions
        >>> ds = ray.data.read_iceberg( #doctest: +SKIP
        ...     table_identifier="db_name.table_name",
        ...     catalog_kwargs={"name": "default", "type": "glue"}
        ... ).filter(col("column_name") == "literal_value")
        >>> # Select specific columns
        >>> ds = ds.select_columns(["col1", "col2"])  #doctest: +SKIP

    Args:
        table_identifier: Fully qualified table identifier (``db_name.table_name``)
        row_filter: **Deprecated**. Use ``.filter()`` method on the dataset instead.
            A PyIceberg :class:`~pyiceberg.expressions.BooleanExpression`
            to use to filter the data *prior* to reading.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        selected_fields: **Deprecated**. Use ``.select_columns()`` method on the dataset instead.
            Which columns from the data to read, passed directly to
            PyIceberg's load functions. Should be an tuple of string column names.
        snapshot_id: Optional snapshot ID for the Iceberg table, by default the latest
            snapshot is used
        scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan() function
             (e.g., case_sensitive, limit, etc.)
        catalog_kwargs: Optional arguments to pass to PyIceberg's catalog.load_catalog()
             function (e.g., name, type, etc.). For the function definition, see
             `pyiceberg catalog
             <https://py.iceberg.apache.org/reference/pyiceberg/catalog/             #pyiceberg.catalog.load_catalog>`_.
        ray_remote_args: Optional arguments to pass to :func:`ray.remote` in the
            read tasks.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources, and capped at the number of
            physical files to be read. You shouldn't manually set this value in most
            cases.

    Returns:
        :class:`~ray.data.Dataset` with rows from the Iceberg table.
    r   )IcebergDatasourceNzThe 'row_filter' parameter is deprecated and will be removed in a future release. Use the .filter() method on the dataset instead. For example: ds = ray.data.read_iceberg(...).filter(col('column') > 5)r   r   r  zThe 'selected_fields' parameter is deprecated and will be removed in a future release. Use the .select_columns() method on the dataset instead. For example: ds = ray.data.read_iceberg(...).select_columns(['col1', 'col2']))r  r  r  r  r  r  )r   rt   r   r   r   ru   r   )0ray.data._internal.datasource.iceberg_datasourcer!  r   r   r   r   )r  r  rt   r  r  r  r  r   r   r   r   ru   r!  r   r   s                  ra   read_icebergr#    s    X SRRRRR U 	
 	
 	
 	
 &  \ 	
 	
 	
 	
 #")'%  J /'  G Nrc   )r  r   r   rt  scanner_optionsr   r   r   r   r   ru   r   r$  c          	      X    t          | |||||          }t          |||||	|
|          S )aX
  
    Create a :class:`~ray.data.Dataset` from a
    `Lance Dataset <https://lancedb.github.io/lance-python-doc/all-modules.html#lance.LanceDataset>`_.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_lance( # doctest: +SKIP
        ...     uri="./db_name.lance",
        ...     columns=["image", "label"],
        ...     filter="label = 2 AND text IS NOT NULL",
        ... )

    Args:
        uri: The URI of the Lance dataset to read from. Local file paths, S3, and GCS
            are supported.
        version: Load a specific version of the Lance dataset. This can be an
            integer version number or a string tag. By default, the
            latest version is loaded.
        columns: The columns to read. By default, all columns are read.
        filter: Read returns only the rows matching the filter. By default, no
            filter is applied.
        storage_options: Extra options that make sense for a particular storage
            connection. This is used to store connection parameters like credentials,
            endpoint, etc. For more information, see `Object Store Configuration <https                ://lancedb.github.io/lance/guide/object_store/>`_.
        scanner_options: Additional options to configure the `LanceDataset.scanner()`
            method, such as `batch_size`. For more information,
            see `LanceDB API doc <https://lancedb.github.io            /lance-python-doc/all-modules.html#lance.LanceDataset.scanner>`_
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing records read from the Lance dataset.
    )r   r  r   r   rt  r$  rw  )r!   r   )r   r  r   r   rt  r$  r   r   r   r   r   ru   r   s                ra   
read_lancer&  <  sY    ~ !''  J '/   rc   )r   r   order_byclient_settingsclient_kwargsr   r   r   r   r   ru   dsnr'  r(  r)  c           	      Z    t          | ||||||          }t          ||||	|
||          S )aZ  
    Create a :class:`~ray.data.Dataset` from a ClickHouse table or view.

    Examples:
        >>> import ray
        >>> ds = ray.data.read_clickhouse( # doctest: +SKIP
        ...     table="default.table",
        ...     dsn="clickhouse+http://username:password@host:8124/default",
        ...     columns=["timestamp", "age", "status", "text", "label"],
        ...     filter="age > 18 AND status = 'active'",
        ...     order_by=(["timestamp"], False),
        ... )

    Args:
        table: Fully qualified table or view identifier (e.g.,
            "default.table_name").
        dsn: A string in standard DSN (Data Source Name) HTTP format (e.g.,
            "clickhouse+http://username:password@host:8124/default").
            For more information, see `ClickHouse Connection String doc
            <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
        columns: Optional list of columns to select from the data source.
            If no columns are specified, all columns will be selected by default.
        filter: Optional SQL filter string that will be used in the WHERE statement
            (e.g., "label = 2 AND text IS NOT NULL"). The filter string must be valid for use in
            a ClickHouse SQL WHERE clause. Please Note: Parallel reads are not currently supported
            when a filter is set. Specifying a filter forces the parallelism to 1 to ensure
            deterministic and consistent results. For more information, see `ClickHouse SQL WHERE Clause doc
            <https://clickhouse.com/docs/en/sql-reference/statements/select/where>`_.
        order_by: Optional tuple containing a list of columns to order by and a boolean indicating whether the order
            should be descending (True for DESC, False for ASC). Please Note: order_by is required to support
            parallelism. If not provided, the data will be read in a single task. This is to ensure
            that the data is read in a consistent order across all tasks.
        client_settings: Optional ClickHouse server settings to be used with the session/every request.
            For more information, see `ClickHouse Client Settings
            <https://clickhouse.com/docs/en/integrations/python#settings-argument>`_.
        client_kwargs: Optional additional arguments to pass to the ClickHouse client. For more information,
            see `ClickHouse Core Settings <https://clickhouse.com/docs/en/integrations/python#additional-options>`_.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.

    Returns:
        A :class:`~ray.data.Dataset` producing records read from the ClickHouse table or view.
    )rL  r*  r   r   r'  r(  r)  rw  )r   r   )rL  r*  r   r   r'  r(  r)  r   r   r   r   r   ru   r   s                 ra   read_clickhouser,    s\    P &'#  J '/   rc   )data_formatregionreader_kwargsrc  r-  r.  r/  c                T    t          ||| |||          }|                                S )a  Loads a Unity Catalog table or files into a Ray Dataset using Databricks Unity Catalog credential vending,
    with automatic short-lived cloud credential handoff for secure, parallel, distributed access from external engines.

    This function works by leveraging Unity Catalog's credential vending feature, which grants temporary, least-privilege
    credentials for the cloud storage location backing the requested table or data files. It authenticates via the Unity Catalog
    REST API (Unity Catalog credential vending for external system access, `Databricks Docs <https://docs.databricks.com/en/data-governance/unity-catalog/credential-vending.html>`_),
    ensuring that permissions are enforced at the Databricks principal (user, group, or service principal) making the request.
    The function supports reading data directly from AWS S3, Azure Data Lake, or GCP GCS in standard formats including Delta and Parquet.

    .. note::

       This function is experimental and under active development.

    Examples:
        Read a Unity Catalog Delta table:

        >>> import ray
        >>> ds = ray.data.read_unity_catalog(  # doctest: +SKIP
        ...     table="main.sales.transactions",
        ...     url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
        ...     token="dapi...",
        ...     region="us-west-2"
        ... )
        >>> ds.show(3)  # doctest: +SKIP

    Args:
        table: Unity Catalog table path in format ``catalog.schema.table``.
        url: Databricks workspace URL (e.g., ``"https://dbc-XXXXXXX-XXXX.cloud.databricks.com"``).
        token: Databricks Personal Access Token with ``EXTERNAL USE SCHEMA`` permission.
        data_format: Data format (``"delta"`` or ``"parquet"``). If not specified, inferred from table metadata.
        region: AWS region for S3 access (e.g., ``"us-west-2"``). Required for AWS, not needed for Azure/GCP.
        reader_kwargs: Additional arguments passed to the underlying Ray Data reader.

    Returns:
        A :class:`~ray.data.Dataset` containing the data from Unity Catalog.
    )base_urlrc  table_full_namer-  r.  r/  )r,   read)rL  r  rc  r-  r.  r/  rE  s          ra   read_unity_catalogr4    s=    \ &#  I >>rc   )r   r   rt   r   r   r   r   r   r   r   r   r   r   ru   pathc                T   ddl }d}	 |                    |           n*# t          $ r t          d| d| d| d| d	          w xY wdd	lm} t          | t                    st          d
           || |                                          }t          |f|||||	|
|||||d|S )a  Creates a :class:`~ray.data.Dataset` from Delta Lake files.

    Examples:

        >>> import ray
        >>> ds = ray.data.read_delta("s3://bucket@path/to/delta-table/") # doctest: +SKIP

    Args:
        path: A single file path for a Delta Lake table. Multiple tables are not yet
            supported.
        version: The version of the Delta Lake table to read. If not specified, the latest version is read.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/            filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the ``S3FileSystem`` is
            used. If ``None``, this function uses a system-chosen implementation.
        columns: A list of column names to read. Only the specified columns are
            read during the file scan.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :meth:`~ray.remote` in the read tasks.
        meta_provider: A :ref:`file metadata provider <metadata_provider>`. Custom
            metadata providers may be able to resolve file metadata more quickly and/or
            accurately. In most cases you do not need to set this parameter.
        partition_filter: A
            :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to HIVE partitioning.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            Defaults to not shuffle with ``None``.
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        concurrency: The maximum number of Ray tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        **arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
            set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/                python/generated/pyarrow.dataset.Scanner.html                    #pyarrow.dataset.Scanner.from_fragment>`_

    Returns:
        :class:`~ray.data.Dataset` producing records read from the specified parquet
        files.

    r   N	deltalakez"`ray.data.read_delta` depends on 'z', but 'z)' couldn't be imported. You can install 'z' by running `pip install z`.)
DeltaTablez1Only a single Delta Lake table path is supported.)r  )r   r   rt   r   r   r   r   r   r   r   ru   )
	importlibimport_modulerX  r7  r8  r   strr~   	file_urisr   )r5  r  r   r   rt   r   r   r   r   r   r   r   r   r   r   ru   r   r9  packager8  r   s                        ra   
read_deltar>  %  s?   b G
(((( 
 
 
# # #' # #6=# ## # #
 
 	

 %$$$$$ dC   NLMMM JtW---7799E'#)!#/    s	    'Aonceearliestlatesti'  )
triggerstart_offset
end_offsetkafka_auth_configr   r   r   r   ru   
timeout_msbootstrap_serversrB  rC  rD  rE  rF  c          
          |dk    rt          d|          t          j                            t	          | |||||          d||||	|
          S )a]  Read data from Kafka topics.

    This function supports bounded reads from Kafka topics, reading messages
    between a start and end offset. Only the "once" trigger is
    supported for now, which performs a single bounded read. Currently we only
    have one read task for each partition.

    Examples:

        .. testcode::
            :skipif: True

            import ray

            # Read from a single topic with offset range
            ds = ray.data.read_kafka(
                topics="my-topic",
                bootstrap_servers="localhost:9092",
                start_offset=0,
                end_offset=1000,
            )


    Args:
        topics: Kafka topic name(s) to read from. Can be a single topic name
            or a list of topic names.
        bootstrap_servers: Kafka broker addresses. Can be a single string or
            a list of strings.
        trigger: Trigger mode for reading. Only "once" is supported, which
            performs a single bounded read.
        start_offset: Starting position for reading. Can be:
            - int: Offset number
            - str: "earliest"
        end_offset: Ending position for reading (exclusive). Can be:
            - int: Offset number
            - str: "latest"
        kafka_auth_config: Authentication configuration. See KafkaAuthConfig for details.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        timeout_ms: Timeout in milliseconds for every read task to poll until reaching end_offset (default 10000ms).
            If the read task does not reach end_offset within the timeout, it will stop polling and return the messages
            it has read so far.

    Returns:
        A :class:`~ray.data.Dataset` containing Kafka messages with the following schema:
        - offset: int64 - Message offset within partition
        - key: binary - Message key as raw bytes
        - value: binary - Message value as raw bytes
        - topic: string - Topic name
        - partition: int32 - Partition ID
        - timestamp: int64 - Message timestamp in milliseconds
        - timestamp_type: int32 - 0=CreateTime, 1=LogAppendTime
        - headers: map<string, binary> - Message headers (keys as strings, values as bytes)

    Raises:
        ValueError: If invalid parameters are provided.
        ImportError: If kafka-python is not installed.
    r?  z.Only trigger='once' is supported. Got trigger=)r'  rG  rC  rD  rE  rF  rr   )rt   r   r   r   r   ru   )r~   r\   r   r   r    )r'  rG  rB  rC  rD  rE  r   r   r   r   ru   rF  s               ra   
read_kafkarI    s~    ` &U'UUVVV8##/%!/!	
 	
 	
 '/ $   rc   r%  r   kwargsc                     t          j        |           | j        r(t          j        dt
                      | j        di |}n| }|S )a&  Generates reader.

    Args:
        ds: Datasource to read from.
        ctx: Dataset config to use.
        kwargs: Additional kwargs to pass to the legacy reader if
            `Datasource.create_reader` is implemented.

    Returns:
        The datasource or a generated legacy reader.
    z`create_reader` has been deprecated in Ray 2.9. Instead of creating a `Reader`, implement `Datasource.get_read_tasks` and `Datasource.estimate_inmemory_data_size`.rZ   )rB   _set_currentshould_create_readerr   r   r   create_reader)r%  r   rJ  r   s       ra   r   r     sh      S!!!	 	)8 		
 	
 	
 '7b&6&@&@&@&@##&(#&&rc   c                 R      "|                     dd           d fd}||d<   |S )Nr   r`   r  rw   c           	      F   ddl m}                                 D ]u\  }\  t          fd|                     |          D                       }|                     |                     |          ||                    ||                    } v |           } | S )Nr   )ArrowTensorArrayc                 b    g | ]+}t          j        |                                           ,S ))bufferdtype)r  r  	as_buffer)r_   bufrT  r   s     ra   rb   z=_resolve_parquet_args.<locals>._block_udf.<locals>.<listcomp>1  sB        
5NNN  rc   )ray.data.extensionsrQ  rv   r   column
set_column_ensure_integer_indexr  )r`   rQ  tensor_col_namenp_colrT  r   existing_block_udfr   s       @@ra   r   z)_resolve_parquet_args.<locals>._block_udf)  s    <<<<<<3G3M3M3O3O  /% 9    #(<<#@#@    ((//@@#$//HH 
 "-**511Lrc   )r`   r  rw   r  )r   )r   r   r   r]  s   `  @ra   r   r   "  sX     '/33L$GG	 	 	 	 	 	 	0 ,6<(rc   c                 P    | dk    rt                               d           n||} | S )Nrr   zpThe argument ``parallelism`` is deprecated in Ray 2.10. Please specify argument ``override_num_blocks`` instead.)r   r!  rs   s     ra   r}   r}   E  sB     b8	
 	
 	
 	
 
	()rc   c                 B    | t          j        dt                     d S d S )NzNThe `meta_provider` argument is deprecated and will be removed after May 2025.)r   r   r   )r   s    ra   r   r   S  s8      	
 	
 	
 	
 	
 ! rc   )NN)N)rr   NN)F)rr   N)r   loggingre  r   typingr   r   r   r   r   r   r	   r
   r   r   r   numpyr  packaging.versionr   r{  r\   ray._private.arrow_utilsr   ray._private.auto_init_hookr   $ray.air.util.tensor_extensions.utilsr   .ray.data._internal.datasource.audio_datasourcer   -ray.data._internal.datasource.avro_datasourcer   1ray.data._internal.datasource.bigquery_datasourcer   /ray.data._internal.datasource.binary_datasourcer   3ray.data._internal.datasource.clickhouse_datasourcer   ,ray.data._internal.datasource.csv_datasourcer   6ray.data._internal.datasource.delta_sharing_datasourcer   -ray.data._internal.datasource.hudi_datasourcer   .ray.data._internal.datasource.image_datasourcer   r   -ray.data._internal.datasource.json_datasourcer   r   r   .ray.data._internal.datasource.kafka_datasourcer   r    .ray.data._internal.datasource.lance_datasourcer!   -ray.data._internal.datasource.mcap_datasourcer"   r#   .ray.data._internal.datasource.mongo_datasourcer$   .ray.data._internal.datasource.numpy_datasourcer%   0ray.data._internal.datasource.parquet_datasourcer&   .ray.data._internal.datasource.range_datasourcer'   ,ray.data._internal.datasource.sql_datasourcer(   -ray.data._internal.datasource.text_datasourcer)   r#  r*   .ray.data._internal.datasource.torch_datasourcer+   +ray.data._internal.datasource.uc_datasourcer,   .ray.data._internal.datasource.video_datasourcer-   3ray.data._internal.datasource.webdataset_datasourcer.   +ray.data._internal.delegating_block_builderr/   %ray.data._internal.logical.interfacesr0   3ray.data._internal.logical.operators.from_operatorsr1   r2   r3   r4   r5   2ray.data._internal.logical.operators.read_operatorr6   ray.data._internal.planr7   ray.data._internal.remote_fnr8   ray.data._internal.statsr9   ray.data._internal.utilr:   r;   r<   r=   r>   ray.data.blockr?   r@   rA   ray.data.contextrB   ray.data.datasetrC   rD   ray.data.datasourcerE   rF   rG   rH   ray.data.datasource.datasourcerI   )ray.data.datasource.file_based_datasourcerJ   rK   &ray.data.datasource.file_meta_providerrL   rM    ray.data.datasource.partitioningrN   	ray.typesrO   ray.util.annotationsrP   rQ   ray.util.scheduling_strategiesrR   daftr  r  marsmodinr  r  pymongoarrow.apipymongoarrowpyspark
tensorflowtftorchpyiceberg.expressionsrS   tensorflow_metadata.proto.v0rT   rU   rV   	getLogger__name__r   rq   intr   r   r   floatr;  r   _FILE_EXTENSIONSboolr   r   r   r   rT  r   r   r
  r  r  r  r  r&  r/  callabler  r7  r9  rA  rK  ro  rx  r~  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r#  r&  r,  r  r4  r>  rI  r   r   r}   r   rZ   rc   ra   <module>r     s1        				                               4 4 4 4 4 4 



 8 8 8 8 8 8 6 6 6 6 6 6 P P P P P P J J J J J J H H H H H H P P P P P P L L L L L L T T T T T T F F F F F F      I H H H H H                
        K J J J J J S S S S S S S S J J J J J J J J J J J J N N N N N N J J J J J J F F F F F F H H H H H H Q Q Q Q Q Q J J J J J J M M M M M M J J J J J J T T T T T T N N N N N N = = = = = =              D C C C C C 1 1 1 1 1 1 9 9 9 9 9 9 1 1 1 1 1 1                      
 ) ( ( ( ( ( 9 9 9 9 9 9 9 9            2 1 1 1 1 1               : 9 9 9 9 9       8 8 8 8 8 8 8 8 I I I I I I RKKKKKKOOOKKKLLLMMMNNNNNNLLL777777777777QQQQQQGCLL		8	$	$ U    8  )-	Q Q Q9Q Q "#	Q
 Q Q Q Qh  !%)-0 0 0
0 0 #	0
 "#0 0 0 0 0f  !%)-: : :
: : 	:
 #: "#: : : : :z   $ $"&*!%)-c c cc c uo	c
 uoc UOc #s(^c #c "#c c c c  cL W 597;6:+/!&+:+K-1!%)- $ $"04!` ` `d3i ` 01` %T#s(^4	`
 23` <(` ` ` d3i(` 77#T)*` #` "#` uo` uo` UO`  d38n-!` ` ` `F W 597;6:+/$!&+:+K-1!%)- $ $"04#` ` `d3i ` 01` %T#s(^4	`
 23` <(` ` ` ` d3i(` 77#T)*` #` "#` uo` uo`  UO!`" d38n-#` ` ` `F W &*26 $ $"&*!%)-k k k	kk k
 tDz"k ./k k uok uok UOk #s(^k #k "#k k k k k\ W "R
  $ $"&*!%)-R R RRc]R C=R
 R uoR uoR UOR #s(^R #R "#R R R R Rj  59#' $ $"&*RV486:+7<+?+?DH+<+M!%)-%A A Ad3i A 01A d3i 	A
 A uoA uoA UOA #s(^A #4U28U38_3L-M(M#NOA 01A 23A <(A eGG,.??@AA A  d3i(!A" ##A$ "#%A( )A A A AH V 59 $ $"8<&*596:!%&*!&DH+:+K!%)-)d d dd3i d 01d 	d
 uod uod UOd 45d #s(^d #4S>2d 23d d 5c?
#d 3-d d  !d" eGG,.??@A#d$ d3i(%d& #'d( "#)d* +d d d dN  48 $ $"&*7;8<6:!-f!5!5!&DH+?!%)-'t t td3i t t 01	t
 t uot uot UOt #s(^t %T#s(^4t 45t 23t t t t  eGG,.??@A!t" d3i(#t$ #%t& "#'t* +t t t tn  59 $ $"&*7;8<6:!-f!5!5!&DH+/!%)-%p p pd3i p 01p 	p
 uop uop UOp #s(^p %T#s(^4p 45p 23p p p p eGG,.??@Ap  d3i(!p" ##p$ "#%p( )p p p pf  !48 $ $"047;8<6:!%!&DH+/!%)-)| | |d3i | | 	|
 01| | uo| uo| UO| d38n-| %T#s(^4| 45| 23| | |  !|" eGG,.??@A#|$ d3i(%|& #'|( "#)|* +| | | |~  59 $ $"047;8<6:!%!&DH+/!%)-%t t td3i t 01t 	t
 uot uot UOt d38n-t %T#s(^4t 45t 23t t t t eGG,.??@At  d3i(!t" ##t$ "#%t& 't t t tn  597;8<6:!%!&DH+:+K!%)-a a ad3i a 01a 	a
 %T#s(^4a 45a 23a a a a eGG,.??@Aa d3i(a #a "#a  !a a a aH W 59 $ $"&*7;8<6:!&/3DH+/!%)-37'k k kd3i k 01k 	k
 uok uok UOk #s(^k %T#s(^4k 45k 23k k k +,k eGG,.??@Ak  d3i(!k" ##k$ "#%k& /0'k( )k k k k\ W 48>B:>!48 $ $"048<6:!%!&DH+/!%)-+a a ad3i a U49c#h./0a uS#X	9:;	a
 E$s)SX"567a a 01a a uoa uoa UOa d38n-a 45a 23a a  !a" #a$ eGG,.??@A%a& d3i('a( #)a* "#+a, -a a a aH W 597;8<6::>262604DH+/!%)-%_ _ _d3i _ 01_ 	_
 %T#s(^4_ 45_ 23_ eD#x567_ tX~./_ tX~./_ uT8^,-_ _ eGG,.??@A_ _ d3i(_  #!_" "##_$ %_& '_ _ _ _D   48 $ $"&*7;8<6:!%!&DH+/!%)-%| | |d3i | | 01	|
 | uo| uo| UO| #s(^| %T#s(^4| 45| 23| | | eGG,.??@A|  d3i(!|" ##|$ "#%|& '| | | |~ W
 '+ $ $"04!%)-A A A	A Z0A c#	A
 A A uoA uoA UOA d38n-A #A "#A A A A AH W
 '+ $ $"&*!%)-J J J	JS>J c#	J
 J uoJ uoJ UOJ #s(^J #J "#J J J J JZ W  !  $ $"04!%)-] ] ]] C=] C=	]
 c]] SM] ] uo] uo] UO] d38n-] #] "#] ] ] ] ]@ W !48-104 $ $"04!%)-R R RR R d5c3/01	R
 4S>*R d38n-R uoR uoR UOR d38n-R #R "#R R R R Rj " w    <  ,  1D        F , 1D      5 :M    "  *.29 29	!4(:#;;	<29!#29 29 29 29 29j E	y+,d9=O3P.QQ	REE E E EP Grz4
+;;< GAT G G G G: <Ibj)4	"*0E+FFG<< < < <~  *.C9 C9 C9/5$u_e5K/L*MMNC9 "#C9 	C9 C9 C9 C9L -%./0Yu_e3456	8-
 - - - -` W  !#*.04 $ $"!%)-] ] ]	] C=] c]	]
 }] #3-] d38n-] uo] uo] UO] #] "#] ] ] ] ]@  "&)-	G G GG #G "#	G
 G G G G4  !%)-	B
 B
ABB
B
 #B
 "#	B

 '(B
 B
 B
 B
J 898989 89 89 89v  3 3'33 3 3 3 3l  37'-!%,0/304 $ $")-u u uu c../u 	u
 38_u #u $sCx.)u T#s(^,u d38n-u uou uou UOu "#u u u u up  *.#' 040404 $ $"!%)-O O O	O eCHo&O d3i 	O
 SMO d38n-O d38n-O d38n-O uoO uoO UOO #O "#O O O O Od W
 $( 1504.204 $ $"!%)-Y Y YY 
Y d3i 	Y
 SMY uT#Y_-.Y d38n-Y DcN+Y d38n-Y uoY uoY UOY #Y "#Y Y Y Y Yx W "& $(5 5 55	5 5
 #5 SM5 D>5 5 5 5 5p W "t 59#' $ $"04486:+7<+?+?-1!%)-#t t t
T#Y
tc]t 01	t
 d3i t t uot uot UOt d38n-t 01t 23t <(t 77#T)*t t  #!t" "##t t t tn W
  &4>0837 $ $"04)-a a a#tCy.!a S$s)^,a V_	a
 WZ001a c78,,-a  0a uoa uoa UOa d38n-a "#a a a a a aH''	' ' :v	' ' ' 'B SW   "4U28U38_3L-M(M#NO  
#s(^       H )- !# 	   
45
	
 
 
 
 
 
rc   