
    &`iE              	          d dl Z d dlmZmZmZmZmZmZ d dlZ	d dl
Zd dlmZ d dlmZmZmZ d dlmZ d dlmZ d dlmZmZmZ  G d d	          Z G d
 d          Ze G d dee                      Ze G d d                      Z G d de          Ze G d deg ee         f                               Ze G d de                      Z dS )    N)CallableDict	GeneratorIterableListOptional)_check_pyarrow_version)BlockBlockMetadataSchema)_iter_sliced_blocks)Expr)
DeprecatedDeveloperAPI	PublicAPIc            
          e Zd ZdZdefdZdeeeef                  fdZ	deeeef                  fdZ
deee                  fdZedeeeef                  deeeef                  deeeef                  fd	            Zd
eeeef                  ddfdZedddeeeef                  ddfd            Zeded         deeeef                  ded         fd            ZdS )"_DatasourceProjectionPushdownMixinz:Mixin for reading operators supporting projection pushdownreturnc                     dS )zvReturns ``True`` in case ``Datasource`` supports projection operation
        being pushed down into the reading layerF selfs    r/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/datasource/datasource.pysupports_projection_pushdownz?_DatasourceProjectionPushdownMixin.supports_projection_pushdown   s	     u    c                     | j         S )a  Return the projection map (original column names -> final column names).

        Returns:
            Dict mapping original column names (in storage) to final column names
            (after optional renames). Keys indicate which columns are selected.
            None means all columns are selected with no renames.
            Empty dict {} means no columns are selected.
        )_projection_mapr   s    r   get_projection_mapz5_DatasourceProjectionPushdownMixin.get_projection_map   s     ##r   c                 f    | j         dS d | j                                         D             }|r|ndS )aN  Return the column renames from the projection map.

        This is used by predicate pushdown to rewrite filter expressions
        from renamed column names back to original column names.

        Returns:
            Dict mapping original column names to renamed names,
            or None if no renaming has been applied.
        Nc                 &    i | ]\  }}||k    ||S r   r   ).0kvs      r   
<dictcomp>zI_DatasourceProjectionPushdownMixin.get_column_renames.<locals>.<dictcomp>.   s#    KKKDAqAFF1aFFFr   )r   items)r   renamess     r   get_column_renamesz5_DatasourceProjectionPushdownMixin.get_column_renames!   sC     '4KKD$8$>$>$@$@KKK!+wwt+r   c                 `    | j         &t          | j                                                   ndS )aQ  Extract data columns from projection map.

        Helper method for datasources that need to pass columns to legacy
        read functions expecting separate columns and rename_map parameters.

        Returns:
            List of column names, or None if all columns should be read.
            Empty list [] means no columns.
        N)r   listkeysr   s    r   _get_data_columnsz4_DatasourceProjectionPushdownMixin._get_data_columns1   s4     #/ %**,,---	
r   prev_projection_mapnew_projection_mapc                 p    | |S || S i }|                                  D ]\  }}||v r||         }|||<   |S )a  Combine two projection maps via transitive composition.

        Args:
            prev_projection_map: Previous projection (original -> intermediate names)
            new_projection_map: New projection to apply (intermediate -> final names)

        Returns:
            Combined projection map (original -> final names)

        Examples:
            >>> # Select columns a, b with no renames
            >>> prev = {"a": "a", "b": "b"}
            >>> # Select only 'a', rename to 'x'
            >>> new = {"a": "x"}
            >>> _DatasourceProjectionPushdownMixin._combine_projection_map(prev, new)
            {'a': 'x'}

            >>> # First rename a->temp
            >>> prev = {"a": "temp"}
            >>> # Then rename temp->final
            >>> new = {"temp": "final"}
            >>> _DatasourceProjectionPushdownMixin._combine_projection_map(prev, new)
            {'a': 'final'}
        )r%   )r,   r-   composedorig_colintermediate_name
final_names         r   _combine_projection_mapz:_DatasourceProjectionPushdownMixin._combine_projection_mapA   sm    < &%%'&& +>+D+D+F+F 	0 	0'H' $666/0AB
%/" r   projection_map
Datasourcec                 n    t          j         |           }|                     | j        |          |_        |S )a  Apply a projection to this datasource.

        Args:
            projection_map: Dict mapping original column names (in storage)
                to final column names (after optional renames). Keys indicate
                which columns to select. None means select all columns with no renames.

        Returns:
            A new datasource instance with the projection applied.
        )copyr3   r   )r   r4   clones      r   apply_projectionz3_DatasourceProjectionPushdownMixin.apply_projections   s:     	$ !% < < .!
 !
 r   tablepa.Tablecolumn_rename_mapc                 f    s| S fd| j         j        D             }|                     |          S )zApply column renaming to a PyArrow table.

        Args:
            table: PyArrow table to rename
            column_rename_map: Mapping from old column names to new names

        Returns:
            Table with renamed columns
        c                 <    g | ]}                     ||          S r   )get)r!   colr<   s     r   
