
    &`iV                        d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZmZ e	r
d dlZd dlmZ eeeeeef                  Z e j        e          Z e G d dee                      Z!ee G d	 d
                                  Z"e G d d                      Z# ed           G d d                      Z$dedede
fdZ%dS )    N)	dataclass)Enum)TYPE_CHECKINGAnyCallableDictListOptionalTypeUnion)DeveloperAPI	PublicAPI)Exprc                       e Zd ZdZdZdZdS )PartitionStylea  Supported dataset partition styles.

    Inherits from `str` to simplify plain text serialization/deserialization.

    Examples:
        >>> # Serialize to JSON text.
        >>> json.dumps(PartitionStyle.HIVE)  # doctest: +SKIP
        '"hive"'

        >>> # Deserialize from JSON text.
        >>> PartitionStyle(json.loads('"hive"'))  # doctest: +SKIP
        <PartitionStyle.HIVE: 'hive'>
    hivedirN)__name__
__module____qualname____doc__HIVE	DIRECTORY     t/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/datasource/partitioning.pyr   r      s$          DIIIr   r   c                       e Zd ZU dZeed<   dZee         ed<   dZ	ee
e                  ed<   dZeeeef                  ed<   dZed         ed<   d	 Zed
efd            Zedd            Zd ZdS )Partitioninga  Partition scheme used to describe path-based partitions.

    Path-based partition formats embed all partition keys and values directly in
    their dataset file paths.

    For example, to read a dataset with
    `Hive-style partitions <https://athena.guide/articles/hive-style-partitioning>`_:

        >>> import ray
        >>> from ray.data.datasource.partitioning import Partitioning
        >>> ds = ray.data.read_csv(
        ...     "s3://anonymous@ray-example-data/iris.csv",
        ...     partitioning=Partitioning("hive"),
        ... )

    Instead, if your files are arranged in a directory structure such as:

    .. code::

        root/dog/dog_0.jpeg
        root/dog/dog_1.jpeg
        ...

        root/cat/cat_0.jpeg
        root/cat/cat_1.jpeg
        ...

    Then you can use directory-based partitioning:

        >>> import ray
        >>> from ray.data.datasource.partitioning import Partitioning
        >>> root = "s3://anonymous@air-example-data/cifar-10/images"
        >>> partitioning = Partitioning("dir", field_names=["class"], base_dir=root)
        >>> ds = ray.data.read_images(root, partitioning=partitioning)
    styleNbase_dirfield_namesfield_typespyarrow.fs.FileSystem
filesystemc                 Z    | j         d| _         | j        i | _        d | _        d | _        d S )N )r    r"   _normalized_base_dir_resolved_filesystemselfs    r   __post_init__zPartitioning.__post_init__n   s8    = DM#!D$(!$(!!!r   returnc                 F    | j         |                                  | j         S )zJReturns the base directory normalized for compatibility with a filesystem.)r'   _normalize_base_dirr)   s    r   normalized_base_dirz Partitioning.normalized_base_dirx   (     $,$$&&&((r   c                 F    | j         |                                  | j         S )zHReturns the filesystem resolved for compatibility with a base directory.)r(   r.   r)   s    r   resolved_filesystemz Partitioning.resolved_filesystem   r0   r   c                    ddl m}  || j        | j                  \  }| _        t          |          dk    sJ dt          |                       |d         }t          |          r|                    d          s|dz  }|| _        dS )a  Normalizes the partition base directory for compatibility with the
        given filesystem.

        This should be called once a filesystem has been resolved to ensure that this
        base directory is correctly discovered at the root of all partitioned file
        paths.
        r   )_resolve_paths_and_filesystem   z0Expected 1 normalized base directory, but found /N)ray.data.datasource.path_utilr4   r    r$   r(   lenendswithr'   )r*   r4   pathsr/   s       r   r.   z Partitioning._normalize_base_dir   s     	POOOOO+H+HMO,
 ,
(t(
 JJ!OOOJc%jjJJ OO#Ah"## 	',?,H,H,M,M 	'3&$7!!!r   )r,   r#   )r   r   r   r   r   __annotations__r    r
   strr!   r	   r"   r   PartitionDataTyper$   r+   propertyr/   r2   r.   r   r   r   r   r   3   s         " "J  #Hhsm""" (,K$s)$+++ ;?K$s$5567>>>48J01888) ) ) )S ) ) ) X) ) ) ) X)8 8 8 8 8r   r   c                   \   e Zd ZdZeej        ddddfdedee         dee	e                  dee
