
    &`i-                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZ d d	lmZ erd d
lm Z  dZ! e j"        e#          Z$dede%fdZ&dee%         de%fdZ'e	 dddde(de
ej)        j*        eej)        j*                 f         fd            Z+e G d de                      Z,dS )    N)Path)TYPE_CHECKINGListOptionalTuple)InputReader)	IOContext)from_json_datapostprocess_actions)DEFAULT_POLICY_IDSampleBatchconcat_samples)	PublicAPIoverride)SampleBatchType)AlgorithmConfigg      ?fpathextract_pathc                     t          j        t          |           d          5 }|                    |           d d d            d S # 1 swxY w Y   d S )Nr)zipfileZipFilestr
extractall)r   r   zip_refs      t/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/offline/dataset_reader.py_unzip_this_pathr      s    	US	)	) )W<((() ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) ) )s   AA
A
pathsformatc                 &   g }| D ]
}t          j        dt          |                    rt          |                              d          rt	          d          d}	 t          t          |          |           na# t          $ rT 	 t          t          t                    j	        j	        |z  |           n # t          $ r t          d|           w xY wY nw xY wt          t          |          
                                t          |          j         d| z            }|                    |           8t          |                              d          r|                    |           qt          |                                          sct          t          t                    j	        j	        |z            }t          |                                          st          d|           |}|                    |           |S )zLIf a path in paths is a zip file, unzip it and use path of the unzipped filez\.zip$zs3://z?unzip_if_needed currently does not support remote paths from s3z./zFile not found: .)researchr   
startswith
ValueErrorr   FileNotFoundErrorr   __file__parentabsolutestemappendexists)r   r   	ret_pathspathr   unzipped_pathrelative_paths          r   _unzip_if_neededr1      s.   I  '  '9YD		** 	'4yy##G,,  U    LG TL9999$ G G GG$T(^^%:%AD%H,WWWW( G G G+,Et,E,EFFFG XW	G  \""++--4::?0M0MV0M0MM M ]++++ 4yy##G,, '  &&&&Dzz((** )$'X(=(Dt(K$L$LM..5577 T/0R=0R0RSSS(D  &&&&s*   A==
C/B87C8CCCconfigr   num_workersreturnc                 
   | j         dk    sJ d| j                      | j        }|                    d          }ddg}|||vrt          d| d|           |                    d	          }|                    d
          }|r|s|rt          d          |r|s|st          d          |st	          |t
                    r|g}nJt	          |t                    r&t	          |d         t
                    s
J d            nt          d          t          ||          }|                    d|pd          }|                    dt                    }|r |            }	nf|dk    r%t          j
                            ||d|i          }	n;|dk    r%t          j
                            ||d|i          }	nt          d|          |dk    r|	|	gfS |	                    |d                              |          }
|	dg|
z   fS )a  Returns a dataset and a list of shards.

    This function uses algorithm configs to create a dataset and a list of shards.
    The following config keys are used to create the dataset:
        input: The input type should be "dataset".
        input_config: A dict containing the following key and values:
            `format`: str, speciifies the format of the input data. This will be the
            format that ray dataset supports. See ray.data.Dataset for
            supported formats. Only "parquet" or "json" are supported for now.
            `paths`: str, a single string or a list of strings. Each string is a path
            to a file or a directory holding the dataset. It can be either a local path
            or a remote path (e.g. to an s3 bucket).
            `loader_fn`: Callable[None, ray.data.Dataset], Instead of
            specifying paths and format, you can specify a function to load the dataset.
            `parallelism`: int, The number of tasks to use for loading the dataset.
            If not specified, it will be set to the number of workers.
            `num_cpus_per_read_task`: float, The number of CPUs to use for each read
            task. If not specified, it will be set to 0.5.

    Args:
        config: The config dict for the algorithm.
        num_workers: The number of shards to create for remote workers.

    Returns:
        dataset: The dataset object.
        shards: A list of dataset shards. For num_workers > 0 the first returned
        shared would be a dummy None shard for local_worker.
    datasetzQMust specify config.input_ as 'dataset' if calling `get_dataset_and_shards`. Got r   jsonparquetNzUnsupported format z. Supported formats are r   	loader_fnzBWhen using a `loader_fn`, you cannot specify a `format` or `path`.zMMust specify either a `loader_fn` or a `format` and `path` in `input_config`.r   z%Paths must be a list of path strings.z6Paths must be a path string or a list of path strings.parallelism   num_cpus_per_read_tasknum_cpus)r:   ray_remote_argsz!Un-supported Ray dataset format: F)
num_blocksshuffle)input_input_configgetr%   
isinstancer   listr1   DEFAULT_NUM_CPUS_PER_TASKraydata	read_jsonread_parquetrepartitionsplit)r2   r3   rB   r   supported_fmtsr   r9   r:   cpus_per_taskr6   remote_shardss              r   get_dataset_and_shardsrP   F   s   B =I%%%	B28-	B 	B &%% &Lh''Fi(NfN::R&RR.RR
 
 	

 W%%E  --I 
f 
 
P
 
 	

  
u 
i 

 
 	
 eS!! 	WGEEt$$ 	WeAh,,UU.UUU,UUVVV // ""=+2BCCK $$ "; M  F)++	6		($${Z<W % 
 
 
9		(''{Z<W ( 
 
 <fEEE a	!!  ++"E , 
 

%

 	 ...    c                       e Zd ZdZeddej        j        dee	         fd            Z
 ee          defd            Zdedefd	Zdedefd
ZdS )DatasetReadera  Reader object that loads data from Ray Dataset.

    Examples:
        config = {
            "input": "dataset",
            "input_config": {
                "format": "json",
                # A single data file, a directory, or anything
                # that ray.data.dataset recognizes.
                "paths": "/tmp/sample_batches/",
                # By default, parallelism=num_workers.
                "parallelism": 3,
                # Dataset allocates 0.5 CPU for each reader by default.
                # Adjust this value based on the size of your offline dataset.
                "num_cpus_per_read_task": 0.5,
            }
        }
    Ndsioctxc                     |pt                       _        dx _         _        d _        | _         j        sdn j                                         _        dt          j        j	        
                                _         j        j                            dd           _         j        j                            dd          } j        j                            dd          |r/t          t!          j         j        |z            d           _        |r߉ j        j         j        j        j         _         j                            t(                     _         j        j                            dd          s) j        j        j                            t(                    nd _        t-          d	 j        j         d
|                                 d            fd} |             _        dS d _        dS )ziInitializes a DatasetReader instance.

        Args:
            ds: Ray dataset to sample from.
        NFtrain_batch_sizer;   num_env_runnersr   seed_disable_preprocessorszDatasetReader z has z
, samples.c               3   v   K   	 j                                       } |                                 E d {V  6)NT)rY   )_datasetrandom_shuffle	iter_rows)rT   rY   selfs    r   iteratorz(DatasetReader.__init__.<locals>.iterator   sI      .5545@@B!||~~-------.rQ   )r	   _ioctx_default_policy
policy_mappreprocessorr\   countrG   rH   DataContextget_currentenable_progress_barsr2   rC   
batch_sizemaxmathceilworker_policy_mapr   preprocessorsprintworker_index_iter)r_   rT   rU   r3   r`   rY   s   `    @r   __init__zDatasetReader.__init__   s    *y{{155t !%ITTDM4G4G4I4I
BG((**? +,001CQGGk(,,->BB{!%%fd33 	O!$)DOk,I"J"JANNDO  	{!-#';#5#@ '+'7';';<M'N'N$  ;-112JERRDK&4889JKKK !
 V!9VV

VVV  . . . . . .
 "DJJJDJJJrQ   r4   c                    | j         J g }d}|| j        k     rt          | j                   }t          || j        j                  }||j        z  }|                     |          }t          || j                  }| 	                    |          }|
                    |           || j        k     t          |          }|S )Nr   )rr   ri   nextr
   ra   rm   re   _preprocess_if_neededr   _postprocess_if_neededr+   r   )r_   retre   ds       r   ru   zDatasetReader.next   s     z%%%do%%TZ  Aq$+"455AQWE**1--A#At{33A++A..AJJqMMM do%% S!!
rQ   batchc                       j         rHt          j        t          j        fD ]/}||v r)t	          j         fd||         D                       ||<   0|S )Nc                 D    g | ]}j                             |          S  )rd   	transform).0sr_   s     r   
<listcomp>z7DatasetReader._preprocess_if_needed.<locals>.<listcomp>  s*    LLLA*44Q77LLLrQ   )rd   r   CUR_OBSNEXT_OBSnpstack)r_   rz   keys   `  r   rv   z#DatasetReader._preprocess_if_needed  si     	#+[-AB  %<<!#LLLLsLLL" "E#J rQ   c                 p   | j         j                            d          s|S t          |t                    rrg }|                                D ]L}| j        .|                    | j                            |                     7|                    |           Mt          |          S t          d          )Npostprocess_inputsz7Postprocessing of multi-agent data not implemented yet.)ra   r2   rC   rD   r   split_by_episoderb   r+   postprocess_trajectoryr   NotImplementedError)r_   rz   out	sub_batchs       r   rw   z$DatasetReader._postprocess_if_needed  s    {!%%&:;; 	Le[)) 	C"3355 * *	'3JJt3JJ9UUVVVVJJy))))!#&&& &I  rQ   )N)__name__
__module____qualname____doc__r   rG   rH   Datasetr   r	   rs   r   r   r   ru   rv   rw   r}   rQ   r   rS   rS      s         & * *38+ *HY4G * * * Y*X Xko    "?     O       rQ   rS   )r   )-loggingrk   r"   r   pathlibr   typingr   r   r   r   numpyr   ray.datarG   ray.rllib.offline.input_readerr   ray.rllib.offline.io_contextr	   ray.rllib.offline.json_readerr
   r   ray.rllib.policy.sample_batchr   r   r   ray.rllib.utils.annotationsr   r   ray.rllib.utils.typingr   %ray.rllib.algorithms.algorithm_configr   rF   	getLoggerr   loggerr   r   r1   intrH   r   rP   rS   r}   rQ   r   <module>r      s      				        7 7 7 7 7 7 7 7 7 7 7 7      6 6 6 6 6 6 2 2 2 2 2 2 M M M M M M M M X X X X X X X X X X ; ; ; ; ; ; ; ; 2 2 2 2 2 2 FEEEEEE 		8	$	$)D ) ) ) ) )
$DI $s $ $ $ $N 23i/ i/i/,/i/
38T#("2334i/ i/ i/ i/X n n n n nK n n n n nrQ   