
    &`i              
          d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZmZmZmZmZ d dlm Z m!Z! erd dl"m#Z#  G d de          Z$ ed	          Z% ed
e$          Z& ed          Z' ej(        d          Z) e d          e! G d d                                  Z* e!d           G d de*e j+        ee%e'f                               Z,e! G d de,e-e-f                               Z.e! G d de,ee-e/f         ee-e/f         f                               Z0e! G d de,e&e&f                               Z1e! G d de,e&e&f                               Z2e! G d de,eee-e/f                  e/f                               Z3e! G d  d!e,eee-e/f                  e/f                               Z4e! G d" d#e,e&e&f                               Z5e! G d$ d%e,ee         ee         f                               Z6e! G d& d'e,ee         ee         f                               Z7e! G d( d)e,                      Z8d*e9fd+Z:d,eege%f         d*e9d-eegee%         f         fd.Z;d/ee%ge%f         d-eee%         ge%f         fd0Z<d1ee%e%ge%f         d*e9d-eee%         ee%         gee%         f         fd2Z= e!d           G d3 d4e,ee-         e/f                               Z> e!d           G d5 d6e,ee-         e/f                               Z? e!d           G d7 d8e,                      Z@ e!d           G d9 d:e,                      ZAdS );    N)TYPE_CHECKINGAnyCallable
CollectionDictGenericListOptionalProtocolSetTypeVarUnionis_null)BlockBlockAccessorBlockColumnBlockColumnAccessorKeyType)
Deprecated	PublicAPI)Schemac                   N    e Zd ZdedefdZdedefdZdedefdZdedefdZdS )_SupportsRichComparisonotherreturnc                     d S N selfr   s     f/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/aggregate.py__lt__z_SupportsRichComparison.__lt__'           c                     d S r   r   r    s     r"   __le__z_SupportsRichComparison.__le__*   r$   r%   c                     d S r   r   r    s     r"   __gt__z_SupportsRichComparison.__gt__-   r$   r%   c                     d S r   r   r    s     r"   __ge__z_SupportsRichComparison.__ge__0   r$   r%   N)	__name__
__module____qualname__r   boolr#   r'   r)   r+   r   r%   r"   r   r   &   s        C D    C D    C D    C D      r%   r   AccumulatorTypeSupportsRichComparisonType)boundAggOutputTypez^([^(]+)(?:\(.*\))?$z3AggregateFn is deprecated, please use AggregateFnV2)messagec                       e Zd ZdZ	 	 	 ddeegef         deeegef         dedeeeee	f         gef         deee
gef         deeegef                  fd	Zd
ed         ddfdZdS )AggregateFnae	  NOTE: THIS IS DEPRECATED, PLEASE USE :class:`AggregateFnV2` INSTEAD

    Defines how to perform a custom aggregation in Ray Data.

    `AggregateFn` instances are passed to a Dataset's ``.aggregate(...)`` method to
    specify the steps required to transform and combine rows sharing the same key.
    This enables implementing custom aggregators beyond the standard
    built-in options like Sum, Min, Max, Mean, etc.

    Args:
        init: Function that creates an initial aggregator for each group. Receives a key
            (the group key) and returns the initial accumulator state (commonly 0,
            an empty list, or an empty dictionary).
        merge: Function that merges two accumulators generated by different workers
            into one accumulator.
        name: An optional display name for the aggregator. Useful for debugging.
        accumulate_row: Function that processes an individual row. It receives the current
            accumulator and a row, then returns an updated accumulator. Cannot be
            used if `accumulate_block` is provided.
        accumulate_block: Function that processes an entire block of rows at once. It receives the
            current accumulator and a block of rows, then returns an updated accumulator.
            This allows for vectorized operations. Cannot be used if `accumulate_row`
            is provided.
        finalize: Function that finishes the aggregation by transforming the final
            accumulator state into the desired output. For example, if your
            accumulator is a list of items, you may want to compute a statistic
            from the list. If not provided, the final accumulator state is returned
            as-is.

    Example:
        .. testcode::

            import ray
            from ray.data.aggregate import AggregateFn

            # A simple aggregator that counts how many rows there are per group
            count_agg = AggregateFn(
                init=lambda k: 0,
                accumulate_row=lambda counter, row: counter + 1,
                merge=lambda c1, c2: c1 + c2,
                name="custom_count"
            )
            ds = ray.data.from_items([{"group": "A"}, {"group": "B"}, {"group": "A"}])
            result = ds.groupby("group").aggregate(count_agg).take_all()
            # result: [{'group': 'A', 'custom_count': 2}, {'group': 'B', 'custom_count': 1}]
    Ninitmergenameaccumulate_rowaccumulate_blockfinalizec                    ||t          d          |dt          dt          dt          ffd}t          |t                    st          d          |d }|| _        || _        || _        || _	        || _
        d S )NzCExactly one of accumulate_row or accumulate_block must be provided.ablockr   c                 z    t          j        |          }|                    d          D ]} | |          } | S NF)public_row_format)r   	for_block	iter_rows)r>   r?   	block_accrr:   s       r"   r;   z.AggregateFn.__init__.<locals>.accumulate_block   sK    )3E::	",,u,EE - -A&q!,,AAr%   z`name` must be provided.c                     | S r   r   )r>   s    r"   <lambda>z&AggregateFn.__init__.<locals>.<lambda>        r%   )
ValueErrorr0   r   
isinstancestr	TypeErrorr9   r7   r8   r;   r<   )r!   r7   r8   r9   r:   r;   r<   s       `  r"   __init__zAggregateFn.__init__o   s     "'7'?&+;+GU   #O E o       $$$ 	86777"{H		
 0 r%   schemar   r   c                     dS )z=Raise an error if this cannot be applied to the given schema.Nr   )r!   rO   s     r"   	_validatezAggregateFn._validate   s    r%   )NNN)r,   r-   r.   __doc__r   r   r0   rL   r   r   r   r
   r3   rN   rQ   r   r%   r"   r6   r6   =   s        - -l PTIM$! $!y/12$! /:OKL$! 	$!
 !d38n->
$! #OU#;_#LM$! 8_$5}$DEF$! $! $! $!L 2 t      r%   r6   alpha)	stabilityc                       e Zd ZdZdedeg ef         dee         def fdZ	dee         fdZ
defd	Zej        d
ededefd            Zej        dedefd            Zdedee         fdZded         ddfdZ xZS )AggregateFnV2a  Provides an interface to implement efficient aggregations to be applied
    to the dataset.

    `AggregateFnV2` instances are passed to a Dataset's ``.aggregate(...)`` method to
    perform distributed aggregations. To create a custom aggregation, you should subclass
    `AggregateFnV2` and implement the `aggregate_block` and `combine` methods.
    The `finalize` method can also be overridden if the final accumulated state
    needs further transformation.

    Aggregation follows these steps:

    1. **Initialization**: For each group (if grouping) or for the entire dataset,
       an initial accumulator is created using `zero_factory`.
    2. **Block Aggregation**: The `aggregate_block` method is applied to
       each block independently, producing a partial aggregation result for that block.
    3. **Combination**: The `combine` method is used to merge these partial
       results (or an existing accumulated result with a new partial result)
       into a single, combined accumulator.
    4. **Finalization**: Optionally, the `finalize` method transforms the
       final combined accumulator into the desired output format.

    Args:
        name: The name of the aggregation. This will be used as the column name
            in the output, e.g., "sum(my_col)".
        zero_factory: A callable that returns the initial "zero" value for the
            accumulator. For example, for a sum, this would be `lambda: 0`; for
            finding a minimum, `lambda: float("inf")`, for finding a maximum,
            `lambda: float("-inf")`.
        on: The name of the column to perform the aggregation on. If `None`,
            the aggregation is performed over the entire row (e.g., for `Count()`).
        ignore_nulls: Whether to ignore null values during aggregation.
            If `True`, nulls are skipped.
            If `False`, the presence of a null value might result in a null output,
            depending on the aggregation logic.
    r9   zero_factoryonignore_nullsc                  	 |st          d| d          || _        || _        t                              |          }|r|                    d          | _        n|| _        t          | j        |          }t          | j
        |          	t          | j                  }t          ||          }t                                          |||	fd|           d S )Nz1Non-empty string has to be provided as name (got )   c                      |          S r   r   )_r?   _safe_aggregates     r"   rH   z(AggregateFnV2.__init__.<locals>.<lambda>   s    ooe.D.D r%   )r9   r7   r8   r;   r<   )rJ   _target_col_name_ignore_nulls_AGGREGATION_NAME_PATTERNmatchgroup	_agg_name_null_safe_combinecombine_null_safe_aggregateaggregate_block_null_safe_finalizer<   _null_safe_zero_factorysuperrN   )r!   r9   rW   rX   rY   rc   _safe_combine_safe_finalize_safe_zero_factoryr_   	__class__s            @r"   rN   zAggregateFnV2.__init__   s      	KDKKK   !#) *//55 	""[[^^DNN!DN*4<FF.t/C\RR,T];;4\<PP#DDDD# 	 	
 	
 	
 	
 	