eef                  ded         d	d fd
            ZdefdZded	e
eef         fdZdeddd	efdZed	efd            Zded	ee         fdZded	e
eef         fdZded	e
eef         fdZdS )PathPartitionParsera  Partition parser for path-based partition formats.

    Path-based partition formats embed all partition keys and values directly in
    their dataset file paths.

    Two path partition formats are currently supported - `HIVE` and `DIRECTORY`.

    For `HIVE` Partitioning, all partition directories under the base directory
    will be discovered based on `{key1}={value1}/{key2}={value2}` naming
    conventions. Key/value pairs do not need to be presented in the same
    order across all paths. Directory names nested under the base directory that
    don't follow this naming condition will be considered unpartitioned. If a
    partition filter is defined, then it will be called with an empty input
    dictionary for each unpartitioned file.

    For `DIRECTORY` Partitioning, all directories under the base directory will
    be interpreted as partition values of the form `{value1}/{value2}`. An
    accompanying ordered list of partition field names must also be provided,
    where the order and length of all partition values must match the order and
    length of field names. Files stored directly in the base directory will
    be considered unpartitioned. If a partition filter is defined, then it will
    be called with an empty input dictionary for each unpartitioned file. For
    example, if the base directory is `"foo"`, then `"foo.csv"` and `"foo/bar.csv"`
    would be considered unpartitioned files but `"foo/bar/baz.csv"` would be associated
    with partition `"bar"`. If the base directory is undefined, then `"foo.csv"` would
    be unpartitioned, `"foo/bar.csv"` would be associated with partition `"foo"`, and
    "foo/bar/baz.csv" would be associated with partition `("foo", "bar")`.
    Nr   r    r!   r"   r$   r#   r,   c                 F    t          | ||||          }t          |          S )a  Creates a path-based partition parser using a flattened argument list.

        Args:
            style: The partition style - may be either HIVE or DIRECTORY.
            base_dir: "/"-delimited base directory to start searching for partitions
                (exclusive). File paths outside of this directory will be considered
                unpartitioned. Specify `None` or an empty string to search for
                partitions in all file path directories.
            field_names: The partition key names. Required for DIRECTORY partitioning.
                Optional for HIVE partitioning. When non-empty, the order and length of
                partition key field names must match the order and length of partition
                directories discovered. Partition key field names are not required to
                exist in the dataset schema.
            field_types: A dictionary that maps partition key names to their desired
                data type. If not provided, the data type default to string.
            filesystem: Filesystem that will be used for partition path file I/O.

        Returns:
            The new path-based partition parser.
        )r   r@   )r   r    r!   r"   r$   schemes         r   ofzPathPartitionParser.of   s'    8 eX{KTT"6***r   partitioningc                 J   |j         }|j        }|t          j        k    r|st	          d          t          j        | j        t          j        | j        i}|                    |          | _	        | j	        't	          d| d|
                                           || _        dS )a  Creates a path-based partition parser.

        Args:
            partitioning: The path-based partition scheme. The parser starts
                searching for partitions from this scheme's base directory. File paths
                outside the base directory will be considered unpartitioned. If the
                base directory is `None` or an empty string then this will search for
                partitions in all file path directories. Field names are required for
                DIRECTORY partitioning, and optional for HIVE partitioning. When
                non-empty, the order and length of partition key field names must match
                the order and length of partition directories discovered.
        zDirectory partitioning requires a corresponding list of partition key field names. Please retry your request with one or more field names specified.NzUnsupported partition style: z. Supported styles: )r   r!   r   r   
