
    &`i                         d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlmZmZ d dlmZmZ eZeZ e j        e          ZdefdZddZdd	Zed
eg ef         dee         fd            Z G d de          ZdS )    N)contextmanager)AnyCallableIterableIteratorListOptional)BlockBlockMetadata)
DatasourceReadTaskreturnc                     dd l }|                                 d | j        D             }fdt          |          D             }|j                            |          S )Nr   c                     g | ]
}|d          S )r    ).0column_descriptions     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/sql_datasource.py
<listcomp>z$_cursor_to_block.<locals>.<listcomp>   s    RRR);!!$RRR    c                 6    i | ]\  }|fd D             S )c                      g | ]
}|         S r   r   )r   rowis     r   r   z/_cursor_to_block.<locals>.<dictcomp>.<listcomp>   s    ...#s1v...r   r   )r   columnr   rowss     @r   
<dictcomp>z$_cursor_to_block.<locals>.<dictcomp>   s5    SSS91ff.......SSSr   )pyarrowfetchalldescription	enumerateTablefrom_pydict)cursorpacolumnspydictr   s       @r   _cursor_to_blockr(      sn    ??D SRv?QRRRGSSSS	'@R@RSSSF8'''r   c                 V    dD ]%}t          | |          st          d|d          &d S )N)closecommitr$   zBYour `connection_factory` created a `Connection` object without a  method, but this method is required by the Python DB API2 specification. Check that your database connector is DB API2-compliant. To learn more, read https://peps.python.org/pep-0249/.hasattr
ValueError)
connectionattrs     r   %_check_connection_is_dbapi2_compliantr2      s_    +  z4(( 	TT T T  	 r   c                 V    dD ]%}t          | |          st          d|d          &d S )N)executeexecutemanyfetchoner   r    z<Your database connector created a `Cursor` object without a r,   r-   )r$   r1   s     r   !_check_cursor_is_dbapi2_compliantr7   %   sb     P  vt$$ 	TT T T  	 r   connection_factoryc              #     K    |             }t          |           	 |                                }t          |           |V  |                                 nb# t          $ rU 	 |                                 n=# t          $ r0}t          |t                    s|j        j	        dk    r	 Y d }~nd }~ww xY w w xY w	 |
                                 d S # |
                                 w xY w)NNotSupportedError)r2   r$   r7   r+   	Exceptionrollback
isinstanceAttributeError	__class____name__r*   )r8   r0   r$   es       r   _connectrB   2   s!     ##%%J)*555""$$)&111   		!!!! 	 	 	 1n--;'+>>>	 	 	 	
sG   ;A C 
B8$A98B89
B3&B.)B8.B33B88C C(c                       e Zd ZdZ	 ddedeg ef         dedeee                  fdZ	dee
         fd	Zd
e
defdZ	 dd
e
dee
         dee         fdZde
fdZde
d
e
fdZdS )SQLDatasource2   Nsqlr8   shard_hash_fn
shard_keysc                     || _         |r2t          |          dk    rdd                    |           d| _        n+|r"t          |          dk    r|d          | _        nd | _        || _        || _        d S )N   zCONCAT(,)r   )rF   lenjoinrH   rG   r8   )selfrF   r8   rG   rH   s        r   __init__zSQLDatasource.__init__P   s      	##j//A--?(<(<???DOO 	#C
OOq00!+A0DOO"DO*"4r   r   c                     d S Nr   )rO   s    r   estimate_inmemory_data_sizez)SQLDatasource.estimate_inmemory_data_sizea   s    tr   parallelismc           	      n   |dk    s| j         dS | j        }d| j         d| d| j          d| d	}	 t          | j                  5 }|                    |           ddd           n# 1 swxY w Y   d	S # t          $ r6}t                              d
t          |           d           Y d}~dS d}~ww xY w)zCheck if database supports sharding with MOD/ABS/CONCAT operations.

        Returns:
            bool: True if sharding is supported, False otherwise.
        rJ   NFzSELECT COUNT(1) FROM () as T WHERE MOD(ABS(()), z) = 0Tz$Database does not support sharding: .)
rH   rG   rF   rB   r8   r4   r;   loggerinfostr)rO   rT   hash_fnqueryr$   rA   s         r   supports_shardingzSQLDatasource.supports_shardingd   sD    !t65 $PTX P P%P P(,P P=HP P P 		$122 &fu%%%& & & & & & & & & & & & & & &4 	 	 	KKHs1vvHHHIII55555	s;   A4 A'A4 'A++A4 .A+/A4 4
B4>+B//B4per_task_row_limitc                 Z    dt           t                   f fd}                     |          s=t                              d           t          d d d d           }t          ||          gS                                  }|dk    rg S t          |t          j
        | j        z                      }||z  }||z  }g }t          |          D ]]}	|}
|	|k     r|
dz  }
                     |	|          }t          |
d d d           }|                    t          |||                     ^|S )Nr   c                      t          j                  5 } |                     j                   t	          |           gcddd           S # 1 swxY w Y   dS )z?Read all data in a single block when sharding is not supported.N)rB   r8   r4   rF   r(   )r$   rO   s    r   fallback_read_fnz6SQLDatasource.get_read_tasks.<locals>.fallback_read_fn   s    $122 2ftx((((0012 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2s   *AAAzMSharding is not supported. Falling back to reading all data in a single task.r   rJ   )num_rows
size_bytesinput_files
exec_stats)r`   )r   r
   r_   rZ   r[   r   r   _get_num_rowsminmathceilMIN_ROWS_PER_READ_TASKrange_create_parallel_read_fnappend)rO   rT   r`   rc   metadatanum_rows_totalnum_rows_per_blocknum_blocks_with_extra_rowtasksr   rd   read_fns   `           r   get_read_taskszSQLDatasource.get_read_tasks|   s   	2(5/ 	2 	2 	2 	2 	2 	2 %%k22 	:KKE   %T4t<<H-x8899 ++--QI>D4O#OPP
 
 ,{:$2[$@!{## 	 	A)H,,,A33A{CCG$! 	  H LL(?QRRR    r   c                     t          | j                  5 }|                    d| j         d           |                                d         cd d d            S # 1 swxY w Y   d S )NzSELECT COUNT(*) FROM (z) as Tr   )rB   r8   r4   rF   r6   )rO   r$   s     r   rh   zSQLDatasource._get_num_rows   s    d-.. 	(&NNDDHDDDEEE??$$Q'	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	( 	(s   8AA!Atask_idc           
            j         }d j         d| d j         d| d| 
dt          t                   f fd}|S )NzSELECT * FROM (rV   rW   rX   z) = r   c                      t          j                  5 } |                                t          |           }|gcd d d            S # 1 swxY w Y   d S rR   )rB   r8   r4   r(   )r$   blockr^   rO   s     r   ru   z7SQLDatasource._create_parallel_read_fn.<locals>.read_fn   s    $122 fu%%%(00w                 s   'A

AA)rG   rF   rH   r   r
   )rO   rx   rT   r]   ru   r^   s   `    @r   rn   z&SQLDatasource._create_parallel_read_fn   s    $Wdh W W$W W'+W W<GW WMTW W 	
	% 	 	 	 	 	 	 	 r   rR   )r@   
__module____qualname__rl   r\   r   
Connectionr	   r   rP   intrS   boolr_   r   rv   rh   rn   r   r   r   rD   rD   M   s/        +/5 55 %R^45 	5
 T#Y'5 5 5 5"Xc]    S T    2 EI/ //4<SM/	h/ / / /b(s ( ( ( (
 #      r   rD   )r   N)loggingrj   
contextlibr   typingr   r   r   r   r   r	   ray.data.blockr
   r   ray.data.datasource.datasourcer   r   r~   Cursor	getLoggerr@   rZ   r(   r2   r7   rB   rD   r   r   r   <module>r      sb     % % % % % % D D D D D D D D D D D D D D D D / / / / / / / / ? ? ? ? ? ? ? ?
			8	$	$( ( ( ( (   
 
 
 
 "j.!9 hv>N    4r r r r rJ r r r r rr   