
    &`iQ                     &   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dl m!Z!m"Z"m#Z#m$Z$ d dl%m&Z&  e&d           G d dee j'                              Z(dS )    N)defaultdict)	AnyCallable
CollectionDictIteratorListOptionalTupleUnion)RLModule)SingleAgentEpisode)
force_list)OverrideToImplementCustomLogicoverride)Checkpointable)MetricsLoggerBatchedNdArray)AgentIDEpisodeTypeModuleID	StateDict)	PublicAPIalpha)	stabilityc                   r   e Zd ZdZ	 	 d2deej                 deej                 fdZedej        dej        dej        fd            Z	edej        dej        dej        fd            Z
ej        dddd	d
edeeef         dee         dee         dee         dee         defd            Ze	 	 d3dee         dedeeee         eeef         f                  dee         fd            Ze	 d4deeef         dededee         ddf
d            Ze	 d4deeef         dedededee         ddfd            Zedeeef         deeee         ee         f         de eee         ee!         ee"         gef         ddfd            Z#edeeee"ef         f         dee"eeef         f         fd            Z$ e%e&          	 d4ddd eeee'e         f                  d!eeee'e         f                  de(fd"            Z) e%e&          d#e(ddfd$            Z* e%e&          deeeeef         f         fd%            Z+d5d&Z,d'eeeef                  deeef         fd(Z-e.d)             Z/e.d*             Z0e.d+             Z1e1j2        d,             Z1e.d-             Z3e3j2        d.             Z3d6d0efd1Z4dS )7ConnectorV2a  Base class defining the API for an individual "connector piece".

    A ConnectorV2 ("connector piece") is usually part of a whole series of connector
    pieces within a so-called connector pipeline, which in itself also abides to this
    very API.
    For example, you might have a connector pipeline consisting of two connector pieces,
    A and B, both instances of subclasses of ConnectorV2 and each one performing a
    particular transformation on their input data. The resulting connector pipeline
    (A->B) itself also abides to this very ConnectorV2 API and could thus be part of yet
    another, higher-level connector pipeline, e.g. (A->B)->C->D.

    Any ConnectorV2 instance (individual pieces or several connector pieces in a
    pipeline) is a callable and users should override the `__call__()` method.
    When called, they take the outputs of a previous connector piece (or an empty dict
    if there are no previous pieces) and all the data collected thus far in the
    ongoing episode(s) (only applies to connectors used in EnvRunners) or retrieved
    from a replay buffer or from an environment sampling step (only applies to
    connectors used in Learner pipelines). From this input data, a ConnectorV2 then
    performs a transformation step.

    There are 3 types of pipelines any ConnectorV2 piece can belong to:
    1) EnvToModulePipeline: The connector transforms environment data before it gets to
    the RLModule. This type of pipeline is used by an EnvRunner for transforming
    env output data into RLModule readable data (for the next RLModule forward pass).
    For example, such a pipeline would include observation postprocessors, -filters,
    or any RNN preparation code related to time-sequences and zero-padding.
    2) ModuleToEnvPipeline: This type of pipeline is used by an
    EnvRunner to transform RLModule output data to env readable actions (for the next
    `env.step()` call). For example, in case the RLModule only outputs action
    distribution parameters (but not actual actions), the ModuleToEnvPipeline would
    take care of sampling the actions to be sent back to the end from the
    resulting distribution (made deterministic if exploration is off).
    3) LearnerConnectorPipeline: This connector pipeline type transforms data coming
    from an `EnvRunner.sample()` call or a replay buffer and will then be sent into the
    RLModule's `forward_train()` method in order to compute loss function inputs.
    This type of pipeline is used by a Learner worker to transform raw training data
    (a batch or a list of episodes) to RLModule readable training data (for the next
    RLModule `forward_train()` call).

    Some connectors might be stateful, for example for keeping track of observation
    filtering stats (mean and stddev values). Any Algorithm, which uses connectors is
    responsible for frequently synchronizing the states of all connectors and connector
    pipelines between the EnvRunners (owning the env-to-module and module-to-env
    pipelines) and the Learners (owning the Learner pipelines).
    Ninput_observation_spaceinput_action_spacec                 h   d| _         d| _        d| _        d| _        || _        || _        | j        j                            d          St          j
                    d         j        }t          j        |          }|j        fd|j        D             | _        dS | j        | j        d| _        dS )a  Initializes a ConnectorV2 instance.

        Args:
            input_observation_space: The (optional) input observation space for this
                connector piece. This is the space coming from a previous connector
                piece in the (env-to-module or learner) pipeline or is directly
                defined within the gym.Env.
            input_action_space: The (optional) input action space for this connector
                piece. This is the space coming from a previous connector piece in the
                (module-to-env) pipeline or is directly defined within the gym.Env.
            **kwargs: Forward API-compatibility kwargs.
        N__init__   c                 .    i | ]}|d k    ||         S self ).0argcaller_localss     u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/connectors/connector_v2.py
<dictcomp>z(ConnectorV2.__init__.<locals>.<dictcomp>r   s,     ! ! !,/C6MM]3'MMM    )r   r    )_observation_space_action_space_input_observation_space_input_action_spacer    r   	__class____dict__getinspectstackframegetargvalueslocalsargs_ctor_kwargs)r&   r   r    kwargscaller_framearg_infor*   s         @r+   r"   zConnectorV2.__init__N   s    $ #'!(,%#' "4'>$
 >"&&z22>"=??1-3L+L99H  ! ! ! !3;=! ! !D
 ,0+G&*&=! !Dr-   returnc                     | j         S )aT	  Re-computes a new (output) observation space based on the input spaces.

        This method should be overridden by users to make sure a ConnectorPipelineV2
        knows how the input spaces through its individual ConnectorV2 pieces are being
        transformed.

        .. testcode::

            from gymnasium.spaces import Box, Discrete
            import numpy as np

            from ray.rllib.connectors.connector_v2 import ConnectorV2
            from ray.rllib.utils.numpy import one_hot
            from ray.rllib.utils.test_utils import check

            class OneHotConnector(ConnectorV2):
                def recompute_output_observation_space(
                    self,
                    input_observation_space,
                    input_action_space,
                ):
                    return Box(0.0, 1.0, (input_observation_space.n,), np.float32)

                def __call__(
                    self,
                    *,
                    rl_module,
                    batch,
                    episodes,
                    explore=None,
                    shared_data=None,
                    metrics=None,
                    **kwargs,
                ):
                    assert "obs" in batch
                    batch["obs"] = one_hot(batch["obs"])
                    return batch

            connector = OneHotConnector(input_observation_space=Discrete(2))
            batch = {"obs": np.array([1, 0, 0], np.int32)}
            output = connector(rl_module=None, batch=batch, episodes=None)

            check(output, {"obs": np.array([[0.0, 1.0], [1.0, 0.0], [1.0, 0.0]])})

        If this ConnectorV2 does not change the observation space in any way, leave
        this parent method implementation untouched.

        Args:
            input_observation_space: The input observation space (either coming from the
                environment if `self` is the first connector piece in the pipeline or
                from the previous connector piece in the pipeline).
            input_action_space: The input action space (either coming from the
                environment if `self` is the first connector piece in the pipeline or
                from the previous connector piece in the pipeline).

        Returns:
            The new observation space (after data has passed through this ConnectorV2
            piece).
        )r   r&   r   r    s      r+   "recompute_output_observation_spacez.ConnectorV2.recompute_output_observation_space{   s    B ++r-   c                     | j         S )a  Re-computes a new (output) action space based on the input space.

        This method should be overridden by users to make sure a ConnectorPipelineV2
        knows how the input spaces through its individual ConnectorV2 pieces are being
        transformed.

        If this ConnectorV2 does not change the action space in any way, leave
        this parent method implementation untouched.

        Args:
            input_observation_space: The input observation space (either coming from the
                environment if `self` is the first connector piece in the pipeline or
                from the previous connector piece in the pipeline).
            input_action_space: The input action space (either coming from the
                environment if `self` is the first connector piece in the pipeline or
                from the previous connector piece in the pipeline).

        Returns:
            The new action space (after data has passed through this ConenctorV2
            piece).
        )r    rA   s      r+   recompute_output_action_spacez)ConnectorV2.recompute_output_action_space   s    6 &&r-   )exploreshared_datametrics	rl_modulebatchepisodesrE   rF   rG   c                    dS )aM  Method for transforming an input `batch` into an output `batch`.

        Args:
            rl_module: The RLModule object that the connector connects to or from.
            batch: The input data to be transformed by this connector. Transformations
                might either be done in-place or a new structure may be returned.
                Note that the information in `batch` will eventually either become the
                forward batch for the RLModule (env-to-module and learner connectors)
                or the input to the `env.step()` call (module-to-env connectors). Note
                that in the first case (`batch` is a forward batch for RLModule), the
                information in `batch` will be discarded after that RLModule forward
                pass. Any transformation of information (e.g. observation preprocessing)
                that you have only done inside `batch` will be lost, unless you have
                written it back into the corresponding `episodes` during the connector
                pass.
            episodes: The list of SingleAgentEpisode or MultiAgentEpisode objects,
                each corresponding to one slot in the vector env. Note that episodes
                can be read from (e.g. to place information into `batch`), but also
                written to. You should only write back (changed, transformed)
                information into the episodes, if you want these changes to be
                "permanent". For example if you sample from an environment, pick up
                observations from the episodes and place them into `batch`, then
                transform these observations, and would like to make these
                transformations permanent (note that `batch` gets discarded after the
                RLModule forward pass), then you have to write the transformed
                observations back into the episode to make sure you do not have to
                perform the same transformation again on the learner (or replay buffer)
                side. The Learner will hence work on the already changed episodes (and
                compile the train batch using the Learner connector).
            explore: Whether `explore` is currently on. Per convention, if True, the
                RLModule's `forward_exploration` method should be called, if False, the
                EnvRunner should call `forward_inference` instead.
            shared_data: Optional additional context data that needs to be exchanged
                between different ConnectorV2 pieces (in the same pipeline) or across
                ConnectorV2 pipelines (meaning between env-to-module and module-to-env).
            metrics: Optional MetricsLogger instance to log custom metrics to.
            kwargs: Forward API-compatibility kwargs.

        Returns:
            The transformed connector output.
        Nr'   )r&   rH   rI   rJ   rE   rF   rG   r<   s           r+   __call__zConnectorV2.__call__   s      r-   Tagents_that_stepped_onlyzip_with_batch_columnc              #     K   t          t                    }| rt          | d         t                    r|t	          |          t	          |           k    r0t          dt	          |            dt	          |           d          t          |t                    rt          | |          D ]\  }}||fV  nlt          | |                                          D ]?\  }\  }}|j	        |d         k    sJ |||                  }||xx         dz  cc<   ||fV  @n	| D ]}|V  dS | D ]}|r|
                                n|j        D ]}|j        |         }	|m|	j        |	j        |	j        f}
t	          ||
                   ||
         k    rt          d          ||
         ||
                  }||
xx         dz  cc<   |	|fV  ~|	V  dS )aX  An iterator over a list of episodes yielding always SingleAgentEpisodes.

        In case items in the list are MultiAgentEpisodes, these are broken down
        into their individual agents' SingleAgentEpisodes and those are then yielded
        one after the other.

        Useful for connectors that operate on both single-agent and multi-agent
        episodes.

        Args:
            episodes: The list of SingleAgent- or MultiAgentEpisode objects.
            agents_that_stepped_only: If True (and multi-agent setup), will only place
                items of those agents into the batch that have just stepped in the
                actual MultiAgentEpisode (this is checked via a
                `MultiAgentEpside.episode.get_agents_to_act()`). Note that this setting
                is ignored in a single-agent setups b/c the agent steps at each timestep
                regardless.
            zip_with_batch_column: If provided, must be a list of batch items
                corresponding to the given `episodes` (single agent case) or a dict
                mapping (AgentID, ModuleID) tuples to lists of individual batch items
                corresponding to this agent/module combination. The iterator will then
                yield tuples of SingleAgentEpisode objects (1st item) along with the
                data item (2nd item) that this episode was responsible for generating
                originally.

        Yields:
            All SingleAgentEpisodes in the input list, whereby MultiAgentEpisodes will
            be broken down into their individual SingleAgentEpisode components.
        r   NzYInvalid `zip_with_batch_column` data: Must have the same length as the list of episodes (z), but has length !r#   zInvalid `zip_with_batch_column` data: Must structurally match the single-agent contents in the given list of (multi-agent) episodes!)r   int
isinstancer   len
ValueErrorlistzipitemsid_get_agents_that_stepped	agent_idsagent_episodesmulti_agent_episode_idagent_id	module_id)rJ   rM   rN   list_indicesepisodedataeps_id_tupledr]   
sa_episodekeys              r+   single_agent_episode_iteratorz)ConnectorV2.single_agent_episode_iterator  s     F #3''  	
8A;0BCC 	$0,--X>>$@;>x==@ @"%&;"<"<@ @ @   3T:: )),X7L)M)M , ,%tm++++, := -3355: : ) )5!5,  '{l1o==== l!;<$\222a7222%qj(((()  ( " "G!MMMMF   	% 	%G ,'//111&% %
 %3H=
(4"9"+",C
 0566,s:KKK(6  
 .c2<3DEA %%%*%%%$a-''''$$$$$/%	% 	%r-   columnitem_to_addsingle_agent_episodec                 V   d}|@|j         }||| v rt          d          |j        |j        |j        |j         f}n|j        f}|| vr|g n|g i| |<   |8|| |         vrg | |         |<   | |         |                             |           dS | |                             |           dS )a>  Adds a data item under `column` to the given `batch`.

        The `item_to_add` is stored in the `batch` in the following manner:
        1) If `single_agent_episode` is not provided (None), will store the item in a
        list directly under `column`:
        `column` -> [item, item, ...]
        2) If `single_agent_episode`'s `agent_id` and `module_id` properties are None
        (`single_agent_episode` is not part of a multi-agent episode), will append
        `item_to_add` to a list under a `(<episodeID>,)` key under `column`:
        `column` -> `(<episodeID>,)` -> [item, item, ...]
        3) If `single_agent_episode`'s `agent_id` and `module_id` are NOT None
        (`single_agent_episode` is part of a multi-agent episode), will append
        `item_to_add` to a list under a `(<episodeID>,<AgentID>,<ModuleID>)` key
        under `column`:
        `column` -> `(<episodeID>,<AgentID>,<ModuleID>)` -> [item, item, ...]

        See the these examples here for clarification of these three cases:

        .. testcode::

            from ray.rllib.connectors.connector_v2 import ConnectorV2
            from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
            from ray.rllib.env.single_agent_episode import SingleAgentEpisode
            from ray.rllib.utils.test_utils import check

            # 1) Simple case (no episodes provided) -> Store data in a list directly
            # under `column`:
            batch = {}
            ConnectorV2.add_batch_item(batch, "test_col", item_to_add=5)
            ConnectorV2.add_batch_item(batch, "test_col", item_to_add=6)
            check(batch, {"test_col": [5, 6]})
            ConnectorV2.add_batch_item(batch, "test_col_2", item_to_add=-10)
            check(batch, {
                "test_col": [5, 6],
                "test_col_2": [-10],
            })

            # 2) Single-agent case (SingleAgentEpisode provided) -> Store data in a list
            # under the keys: `column` -> `(<eps_id>,)` -> [...]:
            batch = {}
            episode = SingleAgentEpisode(
                id_="SA-EPS0",
                observations=[0, 1, 2, 3],
                actions=[1, 2, 3],
                rewards=[1.0, 2.0, 3.0],
            )
            ConnectorV2.add_batch_item(batch, "test_col", 5, episode)
            ConnectorV2.add_batch_item(batch, "test_col", 6, episode)
            ConnectorV2.add_batch_item(batch, "test_col_2", -10, episode)
            check(batch, {
                "test_col": {("SA-EPS0",): [5, 6]},
                "test_col_2": {("SA-EPS0",): [-10]},
            })

            # 3) Multi-agent case (SingleAgentEpisode provided that has `agent_id` and
            # `module_id` information) -> Store data in a list under the keys:
            # `column` -> `(<episodeID>,<AgentID>,<ModuleID>)` -> [...]:
            batch = {}
            ma_episode = MultiAgentEpisode(
                id_="MA-EPS1",
                observations=[
                    {"ag0": 0, "ag1": 1}, {"ag0": 2, "ag1": 4}
                ],
                actions=[{"ag0": 0, "ag1": 1}],
                rewards=[{"ag0": -0.1, "ag1": -0.2}],
                # ag0 maps to mod0, ag1 maps to mod1, etc..
                agent_to_module_mapping_fn=lambda aid, eps: f"mod{aid[2:]}",
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col",
                item_to_add=5,
                single_agent_episode=ma_episode.agent_episodes["ag0"],
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col",
                item_to_add=6,
                single_agent_episode=ma_episode.agent_episodes["ag0"],
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col_2",
                item_to_add=10,
                single_agent_episode=ma_episode.agent_episodes["ag1"],
            )
            check(
                batch,
                {
                    "test_col": {("MA-EPS1", "ag0", "mod0"): [5, 6]},
                    "test_col_2": {("MA-EPS1", "ag1", "mod1"): [10]},
                },
            )

        Args:
            batch: The batch to store `item_to_add` in.
            column: The column name (str) within the `batch` to store `item_to_add`
                under.
            item_to_add: The data item to store in the batch.
            single_agent_episode: An optional SingleAgentEpisode.
                If provided and its `agent_id` and `module_id` properties are None,
                creates a further sub dictionary under `column`, mapping from
                `(<episodeID>,)` to a list of data items (to which `item_to_add` will
                be appended in this call).
                If provided and its `agent_id` and `module_id` properties are NOT None,
                creates a further sub dictionary under `column`, mapping from
                `(<episodeID>,,<AgentID>,<ModuleID>)` to a list of data items (to which
                `item_to_add` will be appended in this call).
                If not provided, will append `item_to_add` to a list directly under
                `column`.
        NzCan't call `add_batch_item` on a `batch` that is already module-major (meaning ModuleID is top-level with column names on the level thereunder)! Make sure to only call `add_batch_items` before the `AgentToModuleMapping` ConnectorV2 piece is applied.)r^   rT   r]   r\   rX   append)rI   rg   rh   ri   sub_keyr^   s         r+   add_batch_itemzConnectorV2.add_batch_itemp  s    l +,6I $e);); V   &.:(?(1(2 035")/BB}E&MeFm++)+fg&&M'"))+66666&M  -----r-   items_to_add	num_itemsc           	      h   t          |t                    rht          |          |k    r0t          d| dt          |           dt          j         d          |D ] }t                              | |||           !dS d }t                              | |t          j        ||          |           dS )ae  Adds a list of items (or batched item) under `column` to the given `batch`.

        If `items_to_add` is not a list, but an already batched struct (of np.ndarray
        leafs), the `items_to_add` will be appended to possibly existing data under the
        same `column` as-is. A subsequent `BatchIndividualItems` ConnectorV2 piece will
        recognize this and batch the data properly into a single (batched) item.
        This is much faster than first splitting up `items_to_add` and then adding each
        item individually.

        If `single_agent_episode` is provided and its `agent_id` and `module_id`
        properties are None, creates a further sub dictionary under `column`, mapping
        from `(<episodeID>,)` to a list of data items (to which `items_to_add` will
        be appended in this call).
        If `single_agent_episode` is provided and its `agent_id` and `module_id`
        properties are NOT None, creates a further sub dictionary under `column`,
        mapping from `(<episodeID>,,<AgentID>,<ModuleID>)` to a list of data items (to
        which `items_to_add` will be appended in this call).
        If `single_agent_episode` is not provided, will append `items_to_add` to a list
        directly under `column`.

        .. testcode::

            import numpy as np

            from ray.rllib.connectors.connector_v2 import ConnectorV2
            from ray.rllib.env.multi_agent_episode import MultiAgentEpisode
            from ray.rllib.env.single_agent_episode import SingleAgentEpisode
            from ray.rllib.utils.test_utils import check

            # Simple case (no episodes provided) -> Store data in a list directly under
            # `column`:
            batch = {}
            ConnectorV2.add_n_batch_items(
                batch,
                "test_col",
                # List of (complex) structs.
                [{"a": np.array(3), "b": 4}, {"a": np.array(5), "b": 6}],
                num_items=2,
            )
            check(
                batch["test_col"],
                [{"a": np.array(3), "b": 4}, {"a": np.array(5), "b": 6}],
            )
            # In a new column (test_col_2), store some already batched items.
            # This way, you may avoid having to disassemble an already batched item
            # (e.g. a numpy array of shape (10, 2)) into its individual items (e.g.
            # split the array into a list of len=10) and then adding these individually.
            # The performance gains may be quite large when providing already batched
            # items (such as numpy arrays with a batch dim):
            ConnectorV2.add_n_batch_items(
                batch,
                "test_col_2",
                # One (complex) already batched struct.
                {"a": np.array([3, 5]), "b": np.array([4, 6])},
                num_items=2,
            )
            # Add more already batched items (this time with a different batch size)
            ConnectorV2.add_n_batch_items(
                batch,
                "test_col_2",
                {"a": np.array([7, 7, 7]), "b": np.array([8, 8, 8])},
                num_items=3,  # <- in this case, this must be the batch size
            )
            check(
                batch["test_col_2"],
                [
                    {"a": np.array([3, 5]), "b": np.array([4, 6])},
                    {"a": np.array([7, 7, 7]), "b": np.array([8, 8, 8])},
                ],
            )

            # Single-agent case (SingleAgentEpisode provided) -> Store data in a list
            # under the keys: `column` -> `(<eps_id>,)`:
            batch = {}
            episode = SingleAgentEpisode(
                id_="SA-EPS0",
                observations=[0, 1, 2, 3],
                actions=[1, 2, 3],
                rewards=[1.0, 2.0, 3.0],
            )
            ConnectorV2.add_n_batch_items(
                batch=batch,
                column="test_col",
                items_to_add=[5, 6, 7],
                num_items=3,
                single_agent_episode=episode,
            )
            check(batch, {
                "test_col": {("SA-EPS0",): [5, 6, 7]},
            })

            # Multi-agent case (SingleAgentEpisode provided that has `agent_id` and
            # `module_id` information) -> Store data in a list under the keys:
            # `column` -> `(<episodeID>,<AgentID>,<ModuleID>)`:
            batch = {}
            ma_episode = MultiAgentEpisode(
                id_="MA-EPS1",
                observations=[
                    {"ag0": 0, "ag1": 1}, {"ag0": 2, "ag1": 4}
                ],
                actions=[{"ag0": 0, "ag1": 1}],
                rewards=[{"ag0": -0.1, "ag1": -0.2}],
                # ag0 maps to mod0, ag1 maps to mod1, etc..
                agent_to_module_mapping_fn=lambda aid, eps: f"mod{aid[2:]}",
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col",
                item_to_add=5,
                single_agent_episode=ma_episode.agent_episodes["ag0"],
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col",
                item_to_add=6,
                single_agent_episode=ma_episode.agent_episodes["ag0"],
            )
            ConnectorV2.add_batch_item(
                batch,
                "test_col_2",
                item_to_add=10,
                single_agent_episode=ma_episode.agent_episodes["ag1"],
            )
            check(
                batch,
                {
                    "test_col": {("MA-EPS1", "ag0", "mod0"): [5, 6]},
                    "test_col_2": {("MA-EPS1", "ag1", "mod1"): [10]},
                },
            )

        Args:
            batch: The batch to store n `items_to_add` in.
            column: The column name (str) within the `batch` to store `item_to_add`
                under.
            items_to_add: The list of data items to store in the batch OR an already
                batched (possibly nested) struct. In the latter case, the `items_to_add`
                will be appended to possibly existing data under the same `column`
                as-is. A subsequent `BatchIndividualItems` ConnectorV2 piece will
                recognize this and batch the data properly into a single (batched) item.
                This is much faster than first splitting up `items_to_add` and then
                adding each item individually.
            num_items: The number of items in `items_to_add`. This arg is mostly for
                asserting the correct usage of this method by checking, whether the
                given data in `items_to_add` really has the right amount of individual
                items.
            single_agent_episode: An optional SingleAgentEpisode.
                If provided and its `agent_id` and `module_id` properties are None,
                creates a further sub dictionary under `column`, mapping from
                `(<episodeID>,)` to a list of data items (to which `items_to_add` will
                be appended in this call).
                If provided and its `agent_id` and `module_id` properties are NOT None,
                creates a further sub dictionary under `column`, mapping from
                `(<episodeID>,,<AgentID>,<ModuleID>)` to a list of data items (to which
                `items_to_add` will be appended in this call).
                If not provided, will append `items_to_add` to a list directly under
                `column`.
        zMismatch between `num_items` (z') and the length of the provided list (z) in z.add_n_batch_items()!)rI   rg   rh   ri   Nc                      t          |           S Nr   )ss    r+   _tagz+ConnectorV2.add_n_batch_items.<locals>._tag  s    !!$$$r-   )	rR   rU   rS   rT   r   __name__rm   treemap_structure)rI   rg   rn   ro   ri   itemrt   s          r+   add_n_batch_itemszConnectorV2.add_n_batch_items
  s   N lD)) 	<  I-- CY C C-0->->C C"+C C C  
 %  **! $)=	 +     F	% 	% 	% 	"" *4>>!5 	# 	
 	
 	
 	
 	
r-   funcc           	           fdt          |          D             }t          |t                    }t          d |D                       r5t	          d| dt                                                      d          t          |d         t
                    rjt          t          |           D ]Q\  }} ||r|d         n|ddd          }|r|fn|}t          t          |                    D ]\  }}	|	||         |<   RdS |d         	                                D ]\  }
t                    dk    r\  }}}nd         }dx}}fd	|d
d         D             }t          t          |
g|R            D ]J\  }} ||r|d         n||||          }|r|fn|}t          |          D ]\  }}	|	||                  |<   KdS )a  Runs the provided `func` on all items under one or more columns in the batch.

        Use this method to conveniently loop through all items in a batch
        and transform them in place.

        `func` takes the following as arguments:
        - The item itself. If column is a list of column names, this argument is a tuple
        of items.
        - The EpisodeID. This value might be None.
        - The AgentID. This value might be None in the single-agent case.
        - The ModuleID. This value might be None in the single-agent case.

        The return value(s) of `func` are used to directly override the values in the
        given `batch`.

        Args:
            batch: The batch to process in-place.
            column: A single column name (str) or a list thereof. If a list is provided,
                the first argument to `func` is a tuple of items. If a single
                str is provided, the first argument to `func` is an individual
                item.
            func: The function to call on each item or tuple of item(s).

        .. testcode::

            from ray.rllib.connectors.connector_v2 import ConnectorV2
            from ray.rllib.utils.test_utils import check

            # Simple case: Batch items are in lists directly under their column names.
            batch = {
                "col1": [0, 1, 2, 3],
                "col2": [0, -1, -2, -3],
            }
            # Increase all ints by 1.
            ConnectorV2.foreach_batch_item_change_in_place(
                batch=batch,
                column="col1",
                func=lambda item, *args: item + 1,
            )
            check(batch["col1"], [1, 2, 3, 4])

            # Further increase all ints by 1 in col1 and flip sign in col2.
            ConnectorV2.foreach_batch_item_change_in_place(
                batch=batch,
                column=["col1", "col2"],
                func=(lambda items, *args: (items[0] + 1, -items[1])),
            )
            check(batch["col1"], [2, 3, 4, 5])
            check(batch["col2"], [0, 1, 2, 3])

            # Single-agent case: Batch items are in lists under (eps_id,)-keys in a dict
            # under their column names.
            batch = {
                "col1": {
                    ("eps1",): [0, 1, 2, 3],
                    ("eps2",): [400, 500, 600],
                },
            }
            # Increase all ints of eps1 by 1 and divide all ints of eps2 by 100.
            ConnectorV2.foreach_batch_item_change_in_place(
                batch=batch,
                column="col1",
                func=lambda item, eps_id, *args: (
                    item + 1 if eps_id == "eps1" else item / 100
                ),
            )
            check(batch["col1"], {
                ("eps1",): [1, 2, 3, 4],
                ("eps2",): [4, 5, 6],
            })

            # Multi-agent case: Batch items are in lists under
            # (eps_id, agent_id, module_id)-keys in a dict
            # under their column names.
            batch = {
                "col1": {
                    ("eps1", "ag1", "mod1"): [1, 2, 3, 4],
                    ("eps2", "ag1", "mod2"): [400, 500, 600],
                    ("eps2", "ag2", "mod3"): [-1, -2, -3, -4, -5],
                },
            }
            # Decrease all ints of "eps1" by 1, divide all ints of "mod2" by 100, and
            # flip sign of all ints of "ag2".
            ConnectorV2.foreach_batch_item_change_in_place(
                batch=batch,
                column="col1",
                func=lambda item, eps_id, ag_id, mod_id: (
                    item - 1
                    if eps_id == "eps1"
                    else item / 100
                    if mod_id == "mod2"
                    else -item
                ),
            )
            check(batch["col1"], {
                ("eps1", "ag1", "mod1"): [0, 1, 2, 3],
                ("eps2", "ag1", "mod2"): [4, 5, 6],
                ("eps2", "ag2", "mod3"): [1, 2, 3, 4, 5],
            })
        c                 :    g | ]}                     |          S r'   )r4   )r(   crI   s     r+   
<listcomp>zBConnectorV2.foreach_batch_item_change_in_place.<locals>.<listcomp>@  s#    DDDA599Q<<DDDr-   c              3      K   | ]}|d u V  	d S rr   r'   )r(   rc   s     r+   	<genexpr>zAConnectorV2.foreach_batch_item_change_in_place.<locals>.<genexpr>B  s&      22QqDy222222r-   zInvalid column name(s) (z7)! One or more not found in given batch. Found columns .r   N   c                      g | ]
}|         S r'   r'   )r(   rc   re   s     r+   r~   zBConnectorV2.foreach_batch_item_change_in_place.<locals>.<listcomp>c  s    CCC!qvCCCr-   r#   )r   rR   stranyrT   rU   keys	enumeraterV   rW   rS   )rI   rg   rz   data_to_process
single_collist_pos
data_tupleresultscol_slotresultd0_listeps_idr]   r^   other_listsre   s   `              @r+   "foreach_batch_item_change_in_placez.ConnectorV2.foreach_batch_item_change_in_place  s   X EDDDF1C1CDDD,,
22/22222 	D6 D D.25::<<.@.@D D D   oa($// $	J(1#2G(H(H 
A 
A$*$%/?JqMMZ	  )3?7**(1*W2E2E(F(F A A$Hf:@OH-h77A
A 
A !0 2 8 8 : : J JW s88q==25/FHii
 !VF+//HyCCCCqrr/BCCC,5c'6PK6P6P6P,Q,Q 
J 
J(Hj"d)3C
1 !	 G -7CwjjGG,5g,>,> J J(&CI1#6x@@J
JJ Jr-   c                     t          t                    }|                                 D ]*\  }}|                                D ]\  }}|||         |<   +t          |          S )a  Switches the first two levels of a `col_name -> ModuleID -> data` type batch.

        Assuming that the top level consists of column names as keys and the second
        level (under these columns) consists of ModuleID keys, the resulting batch
        will have these two reversed and thus map ModuleIDs to dicts mapping column
        names to data items.

        .. testcode::

            from ray.rllib.utils.test_utils import check

            batch = {
                "obs": {"module_0": [1, 2, 3]},
                "actions": {"module_0": [4, 5, 6], "module_1": [7]},
            }
            switched_batch = ConnectorV2.switch_batch_from_column_to_module_ids(batch)
            check(
                switched_batch,
                {
                    "module_0": {"obs": [1, 2, 3], "actions": [4, 5, 6]},
                    "module_1": {"actions": [7]},
                },
            )

        Args:
            batch: The batch to switch from being column name based (then ModuleIDs)
                to being ModuleID based (then column names).

        Returns:
            A new batch dict mapping ModuleIDs to dicts mapping column names (e.g.
            "obs") to data.
        )r   dictrW   )rI   module_datarg   column_datar^   ra   s         r+   &switch_batch_from_column_to_module_idsz2ConnectorV2.switch_batch_from_column_to_module_idsp  sr    H "$''#(;;== 	6 	6FK#.#4#4#6#6 6 6	415I&v..6K   r-   )not_components
