
    &`iZH                     `   d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
mZmZmZmZmZ ddlZddlmZ ddlmZ ddlmZ ddlmZmZ dd	lmZmZ dd
lmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' ddl(m)Z) ddl*m+Z+ 	 ddl,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z= e$j>        e.e$j?        e7e$j@        e/e$jA        e0e$jB        e3e$jC        e4e$jD        e-e$jE        e:e$jF        e1e$jG        e8e$jH        e2e$jI        e9e$jJ        e6iZKn# eL$ r  e)d           Y nw xY wer.ddlMmNZN ddl,mOZO ddlPmQZQ ddlRmSZS ddlTmUZU ddlVmWZWmXZXmYZY ddlZm[Z[  ej\        e]          Z^ G d ded                   Z_de
d         dddddd d!e`d"eea         d#d$d%ee	ebebf                  d&e
e         fd'Zce+ G d( d)e                      ZddS )*zV
Module to read an iceberg table into a Ray Dataset, by using the Ray Datasource API.
    N)partial)	TYPE_CHECKINGAnyDictIterableListOptionalSetTupleUnion)version)_ExprVisitor)_check_import)BlockBlockMetadata)
DatasourceReadTask)		AliasExpr
BinaryExpr
ColumnExprDownloadExprLiteralExpr	OperationStarExprUDFExpr	UnaryExpr)log_once)DeveloperAPI)AndEqualToGreaterThanGreaterThanOrEqualInIsNullLessThanLessThanOrEqualLiteralNot
NotEqualToNotInNotNullOr	ReferenceUnboundTermliteralzBpyiceberg.expressions not found. Please install pyiceberg >= 0.9.0)Catalog)BooleanExpression)FileIO)DataFile)Schema)DataScanFileScanTaskTable)TableMetadatac                   r    e Zd ZdZddZdd	ZddZddZ	 	 	 	 ddZ	 	 	 	 ddZ		 	 	 	 ddZ
	 	 	 	 d dZdS )!_IcebergExpressionVisitora  
    Visitor that converts Ray Data expressions to PyIceberg expressions.

    This enables Ray Data users to write filters using the familiar col() syntax
    while leveraging Iceberg's native filtering capabilities.

    Example:
        >>> from ray.data.expressions import col
        >>> ray_expr = (col("date") >= "2024-01-01") & (col("status") == "active")
        >>> iceberg_expr = _IcebergExpressionVisitor().visit(ray_expr)
        >>> # iceberg_expr can now be used with PyIceberg's filter APIs
    exprr   returnUnboundTerm[Any]c                 *    t          |j                  S )z3Convert a column reference to an Iceberg reference.)r-   nameselfr;   s     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/iceberg_datasource.pyvisit_columnz&_IcebergExpressionVisitor.visit_columnc   s    ###    r   Literal[Any]c                 *    t          |j                  S )z.Convert a literal value to an Iceberg literal.)r/   valuer@   s     rB   visit_literalz'_IcebergExpressionVisitor.visit_literalg   s    tz"""rD   r   r1   c                 z   |j         t          j        t          j        fv r|                     |j                  }t          |j        t                    s5t          |j         j
         dt          |j                  j                   t          |j                  ||j        j                  S |                     |j                  }|                     |j                  }|j         t          v rt          |j                  ||          S t          d|j          dt                                           d          )z4Convert a binary operation to an Iceberg expression.z< operation requires right operand to be a literal list, got z2Unsupported binary operation for Iceberg filters: z. Iceberg filters support: zG. Arithmetic operations (ADD, SUB, MUL, DIV) cannot be used in filters.)opr   INNOT_INvisitleft
isinstancerightr   
ValueErrorr?   type__name__RAY_DATA_OPERATION_TO_ICEBERGrG   keys)rA   r;   rN   rP   s       rB   visit_binaryz&_IcebergExpressionVisitor.visit_binaryk   s5    7y|Y%5666::di((Ddj+66  w| 7 7
++47 7   19$
@PQQQ zz$)$$

4:&&733309$FFF YTW Y Y,I,N,N,P,PY Y Y  rD   r   c                     |                      |j                  }|j        t          v rt          |j                 |          S t	          d|j         dt                                                     )z3Convert a unary operation to an Iceberg expression.z)Unsupported unary operation for Iceberg: z. Supported operations: )rM   operandrJ   rT   rQ   rU   )rA   r;   rX   s      rB   visit_unaryz%_IcebergExpressionVisitor.visit_unary   s{    **T\**733309'BBBPDG P P)F)K)K)M)MP P  rD   r   3BooleanExpression | UnboundTerm[Any] | Literal[Any]c                 6    |                      |j                  S )z6Convert an aliased expression (just unwrap the alias).)rM   r;   r@   s     rB   visit_aliasz%_IcebergExpressionVisitor.visit_alias   s     zz$)$$$rD   r   c                      t          d          )z;UDF expressions cannot be converted to Iceberg expressions.zUDF expressions cannot be converted to Iceberg expressions. Iceberg filters must use simple column comparisons and boolean operations.	TypeErrorr@   s     rB   	visit_udfz#_IcebergExpressionVisitor.visit_udf   s     Y
 
 	
rD   r   c                      t          d           )z@Download expressions cannot be converted to Iceberg expressions.r^   r@   s     rB   visit_downloadz(_IcebergExpressionVisitor.visit_download   s     N
 
 	
rD   r   c                      t          d          )z<Star expressions cannot be converted to Iceberg expressions.zCStar expressions cannot be converted to Iceberg filter expressions.r^   r@   s     rB   
visit_starz$_IcebergExpressionVisitor.visit_star   s     Q
 
 	
rD   N)r;   r   r<   r=   )r;   r   r<   rE   )r;   r   r<   r1   )r;   r   r<   r1   )r;   r   r<   rZ   )r;   r   r<   rZ   )r;   r   r<   rZ   )r;   r   r<   rZ   )rS   
__module____qualname____doc__rC   rH   rV   rY   r\   r`   rb   rd    rD   rB   r:   r:   S   s         $ $ $ $# # # #   6
 
 
 