ValueErrorr   _parse_hive_path_parse_dir_pathget
_parser_fnkeys_scheme)r*   rD   r   r!   parserss        r   __init__zPathPartitionParser.__init__   s     "".N,,,[,1   !6$d&:
 <C;;u;M;M?"6 6 6%,\\^^6 6   $r   pathc                     |                      |          }|i S |                     |          }| j        j                                        D ]\  }}t          ||         |          ||<   |S )a,  Parses partition keys and values from a single file path.

        Args:
            path: Input file path to parse.

        Returns:
            Dictionary mapping directory partition keys to values from the input file
            path. Returns an empty dictionary for unpartitioned files.
        )_dir_path_trim_baserJ   rL   r"   items_cast_value)r*   rO   dir_path
partitionsfield	data_types         r   __call__zPathPartitionParser.__call__   s|     ++D11I%)__X%>%>
 $ 8 > > @ @ 	J 	JE9 +Ju,=y I IJur   	predicater   c                 0   ddl }ddlm}  | |          }|sdS 	 |                    d |                                D                       } ||          }|                    |          }t          ||j        |j        t          j
        f          rt          |d                   S ddl}	t          ||	j                  rt          |j        d                   S t          |          S # t          $ r! t                               d|d           Y dS w xY w)	a3  Evaluate a predicate expression against partition values from a path.

        This method enables partition pruning by evaluating predicates that reference
        partition columns against the partition values parsed from file paths.

        Args:
            path: File path to parse partition values from.
            predicate: Expression that references partition columns.

        Returns:
            True if the partition satisfies the predicate (should read the file),
            False if it doesn't (can skip the file for partition pruning).
        r   N)NativeExpressionEvaluatorFc                     i | ]	\  }}||g
S r   r   ).0colvals      r   
<dictcomp>zGPathPartitionParser.evaluate_predicate_on_partition.<locals>.<dictcomp>0  s     EEESseEEEr   zUFailed to evaluate predicate on partition for path %s, conservatively including file.T)exc_info)pyarrow?ray.data._internal.planner.plan_expression.expression_evaluatorr[   tablerR   visit
isinstanceArrayChunkedArraynpndarrayboolpandasSeriesiloc	Exceptionloggerdebug)
r*   rO   rY   par[   partition_valuespartition_table	evaluatorresultpds
             r   evaluate_predicate_on_partitionz3PathPartitionParser.evaluate_predicate_on_partition  se    		
 	
 	
 	
 	
 	

  4:: 	 5	 hhEE,<,B,B,D,DEEE O
 21/BBI__Y//F &28R_bj"IJJ 'F1I&  &"),, ,FKN+++ << 	 	 	LL1	     44	s   BC* (2C* C* *'DDc                     | j         S )z)Returns the partitioning for this parser.)rL   r)   s    r   rB   zPathPartitionParser.schemeM       |r   c                     |                     | j        j                  sdS |t          | j        j                  d         }t	          j        |          S )zTrims the normalized base directory and returns the directory path.

        Returns None if the path does not start with the normalized base directory.
        Simply returns the directory path if the base directory is undefined.
        N)
startswithrL   r/   r8   	posixpathdirnamer*   rO   s     r   rQ   z'PathPartitionParser._dir_path_trim_baseR  sP     t|?@@ 	4C899;;< &&&r   rT   c           	         d |                     d          D             }|rd |D             ng }d |D             }| j        j        }|r|rt          |          t          |          k    r3t	          dt          |           dt          |           d| d          t          |          D ]8\  }}||         d	         |k    r!t	          d
| d||         d	                    9t          |          S )zHive partition path parser.

        Returns a dictionary mapping partition keys to values given a hive-style
        partition path of the form "{key1}={value1}/{key2}={value2}/..." or an empty
        dictionary for unpartitioned files.
        c                 H    g | ]}||                     d           dk    | S )=r5   )countr]   ds     r   
