diff --git a/changelogs/unreleased/7694-executor-cleanup.yml b/changelogs/unreleased/7694-executor-cleanup.yml
new file mode 100644
index 0000000000..2fc72ee24a
--- /dev/null
+++ b/changelogs/unreleased/7694-executor-cleanup.yml
@@ -0,0 +1,4 @@
+description: Add policy to cleanup old executors
+issue-nr: 7694
+change-type: patch
+destination-branches: [master]
diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py
index c29b90852e..51d4895196 100644
--- a/src/inmanta/agent/agent.py
+++ b/src/inmanta/agent/agent.py
@@ -639,15 +639,26 @@ def periodic_schedule(
             action: Callable[[], Coroutine[object, None, object]],
             interval: Union[int, str],
             splay_value: int,
-            initial_time: datetime.datetime,
         ) -> bool:
+            """
+            Schedule a periodic task
+
+            :param kind: Name of the task (value to display in logs)
+            :param action: The action to schedule periodically
+            :param interval: The interval at which to schedule the task. Can be specified as either a number of
+                seconds, or a cron string.
+            :param splay_value: When specifying the interval as a number of seconds, this parameter specifies
+                the number of seconds by which to delay the initial execution of this action.
+            """
+            now = datetime.datetime.now().astimezone()
+
             if isinstance(interval, int) and interval > 0:
                 self.logger.info(
                     "Scheduling periodic %s with interval %d and splay %d (first run at %s)",
                     kind,
                     interval,
                     splay_value,
-                    (initial_time + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
+                    (now + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
                 )
                 interval_schedule: IntervalSchedule = IntervalSchedule(
                     interval=float(interval), initial_delay=float(splay_value)
@@ -674,8 +685,9 @@ def periodic_schedule(
                 )
             )
             self.ensure_deploy_on_start = False
-        periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now)
-        periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now)
+
+        periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value)
+        periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value)
 
     def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None:
         self.process._sched.add_action(action, schedule)