componentsr   c                    i S rr   r'   )r&   r   r   r<   s       r+   	get_statezConnectorV2.get_state  s	     	r-   statec                     d S rr   r'   )r&   r   s     r+   	set_statezConnectorV2.set_state  s    r-   c                     d| j         fS )Nr'   )r;   r%   s    r+   get_ctor_args_and_kwargsz$ConnectorV2.get_ctor_args_and_kwargs  s     
 	
r-   c                     dS )zResets the state of this ConnectorV2 to some initial value.

        Note that this may NOT be the exact state that this ConnectorV2 was originally
        constructed with.
        Nr'   r%   s    r+   reset_statezConnectorV2.reset_state  s	     	r-   statesc                     i S )al  Computes a resulting state given self's state and a list of other states.

        Algorithms should use this method for merging states between connectors
        running on parallel EnvRunner workers. For example, to synchronize the connector
        states of n remote workers and a local worker, one could:
        - Gather all remote worker connector states in a list.
        - Call `self.merge_states()` on the local worker passing it the states list.
        - Broadcast the resulting local worker's connector state back to all remote
        workers. After this, all workers (including the local one) hold a
        merged/synchronized new connecto state.

        Args:
            states: The list of n other ConnectorV2 states to merge with self's state
                into a single resulting state.

        Returns:
            The resulting state dict.
        r'   )r&   r   s     r+   merge_stateszConnectorV2.merge_states  s	    & 	r-   c                     | j         S )zGetter for our (output) observation space.

        Logic: Use user provided space (if set via `observation_space` setter)
        otherwise, use the same as the input space, assuming this connector piece
        does not alter the space.
        )r.   r%   s    r+   observation_spacezConnectorV2.observation_space  s     &&r-   c                     | j         S )zGetter for our (output) action space.

        Logic: Use user provided space (if set via `action_space` setter)
        otherwise, use the same as the input space, assuming this connector piece
        does not alter the space.
        )r/   r%   s    r+   action_spacezConnectorV2.action_space  s     !!r-   c                     | j         S rr   )r0   r%   s    r+   r   z#ConnectorV2.input_observation_space  s    ,,r-   c                 \    || _         |"|                     || j                  | _        d S d S rr   )r0   rB   r    r.   r&   values     r+   r   z#ConnectorV2.input_observation_space  s>    (-%&*&M&Mt.' 'D### r-   c                     | j         S rr   )r1   r%   s    r+   r    zConnectorV2.input_action_space  s    ''r-   c                 \    || _         |"|                     | j        |          | _        d S d S rr   )r1   rD   r   r/   r   s     r+   r    zConnectorV2.input_action_space  s>    #( !%!C!C,e" "D r-   r   indentationc                 &    d|z  | j         j        z   S )N )r2   ru   )r&   r   s     r+   __str__zConnectorV2.__str__  s    [ 4>#:::r-   )NN)TNrr   )r?   N)r   )5ru   
