Differential Evolution

Basic DE

 1from deap_er import creator
 2from deap_er import tools
 3from deap_er import base
 4import random
 5import array
 6import numpy
 7
 8
 9random.seed(1234)  # disables randomization
10
11NDIM = 10
12CR = 0.25
13F = 1
14MU = 300
15NGEN = 200
16
17
def setup():
    """Build the toolbox, statistics collector and logbook for basic DE.

    Registers a single-objective minimising fitness and an ``array.array``
    based individual with ``creator``, then wires the generators, the
    random three-parent selector and the sphere benchmark into a toolbox.

    Returns:
        A ``(toolbox, stats, logbook)`` tuple.
    """
    creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
    creator.create(
        "Individual",
        array.array,
        typecode='d',
        fitness=creator.FitnessMin,
    )

    toolbox = base.Toolbox()
    # Each coordinate is drawn uniformly from [-3, 3].
    toolbox.register("attr_float", random.uniform, -3, 3)
    toolbox.register(
        "individual",
        tools.init_repeat,
        creator.Individual,
        toolbox.attr_float,
        NDIM,
    )
    toolbox.register(
        "population", tools.init_repeat, list, toolbox.individual
    )
    # DE needs three randomly picked individuals per trial vector.
    toolbox.register("select", tools.sel_random, sel_count=3)
    toolbox.register("evaluate", tools.bm_sphere)

    def fitness_values(ind):
        return ind.fitness.values

    stats = tools.Statistics(fitness_values)
    reducers = (
        ("avg", numpy.mean),
        ("std", numpy.std),
        ("min", numpy.min),
        ("max", numpy.max),
    )
    for label, reducer in reducers:
        stats.register(label, reducer)

    logbook = tools.Logbook()
    logbook.header = ("gen", "evals", "std", "min", "avg", "max")

    return toolbox, stats, logbook
39
40
def print_results(best_ind):
    """Report success, or raise if the best individual is not good enough.

    Args:
        best_ind: Individual whose ``fitness.values`` tuple is compared
            against ``(1e-3,)``.

    Raises:
        RuntimeError: If the fitness values are not below ``(1e-3,)``.
    """
    converged = best_ind.fitness.values < (1e-3,)
    if converged:
        print('\nEvolution converged correctly.')
        return
    raise RuntimeError('Evolution failed to converge.')
45
46
def main():
    """Run the basic differential-evolution loop on the sphere benchmark.

    Creates a population of ``MU`` individuals, evaluates it, and then for
    each generation builds one trial vector per individual from three
    randomly selected individuals. A trial replaces its target only when
    its fitness compares greater. The best individual seen is tracked in a
    hall of fame and handed to ``print_results`` at the end.
    """
    toolbox, stats, logbook = setup()
    pop = toolbox.population(size=MU)
    hof = tools.HallOfFame(1)

    def log_stats(ngen=0):
        # Record the statistics of the current population and echo them.
        logbook.record(gen=ngen, evals=len(pop), **stats.compile(pop))
        print(logbook.stream)

    # Initial evaluation of the whole population.
    for member, fit in zip(pop, toolbox.map(toolbox.evaluate, pop)):
        member.fitness.values = fit

    log_stats()

    for gen in range(1, NGEN):
        for slot, target in enumerate(pop):
            a, b, c = toolbox.select(pop)
            trial = toolbox.clone(target)
            # One coordinate is always mutated so the trial differs.
            forced = random.randrange(NDIM)
            for pos in range(len(target)):
                if pos != forced and random.random() >= CR:
                    continue
                trial[pos] = a[pos] + F * (b[pos] - c[pos])
            trial.fitness.values = toolbox.evaluate(trial)
            if trial.fitness > target.fitness:
                pop[slot] = trial
        hof.update(pop)
        log_stats(gen)

    print_results(hof[0])


if __name__ == "__main__":
    main()