r%   r   c                     | j         S r   )r`   r!   s    r"   get_target_columnzAggregateFnV2.get_target_column   s    $$r%   c                     | j         S )zReturn the agg name (e.g., 'sum', 'mean', 'count').

        Returns the aggregation type extracted from the name during initialization.
        For example, returns 'sum' for an aggregator named 'sum(col)'.
        )re   rr   s    r"   get_agg_namezAggregateFnV2.get_agg_name   s     ~r%   current_accumulatornewc                     dS )a  Combines a new partial aggregation result with the current accumulator.

        This method defines how two intermediate aggregation states are merged.
        For example, if `aggregate_block` produces partial sums `s1` and `s2` from
        two different blocks, `combine(s1, s2)` should return `s1 + s2`.

        Args:
            current_accumulator: The current accumulated state (e.g., the result of
                previous `combine` calls or an initial value from `zero_factory`).
            new: A new partially aggregated value, typically the output of
                `aggregate_block` from a new block of data, or another accumulator
                from a parallel task.

        Returns:
            The updated accumulator after combining it with the new value.
        Nr   r!   rv   rw   s      r"   rg   zAggregateFnV2.combine   s	    ( 	r%   r?   c                     dS )a  Aggregates data within a single block.

        This method processes all rows in a given `Block` and returns a partial
        aggregation result for that block. For instance, if implementing a sum,
        this method would sum all relevant values within the block.

        Args:
            block: A `Block` of data to be aggregated.

        Returns:
            A partial aggregation result for the input block. The type of this
            result (`AggType`) should be consistent with the `current_accumulator`
            and `new` arguments of the `combine` method, and the `accumulator`
            argument of the `finalize` method.
        Nr   r!   r?   s     r"   ri   zAggregateFnV2.aggregate_block  s	    " 	r%   accumulatorc                     |S )a  Transforms the final accumulated state into the desired output.

        This method is called once per group after all blocks have been processed
        and all partial results have been combined. It provides an opportunity
        to perform a final transformation on the accumulated data.

        For many aggregations (e.g., Sum, Count, Min, Max), the accumulated state
        is already the final result, so this method can simply return the
        accumulator as is (which is the default behavior).

        For other aggregations, like Mean, this method is crucial.
        A Mean aggregation might accumulate `[sum, count]`. The `finalize`
        method would then compute `sum / count` to get the final mean.

        Args:
            accumulator: The final accumulated state for a group, after all
                `aggregate_block` and `combine` operations.

        Returns:
            The final result of the aggregation for the group.
        r   r!   r|   s     r"   r<   zAggregateFnV2.finalize  s
    , r%   rO   r   Nc                 j    | j         r+ddlm}  || j                                       |           d S d S )Nr   )SortKey)r`   2ray.data._internal.planner.exchange.sort_task_specr   validate_schema)r!   rO   r   s      r"   rQ   zAggregateFnV2._validate2  sS      	CRRRRRRGD)**::6BBBBB	C 	Cr%   )r,   r-   r.   rR   rL   r   r0   r
   r/   rN   rs   ru   abcabstractmethodrg   r   ri   r3   r<   rQ   __classcell__rp   s   @r"   rV   rV      s}       " "H$
$
 r?23$

 SM$
 $
 $
 $
 $
 $
 $
L%8C= % % % %c     	#29H	   * 	U     $O 8O    0C 2 Ct C C C C C C C Cr%   rV   c                   t     e Zd ZdZ	 	 	 ddee         dedee         f fdZded	e	fd
Z
de	de	d	e	fdZ xZS )Counta  Defines count aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Count

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Counting all rows:
            result = ds.aggregate(Count())
            # result: {'count()': 100}


            # Counting all rows per group:
            result = ds.groupby("group_key").aggregate(Count(on="id")).take_all()
            # result: [{'group_key': 0, 'count(id)': 34},
            #          {'group_key': 1, 'count(id)': 33},
            #          {'group_key': 2, 'count(id)': 33}]


    Args:
        on: Optional name of the column to count values on. If None, counts rows.
        ignore_nulls: Whether to ignore null values when counting. Only applies if
            `on` is specified. Default is `False` which means `Count()` on a column
            will count nulls by default. To match pandas default behavior of not counting nulls,
            set `ignore_nulls=True`.
        alias_name: Optional name for the resulting column.
    NFrX   rY   
alias_namec                 h    t                                          |r|nd|pd d||d            d S )Nzcount( r[   c                      dS Nr   r   r   r%   r"   rH   z Count.__init__.<locals>.<lambda>h  rI   r%   rX   rY   rW   )rl   rN   r!   rX   rY   r   rp   s       r"   rN   zCount.__init__^  sW     	$>JJ*>28*>*>*>%"	 	 	
 	
 	
 	
 	
r%   r?   r   c                     t          j        |          }| j        |                                S |                    | j        | j                  S )NrY   )r   rC   r`   num_rowscountra   )r!   r?   block_accessors      r"   ri   zCount.aggregate_blockk  sU    &077 (!**,,,##!0B $ 
 
 	
r%   rv   rw   c                     ||z   S r   r   ry   s      r"   rg   zCount.combinev  s    "S((r%   )NFN)r,   r-   r.   rR   r
   rL   r/   rN   r   intri   rg   r   r   s   @r"   r   r   9  s        ! !J !"$(	
 
SM
 
 SM	
 
 
 
 
 
	
U 	
s 	
 	
 	
 	
)3 )S )S ) ) ) ) ) ) ) )r%   r   c                        e Zd ZdZ	 	 	 ddee         dedee         f fdZded	e	e
ef         fd
Zde	e
ef         de	e
ef         d	e	e
ef         fdZ xZS )Suma7  Defines sum aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Sum

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Summing all rows per group:
            result = ds.aggregate(Sum(on="id"))
            # result: {'sum(id)': 4950}

    Args:
        on: The name of the numerical column to sum. Must be provided.
        ignore_nulls: Whether to ignore null values during summation. If `True` (default),
                      nulls are skipped. If `False`, the sum will be null if any
                      value in the group is null.
        alias_name: Optional name for the resulting column.
    NTrX   rY   r   c                 ~    t                                          |r|ndt          |           d||d            d S )Nzsum(r[   c                      dS r   r   r   r%   r"   rH   zSum.__init__.<locals>.<lambda>  rI   r%   r   rl   rN   rL   r   s       r"   rN   zSum.__init__  sY     	$;JJ*;R*;*;*;%"	 	 	
 	
 	
 	
 	
r%   r?   r   c                 f    t          j        |                              | j        | j                  S r   )r   rC   sumr`   ra   r{   s     r"   ri   zSum.aggregate_block  0    &u--11!4#5
 
 	
r%   rv   rw   c                     ||z   S r   r   ry   s      r"   rg   zSum.combine  s     #S((r%   NTN)r,   r-   r.   rR   r
   rL   r/   rN   r   r   r   floatri   rg   r   r   s   @r"   r   r   z  s         8 !!$(	
 
SM
 
 SM	
 
 
 
 
 

U 
uS%Z/@ 
 
 
 

)#(e#4);@e;L)	sEz	) ) ) ) ) ) ) )r%   r   c                        e Zd ZdZdddd fdee         dedee         deg ef         f fd	Z	d
e
defdZdededefdZ xZS )Mina3  Defines min aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Min

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Finding the minimum value per group:
            result = ds.groupby("group_key").aggregate(Min(on="id")).take_all()
            # result: [{'group_key': 0, 'min(id)': 0},
            #          {'group_key': 1, 'min(id)': 1},
            #          {'group_key': 2, 'min(id)': 2}]

    Args:
        on: The name of the column to find the minimum value from. Must be provided.
        ignore_nulls: Whether to ignore null values. If `True` (default), nulls are
                      skipped. If `False`, the minimum will be null if any value in
                      the group is null (for most data types, or follow type-specific
                      comparison rules with nulls).
        alias_name: Optional name for the resulting column.
        zero_factory: A callable that returns the initial "zero" value for the
                      accumulator. For example, for a float column, this would be
                      `lambda: float("+inf")`. Default is `lambda: float("+inf")`.
    NTc                       t          d          S )Nz+infr   r   r%   r"   rH   zMin.<lambda>      v r%   rX   rY   r   rW   c                 |    t                                          |r|ndt          |           d|||           d S )Nzmin(r[   r   r   r!   rX   rY   r   rW   rp   s        r"   rN   zMin.__init__  W     	$;JJ*;R*;*;*;%%	 	 	
 	
 	
 	
 	
