
    &`i*                        d dl Z d dlZd dlmZmZmZmZmZmZ d dl	Z
d dlmZ d dlmZ d dlmZ d dlmZ erd dlZ ej        e          Zg dZ G d d	e          Z G d
 de          Zde
j        fdZ G d de j                  ZdS )    N)TYPE_CHECKINGAnyDictListOptionalUnion)pyarrow_table_from_pydict)PandasBlockAccessor)DataContext)FileBasedDatasource)
jsonjsonlzjson.gzzjsonl.gzzjson.brzjsonl.brzjson.zstz	jsonl.zstzjson.lz4z	jsonl.lz4c                        e Zd ZdZdddeeee         f         deeee	f                  f fdZ
dd	Zdd
ZdddefdZ xZS )ArrowJSONDatasourcez>JSON datasource, for reading and writing JSON and JSONL files.N)arrow_json_argspathsr   c                    ddl m}  t                      j        |fi | |i }|                    d|                    d                    | _        || _        d S )Nr   )r   read_optionsF)use_threads)pyarrowr   super__init__popReadOptionsr   r   )selfr   r   file_based_datasource_kwargsr   	__class__s        /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/json_datasource.pyr   zArrowJSONDatasource.__init__&   s     	!     ??">???" O+//D,,,??
 
  /    bufferpyarrow.lib.Bufferc              #   >  K   ddl }ddlm} | j        j        }t          j                    j        }	 	  |j        t          j
        |          fd| j        i| j        V  || j        _        dS # |j        $ r}dt          |          v r|| j        j        |k     rNt                              d| j        j         d| j        j        dz   d	           | j        xj        dz  c_        n'|                    | d
| j        j         d          |Y d}~nd}~ww xY w)zRead with PyArrow JSON reader, trying to auto-increase the
        read block size in the case of the read object
        straddling block boundaries.r   NTr   z0straddling object straddles two block boundariesz+JSONDatasource read failed with block_size=z. Retrying with block_size=   .z! - Auto-increasing block size to a+   bytes failed. Please try manually increasing the block size through the `read_options` parameter to a larger size. For example: `read_json(..., read_options=pyarrow.json.ReadOptions(block_size=10 << 25))`More information on this issue can be found here: https://github.com/apache/arrow/issues/25674)r   pyarrow.jsonr   r   
block_sizer   get_currenttarget_max_block_size	read_jsonioBytesIOr   ArrowInvalidstrloggerdebug)r   r    papajsoninit_block_sizemax_block_sizees          r   _read_with_pyarrow_read_jsonz0ArrowJSONDatasource._read_with_pyarrow_read_json9   s      	%%%%%%$ +6$022H#	"&f&Jv&& !%!2 *    
 0?!,?   EQOO&.,7.HH N*.*;*FN N*.*;*F*JN N N  
 )449444 oo  L L#0;L L L	 	 	 G 54444#	s   :A/ /
D9BDDc              #     K   ddl }ddl}|j        dk    rdS |                    t	          j        |                    }	 |j                            |          V  dS # t          $ r}dt          |          v sJ t          |                      ddl
m}  |t                    }|D ]7}|                                D ] \  }	}
||	                             |
           !8t          |          V  Y d}~dS d}~ww xY w)z{Fallback method to read JSON files with Python's native json.load(),
        in case the default pyarrow json reader fails.r   Nzno attribute 'from_pylist')defaultdict)r   r   sizeloadr*   r+   Tablefrom_pylistAttributeErrorr-   collectionsr7   listitemsappendr	   )r   r    r   r0   parsed_jsonr4   r7   dctrowkvs              r   _read_with_python_jsonz*ArrowJSONDatasource._read_with_python_jsonw   sH      	 ;!Fii
6 2 233	1(&&{3333333 	1 	1 	1 03q669993q66999//////+d##C" % %IIKK % %DAqFMM!$$$$%+C00000000000	1s    A 
C<(B	C77C<fpyarrow.NativeFilepathc              #     K   dd l }|                                }	 |                     |          E d {V  d S # |j        $ rC}t                              d|            |                     |          E d {V  Y d }~d S d }~ww xY w)Nr   zyError reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
)r   read_bufferr5   r,   r.   warningrF   )r   rG   rI   r0   r    r4   s         r   _read_streamz ArrowJSONDatasource._read_stream   s       !		;88@@@@@@@@@@@ 	; 	; 	;NN+'(+ +  
 226:::::::::::::::	;s   9 
B8BB)r    r!   )__name__
__module____qualname____doc__r   r-   r   r   r   r   r   r5   rF   rM   __classcell__r   s   @r   r   r   #   s        HH 59	/ / /S$s)^$/ "$sCx.1	/ / / / / /&< < < <|1 1 1 18;2 ;# ; ; ; ; ; ; ; ;r   r   c                        e Zd ZdZdZdeeee         f         def fdZ	dddefd	Z
ddd
ee         fdZddded
df fdZ xZS )PandasJSONDatasourcei   i'  r   target_output_size_bytesc                 J     t                      j        |fi | || _        d S N)r   r   _target_output_size_bytes)r   r   rV   r   r   s       r   r   zPandasJSONDatasource.__init__   s3     	??">???)A&&&r   rG   rH   rI   c              #   F  K   |                      |          }t          || j                  }|*t          j        ||d          }t          |          V  d S t          j        ||d          5 }|D ]}t          |          V  	 d d d            d S # 1 swxY w Y   d S )Nbuffer_sizeT	chunksizelines)_estimate_chunksizeStrictBufferedReader_BUFFER_SIZEpdr)   _cast_range_index_to_string)r   rG   rI   r^   streamdfreaders          r   rM   z!PandasJSONDatasource._read_stream   s	     ,,Q//	%aT5FGGGf	FFFB-b1111111 f	FFF :&  : :B5b999999:: : : : : : : : : : : : : : : : : :s   1BBBreturnc                    |                                 s| j        S |                                dk    s
J d            | j        dS t	          || j                  }t          j        |dd          5 }	 t          t          |                    }n# t          $ r Y ddd           dS w xY w	 ddd           n# 1 swxY w Y   t          j        |          }|                                dk    rd}nN|                                |                                z  }t          t!          | j        |z            d          }|                    d           |S )z{Estimate the chunksize by sampling the first row.

        This is necessary to avoid OOMs while reading the file.
        r   z%File pointer must be at the beginningNr[      Tr]   )seekable_DEFAULT_CHUNK_SIZEtellrY   ra   rb   rc   r)   rd   nextStopIterationr
   	for_blocknum_rows
size_bytesmaxroundseek)r   rG   re   rg   rf   block_accessorr^   bytes_per_rows           r   r`   z(PandasJSONDatasource._estimate_chunksize   s    zz|| 	,++vvxx1}}}E}}})14%aT5FGGG\&AT::: 	f0f>>    		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 -6r::""$$))II*5577.:Q:Q:S:SSME$"@="PQQSTUUI 	
q			s6   4B<6BB<
B,B<+B,,B<<C C 
filesystemzpyarrow.fs.FileSystemc                     |                      ||          }||                    |          S  t                      j        ||fi |S rX   )resolve_compressionopen_input_filer   _open_input_source)r   rx   rI   	open_argscompressionr   s        r   r|   z'PandasJSONDatasource._open_input_source   sU     ..tY??--d333)uww)*dHHiHHHr   )rN   rO   rP   rb   rl   r   r-   r   intr   rM   r   r`   r|   rR   rS   s   @r   rU   rU      s        L  BS$s)^$B #&B B B B B B:2 :# : : : :%9 hsm    @I+I I
 
