
    &`i|M                        d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlmZmZmZmZmZmZmZ d dlmZmZ d dlmZ d dlm Z m!Z! d dl"m#Z#m$Z$ d d	l%m&Z&m'Z'm(Z( d d
l)m*Z*m+Z+ d dl,m-Z- erd dl.Z/d dl0Z0 ej1        e2          Z3dZ4dZ5e-e G d d                                  Z6e- G d de                       Z7ded         dee8ef         ded         fdZ9dddee8ef         ddfdZ:dddee8ef         ddfdZ;d(dZ<ded         fd Z= G d! d"          Z>d#eg ee8ef         f         dee8ef         fd$Z?d%eed&         e6df         ddfd'Z@dS ))    N)	dataclass)
TYPE_CHECKINGAnyCallableDictIterableIteratorListLiteralOptionalUnion)RetryingContextManagerRetryingPyFileSystem_check_pyarrow_version_is_local_schemeinfer_compressioniterate_with_retrymake_async_gen)BlockBlockAccessor)DataContext)
DatasourceReadTask)BaseFileMetadataProviderDefaultFileMetadataProvider)PartitioningPathPartitionFilterPathPartitionParser)_has_file_extension_resolve_paths_and_filesystem)DeveloperAPI   c                   4    e Zd ZU dZdZee         ed<   d ZdS )FileShuffleConfiga^  Configuration for file shuffling.

    This configuration object controls how files are shuffled while reading file-based
    datasets.

    .. note::
        Even if you provided a seed, you might still observe a non-deterministic row
        order. This is because tasks are executed in parallel and their completion
        order might vary. If you need to preserve the order of rows, set
        `DataContext.get_current().execution_options.preserve_order`.

    Args:
        seed: An optional integer seed for the file shuffler. If provided, Ray Data
            shuffles files deterministically based on this seed.

    Example:
        >>> import ray
        >>> from ray.data import FileShuffleConfig
        >>> shuffle = FileShuffleConfig(seed=42)
        >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
    Nseedc                 j    | j         )t          | j         t                    st          d          dS dS )z2Ensure that the seed is either None or an integer.Nz Seed must be an integer or None.)r%   
isinstanceint
ValueErrorselfs    }/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/datasource/file_based_datasource.py__post_init__zFileShuffleConfig.__post_init__Y   s8    9 DIs)C)C ?@@@ !       )	__name__
__module____qualname____doc__r%   r   r(   __annotations__r-    r.   r,   r$   r$   >   sJ          , D(3-A A A A Ar.   r$   c                       e Zd ZU dZdZdZeeee	e         f                  e
d<   dZddd e            ddddddd
deee	e         f         ded	         d
eeedf                  deeeef                  dededededeeed         ef                  dedee	e                  f fdZde	e         fdZde	e         fdZdee         fdZ	 d+dedee         de	e         fdZdedeeef         dee         fdZdeeef         dee         fd Z	 	 	 	 	 	 d,d%Z dd#dedd"fd&Z!d' Z"d(d"dede#e$         fd)Z%e&defd*            Z' xZ(S )-FileBasedDatasourcezFile-based datasource for reading files.

    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
    FN_FILE_EXTENSIONSr   )

filesystemschemaopen_stream_argsmeta_providerpartition_filterpartitioningignore_missing_pathsshuffleinclude_pathsfile_extensionspathsr8   pyarrow.fs.FileSystemr9   zpyarrow.lib.Schemar:   r;   r<   r=   r>   r?   filesr@   rA   c       
            t                                                       t                       t          |           | _        | j        s7t
          j        j        j                                        rt          d          || _
        t          j                    | _        || _        || _        || _        || _        || _        |
| _        || _        t+          ||          \  }| _        t/          j        | j        | j        j                  | _        t5          t6          t9          |                    || j        ||                     \  }}|r"t=          |          dk    rt          d          | j        bt?          t9          ||                    |                     |          }fd|D             }t=          |          dk    rt          d          _t?          t9          ||                    fd|D             }fd	|D             }t=          |          dk    rt          d
 d          tA          |	           d | _!        |	dk    r$tD          j#        $                                | _!        n>tK          |	tL                    r)tD          j#        $                    |	j'                  | _!        t          j(        |          | _)        t          j(        |          | _*        d S )NzBecause you're using Ray Client, read tasks scheduled on the Ray cluster can't access your local files. To fix this issue, store files in cloud storage or a distributed filesystem like NFS.)retryable_errors)r>   r   zRNone of the provided paths exist. The 'ignore_missing_paths' field is set to True.c                      g | ]
}|         S r4   r4   .0ppath_to_sizes     r,   
<listcomp>z0FileBasedDatasource.__init__.<locals>.<listcomp>       999a,q/999r.   z`No input files found to read. Please double check that 'partition_filter' field is set properly.c                 4    g | ]}t          |          |S r4   )r   )rI   rJ   rA   s     r,   rL   z0FileBasedDatasource.__init__.<locals>.<listcomp>   s)    QQQ1)<Q)P)PQQQQQr.   c                      g | ]
}|         S r4   r4   rH   s     r,   rL   z0FileBasedDatasource.__init__.<locals>.<listcomp>   rM   r.   zANo input files found to read with the following file extensions: zC. Please double check that 'file_extensions' field is set properly.rD   )+super__init__r   r   _supports_distributed_readsrayutilclientis_connectedr)   _schemar   get_current_data_context_open_stream_args_meta_provider_partition_filter_partitioning_ignore_missing_paths_include_paths_source_pathsr    _filesystemr   wrapretried_io_errorsmaplistzipexpand_pathslendict_validate_shuffle_arg_file_metadata_shufflernprandomdefault_rngr'   r$   r%   put
_paths_ref_file_sizes_ref)r+   rB   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   
file_sizesrK   	__class__s              ` @r,   rQ   zFileBasedDatasource.__init__o   s    	   /?/F/F+F(/ 	CHO4G4T4T4V4V 	O   (466!1+!1)%9"+""?z"R"Rt/4t/A/S
 
 
  ++$ )=	 ,  

 

z   	CJJ!OOC  
 !-E: 6 677L**511E99995999J5zzQ @  
 &E: 6 677LQQQQQQQE99995999J5zzQ ?&? ? ?   	g&&&'+$g+-9+@+@+B+BD((!233 	O+-9+@+@+N+ND(
 '%.."wz22r.   returnc                 4    t          j        | j                  S N)rS   getrp   r*   s    r,   _pathszFileBasedDatasource._paths   s    wt'''r.   c                 4    t          j        | j                  S rv   )rS   rw   rq   r*   s    r,   _file_sizeszFileBasedDatasource._file_sizes   s    wt+,,,r.   c                 F    d}|                                  D ]	}|||z  }
|S Nr   )rz   )r+   
total_sizeszs      r,   estimate_inmemory_data_sizez/FileBasedDatasource.estimate_inmemory_data_size   s8    
""$$ 	! 	!B~b 
r.   parallelismper_task_row_limitc                     dd l } j         j                                         }                                 } j        |t          t          ||                    fd j                            t                              D             }t          t          t          t          |                     \  }}t           j                  i dt          t                   dt          t                   f fd fd}t!          |t          |                    }g } |j        ||          }	 |j        ||          }
t          |	|
          D ]{\  }}t          |          dk    r                     |                                 |          } || j                  }t+          |||          }|                    |           ||S )	Nr   c                      g | ]
}|         S r4   r4   )rI   ifiles_metadatas     r,   rL   z6FileBasedDatasource.get_read_tasks.<locals>.<listcomp>   s.     ' ' ' q!' ' 'r.   
read_pathsrt   c              3     K   t                    }| D ]Ǌi }
t          
          } |          }t           j        |fi 	j                  5 t          fddj        j                  D ]I}|rt          ||          }j        r*t          j
        |          }|                    d          }|V  J	 d d d            n# 1 swxY w Y   d S )N)contextc                  0                                    S rv   )_read_stream)f	read_pathr+   s   r,   <lambda>zHFileBasedDatasource.get_read_tasks.<locals>.read_files.<locals>.<lambda>  s     1 1!Y ? ? r.   zread stream iteratively)descriptionmatchpath)#_unwrap_s3_serialization_workaroundr   r   _open_input_sourcerY   r   rc   _add_partitionsr_   r   	for_blockfill_column)r   fs
partitionsparseblockblock_accessorr   r   r8   r:   r=   r+   s         @@r,   
read_filesz6FileBasedDatasource.get_read_tasks.<locals>.read_files   s{     
 5Z@@B' $ $	-/
+/==E!&y!1!1J++D+B	NN=MNN .   $ !3??????$="0B" " " 
$ 
$
 & G$3E:$F$FE. R-:-DU-K-KN$2$>$>vy$Q$QE#
$	$ $ $ $ $ $ $ $ $ $ $ $ $ $ $$ $s   A,CC	C	c                       fd}|S )Nc               3     K   j         j        j        rd  dk    rst           t	                               t
                              dt	                     d  d           t          t                     d          E d {V  d S t
                              dt	                     d                      E d {V  d S )Nr   zReading z files with z	 threads.T)num_workerspreserve_orderingz files.)	rY   execution_optionspreserve_orderminrh   loggerdebugr   iter)num_threadsr   r   r+   s   r,   read_task_fnzUFileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn.<locals>.read_task_fn  s     
 %7F $"#K??"%k3z??"C"CKLLV3z??VVVVV    .Z(("$/*.	               LL!DC
OO!D!D!DEEE)z*55555555555r.   r4   )r   r   r   r   r+   s   `` r,   create_read_task_fnz?FileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn  s5    6 6 6 6 6 6 6 62  r.   )rows_per_filerr   )r   )numpyrZ   r]   rx   rz   rk   re   rf   permutationrh   rd   !_wrap_s3_serialization_workaroundra   r   strr   r   array_splitr[   _rows_per_file_NUM_THREADS_PER_TASKr   append)r+   r   r   rl   rB   rr   shuffled_files_metadatar   
read_taskssplit_pathssplit_file_sizesr   metar   	read_taskr   r8   r:   r=   r   s   `              @@@@@r,   get_read_tasksz"FileBasedDatasource.get_read_tasks   sA    	1)%%''
'3!#eZ"8"899N' ' ' '5AA#nBUBUVV' ' '# !%Ss4K/L%M%M N NE:6t7GHH
#!	$ 	$e_	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$:	  	  	  	  	  	 : +s5zz22
$bnUK88)2>*kBB&)+7G&H&H 	) 	)"J
:!##&&"1133% '  D /.z4;UVVL d7I  I i((((r.   r   	open_argsc                 T    |                     dd          }|t          |          }|S )a  Resolves the compression format for a stream.

        Args:
            path: The file path to resolve compression for.
            open_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_stream>`_
                when opening input files to read.

        Returns:
            The compression format (e.g., "gzip", "snappy", "bz2") or None if
            no compression is detected or specified.
        compressionN)rw   r   )r+   r   r   r   s       r,   resolve_compressionz'FileBasedDatasource.resolve_compressionF  s0      mmM488+D11Kr.   c                 N    |                     dd           }|| j        j        }|S )Nbuffer_size)poprY   streaming_read_buffer_size)r+   r   r   s      r,   _resolve_buffer_sizez(FileBasedDatasource._resolve_buffer_sizeZ  s,    mmM488,GKr.   filepyarrow.NativeFiler   pyarrow.PythonFilec                 H   dd l }dd l}ddlm} t	          j                    }t          |                                |          r|j        	                    ||           n|	                    ||           |
                    d           |                    |d          S )Nr   )HadoopFileSystem)srcdstr)mode)pyarrowsnappy
pyarrow.fsr   ioBytesIOr'   unwraphadoop_snappystream_decompressseek
PythonFile)r+   r   r8   par   r   streams          r,   _file_to_snappy_streamz*FileBasedDatasource._file_to_snappy_stream`  s    
 	//////j''))+;<< 	; 22t2HHHH$$6$:::A}}V#}...r.   c                     |                      ||          }|                     |          }|dk    r+d|d<    |j        |fd|i|}|                     ||          S ||d<    |j        |fd|i|S )a  Opens a source path for reading and returns the associated Arrow NativeFile.

        The default implementation opens the source path as a sequential input stream,
        using self._data_context.streaming_read_buffer_size as the buffer size if none
        is given by the caller.

        Implementations that do not support streaming reads (e.g. that require random
        access) should override this method.
        r   Nr   r   )r   r   open_input_streamr   )r+   r8   r   r   r   r   r   s          r,   r   z&FileBasedDatasource._open_input_sourcer  s      ..tY??//	::("" (,Im$/:/ "-1: D ..tZ@@@#.	- +z+DWWkWYWWWr.   c                     dS )z8Returns the number of rows per file, or None if unknown.Nr4   r*   s    r,   r   z"FileBasedDatasource._rows_per_file  s    tr.   r   c                      t          d          )z`Streaming read a single file.

        This method should be implemented by subclasses.
        z@Subclasses of FileBasedDatasource must implement _read_stream().)NotImplementedError)r+   r   r   s      r,   r   z FileBasedDatasource._read_stream  s    
 "N
 
 	