r%   r?   r   c                 f    t          j        |                              | j        | j                  S r   )r   rC   minr`   ra   r{   s     r"   ri   zMin.aggregate_block  r   r%   rv   rw   c                 "    t          ||          S r   )r   ry   s      r"   rg   zMin.combine      
 &,,,r%   r,   r-   r.   rR   r
   rL   r/   r   r1   rN   r   ri   rg   r   r   s   @r"   r   r              D !!$(AVAV
 
SM
 
 SM	

 r#==>
 
 
 
 
 

U 
/I 
 
 
 

-7- (- 
$	- - - - - - - -r%   r   c                        e Zd ZdZdddd fdee         dedee         deg ef         f fd	Z	d
e
defdZdededefdZ xZS )Maxa9  Defines max aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Max

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Finding the maximum value per group:
            result = ds.groupby("group_key").aggregate(Max(on="id")).take_all()
            # result: [{'group_key': 0, 'max(id)': ...},
            #          {'group_key': 1, 'max(id)': ...},
            #          {'group_key': 2, 'max(id)': ...}]

    Args:
        on: The name of the column to find the maximum value from. Must be provided.
        ignore_nulls: Whether to ignore null values. If `True` (default), nulls are
                      skipped. If `False`, the maximum will be null if any value in
                      the group is null (for most data types, or follow type-specific
                      comparison rules with nulls).
        alias_name: Optional name for the resulting column.
        zero_factory: A callable that returns the initial "zero" value for the
                      accumulator. For example, for a float column, this would be
                      `lambda: float("-inf")`. Default is `lambda: float("-inf")`.
    NTc                       t          d          S )Nz-infr   r   r%   r"   rH   zMax.<lambda>  r   r%   rX   rY   r   rW   c                 |    t                                          |r|ndt          |           d|||           d S )Nzmax(r[   r   r   r   s        r"   rN   zMax.__init__  r   r%   r?   r   c                 f    t          j        |                              | j        | j                  S r   )r   rC   maxr`   ra   r{   s     r"   ri   zMax.aggregate_block  r   r%   rv   rw   c                 "    t          ||          S r   r   ry   s      r"   rg   zMax.combine   r   r%   r   r   s   @r"   r   r     r   r%   r   c            	       (    e Zd ZdZ	 	 	 ddee         dedee         f fdZded	ee	e
eef                           fd
Zde	e
eef                  de	e
eef                  d	e	e
eef                  fdZde	e
eef                  d	ee         fdZ xZS )Meana  Defines mean (average) aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Mean

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Calculating the mean value per group:
            result = ds.groupby("group_key").aggregate(Mean(on="id")).take_all()
            # result: [{'group_key': 0, 'mean(id)': ...},
            #          {'group_key': 1, 'mean(id)': ...},
            #          {'group_key': 2, 'mean(id)': ...}]

    Args:
        on: The name of the numerical column to calculate the mean on. Must be provided.
        ignore_nulls: Whether to ignore null values. If `True` (default), nulls are
                      skipped. If `False`, the mean will be null if any value in the
                      group is null.
        alias_name: Optional name for the resulting column.
    NTrX   rY   r   c                 ~    t                                          |r|ndt          |           d||d            d S )Nzmean(r[   c                  $    t          ddg          S r   listr   r%   r"   rH   zMean.__init__.<locals>.<lambda>S  s    q!f r%   r   r   r   s       r"   rN   zMean.__init__F  s\     	$<JJ*<#b''*<*<*<% .- 	 	
 	
 	
 	
 	
r%   r?   r   c                     t          j        |          }|                    | j        | j                  }|dk    s|d S |                    | j        | j                  }t          |          r|S ||gS r   )r   rC   r   r`   ra   r   r   )r!   r?   rE   r   sum_s        r"   ri   zMean.aggregate_blockV  su    !+E22	 5t7IJJA::4}}T2D4FGG4== 	 Ke}r%   rv   rw   c                 F    |d         |d         z   |d         |d         z   gS Nr   r\   r   ry   s      r"   rg   zMean.combineh  s,     $A&Q/1DQ1G#a&1PQQr%   r|   c                 T    |d         dk    rt           j        S |d         |d         z  S )Nr\   r   )npnanr~   s     r"   r<   zMean.finalizem  s-    q>Q 6M1~A..r%   r   )r,   r-   r.   rR   r
   rL   r/   rN   r   r	   r   r   r   ri   rg   r<   r   r   s   @r"   r   r   (  sB        < !!$(	
 
SM
 
 SM	
 
 
 
 
 
 U xU3:=N8O/P    $R#'c5j(9#:RAEeCQVJFWAXR	eCJ	 R R R R
/DsEz):$; / / / / / / / / /r%   r   c            
            e Zd ZdZ	 	 	 	 ddee         dededee         f fd	Zd
e	de
eeef                  fdZde
e         de
e         de
e         fdZde
e         dee         fdZ xZS )Stdah  Defines standard deviation aggregation.

    Uses Welford's online algorithm for numerical stability. This method computes
    the standard deviation in a single pass. Results may differ slightly from
    libraries like NumPy or Pandas that use a two-pass algorithm but are generally
    more accurate.

    See: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Std

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Calculating the standard deviation per group:
            result = ds.groupby("group_key").aggregate(Std(on="id")).take_all()
            # result: [{'group_key': 0, 'std(id)': ...},
            #          {'group_key': 1, 'std(id)': ...},
            #          {'group_key': 2, 'std(id)': ...}]

    Args:
        on: The name of the column to calculate standard deviation on.
        ddof: Delta Degrees of Freedom. The divisor used in calculations is `N - ddof`,
            where `N` is the number of elements. Default is 1.
        ignore_nulls: Whether to ignore null values. Default is True.
        alias_name: Optional name for the resulting column.
    Nr\   TrX   ddofrY   r   c                     t                                          |r|ndt          |           d||d            || _        d S )Nzstd(r[   c                  $    t          g d          S )N)r   r   r   r   r   r%   r"   rH   zStd.__init__.<locals>.<lambda>  s    iii r%   r   )rl   rN   rL   _ddof)r!   rX   r   rY   r   rp   s        r"   rN   zStd.__init__  s_     	$;JJ*;R*;*;*;% 10 	 
	
 
	
 
	
 


r%   r?   r   c                 8   t          j        |          }|                    | j        | j                  }|dk    s|d S |                    | j        | j                  }t          |          r|S ||z  }|                    | j        | j        |          }|||gS Nr   r   )r   rC   r   r`   ra   r   r   sum_of_squared_diffs_from_mean)r!   r?   rE   r   r   meanM2s          r"   ri   zStd.aggregate_block  s    !+E22	 5DDVWWA::4}}T2D4FGG4== 	 Ke|55!4#5t
 
 D%  r%   rv   rw   c                 |    |\  }}}|\  }}}||z
  }	||z   }
