
    &`iO                     d   d Z ddlZddlmZmZ ddlmZmZmZm	Z	m
Z
mZmZ ddlmZ ddlmZ ddlmZmZ ddlmZmZ dd	lmZ dd
lmZ er"ddlZddlmZ ddlm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z&  ej'        e(          Z)e G d d                      Z*dZ+e G d dee*                               Z,dS )zU
Module to write a Ray Dataset into an iceberg table, by using the Ray Datasink API.
    N)	dataclassfield)TYPE_CHECKINGAnyDictIterableListOptionalUnion)TaskContext)SaveMode)BlockBlockAccessor)DatasinkWriteResult)Expr)DeveloperAPI)Catalog)DataFile)Schema)Table)UpdateSchemac                       e Zd ZU dZ ee          Zed         ed<   dZ	e
eeee         f                  ed<    ee          Zed         ed<   dS )	IcebergWriteResultaI  Result from writing blocks to Iceberg storage.

    Attributes:
        data_files: List of DataFile objects containing metadata about written Parquet files.
        upsert_keys: Dictionary mapping column names to lists of key values for upsert operations.
        schemas: List of PyArrow schemas from all non-empty blocks.
    )default_factoryr   
data_filesNupsert_keys	pa.Schemaschemas)__name__
__module____qualname____doc__r   listr   r	   __annotations__r   r
   r   strr   r        /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/iceberg_datasink.pyr   r      s           $)5#>#>#>JZ >>>26K$sDI~./666!&t!<!<!<GT+<<<<<r(   r   	join_colsc                       e Zd ZdZddej        dddfdedeeee	f                  deeeef                  deded         d	eeee	f                  d
eeee	f                  fdZ
defdZdeddfdZd,dZd-dZdee         fdZddded         ddfdZddded         deeeee	         f                  ddfdZ	 	 	 	 	 	 d.dZddd ed!         ddddfd"Zd/d#ed$         ddfd%Zd&ee         d'edefd(Zddded         ddfd)Zd*eddfd+ZdS )0IcebergDatasinka  
    Iceberg datasink to write a Ray Dataset into an existing Iceberg table.
    This datasink handles concurrent writes by:
    - Each worker writes Parquet files to storage and returns DataFile metadata
    - The driver collects all DataFile objects and performs a single commit

    Schema evolution is supported:
    - New columns in incoming data are automatically added to the table schema
    - Type promotion across blocks is handled via schema reconciliation on the driver
    Ntable_identifiercatalog_kwargssnapshot_propertiesmodeoverwrite_filterr   upsert_kwargsoverwrite_kwargsc                 .   || _         |pi                                 | _        |pi                                 | _        || _        || _        |pi                                 | _        |pi                                 | _        | j        r,| j        t          j	        k    rt          d| j                   | j        r,| j        t          j        k    rt          d| j                   | j        r,| j        t          j        k    rt          d| j                   dD ]@\  }}	| j                            |d           t                              d| d|	            Ad| j        v r | j                            d          | _        nd	| _        d| _        dS )
a  
        Initialize the IcebergDatasink

        Args:
            table_identifier: The identifier of the table such as `default.taxi_dataset`
            catalog_kwargs: Optional arguments to use when setting up the Iceberg catalog
            snapshot_properties: Custom properties to write to snapshot summary
            mode: Write mode - APPEND, UPSERT, or OVERWRITE. Defaults to APPEND.
                - APPEND: Add new data without checking for duplicates
                - UPSERT: Update existing rows or insert new ones based on a join condition
                - OVERWRITE: Replace table data (all data or filtered subset)
            overwrite_filter: Optional filter for OVERWRITE mode to perform partial overwrites.
                Must be a Ray Data expression from `ray.data.expressions`. Only rows matching
                this filter are replaced. If None with OVERWRITE mode, replaces all table data.
            upsert_kwargs: Optional arguments for upsert operations.
                Supported parameters: join_cols (List[str]), case_sensitive (bool),
                branch (str). Note: This implementation uses a copy-on-write strategy
                that always updates all columns for matched keys and inserts all new keys.
            overwrite_kwargs: Optional arguments to pass through to PyIceberg's table.overwrite()
                method. Supported parameters include case_sensitive (bool) and branch (str).
                See PyIceberg documentation for details.

        Note:
            Schema evolution is automatically enabled. New columns in the incoming data
            are automatically added to the table schema. The schema is extracted from
            the first input bundle when on_write_start is called.
        zNupsert_kwargs can only be specified when mode is SaveMode.UPSERT, but mode is zToverwrite_kwargs can only be specified when mode is SaveMode.OVERWRITE, but mode is zToverwrite_filter can only be specified when mode is SaveMode.OVERWRITE, but mode is ))r1   z;should be passed as a separate parameter to write_iceberg())delete_filterzBis an internal PyIceberg parameter; use 'overwrite_filter' insteadNz	Removed 'z' from overwrite_kwargs: namedefault)r-   copy_catalog_kwargs_snapshot_properties_mode_overwrite_filter_upsert_kwargs_overwrite_kwargsr   UPSERT
ValueError	OVERWRITEpoploggerwarning_catalog_name_table)
selfr-   r.   r/   r0   r1   r2   r3   invalid_paramreasons
             r)   __init__zIcebergDatasink.__init__9   s   J !1 . 4"::<<%8%>B$D$D$F$F!
!1,288::"2"8b!>!>!@!@  	4:#@#@maeakmm   ! 	djH4F&F&Fsgkgqss   ! 	djH4F&F&Fsgkgqss  
	&
 	 	!M6 %))->>JPPPPP   T)))!%!5!9!9&!A!AD!*D#r(   returnc                 d    | j                                         }|                    dd           |S )z!Exclude `_table` during pickling.rF   N)__dict__r8   rB   rG   states     r)   __getstate__zIcebergDatasink.__getstate__   s.    ""$$		(D!!!r(   rO   c                 H    | j                             |           d | _        d S N)rM   updaterF   rN   s     r)   __setstate__zIcebergDatasink.__setstate__   s#    U###r(   r   c                 >    ddl m}  |j        | j        fi | j        S )Nr   )catalog)	pyicebergrV   load_catalogrE   r9   rG   rV   s     r)   _get_catalogzIcebergDatasink._get_catalog   s5    %%%%%%#w#D$6OO$:NOOOr(   c                 l    |                                  }|                    | j                  | _        dS )z*Reload the Iceberg table from the catalog.N)rZ   
load_tabler-   rF   rY   s     r)   _reload_tablezIcebergDatasink._reload_table   s/    ##%%(()>??r(   c                 &   | j                             t          g           }|sn| j        j                                        j        D ]J}| j        j                                                            |          }|r|                    |           K|S )zGGet join columns for upsert, using table identifier fields as fallback.)	r=   get_UPSERT_COLS_IDrF   metadataschemaidentifier_field_idsfind_column_nameappend)rG   upsert_colsfield_idcol_names       r)   _get_upsert_colsz IcebergDatasink._get_upsert_cols   s    )--orBB 	1 K07799N 1 1;/6688II(SS 1&&x000r(   txnzTable.transactionr   r   c                     |                     | j                  5 }|D ]}|                    |           	 ddd           n# 1 swxY w Y   |                                 dS )zAppend data files to a transaction and commit.

        Args:
            txn: PyIceberg transaction object
            data_files: List of DataFile objects to append
        N)_append_snapshot_producerr:   append_data_filecommit_transaction)rG   rj   r   append_files	data_files        r)   _append_and_commitz"IcebergDatasink._append_and_commit   s     **4+DEE 	9' 9 9	--i88889	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	9 	     s   AA
Aupsert_keys_dictsc                    ddl }ddlm} ddlddlm}  |t                    }|D ]7}|                                D ] \  }	}
||	                             |
           !8|r j	        t          |                    |                                 }fd|D             }|                    j        j        |          }                    |          t!                    dk    rU ||          }| j                                        }|                    t(          d            |j        d|| j        d| |                     ||           dS )a   
        Commit upsert transaction with copy-on-write strategy.

        Args:
            txn: PyIceberg transaction object
            data_files: List of DataFile objects to commit
            upsert_keys_dicts: List of dictionaries mapping column names to lists of key values
        r   Ndefaultdict)create_match_filterc              3   X   K   | ]$}j                             |                   V  %d S rR   )computeis_valid).0col
keys_tablepas     r)   	<genexpr>z1IcebergDatasink._commit_upsert.<locals>.<genexpr>   s7      QQcRZ((C99QQQQQQr(   r5   r/   r'   )	functoolscollectionsru   pyarrowpyiceberg.table.upsert_utilrv   r$   itemsextendtabledictri   reducerx   and_filterlenr=   r8   rB   r`   deleter:   rq   )rG   rj   r   rr   r   ru   rv   merged_keys_dictupsert_keys_dictr{   valuesrf   masksmaskr5   delete_kwargsr|   r}   s                   @@r)   _commit_upsertzIcebergDatasink._commit_upsert   s    	++++++CCCCCC ';t,, 1 	5 	5/5577 5 5V %,,V44445  	!$'7"8"899J //11KQQQQQ[QQQE##BJOU;;D#**400J :"" 3 3J L L !% 3 8 8 : :!!/4888
 "/(,(A  $   	Z00000r(   rS   r   table_schemar   c           
          ddl m} |j        }|D ]W}||j        v rL|j        |         }|j        s8 ||j        |j        |j        |j        d|j	        |j
                  |j        |<   XdS )a  Ensure identifier fields remain required after schema union.

        When union_by_name is called with a schema that has nullable fields,
        PyIceberg may make identifier fields optional. Since identifier fields
        must be required, this helper ensures they remain required after union.

        Example:
            Table schema:   id: int (required, identifier), val: string
            Input schema:   id: int (optional),             val: string

            `union_by_name` merges them to:
                            id: int (optional),             val: string

            This violates the identifier constraint. This function forces `id`
            back to required in the pending update.

        Args:
            update: The UpdateSchema object from update_schema() context manager
            table_schema: The current table schema to get identifier field IDs from
        r   )NestedFieldT)rg   r6   