r.   c                     | j         S rv   )rR   r*   s    r,   supports_distributed_readsz.FileBasedDatasource.supports_distributed_reads  s    //r.   rv   )r   r   r8   r   rt   r   ))r/   r0   r1   r2   _WRITE_FILE_PER_ROWr7   r   r   r   r
   r3   r   r   typer   r   r   r   r   boolr   r$   rQ   rx   floatrz   r(   r   r   r   r   r   r   r   r   r	   r   r   propertyr   __classcell__)rs   s   @r,   r6   r6   _   sU           8<huS$s)^45<<<  9=>B592M2M2O2O04%)%*HL#/3\3 \3 \3S$s)^$\3 45	\3
 t%99:;\3 #4S>2\3 0\3 .\3 #\3 #\3 % 02C CDE\3 \3 "$s),\3 \3 \3 \3 \3 \3|(S	 ( ( ( (-T%[ - - - -Xc]     EIj jj4<SMj	hj j j jX$(cN	#   (d38n #    /"/ +/ 
	/ / / /$X*X X
 
X X X X@  
2 
# 
(5/ 
 
 
 
 0D 0 0 0 X0 0 0 0 0r.   r6   data)pyarrow.Tablepd.DataFramer   rt   c                     dd l }dd l}t          | |j        |j        f          sJ t          | |j                  rt          | |          S t          | |j                  rt          | |          S d S r|   )pandasr   r'   Table	DataFrame_add_partitions_to_table_add_partitions_to_dataframe)r   r   pdr   s       r,   r   r     s     dRXr|455555$!! :'j999$%% >+D*===> >r.   tabler   c           
         dd l }dd lm} t          | j                  }|                                D ]5\  }}|                    |gt          |           z            }||v r| j        	                    |          j
        }|                    |          }|                    |                    || |                             }	|	                                }	|	sCt          d| d| d| |                                                                          d          | j                            |          }