||z  ||z  z   |
z  }||z   |	dz  |z  |z  |
z  z   }|||
gS )N   r   )r!   rv   rw   M2_amean_acount_aM2_bmean_bcount_bdeltar   r   r   s                r"   rg   zStd.combine  sx    
 !4fg #fg'!
  6G#33u<D[E1H/'9EAAD%  r%   r|   c                     |\  }}}|| j         z
  dk    rt          j        S t          j        ||| j         z
  z            S r   )r   r   r   mathsqrt)r!   r|   r   r   r   s        r"   r<   zStd.finalize  sF     &D%4:"" 6Myutz12333r%   )Nr\   TN)r,   r-   r.   rR   r
   rL   r   r/   rN   r   r	   r   r   ri   rg   r<   r   r   s   @r"   r   r   w  s       ! !J !!$( SM  	
 SM     *!U !tE#u*4E/F ! ! ! !"!#';!59%[!	e! ! ! !$4DK 4HUO 4 4 4 4 4 4 4 4r%   r   c                        e Zd ZdZdddd fdee         dedee         deg ef         f fd	Z	d
e
dee         fdZdededefdZ xZS )AbsMaxa_  Defines absolute max aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import AbsMax

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Calculating the absolute maximum value per group:
            result = ds.groupby("group_key").aggregate(AbsMax(on="id")).take_all()
            # result: [{'group_key': 0, 'abs_max(id)': ...},
            #          {'group_key': 1, 'abs_max(id)': ...},
            #          {'group_key': 2, 'abs_max(id)': ...}]

    Args:
        on: The name of the column to calculate absolute maximum on. Must be provided.
        ignore_nulls: Whether to ignore null values. Default is True.
        alias_name: Optional name for the resulting column.
        zero_factory: A callable that returns the initial "zero" value for the
                      accumulator. For example, for a float column, this would be
                      `lambda: 0`. Default is `lambda: 0`.
    NTc                      dS r   r   r   r%   r"   rH   zAbsMax.<lambda>  s     r%   rX   rY   r   rW   c                     |t          |t                    st          d| d          t                                          |r|ndt          |           d|||           d S )Nz/Column to aggregate on has to be provided (got r[   zabs_max(r   )rK   rL   rJ   rl   rN   r   s        r"   rN   zAbsMax.__init__  s     :ZC00:TrTTTUUU$?JJ*?SWW*?*?*?%%	 	 	
 	
 	
 	
 	
r%   r?   r   c                 >   t          j        |          }|                    | j        | j                  }|                    | j        | j                  }t          |          st          |          rd S t          t          |          t          |                    S r   )r   rC   r   r`   ra   r   r   abs)r!   r?   r   max_min_s        r"   ri   zAbsMax.aggregate_block  s    &077!!$"79KLL!!$"79KLL4== 	GDMM 	43t99c$ii(((r%   rv   rw   c                 "    t          ||          S r   r   ry   s      r"   rg   zAbsMax.combine  r   r%   r   r   s   @r"   r   r     s         > !!$(AJ
 
SM
 
 SM	

 r#==>
 
 
 
 
 
"	)U 	)x8R/S 	) 	) 	) 	)-7- (- 
$	- - - - - - - -r%   r   c            
            e Zd ZdZ	 	 	 	 ddee         dededee         f fd	Zd
e	e
         de	e
         de	e
         fdZdede	e
         fdZde	e
         dee
         fdZ xZS )Quantilea  Defines Quantile aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Quantile

            ds = ray.data.range(100)
            # Schema: {'id': int64}
            ds = ds.add_column("group_key", lambda x: x % 3)
            # Schema: {'id': int64, 'group_key': int64}

            # Calculating the 50th percentile (median) per group:
            result = ds.groupby("group_key").aggregate(Quantile(q=0.5, on="id")).take_all()
            # result: [{'group_key': 0, 'quantile(id)': ...},
            #          {'group_key': 1, 'quantile(id)': ...},
            #          {'group_key': 2, 'quantile(id)': ...}]

    Args:
        on: The name of the column to calculate the quantile on. Must be provided.
        q: The quantile to compute, which must be between 0 and 1 inclusive.
           For example, q=0.5 computes the median.
        ignore_nulls: Whether to ignore null values. Default is True.
        alias_name: Optional name for the resulting column.
    N      ?TrX   qrY   r   c                     || _         t                                          |r|ndt          |           d||t                     d S )Nz	quantile(r[   r   )_qrl   rN   rL   r   )r!   rX   r   rY   r   rp   s        r"   rN   zQuantile.__init__C  s^     $@JJ*@c"gg*@*@*@%	 	 	
 	
 	
 	
 	
r%   rv   rw   r   c                 $   t          |t                    r,t          |t                    r|                    |           |S t          |t                    r4t          |t                    s||dk    r|                    |           |S t          |t                    r4t          |t                    s||dk    r|                    |           |S g }||dk    r|                    |           ||dk    r|                    |           |S )Nr   )rK   r	   extendappend)r!   rv   rw   lss       r"   rg   zQuantile.combineS  s"   )400 	'ZT5J5J 	'&&s+++&&)400 	'*S$:O:O 	'3"99#**3///&&c4   	*5H$*O*O 	".3F"3L3L

.///J*/Bb/H/HII)***?sbyyIIcNNN	r%   r?   c                     t          j        |          }g }|                    d          D ]/}|                    |                    | j                             0|S rA   )r   rC   rD   r   getr`   )r!   r?   rE   r   rows        r"   ri   zQuantile.aggregate_blockl  s`    !+E22	&&&?? 	6 	6CIIcggd3445555	r%   r|   c                 &   | j         rd |D             }n'd |D             }t          |          dk    r|d         S |sd S d }t          |          }t          |          dz
  | j        z  }t	          j        |          }t	          j        |          }||k    r ||t          |                             S  ||t          |                             ||z
  z  } ||t          |                             ||z
  z  }	t          ||	z   d          S )Nc                 0    g | ]}t          |          |S r   r   .0vs     r"   
<listcomp>z%Quantile.finalize.<locals>.<listcomp>w  s#    DDDD1DDDr%   c                 0    g | ]}t          |          |S r   r   r   s     r"   r   z%Quantile.finalize.<locals>.<listcomp>y  s#    :::1wqzz:Q:::r%   r   c                     | S r   r   xs    r"   rH   z#Quantile.finalize.<locals>.<lambda>  s     r%   r\      )	ra   lensortedr   r   floorceilr   round)
r!   r|   nullskeyinput_valueskfcd0d1s
             r"   r<   zQuantile.finalizeu  s%    	 DDkDDDKK:::::E5zzA~~ Qx 	 4kk**"dg-JqMMIaLL663|CFF+,,, Sc!ff%&&!a%0Sc!ff%&&!a%0R"Wa   r%   )Nr   TN)r,   r-   r.   rR   r
   rL   r   r/   rN   r	   r   rg   r   ri   r<   r   r   s   @r"   r   r   %  s        < !!$(
 