I I I I I I I I I Ir   rU   rf   c                     t          | j        t          j                  r$| j                            t
                    | _        | S rX   )
isinstancecolumnsrc   
RangeIndexastyper-   )rf   s    r   rd   rd      s6     "*bm,, ,Z&&s++
Ir   c                   P     e Zd ZdZdej        defdZd
dZde	fdZ
 fd	Z xZS )ra   a  Wrapper that prevents premature file closure and ensures full-buffered reads.

    This is necessary for two reasons:
    1. The datasource reads the file twice -- first to sample and determine the chunk size,
       and again to load the actual data. Since pandas assumes ownership of the file and
       may close it, we prevent that by explicitly detaching the underlying file before
       closing the buffer.

    2. pandas wraps the file in a TextIOWrapper to decode bytes into text. TextIOWrapper
       prefers calling read1(), which doesn't prefetch for random-access files
       (e.g., from PyArrow). This wrapper forces all reads through the full buffer to
       avoid inefficient small-range S3 GETs.
    filer\   c                 <    t          j        ||          | _        d S )Nr[   )r*   BufferedReader_file)r   r   r\   s      r   r   zStrictBufferedReader.__init__  s    &tEEE


r   c                6    | j                             |          S rX   )r   read)r   r8   s     r   r   zStrictBufferedReader.read  s    zt$$$r   rh   c                     dS )NT )r   s    r   readablezStrictBufferedReader.readable  s    tr   c                     | j         sT| j                                         | j                                         t	                                                       d S d S rX   )closedr   detachcloser   )r   r   s    r   r   zStrictBufferedReader.close  sT    { 	JJGGMMOOOOO	 	r   )r   )rN   rO   rP   rQ   r*   	RawIOBaser   r   r   boolr   r   rR   rS   s   @r   ra   ra     s         FR\ F F F F F% % % %$            r   ra   )r*   loggingtypingr   r   r   r   r   r   pandasrc   $ray.air.util.tensor_extensions.arrowr	   ray.data._internal.pandas_blockr
   ray.data.contextr   )ray.data.datasource.file_based_datasourcer   r   	getLoggerrN   r.   JSON_FILE_EXTENSIONSr   rU   	DataFramerd   r   ra   r   r   r   <module>r      s   				  B B B B B B B B B B B B B B B B     J J J J J J ? ? ? ? ? ? ( ( ( ( ( ( I I I I I I NNN		8	$	$   $~; ~; ~; ~; ~;- ~; ~; ~;BRI RI RI RI RI. RI RI RIjBL        2<     r   