diff --git a/axelrod/moran.py b/axelrod/moran.py index ba7268180..24693007a 100644 --- a/axelrod/moran.py +++ b/axelrod/moran.py @@ -1,33 +1,39 @@ """Implementation of the Moran process on Graphs.""" -from collections import Counter import random +from collections import Counter +from typing import Callable, List, Optional, Set, Tuple import matplotlib.pyplot as plt import numpy as np -from axelrod import DEFAULT_TURNS, Player, Game +from axelrod import DEFAULT_TURNS, Game, Player + from .deterministic_cache import DeterministicCache -from .graph import complete_graph, Graph +from .graph import Graph, complete_graph from .match import Match from .random_ import randrange -from typing import List, Tuple, Set, Optional - -def fitness_proportionate_selection(scores: List) -> int: +def fitness_proportionate_selection( + scores: List, fitness_transformation: Callable = None +) -> int: """Randomly selects an individual proportionally to score. Parameters ---------- scores: Any sequence of real numbers + fitness_transformation: A function mapping a score to a (non-negative) float Returns ------- An index of the above list selected at random proportionally to the list element divided by the total. """ - csums = np.cumsum(scores) + if fitness_transformation is None: + csums = np.cumsum(scores) + else: + csums = np.cumsum([fitness_transformation(s) for s in scores]) total = csums[-1] r = random.random() * total @@ -38,13 +44,20 @@ def fitness_proportionate_selection(scores: List) -> int: class MoranProcess(object): - def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS, - prob_end: float = None, noise: float = 0, - game: Game = None, - deterministic_cache: DeterministicCache = None, - mutation_rate: float = 0., mode: str = 'bd', - interaction_graph: Graph = None, - reproduction_graph: Graph = None) -> None: + def __init__( + self, + players: List[Player], + turns: int = DEFAULT_TURNS, + prob_end: float = None, + noise: float = 0, + game: Game = None, + deterministic_cache: DeterministicCache = None, + mutation_rate: float = 0., + mode: str = "bd", + interaction_graph: Graph = None, + reproduction_graph: Graph = None, + fitness_transformation: Callable = None, + ) -> None: """ An agent based Moran process class. In each round, each player plays a Match with each other player. Players are assigned a fitness score by @@ -92,6 +105,8 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS, reproduction_graph: Axelrod.graph.Graph The reproduction graph, set equal to the interaction graph if not given + fitness_transformation: + A function mapping a score to a (non-negative) float """ self.turns = turns self.prob_end = prob_end @@ -107,7 +122,7 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS, assert (mutation_rate >= 0) and (mutation_rate <= 1) assert (noise >= 0) and (noise <= 1) mode = mode.lower() - assert mode in ['bd', 'db'] + assert mode in ["bd", "db"] self.mode = mode if deterministic_cache is not None: self.deterministic_cache = deterministic_cache @@ -129,8 +144,9 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS, if interaction_graph is None: interaction_graph = complete_graph(len(players), loops=False) if reproduction_graph is None: - reproduction_graph = Graph(interaction_graph.edges(), - directed=interaction_graph.directed) + reproduction_graph = Graph( + interaction_graph.edges(), directed=interaction_graph.directed + ) reproduction_graph.add_loops() # Check equal vertices v1 = interaction_graph.vertices() @@ -138,10 +154,12 @@ def __init__(self, players: List[Player], turns: int = DEFAULT_TURNS, assert list(v1) == list(v2) self.interaction_graph = interaction_graph self.reproduction_graph = reproduction_graph + self.fitness_transformation = fitness_transformation # Map players to graph vertices self.locations = sorted(interaction_graph.vertices()) - self.index = dict(zip(sorted(interaction_graph.vertices()), - range(len(players)))) + self.index = dict( + zip(sorted(interaction_graph.vertices()), range(len(players))) + ) def set_players(self) -> None: """Copy the initial players into the first population.""" @@ -192,8 +210,9 @@ def death(self, index: int = None) -> int: else: # Select locally # index is not None in this case - vertex = random.choice(sorted( - self.reproduction_graph.out_vertices(self.locations[index]))) + vertex = random.choice( + sorted(self.reproduction_graph.out_vertices(self.locations[index])) + ) i = self.index[vertex] return i @@ -212,11 +231,15 @@ def birth(self, index: int = None) -> int: # possible choices scores.pop(index) # Make sure to get the correct index post-pop - j = fitness_proportionate_selection(scores) + j = fitness_proportionate_selection( + scores, fitness_transformation=self.fitness_transformation + ) if j >= index: j += 1 else: - j = fitness_proportionate_selection(scores) + j = fitness_proportionate_selection( + scores, fitness_transformation=self.fitness_transformation + ) return j def fixation_check(self) -> bool: @@ -321,11 +344,14 @@ def score_all(self) -> List: for i, j in self._matchup_indices(): player1 = self.players[i] player2 = self.players[j] - match = Match((player1, player2), - turns=self.turns, prob_end=self.prob_end, - noise=self.noise, - game=self.game, - deterministic_cache=self.deterministic_cache) + match = Match( + (player1, player2), + turns=self.turns, + prob_end=self.prob_end, + noise=self.noise, + game=self.game, + deterministic_cache=self.deterministic_cache, + ) match.play() match_scores = match.final_score_per_turn() scores[i] += match_scores[0] @@ -373,7 +399,8 @@ def play(self) -> List[Counter]: if self.mutation_rate != 0: raise ValueError( "MoranProcess.play() will never exit if mutation_rate is" - "nonzero. Use iteration instead.") + "nonzero. Use iteration instead." + ) while True: try: self.__next__() @@ -435,8 +462,10 @@ class ApproximateMoranProcess(MoranProcess): Instead of playing the matches, the result is sampled from a dictionary of player tuples to distribution of match outcomes """ - def __init__(self, players: List[Player], cached_outcomes: dict, - mutation_rate: float = 0) -> None: + + def __init__( + self, players: List[Player], cached_outcomes: dict, mutation_rate: float = 0 + ) -> None: """ Parameters ---------- @@ -448,8 +477,12 @@ def __init__(self, players: List[Player], cached_outcomes: dict, probability `mutation_rate` """ super(ApproximateMoranProcess, self).__init__( - players, turns=0, noise=0, deterministic_cache=None, - mutation_rate=mutation_rate) + players, + turns=0, + noise=0, + deterministic_cache=None, + mutation_rate=mutation_rate, + ) self.cached_outcomes = cached_outcomes def score_all(self) -> List: @@ -466,8 +499,7 @@ def score_all(self) -> List: scores = [0] * N for i in range(N): for j in range(i + 1, N): - player_names = tuple([str(self.players[i]), - str(self.players[j])]) + player_names = tuple([str(self.players[i]), str(self.players[j])]) cached_score = self._get_scores_from_cache(player_names) scores[i] += cached_score[0] diff --git a/axelrod/tests/unit/test_moran.py b/axelrod/tests/unit/test_moran.py index c50571878..120d4517a 100644 --- a/axelrod/tests/unit/test_moran.py +++ b/axelrod/tests/unit/test_moran.py @@ -1,22 +1,20 @@ -from collections import Counter import itertools import random import unittest - -from hypothesis import given, example, settings +from collections import Counter import matplotlib.pyplot as plt import axelrod -from axelrod import MoranProcess, ApproximateMoranProcess, Pdf +from axelrod import ApproximateMoranProcess, MoranProcess, Pdf from axelrod.moran import fitness_proportionate_selection from axelrod.tests.property import strategy_lists +from hypothesis import example, given, settings C, D = axelrod.Action.C, axelrod.Action.D class TestMoranProcess(unittest.TestCase): - def test_init(self): players = axelrod.Cooperator(), axelrod.Defector() mp = MoranProcess(players) @@ -26,17 +24,22 @@ def test_init(self): self.assertEqual(mp.noise, 0) self.assertEqual(mp.initial_players, players) self.assertEqual(mp.players, list(players)) - self.assertEqual(mp.populations, - [Counter({'Cooperator': 1, 'Defector': 1})]) + self.assertEqual( + mp.populations, [Counter({"Cooperator": 1, "Defector": 1})] + ) self.assertIsNone(mp.winning_strategy_name) self.assertEqual(mp.mutation_rate, 0) - self.assertEqual(mp.mode, 'bd') + self.assertEqual(mp.mode, "bd") self.assertEqual(mp.deterministic_cache, axelrod.DeterministicCache()) - self.assertEqual(mp.mutation_targets, - {'Cooperator': [players[1]], 'Defector': [players[0]]}) + self.assertEqual( + mp.mutation_targets, + {"Cooperator": [players[1]], "Defector": [players[0]]}, + ) self.assertEqual(mp.interaction_graph._edges, [(0, 1), (1, 0)]) - self.assertEqual(mp.reproduction_graph._edges, - [(0, 1), (1, 0), (0, 0), (1, 1)]) + self.assertEqual( + mp.reproduction_graph._edges, [(0, 1), (1, 0), (0, 0), (1, 1)] + ) + self.assertEqual(mp.fitness_transformation, None) self.assertEqual(mp.locations, [0, 1]) self.assertEqual(mp.index, {0: 0, 1: 1}) @@ -46,11 +49,14 @@ def test_init(self): graph = axelrod.graph.Graph(edges, directed=True) mp = MoranProcess(players, interaction_graph=graph) self.assertEqual(mp.interaction_graph._edges, [(0, 1), (2, 0), (1, 2)]) - self.assertEqual(sorted(mp.reproduction_graph._edges), - sorted([(0, 1), (2, 0), (1, 2), (0, 0), (1, 1), (2, 2)])) - - mp = MoranProcess(players, interaction_graph=graph, - reproduction_graph=graph) + self.assertEqual( + sorted(mp.reproduction_graph._edges), + sorted([(0, 1), (2, 0), (1, 2), (0, 0), (1, 1), (2, 2)]), + ) + + mp = MoranProcess( + players, interaction_graph=graph, reproduction_graph=graph + ) self.assertEqual(mp.interaction_graph._edges, [(0, 1), (2, 0), (1, 2)]) self.assertEqual(mp.reproduction_graph._edges, [(0, 1), (2, 0), (1, 2)]) @@ -182,7 +188,7 @@ def test_death_birth(self): seeds = range(0, 20) for seed in seeds: axelrod.seed(seed) - mp = MoranProcess((p1, p2), mode='db') + mp = MoranProcess((p1, p2), mode="db") next(mp) self.assertIsNotNone(mp.winning_strategy_name) @@ -197,11 +203,11 @@ def test_death_birth_outcomes(self): players.append(axelrod.Defector()) for seed, outcome in seeds: axelrod.seed(seed) - mp = MoranProcess(players, mode='bd') + mp = MoranProcess(players, mode="bd") mp.play() winner = mp.winning_strategy_name axelrod.seed(seed) - mp = MoranProcess(players, mode='db') + mp = MoranProcess(players, mode="db") mp.play() winner2 = mp.winning_strategy_name self.assertEqual((winner == winner2), outcome) @@ -220,18 +226,21 @@ def test_two_players_with_mutation(self): p1, p2 = axelrod.Cooperator(), axelrod.Defector() axelrod.seed(5) mp = MoranProcess((p1, p2), mutation_rate=0.2) - self.assertDictEqual(mp.mutation_targets, - {str(p1): [p2], str(p2): [p1]}) + self.assertDictEqual( + mp.mutation_targets, {str(p1): [p2], str(p2): [p1]} + ) # Test that mutation causes the population to alternate between # fixations counters = [ - Counter({'Cooperator': 2}), - Counter({'Defector': 2}), - Counter({'Cooperator': 2}), - Counter({'Defector': 2}) + Counter({"Cooperator": 2}), + Counter({"Defector": 2}), + Counter({"Cooperator": 2}), + Counter({"Defector": 2}), ] for counter in counters: - for _ in itertools.takewhile(lambda x: x.population_distribution() != counter, mp): + for _ in itertools.takewhile( + lambda x: x.population_distribution() != counter, mp + ): pass self.assertEqual(mp.population_distribution(), counter) @@ -242,8 +251,11 @@ def test_play_exception(self): mp.play() def test_three_players(self): - players = [axelrod.Cooperator(), axelrod.Cooperator(), - axelrod.Defector()] + players = [ + axelrod.Cooperator(), + axelrod.Cooperator(), + axelrod.Defector(), + ] axelrod.seed(11) mp = MoranProcess(players) populations = mp.play() @@ -258,16 +270,17 @@ def test_three_players_with_mutation(self): p3 = axelrod.Defector() players = [p1, p2, p3] mp = MoranProcess(players, mutation_rate=0.2) - self.assertDictEqual(mp.mutation_targets, { - str(p1): [p3, p2], str(p2): [p1, p3], str(p3): [p1, p2]}) + self.assertDictEqual( + mp.mutation_targets, + {str(p1): [p3, p2], str(p2): [p1, p3], str(p3): [p1, p2]}, + ) # Test that mutation causes the population to alternate between # fixations - counters = [ - Counter({'Cooperator': 3}), - Counter({'Defector': 3}), - ] + counters = [Counter({"Cooperator": 3}), Counter({"Defector": 3})] for counter in counters: - for _ in itertools.takewhile(lambda x: x.population_distribution() != counter, mp): + for _ in itertools.takewhile( + lambda x: x.population_distribution() != counter, mp + ): pass self.assertEqual(mp.population_distribution(), counter) @@ -314,8 +327,12 @@ def test_reset(self): def test_constant_fitness_case(self): # Scores between an Alternator and Defector will be: (1, 6) axelrod.seed(0) - players = (axelrod.Alternator(), axelrod.Alternator(), - axelrod.Defector(), axelrod.Defector()) + players = ( + axelrod.Alternator(), + axelrod.Alternator(), + axelrod.Defector(), + axelrod.Defector(), + ) mp = MoranProcess(players, turns=2) winners = [] for _ in range(100): @@ -357,9 +374,22 @@ def test_population_plot(self): self.assertEqual(ax.get_xlim(), (-0.8, 16.8)) self.assertEqual(ax.get_ylim(), (0, 5.25)) + def test_cooperator_can_win_with_fitness_transformation(self): + axelrod.seed(689) + players = ( + axelrod.Cooperator(), + axelrod.Defector(), + axelrod.Defector(), + axelrod.Defector(), + ) + w = 0.95 + fitness_transformation = lambda score: 1 - w + w * score + mp = MoranProcess(players, turns=10, fitness_transformation=fitness_transformation) + populations = mp.play() + self.assertEqual(mp.winning_strategy_name, "Cooperator") + class GraphMoranProcess(unittest.TestCase): - def test_complete(self): """A complete graph should produce the same results as the default case.""" @@ -417,13 +447,15 @@ def test_asymmetry(self): players.append(axelrod.Defector()) for seed, outcome in seeds: axelrod.seed(seed) - mp = MoranProcess(players, interaction_graph=graph1, - reproduction_graph=graph2) + mp = MoranProcess( + players, interaction_graph=graph1, reproduction_graph=graph2 + ) mp.play() winner = mp.winning_strategy_name axelrod.seed(seed) - mp = MoranProcess(players, interaction_graph=graph2, - reproduction_graph=graph1) + mp = MoranProcess( + players, interaction_graph=graph2, reproduction_graph=graph1 + ) mp.play() winner2 = mp.winning_strategy_name self.assertEqual((winner == winner2), outcome) @@ -441,11 +473,11 @@ def test_cycle_death_birth(self): players.append(axelrod.Defector()) for seed, outcome in seeds: axelrod.seed(seed) - mp = MoranProcess(players, interaction_graph=graph, mode='bd') + mp = MoranProcess(players, interaction_graph=graph, mode="bd") mp.play() winner = mp.winning_strategy_name axelrod.seed(seed) - mp = MoranProcess(players, interaction_graph=graph, mode='db') + mp = MoranProcess(players, interaction_graph=graph, mode="db") mp.play() winner2 = mp.winning_strategy_name self.assertEqual((winner == winner2), outcome) @@ -453,29 +485,36 @@ def test_cycle_death_birth(self): class TestApproximateMoranProcess(unittest.TestCase): """A suite of tests for the ApproximateMoranProcess""" + players = [axelrod.Cooperator(), axelrod.Defector()] cached_outcomes = {} counter = Counter([(0, 5)]) pdf = Pdf(counter) - cached_outcomes[('Cooperator', 'Defector')] = pdf + cached_outcomes[("Cooperator", "Defector")] = pdf counter = Counter([(3, 3)]) pdf = Pdf(counter) - cached_outcomes[('Cooperator', 'Cooperator')] = pdf + cached_outcomes[("Cooperator", "Cooperator")] = pdf counter = Counter([(1, 1)]) pdf = Pdf(counter) - cached_outcomes[('Defector', 'Defector')] = pdf + cached_outcomes[("Defector", "Defector")] = pdf amp = ApproximateMoranProcess(players, cached_outcomes) def test_init(self): """Test the initialisation process""" - self.assertEqual(set(self.amp.cached_outcomes.keys()), - set([('Cooperator', 'Defector'), - ('Cooperator', 'Cooperator'), - ('Defector', 'Defector')])) + self.assertEqual( + set(self.amp.cached_outcomes.keys()), + set( + [ + ("Cooperator", "Defector"), + ("Cooperator", "Cooperator"), + ("Defector", "Defector"), + ] + ), + ) self.assertEqual(self.amp.players, self.players) self.assertEqual(self.amp.turns, 0) self.assertEqual(self.amp.noise, 0) diff --git a/docs/reference/bibliography.rst b/docs/reference/bibliography.rst index bb97f4930..2dd38cce1 100644 --- a/docs/reference/bibliography.rst +++ b/docs/reference/bibliography.rst @@ -47,6 +47,7 @@ documentation. .. [Nowak1990] Nowak, M., & Sigmund, K. (1990). The evolution of stochastic strategies in the Prisoner's Dilemma. Acta Applicandae Mathematica. https://link.springer.com/article/10.1007/BF00049570 .. [Nowak1992] Nowak, M.., & May, R. M. (1992). Evolutionary games and spatial chaos. Nature. http://doi.org/10.1038/359826a0 .. [Nowak1993] Nowak, M., & Sigmund, K. (1993). A strategy of win-stay, lose-shift that outperforms tit-for-tat in the Prisoner’s Dilemma game. Nature, 364(6432), 56–58. http://doi.org/10.1038/364056a0 +.. [Ohtsuki2006] Ohtsuki, Hisashi, et al. "A simple rule for the evolution of cooperation on graphs and social networks." Nature 441.7092 (2006): 502. .. [PD2017] http://www.prisoners-dilemma.com/competition.html (Accessed: 6 June 2017) .. [Press2012] Press, W. H., & Dyson, F. J. (2012). Iterated Prisoner’s Dilemma contains strategies that dominate any evolutionary opponent. Proceedings of the National Academy of Sciences, 109(26), 10409–10413. http://doi.org/10.1073/pnas.1206569109 .. [Prison1998] LIFL (1998) PRISON. Available at: http://www.lifl.fr/IPD/ipd.frame.html (Accessed: 19 September 2016). diff --git a/docs/tutorials/getting_started/moran.rst b/docs/tutorials/getting_started/moran.rst index 13d3e05ab..d57e779c8 100644 --- a/docs/tutorials/getting_started/moran.rst +++ b/docs/tutorials/getting_started/moran.rst @@ -115,6 +115,19 @@ function like :code:`takewhile` from :code:`itertools`):: >>> mp.population_distribution() Counter({'Grudger': 4}) +It is possible to pass a fitness function that scales the utility values. A common one +used in the literature, [Ohtsuki2006]_, is :math:`f(s) = 1 - w + ws` where :math:`w` +denotes the intensity of selection:: + + >>> axl.seed(689) + >>> players = (axl.Cooperator(), axl.Defector(), axl.Defector(), axl.Defector()) + >>> w = 0.95 + >>> fitness_transformation = lambda score: 1 - w + w * score + >>> mp = axl.MoranProcess(players, turns=10, fitness_transformation=fitness_transformation) + >>> populations = mp.play() + >>> mp.winning_strategy_name + 'Cooperator' + Other types of implemented Moran processes: - :ref:`moran-process-on-graphs`