SM
 
 	

 SM
 
 
 
 
 
 49 49 c    2U tCy    !DI !(3- ! ! ! ! ! ! ! !r%   r   c                   *    e Zd ZdZ G d deej                  Z	 	 	 	 ddee         de	dee         d	e
e	edf         f fd
Zdee         dee         dee         fdZdedefdZdedee         fdZed             Zededefd            Z xZS )Uniqueab  Defines unique aggregation.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import Unique

            ds = ray.data.range(100)
            ds = ds.add_column("group_key", lambda x: x % 3)

            # Calculating the unique values per group:
            result = ds.groupby("group_key").aggregate(Unique(on="id")).take_all()
            # result: [{'group_key': 0, 'unique(id)': ...},
            #          {'group_key': 1, 'unique(id)': ...},
            #          {'group_key': 2, 'unique(id)': ...}]

    Args:
        on: The name of the column from which to collect unique values.
        ignore_nulls: Whether to ignore null values when collecting unique items.
                      Default is True (nulls are excluded).
        alias_name: Optional name for the resulting column.
        encode_lists: If `True`, encode list elements.  If `False`, encode
            whole lists (i.e., the entire list is considered as a single object).
            `False` by default. Note that this is a top-level flatten (not a recursive
            flatten) operation.
    c                       e Zd ZdZdZdZdS )Unique.ListEncodingModea_  Controls how to encode individual elements inside the list column:

        - NONE: no encoding applied, elements (lists) are stored as is and
                unique ones are returned.
        - FLATTEN: column of element lists is flattened into a single list.
        - HASH: each list element is hashed, a list of unique hashes is returned.
        FLATTENHASHN)r,   r-   r.   rR   r  r  r   r%   r"   ListEncodingModer    s$        	 	 r%   r  NFrX   rY   r   encode_listsc                 8   t                                          |r|ndt          |           d||t                     t	          |t
          j                  r	|| _        d S t	          |t                    r|rt
          j        j	        | _        d S d | _        d S )Nzunique(r[   r   )
rl   rN   rL   setrK   r  r  _list_encoding_moder/   r  )r!   rX   rY   r   r  rp   s        r"   rN   zUnique.__init__  s     	$>JJ*>CGG*>*>*>%	 	 	
 	
 	
 lF$;<< 	,'3D$$$d++ 	, 	,'-'>'FD$$$'+D$$$r%   rv   rw   r   c                 X    |                      |          |                      |          z  S r   )_to_setry   s      r"   rg   zUnique.combine  s&    ||/004<<3D3DDDr%   r?   c                 .   || j                  }t          j        |          }|                                r| j        | j        t
          j        j        k    r't          j        |                                          }nX| j        t
          j        j	        k    r't          j        |
                                          }nt          d| j                   | j        r&t          j        |                                          }|                                S )Nz"list encoding mode not supported: )r`   r   
for_columnis_composed_of_listsr  r  r  r  flattenr  hashrJ   ra   dropnaunique)r!   r?   columncolumn_accessors       r"   _compute_uniquezUnique._compute_unique  s   t,--8@@ 0022	(4'6+B+JJJ"5"@#++--# # )V-D-III"5"@AUAUAWAW"X"X S9QSS    	W1<_=S=S=U=UVVO%%'''r%   c                 x    |                      |          }t          j        |                                          S r   )r   r   r  	to_pylist)r!   r?   r  s      r"   ri   zUnique.aggregate_block  s2    %%e,,"-f55??AAAr%   c                 B   t          | t                    rt                              |           S t          | t                    rYt          |           dk    r,t          | d         t                    rt          d |           } t                              |           S | hS )Nr   c                 (    | d nt          |           S r   )tuple)r   s    r"   rH   z Unique._to_set.<locals>.<lambda>  s    !)$$q r%   )rK   r  r  _normalize_nansr   r   mapr   s    r"   r  zUnique._to_set  s    a 	))!,,,4   	1vvzzj1t44z AA1EE))!,,,3Jr%   r   c                     d | D             S )Nc                 |    h | ]9}t          |t                    rt          j        |          s|nt          j        :S r   )rK   r   r   isnanr   r   s     r"   	<setcomp>z)Unique._normalize_nans.<locals>.<setcomp>  s;    WWWPQ*Q..K28A;;KRVWWWr%   r   r   s    r"   r&  zUnique._normalize_nans  s     XWUVWWWWr%   )NFNN)r,   r-   r.   rR   rL   enumEnumr  r
   r/   r   rN   r   r   rg   r   r   r   r	   ri   staticmethodr  r   r&  r   r   s   @r"   r  r    s        :
 
 
 
 
3	 
 
 
 !"$(<@, ,SM, , SM	,
 D"2D89, , , , , ,*E3s8 E#c( Es3x E E E E(U ({ ( ( ( (0BU BtCy B B B B   \ X: X# X X X \X X X X Xr%   r  c                        e Zd ZdZ	 ddedee         f fdZdedeee	f         fdZ
d	eee	f         d
eee	f         deee	f         fdZ xZS )ValueCountera  Counts the number of times each value appears in a column.

    This aggregation computes value counts for a specified column, similar to pandas'
    `value_counts()` method. It returns a dictionary with two lists: "values" containing
    the unique values found in the column, and "counts" containing the corresponding
    count for each value.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import ValueCounter

            # Create a dataset with repeated values
            ds = ray.data.from_items([
                {"category": "A"}, {"category": "B"}, {"category": "A"},
                {"category": "C"}, {"category": "A"}, {"category": "B"}
            ])

            # Count occurrences of each category
            result = ds.aggregate(ValueCounter(on="category"))
            # result: {'value_counter(category)': {'values': ['A', 'B', 'C'], 'counts': [3, 2, 1]}}

            # Using with groupby
            ds = ray.data.from_items([
                {"group": "X", "category": "A"}, {"group": "X", "category": "B"},
                {"group": "Y", "category": "A"}, {"group": "Y", "category": "A"}
            ])
            result = ds.groupby("group").aggregate(ValueCounter(on="category")).take_all()
            # result: [{'group': 'X', 'value_counter(category)': {'values': ['A', 'B'], 'counts': [1, 1]}},
            #          {'group': 'Y', 'value_counter(category)': {'values': ['A'], 'counts': [2]}}]

    Args:
        on: The name of the column to count values in. Must be provided.
        alias_name: Optional name for the resulting column. If not provided,
            defaults to "value_counter({column_name})".
    NrX   r   c                 ~    t                                          |r|ndt          |           d|dd            d S )Nzvalue_counter(r[   Tc                      g g dS )N)valuescountsr   r   r%   r"   rH   z'ValueCounter.__init__.<locals>.<lambda>=  s    B"!=!= r%   r   r   r!   rX   r   rp   s      r"   rN   zValueCounter.__init__4  sZ    
 	$EJJ*E3r77*E*E*E==	 	 	
 	
 	
 	
 	