<listcomp>zD_DatasourceProjectionPushdownMixin._apply_rename.<locals>.<listcomp>   s*    SSS&**344SSSr   )schemanamesrename_columns)r:   r<   	new_namess    ` r   _apply_renamez0_DatasourceProjectionPushdownMixin._apply_rename   sD     ! 	LSSSS@RSSS	##I...r   tables)r;   NNc              #   N   K   | D ]}t                               ||          V   dS )a  Wrap a table generator to apply column renaming to each table.

        This helper eliminates duplication across datasources that need to apply
        column renames to tables yielded from generators.

        Args:
            tables: Iterator/generator yielding PyArrow tables
            column_rename_map: Mapping from old column names to new names

        Yields:
            pa.Table: Tables with renamed columns
        N)r   rF   )rG   r<   r:   s      r   _apply_rename_to_tablesz:_DatasourceProjectionPushdownMixin._apply_rename_to_tables   sN      "  	 	E4BB(     	 	r   N)__name__
__module____qualname____doc__boolr   r   r   strr   r'   r   r+   staticmethodr3   r9   rF   r   r   rI   r   r   r   r   r      s       DDd    
	$HT#s(^$< 	$ 	$ 	$ 	$,HT#s(^$< , , , , 
8DI#6 
 
 
 
  /%d38n5/$T#s(^4/ 
$sCx.	!/ / / \/b c3h0 
   . //#DcN3/ 
/ / / \/& $#DcN3 
)	*   \  r   r   c                   L    e Zd ZdZd ZdefdZdee         fdZ	deddfdZ
d	S )
!_DatasourcePredicatePushdownMixinz9Mixin for reading operators supporting predicate pushdownc                     d | _         d S N_predicate_exprr   s    r   __init__z*_DatasourcePredicatePushdownMixin.__init__   s    /3r   r   c                     dS )NFr   r   s    r   supports_predicate_pushdownz=_DatasourcePredicatePushdownMixin.supports_predicate_pushdown   s    ur   c                     | j         S rT   rU   r   s    r   get_current_predicatez7_DatasourcePredicatePushdownMixin.get_current_predicate   s    ##r   predicate_exprr5   c                 ^    ddl } |j         |           }|j        |n	|j        |z  |_        |S )a  Apply a predicate to this datasource.

        Default implementation that combines predicates using AND.
        Subclasses that support predicate pushdown should have a _predicate_expr
        attribute to store the predicate.

        Note: Column rebinding is handled by the PredicatePushdown rule
        before this method is called, so the predicate_expr should already
        reference the correct column names.
        r   N)r7   rV   )r   r\   r7   r8   s       r   apply_predicatez1_DatasourcePredicatePushdownMixin.apply_predicate   sH     		$
 $, N&7 	 r   N)rJ   rK   rL   rM   rW   rN   rY   r   r   r[   r^   r   r   r   rR   rR      s        CC4 4 4T    $x~ $ $ $ $ 
     r   rR   c                       e Zd ZdZd Zedd            Zededed         fd            Z	de
fd	Zdee         fd
Z	 ddedee         ded         fdZedefd            Zedefd            ZdS )r5   zInterface for defining a custom :class:`~ray.data.Dataset` datasource.

    To read a datasource into a dataset, use :meth:`~ray.data.read_datasource`.
    c                 :    t                               |            dS )z)Initialize the datasource and its mixins.N)rR   rW   r   s    r   rW   zDatasource.__init__   s    )22488888r   r   Readerc                     t          | fi |S z
        Deprecated: Implement :meth:`~ray.data.Datasource.get_read_tasks` and
        :meth:`~ray.data.Datasource.estimate_inmemory_data_size` instead.
        )_LegacyDatasourceReader)r   	read_argss     r   create_readerzDatasource.create_reader   s     't99y999r   parallelismReadTaskc                     t           rc   NotImplementedError)r   rg   re   s      r   prepare_readzDatasource.prepare_read   s
     "!r   c                     t          |           j        }d}|                    |          r|dt          |                    }|S )ztReturn a human-readable name for this datasource.
        This will be used as the names of the read tasks.
        r5   N)typerJ   endswithlen)r   namedatasource_suffixs      r   get_namezDatasource.get_name   sN     Dzz"(==*++ 	313011112Dr   c                     t           zReturn an estimate of the in-memory data size, or None if unknown.

        Note that the in-memory data size may be larger than the on-disk data size.
        rj   r   s    r   estimate_inmemory_data_sizez&Datasource.estimate_inmemory_data_size  
    
 "!r   Nper_task_row_limitc                     t           )a  Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.
            per_task_row_limit: The per-task row limit for the read tasks.
        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        rj   r   rg   rx   s      r   get_read_taskszDatasource.get_read_tasks	  s
     "!r   c                     t          |           j        t          j        u}t          |           j        t          j        u}| p| S rT   )rn   r{   r5   rv   )r   has_implemented_get_read_tasks+has_implemented_estimate_inmemory_data_sizes      r   should_create_readerzDatasource.should_create_reader  sS     JJ%Z-FF 	' JJ29: 	4
 /. ?>>	
r   c                     dS )z:If ``False``, only launch read tasks on the driver's node.Tr   r   s    r   supports_distributed_readsz%Datasource.supports_distributed_reads&  s	     tr   )r   ra   rT   )rJ   rK   rL   rM   rW   r   rf   intr   rl   rO   rs   r   rv   r{   propertyrN   r   r   r   r   r   r5   r5      sI        
9 9 9 : : : Z: " "T*=M " " " Z"#    "Xc] " " " " EI" ""4<SM"	j	" " " " 
d 
 
 
 X
 D    X  r   r5   c                   F    e Zd ZdZdee         fdZdeded         fdZdS )ra   a  A bound read operation for a :class:`~ray.data.Datasource`.

    This is a stateful class so that reads can be prepared in multiple stages.
    For example, it is useful for :class:`Datasets <ray.data.Dataset>` to know the
    in-memory size of the read prior to executing it.
    r   c                     t           ru   rj   r   s    r   rv   z"Reader.estimate_inmemory_data_size5  rw   r   rg   rh   c                     t           )aM  Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        rj   )r   rg   s     r   r{   zReader.get_read_tasks<  s
     "!r   N)	rJ   rK   rL   rM   r   r   rv   r   r{   r   r   r   ra   ra   ,  sd         "Xc] " " " ""# "$z2B " " " " " "r   ra   c                   b    e Zd ZdefdZdee         fdZ	 d
dedee         ded         fd	Z	dS )rd   
datasourcec                 "    || _         || _        d S rT   )_datasource
_read_args)r   r   re   s      r   rW   z _LegacyDatasourceReader.__init__K  s    %#r   r   c                     d S rT   r   r   s    r   rv   z3_LegacyDatasourceReader.estimate_inmemory_data_sizeO  s    tr   Nrg   rx   rh   c                 2     | j         j        |fi | j        S )a  Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.
            per_task_row_limit: The per-task row limit for the read tasks.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        )r   rl   r   rz   s      r   r{   z&_LegacyDatasourceReader.get_read_tasksR  s$     -t,[LLDOLLLr   rT   )
rJ   rK   rL   r5   rW   r   r   rv   r   r{   r   r   r   rd   rd   J  s        $: $ $ $ $Xc]     EIM MM4<SMM	j	M M M M M Mr   rd   c            
           e Zd ZdZ	 	 ddeg ee         f         deded         dee	         fdZ
ed	efd
            Zed	ed         fd            Zed	eg ee         f         fd            Zed	ee	         fd            Zd	ee         fdZdS )rh   a  A function used to read blocks from the :class:`~ray.data.Dataset`.

    Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
    and return a list of ``ray.data.Block`` when called. Initial metadata about the read
    operation can be retrieved via the ``metadata`` attribute prior to executing the
    read. Final metadata is returned after the read along with the blocks.

    Ray will execute read tasks in remote functions to parallelize execution.
    Note that the number of blocks returned can vary at runtime. For example,
    if a task is reading a single large file it can return multiple blocks to
    avoid running out of memory during the read.

    The initial metadata should reflect all the blocks returned by the read,
    e.g., if the metadata says ``num_rows=1000``, the read can return a single
    block of 1000 rows, or multiple blocks with 1000 rows altogether.

    The final metadata (returned with the actual block) reflects the exact
    contents of the block itself.
    Nread_fnmetadatarB   r   rx   c                 >    || _         || _        || _        || _        d S rT   )	_metadata_read_fn_schema_per_task_row_limit)r   r   r   rB   rx   s        r   rW   zReadTask.__init__y  s'     "#5   r   r   c                     | j         S rT   )r   r   s    r   r   zReadTask.metadata  s
    ~r   c                     | j         S rT   )r   r   s    r   rB   zReadTask.schema  s
    |r   c                     | j         S rT   )r   r   s    r   r   zReadTask.read_fn  s
    }r   c                     | j         S )z.Get the per-task row limit for this read task.)r   r   s    r   rx   zReadTask.per_task_row_limit  s     ''r   c              #      K   |                                  }t          |d          s"t          d                    |                     | j        
|E d {V  d S t          || j                  E d {V  d S )N__iter__zlRead function must return Iterable[Block], got {}. Probably you need to return `[block]` instead of `block`.)r   hasattrDeprecationWarningformatr   r   )r   results     r   __call__zReadTask.__call__  s      vz** 	!6&>>  
 #+F&vt/GHHHHHHHHHHHr   )NN)rJ   rK   rL   rM   r   r   r
   r   r   r   rW   r   r   rB   r   rx   r   r   r   r   rh   rh   c  sU        0 &*,0
6 
6"huo-.
6  
6 "	
6
 %SM
6 
6 
6 
6 -    X *    X "huo"56    X (HSM ( ( ( X(I(5/ I I I I I Ir   rh   c                   v    e Zd ZdZdedefdZdee         fdZ	 dded	ee         dee	         fd
Z
defdZdS )RandomIntRowDatasourcea  An example datasource that generates rows with random int64 columns.

    Examples:
        >>> import ray
        >>> from ray.data.datasource import RandomIntRowDatasource
        >>> source = RandomIntRowDatasource() # doctest: +SKIP
        >>> ray.data.read_datasource( # doctest: +SKIP
        ...     source, n=10, num_columns=2).take()
        {'c_0': 1717767200176864416, 'c_1': 999657309586757214}
        {'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
    nnum_columnsc                 "    || _         || _        dS )zInitialize the datasource that generates random-integer rows.

        Args:
            n: The number of rows to generate.
            num_columns: The number of columns to generate.
        N_n_num_columns)r   r   r   s      r   rW   zRandomIntRowDatasource.__init__  s     'r   r   c                 &    | j         | j        z  dz  S )N   r   r   s    r   rv   z2RandomIntRowDatasource.estimate_inmemory_data_size  s    w**Q..r   Nrg   rx   c                    t                       dd lg }| j        }| j        }t	          d||z            }dt
          dt
          dt          ffdj                            d t          |          D                       j
        }d}||k     rct          |||z
            }	t          |	d|	z  |z  d d 	          }
|                    t          |	|ffd
	|
||                     ||z  }||k     c|S )Nr      countr   r   c                    j                             t          j                            t          j        t          j                  j        || ft          j                  d t          |          D                       S )N)sizedtypec                     g | ]}d | S )c_r   r!   is     r   rA   zMRandomIntRowDatasource.get_read_tasks.<locals>.make_block.<locals>.<listcomp>  s    <<<AxAxx<<<r   )rC   )	Tablefrom_arraysnprandomrandintiinfoint64maxrange)r   r   pyarrows     r   
make_blockz9RandomIntRowDatasource.get_read_tasks.<locals>.make_block  st    =,,	!!HRX&&*+u1ERX "   =<{););<<<	 -   r   c                     i | ]	}d | dg
S )r   r   r   r   s     r   r$   z9RandomIntRowDatasource.get_read_tasks.<locals>.<dictcomp>  s"    777qX!XXs777r   r   )num_rows
size_bytesinput_files
exec_statsc                      | |          gS rT   r   )r   r   r   s     r   <lambda>z7RandomIntRowDatasource.get_read_tasks.<locals>.<lambda>  s    "
5+66B r   )rB   rx   )r	   r   r   r   r   r   r
   r   from_pydictr   rB   minr   appendrh   )r   rg   rx   
read_tasksr   r   
block_sizerB   r   r   metar   r   s              @@r   r{   z%RandomIntRowDatasource.get_read_tasks  sm   
 	   %'
G'A,--
	c 	 	 	 	 	 	 	 	 **77E+$6$6777
 

 	 !ee
AE**E u9{2 	  D !&K      !'9  	 	 	 OA% !ee( r   c                     dS )zReturn a human-readable name for this datasource.
        This will be used as the names of the read tasks.
        Note: overrides the base `Datasource` method.
        	RandomIntr   r   s    r   rs   zRandomIntRowDatasource.get_name  s	    
 {r   rT   )rJ   rK   rL   rM   r   rW   r   rv   r   rh   r{   rO   rs   r   r   r   r   r     s        
 
(# (C ( ( ( (/Xc] / / / / -1. .. %SM. 
h	. . . .`#      r   r   )!r7   typingr   r   r   r   r   r   numpyr   r   paray.data._internal.utilr	   ray.data.blockr
   r   r   ray.data.datasource.utilr   ray.data.expressionsr   ray.util.annotationsr   r   r   r   rR   r5   ra   rd   rh   r   r   r   r   <module>r      sb    F F F F F F F F F F F F F F F F         : : : : : : 7 7 7 7 7 7 7 7 7 7 8 8 8 8 8 8 % % % % % % D D D D D D D D D Dd d d d d d d dN% % % % % % % %P K K K K K35V K K K\ " " " " " " " ":M M M M Mf M M M2 ?I ?I ?I ?I ?IxHUO+, ?I ?I ?ID O O O O OZ O O O O Or   