diff --git a/plurals/deliberation.py b/plurals/deliberation.py index eedc43b..b60c0da 100644 --- a/plurals/deliberation.py +++ b/plurals/deliberation.py @@ -115,56 +115,68 @@ class Moderator(Agent): """ def __init__( - self, - persona: Optional[str] = None, - system_instructions: Optional[str] = None, - combination_instructions: str = "default", - model: str = "gpt-4o", - task: Optional[str] = None, - kwargs: Optional[Dict] = None): + self, + persona: Optional[str] = None, + system_instructions: Optional[str] = None, + combination_instructions: str = "default", + model: str = "gpt-4o", + task: Optional[str] = None, + kwargs: Optional[Dict] = None, + ): if kwargs is None: kwargs = {} self.task = task - if system_instructions is not None and system_instructions != 'auto': + if system_instructions is not None and system_instructions != "auto": if "${task}" not in system_instructions: warnings.warn( - "System instructions usually contain the placeholder ${task} so Moderators know what task it is. Consider adding 'Here is the task: ${task}' to your system instructions.") + "System instructions usually contain the placeholder ${task} so Moderators know what task it is. Consider adding 'Here is the task: ${task}' to your system instructions." + ) # Case 1: if both persona and system_instructions are provided, raise a ValueError - if persona and system_instructions and system_instructions != 'auto': - raise ValueError( - "Cannot provide both persona and system instructions") + if persona and system_instructions and system_instructions != "auto": + raise ValueError("Cannot provide both persona and system instructions") # Case 2: if system_instructions is 'auto', generate system instructions using an LLM - if system_instructions == 'auto': + if system_instructions == "auto": self.model = model self.kwargs = kwargs - self.system_instructions = self.generate_system_instructions( - task=task) + self.system_instructions = self.generate_system_instructions(task=task) # Case 3: if only persona is provided, use persona with dummy persona template ${persona} if persona and not system_instructions: persona_template = "${persona}" - persona_value = DEFAULTS["moderator"]['persona'].get( - persona, persona) - super().__init__(persona=persona_value, persona_template=persona_template, model=model, kwargs=kwargs, - task=task) + persona_value = DEFAULTS["moderator"]["persona"].get(persona, persona) + super().__init__( + persona=persona_value, + persona_template=persona_template, + model=model, + kwargs=kwargs, + task=task, + ) # Case 4: if only system_instructions is provided, set system_instructions and set persona and persona_template to None elif system_instructions and not persona: - super().__init__(system_instructions=system_instructions, persona=None, persona_template=None, model=model, - kwargs=kwargs, task=task) + super().__init__( + system_instructions=system_instructions, + persona=None, + persona_template=None, + model=model, + kwargs=kwargs, + task=task, + ) # Case 5: if neither persona nor system_instructions are provided, set no system instructions. else: - super().__init__(system_instructions=None, model=model, kwargs=kwargs, - task=task) + super().__init__( + system_instructions=None, model=model, kwargs=kwargs, task=task + ) - self.combination_instructions = DEFAULTS["moderator"]['combination_instructions'].get(combination_instructions, - combination_instructions) + self.combination_instructions = DEFAULTS["moderator"][ + "combination_instructions" + ].get(combination_instructions, combination_instructions) def generate_system_instructions(self, task: str, max_tries: int = 10) -> str: """ @@ -186,27 +198,39 @@ def generate_system_instructions(self, task: str, max_tries: int = 10) -> str: """ for _ in range(max_tries): - prompt = (f"INSTRUCTIONS\nA moderator LLM will see responses for the following task: {task}. Generate " - f"system instructions for the moderator to best aggregate these responses after all responses are " - f"submitted. Return system instructions and nothing else. Instructions should be 50 words or " - f"less and start with 'System Instructions':\n" - f"System Instructions:") + prompt = ( + f"INSTRUCTIONS\nA moderator LLM will see responses for the following task: {task}. Generate " + f"system instructions for the moderator to best aggregate these responses after all responses are " + f"submitted. Return system instructions and nothing else. Instructions should be 50 words or " + f"less and start with 'System Instructions':\n" + f"System Instructions:" + ) try: - response = Agent(task=prompt, model=self.model, - kwargs=self.kwargs).process() + response = Agent( + task=prompt, model=self.model, kwargs=self.kwargs + ).process() # Check if the response starts with "System Instructions:" (case-insensitive and allows for spaces) # Remove the "System Instructions:" part and any leading/trailing whitespace - if re.match(r"^\s*system\s+instructions\s*:\s*", response, re.IGNORECASE): - system_instructions = re.sub(r"^\s*system\s+instructions\s*:\s*", "", response, - flags=re.IGNORECASE).strip() + if re.match( + r"^\s*system\s+instructions\s*:\s*", response, re.IGNORECASE + ): + system_instructions = re.sub( + r"^\s*system\s+instructions\s*:\s*", + "", + response, + flags=re.IGNORECASE, + ).strip() return system_instructions except Exception as e: print(f"Attempt failed with error: {e}") raise ValueError( - "Failed to generate valid system instructions after max tries.") + "Failed to generate valid system instructions after max tries." + ) - def generate_and_set_system_instructions(self, task: str, max_tries: int = 10) -> None: + def generate_and_set_system_instructions( + self, task: str, max_tries: int = 10 + ) -> None: """ Generate and set system instructions using an LLM and a task. This function will generate the system instructions and also set it as the system instructions for the moderator. @@ -221,8 +245,7 @@ def generate_and_set_system_instructions(self, task: str, max_tries: int = 10) - Sets: system_instructions (str): The system instructions for the moderator. """ - self.system_instructions = self.generate_system_instructions( - task, max_tries) + self.system_instructions = self.generate_system_instructions(task, max_tries) return self.system_instructions def _moderate_responses(self, responses: List[str]) -> str: @@ -236,16 +259,18 @@ def _moderate_responses(self, responses: List[str]) -> str: str: A combined response based on the moderator's instructions """ combined_responses_str = format_previous_responses(responses) - self.combination_instructions = SmartString(self.combination_instructions).format( + self.combination_instructions = SmartString( + self.combination_instructions + ).format( previous_responses=combined_responses_str, task=self.task, - avoid_double_period=True + avoid_double_period=True, ) if self.system_instructions: self.system_instructions = SmartString(self.system_instructions).format( task=self.task, previous_responses=combined_responses_str, - persona=self.persona + persona=self.persona, ) else: pass @@ -285,14 +310,15 @@ class AbstractStructure(ABC): """ def __init__( - self, - agents: List[Agent], - task: Optional[str] = None, - shuffle: bool = False, - cycles: int = 1, - last_n: int = 1000, - combination_instructions: Optional[str] = "default", - moderator: Optional[Moderator] = None): + self, + agents: List[Agent], + task: Optional[str] = None, + shuffle: bool = False, + cycles: int = 1, + last_n: int = 1000, + combination_instructions: Optional[str] = "default", + moderator: Optional[Moderator] = None, + ): self.defaults = DEFAULTS self.task = task @@ -326,13 +352,16 @@ def __init__( # populate the templates if self.moderator: self._set_moderator_task_description() - if self.moderator.system_instructions == 'auto': - self.moderator.system_instructions = self.moderator.generate_system_instructions( - self.moderator.task) + if self.moderator.system_instructions == "auto": + self.moderator.system_instructions = ( + self.moderator.generate_system_instructions(self.moderator.task) + ) self.moderator.task_description = self.task - self.moderator.persona = SmartString( - self.moderator.persona).format( - task=self.moderator.task) if self.moderator.persona else None + self.moderator.persona = ( + SmartString(self.moderator.persona).format(task=self.moderator.task) + if self.moderator.persona + else None + ) if shuffle: self.agents = random.sample(self.agents, len(self.agents)) @@ -344,9 +373,10 @@ def _set_combination_instructions(self) -> None: """ self.combination_instructions = SmartString( - self.defaults['combination_instructions'].get( - self.combination_instructions, - self.combination_instructions)) + self.defaults["combination_instructions"].get( + self.combination_instructions, self.combination_instructions + ) + ) for agent in self.agents: if agent.combination_instructions: @@ -367,10 +397,14 @@ def _set_agent_task_description(self) -> None: """ for agent in self.agents: if self.task: - if agent.task_description and agent.task_description.strip() != self.task.strip(): + if ( + agent.task_description + and agent.task_description.strip() != self.task.strip() + ): # Case 1: Task provided to both Structure and agents warnings.warn( - f"You provided a task to both the Structure and agents. Using agent's task description:'''\n\n{agent.task_description}'''\n\nEnsure this is what you want to happen.") + f"You provided a task to both the Structure and agents. Using agent's task description:'''\n\n{agent.task_description}'''\n\nEnsure this is what you want to happen." + ) agent.task_description = self.task else: # Case 3: Value provided to Structure but not agents @@ -379,12 +413,14 @@ def _set_agent_task_description(self) -> None: # Common operations for cases 1 and 3 agent.original_task_description = agent.task_description agent.system_instructions = SmartString( - agent.system_instructions).format(task=self.task) + agent.system_instructions + ).format(task=self.task) else: - if not agent.task_description or agent.task_description.strip() == '': + if not agent.task_description or agent.task_description.strip() == "": # Case 2: Value provided to neither agents nor chain raise ValueError( - "Error: You did not specify a task for agents or chain") + "Error: You did not specify a task for agents or chain" + ) else: # Case 4: Value provided to agents but not Structure pass # Use Agent's existing task description @@ -402,10 +438,14 @@ def _set_moderator_task_description(self) -> None: """ if self.moderator: if self.task: - if self.moderator.task and self.moderator.task.strip() != self.task.strip(): + if ( + self.moderator.task + and self.moderator.task.strip() != self.task.strip() + ): # Case 1: Task provided to both Structure and moderator warnings.warn( - f"You provided a different task to both the Structure and a Moderator. Using the Moderator's task description:'''\n\n{self.moderator.task}'''\n\nEnsure this is what you want to happen.") + f"You provided a different task to both the Structure and a Moderator. Using the Moderator's task description:'''\n\n{self.moderator.task}'''\n\nEnsure this is what you want to happen." + ) else: # Case 3: Value provided to Structure but not moderator @@ -413,14 +453,19 @@ def _set_moderator_task_description(self) -> None: # Common operations for cases 1 and 3 self.moderator.task_description = self.task - self.moderator.system_instructions = SmartString( - self.moderator.system_instructions).format( - task=self.task) if self.moderator.system_instructions else None + self.moderator.system_instructions = ( + SmartString(self.moderator.system_instructions).format( + task=self.task + ) + if self.moderator.system_instructions + else None + ) else: - if not self.moderator.task or self.moderator.task.strip() == '': + if not self.moderator.task or self.moderator.task.strip() == "": # Case 2: Value provided to neither moderator nor structure raise ValueError( - "Error: You did not specify a task for Moderator or Structure") + "Error: You did not specify a task for Moderator or Structure" + ) else: # Case 4: Value provided to moderator but not Structure pass # Use Moderator's existing task description @@ -432,7 +477,8 @@ def info(self) -> Dict[str, Any]: """ if not self.final_response: warnings.warn( - "The structure has not been processed yet so there are no responses.") + "The structure has not been processed yet so there are no responses." + ) result = { "structure_information": { "final_response": self.final_response, @@ -441,9 +487,11 @@ def info(self) -> Dict[str, Any]: "combination_instructions": self.combination_instructions, "moderated": self.moderated, "moderator_persona": self.moderator.persona if self.moderator else None, - "moderator_instructions": self.moderator.combination_instructions if self.moderator else None + "moderator_instructions": ( + self.moderator.combination_instructions if self.moderator else None + ), }, - "agent_information": [agent.info for agent in self.agents] + "agent_information": [agent.info for agent in self.agents], } return result @@ -452,8 +500,7 @@ def process(self) -> None: """ Abstract method for processing agents. Must be implemented in a subclass. """ - raise NotImplementedError( - "This method must be implemented in a subclass") + raise NotImplementedError("This method must be implemented in a subclass") def __repr__(self): return pformat(self.info, indent=2) @@ -495,18 +542,21 @@ def process(self): self.agents = random.sample(self.agents, len(self.agents)) for agent in self.agents: agent.current_task_description = None - previous_responses_slice = previous_responses[-self.last_n:] + previous_responses_slice = previous_responses[-self.last_n :] previous_responses_str = format_previous_responses( - previous_responses_slice) - agent.combination_instructions = agent.combination_instructions if agent.combination_instructions else self.combination_instructions - response = agent.process( - previous_responses=previous_responses_str) + previous_responses_slice + ) + agent.combination_instructions = ( + agent.combination_instructions + if agent.combination_instructions + else self.combination_instructions + ) + response = agent.process(previous_responses=previous_responses_str) previous_responses.append(response) self.responses.append(response) if self.moderated and self.moderator: - moderated_response = self.moderator._moderate_responses( - self.responses) + moderated_response = self.moderator._moderate_responses(self.responses) self.responses.append(moderated_response) self.final_response = self.responses[-1] @@ -539,15 +589,17 @@ def process(self): for agent in self.agents: previous_responses_str = "" agent.combination_instructions = self.combination_instructions - futures.append(executor.submit( - agent.process, previous_responses=previous_responses_str)) + futures.append( + executor.submit( + agent.process, previous_responses=previous_responses_str + ) + ) for future in as_completed(futures): response = future.result() self.responses.append(response) if self.moderated and self.moderator: - moderated_response = self.moderator._moderate_responses( - self.responses) + moderated_response = self.moderator._moderate_responses(self.responses) self.responses.append(moderated_response) self.final_response = self.responses[-1] @@ -579,18 +631,25 @@ class Debate(AbstractStructure): """ def __init__( - self, - agents: List[Agent], - task: Optional[str] = None, - shuffle: bool = False, - cycles: int = 1, - last_n: int = 1000, - combination_instructions: Optional[str] = "debate", - moderator: Optional[Moderator] = None): + self, + agents: List[Agent], + task: Optional[str] = None, + shuffle: bool = False, + cycles: int = 1, + last_n: int = 1000, + combination_instructions: Optional[str] = "debate", + moderator: Optional[Moderator] = None, + ): if len(agents) != 2: raise ValueError("Debate requires exactly two agents.") super().__init__( - agents=agents, task=task, shuffle=shuffle, cycles=cycles, last_n=last_n, moderator=moderator) + agents=agents, + task=task, + shuffle=shuffle, + cycles=cycles, + last_n=last_n, + moderator=moderator, + ) self.combination_instructions = combination_instructions @staticmethod @@ -605,10 +664,7 @@ def _format_previous_responses(responses: List[str]) -> str: if not responses: return "" else: - resp_list = [ - "\n{}".format( - responses[i]) for i in range( - len(responses))] + resp_list = ["\n{}".format(responses[i]) for i in range(len(responses))] return "".join(resp_list).strip() @staticmethod @@ -617,7 +673,9 @@ def _strip_placeholders(response: str) -> str: Strip placeholders from the response. These placeholders are used to indicate the speaker in the debate, but sometimes LLMs add them to the response. This function removes them. """ - return response.replace("[WHAT YOU SAID]: ", "").replace("[WHAT OTHER PARTICIPANT SAID]: ", "") + return response.replace("[WHAT YOU SAID]: ", "").replace( + "[WHAT OTHER PARTICIPANT SAID]: ", "" + ) def process(self): """ @@ -644,32 +702,36 @@ def process(self): # index if i == 0: previous_responses_str = self._format_previous_responses( - previous_responses_agent1[-self.last_n:]) + previous_responses_agent1[-self.last_n :] + ) else: previous_responses_str = self._format_previous_responses( - previous_responses_agent2[-self.last_n:]) - - agent.combination_instructions = agent.combination_instructions if agent.combination_instructions else self.combination_instructions - response = agent.process( - previous_responses=previous_responses_str) + previous_responses_agent2[-self.last_n :] + ) + + agent.combination_instructions = ( + agent.combination_instructions + if agent.combination_instructions + else self.combination_instructions + ) + response = agent.process(previous_responses=previous_responses_str) response = self._strip_placeholders(response) self.responses.append("[Debater {}] ".format(i + 1) + response) # Apply the correct prefix and update both lists if i == 0: - previous_responses_agent1.append( - f"[WHAT YOU SAID]: {response}") + previous_responses_agent1.append(f"[WHAT YOU SAID]: {response}") previous_responses_agent2.append( - f"[WHAT OTHER PARTICIPANT SAID]: {response}") + f"[WHAT OTHER PARTICIPANT SAID]: {response}" + ) else: - previous_responses_agent2.append( - f"[WHAT YOU SAID]: {response}") + previous_responses_agent2.append(f"[WHAT YOU SAID]: {response}") previous_responses_agent1.append( - f"[WHAT OTHER PARTICIPANT SAID]: {response}") + f"[WHAT OTHER PARTICIPANT SAID]: {response}" + ) if self.moderated and self.moderator: - moderated_response = self.moderator._moderate_responses( - self.responses) + moderated_response = self.moderator._moderate_responses(self.responses) self.responses.append(moderated_response) self.final_response = self.responses[-1] @@ -728,13 +790,15 @@ class Graph(AbstractStructure): graph.process() """ - def __init__(self, - agents: List[Agent], - edges: List[tuple], - task: Optional[str] = None, - last_n: int = 1000, - combination_instructions: Optional[str] = "default", - moderator: Optional[Moderator] = None): + def __init__( + self, + agents: List[Agent], + edges: List[tuple], + task: Optional[str] = None, + last_n: int = 1000, + combination_instructions: Optional[str] = "default", + moderator: Optional[Moderator] = None, + ): """ Args: agents (Union[List[Agent], Dict[str, Agent]]): A list or dictionary of agents to be included in the structure. @@ -765,8 +829,9 @@ def __init__(self, self.agents = agents self.edges = edges - super().__init__(agents=self.agents, task=task, last_n=last_n, - moderator=moderator) + super().__init__( + agents=self.agents, task=task, last_n=last_n, moderator=moderator + ) self._build_graph() def _build_graph(self): @@ -812,7 +877,8 @@ def process(self): # Initialize the queue with agents that have in-degree 0 zero_in_degree_queue = collections.deque( - [agent for agent in self.agents if self.in_degree[agent] == 0]) + [agent for agent in self.agents if self.in_degree[agent] == 0] + ) topological_order = [] # Kahn's Algorithm @@ -833,17 +899,22 @@ def process(self): if len(topological_order) != len(self.agents): raise ValueError( - "There is a cycle in the graph!!! This is not allowed in a DAG.") + "There is a cycle in the graph!!! This is not allowed in a DAG." + ) # Process agents according to topological order response_dict = {} for agent in topological_order: - agent.combination_instructions = agent.combination_instructions if agent.combination_instructions else self.combination_instructions + agent.combination_instructions = ( + agent.combination_instructions + if agent.combination_instructions + else self.combination_instructions + ) # Gather responses from all predecessors to form the input for the current agent - previous_responses = [response_dict[pred] - for pred in self.agents if agent in self.graph[pred]] - previous_responses_str = format_previous_responses( - previous_responses) + previous_responses = [ + response_dict[pred] for pred in self.agents if agent in self.graph[pred] + ] + previous_responses_str = format_previous_responses(previous_responses) response = agent.process(previous_responses=previous_responses_str) response_dict[agent] = response self.responses.append(response) @@ -852,7 +923,8 @@ def process(self): if self.moderated and self.moderator: original_task = self.agents[0].original_task_description moderated_response = self.moderator._moderate_responses( - list(response_dict.values())) + list(response_dict.values()) + ) self.responses.append(moderated_response) self.final_response = moderated_response self.final_response = self.responses[-1] @@ -862,33 +934,43 @@ def process(self): def _validate_input_format(agents, edges): # Check #1: Check if agents and edges are in the correct format - if isinstance(agents, list) and all(isinstance(agent, Agent) for agent in agents) and all( - isinstance(edge, tuple) for edge in edges): + if ( + isinstance(agents, list) + and all(isinstance(agent, Agent) for agent in agents) + and all(isinstance(edge, tuple) for edge in edges) + ): pass - elif isinstance(agents, dict) and all(isinstance(agent, Agent) for agent in agents.values()) and all( - isinstance(edge, tuple) for edge in edges): + elif ( + isinstance(agents, dict) + and all(isinstance(agent, Agent) for agent in agents.values()) + and all(isinstance(edge, tuple) for edge in edges) + ): pass else: - raise ValueError("The agents and edges must be in the correct format. See the documentation for more " - "information. Either (Method 1) `agents` is a list of Agents and `edges` is a list of " - "tuples corresponding to the indices of agents, (src_idx, dest_idx). Or, (Method 2) " - "`agents` is a dictionary of Agents and edges is a list of tuples corresponding to the " - "names of agents (src_agent_name, dest_agent_name).") + raise ValueError( + "The agents and edges must be in the correct format. See the documentation for more " + "information. Either (Method 1) `agents` is a list of Agents and `edges` is a list of " + "tuples corresponding to the indices of agents, (src_idx, dest_idx). Or, (Method 2) " + "`agents` is a dictionary of Agents and edges is a list of tuples corresponding to the " + "names of agents (src_agent_name, dest_agent_name)." + ) # If Method 1: Check if edge indices are within the range of agents if isinstance(agents, list): for src_idx, dst_idx in edges: if src_idx >= len(agents) or dst_idx >= len(agents): - raise ValueError( - "Edge indices must be within the range of agents.") + raise ValueError("Edge indices must be within the range of agents.") if src_idx == dst_idx: - raise ValueError( - "Self loops are not allowed in the graph.") + raise ValueError("Self loops are not allowed in the graph.") # If Method 2: Check if agent names in edges are keys in the agent dictionary if isinstance(agents, dict): agent_names = list(agents.keys()) for src_agent_name, dst_agent_name in edges: - if src_agent_name not in agent_names or dst_agent_name not in agent_names: + if ( + src_agent_name not in agent_names + or dst_agent_name not in agent_names + ): raise ValueError( - "Agent names in edges must be keys in the agent dictionary.") + "Agent names in edges must be keys in the agent dictionary." + )