
    &`i                     n   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZmZ d dlZd dlmZ d dlmZ d dlmZ er
d dlZd dlmZ dddd	ddd
deed                  dede
d         dede
e         dede
e         de
eeef                  deed                  fdZ G d de          Z G d de          ZdS )    N)chain)	TYPE_CHECKINGAnyDictIterableListLiteralOptionalTupleUnion)_check_import)BlockAccessor)Datasink)FragmentMetadata   i   )schemamax_rows_per_filemax_bytes_per_filemax_rows_per_groupdata_storage_versionstorage_optionsstream)zpa.Tablepd.DataFrameurir   	pa.Schemar   r   r   r   r   return)r   r   c          
          dd l }ddlm}	m}
 t	                     }t          ||j                  r2t          j        	                    |          
                                n|j        t          j                  dk    rd t          |g             fd}||	n|}t          j                             |                      } |
|||||||          }fd|D             S )Nr   )DEFAULT_MAX_BYTES_PER_FILEwrite_fragmentsc               3      K   D ]B} t          j        |                                           }|                                E d {V  Cd S N)r   	for_blockto_arrow
to_batches)blocktblr   s     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/lance_datasink.pyrecord_batch_converterz/_write_fragment.<locals>.record_batch_converter4   sa       	( 	(E)%0099;;C~~''''''''''	( 	(    )r   r   r   r   r   r   c                     g | ]}|fS  r+   .0fragmentr   s     r'   
<listcomp>z#_write_fragment.<locals>.<listcomp>H   s    9998Xv999r)   )pandaslance.fragmentr   r   next
isinstance	DataFramepaSchemafrom_pandasremove_metadatar   lennamesr   RecordBatchReaderfrom_batches)r   r   r   r   r   r   r   r   pdr   r   firstr(   reader	fragmentss   ` `            r'   _write_fragmentrA      s@    JJJJJJJJ~VeR\** 	"Y**511AACCFF\Fv|!!Fw''( ( ( ( ( '9&@""FX  !..v7M7M7O7OPPF+--1'	 	 	I :999y9999r)   c                        e Zd ZdZ	 	 	 ddedeej                 ded         dee	ee
f                  f fd	Zed
efd            Zdded         d
dfdZdeeeeef                           fdZ xZS )_BaseLanceDatasinkzBase class for Lance Datasink.Ncreater   r   moderD   append	overwriter   c                      t                      j        |i | || _        || _        || _        d | _        || _        d S r!   )super__init__r   r   rE   read_versionr   )selfr   r   rE   r   argskwargs	__class__s          r'   rK   z_BaseLanceDatasink.__init__N   sL     	$)&)))	+/.r)   r   c                     dS )NTr+   rM   s    r'   supports_distributed_writesz._BaseLanceDatasink.supports_distributed_writes`   s    tr)   r   c                     t          | dd           dd l}| j        dk    rB|                    | j        | j                  }|j        | _        | j        |j        | _        d S d S d S )Nlancepylance)modulepackager   rG   )r   )	r   rU   rE   LanceDatasetr   r   versionrL   r   )rM   r   rU   dss       r'   on_write_startz!_BaseLanceDatasink.on_write_startd   sy    d7I>>>>9  ##DHd>R#SSB "
D{" i	 !  #"r)   write_resultsc                 t   dd l }dd l}|s|                    dt                     d S t	          |d          r|j        }t          |          dk    r|                    dt                     d S g }d }|D ]G}|D ]B\  }}t          j        |          }	|	                    |	           t          j        |          }CH|sd S | j
        dv r|j                            ||          }
n%| j
        dk    r|j                            |          }
