
    &`ie,                        d dl Z d dlZd dlmZmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZmZ d dlmZ d dlmZm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z' erd dl(Z( e j)        e*          Z+ G d ded                   Z,e' G d de,                      Z-e' G d de,                      Z.dS )    N)TYPE_CHECKINGAnyDictIterableOptional)urlparse)call_with_retry)%add_creatable_buckets_param_if_s3_uri)DelegatingBlockBuilder)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)RetryingPyFileSystem_is_local_scheme)BlockBlockAccessor)DataContext)DatasinkWriteResult)FilenameProvider_DefaultFilenameProvider)_resolve_paths_and_filesystem)DeveloperAPIc                   @   e Zd Zddddddej        ddeded         dedeeee	f                  d	ee
         d
ee         dee         defdZdeddfdZdded         ddfdZdefdZdee         deddfdZdededefdZded         fdZedefd            ZdS )_FileDatasinkNT)
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidfile_formatmodepathr   zpyarrow.fs.FileSystemr   r   r   r    r!   r"   c                   |i }|t          ||          }t          j                    | _        || _        t          ||          \  }	| _        t          j        | j        | j        j	                  | _        t          |	          dk    sJ t          |	                      |	d         | _        || _        || _        || _        || _        || _        || _        d| _        d| _        d| _        dS )a
  Initialize this datasink.

        Args:
            path: The folder to write files to.
            filesystem: The filesystem to write files to. If not provided, the
                filesystem is inferred from the path.
            try_create_dir: Whether to create the directory to write files to.
            open_stream_args: Arguments to pass to ``filesystem.open_output_stream``.
            filename_provider: A :class:`ray.data.datasource.FilenameProvider` that
                generates filenames for each row or block.
            dataset_uuid: The UUID of the dataset being written. If specified, it's
                included in the filename.
            file_format: The file extension. If specified, files are written with this
                extension.
        N)r    r!   )retryable_errors   r   F)r   r   get_current_data_contextunresolved_pathr   r   r   wrapretried_io_errorslenr#   r   r   r   r    r!   r"   has_created_dir_skip_write_write_started)
selfr#   r   r   r   r   r    r!   r"   pathss
             u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/datasource/file_datasink.py__init__z_FileDatasink.__init__!   s    6 #!$ 8){! ! ! )466#!>tZ!P!Pt.3Od.@.R
 
 
 5zzQE

!H	, 0!2(&	$ #    returnpyarrow.NativeFilec                 2     | j         j        |fi | j        S N)r   open_output_streamr   )r0   r#   s     r2   r9   z _FileDatasink.open_output_streamW   s"    1t1$PP$:OPPPr4   schemazpyarrow.Schemac                 p   | j         rd S d| _         ddlm} | j                            | j                  j        |j        u}|r| j        t          j
        k    rt          d| j         d          | j        t          j        k    r3t                              d| j         d| j                    d| _        d S | j        t          j        k    rIt                              d| j         d| j                    | j                            | j                   |                     | j                  | _        d S )	NTr   FileTypezPath zO already exists. If this is unexpected, use mode='ignore' to ignore those filesz
[SaveMode=z] Skipping z] Replacing contents )r/   
pyarrow.fsr=   r   get_file_infor#   typeNotFoundr"   r   ERROR
ValueErrorIGNOREloggerwarningr.   	OVERWRITEdelete_dir_contents_create_dirr-   )r0   r:   r=   
dir_existss       r2   on_write_startz_FileDatasink.on_write_startZ   sL    	F"'''''' O))$)449ARR 	  	?yHN** UDI U U U   yHO++MDIMM$)MMNNN#' yH...WDIWWDIWWXXX33DI>>>#//	::r4   c                    ddl m} t          |          }|j        dk    }|o| j        j         }| j        rU|sS| j                            |          j	        |j
        u r-t          |          }| j                            |d           dS dS )zsCreate a directory to write files to.

        If ``try_create_dir`` is ``False``, this method is a no-op.
        r   r<   s3T)	recursiveF)r>   r=   r   schemer(   s3_try_create_dirr   r   r?   r@   rA   r
   
create_dir)r0   destr=   
parsed_uri	is_s3_uriskip_create_dir_for_s3tmps          r2   rI   z_FileDatasink._create_dirt   s    
 	(''''' d^^
%-	!*!W43E3W/W 	'= 	,,T2278;LLL <DAA**3$*???tur4   blocksctxc                 L   t                      }|D ]}|                    |           |                                }t          j        |          }|                                dk    r$t                              d| j                    d S | 	                    |d|           d S )Nr   zSkipped writing empty block to )
r   	add_blockbuildr   	for_blocknum_rowsrE   rF   r#   write_block)r0   rW   rX   builderblockblock_accessors         r2   writez_FileDatasink.write   s    
 )** 	% 	%Ee$$$$&077""$$))NNHTYHHIIIFC00000r4   r`   block_indexc                     t           r8   NotImplementedError)r0   r`   rc   rX   s       r2   r^   z_FileDatasink.write_block   s    !!r4   write_resultc                 p    | j         r,|j        dk    r#| j                            | j                   d S d S d S )Nr   )r-   r]   r   
delete_dirr#   )r0   rg   s     r2   on_write_completez_FileDatasink.on_write_complete   sH     	2L$9Q$>$>O&&ty11111	2 	2$>$>r4   c                 ,    t          | j                   S r8   )r   r)   r0   s    r2   supports_distributed_writesz)_FileDatasink.supports_distributed_writes   s    #D$89999r4   r8   )__name__
__module____qualname__r   APPENDstrr   boolr   r   r   r3   r9   rK   rI   r   r   r   rb   r   intr^   r   rj   propertyrm    r4   r2   r   r       s       
 9=#598<&*%)!4$ 4$ 4$4$ 45	4$
 4$ #4S>24$ $$454$ sm4$ c]4$ 4$ 4$ 4$ 4$lQs Q/C Q Q Q Q; ;X.>%? ;4 ; ; ; ;44    @11 1 
	1 1 1 1"" "S "{ " " " "2k$.? 2 2 2 2
 :T : : : X: : :r4   r   c                   F    e Zd ZdZdeeef         ddfdZdede	de
fd	Zd
S )RowBasedFileDatasinka  A datasink that writes one row to each file.

    Subclasses must implement ``write_row_to_file`` and call the superclass constructor.

    Examples:
        .. testcode::

            import io
            from typing import Any, Dict

            import pyarrow
            from PIL import Image

            from ray.data.datasource import RowBasedFileDatasink

            class ImageDatasink(RowBasedFileDatasink):
                def __init__(self, path: str, *, column: str, file_format: str = "png"):
                    super().__init__(path, file_format=file_format)
                    self._file_format = file_format
                    self._column = column

                def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
                    image = Image.fromarray(row[self._column])
                    buffer = io.BytesIO()
                    image.save(buffer, format=self._file_format)
                    file.write(buffer.getvalue())
    rowfiler6   c                     t           )zWrite a row to a file.

        Args:
            row: The row to write.
            file: The file to write the row to.
        re   )r0   ry   rz   s      r2   write_row_to_filez&RowBasedFileDatasink.write_row_to_file   
     "!r4   r`   rc   rX   c                     t          |                    d                    D ]\  } j                            |j        t
                   |j        ||          }t          j         j	        |          t                              d d            fd}t          |d d j        j                   d S )	NF)public_row_formatWriting  file.c                                                     5 }                     |            d d d            d S # 1 swxY w Y   d S r8   )r9   r|   )rz   ry   r0   