field_typedocrequiredinitial_defaultwrite_defaultN)pyiceberg.typesr   rc   _updatesr   rg   r6   r   r   r   r   )rG   rS   r   r   rc   rg   updated_fields          r)   '_preserve_identifier_field_requirementsz7IcebergDatasink._preserve_identifier_field_requirements   s    . 	0/////+@, 	 	H6?** & 9$-  1<!.!7*/#0#;)-!%(5(E&3&A1 1 1FOH-	 	r(   
new_schema)r   r   c                 \    |                     |           |                     ||           dS )a  Update schema using union_by_name while preserving identifier field requirements.

        Args:
            update: The UpdateSchema object.
            new_schema: The new schema to union with the table schema.
            table_schema: The current table schema.
        N)union_by_namer   )rG   rS   r   r   s       r)   _update_schema_with_unionz)IcebergDatasink._update_schema_with_union  s4     	Z(((44V\JJJJJr(   rb   r   c                    |                                   |z| j        j                                        }| j                                        5 }|                     |||           ddd           n# 1 swxY w Y   |                                   | j        t          j        k    rV| j	        
                    t          g           }|s6| j        j                                        j        }|st          d          dS dS dS )ap  Initialize table for writing and create a shared write UUID.

        Args:
            schema: The PyArrow schema of the data being written. This is
                automatically extracted from the first input bundle by the
                Write operator. Used to evolve the table schema before writing
                to avoid PyIceberg name mapping errors.
        Nz`join_cols must be specified in upsert_kwargs for UPSERT mode when table has no identifier fields)r]   rF   ra   rb   update_schemar   r;   r   r?   r=   r_   r`   rc   r@   )rG   rb   r   rS   rf   rc   s         r)   on_write_startzIcebergDatasink.on_write_start/  sb    	 ;/6688L**,, M..vv|LLLM M M M M M M M M M M M M M M     :((-11/2FFK 	 K(//11F % , $>   )(	 	
 s   A22A69A6blocksctxc                    ddl m} ddlm} | j        |                                  g } |t                    }g }| j        t          j	        k    }|D ]}	t          j        |	                                          }
