
    &`i                        d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlmZ erd dlZ ej        e          Z e
d          Z	 ee G d d	ee                                           Ze G d
 dee                               Ze G d ded                               Zdee         defdZdS )    N)	dataclass)TYPE_CHECKINGGenericIterableListOptionalTypeVar)TaskContext)BlockBlockAccessor)DeveloperAPIWriteReturnTypec                   V    e Zd ZU dZeed<   eed<   ee         ed<   ed	d            Z	dS )
WriteResultz3Aggregated result of the Datasink write operations.num_rows
size_byteswrite_returnswrsreturnc                     t          d |D                       }t          d |D                       }t          t          j        d |D                        }t	          |||          S )Nc              3   $   K   | ]}|j         V  d S N)r   .0wrs     p/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/datasource/datasink.py	<genexpr>z&WriteResult.combine.<locals>.<genexpr>#   s$      11rr{111111    c              3   $   K   | ]}|j         V  d S r   )r   r   s     r   r   z&WriteResult.combine.<locals>.<genexpr>$   s$      552555555r   c                     g | ]	}|j         
S  )r   r   s     r   
<listcomp>z'WriteResult.combine.<locals>.<listcomp>%   s    .N.N.NBr/?.N.N.Nr   )r   r   r   )sumlist	itertoolschainr   )clsr   r   r   r   s        r   combinezWriteResult.combine!   s}    11S111115555555
Y_.N.N#.N.N.NOPP!'
 
 
 	
r   N)r   r   r   r   )
__name__
__module____qualname____doc__int__annotations__r   r   classmethodr(   r!   r   r   r   r      sa          >= MMMOOO((((	
 	
 	
 [	
 	
 	
r   r   c                       e Zd ZdZdded         ddfdZdee         dede	fd	Z
d
ee	         fdZdeddfdZdefdZedefd            Zedee         fd            ZdS )DatasinkzInterface for defining write-related logic.

    If you want to write data to something that isn't built-in, subclass this class
    and call :meth:`~ray.data.Dataset.write_datasink`.
    Nschemaz	pa.Schemar   c                     dS )a\  Callback for when a write job starts.

        Use this method to perform setup for write tasks. For example, creating a
        staging bucket in S3.

        This is called on the driver when the first input bundle is ready, just
        before write tasks are submitted. The schema is extracted from the first
        input bundle, enabling schema-dependent initialization.

        Args:
            schema: The PyArrow schema of the data being written. This is
                automatically extracted from the first input bundle. May be None
                if the input data has no schema.
        Nr!   )selfr2   s     r   on_write_startzDatasink.on_write_start6   s	     	r   blocksctxc                     t           )a  Write blocks. This is used by a single write task.

        Args:
            blocks: Generator of data blocks.
            ctx: ``TaskContext`` for the write task.

        Returns:
            Result of this write task. When the entire write operator finishes,
            All returned values will be passed as `WriteResult.write_returns`
            to `Datasink.on_write_complete`.
        )NotImplementedError)r4   r6   r7   s      r   writezDatasink.writeG   s
      "!r   write_resultc                     dS )a  Callback for when a write job completes.

        This can be used to `commit` a write output. This method must
        succeed prior to ``write_datasink()`` returning to the user. If this
        method fails, then ``on_write_failed()`` is called.

        Args:
            write_result: Aggregated result of the
               Write operator, containing write results and stats.
        Nr!   r4   r;   s     r   on_write_completezDatasink.on_write_completeY   s	     	r   errorc                     dS )zCallback for when a write job fails.

        This is called on a best-effort basis on write failures.

        Args:
            error: The first error encountered.
        Nr!   r4   r?   s     r   on_write_failedzDatasink.on_write_failedf   s	     	r   c                     t          |           j        }d}|                    d          r
|dd         }|                    |          r|dt	          |                    }|S )zoReturn a human-readable name for this datasink.

        This is used as the names of the write tasks.
        r1   _   N)typer)   
startswithendswithlen)r4   namedatasink_suffixs      r   get_namezDatasink.get_namep   sl    
 Dzz"$??3 	8D==)) 	1/3////0Dr   c                     dS )z;If ``False``, only launch write tasks on the driver's node.Tr!   r4   s    r   supports_distributed_writesz$Datasink.supports_distributed_writes}   s	     tr   c                     dS )zThe target number of rows to pass to each :meth:`~ray.data.Datasink.write` call.

        If ``None``, Ray Data passes a system-chosen number of rows.
        Nr!   rN   s    r   min_rows_per_writezDatasink.min_rows_per_write   s	     tr   r   )r)   r*   r+   r,   r   r5   r   r   r
   r   r:   r   r>   	ExceptionrB   strrL   propertyboolrO   r-   rQ   r!   r   r   r1   r1   .   s-         Xk%: d    """ " 
	" " " "$k/.J    Y 4    #     T    X HSM    X  r   r1   c                   `    e Zd ZdZd Zdee         deddfdZde	d         fd	Z