write_paths    r2   write_row_to_pathz;RowBasedFileDatasink.write_block.<locals>.write_row_to_path   s    ,,Z88 6D**35556 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6   ;??write ''descriptionmatch)	enumerate	iter_rowsr   get_filename_for_rowkwargsr   task_idx	posixpathjoinr#   rE   debugr	   r(   r+   )	r0   r`   rc   rX   	row_indexfilenamer   ry   r   s	   `      @@r2   r^   z RowBasedFileDatasink.write_block   s    '%(P(PQQ 	 	NIs-BB
01 H #	8<<JLL6J6667776 6 6 6 6 6 6 !3j333(:    	 	r4   N)rn   ro   rp   __doc__r   rr   r   r|   r   rt   r   r^   rv   r4   r2   rx   rx      sr         8"T#s(^ ";O " " " " S {      r4   rx   c                        e Zd ZdZdddee         f fdZdeddfd	Zded
ede	fdZ
edee         fd            Z xZS )BlockBasedFileDatasinka)  A datasink that writes multiple rows to each file.

    Subclasses must implement ``write_block_to_file`` and call the superclass
    constructor.

    Examples:
        .. testcode::

            class CSVDatasink(BlockBasedFileDatasink):
                def __init__(self, path: str):
                    super().__init__(path, file_format="csv")

                def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
                    from pyarrow import csv
                    csv.write_csv(block.to_arrow(), file)
    N)min_rows_per_filer   c                J     t                      j        |fi | || _        d S r8   )superr3   _min_rows_per_file)r0   r#   r   file_datasink_kwargs	__class__s       r2   r3   zBlockBasedFileDatasink.__init__  s3     	66!5666"3r4   r`   rz   r6   c                     t           )zWrite a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        re   )r0   r`   rz   s      r2   write_block_to_filez*BlockBasedFileDatasink.write_block_to_file
  r}   r4   rc   rX   c                 .     j                             |j        t                   |j        |          }t          j         j        |           fd}t          	                    d d           t          |d d j        j                   d S )Nc                                                     5 }                     |            d d d            d S # 1 swxY w Y   d S r8   )r9   r   )rz   r`   r0   r   s    r2   write_block_to_pathz?BlockBasedFileDatasink.write_block.<locals>.write_block_to_path  s    ((44 6((5556 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6 6r   r   r   r   r   r   )r   get_filename_for_blockr   r   r   r   r   r#   rE   r   r	   r(   r+   )r0   r`   rc   rX   r   r   r   s   ``    @r2   r^   z"BlockBasedFileDatasink.write_block  s    )@@3:34clK
 
 ^DIx88
	6 	6 	6 	6 	6 	6 	6 	2