@@ -976,7 +988,7 @@ def __init__(
 
         self.agent_map: Optional[dict[str, str]] = agent_map
 
-        remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExcutorMode.forking
+        remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExecutorMode.forking
         can_have_remote_executor = code_loader
 
         self.executor_manager: executor.ExecutorManager[executor.Executor]
@@ -1065,6 +1077,7 @@ async def start(self) -> None:
         # cache reference to THIS ioloop for handlers to push requests on it
         self._io_loop = asyncio.get_running_loop()
         await super().start()
+        await self.executor_manager.start()
 
     async def add_end_point_name(self, name: str) -> None:
         async with self._instances_lock:
diff --git a/src/inmanta/agent/config.py b/src/inmanta/agent/config.py
index e6b5fa587e..68713587ed 100644
--- a/src/inmanta/agent/config.py
+++ b/src/inmanta/agent/config.py
@@ -17,6 +17,7 @@
 """
 
 import enum
+import functools
 import logging
 import typing
 import uuid
@@ -149,6 +150,7 @@
     is_time,
 )
 
+
 agent_get_resource_backoff: Option[float] = Option(
     "config",
     "agent-get-resource-backoff",
@@ -161,26 +163,42 @@
 )
 
 
-class AgentExcutorMode(str, enum.Enum):
+class AgentExecutorMode(str, enum.Enum):
     threaded = "threaded"
     forking = "forking"
 
 
-def is_executor_mode(value: str | AgentExcutorMode) -> AgentExcutorMode:
+def is_executor_mode(value: str | AgentExecutorMode) -> AgentExecutorMode:
     """threaded | forking"""
-    if isinstance(value, AgentExcutorMode):
+    if isinstance(value, AgentExecutorMode):
         return value
-    return AgentExcutorMode(value)
+    return AgentExecutorMode(value)
 
 
 agent_executor_mode = Option(
     "agent",
     "executor-mode",
-    AgentExcutorMode.threaded,
+    AgentExecutorMode.threaded,
     "EXPERIMENTAL: set the agent to use threads or fork subprocesses to create workers.",
     is_executor_mode,
 )
 
+agent_executor_cap = Option[int](
+    "agent",
+    "executor-cap",
+    3,
+    "Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
+    "when creating a new executor, the oldest one will be stopped first.",
+    is_lower_bounded_int(1),
+)
+
+agent_executor_retention_time = Option[int](
+    "agent",
+    "executor-retention-time",
+    60,
+    "Amount of time (in seconds) to wait before cleaning up inactive executors.",
+    is_time,
+)
 
 ##############################
 # agent_rest_transport
diff --git a/src/inmanta/agent/executor.py b/src/inmanta/agent/executor.py
index a6355faee3..600d5b8864 100644
--- a/src/inmanta/agent/executor.py
+++ b/src/inmanta/agent/executor.py
@@ -471,11 +471,14 @@ class ExecutorManager(abc.ABC, typing.Generic[E]):
     @abc.abstractmethod
     async def get_executor(self, agent_name: str, agent_uri: str, code: typing.Collection[ResourceInstallSpec]) -> E:
         """
-        Retrieves an Executor based on the agent name and blueprint.
+        Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
         If an Executor does not exist for the given configuration, a new one is created.
 
         :param agent_name: The name of the agent for which an Executor is being retrieved or created.
-        :param blueprint: The ExecutorBlueprint defining the configuration for the Executor.
+        :param agent_uri: The name of the host on which the agent is running.
+        :param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
+            which resource types it can act on and all necessary information to install the relevant
+            handler code in its venv.
         :return: An Executor instance
         """
         pass
@@ -491,6 +494,13 @@ async def stop_for_agent(self, agent_name: str) -> list[E]:
         """
         pass
 
+    @abc.abstractmethod
+    async def start(self) -> None:
+        """
+        Start the manager.
+        """
+        pass
+
     @abc.abstractmethod
     async def stop(self) -> None:
         """
diff --git a/src/inmanta/agent/forking_executor.py b/src/inmanta/agent/forking_executor.py
index d5ad8373df..8d022b2929 100644
--- a/src/inmanta/agent/forking_executor.py
+++ b/src/inmanta/agent/forking_executor.py
@@ -20,6 +20,7 @@
 import collections
 import concurrent.futures
 import concurrent.futures.thread
+import datetime
 import functools
 import logging
 import logging.config
@@ -28,7 +29,8 @@
 import socket
 import typing
 import uuid
-from asyncio import transports
+from asyncio import Future, transports
+from typing import Optional
 
 import inmanta.agent.cache
 import inmanta.agent.executor
@@ -44,7 +46,15 @@
 import inmanta.util
 from inmanta.agent import executor
 from inmanta.data.model import ResourceType
-from inmanta.protocol.ipc_light import FinalizingIPCClient, IPCServer, LogReceiver, LogShipper
+from inmanta.protocol.ipc_light import (
+    FinalizingIPCClient,
+    IPCMethod,
+    IPCReplyFrame,
+    IPCServer,
+    LogReceiver,
+    LogShipper,
+    ReturnType,
+)
 from setproctitle import setproctitle
 
 LOGGER = logging.getLogger(__name__)
@@ -159,7 +169,40 @@ def connection_lost(self, exc: Exception | None) -> None:
 
 
 class ExecutorClient(FinalizingIPCClient[ExecutorContext], LogReceiver):
-    pass
+    def __init__(self, name: str):
+        super().__init__(name)
+
+        # Keeps track of when this client was active last
+        self.last_used_at: datetime.datetime = datetime.datetime.now().astimezone()
+
+    def get_idle_time(self) -> datetime.timedelta:
+        return datetime.datetime.now().astimezone() - self.last_used_at
+
+    @typing.overload
+    def call(
+        self, method: IPCMethod[ExecutorContext, ReturnType], has_reply: typing.Literal[True] = True
+    ) -> Future[ReturnType]: ...
+
+    @typing.overload
+    def call(self, method: IPCMethod[ExecutorContext, ReturnType], has_reply: typing.Literal[False]) -> None: ...
+
+    @typing.overload
+    def call(self, method: IPCMethod[ExecutorContext, ReturnType], has_reply: bool = True) -> Future[ReturnType] | None: ...
+
+    def call(self, method: IPCMethod[ExecutorContext, ReturnType], has_reply: bool = True) -> Future[ReturnType] | None:
+        """Call a method with given arguments"""
+        self.last_used_at = datetime.datetime.now().astimezone()
+        response = super().call(method, has_reply)
+        assert response is None or isinstance(response, Future)
+        return response
+
+    def has_outstanding_calls(self) -> bool:
+        """Is this client still waiting for replies"""
+        return len(self.requests) > 0
+
+    def process_reply(self, frame: IPCReplyFrame) -> None:
+        super().process_reply(frame)
+        self.last_used_at = datetime.datetime.now().astimezone()
 
 
 class StopCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, None]):
@@ -361,7 +404,7 @@ def __init__(
         self,
         owner: "MPManager",
         process: multiprocessing.Process,
-        connection: FinalizingIPCClient[ExecutorContext],
+        connection: ExecutorClient,
         executor_id: executor.ExecutorId,
         venv: executor.ExecutorVirtualEnvironment,
     ):
@@ -388,6 +431,15 @@ async def stop(self) -> None:
             # Already gone
             pass
 
+    def can_be_cleaned_up(self, retention_time: int) -> bool:
+        if self.connection.has_outstanding_calls():
+            return False
+
+        return self.connection.get_idle_time() > datetime.timedelta(seconds=retention_time)
+
+    def last_used(self) -> datetime.datetime:
+        return self.connection.last_used_at
+
     async def force_stop(self, grace_time: float = inmanta.const.SHUTDOWN_GRACE_HARD) -> None:
         """Stop by process close"""
         self.closing = True
@@ -517,6 +569,13 @@ def __init__(
 
         self._locks: inmanta.util.NamedLock = inmanta.util.NamedLock()
 
+        self.executor_retention_time = inmanta.agent.config.agent_executor_retention_time.get()
+        self.max_executors_per_agent = inmanta.agent.config.agent_executor_cap.get()
+
+        # We keep a reference to the periodic cleanup task to prevent it
+        # from disappearing mid-execution https://docs.python.org/3.11/library/asyncio-task.html#creating-tasks
+        self.cleanup_job: Optional[asyncio.Task[None]] = None
+
     def __add_executor(self, theid: executor.ExecutorId, the_executor: MPExecutor) -> None:
         self.executor_map[theid] = the_executor
         self.agent_map[theid.agent_name].add(theid)
@@ -549,11 +608,14 @@ async def get_executor(
         self, agent_name: str, agent_uri: str, code: typing.Collection[executor.ResourceInstallSpec]
     ) -> MPExecutor:
         """
-        Retrieves an Executor based on the agent name and ResourceInstallSpec.
+        Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
         If an Executor does not exist for the given configuration, a new one is created.
 
         :param agent_name: The name of the agent for which an Executor is being retrieved or created.
-        :param code: The set of sources to be installed on the executor.
+        :param agent_uri: The name of the host on which the agent is running.
+        :param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
+            which resource types it can act on and all necessary information to install the relevant
+            handler code in its venv.
         :return: An Executor instance
         """
         blueprint = executor.ExecutorBlueprint.from_specs(code)
@@ -563,8 +625,7 @@ async def get_executor(
             if not it.closing:
                 LOGGER.debug("Found existing executor for agent %s with id %s", agent_name, executor_id.identity())
                 return it
-        # Acquire a lock based on the blueprint's hash
-        # We don't care about URI here
+        # Acquire a lock based on the executor's identity (agent name, agent uri and blueprint hash)
         async with self._locks.get(executor_id.identity()):
             if executor_id in self.executor_map:
                 it = self.executor_map[executor_id]
@@ -576,6 +637,18 @@ async def get_executor(
                         "Found stale executor for agent %s with id %s, waiting for close", agent_name, executor_id.identity()
                     )
                     await it.join(2.0)
+            n_executors_for_agent = len(self.agent_map[executor_id.agent_name])
+            if n_executors_for_agent >= self.max_executors_per_agent:
+                # Close oldest executor:
+                executor_ids = self.agent_map[executor_id.agent_name]
+                oldest_executor = min([self.executor_map[id] for id in executor_ids], key=lambda e: e.connection.last_used_at)
+                LOGGER.debug(
+                    f"Reached executor cap for agent {executor_id.agent_name}. Stopping oldest executor "
+                    f"{oldest_executor.executor_id.identity()} to make room for a new one."
+                )
+
+                await oldest_executor.stop()
+
             my_executor = await self.create_executor(executor_id)
             self.__add_executor(executor_id, my_executor)
             if my_executor.failed_resource_sources:
@@ -629,7 +702,7 @@ async def create_executor(self, executor_id: executor.ExecutorId) -> MPExecutor:
     async def make_child_and_connect(
         self, executor_id: executor.ExecutorId, venv: executor.ExecutorVirtualEnvironment
     ) -> MPExecutor:
-        """Async code to make a child process as share a socker with it"""
+        """Async code to make a child process and share a socket with it"""
         loop = asyncio.get_running_loop()
         name = executor_id.agent_name
 
@@ -658,6 +731,7 @@ def _child_closed(self, child_handle: MPExecutor) -> None:
     def _make_child(self, name: str, log_level: int, cli_log: bool) -> tuple[multiprocessing.Process, socket.socket]:
         """Sync code to make a child process and share a socket with it"""
         parent_conn, child_conn = socket.socketpair()
+        # Fork an ExecutorServer
         p = multiprocessing.Process(
             target=mp_worker_entrypoint,
             args=(child_conn, name, log_level, cli_log, inmanta.config.Config.config_as_dict()),
@@ -665,9 +739,15 @@ def _make_child(self, name: str, log_level: int, cli_log: bool) -> tuple[multipr
         )
         p.start()
         child_conn.close()
+
         return p, parent_conn
 
+    async def start(self) -> None:
+        self.running = True
+        self.cleanup_job = asyncio.create_task(self.cleanup_inactive_executors())
+
     async def stop(self) -> None:
+        self.running = False
         await asyncio.gather(*(child.stop() for child in self.children))
 
     async def force_stop(self, grace_time: float) -> None:
@@ -676,9 +756,52 @@ async def force_stop(self, grace_time: float) -> None:
     async def join(self, thread_pool_finalizer: list[concurrent.futures.ThreadPoolExecutor], timeout: float) -> None:
         thread_pool_finalizer.append(self.thread_pool)
         await asyncio.gather(*(child.join(timeout) for child in self.children))
+        if self.cleanup_job:
+            await self.cleanup_job
 
     async def stop_for_agent(self, agent_name: str) -> list[MPExecutor]:
         children_ids = self.agent_map[agent_name]
         children = [self.executor_map[child_id] for child_id in children_ids]
         await asyncio.gather(*(child.stop() for child in children))
         return children
+
+    async def cleanup_inactive_executors(self) -> None:
+        """
+        This task periodically cleans up idle executors
+        """
+        while self.running:
+            cleanup_start = datetime.datetime.now().astimezone()
+
+            reschedule_interval: float = self.executor_retention_time
+            for _executor in self.executor_map.values():
+                if _executor.can_be_cleaned_up(self.executor_retention_time):
+                    async with self._locks.get(_executor.executor_id.identity()):
+                        # Check that the executor can still be cleaned up by the time we have acquired the lock
+                        if _executor.can_be_cleaned_up(self.executor_retention_time):
+                            try:
+                                LOGGER.debug(
+                                    "Stopping executor %s because it "
+                                    "was inactive for %d s, which is longer then the retention time of %d s.",
+                                    _executor.executor_id.identity(),
+                                    _executor.connection.get_idle_time().total_seconds(),
+                                    self.executor_retention_time,
+                                )
+                                await _executor.stop()
+                            except Exception:
+                                LOGGER.debug(
+                                    "Unexpected error during executor %s cleanup:",
+                                    _executor.executor_id.identity(),
+                                    exc_info=True,
+                                )
+                else:
+                    reschedule_interval = min(
+                        reschedule_interval,
+                        (
+                            datetime.timedelta(seconds=self.executor_retention_time)
+                            - (cleanup_start - _executor.connection.last_used_at)
+                        ).total_seconds(),
+                    )
+
+            cleanup_end = datetime.datetime.now().astimezone()
+
+            await asyncio.sleep(max(0.0, reschedule_interval - (cleanup_end - cleanup_start).total_seconds()))
diff --git a/src/inmanta/agent/in_process_executor.py b/src/inmanta/agent/in_process_executor.py
index 0fc877daf3..2e0d103c16 100644
--- a/src/inmanta/agent/in_process_executor.py
+++ b/src/inmanta/agent/in_process_executor.py
@@ -458,6 +458,9 @@ async def stop(self) -> None:
         for child in self.executors.values():
             child.stop()
 
+    async def start(self) -> None:
+        pass
+
     async def stop_for_agent(self, agent_name: str) -> list[InProcessExecutor]:
         if agent_name in self.executors:
             out = self.executors[agent_name]
@@ -474,10 +477,14 @@ async def get_executor(
         self, agent_name: str, agent_uri: str, code: typing.Collection[executor.ResourceInstallSpec]
     ) -> InProcessExecutor:
         """
-        Creates an Executor based with the specified agent name and blueprint.
-        It ensures the required virtual environment is prepared and source code is loaded.
-
-        :param executor_id: executor identifier containing an agent name and a blueprint configuration.
+        Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
+        If an Executor does not exist for the given configuration, a new one is created.
+
+        :param agent_name: The name of the agent for which an Executor is being retrieved or created.
+        :param agent_uri: The name of the host on which the agent is running.
+        :param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
+            which resource types it can act on and all necessary information to install the relevant
+            handler code in its venv.
         :return: An Executor instance
         """
         if agent_name in self.executors:
diff --git a/src/inmanta/config.py b/src/inmanta/config.py
index 90d0e4b2d5..17fc379e31 100644
--- a/src/inmanta/config.py
+++ b/src/inmanta/config.py
@@ -262,6 +262,19 @@ def is_list(value: str | list[str]) -> list[str]:
     return [] if value == "" else [x.strip() for x in value.split(",")]
 
 
+def is_lower_bounded_int(lower_bound: int) -> Callable[[str | int], int]:
+    """lower-bounded int factory"""
+
+    def inner(value: str | int) -> int:
+        to_int = int(value)
+        if to_int < lower_bound:
+            raise ValueError(f"Value can not be lower than {lower_bound}")
+        return to_int
+
+    inner.__doc__ = f"int >= {lower_bound}"
+    return inner
+
+
 def is_map(map_in: str | typing.Mapping[str, str]) -> typing.Mapping[str, str]:
     """List of comma-separated key=value pairs"""
     if isinstance(map_in, typing.Mapping):
diff --git a/src/inmanta/protocol/ipc_light.py b/src/inmanta/protocol/ipc_light.py
index 1e698f0a7b..02740d247c 100644
--- a/src/inmanta/protocol/ipc_light.py
+++ b/src/inmanta/protocol/ipc_light.py
@@ -226,8 +226,9 @@ class IPCClient(IPCFrameProtocol, typing.Generic[ServerContext]):
     def __init__(self, name: str):
         super().__init__(name)
         # TODO timeouts
-        self.requests: dict[uuid.UUID, Future[object]] = {}
+
         # All outstanding calls
+        self.requests: dict[uuid.UUID, Future[object]] = {}
 
     @typing.overload
     def call(
@@ -237,6 +238,9 @@ def call(
     @typing.overload
     def call(self, method: IPCMethod[ServerContext, ReturnType], has_reply: typing.Literal[False]) -> None: ...
 
+    @typing.overload
+    def call(self, method: IPCMethod[ServerContext, ReturnType], has_reply: bool = True) -> Future[ReturnType] | None: ...
+
     def call(self, method: IPCMethod[ServerContext, ReturnType], has_reply: bool = True) -> Future[ReturnType] | None:
         """Call a method with given arguments"""
         request = IPCRequestFrame(
diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py
index 78681fedf3..c70ba3c6e0 100644
--- a/src/inmanta/server/agentmanager.py
+++ b/src/inmanta/server/agentmanager.py
@@ -1230,6 +1230,8 @@ async def _make_agent_config(
 
 [agent]
 executor-mode={agent_cfg.agent_executor_mode.get().name}
+executor-cap={agent_cfg.agent_executor_cap.get()}
+executor-retention-time={agent_cfg.agent_executor_retention_time.get()}
 
 [agent_rest_transport]
 port=%(port)s
diff --git a/tests/forking_agent/test_executor.py b/tests/forking_agent/test_executor.py
index 8cdfa76bc1..72ede64b98 100644
--- a/tests/forking_agent/test_executor.py
+++ b/tests/forking_agent/test_executor.py
@@ -34,6 +34,7 @@
 from inmanta.agent import executor
 from inmanta.agent.forking_executor import MPManager
 from inmanta.protocol.ipc_light import ConnectionLost
+from utils import log_contains, retry_limited
 
 
 class Echo(inmanta.protocol.ipc_light.IPCMethod[list[str], None]):
@@ -74,7 +75,27 @@ async def call(self, ctx) -> list[str]:
         return [inmanta_plugins.test.testA.test(), inmanta_plugins.test.testB.test()]
 
 
-async def test_executor_server(mpmanager: MPManager, client):
+@pytest.fixture
+def set_custom_executor_policy():
+    """
+    Fixture to temporarily set the policy for executor management.
+    """
+    old_cap_value = inmanta.agent.config.agent_executor_cap.get()
+
+    # Keep only 2 executors per agent
+    inmanta.agent.config.agent_executor_cap.set("2")
+
+    old_retention_value = inmanta.agent.config.agent_executor_retention_time.get()
+    # Clean up executors after 3s of inactivity
+    inmanta.agent.config.agent_executor_retention_time.set("3")
+
+    yield
+
+    inmanta.agent.config.agent_executor_cap.set(str(old_cap_value))
+    inmanta.agent.config.agent_executor_retention_time.set(str(old_retention_value))
+
+
+async def test_executor_server(set_custom_executor_policy, mpmanager: MPManager, client, caplog):
     """
     Test the MPManager, this includes
 
@@ -83,12 +104,19 @@ async def test_executor_server(mpmanager: MPManager, client):
     3. communicate with it
     4. build up venv with requirements, source files, ...
     5. check that code is loaded correctly
+
+    Also test that an executor policy can be set:
+        - the agent_executor_cap option correctly stops the oldest executor.
+        - the agent_executor_retention_time option is used to clean up old executors.
     """
+
     with pytest.raises(ImportError):
         # make sure lorem isn't installed at the start of the test.
         import lorem  # noqa: F401
 
     manager = mpmanager
+    await manager.start()
+
     inmanta.config.Config.set("test", "aaa", "bbbb")
 
     # Simple empty venv
@@ -126,12 +154,22 @@ def test():
     res = await client.upload_file(id=server_content_hash, content=base64.b64encode(server_content).decode("ascii"))
     assert res.code == 200
 
+    # Dummy executor to test executor cap:
+    # Create this one first to make sure this is the one being stopped
+    # when the cap is reached
+    dummy = executor.ExecutorBlueprint(
+        pip_config=inmanta.data.PipConfig(use_system_config=True), requirements=["lorem"], sources=[direct]
+    )
+
+    oldest_executor = await manager.get_executor("agent2", "internal:", [executor.ResourceInstallSpec("test::Test", 5, dummy)])
+
     # Full config: 2 source files, one python dependency
     full = executor.ExecutorBlueprint(
         pip_config=inmanta.data.PipConfig(use_system_config=True), requirements=["lorem"], sources=[direct, via_server]
     )
     full_runner = await manager.get_executor("agent2", "internal:", [executor.ResourceInstallSpec("test::Test", 5, full)])
 
+    assert oldest_executor.executor_id in manager.agent_map["agent2"]
     # assert loaded
     result2 = await full_runner.connection.call(TestLoader())
     assert ["DIRECT", "server"] == result2
@@ -140,10 +178,32 @@ def test():
     assert await simplest.connection.call(GetName()) == "agent1"
     assert await full_runner.connection.call(GetName()) == "agent2"
 
+    # Request a third executor:
+    # The executor cap is reached -> check that the oldest executor got correctly stopped
+    dummy = executor.ExecutorBlueprint(
+        pip_config=inmanta.data.PipConfig(use_system_config=True), requirements=["lorem"], sources=[via_server]
+    )
+    with caplog.at_level(logging.DEBUG):
+        _ = await manager.get_executor("agent2", "internal:", [executor.ResourceInstallSpec("test::Test", 5, dummy)])
+        assert oldest_executor.executor_id not in manager.agent_map["agent2"]
+        log_contains(
+            caplog,
+            "inmanta.agent.forking_executor",
+            logging.DEBUG,
+            (
+                f"Reached executor cap for agent agent2. Stopping oldest executor "
+                f"{oldest_executor.executor_id.identity()} to make room for a new one."
+            ),
+        )
+
     # Assert shutdown and back up
     await mpmanager.stop_for_agent("agent2")
+    await retry_limited(lambda: len(manager.agent_map["agent2"]) == 0, 10)
+
     full_runner = await manager.get_executor("agent2", "internal:", [executor.ResourceInstallSpec("test::Test", 5, full)])
 
+    await retry_limited(lambda: len(manager.agent_map["agent2"]) == 1, 1)
+
     await simplest.stop()
     await simplest.join(2)
     with pytest.raises(ConnectionLost):
@@ -153,6 +213,18 @@ def test():
         # we aren't leaking into this venv
         import lorem  # noqa: F401, F811
 
+    async def check_automatic_clean_up() -> bool:
+        return len(manager.agent_map["agent2"]) == 0
+
+    with caplog.at_level(logging.DEBUG):
+        await retry_limited(check_automatic_clean_up, 10)
+        log_contains(
+            caplog,
+            "inmanta.agent.forking_executor",
+            logging.DEBUG,
+            (f"Stopping executor {full_runner.executor_id.identity()} because it was inactive for"),
+        )
+
 
 async def test_executor_server_dirty_shutdown(mpmanager: MPManager, caplog):
     manager = mpmanager
diff --git a/tests/test_config.py b/tests/test_config.py
index 38fee20df4..aaaf0e6fb7 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -22,6 +22,7 @@
 import socket
 
 import netifaces
+import pytest
 from tornado import netutil
 
 import inmanta.agent.config as cfg
@@ -347,3 +348,16 @@ def test_option_is_list_empty():
     option: Option = Option("test", "list", "default,values", "documentation", cfg.is_list)
     option.set("")
     assert option.get() == []
+
+
+def test_option_is_lower_bounded_int():
+    lower_bound = 1
+    option: Option = Option("test", "lb_int", lower_bound, "documentation", cfg.is_lower_bounded_int(lower_bound))
+    option.set("2")
+    assert option.get() == 2
+
+    option.set("0")
+    with pytest.raises(ValueError) as exc_info:
+        option.get()
+
+    assert f"Value can not be lower than {lower_bound}" in str(exc_info.value)