
    &`i+-                        d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ erd dlZdZdZej        dej         dej!        dej"        diZ#dZ$h dZ%dZ& e j'        e(          Z)de
e*         de
e*         de
e*         de+e
e*         e
e*         e
e*         f         fdZ, G d de          Z-dS )    N)Path)TYPE_CHECKINGAnyCallableDictIterableListOptional)call_with_retry)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)BlockBlockAccessor)_resolve_kwargs)_FileDatasink)FilenameProvider
       overwrite_or_ignoreerrorparquet>   pathbuffermetadatai   row_group_sizemin_rows_per_filemax_rows_per_filereturnc                     | ||dS | |||}}}|||t           k    r||}}|||fS | 	||| | |fS t          |t          | |                    }|||fS )z
    Configure `min_rows_per_group`, `max_rows_per_group`, `max_rows_per_file` parameters of Pyarrow's `write_dataset` API based on Ray Data's configuration

    Returns
    -------
    (min_rows_per_group, max_rows_per_group, max_rows_per_file)
    N)NNN) ARROW_DEFAULT_MAX_ROWS_PER_GROUPmaxmin)r   r   r   min_rows_per_groupmax_rows_per_groupclamped_group_sizes         /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/parquet_datasink.pychoose_row_group_limitsr(   *   s     	%%		
  1B. *"*"%EEE #" !2
 "#57HHH		#!%6%>~/@@@ !s>3DEE
 
 "#57HHH    c                       e Zd Zddddddddddej        ddedeee                  deeg e	ee
f         f                  dee	ee
f                  dee         d	ee         d
ed         dedee	ee
f                  dee         dee         def fdZdee         deddfdZdededefdZded         dedddede	ee
f         ddfdZedee         fd            Z xZS ) ParquetDatasinkNT)partition_colsarrow_parquet_args_fnarrow_parquet_argsr   r   
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidmoder   r,   r-   r.   r   r   r/   zpyarrow.fs.FileSystemr0   r1   r2   r3   r4   c          
         |d }|i }|| _         || _        || _        || _        || _        | j        !| j        | j        | j        k    s
J d            |	jt
                              t          |	                                                    }|rt          
                    d|           d|	v r|	d         | j        d<   t                                          ||||	|
|t          |           d S )Nc                      i S N r8   r)   r'   <lambda>z*ParquetDatasink.__init__.<locals>.<lambda>z   s    B r)   zAmin_rows_per_file must be less than or equal to max_rows_per_filezopen_stream_args contains unsupported arguments: %s. These arguments are not supported by ParquetDatasink. They will be ignored.compression)r/   r0   r1   r2   r3   file_formatr4   )r-   r.   r   r   r,   UNSUPPORTED_OPEN_STREAM_ARGSintersectionsetkeysloggerwarningsuper__init__FILE_FORMAT)selfr   r,   r-   r.   r   r   r/   r0   r1   r2   r3   r4   intersecting_keys	__class__s                 r'   rC   zParquetDatasink.__init__i   s=     !($.J!%!#%:""4!2!2,!-$2H2T&$*@@@@R A@@ ' < I I$))++,,! ! ! R%    0009I-9X'6!)-/%# 	 		
 		
 		
 		
 		
r)   blocksctxr   c                     dd l t                    t          d D                       rd S d D              j                            d         j        t                   j        d          t           j	        fi  j
                            dd            fd}t                              d d j         d           t          |d	 d
 j         d j        j        t$          t&                     d S )Nr   c              3   j   K   | ].}t          j        |                                          d k    V  /dS )r   Nr   	for_blocknum_rows.0blocks     r'   	<genexpr>z(ParquetDatasink.write.<locals>.<genexpr>   s>      RR%}&u--6688A=RRRRRRr)   c                 f    g | ].}t          j        |                                          d k    ,|/S )r   rL   rO   s     r'   
<listcomp>z)ParquetDatasink.write.<locals>.<listcomp>   sA     
 
 
)@)G)G)P)P)R)RUV)V)VE)V)V)Vr)   schemac                      d D             }                       d | D                       }n}                    | |j        t                              d S )Nc                 Z    g | ](}t          j        |                                          )S r8   )r   rM   to_arrowrO   s     r'   rT   zGParquetDatasink.write.<locals>.write_blocks_to_path.<locals>.<listcomp>   s/    TTTEm-e44==??TTTr)   c                     g | ]	}|j         
S r8   )rU   )rP   tables     r'   rT   zGParquetDatasink.write.<locals>.write_blocks_to_path.<locals>.<listcomp>   s    1S1S1S5%,1S1S1Sr)   )unify_schemas_write_parquet_fileskwargsr   )	tablesoutput_schemarH   rI   filenameparE   user_schemawrite_kwargss	     r'   write_blocks_to_pathz3ParquetDatasink.write.<locals>.write_blocks_to_path   s~    TTVTTTF" " 0 01S1SF1S1S1S T T +%%
01    r)   zWriting z	 file to .zwrite 'z' to '')descriptionmatchmax_attemptsmax_backoff_s)pyarrowlistallr2   get_filename_for_blockr]   r   task_idxr   r-   r.   popr@   debugr   r   _data_contextretried_io_errorsWRITE_FILE_MAX_ATTEMPTS$WRITE_FILE_RETRY_MAX_BACKOFF_SECONDS)rE   rH   rI   rd   r`   ra   rb   rc   s   ``` @@@@r'   writezParquetDatasink.write   s   
 	fRR6RRRRR 	F
 
%
 
 
 )@@1Isz"78#,
 
 '&
 
*.*A
 
 #&&x66	 	 	 	 	 	 	 	 	 	 	 	???49???@@@ >(>>$)>>>$60>	
 	
 	
 	
 	
 	
r)   r`   
write_uuidc                 .   ||vr+| j         t          j        k    rt          d| d| d          d|v rt          |vr| dt           }nK|}nHt          |vr| dt           }n2t          |          }|j        }d|vs
J d            |j        }| d| }|S )	NzWrite UUID 'z"' missing from filename template 'z'. This could result in files being overwritten.Modify your FileNameProvider implementation to include the `write_uuid` into the filename template or change your write mode to SaveMode.OVERWRITE. z{i}re   z-{i}.z!Filename should not contain a dotz-{i})r4   r   APPEND
ValueErrorrD   r   stemsuffix)rE   r`   rw   basename_templatefilename_pathr{   r|   s          r'   _get_basename_templatez&ParquetDatasink._get_basename_template   s    X%%$)x*F*Fhz h hX h h h  
 H(**'/$?$?+$?$?!! %-!!((#+ A AK A A
 !NNM %Dd???$G???")F#' 7 7v 7 7  r)   r^   zpyarrow.Tabler_   zpyarrow.Schemarc   c                 (   dd l m} t          |          D ];\  }}|r/|j                            |          s|                    |          }|||<   <|                    dd           }	t                              | j	        d          }
t          |	| j        | j                  \  }}}|                     ||          }|                    || j        ||| j        | j        t$          |
dd||| |                                j        di |           d S )	Nr   r   r   )r   r   hiveT)database_dirrU   r}   r/   partitioningformatexisting_data_behaviorpartitioning_flavoruse_threadsr$   r%   r   file_optionsr8   )pyarrow.datasetdataset	enumeraterU   equalscastrp   EXISTING_DATA_BEHAVIOR_MAPgetr4   r(   r   r   r   write_datasetr   r/   r,   rD   ParquetFileFormatmake_write_options)rE   r^   r`   r_   rw   rc   dsidxrZ   r   r   r$   r%   r   r}   s                  r'   r\   z$ParquetDatasink._write_parquet_files   sb    	%$$$$$ $F++ 	  	 JC 2U\%8%8%G%G 2

