
    &`i                     v   d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d d	l m!Z! d d
l"m#Z#m$Z$m%Z%m&Z& d dl'm(Z(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6 d dl7m8Z8m9Z9m:Z:m;Z; d dl<m=Z= d dl>m?Z?m@Z@mAZA d dlBmCZC er
d dlDZDd dlEmFZF  e jG        eH          ZI ed          ZJdZKdZLdZMdZNdZOdZPdZQdZRdZS G d  d!          ZTd" ZUe G d# d$                      ZVd%e@d&eWd'eVfd(ZX G d) d*e.          ZY	 dWd+ee(gee(         f         d,e	eZef         d-ee[         d.eeeZ                  d/ee	eZeZf                  d&eeeZ                  d0eee\d1f                  d2eeT         d3e]d4e9d5ed6         d'ed7         fd8Z^ddd9d9dd:d;d<d0d=d.eeeZ                  d/ee	eZeZf                  d&eeeZ                  d4e9d5ed6         d>ee[         d?e]d@e]d,ee	eZef                  d'e
d7         fdAZ_d;d<d&eeeZ                  d4e9fdBZ`d;eTdCeeeZ                  d0ed=         d'edD         fdEZae G dF dD                      Zbd2eeT         dGeeb         d'ecfdHZddIeeT         dCeeeZ                  d0ed=         dJee]         d'eeeb                  f
dKZedGeeeb                  dLee[         d'ee[         fdMZfdN Zgd2eeT         d'eeT         fdOZhdPe	eZe8f         dQd7d'd7fdRZid4e9dSeeZ         d'd=fdTZjdUeeZ         d;d<d4ee9         d'eeeZ         eeZ         f         fdVZkdS )X    N)	dataclass)TYPE_CHECKINGAnyCallableDictIterableIteratorListLiteralOptionalTupleUnion)parse)get_pyarrow_version)$_BATCH_SIZE_PRESERVING_STUB_COL_NAMEArrowBlockAccessor)get_column_references)ProgressBar)cached_remote_fn)RetryingPyFileSystem_check_pyarrow_version_is_local_schemeiterate_with_retry)BlockBlockAccessorBlockMetadata)DataContext)
Datasource)ReadTask)FileShuffleConfig)FileMetadataProvider_handle_read_os_error_list_files)PartitionDataTypePartitioningPathPartitionFilterPathPartitionParser)_resolve_paths_and_filesystem)
BinaryExprExpr	Operation)log_once)ParquetFileFragmentz10.0.0g      ?i'           g{Gz?   
   i   c                   r    e Zd ZdZdddefdZedefd            Zedd            Zd	 Z	e
d
             ZdS )_ParquetFragmentzThis wrapper class is created to avoid utilizing `ParquetFileFragment` original
    serialization protocol that actually does network RPCs during serialization
    (to fetch actual parquet metadata)fr-   	file_sizec                 "    || _         || _        d S N)	_fragment
_file_size)selfr5   r6   s      /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/parquet_datasource.py__init__z_ParquetFragment.__init__t   s    #    returnc                     | j         S r8   )r:   r;   s    r<   r6   z_ParquetFragment.file_sizex   s
    r>   c                     | j         S r8   )r9   rA   s    r<   originalz_ParquetFragment.original|   s
    ~r>   c                     t           j        | j        j        | j        j        | j        j        | j        j        | j        ffS r8   )r4   make_fragmentr9   formatpath
filesystempartition_expressionr:   rA   s    r<   
__reduce__z_ParquetFragment.__reduce__   s<    -N!NN%N/O0
 
 	
r>   c                 P    |                      |||          }t          ||          S r8   )rE   r4   )rF   rG   rH   rI   r6   fragments         r<   rE   z_ParquetFragment.make_fragment   s*    ''j:NOO)444r>   N)r?   r-   )__name__
__module____qualname____doc__intr=   propertyr6   rC   rJ   staticmethodrE    r>   r<   r4   r4   o   s        * *$/ $C $ $ $ $ 3    X    X
 
 
 5 5 \5 5 5r>   r4   c                     ddl }t          | j        | j                  D ]B\  }}t	          ||j                  r(t	          ||j                  rt          d| d          CdS )aX  Check for the legacy tensor extension type and raise an error if found.

    Ray Data uses an extension type to represent tensors in Arrow tables. Previously,
    the extension type extended `PyExtensionType`. However, this base type can expose
    users to arbitrary code execution. To prevent this, we don't load the type by
    default.
    r   Nz,Ray Data couldn't infer the type of column 'aN  '. This might mean you're trying to read data written with an older version of Ray. Reading data written with older versions of Ray might expose you to arbitrary code execution. To try reading the data anyway, set `RAY_DATA_AUTOLOAD_PYEXTENSIONTYPE=1` on *all* nodes.To learn more, see https://github.com/ray-project/ray/issues/41314.)pyarrowzipnamestypes
