
    &`i:                         d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
mZ d dlmZmZmZ d dlmZmZ d dlmZ  e j        e          Zdedefd	Ze G d
 de                      ZdS )    N)AnyCallableDictIterableListOptionalTuple)_check_import)BlockBlockAccessorBlockMetadata)
DatasourceReadTask)DeveloperAPI
filter_strreturnc                 p    d}d}| D ].}|r|dk    r|sd}|dk    o| }|dk    rd}d}#|dk    r dS d}/dS )NF'\T; )r   	in_stringescape_nextcs       /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/clickhouse_datasource.py_is_filter_string_safer      s|    IK $ $ 	$Cxxx!	99k/KK Cxx 	#cuu# 4    c                      e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
Z	 	 	 	 	 d'dededeee                  dee         deeee         ef                  deeeef                  deeeef                  fdZd Zd ZdefdZdededefdZdedeg ee         f         fdZd Zdee         fdZdee         fdZ d edee         fd!Z!dedefd"Z"dee         fd#Z#	 d(d$ed%ee         dee$         fd&Z%dS ))ClickHouseDatasourcea  
    A Ray datasource for reading from ClickHouse.

    Args:
        table: Fully qualified table or view identifier (e.g.,
            "default.table_name").
        dsn: A string in DSN (Data Source Name) HTTP format (e.g.,
            "clickhouse+http://username:password@host:8124/default").
            For more information, see `ClickHouse Connection String doc
            <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
        columns: Optional List of columns to select from the data source.
            If no columns are specified, all columns will be selected by default.
        filter: Optional SQL filter string that will be used in the
            WHERE statement (e.g., "label = 2 AND text IS NOT NULL").
            The filter must be valid for use in a ClickHouse SQL WHERE clause.
            Note: Parallel reads are not currently supported when a filter is set.
            Specifying a filter forces the parallelism to 1 to ensure deterministic
            and consistent results. For more information, see
            `ClickHouse SQL WHERE Clause doc
            <https://clickhouse.com/docs/en/sql-reference/statements/select/where>`_.
        order_by: Optional Tuple containing a list of columns to order by
            and a boolean indicating the order. Note: order_by is required to
            support parallelism.
        client_settings: Optional ClickHouse server settings to be used with the
            session/every request. For more information, see
            `ClickHouse Client Settings doc
            <https://clickhouse.com/docs/en/integrations/python#settings-argument>`_.
        client_kwargs: Optional Additional keyword arguments to pass to the
            ClickHouse client. For more information,
            see `ClickHouse Core Settings doc
            <https://clickhouse.com/docs/en/integrations/python#additional-options>`_.
    d   2   z#SELECT {select_clause} FROM {table}z3EXPLAIN SELECT 1 FROM {table} WHERE {filter_clause}z2SELECT SUM(byteSize(*)) AS estimate FROM ({query})z*SELECT COUNT(*) AS estimate FROM ({query})z{query} LIMIT {limit_row_count}zT
        {query}
        FETCH FIRST {fetch_row_count} {fetch_row_or_rows} ONLY
    z
        {query}
        OFFSET {offset_row_count} {offset_row_or_rows}
        FETCH NEXT {fetch_row_count} {fetch_row_or_rows} ONLY
    Ntabledsncolumnsfilterorder_byclient_settingsclient_kwargsc                     || _         || _        || _        || _        || _        |pi | _        |pi | _        |                                 | _        d S N)	_table_dsn_columns_filter	_order_by_client_settings_client_kwargs_generate_query_query)selfr"   r#   r$   r%   r&   r'   r(   s           r   __init__zClickHouseDatasource.__init__Y   sX     	! / 52+1r**,,r   c                 t    t          | dd           dd l} |j        d| j        | j        pi d| j        pi S )Nclickhouse_connectzclickhouse-connect)modulepackager   )r#   settingsr   )r
   r7   
get_clientr,   r0   r1   )r4   r7   s     r   _init_clientz!ClickHouseDatasource._init_clientl   si    d#7AUVVVV!!!!,!, 
	*0b
 
 !'R
 
 	
r   c                    | j         sd S t          | j                   st          d| j                    |                                 }	 | j                            | j        | j                   }|                    |           n,# t          $ r}t          d| j          d|           d }~ww xY w	 |	                                 d S # |	                                 w xY w)Nz9Invalid characters outside of string literals in filter: )r"   filter_clausezInvalid filter expression: z	. Error: )
r.   r   
ValueErrorr<   _EXPLAIN_FILTERS_QUERYformatr+   query	Exceptionclose)r4   client
test_queryes       r   _validate_filterz%ClickHouseDatasource._validate_filterv   s   | 	F%dl33 	=.2l= =  
 ""$$	4;;k"l <  J LL$$$$ 	 	 	HdlHHQHH  	 % LLNNNNNFLLNNNNs*   