r%   r?   r   c                 h    t          j        || j                           }|                                S r   )r   r  r`   value_counts)r!   r?   col_accessors      r"   ri   zValueCounter.aggregate_block@  s,    *5eD<Q6RSS((***r%   rv   new_accumulatorc                 N   |d         }|d         }d t          |          D             }t          |d         |d                   D ]^\  }}||v r||         }||xx         |z  cc<   "t          |          ||<   |                    |           |                    |           _|S )Nr3  r4  c                     i | ]\  }}||	S r   r   )r   ir   s      r"   
<dictcomp>z(ValueCounter.combine.<locals>.<dictcomp>O  s    ===41a!Q===r%   )	enumeratezipr   r   )	r!   rv   r9  r3  r4  value_to_indexv_newc_newidxs	            r"   rg   zValueCounter.combineE  s     %X.$X. >=9V+<+<=== 9?8;TUU 	% 	%LE5&&$U+su$(+Fu%e$$$e$$$$""r%   r   )r,   r-   r.   rR   rL   r
   rN   r   r   r	   ri   rg   r   r   s   @r"   r0  r0    s        % %T %)

 



 SM

 

 

 

 

 

+U +tCI + + + +
#!#t)_# c4i# 
c4i	# # # # # # # #r%   r0  rY   c                      |rd }n fd}|S )a  NOTE: PLEASE READ CAREFULLY BEFORE CHANGING

    Null-safe zero factory is crucial for implementing proper aggregation
    protocol (monoid) w/o the need for additional containers.

    Main hurdle for implementing proper aggregation semantic is to be able to encode
    semantic of an "empty accumulator" and be able to tell it from the case when
    accumulator is actually holding null value:

        - Empty container can be overridden with any value
        - Container holding null can't be overridden if ignore_nulls=False

    However, it's possible for us to exploit asymmetry in cases of ignore_nulls being
    True or False:

        - Case of ignore_nulls=False entails that if there's any "null" in the sequence,
         aggregation is undefined and correspondingly expected to return null

        - Case of ignore_nulls=True in turn, entails that if aggregation returns "null"
        if and only if the sequence does NOT have any non-null value

    Therefore, we apply this difference in semantic to zero-factory to make sure that
    our aggregation protocol is adherent to that definition:

        - If ignore_nulls=True, zero-factory returns null, therefore encoding empty
        container
        - If ignore_nulls=False, couldn't return null as aggregation will incorrectly
        prioritize it, and instead it returns true zero value for the aggregation
        (ie 0 for count/sum, -inf for max, etc).
    c                     d S r   r   )r^   s    r"   ro   z3_null_safe_zero_factory.<locals>._safe_zero_factory  s    4r%   c                                  S r   r   )r^   rW   s    r"   ro   z3_null_safe_zero_factory.<locals>._safe_zero_factory  s    <>>!r%   r   )rW   rY   ro   s   `  r"   rk   rk   ]  sC    @  "	 	 	 	
	" 	" 	" 	" 	" r%   	aggregater   c                 J     dt           dt          t                   f fd}|S )Nr?   r   c                 D     |           }t          |          rrd S |S r   r   )r?   resultrG  rY   s     r"   r_   z-_null_safe_aggregate.<locals>._safe_aggregate  s3    5!! 6?? 	| 	4r%   )r   r
   r0   )rG  rY   r_   s   `` r"   rh   rh     sB    u /)B        r%   r<   c                 F     dt           t                   dt          f fd}|S )Naccr   c                 <    t          |           r| n
 |           S r   r   )rL  r<   s    r"   rn   z+_null_safe_finalize.<locals>._safe_finalize  s"     cll5ss5r%   r
   r0   )r<   rn   s   ` r"   rj   rj     s<    6H_5 6/ 6 6 6 6 6 6
 r%   rg   c                      |r=dt           t                   dt           t                   dt           t                   f fd}n<dt           t                   dt           t                   dt           t                   f fd}|S )a&  Null-safe combination have to be an associative operation
    with an identity element (zero) or in other words implement a monoid.

    To achieve that in the presence of null values following semantic is
    established:

        - Case of ignore_nulls=True:
            - If current accumulator is null (ie empty), return new accumulator
            - If new accumulator is null (ie empty), return cur
            - Otherwise combine (current and new)

        - Case of ignore_nulls=False:
            - If new accumulator is null (ie has null in the sequence, b/c we're
            NOT ignoring nulls), return it
            - If current accumulator is null (ie had null in the prior sequence,
            b/c we're NOT ignoring nulls), return it
            - Otherwise combine (current and new)
    currw   r   c                 `    t          |           r|S t          |          r| S  | |          S r   r   rP  rw   rg   s     r"   rm   z)_null_safe_combine.<locals>._safe_combine  =     s|| )
 )
wsC(((r%   c                 `    t          |          r|S t          |           r| S  | |          S r   r   rR  s     r"   rm   z)_null_safe_combine.<locals>._safe_combine  rS  r%   rN  )rg   rY   rm   s   `  r"   rf   rf     s    2  )	)/*	)19/1J	)o&	) 	) 	) 	) 	) 	) 	)	)/*	)19/1J	)o&	) 	) 	) 	) 	) 	) r%   c                        e Zd ZdZ	 ddedee         f fdZdedee	         fdZ
d	ee	         d
ee	         dee	         fdZdee	         dee         fdZ xZS )MissingValuePercentagea  Calculates the percentage of null values in a column.

    This aggregation computes the percentage of null (missing) values in a dataset column.
    It treats both None values and NaN values as null. The result is a percentage value
    between 0.0 and 100.0, where 0.0 means no missing values and 100.0 means all values
    are missing.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import MissingValuePercentage

            # Create a dataset with some missing values
            ds = ray.data.from_items([
                {"value": 1}, {"value": None}, {"value": 3},
                {"value": None}, {"value": 5}
            ])

            # Calculate missing value percentage
            result = ds.aggregate(MissingValuePercentage(on="value"))
            # result: 40.0 (2 out of 5 values are missing)

            # Using with groupby
            ds = ray.data.from_items([
                {"group": "A", "value": 1}, {"group": "A", "value": None},
                {"group": "B", "value": 3}, {"group": "B", "value": None}
            ])
            result = ds.groupby("group").aggregate(MissingValuePercentage(on="value")).take_all()
            # result: [{'group': 'A', 'missing_pct(value)': 50.0},
            #          {'group': 'B', 'missing_pct(value)': 50.0}]

    Args:
        on: The name of the column to calculate missing value percentage on.
        alias_name: Optional name for the resulting column. If not provided,
            defaults to "missing_pct({column_name})".
    NrX   r   c                 ~    t                                          |r|ndt          |           d|dd            d S )Nzmissing_pct(r[   Fc                  
    ddgS r   r   r   r%   r"   rH   z1MissingValuePercentage.__init__.<locals>.<lambda>  
    !Q r%   r   r   r5  s      r"   rN   zMissingValuePercentage.__init__  sY     	$CJJ*CR*C*C*C'	 	 	
 	
 	
 	
 	
r%   r?   r   c                    t          j        || j                           }|                    d          }t	          j        t	          j        |                                d                                                    }||gS )NFr   T)nan_is_null)	r   r  r`   r   pcr   r   _as_arrow_compatibleas_py)r!   r?   r  total_count
null_counts        r"   ri   z&MissingValuePercentage.aggregate_block  sx    -8t?T9UVV%+++??VJ;;==4PPP
 

%'' 	
 K((r%   rv   rw   c                     t          |          t          |          cxk    rdk    sn J |d         |d         z   |d         |d         z   gS )Nr   r   r\   )r   ry   s      r"   rg   zMissingValuePercentage.combine  sb    &''3s888888q888888"SV+"SV+
 	
r%   r|   c                 F    |d         dk    rd S |d         |d         z  dz  S Nr\   r   g      Y@r   r~   s     r"   r<   zMissingValuePercentage.finalize#  s.    q>Q4AQ/588r%   r   )r,   r-   r.   rR   rL   r
   rN   r   r	   r   ri   rg   r   r<   r   r   s   @r"   rV  rV    s        % %T %)
 

 SM
 
 
 
 
 

)U 
)tCy 
) 
) 
) 
)
49 
49 
c 
 
 
 
9DI 9(5/ 9 9 9 9 9 9 9 9r%   rV  c                        e Zd ZdZ	 	 ddededee         f fdZded	e	e
         fd
Zde	e
         de	e
         d	e	e
         fdZde	e
         d	ee         fdZ xZS )ZeroPercentagea  Calculates the percentage of zero values in a numeric column.

    This aggregation computes the percentage of zero values in a numeric dataset column.
    It can optionally ignore null values when calculating the percentage. The result is
    a percentage value between 0.0 and 100.0, where 0.0 means no zero values and 100.0
    means all non-null values are zero.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import ZeroPercentage

            # Create a dataset with some zero values
            ds = ray.data.from_items([
                {"value": 0}, {"value": 1}, {"value": 0},
                {"value": 3}, {"value": 0}
            ])

            # Calculate zero value percentage
            result = ds.aggregate(ZeroPercentage(on="value"))
            # result: 60.0 (3 out of 5 values are zero)

            # With null values and ignore_nulls=True (default)
            ds = ray.data.from_items([
                {"value": 0}, {"value": None}, {"value": 0},
                {"value": 3}, {"value": 0}
            ])
            result = ds.aggregate(ZeroPercentage(on="value", ignore_nulls=True))
            # result: 75.0 (3 out of 4 non-null values are zero)

            # Using with groupby
            ds = ray.data.from_items([
                {"group": "A", "value": 0}, {"group": "A", "value": 1},
                {"group": "B", "value": 0}, {"group": "B", "value": 0}
            ])
            result = ds.groupby("group").aggregate(ZeroPercentage(on="value")).take_all()
            # result: [{'group': 'A', 'zero_pct(value)': 50.0},
            #          {'group': 'B', 'zero_pct(value)': 100.0}]

    Args:
        on: The name of the column to calculate zero value percentage on.
            Must be a numeric column.
        ignore_nulls: Whether to ignore null values when calculating the percentage.
            If True (default), null values are excluded from both numerator and denominator.
            If False, null values are included in the denominator but not the numerator.
        alias_name: Optional name for the resulting column. If not provided,
            defaults to "zero_pct({column_name})".

    TNrX   rY   r   c                 ~    t                                          |r|ndt          |           d||d            d S )Nz	zero_pct(r[   c                  
    ddgS r   r   r   r%   r"   rH   z)ZeroPercentage.__init__.<locals>.<lambda>k  rY  r%   r   r   r   s       r"   rN   zZeroPercentage.__init__`  sY     	$@JJ*@c"gg*@*@*@%'	 	 	
 	
 	
 	
 	