|
j        dk    r|                    |
j                   |rL|                                 }|D ]5}||                             |
|                                                    6t           || j        j        |
| j        j                            }|                    |           t+          ||pd|          S )aD  
        Write blocks to Parquet files in storage and return DataFile metadata with schemas.

        This runs on each worker in parallel. Files are written directly to storage
        (S3, HDFS, etc.) and only metadata is returned to the driver.
        Schema updates are NOT performed here - they happen on the driver.

        Args:
            blocks: Iterable of Ray Data blocks to write
            ctx: TaskContext object containing task-specific information

        Returns:
            IcebergWriteResult containing DataFile objects, upsert keys, and schemas.
        r   rt   )_dataframe_to_data_filesN)table_metadatadfio)r   r   r   )r   ru   pyiceberg.io.pyarrowr   rF   r]   r$   r;   r   r?   r   	for_blockto_arrownum_rowsre   rb   ri   r   	to_pylistra   r   r   )rG   r   r   ru   r   all_data_filesr   block_schemasuse_copy_on_write_upsertblockpa_tablerf   r{   r   s                 r)   writezIcebergDatasink.writeQ  s    	,+++++AAAAAA ;   &;t,,#':#@  	2 	2E$.u55>>@@H 1$$$$X_555 , P"&"7"7"9"9K* P P(-44Xc]5L5L5N5NOOOO ",,'+{';#;>   
 %%j111!%(0D!
 
 
 	
r(   c                    | j         Eddlm}  |            }|                    | j                   } |j        d|| j        d| j         n(ddlm}  |j        d |            | j        d| j         | 	                    ||           dS )z'Commit data files using OVERWRITE mode.Nr   )_IcebergExpressionVisitorr   )
AlwaysTruer'   )
r<   0ray.data._internal.datasource.iceberg_datasourcer   visitr   r:   r>   pyiceberg.expressionsr   rq   )rG   rj   r   r   visitor
pyi_filterr   s          r)   _commit_overwritez!IcebergDatasink._commit_overwrite  s    
 !-      0/11G t'=>>JCJ ($($=  (    988888CJ (jll$($=  (   	Z00000r(   write_resultc                    g }g }g }|j         D ]a}|s|j        rU|                    |j                   |                    |j                   |j        r|                    |j                   b|sdS ddlm} ddlm	} |
                    | j                                                  } ||g|z   d          }	| j                                        }