isinstanceUnknownExtensionTypePyExtensionTypeRuntimeError)schemapanametypes       r<   check_for_legacy_tensor_typerb      s     &,55  
ddB344 
	"$:
 :
 
	 Vt V V V  	 r>   c                   @    e Zd ZU dZee         ed<   ee         ed<   dS )_SplitPredicateResulta{  Result of splitting a predicate by column type.

    Attributes:
        data_predicate: Expression containing only data column predicates
            (for PyArrow pushdown), or None if no data predicates exist.
        partition_predicate: Expression containing only partition column predicates
            (for partition pruning), or None if no partition predicates exist.
    data_predicatepartition_predicateN)rM   rN   rO   rP   r   r*   __annotations__rT   r>   r<   rd   rd      s>           TN"""!$'''''r>   rd   	predicatepartition_columnsr?   c                 R   t          t          |                     }||z
  }||z  }|st          | d          S |st          d|           S t          | t                    r| j        t          j        k    rt          | j	        |          }t          | j
        |          }dt          t                   dt          t                   dt          t                   fd} ||j        |j                  } ||j        |j                  }	t          ||	          S t          dd          S )a  Split a predicate into data-only and partition-only parts.

    This function extracts both data column predicates and partition column
    predicates from AND chains, enabling both PyArrow pushdown (data part) and
    partition pruning (partition part).

    Args:
        predicate: The predicate expression to analyze.
        partition_columns: Set of partition column names.

    Returns:
        _SplitPredicateResult containing:
        - data_predicate: Expression with only data columns (for PyArrow pushdown),
          or None if no data predicates can be extracted.
        - partition_predicate: Expression with only partition columns (for pruning),
          or None if no partition predicates can be extracted.

    Examples:
        >>> from ray.data.expressions import col
        >>> # Pure data predicate:
        >>> result = _split_predicate_by_columns(col("data1") > 5, {"partition_col"})
        >>> result.data_predicate is not None  # Should have data predicate
        True
        >>> result.partition_predicate is None  # Should not have partition predicate
        True

        >>> # Pure partition predicate:
        >>> result = _split_predicate_by_columns(col("partition_col") == "US", {"partition_col"})
        >>> result.data_predicate is None  # Should not have data predicate
        True
        >>> result.partition_predicate is not None  # Should have partition predicate
        True

        >>> # Mixed AND - can split both parts:
        >>> result = _split_predicate_by_columns(
        ...     (col("data1") > 5) & (col("partition_col") == "US"),
        ...     {"partition_col"}
        ... )
        >>> result.data_predicate is not None  # Should have data predicate
        True
        >>> result.partition_predicate is not None  # Should have partition predicate
        True

        >>> # Mixed OR - can't split safely:
        >>> result = _split_predicate_by_columns(
        ...     (col("data1") > 5) | (col("partition_col") == "US"),
        ...     {"partition_col"}
        ... )
        >>> result.data_predicate is None  # Should not have data predicate
        True
        >>> result.partition_predicate is None  # Should not have partition predicate
        True
    N)re   rf   leftrightr?   c                     | r|r| |z  S | p|S r8   rT   )rk   rl   s     r<   combine_predicatesz7_split_predicate_by_columns.<locals>.combine_predicates  s'      $ $e|#=5 r>   )setr   rd   rZ   r)   opr+   AND_split_predicate_by_columnsrk   rl   r   r*   re   rf   )
rh   ri   referenced_cols	data_colspartition_cols_in_predicateleft_resultright_resultrn   re   rf   s
             r<   rr   rr      s\   r /	::;;O"33I"14E"E& Y$ISWXXXX Y$DiXXXX )Z(( 
Y\Y]-J-J1).BSTT29?DUVV	!4.	!)1$	!d^	! 	! 	! 	! ,+&(C
 
 10+\-M
 
 %)?R
 
 
 	
 !$OOOOr>   c                       e Zd ZdZdgZdddddddd ed          dddddeeee         f         de	ee                  d	e	e
eef                  d
e	e
eef                  de	eegef                  de	d         de	eedf                  de	e         dede	e         deed         df         dede	ee                  f fdZdefdZ	 d.dede	e         dee         fdZd Zedefd            ZdefdZdefd Zde	ee                  fd!Zde	ee                  fd"Zde	ee                  fd#Zd$e dd f fd%Z!d&ee"         defd'Z#e$d(e	d)         d*d)d+e	d)         d,e	ee                  dd)f
d-            Z% xZ&S )/ParquetDatasourcea;  Parquet datasource, for reading and writing Parquet files.

    This implementation uses PyArrow's `ParquetDataset` abstraction for dataset reads,
    and thus offers automatic Arrow dataset schema inference and row count collection at
    the cost of some potential performance and/or compatibility penalties.
    parquetNhiveF)columnsdataset_kwargsto_batch_kwargs
_block_udfrH   r^   meta_providerpartition_filterpartitioningshuffleinclude_pathsfile_extensionspathsr|   r}   r~   r   rH   zpyarrow.fs.FileSystemr^   pyarrow.lib.Schemar   r   r   r   filesr   r   c                H   t                                                       t                       t          |           | _        | j        s7t
          j        j        j                                        rt          d          d | _
        | j        s;ddlm}  |t          j                                                    d          | _
        || _        t!          ||          \  }| _        t%          j        | j        t)          j                    j                  }t/          |||	|          }|rt1          | \  }}ng g }}|t2                              d           ni }d	|v rt          d
          d |d	<   t7          t9          |          ||          }d\  }}|+|j        r t=          ||j        d         |
          \  }}ng g }}|i }d t1          |j        |          D             | _        d |j        D             | _         || _!        || _"        |
|d | _#        nSi | _#        |$| j#        $                    d |D                        |$| j#        $                    d |D                        |}|V|
T|j        rMtK          |
          } ||j        d         j&                  }|r!t9          |'                                          }||ng | _(        |d uotS          | j(                  dk    | _*        || _+        |j,        | _-        t]          |
| j                   | _/        d | _0        || _1        |
| _2        |dk    r$tf          j4        5                                | _0        n>tm          |tn                    r)tf          j4        5                    |j8                  | _0        ts          | j                  }tu          || ;                                || j
                  }ty          ||          | _=        t}          |t)          j                    j?                  | _@        d S )NzBecause you're using Ray Client, read tasks scheduled on the Ray cluster can't access your local files. To fix this issue, store files in cloud storage or a distributed filesystem like NFS.r   )NodeAffinitySchedulingStrategyF)softretryable_errors)r   r   z|Please note that `ParquetDatasource.__init__`s `dataset_kwargs` is a deprecated parameter and will be removed in the future.r   zuThe 'partitioning' parameter isn't supported in 'dataset_kwargs'. Use the top-level 'partitioning' parameter instead.)NNc                 4    g | ]\  }}t          ||          S rT   )r4   ).0rL   r6   s      r<   
<listcomp>z.ParquetDatasource.__init__.<locals>.<listcomp>  s6     
 
 
#) Xy11
 
 
r>   c                     g | ]	}|j         
S rT   )rG   )r   ps     r<   r   z.ParquetDatasource.__init__.<locals>.<listcomp>  s    :::Q!&:::r>   c                     i | ]}||S rT   rT   r   cols     r<   
<dictcomp>z.ParquetDatasource.__init__.<locals>.<dictcomp>  s    ,N,N,N#S#,N,N,Nr>   c                     i | ]}||S rT   rT   r   s     r<   r   z.ParquetDatasource.__init__.<locals>.<dictcomp>  s    ,S,S,S#S#,S,S,Sr>   r   )r|   r^   local_scheduling)Asuperr=   r   r   _supports_distributed_readsrayutilclientis_connected
ValueError_local_schedulingray.util.scheduling_strategiesr   get_runtime_contextget_node_id_source_pathsr(   _filesystemr   wrapr   get_currentretried_io_errorsr#   rW   loggerwarningget_parquet_datasetlist	fragments!_infer_data_and_partition_columns_pq_fragments	_pq_pathsr   _to_batches_kwargs_projection_mapupdater'   rG   keys_partition_columnslen_partition_columns_selected_read_schemar^   _file_schema_get_partition_columns_schema_partition_schema_file_metadata_shuffler_include_paths_partitioningnprandomdefault_rngrZ   r    seed_sample_fragments_fetch_file_infos_get_data_columns_estimate_files_encoding_ratio_encoding_ratio_estimate_reader_batch_sizetarget_max_block_size_default_batch_size)r;   r   r|   r}   r~   r   rH   r^   r   r   r   r   r   r   r   listed_files
file_sizespq_dsdata_columnsri   actual_partition_columnsr   parsed_partitionssampled_fragmentssampled_file_infos	__class__s                            r<   r=   zParquetDatasource.__init__#  s   " 	   /?/F/F+F(/ 	CHO4G4T4T4V4V 	O   "&/ 	UUUUUU%C%C'))5577e& & &D" #"?z"R"Rt).(466H
 
 


 #-+	
 
 
  	' #\ 2E:: "B:E%NNO   
  N^++F   *.~& $DKK^LL +5'' 92SU_Q/3 3///
 35b/" O

 
'*5?J'G'G
 
 
 ;:%/:::$"1
 $5$=#'D  #%D '$++,N,N,N,N,NOOO ,$++,S,SAR,S,S,STTT $5 $)Aeo)A'55E %eoa&8&= > >  J+/0A0F0F0H0H+I+I(
 )A(L$$RT 	
 T)Nc$2I.J.JQ.N 	( #!L!>$."
 "
 (,$+)g+-9+@+@+B+BD((!233 	O+-9+@+@+N+ND( .
 
 /**,,!3	
 
 
  > 
  

 $? 7 9 9 O$
 $
   r>   r?   c                 P    | j         i k    rdS |                     | j                  S Nr   )r   _estimate_in_mem_sizer   rA   s    r<   estimate_inmemory_data_sizez-ParquetDatasource.estimate_inmemory_data_size  s,    2%%1))$*<===r>   parallelismper_task_row_limitc                 B   | j         t          t          | j        | j                            fd| j                             t                              D             }t          t          t          t          |                     \  }}n| j        | j        }}|                     | j	        | j
        | j        |                                 | j                  }g }| j        | j                                        nd t          t!          j        ||          t!          j        ||                    D ]\  }}	t          |          dk    rt%          d |                     |          |	d           }
| j        | j        | j        |                                 |                                 |                                 | j	        | j        | j        f	\	  |                    t9          |ff
d	|
||                     |S )Nc                      g | ]
}|         S rT   rT   )r   ifiles_metadatas     r<   r   z4ParquetDatasource.get_read_tasks.<locals>.<listcomp>  s.     ' ' ' q!' ' 'r>   )file_schemapartition_schemaprojected_columnsr   r   )num_rows
size_bytesinput_files
exec_statsc                 6   
 t          
	|           S r8   )read_fragments)r5   	block_udfr   data_columns_rename_mapdefault_read_batch_size_rowsfilter_exprr   ri   r   read_schemato_batches_kwargss    r<   <lambda>z2ParquetDatasource.get_read_tasks.<locals>.<lambda>"  s2    !)4$/)#%$#) ) r>   )r^   r   )r   r   rW   r   r   permutationr   map_derive_schemar   r   r   get_current_projectionr   _predicate_expr
to_pyarrowr   array_splitr   r   r   r   r   get_column_renames_get_partition_columnsr   r   appendr   )r;   r   r   shuffled_files_metadatapq_fragmentspq_pathstarget_schema
read_tasksr   r   metar   r   r   r   r   r   r   ri   r   r   r   s              @@@@@@@@@@@r<   get_read_tasksz ParquetDatasource.get_read_tasks  s    '3!#d&8$."I"IJJN' ' ' '5AA#nBUBUVV' ' '# &*#dC9P4Q*R*R%S%S"L(( " #L ++)!3"99;; , 
 
 
 #/  ++--- 	 !$N<55N8[11!
 !
 7	 7	Iu 9~~"" 55i@@!	  D$ '(&&((''))++--!#"

!,'! &               ('9!     * r>   c                     dS )zuReturn a human-readable name for this datasource.

        This will be used as the names of the read tasks.
        ParquetrT   rA   s    r<   get_namezParquetDatasource.get_name7  s	    
 yr>   c                     | j         S r8   )r   rA   s    r<   supports_distributed_readsz,ParquetDatasource.supports_distributed_reads>  s    //r>   c                     dS NTrT   rA   s    r<   supports_projection_pushdownz.ParquetDatasource.supports_projection_pushdownB      tr>   c                     dS r  rT   rA   s    r<   supports_predicate_pushdownz-ParquetDatasource.supports_predicate_pushdownE  r  r>   c                 p    |                                  }|                                 }||dS |pg |pg z   S )zBOverride to include partition columns in addition to data columns.N)r   r   )r;   r   ri   s      r<   r   z(ParquetDatasource.get_current_projectionH  sN     --// 7799$5$=4"'8'>B??r>   c                 ~      j         dS  j        sdS  fd j                                         D             }|r|S g S )au  Extract partition columns from projection map.

        This method extracts partition columns from _projection_map, which is the
        source of truth after projection pushdown. Since partition columns are now
        included in _projection_map during initialization when requested, we can
        reliably extract them from the map.

        Returns:
            List of partition column names in the projection, None if there's
            no projection (meaning include all partition columns), or [] if
            partition columns aren't in the projection map (meaning include
            no partition columns).
        Nc                 &    g | ]}|j         v |S rT   )r   )r   r   r;   s     r<   r   z<ParquetDatasource._get_partition_columns.<locals>.<listcomp>h  s-     
 
 
#AX:X:XC:X:X:Xr>   r   r   r   )r;   partition_colss   ` r<   r   z(ParquetDatasource._get_partition_columnsS  sq     '4& 	4
 
 
 
/4466
 
 

  	"!!
 	r>   c                 r    | j         dS | j        fd| j                                         D             }|S )a  Extract data columns from projection map, excluding partition columns.

        Partition columns aren't in the physical file schema, so they must be
        filtered out before passing to PyArrow's to_batches().

        Returns:
            List of data column names to read from files, or None if no projection.
            Can return empty list if only partition columns are projected.
        Nc                     g | ]}|v|	S rT   rT   )r   r   r  s     r<   r   z7ParquetDatasource._get_data_columns.<locals>.<listcomp>  s*     
 
 
#^:S:SC:S:S:Sr>   r  )r;   rt   r  s     @r<   r   z#ParquetDatasource._get_data_columnsu  s\     '4 0
 
 
 
/4466
 
 
	 r>   predicate_exprc                 b   t          | j                  }|s!t                                          |          S t	          ||          }|j        | j        t          | j                  }g }g }t          | j	        | j
                  D ]J\  }}|                    ||j                  r*|                    |           |                    |           K|| _	        || _
        ddl}	|	                    |           }
|j        -t          t          |
                              |j                  S |
S )u^  Apply a predicate with data pushdown and partition pruning.

        This method optimizes predicates in three ways:
        1. Data predicates → pushed to PyArrow (row-level filtering)
        2. Partition predicates → used for partition pruning (file-level filtering)
        3. Mixed predicates → both optimizations applied together
        Nr   )ro   r   r   apply_predicaterr   rf   r   r'   rW   r   r   evaluate_predicate_on_partitionr   copyre   ry   )r;   r  r  split_resultparserpruned_fragmentspruned_pathsrL   rG   r  
datasourcer   s              r<   r  z!ParquetDatasource.apply_predicate  sI    T455 	;77**>::: 3>>RR ,8".();<<F!L"%d&8$."I"I . .$99,:  . %++H555 ''--- "2D)DN 	YYt__

 &2*J77GG+   r>   r   c                 b    t          d |D                       | j        z  }t          |          S )Nc                     g | ]	}|j         
S rT   r6   r   r5   s     r<   r   z;ParquetDatasource._estimate_in_mem_size.<locals>.<listcomp>  s    :::11;:::r>   )sumr   round)r;   r   in_mem_sizes      r<   r   z'ParquetDatasource._estimate_in_mem_size  s4    ::	:::;;d>RR[!!!r>   r   pyarrow.Schemar   r   r   c                  	 ddl }| | 	nMt                    }|t          |          ng }|                    |fd|D             z   j                  	|'|                    	fd|D             	j                  	|j	                                }	  ||          j                            	j                  	n,# t          $ r t                              dd           Y nw xY wt          	           	S )	z(Derives target schema for read operationr   Nc                 P    g | ]"}                     |j                  d k     |#S ))get_field_indexr`   )r   r5   r   s     r<   r   z4ParquetDatasource._derive_schema.<locals>.<listcomp>  sC        '66qv>>"DD	  EDDr>   )fieldsmetadatac                 :    g | ]}                     |          S rT   )field)r   columnr   s     r<   r   z4ParquetDatasource._derive_schema.<locals>.<listcomp>  s'    MMM$$V,,MMMr>   zdFailed to infer schema of dataset by passing dummy table through UDF due to the following exception:T)exc_info)
rV   r   r^   r(  empty_tablewith_metadata	Exceptionr   debugrb   )
r   r   r   r   r   r_   file_schema_fieldspartition_schema_fieldsdummy_tabler   s
    `       @r<   r   z ParquetDatasource._derive_schema  s|    	 "'MM!%k!2!2*:*F%&&&B $
 II&   !8   %- &  M (IIMMMM;LMMM& M
 !'3355K	 *
; 7 7 > L L!*! !    B!       	%]333s   (C &C+*C+r8   )'rM   rN   rO   rP   _FILE_EXTENSIONSr%   r   strr
   r   r   r   r   r   ra   r!   r&   r   boolr=   rQ   r   r   r   r  rR   r  r  r	  r   r   r   r*   r  r4   r   rS   r   __classcell__)r   s   @r<   ry   ry     s         "{ (,37489=8<>B8<04/;|F/C/C15#/3l
 l
 l
S$s)^$l
 $s)$	l

 !c3h0l
 "$sCx.1l
 Xugun56l
 45l
 t%99:;l
   45l
 .l
 |,l
 ww'-.l
 l
 "$s),l
 l
 l
 l
 l
 l
\>S > > > > EI] ]]4<SM]	h] ] ] ]~   0D 0 0 0 X0d    T    	@c(; 	@ 	@ 	@ 	@ c(;        D8DI#6    *77 
7 7 7 7 7 7r"t4D/E "# " " " "
 ;./; &; ##34	;
 $DI.; 
; ; ; \; ; ; ; ;r>   ry   r   r   r   r   r   r^   r   r   r   r   r   zpyarrow.dataset.Expressionzpyarrow.Tablec              #     	
K   ddl m} t          |          dk    sJ t                              dt          |           d           |D ]jt
          j        j                                        }t          
	f
dd|j
                  D ]!}|j        dk    r|  | |          V  |V  "kd S )Nr   )ArrowTensorTypezReading z parquet fragmentsc                  @   
 t          j         	
  
        S )N)	r^   r   r   ri   r   include_pathr   
batch_sizer   )_read_batches_fromrC   )
r   r   r   r   rL   r   ri   r   r^   r   s
   r<   r   z read_fragments.<locals>.<lambda>   s8    &!)(?"3)*'7"3   r>   zreading batches)match)$ray.data.extensions.tensor_extensionr9  r   r   r0  r   datar   r   r   r   r   )r   r   r   r   r   ri   r^   r   r   r   r   r9  ctxtablerL   s    `````` ```   @r<   r   r     s:      EDDDDD y>>A
LL>C	NN>>>???     h"..00'             '
 
 
 	  	 E" ~!!(#)E******KKK+	 	   r>   F)r   r<  r;  use_threadsr   rL   r-   r"  r<  r;  rC  c       
   
   #   v   	
K   ddl ddlm} t          
pi           

                    d	          	
                    dd          }|	|n|z  |
                    d|           t           ||          d
 
	f	d	}|                     |            |          E d{V  dS )z3Get an iterable of batches from a parquet fragment.r   N)"_DatasourceProjectionPushdownMixinrC  filterr<  r?   pa.Tablec            	   3     	K   	  j         d	dD ]} 
j                            | g          }	r-t          j        |                              dj                  }rt          |          }|j        dk    r>|j	        dk    r3|
                    t          
                    |j	                            }|V  dS # 
j        j        $ rp}t          |          }d|v rWUt           j                            j                  }t%          j        j                  }t+          d d| d|            d}~ww xY w)
z4Inner generator that yields tables without renaming.)r|   rF  r^   rC  rG   r   zNo match for FieldRef.NameNzFilter expression: 'z' failed on parquet file: 'z' with columns: rT   )
to_batchesTablefrom_batchesr   	for_blockfill_columnrG   _add_partitions_to_tablenum_columnsr   append_columnr   nullslibArrowInvalidr5  osbasenamero   physical_schemarX   r]   )batchrB  eerror_messagefilenamefile_columnsr   r   rL   r;  r_   partition_col_valuesr^   r   rC  s         r<   _generate_tablesz,_read_batches_from.<locals>._generate_tablesa  s     4	,, $"'	 
 $  % % --ug66 .8??KK E ( R45I5QQE" $))enq.@.@!//<bhhu~>V>V E K% %N v" 	 	 	FFM,==+7++HM::"8#;#ABB"G; G G&G G8DG G   	s   B>C EA+D??E)r?   rG  )rV   ray.data.datasource.datasourcerE  dictpop
setdefault_parse_partition_column_values_apply_rename_to_tables)rL   r^   r   r   ri   r   r   r<  r;  rC  r   rE  filter_from_kwargsr]  r_   r\  s   ```   ` ```   @@r<   r=  r=  7  sd       QQQQQQ .4"55 $''{CCK*..x>>% " 11 	 $$\:>>>9#\ 6 6 6 6 6 6 6 6 6 6 6 6 6 6r 2II3          r>   c                     i }|t          |          } || j                  } fd|                                D             }|S )Nc                 $    i | ]\  }}|v 	||S rT   rT   )r   
field_namevalueri   s      r<   r   z2_parse_partition_column_values.<locals>.<dictcomp>  s5     
 
 
!
E... ...r>   )r'   rG   items)rL   ri   r   
partitionsr   s    `   r<   rb  rb    sr    
 J#L11U8=))
 $
 
 
 
%/%5%5%7%7
 
 

 r>   r|   _ParquetFileInfoc                   | j         j        }|j        dk    rd S | j                             dg          }t	          t          |j        j        t                    d          }i }t                      t          k    rd|d<    |j
        d|||d|}d }|D ]0}	|	j        dk    r#t          j        |	j        |	j        z            } n1t          ||          S )Nr   )row_group_idsr0   batch_readahead)r|   r^   r<  )avg_row_in_mem_bytesr(  rT   )rC   r(  num_row_groupssubsetmaxminr   (PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWSr    MIN_PYARROW_TO_BATCHES_READAHEADrI  mathceilnbytesrk  )
rL   r|   r^   r(  row_group_fragmentr<  r   batches_iteravg_row_sizerW  s
             r<   _fetch_parquet_file_infor|    s+     )H!##t "*111DD'04	
 	
 	
 J  @@@/0+,0%0   	 L #'L   >A9U\EN%BCCLE  )   r>   c                   H    e Zd ZU ee         ed<   ded<   dee         fdZdS )rk  ro  zpyarrow._parquet.FileMetaDatar(  r?   c                 <    | j         d S | j         | j        j        z  S r8   )ro  r(  r   rA   s    r<   estimate_in_memory_bytesz)_ParquetFileInfo.estimate_in_memory_bytes  s#    $,4(4=+AAAr>   N)rM   rN   rO   r   rQ   rg   r  rT   r>   r<   rk  rk    sZ          #3-'''----B(3- B B B B B Br>   
file_infosc                    t          j                    j        st          S t	          |          t	          |           k    sJ d |D             }d | D             }d t          ||          D             }|st          S t          j        |          }t          	                    d|dd           t          |t                    S )zReturn an estimate of the Parquet files encoding ratio.

    To avoid OOMs, it is safer to return an over-estimate than an underestimate.
    c                 >    g | ]}||                                 nd S r8   )r  )r   fis     r<   r   z2_estimate_files_encoding_ratio.<locals>.<listcomp>  s:     ! ! !FH##%%%T! ! !r>   c                     g | ]	}|j         
S rT   r  r  s     r<   r   z2_estimate_files_encoding_ratio.<locals>.<listcomp>
  s    444QQ[444r>   c                 H    g | ]\  }}|d k    |t          |          |z   S )r   )float)r   r!  r6   s      r<   r   z2_estimate_files_encoding_ratio.<locals>.<listcomp>  sA     ! ! !"Kq==[4 	kY&444r>   z$Estimated parquet encoding ratio is z.3f.)r   r   decoding_size_estimation'PARQUET_ENCODING_RATIO_ESTIMATE_DEFAULTr   rW   r   meanr   inforr  +PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND)r   r  estimated_in_mem_size_arrfile_size_arrestimated_encoding_ratiosestimated_ratios         r<   r   r     s     "$$= 766z??c)nn,,,,! !LV! ! ! 54)444M! !&)*C]&S&S! ! ! % 766g788O
KKMMMMMNNN KLLLr>   r   r   c                   t          t                    }g }| D ]`}|                    |                    |pt	          j                    j        t          g                              |||                     at          dt          |          d          }|                    |          }|                                 |S )N)scheduling_strategyretry_exceptions)r|   r^   zParquet dataset samplingfile)unit)r   r|  r   optionsr   r   r  OSErrorremoter   r   fetch_until_completeclose)	r   r|   r^   r   fetch_file_infofuturesrL   
sample_barr  s	            r<   r   r     s     ''?@@OG% 
 
 	##$4 %A*,,@")	 $  
 f   	
 	
 	
 	
 7WFSSSJ0099Jr>   target_block_sizec                     d S fd| D             }|st           S t          t          j        t	          j        |                    d          }t                              d| d           |S )Nc                 L    g | ] }||j         |j         dk    |j         z  !S r   )ro  )r   r  r  s     r<   r   z/_estimate_reader_batch_size.<locals>.<listcomp>E  sI       N'3'!++ 	B33
 ,++r>   r0   z'Estimated parquet reader batch size at z rows)%DEFAULT_PARQUET_READER_ROW_BATCH_SIZErr  rv  rw  r   r  r   r  )r  r  avg_num_rows_per_blockestimated_batch_sizes    `  r<   r   r   ?  s      t      " 544 #DIbg6L.M.M$N$NPQ R R
KKU:NUUUVVVr>   c                    dd l m} t          |           dk    r| d         } 	  |j        | fi |d|i}n# t          $ r~ 	 t          | d           \  }}t          j        |t          j	                    j
                  } |j        |fi |d|i}n'# t          $ r}t          ||            Y d }~nd }~ww xY wY n&t          $ r}t          ||            Y d }~nd }~ww xY w|S )Nr   r0   rH   )rH   r   )pyarrow.parquetrz   r   ParquetDataset	TypeErrorr(   r   r   r   r   r   r  r"   )	r   rH   r}   pqdatasetresolved_pathsresolved_filesystemos_erX  s	            r<   r   r   Y  s         
 5zzQa(#"#
 

 
 "
 
 

  / / /	/2O$3 3 3/N/ #7";#!,!8!:!:!L# # # (b'    /  GG
  	/ 	/ 	/!$........	/ ( ( (a''''''''(NsF   6 
C ABC 
B8B3.C 3B88C =	C CC c                      sg S t          j        t                     t          z            }t	          t          |t                    t                    }t          |t                               }t          j	        dt                     dz
  |          
                    t                    } fd|                                D             S )Nr   r0   c                      g | ]
}|         S rT   rT   )r   idxr   s     r<   r   z%_sample_fragments.<locals>.<listcomp>  s    666sIcN666r>   )rv  rw  r   .PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIOrr  rs  /PARQUET_ENCODING_RATIO_ESTIMATE_MAX_NUM_SAMPLES/PARQUET_ENCODING_RATIO_ESTIMATE_MIN_NUM_SAMPLESr   linspaceastyperQ   tolist)r   target_num_samplespivotss   `  r<   r   r   ~  s      	IGG   OPP7  /Y@@ [C	NNQ.0BCCJJ3OOF6666fmmoo6666r>   r\  rB  c                 ,   |                                  D ]~\  }}|j                            |          }|dk    r)t          j        |                              ||          }Nt          d|           rt                              d| d           |S )Nr%  duplicate_partition_field_zThe partition field 'z`' also exists in the Parquet file. Ray Data will default to using the value in the Parquet file.)	ri  r^   r&  r   rL  rM  r,   r   r   )r\  rB  partition_colrh  field_indexs        r<   rN  rN    s     !5 : : < <  ul22=AA"!+E22>>}eTTEEB=BBCC 	NNW W W W  
 Lr>   
file_pathsc                    ddl }t          |          dk    r|                    g           S | |                    g           S |d         }g }t          |           } ||          }|D ]i}|| j        v r!|                    | j        |                   }n|                                }|                    |                    ||                     j|                    |          S )zReturn a new schema with partition fields added.

    This function infers the partition fields from the first file path in the dataset.
    r   N)	rV   r   r^   r'   field_typesfrom_numpy_dtypestringr   r*  )	r   r  r_   
first_pathr'  r  rj  rg  
field_types	            r<   r   r     s      :!yy}}		yy}}AJF ..F
##J  8 8
111,,\-Ej-QRRJJJ 	bhhz:66777799Vr>   user_specified_columnsc                     fd| D             }|.t          |          } |j                  fd| D             }ng }||fS )aV  Infer which columns are in the files and which columns are partition columns.

    This function uses the schema and path of the first file to infer what columns
    represent.

    Args:
        user_specified_columns: A list of column names that the user specified.
        fragment: The first fragment in the dataset.
        partitioning: The partitioning scheme used to partition the data.

    Returns:
        A tuple of lists of column names. The first list contains the columns that are
        in the file, and the second list contains the columns that are partition
        columns.
    c                 0    g | ]}|j         j        v |S rT   )rV  rX   )r   r+  rL   s     r<   r   z5_infer_data_and_partition_columns.<locals>.<listcomp>  s4       X-333 	333r>   Nc                     g | ]}|v |	S rT   rT   )r   r+  rj  s     r<   r   z5_infer_data_and_partition_columns.<locals>.<listcomp>  s*     
 
 
6Z;O;OF;O;O;Or>   )r'   rG   )r  rL   r   r   r   ri   rj  s    `    @r<   r   r     s    (   ,  L
 #L11U8=))

 
 
 
!7
 
 
 ***r>   r8   )lloggingrv  rT  dataclassesr   typingr   r   r   r   r   r	   r
   r   r   r   r   numpyr   packaging.versionr   parse_versionr   ray._private.arrow_utilsr   ray.data._internal.arrow_blockr   r   >ray.data._internal.planner.plan_expression.expression_visitorsr   ray.data._internal.progress_barr   ray.data._internal.remote_fnr   ray.data._internal.utilr   r   r   r   ray.data.blockr   r   r   ray.data.contextr   ray.data.datasourcer   r^  r   )ray.data.datasource.file_based_datasourcer    &ray.data.datasource.file_meta_providerr!   r"   r#    ray.data.datasource.partitioningr$   r%   r&   r'   ray.data.datasource.path_utilr(   ray.data.expressionsr)   r*   r+   ray.util.debugr,   rV   pyarrow.datasetr-   	getLoggerrM   r   ru  NUM_CPUS_FOR_META_FETCH_TASKr  FILE_READING_RETRYr  r  r  r  r  rt  r4   rb   rd   ro   rr   ry   r5  rQ   ra   r6  r   r=  rb  r|  rk  r  r   r   r   r   r   rN  r   r   rT   r>   r<   <module>r     s     				 ! ! ! ! ! !                              4 4 4 4 4 4 



 8 8 8 8 8 8             8 7 7 7 7 7 9 9 9 9 9 9            ? > > > > > > > > > ( ( ( ( ( ( * * * * * * 3 3 3 3 3 3 G G G G G G         
                 = < < < < < < < < < # # # # # # 4NNN333333 
	8	$	$ $1=#:#:  
  #  )/ %  +, ' /0 + 26 . 34 /24 / ,0 (5 5 5 5 5 5 5 5@  0 ( ( ( ( ( ( ( (`P`P`P `P `P `P `PFk k k k k
 k k kr ;?-  - %01- CH~-  #+3--  49%	- 
 &d38n5-   S	*-  U4!5567-  $%-  -  -  67-  o-  -  -  - p ;? $26e e e#e e 49%	e
 &d38n5e  S	*e e 67e e e e  S#X/e oe e e eP#S	*    ,22 d3i 2 %&	2
  !2 2 2 2j 
B 
B 
B 
B 
B 
B 
B 
B"M$%"M%&"M "M "M "M "MJ,- d3i  %&	
 tn 
(#
$%   D X./0 EMc] c]       4" " "J7$%7	
7 7 7 72s$556?N   "!!S	! ! ! ! !H"+ I"+#"+ <("+ 49d3i 	"+ "+ "+ "+ "+ "+r>   