;B C	 
B/B**B//C	 	Cr   c                    | j                             | j        rd                    | j                  nd| j                  }| j        r!|                                  |d| j         z  }| j        rg| j        \  }}|rdnd}t          |          dk    r|d|d	          | z  }n3t          |          dk    r d                    |          }|d
| d| z  }|S )Nz, *)select_clauser"   z WHERE z DESC    z
 ORDER BY r   z ORDER BY ())	_BASE_QUERYrA   r-   joinr+   r.   rH   r/   len)r4   rB   r$   desc	directioncolumns_clauses         r   r2   z$ClickHouseDatasource._generate_query   s	    ''6:mL$))DM222+ ( 
 
 < 	.!!###-t|---E> 	D NMGT#'/RI7||q  =gaj=)===W!!!%7!3!3C~CC	CCCr   limit_row_countoffset_row_countc                     |dk    r*| j                             | j        ||dk    rdnd          S | j                            | j        ||dk    rdnd||dk    rdnd          S )Nr   rM   ROWSROWrB   fetch_row_countfetch_row_or_rows)rB   rV   offset_row_or_rowsr[   r\   )_FIRST_BLOCK_QUERYrA   r3   _NEXT_BLOCK_QUERY)r4   rU   rV   s      r   _build_block_queryz'ClickHouseDatasource._build_block_query   s    q   *11k /,;a,?,?&&U 2    %,,+-)9A)=)=vv5+(7!(;(;ff - 
 
 	
r   rB   c                 <     dt           t                   f fd}|S )Nr   c                  0                                    gS r*   )_execute_block_query)rB   r4   s   r   read_fnz5ClickHouseDatasource._create_read_fn.<locals>.read_fn   s    --e4455r   )r   r   )r4   rB   rd   s   `` r   _create_read_fnz$ClickHouseDatasource._create_read_fn   s9    	6% 	6 	6 	6 	6 	6 	6 	6 r   c                    | j         5| j                            | j        | j        | j        dk    rdnd          }n&| j                            | j        | j                  }t          j        |                     |                    }t          j
        |                                |                                z            }|                                }||fS )NrM   rX   rY   rZ   )rB   rU   )r/   r^   rA   r3   NUM_SAMPLE_ROWS_SAMPLE_BLOCK_QUERYr   	for_blockrc   mathceil
size_bytesnum_rowsschema)r4   rB   sample_block_accessorestimated_size_bytes_per_rowsample_block_schemas        r   _get_sampled_estimatesz+ClickHouseDatasource._get_sampled_estimates   s    >% +22k $ 4,0,@1,D,D&&% 3  EE ,33k $ 4 4  E !. 7%%e,,!
 !
 (,y!,,..1F1O1O1Q1QQ(
 (
$ 4::<<+-@@@r   c                 6    |                      | j                  S r*   )_execute_estimate_query_COUNT_ESTIMATE_QUERYr4   s    r   _get_estimate_countz(ClickHouseDatasource._get_estimate_count   s    ++D,FGGGr   c                 6    |                      | j                  S r*   )rt   _SIZE_ESTIMATE_QUERYrv   s    r   _get_estimate_sizez'ClickHouseDatasource._get_estimate_size   s    ++D,EFFFr   estimate_queryc                     |                                  }	 |                    | j                  }|                    |          }|rSt	          |j                  dk    r;|j        d         d         }|t          |          nd 	 |                                 S n4# t          $ r'}t          
                    d|            Y d }~nd }~ww xY w|                                 n# |                                 w xY wd S )N)rB   r   z"Failed to execute estimate query: )r<   rA   r3   rB   rQ   result_rowsintrD   rC   loggerwarning)r4   r{   rE   rB   resultestimaterG   s          r   rt   z,ClickHouseDatasource._execute_estimate_query   s   ""$$	 #)))<<E\\%((F G#f011A55!-a03(0(<s8}}}$F LLNNNN  	E 	E 	ENNCCCDDDDDDDD	E LLNNNNFLLNNNNts0   A/B C% 