%%	>% % % %

	>
 
 
 

"
	>
 
 
 


	>
 
 
 
 
 
rD   r:   rZ   tasksr6   table_ior2   table_metadatar8   
row_filterr1   case_sensitivelimitschemar4   column_rename_mapr<   c           	   #       
K   dd l 
ddlm} dt          t          j                 f
 fd}	|                     |	            |          E d {V  d S )Nr   )"_DatasourceProjectionPushdownMixinr<   c            	   3     K   t          j        j                  t          j        d          k    rhddlm}   | 	
          }|                              }|                                D ]$}t          j        	                    |g          V  %dS ddl
m} |                    	
          }|V  dS )	z4Inner generator that yields tables without renaming.z0.9.0r   )	ArrowScan)rk   iorl   projected_schemarm   rn   )ri   pyarrow)ri   rk   ru   rl   rv   rm   rn   N)r   parse__version__pyiceberg.io.pyarrowrt   to_table
to_batchespar7   from_batchespyiceberg.iorx   project_table)rt   scannerresult_tablebatch	pyi_pa_iotablerm   rn   	pyicebergrl   ro   rj   rk   ri   s         rB   _generate_tablesz(_get_read_task.<locals>._generate_tables   s     =.//7=3I3III666666  i-%!'-  G #++%+88L &0022 5 5h++UG4444445 5
 :99999 ++-%!'- ,  E KKKKKrD   )r   ray.data.datasource.datasourcerr   r   r~   r7   _apply_rename_to_tables)ri   rj   rk   rl   rm   rn   ro   rp   rr   r   r   s   ```````   @rB   _get_read_taskr      s       QQQQQQ(hrx0 ( ( ( ( ( ( ( ( ( ( ( ( (V 2II-          rD   c                       e Zd ZdZ	 	 	 	 	 d dedeedf         deedf         d	ee         d
ee	ee
f                  dee	ee
f                  f fdZd!dZed"d            Zeded         fd            Zd#dZd$dZdee         fdZdefdZdefdZeded         dedeed                  fd            Z	 d%dedee         dee         fdZ xZS )&IcebergDatasourcez
    Iceberg datasource to read Iceberg tables into a Ray Dataset. This module heavily
    uses PyIceberg to read iceberg tables. All the routines in this class override
    `ray.data.Datasource`.
    N*table_identifierrl   r1   selected_fields.snapshot_idscan_kwargscatalog_kwargsc                    t                                                       t          | dd           ddlm} ||ni | _        ||ni | _        d| j        v r | j                            d          | _        nd| _        || _	        ||n	 |            | _
        ||dk    rd| _        nd	 |D             | _        |r
|| j        d
<   d| _        d| _        dS )a  
        Initialize an IcebergDatasource.

        Args:
            table_identifier: Fully qualified table identifier (i.e.,
                "db_name.table_name")
            row_filter: A PyIceberg BooleanExpression to use to filter the data *prior*
                 to reading
            selected_fields: Which columns from the data to read, passed directly to
                PyIceberg's load functions
            snapshot_id: Optional snapshot ID for the Iceberg table
            scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan()
                function
            catalog_kwargs: Optional arguments to use when setting up the Iceberg
                catalog
        r   )modulepackager   )