Dynamic DE

  1from deap_er import creator
  2from deap_er import tools
  3from deap_er import base
  4import itertools
  5import random
  6import array
  7import numpy
  8import math
  9
 10
 11# Disable randomization to guarantee reproducibility
 12random.seed(1234)
 13
 14# Define constants, objects and functions.
 15AVG_OE_MEASURE_INTERVAL = 100
 16AVG_OE_THRESHOLD = 3
 17VERBOSE = True
 18
 19NDIM = 5
 20NPOP = 10
 21CR = 0.6
 22F = 0.4
 23
 24SCENARIO = tools.MPConfigs.ALT1
 25mpb = tools.MovingPeaks(dimensions=NDIM, **SCENARIO)
 26
 27BOUNDS = [SCENARIO["min_coord"], SCENARIO["max_coord"]]
 28
 29
 30def brown_ind(iter_, best, sigma):
 31    return iter_(random.gauss(x, sigma) for x in best)
 32
 33
 34def setup():
 35    creator.create("FitnessMax", base.Fitness, weights=(1.0,))
 36    creator.create("Individual", array.array, typecode='d', fitness=creator.FitnessMax)
 37
 38    toolbox = base.Toolbox()
 39    toolbox.register("attr_float", random.uniform, BOUNDS[0], BOUNDS[1])
 40    toolbox.register("individual", tools.init_repeat, creator.Individual, toolbox.attr_float, NDIM)
 41    toolbox.register("brownian_individual", brown_ind, creator.Individual, sigma=0.3)
 42    toolbox.register("population", tools.init_repeat, list, toolbox.individual)
 43    toolbox.register("select", random.sample, k=4)
 44    toolbox.register("best", tools.sel_best, sel_count=1)
 45    toolbox.register("evaluate", mpb)
 46
 47    stats = tools.Statistics(lambda ind: ind.fitness.values)
 48    stats.register("avg", numpy.mean)
 49    stats.register("std", numpy.std)
 50    stats.register("min", numpy.min)
 51    stats.register("max", numpy.max)
 52
 53    logbook = tools.Logbook()
 54    logbook.header = "gen", "evals", "error", "offline_error", "avg", "max"
 55
 56    return toolbox, stats, logbook
 57
 58
 59def stop_condition(logbook):
 60    interval = AVG_OE_MEASURE_INTERVAL
 61    if len(logbook) >= 5e+5:
 62        raise RuntimeError('Evolution failed to converge.')
 63    elif len(logbook) % interval == 0:
 64        err_sum = 0
 65        for i in range(interval, 0, -1):
 66            val = logbook.select("offline_error")[-i]
 67            err_sum += val
 68        avg_err = err_sum / interval
 69        if avg_err <= AVG_OE_THRESHOLD:
 70            print_results(avg_err)
 71            return 1
 72    return 0
 73
 74
 75def print_results(avg_err):
 76    print(f'\nAverage offline error: {avg_err:.3f} (<={AVG_OE_THRESHOLD}).')
 77    print(f'\nEvolution converged correctly.')
 78
 79
 80def main():
 81    toolbox, stats, logbook = setup()
 82    regular, brownian = 4, 2
 83    pop_size = regular + brownian
 84
 85    def log_stats(ngen=0):
 86        chain = itertools.chain(*populations)
 87        record = stats.compile(chain)
 88        args = dict(
 89            gen=ngen,
 90            evals=mpb.nevals,
 91            error=mpb.current_error,
 92            offline_error=mpb.offline_error
 93        )
 94        logbook.record(**args, **record)
 95        if VERBOSE:
 96            print(logbook.stream)
 97
 98    # Generate the initial populations.
 99    populations = [toolbox.population(size=pop_size) for _ in range(NPOP)]
100
101    # Evaluate the initial populations.
102    for idx, subpop in enumerate(populations):
103        fitness = toolbox.map(toolbox.evaluate, subpop)
104        for ind, fit in zip(subpop, fitness):
105            ind.fitness.values = fit
106
107    log_stats()
108
109    generation = 1
110
111    # Define the main evolution loop.
112    while not stop_condition(logbook):
113
114        # Detect changes and invalidate fitness if necessary.
115        bests = [toolbox.best(subpop)[0] for subpop in populations]
116        if any(b.fitness.values != toolbox.evaluate(b) for b in bests):
117            for individual in itertools.chain(*populations):
118                del individual.fitness.values
119
120        # Apply exclusionary pressure to the best individuals.
121        rex_cl = (BOUNDS[1] - BOUNDS[0]) / (2 * NPOP**(1.0/NDIM))
122        for i, j in itertools.combinations(range(NPOP), 2):
123            if bests[i].fitness.is_valid() and bests[j].fitness.is_valid():
124                d = sum((bests[i][k] - bests[j][k])**2 for k in range(NDIM))
125                d = math.sqrt(d)
126                if d < rex_cl:
127                    if bests[i].fitness < bests[j].fitness:
128                        k = i
129                    else:
130                        k = j
131                    populations[k] = toolbox.population(size=pop_size)
132
133        # Evaluate the individuals with an invalid fitness.
134        chain = itertools.chain(*populations)
135        invalid_ind = [ind for ind in chain if not ind.fitness.is_valid()]
136        fitness = toolbox.map(toolbox.evaluate, invalid_ind)
137        for ind, fit in zip(invalid_ind, fitness):
138            ind.fitness.values = fit
139
140        log_stats(generation)
141
142        # Evolve the subpopulations.
143        for idx, subpop in enumerate(populations):
144            new_pop = []
145            xbest, = toolbox.best(subpop)
146
147            # Apply regular DE to the first part of the population.
148            for individual in subpop[:regular]:
149                x1, x2, x3, x4 = toolbox.select(subpop)
150                offspring = toolbox.clone(individual)
151                index = random.randrange(NDIM)
152                for i, value in enumerate(individual):
153                    if i == index or random.random() < CR:
154                        offspring[i] = xbest[i] + F * (x1[i] + x2[i] - x3[i] - x4[i])
155                offspring.fitness.values = toolbox.evaluate(offspring)
156                if offspring.fitness >= individual.fitness:
157                    new_pop.append(offspring)
158                else:
159                    new_pop.append(individual)
160
161            # Apply brownian DE to the last part of the population.
162            new_pop.extend(toolbox.brownian_individual(xbest) for _ in range(brownian))
163
164            # Evaluate the brownian individuals.
165            for individual in new_pop[-brownian:]:
166                individual.fitness.value = toolbox.evaluate(individual)
167
168            # Replace the population with the new one.
169            populations[idx] = new_pop
170
171        # Update iteration counter.
172        generation += 1
173
174
175if __name__ == "__main__":
176    main()