=11F3KK%))*:DAA!;!?!?I,"
 "
 $"4"4
 
 
		
 !77*MM
Y /,#9 &11/B--//BRR\RR 	 	
 	
 	
 	
 	
r)   c                     | j         S r7   )r   )rE   s    r'   min_rows_per_writez"ParquetDatasink.min_rows_per_write%  s    %%r)   )__name__
__module____qualname__r   ry   strr
   r	   r   r   r   intboolr   rC   r   r   r   rv   r   r\   propertyr   __classcell__)rG   s   @r'   r+   r+   h   s       
 /3HL7;+/+/8<#598<&*!8
 8
 8
8
 !c+	8

  (T#s(^1C(DE8
 %T#s(^48
 $C=8
 $C=8
 458
 8
 #4S>28
 $$458
 sm8
 8
 8
 8
 8
 8
 8
t/
/
 /
 
	/
 /
 /
 /
b!s ! ! ! ! ! !<1
_%1
 1
 (	1

 1
 38n1
 
1
 1
 1
 1
f &HSM & & & X& & & & &r)   r+   ).loggingpathlibr   typingr   r   r   r   r   r	   r
   ray._common.retryr   'ray.data._internal.execution.interfacesr   (ray.data._internal.planner.plan_write_opr   ray.data._internal.savemoder   ray.data.blockr   r   )ray.data.datasource.file_based_datasourcer   !ray.data.datasource.file_datasinkr   %ray.data.datasource.filename_providerr   rk   rt   ru   ry   	OVERWRITEIGNOREERRORr   rD   r<   r!   	getLoggerr   r@   r   tupler(   r+   r8   r)   r'   <module>r      s          O O O O O O O O O O O O O O O O O O - - - - - - ? ? ? ? ? ? J J J J J J 0 0 0 0 0 0 / / / / / / / / E E E E E E ; ; ; ; ; ; B B B B B B NNN ') $ O*-O*NG	    >==  $/  		8	$	$;ISM;I};I  };I 8C=(3-#67	;I ;I ;I ;I|& & & & &m & & & & &r)   