1from deap_er import creator
2from deap_er import tools
3from deap_er import base
4import itertools
5import operator
6import random
7import numpy
8import math
9
10
11# Disable randomization to guarantee reproducibility
12random.seed(1234)
13
14# Define constants, objects and functions.
15NDIM = 5
16NSWARMS = 1
17NPARTICLES = 5
18NEXCESS = 3
19RCLOUD = 0.5
20SWARM_DSTRB = "nuvd"
21AVG_OE_THRESHOLD = 5
22AVG_OE_MEASURE_INTERVAL = 200
23
24SCENARIO = tools.MPConfigs.ALT1
25mpb = tools.MovingPeaks(dimensions=NDIM, **SCENARIO)
26
27BOUNDS = [SCENARIO["min_coord"], SCENARIO["max_coord"]]
28SMIN = -(BOUNDS[1] - BOUNDS[0]) / 2.0
29SMAX = (BOUNDS[1] - BOUNDS[0]) / 2.0
30PMIN = BOUNDS[0]
31PMAX = BOUNDS[1]
32
33
34def generate_particle(pclass, dim, pmin, pmax, smin, smax):
35 part = pclass(random.uniform(pmin, pmax) for _ in range(dim))
36 part.speed = [random.uniform(smin, smax) for _ in range(dim)]
37 return part
38
39
40def update_particle(part, best, chi, c):
41 ce1 = (c * random.uniform(0, 1) for _ in range(len(part)))
42 ce2 = (c * random.uniform(0, 1) for _ in range(len(part)))
43 ce1_p = map(operator.mul, ce1, map(operator.sub, best, part))
44 ce2_g = map(operator.mul, ce2, map(operator.sub, part.best, part))
45 a = map(
46 operator.sub, map(
47 operator.mul, itertools.repeat(chi), map(
48 operator.add, ce1_p, ce2_g)
49 ), map(operator.mul, itertools.repeat(1 - chi), part.speed)
50 )
51 part.speed = list(map(operator.add, part.speed, a))
52 part[:] = list(map(operator.add, part, part.speed))
53
54
55def convert_swarm(swarm, rcloud, centre, dist):
56 dim = len(swarm[0])
57 for part in swarm:
58 position = [random.gauss(0, 1) for _ in range(dim)]
59 dist_ = math.sqrt(sum(x**2 for x in position))
60
61 if dist == "gaussian":
62 u = abs(random.gauss(0, 1.0/3.0))
63 part[:] = [(rcloud * x * u**(1.0/dim) / dist_) + c for x, c in zip(position, centre)]
64 elif dist == "uvd":
65 u = random.random()
66 part[:] = [(rcloud * x * u**(1.0/dim) / dist_) + c for x, c in zip(position, centre)]
67 elif dist == "nuvd":
68 u = abs(random.gauss(0, 1.0/3.0))
69 part[:] = [(rcloud * x * u / dist_) + c for x, c in zip(position, centre)]
70
71 del part.fitness.values
72 del part.bestfit.values
73 part.best = None
74
75 return swarm
76
77
78def setup():
79 creator.create("FitnessMax", base.Fitness, weights=(1.0,))
80 creator.create("Particle", list, fitness=creator.FitnessMax,
81 speed=list, best=None, bestfit=creator.FitnessMax)
82 creator.create("Swarm", list, best=None, bestfit=creator.FitnessMax)
83
84 toolbox = base.Toolbox()
85 toolbox.register("particle", generate_particle, creator.Particle,
86 dim=NDIM, pmin=PMIN, pmax=PMAX, smin=SMIN, smax=SMAX)
87 toolbox.register("swarm", tools.init_repeat, creator.Swarm, toolbox.particle)
88 toolbox.register("update", update_particle, chi=0.729843788, c=2.05)
89 toolbox.register("convert", convert_swarm, dist=SWARM_DSTRB)
90 toolbox.register("evaluate", mpb)
91
92 stats = tools.Statistics(lambda ind: ind.fitness.values)
93 stats.register("avg", numpy.mean)
94 stats.register("std", numpy.std)
95 stats.register("min", numpy.min)
96 stats.register("max", numpy.max)
97
98 logbook = tools.Logbook()
99 logbook.header = "gen", "nswarm", "evals", "error", "offline_error", "avg", "max"
100
101 return toolbox, stats, logbook
102
103
104def stop_condition(logbook):
105 interval = AVG_OE_MEASURE_INTERVAL
106 if len(logbook) >= 5e+5:
107 raise RuntimeError('Evolution failed to converge.')
108 elif len(logbook) % interval == 0:
109 err_sum = 0
110 for i in range(interval, 0, -1):
111 val = logbook.select("offline_error")[-i]
112 err_sum += val
113 avg_err = err_sum / interval
114 if avg_err <= AVG_OE_THRESHOLD:
115 print_results(avg_err)
116 return 1
117 return 0
118
119
120def print_results(avg_err):
121 print(f'\nAverage offline error: {avg_err:.3f} (<={AVG_OE_THRESHOLD}).')
122 print(f'\nEvolution converged correctly.')
123
124
125def main():
126 toolbox, stats, logbook = setup()
127
128 def update_fitness(group):
129 part.fitness.values = toolbox.evaluate(part)
130 if not part.best or part.fitness > part.bestfit:
131 part.best = toolbox.clone(part[:])
132 part.bestfit.values = part.fitness.values
133 if not group.best or part.fitness > group.bestfit:
134 group.best = toolbox.clone(part[:])
135 group.bestfit.values = part.fitness.values
136
137 def log_stats(ngen=0):
138 chain = itertools.chain(*population)
139 record = stats.compile(chain)
140 args = dict(
141 gen=ngen,
142 evals=mpb.nevals,
143 nswarm=len(population),
144 error=mpb.current_error,
145 offline_error=mpb.offline_error
146 )
147 logbook.record(**args, **record)
148 print(logbook.stream)
149
150 # Generate the initial population.
151 population = [toolbox.swarm(size=NPARTICLES) for _ in range(NSWARMS)]
152
153 # Evaluate the initial population.
154 for swarm in population:
155 for part in swarm:
156 update_fitness(swarm)
157
158 log_stats()
159
160 generation = 1
161
162 # Define the main evolution loop.
163 while not stop_condition(logbook):
164
165 # Reset convergence variables.
166 rex_cl = (BOUNDS[1] - BOUNDS[0]) / (2 * len(population) ** (1.0 / NDIM))
167 worst_swarm_idx = None
168 worst_swarm = None
169 not_converged = 0
170
171 # Compute the diameters of the swarms and search for the
172 # worst swarm according to its best global position.
173 for i, swarm in enumerate(population):
174 for p1, p2 in itertools.combinations(swarm, 2):
175 d = math.sqrt(sum((x1 - x2) ** 2. for x1, x2 in zip(p1, p2)))
176 if d > 2 * rex_cl:
177 not_converged += 1
178 if not worst_swarm or swarm.bestfit < worst_swarm.bestfit:
179 worst_swarm_idx = i
180 worst_swarm = swarm
181 break
182
183 # If all swarms have converged, add a swarm.
184 if not_converged == 0:
185 population.append(toolbox.swarm(size=NPARTICLES))
186
187 # If too many swarms are roaming, remove the worst swarm.
188 elif not_converged > NEXCESS:
189 population.pop(worst_swarm_idx)
190
191 # Update and evaluate the swarm.
192 for swarm in population:
193
194 # Check for change.
195 if swarm.best and toolbox.evaluate(swarm.best) != swarm.bestfit.values:
196
197 # Convert particles to quantum particles.
198 swarm[:] = toolbox.convert(swarm, rcloud=RCLOUD, centre=swarm.best)
199 swarm.best = None
200 del swarm.bestfit.values
201
202 # Not necessary to update if it's a new swarm
203 # or a swarm just converted to quantum.
204 for part in swarm:
205 if swarm.best and part.best:
206 toolbox.update(part, swarm.best)
207 update_fitness(swarm)
208
209 log_stats(generation)
210
211 # Apply exclusion to swarms which have the best
212 # position and are not set to reinitialize.
213 reinit_swarms = set()
214 for s1, s2 in itertools.combinations(range(len(population)), 2):
215 if (
216 population[s1].best and population[s2].best and
217 not (s1 in reinit_swarms or s2 in reinit_swarms)
218 ):
219 dist = 0
220 for x1, x2 in zip(population[s1].best, population[s2].best):
221 dist += (x1 - x2) ** 2.
222 dist = math.sqrt(dist)
223 if dist < rex_cl:
224 if population[s1].bestfit <= population[s2].bestfit:
225 reinit_swarms.add(s1)
226 else:
227 reinit_swarms.add(s2)
228
229 # Reinitialize and evaluate swarms.
230 for s in reinit_swarms:
231 population[s] = toolbox.swarm(size=NPARTICLES)
232 for part in population[s]:
233 update_fitness(population[s])
234
235 # Update iteration counter.
236 generation += 1
237
238
239if __name__ == "__main__":
240 main()