AlwaysTrueNr?   defaultr   c                     i | ]}||S rh   rh   ).0cols     rB   
<dictcomp>z.IcebergDatasource.__init__.<locals>.<dictcomp>(  s    #H#H#HC#H#H#HrD   r   )super__init__r   pyiceberg.expressionsr   _scan_kwargs_catalog_kwargspop_catalog_namer   _row_filter_projection_map_plan_files_table)	rA   r   rl   r   r   r   r   r   	__class__s	           rB   r   zIcebergDatasource.__init__   s   4 	d;DDDD444444+6+BKK1?1K~~QST)))!%!5!9!9&!A!AD!*D 0)3)?::ZZ\\ "o&?&?#'D  #H#H#H#H#HD  	;/:Dm,rD   r<   r0   c                 >    ddl m}  |j        | j        fi | j        S )Nr   )catalog)r   r   load_catalogr   r   rA   r   s     rB   _get_catalogzIcebergDatasource._get_catalog0  s5    %%%%%%#w#D$6OO$:NOOOrD   r7   c                     | j         3|                                 }|                    | j                  | _         | j         S )z=
        Return the table reference from the catalog
        )r   r   
load_tabler   r   s     rB   r   zIcebergDatasource.table5  s=    
 ;''))G!,,T-BCCDK{rD   r6   c                 x    | j         -|                                 }|                                | _         | j         S )z?
        Return the plan files specified by this query
        )r   _get_data_scan
plan_files)rA   	data_scans     rB   r   zIcebergDatasource.plan_files?  s;     #++--I(3355DrD   c                     | j         }| j        Ot                      }|                    | j                  }ddlm}m} t          ||          s |||          }n|}|S )zMGet the combined filter including both row_filter and pushed-down predicates.Nr   )r   r   )r   _predicate_exprr:   rM   r   r   r   rO   )rA   combined_filtervisitoriceberg_filterr   r   s         rB   _get_combined_filterz&IcebergDatasource._get_combined_filterK  s|    *+/11G$]]4+?@@N >=======oz:: 1"%#o~"F"F"0rD   r5   c                     |                                  }|                                 }|dnt          |          } | j        j        d||d| j        }|S )Nr   )rl   r   rh   )r   _get_data_columnstupler   scanr   )rA   r   data_columnsr   r   s        rB   r   z IcebergDatasource._get_data_scan^  sv    3355 --//$0$8&&eL>Q>Q#DJO 
&+
 
 
 
	 rD   c                 >    t          d | j        D                       S )Nc              3   .   K   | ]}|j         j        V  d S Nfilefile_size_in_bytesr   tasks     rB   	<genexpr>z@IcebergDatasource.estimate_inmemory_data_size.<locals>.<genexpr>r  s'      LLD49/LLLLLLrD   )sumr   rA   s    rB   estimate_inmemory_data_sizez-IcebergDatasource.estimate_inmemory_data_sizen  s#     LLDOLLLLLLrD   c                     dS )zEReturns True to indicate this datasource supports predicate pushdown.Trh   r   s    rB   supports_predicate_pushdownz-IcebergDatasource.supports_predicate_pushdownt      trD   c                     dS )zFReturns True to indicate this datasource supports projection pushdown.Trh   r   s    rB   supports_projection_pushdownz.IcebergDatasource.supports_projection_pushdownx  r   rD   r   n_chunksc                    d t          |          D             }d t          |          D             }t          j        |           t          | d d          D ]g}t          j        |          }||d                                      |           t          j        ||d         |j        j        z   |d         f           h|S )z
        Implement a greedy knapsack algorithm to distribute the files in the scan
        across tasks, based on their file size, as evenly as possible
        c                 *    g | ]}t                      S rh   )list)r   _s     rB   
<listcomp>zIIcebergDatasource._distribute_tasks_into_equal_chunks.<locals>.<listcomp>  s    222Q$&&222rD   c                     g | ]}d |fS )r   rh   )r   chunk_ids     rB   r   zIIcebergDatasource._distribute_tasks_into_equal_chunks.<locals>.<listcomp>  s    EEE8}EEErD   c                     | j         j        S r   r   )fs    rB   <lambda>zGIcebergDatasource._distribute_tasks_into_equal_chunks.<locals>.<lambda>  s    af&? rD   T)keyreverse   r   )	rangeheapqheapifysortedheappopappendheappushr   r   )r   r   chunkschunk_sizes	plan_filesmallest_chunks         rB   #_distribute_tasks_into_equal_chunksz5IcebergDatasource._distribute_tasks_into_equal_chunks|  s     32%//222EEU8__EEEk"""  ??
 
 
 	 	I #];77N>!$%,,Y777N"1%	(II"1%    rD   parallelismper_task_row_limitc                 P   ddl m} ddlm |                                 }| j        }|                                }|                    |          }|t          t          |                    k    r:t          t          |                    }t                              d| d           | j        j        }| j        j        }	|                                 }
