Multi Agent DQN for CIM

This example demonstrates how to use MARO’s reinforcement learning (RL) toolkit to solve the container inventory management (CIM) problem. The problem is formalized as a multi-agent reinforcement learning task in which each port acts as a decision agent. The agents take actions independently, e.g., loading containers onto vessels or discharging containers from vessels.

State Shaper

The state shaper converts the environment observation into the model input state, which includes temporal and spatial information. For this scenario, the model input state includes:

  • Temporal information, including the past week’s information of ports and vessels, such as shortage at ports and remaining space on vessels.
  • Spatial information, including related downstream port features.

class CIMStateShaper(StateShaper):
    ...
    def __call__(self, decision_event, snapshot_list):
        tick, port_idx, vessel_idx = decision_event.tick, decision_event.port_idx, decision_event.vessel_idx
        # Look back over the past ticks to capture the temporal information.
        ticks = [tick - rt for rt in range(self._look_back - 1)]
        # Downstream ports on the vessel's route provide the spatial information.
        future_port_idx_list = snapshot_list["vessels"][tick: vessel_idx: 'future_stop_list'].astype('int')
        port_features = snapshot_list["ports"][ticks: [port_idx] + list(future_port_idx_list): self._port_attributes]
        vessel_features = snapshot_list["vessels"][tick: vessel_idx: self._vessel_attributes]
        state = np.concatenate((port_features, vessel_features))
        return str(port_idx), state

Action Shaper

The action shaper converts an agent’s model output into an action executable by the environment. For this scenario, the action space consists of integers from -10 to 10: -10 means loading 100% of the containers currently in the port’s inventory onto the vessel, and 10 means discharging 100% of the containers on the vessel to the port.

class CIMActionShaper(ActionShaper):
    ...
    def __call__(self, model_action, decision_event, snapshot_list):
        scope = decision_event.action_scope
        tick = decision_event.tick
        port_idx = decision_event.port_idx
        vessel_idx = decision_event.vessel_idx

        port_empty = snapshot_list["ports"][tick: port_idx: ["empty", "full", "on_shipper", "on_consignee"]][0]
        vessel_remaining_space = snapshot_list["vessels"][tick: vessel_idx: ["empty", "full", "remaining_space"]][2]
        early_discharge = snapshot_list["vessels"][tick: vessel_idx: "early_discharge"][0]
        assert 0 <= model_action < len(self._action_space)

        if model_action < self._zero_action_index:
            # Negative action: load containers from the port onto the vessel,
            # bounded by the vessel's remaining space.
            actual_action = max(round(self._action_space[model_action] * port_empty), -vessel_remaining_space)
        elif model_action > self._zero_action_index:
            # Positive action: discharge containers from the vessel to the port.
            plan_action = self._action_space[model_action] * (scope.discharge + early_discharge) - early_discharge
            actual_action = round(plan_action) if plan_action > 0 else round(self._action_space[model_action] * scope.discharge)
        else:
            actual_action = 0  # no-op

        return Action(vessel_idx, port_idx, actual_action)

Experience Shaper

The experience shaper converts an episode trajectory into trainable experiences for the RL agents. For this scenario, the reward is a linear combination of fulfillment and shortage within a limited time window.

class TruncatedExperienceShaper(ExperienceShaper):
    ...
    def __call__(self, trajectory, snapshot_list):
        experiences_by_agent = {}
        for i in range(len(trajectory) - 1):
            transition = trajectory[i]
            agent_id = transition["agent_id"]
            if agent_id not in experiences_by_agent:
                experiences_by_agent[agent_id] = defaultdict(list)
            experiences = experiences_by_agent[agent_id]
            experiences["state"].append(transition["state"])
            experiences["action"].append(transition["action"])
            experiences["reward"].append(self._compute_reward(transition["event"], snapshot_list))
            experiences["next_state"].append(trajectory[i + 1]["state"])

        return experiences_by_agent

    def _compute_reward(self, decision_event, snapshot_list):
        start_tick = decision_event.tick + 1
        end_tick = decision_event.tick + self._time_window
        ticks = list(range(start_tick, end_tick))

        # Compute the time-decayed reward over the time window.
        future_fulfillment = snapshot_list["ports"][ticks::"fulfillment"]
        future_shortage = snapshot_list["ports"][ticks::"shortage"]
        # One decay factor per tick, repeated for every port in the snapshot.
        decay_list = [self._time_decay_factor ** i for i in range(end_tick - start_tick)
                      for _ in range(future_fulfillment.shape[0] // (end_tick - start_tick))]

        tot_fulfillment = np.dot(future_fulfillment, decay_list)
        tot_shortage = np.dot(future_shortage, decay_list)

        return float(self._fulfillment_factor * tot_fulfillment - self._shortage_factor * tot_shortage)

Agent

An agent is a combination of an RL algorithm, an experience pool, and a set of parameters that govern the training loop. For this scenario, an agent is the abstraction of a port. We choose DQN as the underlying learning algorithm, with a TD-error-based experience sampling mechanism.
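A minimal sketch of such a port agent is shown below. It assumes an AbsAgent-style base class that holds the algorithm and the experience pool; the training-loop parameter names and the pool methods sample_by_key and update (used here for TD-error-based sampling) are illustrative assumptions rather than MARO’s exact API.

class CIMAgent(AbsAgent):
    def __init__(self, name, algorithm, experience_pool,
                 min_experiences_to_train, num_batches, batch_size):
        # Base-class signature and parameter names are assumptions for illustration.
        super().__init__(name, algorithm, experience_pool)
        self._min_experiences_to_train = min_experiences_to_train
        self._num_batches = num_batches
        self._batch_size = batch_size

    def train(self):
        # Wait until the experience pool holds enough transitions.
        if len(self._experience_pool) < self._min_experiences_to_train:
            return
        for _ in range(self._num_batches):
            # Sample a batch weighted by TD-error ("loss") so that informative
            # transitions are replayed more often (assumed pool interface).
            indexes, sample = self._experience_pool.sample_by_key("loss", self._batch_size)
            state = np.asarray(sample["state"])
            action = np.asarray(sample["action"])
            reward = np.asarray(sample["reward"])
            next_state = np.asarray(sample["next_state"])
            loss = self._algorithm.train(state, action, reward, next_state)
            # Write the new TD-errors back so future sampling reflects them.
            self._experience_pool.update(indexes, {"loss": list(loss)})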

Agent Manager

The agent manager assembles the agents and isolates the complexities of the environment from those of the algorithm. For this scenario, it equips each agent with a DQN algorithm and an experience pool.

class DQNAgentManager(AbsAgentManager):
    def _assemble(self, agent_dict):
        set_seeds(config.agents.seed)
        num_actions = config.agents.algorithm.num_actions
        for agent_id in self._agent_id_list:
            eval_model = LearningModel(decision_layers=MLPDecisionLayers(name=f'{agent_id}.policy',
                                                                         input_dim=self._state_shaper.dim,
                                                                         output_dim=num_actions,
                                                                         **config.agents.algorithm.model)
                                       )

            algorithm = DQN(model_dict={"eval": eval_model},
                            optimizer_opt=(RMSprop, config.agents.algorithm.optimizer),
                            loss_func_dict={"eval": smooth_l1_loss},
                            hyper_params=DQNHyperParams(**config.agents.algorithm.hyper_parameters,
                                                        num_actions=num_actions))

            experience_pool = ColumnBasedStore(**config.agents.experience_pool)
            agent_dict[agent_id] = CIMAgent(name=agent_id, algorithm=algorithm, experience_pool=experience_pool,
                                            **config.agents.training_loop_parameters)

Main Loop with Actor and Learner (Single Process)

This single-process workflow of a learning policy’s interaction with a MARO environment comprises the following steps (a rough sketch of the resulting main loop follows the list):

  • Initializing an environment with specific scenario and topology parameters.
  • Defining scenario-specific components, e.g., shapers.
  • Creating an agent manager, which assembles the underlying agents.
  • Creating an actor and a learner to start the training process, in which the agent manager interacts with the environment to collect experiences and update policies.
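Putting these pieces together, a single-process main loop might look roughly as follows. The Env topology and durations, the TwoPhaseLinearExplorer exploration scheme, the AgentMode.TRAIN_INFERENCE value, and the exact config keys are assumptions for illustration; the other components are the ones defined earlier in this example.

# A rough sketch of the single-process workflow (hedged names as noted above).
env = Env(scenario="cim", topology="toy.4p_ssdd_l0.0", durations=1120)  # illustrative topology/durations
agent_id_list = [str(agent_id) for agent_id in env.agent_idx_list]

state_shaper = CIMStateShaper(**config.env.state_shaping)
# A symmetric action space from -1.0 to 1.0, matching the -10..10 description above.
action_shaper = CIMActionShaper(action_space=list(np.linspace(-1.0, 1.0, config.agents.algorithm.num_actions)))
experience_shaper = TruncatedExperienceShaper(**config.env.experience_shaping)
explorer = TwoPhaseLinearExplorer(agent_id_list, **config.exploration)  # assumed exploration scheme

agent_manager = DQNAgentManager(name="cim_learner",
                                agent_id_list=agent_id_list,
                                mode=AgentMode.TRAIN_INFERENCE,  # assumed combined train/inference mode
                                state_shaper=state_shaper,
                                action_shaper=action_shaper,
                                experience_shaper=experience_shaper,
                                explorer=explorer)

learner = SimpleLearner(trainable_agents=agent_manager,
                        actor=SimpleActor(env=env, inference_agents=agent_manager),
                        logger=Logger("single_process_cim_learner", auto_timestamp=False))
learner.train(total_episodes=config.general.total_training_episodes)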

Main Loop with Actor and Learner (Distributed/Multi-process)

We demonstrate a single-learner, multi-actor topology in which the learner drives the program by telling the remote actors to perform roll-out tasks and using the results they send back to improve the policies. The workflow usually involves launching a learner process and one or more actor processes separately. Because training occurs on the learner side and inference occurs on the actor side, appropriate agent managers need to be created on both sides.

On the actor side, the agent manager must be equipped with all shapers as well as an explorer. Thus, the code for creating an environment and an agent manager on the actor side is similar to that of the single-process version, except that the AgentMode must be set to AgentMode.INFERENCE. As in the single-process version, the environment and the agent manager are wrapped in a SimpleActor instance. To make the actor a distributed worker, we further wrap it in an ActorWorker instance. Finally, we launch the worker, and it starts to listen for roll-out requests from the learner. The following code snippet shows the creation of an actor worker with a simple (local) actor wrapped inside.

agent_manager = DQNAgentManager(name="cim_remote_actor",
                                agent_id_list=agent_id_list,
                                mode=AgentMode.INFERENCE,
                                state_shaper=state_shaper,
                                action_shaper=action_shaper,
                                experience_shaper=experience_shaper,
                                explorer=explorer)
proxy_params = {"group_name": config.distributed.group_name,
                "expected_peers": config.distributed.actor.peer,
                "redis_address": (config.distributed.redis.host_name, config.distributed.redis.port)
                }
actor_worker = ActorWorker(local_actor=SimpleActor(env=env, inference_agents=agent_manager),
                           proxy_params=proxy_params)
actor_worker.launch()

On the learner side, an agent manager in AgentMode.TRAIN mode is required. However, it is not necessary to create shapers for an agent manager in AgentMode.TRAIN mode (although a state shaper is created in this example so that the model input dimension can be readily accessed). Instead of creating an actor, we create an actor proxy and wrap it inside the learner. This proxy serves as the communication interface for the learner and is responsible for sending roll-out requests to remote actor processes and receiving results. Calling the train method executes the usual training loop except that the actual roll-out is performed remotely. The code snippet below shows the creation of a learner with an actor proxy wrapped inside.

agent_manager = DQNAgentManager(name="cim_remote_learner", agent_id_list=agent_id_list, mode=AgentMode.TRAIN,
                                state_shaper=state_shaper, explorer=explorer)

proxy_params = {"group_name": config.distributed.group_name,
                "expected_peers": config.distributed.learner.peer,
                "redis_address": (config.distributed.redis.host_name, config.distributed.redis.port)
                }
learner = SimpleLearner(trainable_agents=agent_manager,
                        actor=ActorProxy(proxy_params=proxy_params),
                        logger=Logger("distributed_cim_learner", auto_timestamp=False))
learner.train(total_episodes=config.general.total_training_episodes)

Note

All related code snippets are available in the MARO playground.