__module____qualname____doc__r
   gymSpacer"   r   rB   rD   abcabstractmethodr   r   r   r   r	   r   boolr   r   rL   staticmethodr   r   r   r   rf   rm   rQ   ry   r   r   r   r   r   r   r   r   r   r   r   r   r   r   propertyr   r   r   setterr    r   r'   r-   r+   r   r      s        , ,` 8<26+ +!)#)!4+ %SY/+ + + +Z $@,!$@,  I@, 
	@, @, @, $#@,D $'!$'  I' 
	' ' ' $#'8 	 #'&*+/3 3 3 3 CH~	3
 {#3 $3 d^3 -(3 
3 3 3 3j  *.NR\% \%{#\%"&\%  (d3ieSj9I.I(JK\% 
$	%	\% \% \% \\%| 
 >B	W. W.CH~W.W. W. ''9:	W.
 
W. W. W. \W.r  >BG
 G
CH~G
G
 G
 	G

 ''9:G
 
G
 G
 G
 \G
R YJCH~YJc49eCj01YJ (3-'!2HX4FGL
YJ 
YJ YJ YJ \YJv '!Chm,,-'!	hS#X&	''! '! '! \'!R Xn =A AE	  U3
3#789 !sJsO';!<=	 
    Xny T     Xn
%tCH~0E*F 
 
 
 
   4S#X#7 DcN    * ' ' X' " " X" - - X- #  $# ( ( X(   ; ;3 ; ; ; ; ; ;r-   r   ))r   r5   collectionsr   typingr   r   r   r   r   r	   r
   r   r   	gymnasiumr   rv   "ray.rllib.core.rl_module.rl_moduler   "ray.rllib.env.single_agent_episoder   ray.rllib.utilsr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.checkpointsr   &ray.rllib.utils.metrics.metrics_loggerr   "ray.rllib.utils.spaces.space_utilsr   ray.rllib.utils.typingr   r   r   r   ray.util.annotationsr   ABCr   r'   r-   r+   <module>r      s   



  # # # # # #
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
      7 7 7 7 7 7 A A A A A A & & & & & & P P P P P P P P 6 6 6 6 6 6 @ @ @ @ @ @ = = = = = = L L L L L L L L L L L L * * * * * * WZ; Z; Z; Z; Z;.#' Z; Z; Z; Z; Z;r-   