Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass fitness function moran process #1198

Merged
merged 8 commits into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 67 additions & 35 deletions axelrod/moran.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
"""Implementation of the Moran process on Graphs."""

from collections import Counter
import random
from collections import Counter
from typing import Callable, List, Optional, Set, Tuple

import matplotlib.pyplot as plt
import numpy as np

from axelrod import DEFAULT_TURNS, Player, Game
from axelrod import DEFAULT_TURNS, Game, Player

from .deterministic_cache import DeterministicCache
from .graph import complete_graph, Graph
from .graph import Graph, complete_graph
from .match import Match
from .random_ import randrange

from typing import List, Tuple, Set, Optional


def fitness_proportionate_selection(scores: List) -> int:
def fitness_proportionate_selection(
scores: List, fitness_transformation: Callable = None
) -> int:
"""Randomly selects an individual proportionally to score.

Parameters
----------
scores: Any sequence of real numbers
fitness_transformation: A function mapping a score to a (non-negative) float

Returns
-------
An index of the above list selected at random proportionally to the list
element divided by the total.
"""
csums = np.cumsum(scores)
if fitness_transformation is None:
csums = np.cumsum(scores)
else:
csums = np.cumsum([fitness_transformation(s) for s in scores])
total = csums[-1]
r = random.random() * total

Expand All @@ -38,13 +44,20 @@ def fitness_proportionate_selection(scores: List) -> int:


class MoranProcess(object):
def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS,
prob_end: float = None, noise: float = 0,
game: Game = None,
deterministic_cache: DeterministicCache = None,
mutation_rate: float = 0., mode: str = 'bd',
interaction_graph: Graph = None,
reproduction_graph: Graph = None) -> None:
def __init__(
self,
players: List[Player],
turns: int = DEFAULT_TURNS,
prob_end: float = None,
noise: float = 0,
game: Game = None,
deterministic_cache: DeterministicCache = None,
mutation_rate: float = 0.,
mode: str = "bd",
interaction_graph: Graph = None,
reproduction_graph: Graph = None,
fitness_transformation: Callable = None,
) -> None:
"""
An agent based Moran process class. In each round, each player plays a
Match with each other player. Players are assigned a fitness score by
Expand Down Expand Up @@ -92,6 +105,8 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS,
reproduction_graph: Axelrod.graph.Graph
The reproduction graph, set equal to the interaction graph if not
given
fitness_transformation:
A function mapping a score to a (non-negative) float
"""
self.turns = turns
self.prob_end = prob_end
Expand All @@ -107,7 +122,7 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS,
assert (mutation_rate >= 0) and (mutation_rate <= 1)
assert (noise >= 0) and (noise <= 1)
mode = mode.lower()
assert mode in ['bd', 'db']
assert mode in ["bd", "db"]
self.mode = mode
if deterministic_cache is not None:
self.deterministic_cache = deterministic_cache
Expand All @@ -129,19 +144,22 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS,
if interaction_graph is None:
interaction_graph = complete_graph(len(players), loops=False)
if reproduction_graph is None:
reproduction_graph = Graph(interaction_graph.edges(),
directed=interaction_graph.directed)
reproduction_graph = Graph(
interaction_graph.edges(), directed=interaction_graph.directed
)
reproduction_graph.add_loops()
# Check equal vertices
v1 = interaction_graph.vertices()
v2 = reproduction_graph.vertices()
assert list(v1) == list(v2)
self.interaction_graph = interaction_graph
self.reproduction_graph = reproduction_graph
self.fitness_transformation = fitness_transformation
# Map players to graph vertices
self.locations = sorted(interaction_graph.vertices())
self.index = dict(zip(sorted(interaction_graph.vertices()),
range(len(players))))
self.index = dict(
zip(sorted(interaction_graph.vertices()), range(len(players)))
)

def set_players(self) -> None:
"""Copy the initial players into the first population."""
Expand Down Expand Up @@ -192,8 +210,9 @@ def death(self, index: int = None) -> int:
else:
# Select locally
# index is not None in this case
vertex = random.choice(sorted(
self.reproduction_graph.out_vertices(self.locations[index])))
vertex = random.choice(
sorted(self.reproduction_graph.out_vertices(self.locations[index]))
)
i = self.index[vertex]
return i

Expand All @@ -212,11 +231,15 @@ def birth(self, index: int = None) -> int:
# possible choices
scores.pop(index)
# Make sure to get the correct index post-pop
j = fitness_proportionate_selection(scores)
j = fitness_proportionate_selection(
scores, fitness_transformation=self.fitness_transformation
)
if j >= index:
j += 1
else:
j = fitness_proportionate_selection(scores)
j = fitness_proportionate_selection(
scores, fitness_transformation=self.fitness_transformation
)
return j

def fixation_check(self) -> bool:
Expand Down Expand Up @@ -321,11 +344,14 @@ def score_all(self) -> List:
for i, j in self._matchup_indices():
player1 = self.players[i]
player2 = self.players[j]
match = Match((player1, player2),
turns=self.turns, prob_end=self.prob_end,
noise=self.noise,
game=self.game,
deterministic_cache=self.deterministic_cache)
match = Match(
(player1, player2),
turns=self.turns,
prob_end=self.prob_end,
noise=self.noise,
game=self.game,
deterministic_cache=self.deterministic_cache,
)
match.play()
match_scores = match.final_score_per_turn()
scores[i] += match_scores[0]
Expand Down Expand Up @@ -373,7 +399,8 @@ def play(self) -> List[Counter]:
if self.mutation_rate != 0:
raise ValueError(
"MoranProcess.play() will never exit if mutation_rate is"
"nonzero. Use iteration instead.")
"nonzero. Use iteration instead."
)
while True:
try:
self.__next__()
Expand Down Expand Up @@ -435,8 +462,10 @@ class ApproximateMoranProcess(MoranProcess):
Instead of playing the matches, the result is sampled
from a dictionary of player tuples to distribution of match outcomes
"""
def __init__(self, players: List[Player], cached_outcomes: dict,
mutation_rate: float = 0) -> None:

def __init__(
self, players: List[Player], cached_outcomes: dict, mutation_rate: float = 0
) -> None:
"""
Parameters
----------
Expand All @@ -448,8 +477,12 @@ def __init__(self, players: List[Player], cached_outcomes: dict,
probability `mutation_rate`
"""
super(ApproximateMoranProcess, self).__init__(
players, turns=0, noise=0, deterministic_cache=None,
mutation_rate=mutation_rate)
players,
turns=0,
noise=0,
deterministic_cache=None,
mutation_rate=mutation_rate,
)
self.cached_outcomes = cached_outcomes

def score_all(self) -> List:
Expand All @@ -466,8 +499,7 @@ def score_all(self) -> List:
scores = [0] * N
for i in range(N):
for j in range(i + 1, N):
player_names = tuple([str(self.players[i]),
str(self.players[j])])
player_names = tuple([str(self.players[i]), str(self.players[j])])

cached_score = self._get_scores_from_cache(player_names)
scores[i] += cached_score[0]
Expand Down
Loading