|                     |
||          } |                     ||          } 7| S )Nr   Partition column , exists in table data, but partition value '$' is different from in-data values: .)r   pyarrow.computecomputesetcolumn_namesitemsarrayrh   r9   fieldr   castallequalas_pyr)   unique	to_pylistget_field_index
set_columnappend_column)r   r   r   pcr   r   valuecolumncolumn_typevalues_are_equalr   s              r,   r   r     s          u)**L"((** 7 7u5'CJJ.//L  ,,,U338K[[--F!vvbhhvuU|&D&DEE/5577#  < < <#< <U|**,,6688< < <   ,,U33A$$Qv66EE''v66EELr.   dfr   c                    dd l }|                                D ]\  }} |j        |gt          |           z  |          }|| v r|                    | |         j                  }| |                                         }| |         |                             ||                   s>t          d| d| dt          | |         
                                           d          || |<   | S )Nr   )r   namer   r   r   r   )r   r   Seriesrh   astypedtypenotnaequalsr)   re   r   )r
  r   r   r   r  r  masks          r,   r   r     s    "((**  u#b'' 1>>>B;;]]2e9?33Fe9??$$De9T?))&,77  3 3 3#3 3BuI,,..//3 3 3   5		Ir.   r8   rC   c                     dd l }dd l}| }t          | t                    r|                                 }t          ||j        j                  rt          |           S | S r|   )r   r   r'   r   r   r   S3FileSystem_S3FileSystemWrapper)r8   r   r   base_fss       r,   r   r     sp     G*233 &##%%'25-.. 0#J///r.   )rC   r  c                 X    t          | t                    r|                                 } | S rv   )r'   r  r   )r8   s    r,   r   r     s.     *233 )&&((