|j                            | j        |
| j        | j                   d S )Nr   zwrite_results is empty.write_returnszBwrite results is empty. please check ray version or internal error>   rD   rH   rG   )rL   r   )warningsrU   warnDeprecationWarninghasattrr_   r9   pickleloadsrG   rE   LanceOperation	OverwriteAppendrY   commitr   rL   r   )rM   r]   r`   rU   r@   r   batchfragment_str
schema_strr.   ops              r'   on_write_completez$_BaseLanceDatasink.on_write_completeo   s    	 	MM)"   F=/22 	8)7M}""MMT"   F	" 	2 	2E,1 2 2(j!<55  ***j112  	F9///%//	BBBBY(""%,,Y77B!!H* 0	 	" 	
 	
 	
 	
 	
r)   )NrD   Nr!   )__name__
__module____qualname____doc__strr
   r5   r6   r	   r   r   rK   propertyboolrS   r\   r   r   rn   __classcell__rP   s   @r'   rC   rC   K   s       ((
 '+9A48/ // #/ 56	/
 "$sCx.1/ / / / / /$ T    X	( 	(Xk%: 	(d 	( 	( 	( 	(,
DsCx12,
 ,
 ,
 ,
 ,
 ,
 ,
 ,
r)   rC   c                        e Zd ZdZdZ	 	 	 	 	 	 ddedeej                 d	e	d
         de
de
dee         deeeef                  f fdZede
fd            ZdefdZdeeej        df                  fdZ xZS )LanceDatasinkaf  Lance Ray Datasink.

    Write a Ray dataset to lance.

    If we expect to write larger-than-memory files,
    we can use `LanceFragmentWriter` and `LanceCommitter`.

    Args:
        uri : the base URI of the dataset.
        schema : pyarrow.Schema, optional.
            The schema of the dataset.
        mode : str, optional
            The write mode. Default is 'append'.
            Choices are 'append', 'create', 'overwrite'.
        min_rows_per_file : int, optional
            The minimum number of rows per file. Default is 1024 * 1024.
        max_rows_per_file : int, optional
            The maximum number of rows per file. Default is 64 * 1024 * 1024.
        data_storage_version: optional, str, default None
            The version of the data storage format to use. Newer versions are more
            efficient but require newer versions of lance to read.  The default is
            "legacy" which will use the legacy v1 version.  See the user guide
            for more details.
        storage_options : Dict[str, Any], optional
            The storage options for the writer. Default is None.
    LanceNrD      r   r   r   rE   rF   min_rows_per_filer   r   r   c                      t                      j        |g|R |||d|	 || _        || _        || _        d | _        d S )N)r   rE   r   )rJ   rK   r|   r   r   rL   )rM   r   r   rE   r|   r   r   r   rN   rO   rP   s             r'   rK   zLanceDatasink.__init__   sw     		

 	
 	
+		
 	
 	
 	
 	
 "3!2$8!+/r)   r   c                     | j         S r!   )r|   rR   s    r'   min_rows_per_writez LanceDatasink.min_rows_per_write   s    %%r)   c                     | j         S r!   )NAMErR   s    r'   get_namezLanceDatasink.get_name   s
    yr)   blocksr   c                 v    t          || j        | j        | j        | j        | j                  }d |D             S )N)r   r   r   r   c                 d    g | ]-\  }}t          j        |          t          j        |          f.S r+   )rd   dumpsr,   s      r'   r/   z'LanceDatasink.write.<locals>.<listcomp>   sE     
 
 
 & \(##V\&%9%9:
 
 
r)   )rA   r   r   r   r   r   )rM   r   _ctxfragments_and_schemas       r'   writezLanceDatasink.write   sY    
  /H;"4!%!: 0 
  
  

 
$8
 
 
 	
r)   )NrD   r{   r   NN)ro   rp   rq   rr   r   rs   r
   r5   r6   r	   intr   r   rK   rt   r   r   r   r   Tabler   rv   rw   s   @r'   ry   ry      s8        6 D
 '+9A!,!1.2480 00 #0 56	0
 0 0 'sm0 "$sCx.10 0 0 0 0 06 &C & & & X&#    
rx789
 
 
 
 
 
 
 
r)   ry   )rd   	itertoolsr   typingr   r   r   r   r   r	   r
   r   r   pyarrowr5   ray.data._internal.utilr   ray.data.blockr   ray.data.datasource.datasinkr   r0   r=   r1   r   rs   r   rA   rC   ry   r+   r)   r'   <module>r      s#         
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
     1 1 1 1 1 1 ( ( ( ( ( ( 1 1 1 1 1 1 0////// %)-(,"*.04.: .: .:U567.:	.: [!	.:
 .: !.: .: #3-.: d38n-.: 
%/
01.: .: .: .:bP
 P
 P
 P
 P
 P
 P
 P
fP
 P
 P
 P
 P
& P
 P
 P
 P
 P
r)   