<listcomp>z8PathPartitionParser._parse_hive_path.<locals>.<listcomp>d  s1    LLLa!L9J9J9J9J9Jr   r6   c                 8    g | ]}|                     d           S )r   )splitr   s     r   r   z8PathPartitionParser._parse_hive_path.<locals>.<listcomp>e  s"    ///QAGGCLL///r   c                 V    g | ]&\  }}|t           j                            |          g'S r   )urllibparseunquote)r]   keyvalues      r   r   z8PathPartitionParser._parse_hive_path.<locals>.<listcomp>i  s1    RRR:3S&,..u556RRRr   	Expected  partition value(s) but found : .r   zExpected partition key z but found )r   rL   r!   r8   rF   	enumeratedict)r*   rT   dirskv_pairsr!   i
field_names          r   rG   z$PathPartitionParser._parse_hive_path]  sU    ML8>>#..LLL37?//$////R SRRRRl. 	8 	8}}K 0 000 4K 0 0 4 48}}4 4(04 4 4   "+;!7!7  :A;q>Z//$,* , ,#A;q>, ,   0
 H~~r   c           	      >   d |                     d          D             }| j        j        }|rSt          |          t          |          k    r3t	          dt          |           dt          |           d| d          |si S d t          ||          D             S )af  Directory partition path parser.

        Returns a dictionary mapping directory partition keys to values from a
        partition path of the form "{value1}/{value2}/..." or an empty dictionary for
        unpartitioned files.

        Requires a corresponding ordered list of partition key field names to map the
        correct key to each value.
        c                     g | ]}||S r   r   r   s     r   r   z7PathPartitionParser._parse_dir_path.<locals>.<listcomp>  s    444a!4444r   r6   r   r   r   r   c                     i | ]