r.   c                   <    e Zd ZdZd	dZd Zed             Zd ZdS )
r  ax  pyarrow.fs.S3FileSystem wrapper that can be deserialized safely.

    Importing pyarrow.fs during reconstruction triggers the pyarrow
    S3 subsystem initialization.

    NOTE: This is only needed for pyarrow<14.0.0 and should be removed
        once the minimum supported pyarrow version exceeds that.
        See https://github.com/apache/arrow/pull/38375 for context.
    r   rC   c                     || _         d S rv   _fs)r+   r   s     r,   rQ   z_S3FileSystemWrapper.__init__  s    r.   c                     | j         S rv   r  r*   s    r,   r   z_S3FileSystemWrapper.unwrap  s	    xr.   c                 &    dd l } |  ||           S r|   )r   )clsfs_reconstructfs_argsr   s       r,   _reconstructz!_S3FileSystemWrapper._reconstruct  s'     	s>>7+,,,r.   c                 L    t           j        | j                                        fS rv   )r  r!  r  
__reduce__r*   s    r,   r#  z_S3FileSystemWrapper.__reduce__  s    #0$(2E2E2G2GGGr.   N)r   rC   )	r/   r0   r1   r2   rQ   r   classmethodr!  r#  r4   r.   r,   r  r     sp               - - [-H H H H Hr.   r  	kwargs_fnc                 H    | r |             }|                     |           |S rv   )update)r%  kwargskwarg_overridess      r,   _resolve_kwargsr*    s.      '#)++o&&&Mr.   r?   rD   c                 n    | .| dk    s*t          | t                    st          d|  d          d S d S d S )NrD   zInvalid value for 'shuffle': z6. Valid values are None, 'files', `FileShuffleConfig`.)r'   r$   r)   )r?   s    r,   rj   rj   &  sb     	7g--GEV1W1W-CG C C C
 
 	
 	----r.   )r8   rC   )Ar   loggingdataclassesr   typingr   r   r   r   r   r	   r
   r   r   r   r   rl   rS   ray.data._internal.utilr   r   r   r   r   r   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasourcer   r   &ray.data.datasource.file_meta_providerr   r    ray.data.datasource.partitioningr   r   r   ray.data.datasource.path_utilr   r    ray.util.annotationsr!   r   r   r   	getLoggerr/   r   )FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLDPATHS_PER_FILE_SIZE_FETCH_TASKr$   r6   r   r   r   r   r   r   r  r*  rj   r4   r.   r,   <module>r:     s?   				  ! ! ! ! ! !                            



                  0 / / / / / / / ( ( ( ( ( ( ? ? ? ? ? ? ? ?                
        . - - - - - NNN 
	8	$	$ -/ ) "$  
A A A A A A A  A> A0 A0 A0 A0 A0* A0 A0 A0H

>
/
0
>>B38n
>
*+
> 
> 
> 
>(,S#X   @$(cN   .    EF   H H H H H H H H:DcN*+	#s(^   	
77#%6<=	
		
 	
 	
 	
 	
 	
r.   