C&CC% CC% %C;c                    dd l }|                                 }	 |                    |          5 }t          |          }d d d            n# 1 swxY w Y   |j                            |          |                                 S # t          $ r}t          d|           d }~ww xY w# |                                 w xY w)Nr   zFailed to execute block query: )	pyarrowr<   query_arrow_streamlistTablefrom_batchesrD   rC   RuntimeError)r4   rB   parE   streamrecord_batchesrG   s          r   rc   z)ClickHouseDatasource._execute_block_query   s
   ""$$	**511 .V!%f. . . . . . . . . . . . . . .8((88 LLNNNN  	F 	F 	FDDDEEE	F LLNNNNsE   B AB AB AB 
B%B  B%%B( (B>c                 *    |                                  S )z
        Estimate the in-memory data size for the query.

        Returns:
            Estimated in-memory data size in bytes, or
            None if the estimation cannot be performed.
        )rz   rv   s    r   estimate_inmemory_data_sizez0ClickHouseDatasource.estimate_inmemory_data_size   s     &&(((r   parallelismper_task_row_limitc                                                       }|dk    s|g S t          |t          j        | j        z                      } j        "|dk    rt                              d           d} j        "|dk    rt                              d           d}||z  }||z  } 	                                \  dt          dt          dt          d	t          f fd
}|dk    r ||dd          gS g }d}t          |          D ]4}	|}
|	|k     r|
dz  }
|                     ||
|d                     ||
z  }5|S )au  
        Create read tasks for the ClickHouse query.

        Args:
            parallelism: The desired number of partitions to read the data into.
                - If ``order_by`` is not set, parallelism will be forced to 1.
                - If ``filter`` is set, parallelism will also be forced to 1
                    to ensure deterministic results.
            per_task_row_limit: Maximum number of rows allowed in each emitted
                task.  Blocks larger than this limit will be sliced before
                being yielded downstream.

        Returns:
            A list of read tasks to be executed.
        r   NrM   zwClickHouse datasource does not currently support parallel reads when a filter is set; falling back to parallelism of 1.zyClickHouse datasource requires dataset to be explicitly ordered to support parallelism; falling back to parallelism of 1.
block_rowsoffset_rowsparallelizedr   c           	          |r                     | |          }nj        }t                              |          t	          | | z  d d                     S )N)rm   rl   input_files
exec_stats)rn   r   )r`   r3   r   re   r   )r   r   r   rB   rp   r   rq   r4   s       r   _get_read_taskz;ClickHouseDatasource.get_read_tasks.<locals>._get_read_task;  s      $ //
KHH $$U++';jH $#	   +#5
 
 
 
r   FT)rw   minrj   rk   MIN_ROWS_PER_READ_TASKr.   r   r   r/   rr   r~   boolr   rangeappend)r4   r   r   num_rows_totalnum_rows_per_blocknum_blocks_with_extra_rowr   
read_tasksoffsetithis_block_sizerp   rq   s   ` `        @@r   get_read_tasksz#ClickHouseDatasource.get_read_tasks  s   $ 1133Q."8I>D4O#OPP
 
 <#aNNJ   K >!kAooNNL   K ,{:$2[$@! ''))	
(		*-	=A		 	 	 	 	 	 	 	 	. ! #N>1e<<==
 
{## 	& 	&A0O,,,1$nn_fdKKLLLo%FFr   )NNNNNr*   )&__name__
__module____qualname____doc__rg   r   rO   r@   ry   ru   rh   r^   r_   strr   r   r	   r   r   r   r5   r<   rH   r2   r~   r`   r   r   r   re   rr   rw   rz   rt   rc   r   r   r   r   r   r   r   r   %   s        B O7KROH; (, $594826- -- - $s)$	-
 - 5cD12- "$sCx.1-  S#X/- - - -&
 
 
  0    $
# 
 
QT 
 
 
 
& 
"huo%	&   A A A4HXc] H H H HGHSM G G G Gc hsm    "# %    )Xc] ) ) ) ) EI_ __4<SM_	h_ _ _ _ _ _r   r   )loggingrj   typingr   r   r   r   r   r   r	   ray.data._internal.utilr
   ray.data.blockr   r   r   ray.data.datasource.datasourcer   r   ray.util.annotationsr   	getLoggerr   r   r   r   r   r   r   r   r   <module>r      s%     G G G G G G G G G G G G G G G G G G 1 1 1 1 1 1 > > > > > > > > > > ? ? ? ? ? ? ? ? - - - - - -		8	$	$s t    0 } } } } }: } } } } }r   