\  }}|||S Nr   )r]   rV   	directorys      r   r`   z7PathPartitionParser._parse_dir_path.<locals>.<dictcomp>  s.     
 
 
 y  9   r   )r   rL   r!   r8   rF   zip)r*   rT   r   r!   s       r   rH   z#PathPartitionParser._parse_dir_pathz  s     548>>#..444l. 	CII[!1!111(C,, ( (t99( ( $( ( (  
  	I
 
$'T$:$:
 
 
 	
r   )r   r   r   r   staticmethodr   r   r
   r<   r	   r   r=   rC   r   rN   rX   rk   rx   r>   rB   rQ   rG   rH   r   r   r   r@   r@      s        :  . 3"&+/>B8<+ ++3-+ d3i(+ d3(9#9:;	+
 45+ 
+ + + \+<$\ $ $ $ $BS T#s(^    (;C ;F ;t ; ; ; ;z     X	' 	' 	' 	' 	' 	' c3h    :
 
S#X 
 
 
 
 
 
r   r@   beta)	stabilityc                   P   e Zd ZdZeej        ddddfdeee	e	f         ge
f         dedee	         deee	                  deee	ef                  ded	         d
d fd            Zdedeee	e	f         ge
f         fdZdee	         d
ee	         fdZde	d
e
fdZed
efd            ZdS )PathPartitionFilterzPartition filter for path-based partition formats.

    Used to explicitly keep or reject files based on a custom filter function that
    takes partition keys and values parsed from the file's path as input.
    N	filter_fnr   r    r!   r"   r$   r#   r,   c                 f    t          |||||          }t          |          }t          ||           S )a  Creates a path-based partition filter using a flattened argument list.

        Args:
            filter_fn: Callback used to filter partitions. Takes a dictionary mapping
                partition keys to values as input. Unpartitioned files are denoted with
                an empty input dictionary. Returns `True` to read a file for that
                partition or `False` to skip it. Partition keys and values are always
                strings read from the filesystem path. For example, this removes all
                unpartitioned files:

                .. code:: python

                    lambda d: True if d else False

                This raises an assertion error for any unpartitioned file found:

                .. code:: python

                    def do_assert(val, msg):
                        assert val, msg

                    lambda d: do_assert(d, "Expected all files to be partitioned!")

                And this only reads files from January, 2022 partitions:

                .. code:: python

                    lambda d: d["month"] == "January" and d["year"] == "2022"

            style: The partition style - may be either HIVE or DIRECTORY.
            base_dir: "/"-delimited base directory to start searching for partitions
                (exclusive). File paths outside of this directory will be considered
                unpartitioned. Specify `None` or an empty string to search for
                partitions in all file path directories.
            field_names: The partition key names. Required for DIRECTORY partitioning.
                Optional for HIVE partitioning. When non-empty, the order and length of
                partition key field names must match the order and length of partition
                directories discovered. Partition key field names are not required to
                exist in the dataset schema.
            field_types: A dictionary that maps partition key names to their desired
                data type. If not provided, the data type defaults to string.
            filesystem: Filesystem that will be used for partition path file I/O.

        Returns:
            The new path-based partition filter.
        )r   r@   r   )r   r   r    r!   r"   r$   rB   path_partition_parsers           r   rC   zPathPartitionFilter.of  s9    n eX{KTT 3F ; ;"#8)DDDr   r   c                 "    || _         || _        dS )a  Creates a new path-based partition filter based on a parser.

        Args:
            path_partition_parser: The path-based partition parser.
            filter_fn: Callback used to filter partitions. Takes a dictionary mapping
                partition keys to values as input. Unpartitioned files are denoted with
                an empty input dictionary. Returns `True` to read a file for that
                partition or `False` to skip it. Partition keys and values are always
                strings read from the filesystem path. For example, this removes all
                unpartitioned files:
                ``lambda d: True if d else False``
                This raises an assertion error for any unpartitioned file found:
                ``lambda d: assert d, "Expected all files to be partitioned!"``
                And this only reads files from January, 2022 partitions:
                ``lambda d: d["month"] == "January" and d["year"] == "2022"``
        N)_parser
_filter_fn)r*   r   r   s      r   rN   zPathPartitionFilter.__init__  s    * -#r   r:   c                 6     |} j          fd|D             }|S )a}  Returns all paths that pass this partition scheme's partition filter.

        If no partition filter is set, then returns all input paths. If a base
        directory is set, then only paths under this base directory will be parsed
        for partitions. All paths outside of this base directory will automatically
        be considered unpartitioned, and passed into the filter function as empty
        dictionaries.

        Also normalizes the partition base directory for compatibility with the
        given filesystem before applying the filter.

        Args:
            paths: Paths to pass through the partition filter function. All
                paths should be normalized for compatibility with the given
                filesystem.
        Returns:
            List of paths that pass the partition filter, or all paths if no
            partition filter is defined.
        Nc                 >    g | ]}                     |          |S r   )apply)r]   rO   r*   s     r   r   z0PathPartitionFilter.__call__.<locals>.<listcomp>  s*    IIIt

48H8HIdIIIr   )r   )r*   r:   filtered_pathss   `  r   rX   zPathPartitionFilter.__call__  s1    ( ?&IIIIuIIINr   rO   c                 R    |                      |                     |                    S r   )r   r   r   s     r   r   zPathPartitionFilter.apply
  s     t||D11222r   c                     | j         S )z2Returns the path partition parser for this filter.)r   r)   s    r   parserzPathPartitionFilter.parser  rz   r   )r   r   r   r   r   r   r   r   r   r<   rk   r
   r	   r=   rC   r@   rN   rX   r   r>   r   r   r   r   r   r     s          !/ 3"&+/>B8<8E 8ET#s(^,d238E8E 3-8E d3i(	8E
 d3(9#9:;8E 458E 
8E 8E 8E \8Et$2$ T#s(^,d23$ $ $ $0d3i DI    23# 3$ 3 3 3 3 +    X  r   r   r   rW   r,   c                     |t           u rt          |           S |t          u rt          |           S |t          u r|                                 dk    S | S )Ntrue)intfloatrk   lower)r   rW   s     r   rS   rS     sS    C5zz	e		U||	d		{{}}&&r   )&loggingr}   urllib.parser   dataclassesr   enumr   typingr   r   r   r   r	   r
   r   r   numpyri   ray.util.annotationsr   r   rb   ray.data.expressionsr   r   r   r<   rk   r=   	getLoggerr   rp   r   r   r@   r   rS   r   r   r   <module>r      s_            ! ! ! ! ! !      	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	     8 8 8 8 8 8 8 8 *NNN)))))) sE3456 		8	$	$     S$   & 
e8 e8 e8 e8 e8 e8 e8  e8P u
 u
 u
 u
 u
 u
 u
 u
p Vy y y y y y y yxs '8 S      r   