|	                    |          sa| j        j                                        }|
                                5 }|                     ||	|           ddd           n# 1 swxY w Y   | j        t&          j        k    r|                     |
|           dS | j        t&          j        k    r|                     |
|           dS | j        t&          j        k    r|                     |
||           dS t5          d| j                   )ak  
        Complete the write by reconciling schemas and committing all data files.

        This runs on the driver after all workers finish writing files.
        Collects all DataFile objects and schemas from all workers, reconciles schemas
        (allowing type promotion), updates table schema if needed, then performs a single
        atomic commit.
        Nr   )r   )unify_schemasT)promote_typeszUnsupported mode: )write_returnsr   r   r   r   re   pyiceberg.ior   .ray.data._internal.arrow_ops.transform_pyarrowr   schema_to_pyarrowrF   rb   transactionequalsra   r   r   r;   r   APPENDrq   rA   r   r?   r   r@   )rG   r   r   all_schemasrr   write_return	pyi_pa_ior   r   final_reconciled_schemarj   current_table_schemarS   s                r)   on_write_completez!IcebergDatasink.on_write_complete  sq    ,.)+8:(6 	G 	GL & G%%l&=>>>""<#7888+ G%,,\-EFFF 	F 	655555PPPPPP 224;3E3E3G3GHH"/-N[(#
 #
 #

 k%%'' '--l;; 	#';#7#>#>#@#@ ""$$ ..35I                 :((##C88888Z8---""377777Z8?**^5FGGGGG>$*>>???s   EEE)rK   r   )rK   N)rS   r   r   r   rK   NrR   ) r    r!   r"   r#   r   r   r&   r
   r   r   rJ   r   rP   rT   rZ   r]   r	   ri   rq   r   r   r   r   r   r   r   r   r   r   r   r   r   r'   r(   r)   r,   r,   ,   s       	 	 488<!-12659P$ P$P$ !c3h0P$ &d38n5	P$
 P$ #6*P$  S#X/P$ #4S>2P$ P$ P$ P$dd    $ 4    P P P P
@ @ @ @
	$s) 	 	 	 	!&!484D!	! ! ! !61 61 $61  S$s)^ 45	61
 
61 61 61 61p-$-4<-	- - - -^KK /0K 	K
 
K K K K    Xk%:  d        D6
HUO 6
+ 6
BT 6
 6
 6
 6
p1&1484D1	1 1 1 1>9@k 9@d 9@ 9@ 9@ 9@ 9@ 9@r(   r,   )-r#   loggingdataclassesr   r   typingr   r   r   r   r	   r
   r   'ray.data._internal.execution.interfacesr   ray.data._internal.savemoder   ray.data.blockr   r   ray.data.datasource.datasinkr   r   ray.data.expressionsr   ray.util.annotationsr   r   r}   pyiceberg.catalogr   pyiceberg.manifestr   pyiceberg.schemar   pyiceberg.tabler   pyiceberg.table.update.schemar   	getLoggerr    rC   r   r`   r,   r'   r(   r)   <module>r      s     ( ( ( ( ( ( ( ( L L L L L L L L L L L L L L L L L L ? ? ? ? ? ? 0 0 0 0 0 0 / / / / / / / / > > > > > > > > % % % % % % - - - - - - ;))))))++++++''''''%%%%%%::::::		8	$	$ = = = = = = = =  t@ t@ t@ t@ t@h12 t@ t@ t@ t@ t@r(   