1from deap_er import creator
2from deap_er import tools
3from deap_er import base
4import itertools
5import random
6import array
7import numpy
8import math
9
10
11# Disable randomization to guarantee reproducibility
12random.seed(1234)
13
14# Define constants, objects and functions.
15AVG_OE_MEASURE_INTERVAL = 100
16AVG_OE_THRESHOLD = 3
17VERBOSE = True
18
19NDIM = 5
20NPOP = 10
21CR = 0.6
22F = 0.4
23
24SCENARIO = tools.MPConfigs.ALT1
25mpb = tools.MovingPeaks(dimensions=NDIM, **SCENARIO)
26
27BOUNDS = [SCENARIO["min_coord"], SCENARIO["max_coord"]]
28
29
30def brown_ind(iter_, best, sigma):
31 return iter_(random.gauss(x, sigma) for x in best)
32
33
34def setup():
35 creator.create("FitnessMax", base.Fitness, weights=(1.0,))
36 creator.create("Individual", array.array, typecode='d', fitness=creator.FitnessMax)
37
38 toolbox = base.Toolbox()
39 toolbox.register("attr_float", random.uniform, BOUNDS[0], BOUNDS[1])
40 toolbox.register("individual", tools.init_repeat, creator.Individual, toolbox.attr_float, NDIM)
41 toolbox.register("brownian_individual", brown_ind, creator.Individual, sigma=0.3)
42 toolbox.register("population", tools.init_repeat, list, toolbox.individual)
43 toolbox.register("select", random.sample, k=4)
44 toolbox.register("best", tools.sel_best, sel_count=1)
45 toolbox.register("evaluate", mpb)
46
47 stats = tools.Statistics(lambda ind: ind.fitness.values)
48 stats.register("avg", numpy.mean)
49 stats.register("std", numpy.std)
50 stats.register("min", numpy.min)
51 stats.register("max", numpy.max)
52
53 logbook = tools.Logbook()
54 logbook.header = "gen", "evals", "error", "offline_error", "avg", "max"
55
56 return toolbox, stats, logbook
57
58
59def stop_condition(logbook):
60 interval = AVG_OE_MEASURE_INTERVAL
61 if len(logbook) >= 5e+5:
62 raise RuntimeError('Evolution failed to converge.')
63 elif len(logbook) % interval == 0:
64 err_sum = 0
65 for i in range(interval, 0, -1):
66 val = logbook.select("offline_error")[-i]
67 err_sum += val
68 avg_err = err_sum / interval
69 if avg_err <= AVG_OE_THRESHOLD:
70 print_results(avg_err)
71 return 1
72 return 0
73
74
75def print_results(avg_err):
76 print(f'\nAverage offline error: {avg_err:.3f} (<={AVG_OE_THRESHOLD}).')
77 print(f'\nEvolution converged correctly.')
78
79
80def main():
81 toolbox, stats, logbook = setup()
82 regular, brownian = 4, 2
83 pop_size = regular + brownian
84
85 def log_stats(ngen=0):
86 chain = itertools.chain(*populations)
87 record = stats.compile(chain)
88 args = dict(
89 gen=ngen,
90 evals=mpb.nevals,
91 error=mpb.current_error,
92 offline_error=mpb.offline_error
93 )
94 logbook.record(**args, **record)
95 if VERBOSE:
96 print(logbook.stream)
97
98 # Generate the initial populations.
99 populations = [toolbox.population(size=pop_size) for _ in range(NPOP)]
100
101 # Evaluate the initial populations.
102 for idx, subpop in enumerate(populations):
103 fitness = toolbox.map(toolbox.evaluate, subpop)
104 for ind, fit in zip(subpop, fitness):
105 ind.fitness.values = fit
106
107 log_stats()
108
109 generation = 1
110
111 # Define the main evolution loop.
112 while not stop_condition(logbook):
113
114 # Detect changes and invalidate fitness if necessary.
115 bests = [toolbox.best(subpop)[0] for subpop in populations]
116 if any(b.fitness.values != toolbox.evaluate(b) for b in bests):
117 for individual in itertools.chain(*populations):
118 del individual.fitness.values
119
120 # Apply exclusionary pressure to the best individuals.
121 rex_cl = (BOUNDS[1] - BOUNDS[0]) / (2 * NPOP**(1.0/NDIM))
122 for i, j in itertools.combinations(range(NPOP), 2):
123 if bests[i].fitness.is_valid() and bests[j].fitness.is_valid():
124 d = sum((bests[i][k] - bests[j][k])**2 for k in range(NDIM))
125 d = math.sqrt(d)
126 if d < rex_cl:
127 if bests[i].fitness < bests[j].fitness:
128 k = i
129 else:
130 k = j
131 populations[k] = toolbox.population(size=pop_size)
132
133 # Evaluate the individuals with an invalid fitness.
134 chain = itertools.chain(*populations)
135 invalid_ind = [ind for ind in chain if not ind.fitness.is_valid()]
136 fitness = toolbox.map(toolbox.evaluate, invalid_ind)
137 for ind, fit in zip(invalid_ind, fitness):
138 ind.fitness.values = fit
139
140 log_stats(generation)
141
142 # Evolve the subpopulations.
143 for idx, subpop in enumerate(populations):
144 new_pop = []
145 xbest, = toolbox.best(subpop)
146
147 # Apply regular DE to the first part of the population.
148 for individual in subpop[:regular]:
149 x1, x2, x3, x4 = toolbox.select(subpop)
150 offspring = toolbox.clone(individual)
151 index = random.randrange(NDIM)
152 for i, value in enumerate(individual):
153 if i == index or random.random() < CR:
154 offspring[i] = xbest[i] + F * (x1[i] + x2[i] - x3[i] - x4[i])
155 offspring.fitness.values = toolbox.evaluate(offspring)
156 if offspring.fitness >= individual.fitness:
157 new_pop.append(offspring)
158 else:
159 new_pop.append(individual)
160
161 # Apply brownian DE to the last part of the population.
162 new_pop.extend(toolbox.brownian_individual(xbest) for _ in range(brownian))
163
164 # Evaluate the brownian individuals.
165 for individual in new_pop[-brownian:]:
166 individual.fitness.value = toolbox.evaluate(individual)
167
168 # Replace the population with the new one.
169 populations[idx] = new_pop
170
171 # Update iteration counter.
172 generation += 1
173
174
175if __name__ == "__main__":
176 main()