222333/*///$6	
 	
 	
 	
 	
 	
r4   r5   c                     | j         S r8   )r   rl   s    r2   min_rows_per_writez)BlockBasedFileDatasink.min_rows_per_write$  s    &&r4   )rn   ro   rp   r   r   rt   r3   r   r   r   r^   ru   r   __classcell__)r   s   @r2   r   r      s         $ ;?4 4 4*23-4 4 4 4 4 4" ">R " " " "
 
S 
{ 
 
 
 
" 'HSM ' ' ' X' ' ' ' 'r4   r   )/loggingr   typingr   r   r   r   r   urllib.parser   ray._common.retryr	   ray._private.arrow_utilsr
   +ray.data._internal.delegating_block_builderr   'ray.data._internal.execution.interfacesr   (ray.data._internal.planner.plan_write_opr   ray.data._internal.savemoder   ray.data._internal.utilr   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasinkr   r   %ray.data.datasource.filename_providerr   r   ray.data.datasource.path_utilr   ray.util.annotationsr   pyarrow	getLoggerrn   rE   r   rx   r   rv   r4   r2   <module>r      sN        ? ? ? ? ? ? ? ? ? ? ? ? ? ? ! ! ! ! ! ! - - - - - - J J J J J J N N N N N N ? ? ? ? ? ? J J J J J J 0 0 0 0 0 0        0 / / / / / / / ( ( ( ( ( ( > > > > > > > >        H G G G G G - - - - - - NNN		8	$	$O: O: O: O: O:HTN O: O: O:d : : : : := : : :z 5' 5' 5' 5' 5'] 5' 5' 5' 5' 5'r4   