r%   r?   r   c                 4   t          j        || j                           }|                    | j                  }|dk    rddgS |                                }t          j        |d          }t          j        |          	                                pd}||gS r   )
r   r  r`   r   ra   r]  r\  equalr   r^  )r!   r?   r  r   arrow_compatible	zero_mask
zero_counts          r"   ri   zZeroPercentage.aggregate_blockn  s    -8t?T9UVV%%43E%FFA::q6M*??AA H-q11	 VI&&,,..3!
E""r%   rv   rw   c                 F    |d         |d         z   |d         |d         z   gS r   r   ry   s      r"   rg   zZeroPercentage.combine  s.    "SV+"SV+
 	
r%   r|   c                 F    |d         dk    rd S |d         |d         z  dz  S rc  r   r~   s     r"   r<   zZeroPercentage.finalize  s.    q>Q4AQ/588r%   )TN)r,   r-   r.   rR   rL   r/   r
   rN   r   r	   r   ri   rg   r   r<   r   r   s   @r"   re  re  *  s        2 2n "$(	
 

 
 SM	
 
 
 
 
 
#U #tCy # # # #$
49 
49 
c 
 
 
 
9DI 9(5/ 9 9 9 9 9 9 9 9r%   re  c            
            e Zd Zd Z	 	 ddedee         dedee         f fdZ	defd	Z
d
edefdZdededefdZdedee         fdZ xZS )ApproximateQuantilec                 Z    	 ddl m} n"# t          $ r}t          d          |d }~ww xY w|S )Nr   )kll_floats_sketchzdApproximateQuantile requires the `datasketches` package. Install it with `pip install datasketches`.)datasketchesrr  ImportError)r!   rr  excs      r"   _require_datasketchesz)ApproximateQuantile._require_datasketches  s_    	6666666 	 	 	>  	
 !    	 
(#(   NrX   	quantilesquantile_precisionr   c                                                         _        | _         _        t	                                          |r|ndt          |           d|d fd           dS )u	  
        Computes the approximate quantiles of a column by using a datasketches kll_floats_sketch.
        https://datasketches.apache.org/docs/KLL/KLLSketch.html

        The accuracy of the KLL quantile sketch is a function of the configured quantile precision, which also affects
        the overall size of the sketch.
        The KLL Sketch has absolute error. For example, a specified rank accuracy of 1% at the
        median (rank = 0.50) means that the true quantile (if you could extract it from the set)
        should be between getQuantile(0.49) and getQuantile(0.51). This same 1% error applied at a
        rank of 0.95 means that the true quantile should be between getQuantile(0.94) and getQuantile(0.96).
        In other words, the error is a fixed +/- epsilon for the entire range of ranks.

        Typical single-sided rank error by quantile_precision (use for getQuantile/getRank):
            - quantile_precision=100 → ~2.61%
            - quantile_precision=200 → ~1.33%
            - quantile_precision=400 → ~0.68%
            - quantile_precision=800 → ~0.35%

        See https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on accuracy and size.

        Null values in the target column are ignored when constructing the sketch.

        Example:

            .. testcode::

                import ray
                from ray.data.aggregate import ApproximateQuantile

                # Create a dataset with some values
                ds = ray.data.from_items(
                    [{"value": 20.0}, {"value": 40.0}, {"value": 60.0},
                    {"value": 80.0}, {"value": 100.0}]
                )

                result = ds.aggregate(ApproximateQuantile(on="value", quantiles=[0.1, 0.5, 0.9]))
                # Result: {'approx_quantile(value)': [20.0, 60.0, 100.0]}


        Args:
            on: The name of the column to calculate the quantile on. Must be a numeric column.
            quantiles: The list of quantiles to compute. Must be between 0 and 1 inclusive. For example, quantiles=[0.5] computes the median. Null entries in the source column are skipped.
            quantile_precision: Controls the accuracy and memory footprint of the sketch (K in KLL); higher values yield lower error but use more memory. Defaults to 800. See https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html for details on accuracy and size.
            alias_name: Optional name for the resulting column. If not provided, defaults to "approx_quantile({column_name})".
        zapprox_quantile(r[   Tc                  R                                                                    S r   zero	serialize)rz  r!   s   r"   rH   z.ApproximateQuantile.__init__.<locals>.<lambda>  s     +=!>!>!H!H!J!J r%   r   N)rv  _sketch_cls
_quantiles_quantile_precisionrl   rN   rL   )r!   rX   ry  rz  r   rp   s   `  ` r"   rN   zApproximateQuantile.__init__  s    h  5577##5 $GJJ*GSWW*G*G*GJJJJJ	 	 	
 	
 	
 	
 	
r%   c                 .    |                      |          S )N)r  )r  )r!   rz  s     r"   r~  zApproximateQuantile.zero  s    "4555r%   r?   r   c                    t          j        |          }|                                }|                    |                                           }|                     | j                  }|D ]J}|                                4|                    t          |                                                     K|
                                S r   )r   rC   to_arrowr  rs   r~  r  r^  updater   r  )r!   r?   rE   tabler  sketchvalues          r"   ri   z#ApproximateQuantile.aggregate_block  s    !+E22	""$$d4466774344 	4 	4E{{}}(eEKKMM22333!!!r%   rv   rw   c                    |                      | j                  }|                    | j                            |                     |                    | j                            |                     |                                S r   )r~  r  r8   r  deserializer  r!   rv   rw   combineds       r"   rg   zApproximateQuantile.combine  so    99T566t'334GHHIIIt'33C88999!!###r%   r|   c                 f    | j                             |                              | j                  S r   )r  r  get_quantilesr  r~   s     r"   r<   zApproximateQuantile.finalize  s)    ++K88FFtWWWr%   )rx  N)r,   r-   r.   rv  rL   r	   r   r   r
   rN   r~  r   bytesri   rg   r<   r   r   s   @r"   rp  rp    s       ! ! ! #&$(<
 <
<
 ;<
  	<

 SM<
 <
 <
 <
 <
 <
|6s 6 6 6 6	"U 	"u 	" 	" 	" 	"$5 $u $ $ $ $ $XE Xd5k X X X X X X X Xr%   rp  c                        e Zd Zd Z	 	 	 ddedededee         d	ef
 fd
ZdefdZ	de
defdZdededefdZdedeeeef                  fdZ xZS )ApproximateTopKc                 Z    	 ddl m} n"# t          $ r}t          d          |d }~ww xY w|S )Nr   )frequent_strings_sketchz`ApproximateTopK requires the `datasketches` package. Install it with `pip install datasketches`.)rs  r  rt  )r!   r  ru  s      r"   rv  z%ApproximateTopK._require_datasketches  s_    	<<<<<<< 	 	 	>  	
 '&rw     NFrX   r  log_capacityr   r  c                      | _          _                                          _        | _        t                                          |r|ndt          |           d|d fd           dS )u_  
        Computes the approximate top k items in a column by using a datasketches frequent_strings_sketch.
        https://datasketches.apache.org/docs/Frequency/FrequentItemsOverview.html

        Guarantees:
            - Any item with true frequency > N / (2^log_capacity) is guaranteed to appear in the results
            - Reported counts may have an error of at most ± N / (2^log_capacity).


        If log_capacity is too small for your data:
            - Low-frequency items may be evicted from the sketch, potentially causing the top-k
              results to miss items that should appear in the output.
            - The error bounds increase, reducing the accuracy of the reported counts.

        Example:

            .. testcode::

                import ray
                from ray.data.aggregate import ApproximateTopK

                ds = ray.data.from_items([
                    {"word": "apple"}, {"word": "banana"}, {"word": "apple"},
                    {"word": "cherry"}, {"word": "apple"}
                ])

                result = ds.aggregate(ApproximateTopK(on="word", k=2))
                # Result: {'approx_topk(word)': [{'word': 'apple', 'count': 3}, {'word': 'banana', 'count': 1}]}

        Args:
            on: The name of the column to aggregate.
            k: The number of top items to return.
            log_capacity: Base 2 logarithm of the maximum size of the internal hash map.
                Higher values increase accuracy but use more memory. Defaults to 15.
            alias_name: The name of the aggregate. Defaults to None.
            encode_lists: If `True`, encode list elements.  If `False`, encode
                whole lists (i.e., the entire list is considered as a single object).
                `False` by default. Note that this is a top-level flatten (not a recursive
                flatten) operation.
        zapprox_topk(r[   Tc                  R                                                                    S r   r}  )r  r!   s   r"   rH   z*ApproximateTopK.__init__.<locals>.<lambda>4  s    <!8!8!B!B!D!D r%   r   N)r  _log_capacityrv  _frequent_strings_sketch_encode_listsrl   rN   rL   )r!   rX   r  r  r   r  rp   s   `  `  r"   rN   zApproximateTopK.__init__  s    b )(,(B(B(D(D%)$CJJ*CR*C*C*CDDDDD	 	 	
 	
 	
 	
 	
r%   c                 .    |                      |          S )N)lg_max_k)r  )r!   r  s     r"   r~  zApproximateTopK.zero7  s    ,,l,CCCr%   r?   r   c                 h   t          j        |          }|                                }|                    |                                           }|                     | j                  }|D ]}|                                }| j        rYt          |t                    rD|D ]@}|t          j        |                                          }	|                    |	           Av|;t          j        |                                          }	|                    |	           |                                S r   )r   rC   r  r  rs   r~  r  r^  r  rK   r   pickledumpshexr  r  )
r!   r?   rE   r  r  r  r  py_valueitemdumps
             r"   ri   zApproximateTopK.aggregate_block:  s"   
 "+E22	""$$d4466774-.. 
	$ 
	$E{{}}H! $j4&@&@ $$ ( (D| !<--1133DMM$''''	(
 %|H--1133d###!!!r%   rv   rw   c                    |                      | j                  }|                    | j                            |                     |                    | j                            |                     |                                S r   )r~  r  r8   r  r  r  r  s       r"   rg   zApproximateTopK.combineP  so    99T/00t4@@ATUUVVVt4@@EEFFF!!###r%   r|   c                     ddl m} |                                 | j                            |                              |j                  }fd|d | j                 D             S )Nr   )frequent_items_error_typec           	          g | ]K}t          j        t                              |d                              dt	          |d                   iLS )r   r   r\   )r  loadsr  fromhexr   )r   r  r  s     r"   r   z,ApproximateTopK.finalize.<locals>.<listcomp>_  sV     
 
 
 V\%--Q"8"8997CQLLQ
 
 
r%   )rs  r  rs   r  r  get_frequent_itemsNO_FALSE_NEGATIVESr  )r!   r|   r  frequent_itemsr  s       @r"   r<   zApproximateTopK.finalizeV  s    ::::::''))6BB
 


6I
J
J 	
 
 
 
&xx0
 
 
 	
r%   )r  NF)r,   r-   r.   rv  rL   r   r
   r/   rN   r~  r   r  ri   rg   r	   r   r   r<   r   r   s   @r"   r  r    s+       ' ' ' $(";
 ;
;
 ;
 	;

 SM;
 ;
 ;
 ;
 ;
 ;
 ;
zD D D D D"U "u " " " ",$5 $u $ $ $ $ $
E 
d4S>.B 
 
 
 
 
 
 
 
r%   r  )Br   r,  r   r  retypingr   r   r   r   r   r   r	   r
   r   r   r   r   numpyr   pyarrow.computecomputer\  ray.data._internal.utilr   ray.data.blockr   r   r   r   r   ray.util.annotationsr   r   ray.data.datasetr   r   r0   r1   r3   compilerb   r6   ABCrV   r   r   r   r   r   r   r   r   r   r   r  r0  r/   rk   rh   rj   rf   rV  re  rp  r  r   r%   r"   <module>r     s   



    				                                      + + + + + +              7 6 6 6 6 6 6 6 (''''''    h    '+,,$W (?    ((&BJ'>??  IJJJ
X X X X X X X  KJXv W[C [C [C [C [CK'/=2P*Q [C [C [C| =) =) =) =) =)M#s(# =) =) =)@ 0) 0) 0) 0) 0)-c5j)5e+<<
= 0) 0) 0)f 9- 9- 9- 9- 9--24NN
O 9- 9- 9-x 9- 9- 9- 9- 9--24NN
O 9- 9- 9-x K/ K/ K/ K/ K/=eCJ/0%78 K/ K/ K/\ g4 g4 g4 g4 g4-U3:./6
7 g4 g4 g4T ?- ?- ?- ?- ?-]57QQR ?- ?- ?-D k! k! k! k! k!}T#YS	12 k! k! k!\ sX sX sX sX sX]3s8T#Y./ sX sX sXl N# N# N# N# N#= N# N# N#b* * * * *Z01 ugx001    (/9:x()?:;   18/IJ11 o 9:H_<UU1 1 1 1h WM9 M9 M9 M9 M9]49e+;< M9 M9 M9` W^9 ^9 ^9 ^9 ^9]49e#34 ^9 ^9 ^9B W^X ^X ^X ^X ^X- ^X ^X ^XB Ws
 s
 s
 s
 s
m s
 s
 s
 s
 s
r%   