
    &`i!                     t   d dl Z d dlmZmZmZ d dlZd dlmZ d dlm	Z	m
Z
mZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ  e j        e          Zeddd	dd
d
d
ddedee         dee         dedee         dedededeee         eee         ef         fd            Zededee         defd            ZdS )    N)ListOptionalUnion)EnvRunnerGroup)DEFAULT_POLICY_IDSampleBatchconcat_samples)ExperimentalAPIOldAPIStack)NUM_AGENT_STEPS_SAMPLEDNUM_ENV_STEPS_SAMPLED)standardized)EpisodeTypeSampleBatchTypeTF)max_agent_stepsmax_env_stepsconcatsample_timeout_srandom_actions_uses_new_env_runners_return_metrics
worker_setr   r   r   r   r   r   r   returnc                    ||J d}|p|pd}	g }
g }|si nddi|	|dk    s
|	||	k     r|                                  dk    r0 | j        j        di g}|r| j                                        g}n|                     |sfdnfdd|          }|r|                                 dk    rQ|st                              d	           n2|                                 dk    rt                              d
           n|rd |D             }d |D             }|rf|r|t          d |D                       z  }n|t          d |D                       z  }|
	                    |           |	                    |           n|D ]}|r3||rt          d |D                       n|
                                z  }n2||rt          d |D                       n|                                z  }|
                    |           |	||	k    r n|	|dk    |	||	k     |du r&|rt          j        |
          }
nt          |
          }
|r|
|fS |
S )aw  Runs parallel and synchronous rollouts on all remote workers.

    Waits for all workers to return from the remote calls.

    If no remote workers exist (num_workers == 0), use the local worker
    for sampling.

    Alternatively to calling `worker.sample.remote()`, the user can provide a
    `remote_fn()`, which will be applied to the worker(s) instead.

    Args:
        worker_set: The EnvRunnerGroup to use for sampling.
        remote_fn: If provided, use `worker.apply.remote(remote_fn)` instead
            of `worker.sample.remote()` to generate the requests.
        max_agent_steps: Optional number of agent steps to be included in the
            final batch or list of episodes.
        max_env_steps: Optional number of environment steps to be included in the
            final batch or list of episodes.
        concat: Whether to aggregate all resulting batches or episodes. in case of
            batches the list of batches is concatinated at the end. in case of
            episodes all episode lists from workers are flattened into a single list.
        sample_timeout_s: The timeout in sec to use on the `foreach_env_runner` call.
            After this time, the call will return with a result (or not if all
            EnvRunners are stalling). If None, will block indefinitely and not timeout.
        _uses_new_env_runners: Whether the new `EnvRunner API` is used. In this case
            episodes instead of `SampleBatch` objects are returned.

    Returns:
        The list of collected sample batch types or episode types (one for each parallel
        rollout worker in the given `worker_set`).

    .. testcode::

        # Define an RLlib Algorithm.
        from ray.rllib.algorithms.ppo import PPO, PPOConfig
        config = (
            PPOConfig()
            .environment("CartPole-v1")
        )
        algorithm = config.build()
        # 2 remote EnvRunners (num_env_runners=2):
        episodes = synchronous_parallel_sample(
            worker_set=algorithm.env_runner_group,
            _uses_new_env_runners=True,
            concat=False,
        )
        print(len(episodes))

    .. testoutput::

        2
    Nr   r   Tc                      | j         di S N )samplewrandom_action_kwargss    s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/execution/rollout_ops.py<lambda>z-synchronous_parallel_sample.<locals>.<lambda>m   s    xqx??*>??     c                 F     | j         di |                                 fS r   )r   get_metricsr   s    r"   r#   z-synchronous_parallel_sample.<locals>.<lambda>o   s'    XQX%E%E0D%E%Eq}}$W r$   F)local_env_runnertimeout_secondszNo samples returned from remote workers. If you have a slow environment or model, consider increasing the `sample_timeout_s` or decreasing the `rollout_fragment_length` in `AlgorithmConfig.env_runners().z=No healthy remote workers left. Trying to restore workers ...c                     g | ]
}|d          S )   r   .0ss     r"   
<listcomp>z/synchronous_parallel_sample.<locals>.<listcomp>   s    :::qt:::r$   c                     g | ]
}|d          S )r   r   r+   s     r"   r.   z/synchronous_parallel_sample.<locals>.<listcomp>   s    ;;;!;;;r$   c              3   x   K   | ]5}|t                                                    D ]}t          |          V  6d S N)r   valuesint)r,   	stat_dict
agent_stats      r"   	<genexpr>z.synchronous_parallel_sample.<locals>.<genexpr>   se       * *!&/0G&H&O&O&Q&Q* * # 
OO* * * * * * *r$   c              3   J   K   | ]}t          |t                             V  d S r1   )r3   r   )r,   r4   s     r"   r6   z.synchronous_parallel_sample.<locals>.<genexpr>   sA       * *>GC	"7899* * * * * *r$   c              3   >   K   | ]}|                                 V  d S r1   )agent_stepsr,   es     r"   r6   z.synchronous_parallel_sample.<locals>.<genexpr>   s*      FFAMMOOFFFFFFr$   c              3   >   K   | ]}|                                 V  d S r1   )	env_stepsr:   s     r"   r6   z.synchronous_parallel_sample.<locals>.<genexpr>   s*      DDaAKKMMDDDDDDr$   r   )num_remote_workersr'   r   r&   foreach_env_runnernum_healthy_remote_workersloggerwarningsumextendr9   r=   appendtreeflattenr	   )r   r   r   r   r   r   r   r   agent_or_env_stepsmax_agent_or_env_stepssample_batches_or_episodesall_stats_dictssampled_datastats_dictsbatch_or_episoder!   s                  @r"   synchronous_parallel_samplerO      s   B  +0I0IJ,EE!#O%3Q22:JD9Q ").@A.E.E*!777 ((**a//>J7>VVAUVVWL J):FFHHI &88 +Y?????WWWW!& 0 9  L   :#H#H#J#Ja#O#O# 
NNW     ::<<AANNW    <::\:::;;l;;;  $	 	"c * *%0* * * ' ' "" #c * *KV* * * ' ' " '--l;;;"";////$0   " &0<FF5EFFFFFF-99;;&& '0:DD3CDDDDDD-7799&
 +112BCCC +6*.DDDEa ").@A.E.E*!777` ~~  	T)-6P)Q)Q&& *88R)S)S& ;)?::%%r$   samplesfieldsc                     d}t          | t                    r|                                 } d}| j        D ]0}| j        |         }|D ]}||v rt	          ||                   ||<   1|r| j        t
                   } | S )z+Standardize fields of the given SampleBatchFT)
isinstancer   as_multi_agentpolicy_batchesr   r   )rP   rQ   wrapped	policy_idbatchfields         r"   standardize_fieldsrZ      s     G';'' ((**+ : :	&y1 	: 	:E~~+E%L99e	:  <():;Nr$   ) loggingtypingr   r   r   rF   ray.rllib.env.env_runner_groupr   ray.rllib.policy.sample_batchr   r   r	   ray.rllib.utils.annotationsr
   r   ray.rllib.utils.metricsr   r   ray.rllib.utils.sgdr   ray.rllib.utils.typingr   r   	getLogger__name__rA   r3   boolfloatrO   strrZ   r   r$   r"   <module>rh      s    ( ( ( ( ( ( ( ( ( (  9 9 9 9 9 9         
 E D D D D D D D R R R R R R R R , , , , , , ? ? ? ? ? ? ? ?		8	$	$  &*#'(, "'!f& f& f&f& c]f& C=	f&
 f& uof& f&  f& f& 4 /43DkQRf& f& f& f&R  c       r$   