d
eddfdZdS )DummyOutputDatasinka0  An example implementation of a writable datasource for testing.
    Examples:
        >>> import ray
        >>> from ray.data.datasource import DummyOutputDatasink
        >>> output = DummyOutputDatasink()
        >>> ray.data.range(10).write_datasink(output)
        >>> assert output.num_ok == 1
    c                    t           j        j                                        }t          j        |j                   G d d                      }|                                | _        d| _        d| _        d| _	        d S )N)scheduling_strategyc                   *    e Zd Zd ZdeddfdZd ZdS ).DummyOutputDatasink.__init__.<locals>.DataSinkc                 "    d| _         d| _        d S )Nr   T)rows_writtenenabledrN   s    r   __init__z7DummyOutputDatasink.__init__.<locals>.DataSink.__init__   s    $%!#r   blockr   Nc                 r    t          j        |          }| xj        |                                z  c_        d S r   )r   	for_blockr]   r   )r4   r`   s     r   r:   z4DummyOutputDatasink.__init__.<locals>.DataSink.write   s6    %/66!!U^^%5%55!!!!r   c                     | j         S r   )r]   rN   s    r   get_rows_writtenz?DummyOutputDatasink.__init__.<locals>.DataSink.get_rows_written   s    ((r   )r)   r*   r+   r_   r   r:   rd   r!   r   r   DataSinkr[      sT        $ $ $65 6T 6 6 6 6) ) ) ) )r   re   r   T)
raydataDataContextget_currentremoterY   	data_sinknum_ok
num_failedr^   )r4   r7   re   s      r   r_   zDummyOutputDatasink.__init__   s    h"..00 
(?	@	@	@
	) 
	) 
	) 
	) 
	) 
	) 
	) 
A	@
	) "**r   r6   r7   r   Nc                     g }| j         st          d          |D ]4}|                    | j        j                            |                     5t          j        |           d S )Ndisabled)r^   
ValueErrorappendrk   r:   rj   rf   get)r4   r6   r7   tasksbs        r   r:   zDummyOutputDatasink.write   si    
 | 	)Z((( 	9 	9ALL-44Q778888r   r;   c                 &    | xj         dz  c_         d S NrE   )rl   r=   s     r   r>   z%DummyOutputDatasink.on_write_complete   s    qr   r?   c                 &    | xj         dz  c_         d S rv   )rm   rA   s     r   rB   z#DummyOutputDatasink.on_write_failed   s    1r   )r)   r*   r+   r,   r_   r   r   r
   r:   r   r>   rR   rB   r!   r   r   rW   rW      s           .

 
 
	
 
 
 
k$.?    Y 4      r   rW   write_result_blocksr   c                     dd l t          fd| D                       sJ t          d | D                       }t          d | D                       }d | D             }t          |||          S )Nr   c              3   h   K   | ],}t          |j                  ot          |          d k    V  -dS )rE   N)
isinstance	DataFramerI   )r   r`   pds     r   r   z-_gen_datasink_write_result.<locals>.<genexpr>   sR         	5",'';CJJ!O     r   c              3   J   K   | ]}|d                                           V  dS )r   Nr#   r   results     r   r   z-_gen_datasink_write_result.<locals>.<genexpr>   s3      TTf
+//11TTTTTTr   c              3   J   K   | ]}|d                                           V  dS )r   Nr   r   s     r   r   z-_gen_datasink_write_result.<locals>.<genexpr>   s3      XX&6,/3355XXXXXXr   c                 *    g | ]}|d          d         S )write_returnr   r!   r   s     r   r"   z._gen_datasink_write_result.<locals>.<listcomp>   s"    QQQ6VN+A.QQQr   )pandasallr#   r   )rx   total_num_rowstotal_size_bytesr   r}   s       @r   _gen_datasink_write_resultr      s         (       
 TT@STTTTTNXXDWXXXXXQQ=PQQQM~'7GGGr   )r%   loggingdataclassesr   typingr   r   r   r   r   r	   rf   'ray.data._internal.execution.interfacesr
   ray.data.blockr   r   ray.util.annotationsr   pyarrowpa	getLoggerr)   loggerr   r   r1   rW   r   r!   r   r   <module>r      s        ! ! ! ! ! ! L L L L L L L L L L L L L L L L 



 ? ? ? ? ? ? / / / / / / / / - - - - - - 		8	$	$ '+,, < 
 
 
 
 
'/* 
 
  
. Y Y Y Y Yw' Y Y Yx 1 1 1 1 1(4. 1 1 1hHeHH H H H H Hr   