| j                            dd          }| j                            d          }t%          t&          ||	|
||||                                 	          g }t*                              ||          D ]}t/          t0          j                            d
 |D                                 }t7          fd|D                       }t9          t7          d |D                       |z
  t7          d |D                       d |D             d           }|                    t=          |ffd	|||                     |S )Nr   rw   )DataFileContentzReducing the parallelism to z, as that is thenumber of filesrm   Trn   )rj   rk   rl   rm   rn   ro   rp   c                     g | ]	}|j         
S rh   )delete_filesr   s     rB   r   z4IcebergDatasource.get_read_tasks.<locals>.<listcomp>  s    ???4T&???rD   c              3   F   K   | ]}|j         j        k    |j        V  d S r   )contentPOSITION_DELETESrecord_count)r   deleter   s     rB   r   z3IcebergDatasource.get_read_tasks.<locals>.<genexpr>  sB       ( (>_%EEE #EEEE( (rD   c              3   .   K   | ]}|j         j        V  d S r   )r   r   r   s     rB   r   z3IcebergDatasource.get_read_tasks.<locals>.<genexpr>  s'      LLTY3LLLLLLrD   c              3   $   K   | ]}|j         V  d S r   )lengthr   s     rB   r   z3IcebergDatasource.get_read_tasks.<locals>.<genexpr>  s$      CCtt{CCCCCCrD   c                 &    g | ]}|j         j        S rh   )r   	file_pathr   s     rB   r   z4IcebergDatasource.get_read_tasks.<locals>.<listcomp>  s    IIITTY0IIIrD   )num_rows
size_bytesinput_files
exec_statsc                      |           S r   rh   )ri   get_read_tasks    rB   r   z2IcebergDatasource.get_read_tasks.<locals>.<lambda>  s    mmE6J6J rD   )read_fnmetadataro   r   )r   rx   pyiceberg.manifestr   r   r   
projectionschema_to_pyarrowlenr   loggerwarningr   ru   r  r   r   getr   r   get_column_renamesr   r   set	itertoolschainfrom_iterabler   r   r   r   )rA   r   r   r   r   r   rv   
pya_schemarj   rk   rl   rm   rn   
read_taskschunk_tasksunique_deletesposition_delete_countr  r   r   s                     @@rB   get_read_tasksz IcebergDatasource.get_read_tasks  s    	655555666666 ''))	_
 %//11001ABB
 T*--....d:..//KNN"{ " " "   :=,..00
*../?FF!%%g..)!)#"5577	
 	
 	
 
,PP
 
 	 	K -0--??;??? - -N %( ( ( ( (,( ( ( % %!
 %LLLLLLL'(CC{CCCCCII[III  H )4JJJJJ%%'9	      rD   )Nr   NNN)r<   r0   )r<   r7   )r<   r1   )r<   r5   r   )rS   re   rf   rg   strr   r   r	   intr   r   r   r   propertyr   r   r   r   r   r   boolr   r   staticmethodr   r   r   r  __classcell__)r   s   @rB   r   r      sB         7;+1%)04375 55 #2235 sCx	5
 c]5 d38n-5 !c3h05 5 5 5 5 5nP P P P
    X 	 D0 	  	  	  X	    &    MXc] M M M MT    d     ^,8;	d>"	#   \< EIS SS4<SMS	hS S S S S S S SrD   r   )erg   r   r  logging	functoolsr   typingr   r   r   r   r   r	   r
   r   r   rx   r~   	packagingr   >ray.data._internal.planner.plan_expression.expression_visitorsr   ray.data._internal.utilr   ray.data.blockr   r   r   r   r   ray.data.expressionsr   r   r   r   r   r   r   r   r   ray.utilr   ray.util.annotationsr   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   EQNEGTGELTLEANDORrK   rL   IS_NULLIS_NOT_NULLNOTrT   ImportErrorpyiceberg.catalogr0   r1   r   r2   r  r3   pyiceberg.schemar4   pyiceberg.tabler5   r6   r7   pyiceberg.table.metadatar8   	getLoggerrS   r  r:   r  r  r  r   r   rh   rD   rB   <module>r6     s                X X X X X X X X X X X X X X X X X X X X X X           W W W W W W 1 1 1 1 1 1 / / / / / / / / ? ? ? ? ? ? ? ?
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
       - - - - - -%S                                     * 	gjk(hosbb%6ws%!!  S S SHQRRRRRS  7))))))777777######++++++''''''==========666666		8	$	$\
 \
 \
 \
 \
FG\
 \
 \
~<N#<< $< $	<
 < C=< <  S#X/< e_< < < <~ { { { { {
 { { { { {s   4BC8 8D
D