Commit 609461eb authored by David A.. Werner's avatar David A.. Werner
Browse files

Merge branch 'dev'. Initial version of code.

parents 9588dee5 825f37dc
......@@ -44,6 +44,7 @@ nosetests.xml
coverage.xml
*,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
......
.PHONY: test
test:
python setup.py test
.PHONY: develop
develop:
pip install --upgrade pip
pip install setuptools
python setup.py develop
.PHONY: setup_docs
setup_docs:
sphinx-apidoc -f -M -F -o docs/ manduca/
.PHONY: docs docs_html
docs: docs_html docs_pdf
docs_html: setup_docs
python setup.py build_sphinx
.PHONY: docs_pdf
docs_pdf: setup_docs
python setup.py build_sphinx -b latex
make -C docs/_build/latex/
# EE194_Manduca_Simulator
\ No newline at end of file
# EE194_Manduca_Simulator
# Setup
## Initial Setup
1. Clone this repo
1. Create and configure a python virtual environment
1. Create a virtual environment: e.g. `virtualenv venv`
1. Activate the environment with the appropriate script: e.g. `source venv/bin/activate.csh`
1. Upgrade pip: `pip install --upgrade pip`
1. Install/Upgrade setuptools: `pip install --upgrade setuptools`
1. Install the package and its dependancies
* In develop mode, where changes in the code take effect immediately `python setup.py develop`
* In install mode, where changes to the source code have no effect `python setup.py install`
## Using the package
1. Activate the virtual environment you created during intial setup
1. Import from the manduca library
* If you installed in develop mode, the code executed will always match the state of the source
directoy
* If you instlaled in install mode, the code will be the version from when it was installed,
*except* if you run a python command from within the repo directory; the code in `./manduca`
overrides the installed version of the `manduca` package
#Project Description
## Goals
* [ ] Simplified interfaces and classes
* [ ] Population, Manduca, SimpleManduca, History
* [ ] Customizable plotting of historic values
* [ ] auto documentation using sphinx
* [ ] conf.py
* [ ] pep8 compliance
* [ ] properties
* [ ] Custom Exceptions
* [ ] Test code using py.test
* [ ] wrappers
* [ ] multi-threading using multithreading.Pool
* [ ] Hist
import types
import numpy as np
from scipy.integrate import odeint
from collections import namedtuple
import StringIO
import functools
class ManducaDeformityError(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return repr(self.message)
# L0: Body segment resting spring lenght
# k: Elastic spring constant
# m: Leg mass
# c: Viscosity constant
ManducaBodyProperties = namedtuple('ManducaBodyProperties', ['L0', 'k', 'c', 'm','muscle_strength'])
class Manduca(object):
""" A simple model of a Manduca Sexta consiting of legs and muscles, modeled as masses and
springs, respectively."""
default_body_properties = ManducaBodyProperties(500, 1, 2, 1, 100)
damped_body_properties = ManducaBodyProperties(500, 1, 3, 1, 100)
def __init__(self, legs, muscles, time_step, body_properties=None, fitness_function=None):
self._legs = self._muscles = None
self.legs = np.copy(legs)
self.muscles = np.copy(muscles)
self._time_step = time_step
self._fitness = None
self.check_consistancy()
if fitness_function == None:
self._fitness_function = Manduca.distance_traveled
else:
self._fitness_function = fitness_function
if body_properties == None:
self.body_properties = Manduca.default_body_properties
else:
self.body_properties = body_properties
@property
def fitness_function(self):
return self._fitness_function
@fitness_function.setter
def fitness_function(self, fn):
self._fitness = None
self._fitness_function = fn
@property
def legs(self):
return self._legs
@legs.setter
def legs(self, legs):
new_legs = np.array(legs)
if self._legs is not None:
if new_legs.shape != self._legs.shape:
msg = 'Ill-defined leg shape. Expected {} but got {}'
e = msg.format(self._legs.shape, new_legs.shape)
raise(ManducaDeformityError(e))
self._legs = new_legs
self._fitness = None
@property
def muscles(self):
return self._muscles
@muscles.setter
def muscles(self, muscles):
new_muscles = np.array(muscles)
if self._muscles is not None:
if new_muscles.shape != self._muscles.shape:
msg = 'Ill-defined muscles shape. Expected {} but got {}'
e = msg.format(self._muscles.shape, new_muscles.shape)
raise(ManducaDeformityError(e))
self._muscles = new_muscles
self._fitness = None
@property
def time_step(self):
return self._time_step
@time_step.setter
def time_step(self, time_step):
self._time_step = time_step
self._fitness = None
@property
def num_legs(self):
return self.legs.shape[1]
@property
def num_muscles(self):
return self.muscles.shape[1]
@property
def num_time_steps(self):
leg_ts, muscle_ts = self.legs.shape[0], self.muscles.shape[0]
if leg_ts != muscle_ts:
msg = 'Arbitrary number of time-steps. {} in legs, {} in muscles'
e = msg.format(leg_ts, muscle_ts)
raise(ManducaDeformityError(e))
return leg_ts
@staticmethod
def distance_traveled(manduca):
positions, velocites, times = manduca.simulate(record_level=0)
mean_starting_loc = np.mean(positions[0])
mean_end_loc = np.mean(positions[-1])
distance_traveled = mean_end_loc - mean_starting_loc
return distance_traveled
@property
def fitness(self):
if (self._fitness is None):
self._fitness = self._fitness_function(self)
return (self._fitness)
@property
def initial_position(self):
return np.arange(self.num_legs)*self.body_properties.L0
def check_consistancy(self):
if self.num_muscles != self.num_legs - 1:
msg = 'Ill-defined manduca has {} legs and {} muscles'
e = msg.format(self.num_legs, self.num_muscles)
raise(ManducaDeformityError(e))
# Make sure the number of time steps is consistant
self.num_time_steps
def change_muscle_value(self, current_value, rng=np.random):
return rng.rand()*self.body_properties.muscle_strength
# First randomly choose the number of mutations (1 up to MAX_MUTATIONS)
# - flip a random leg-locked
# - flip a random muscle-on
# - do the secret-sauce mutation if desired.
def mutate (self, rng=np.random, max_mutations = 20):
num_mutations = rng.randint(1, max_mutations)
for _ in range(num_mutations):
mutation_type = rng.randint(2)
if mutation_type == 0:
time_step = rng.randint(self.num_time_steps)
leg_idx = rng.randint(self.num_legs)
value = self.legs[time_step][leg_idx]
self.legs[time_step][leg_idx] = int(value) ^ 1
elif mutation_type == 1:
time_step = rng.randint(self.num_time_steps)
muscle_idx = rng.randint(self.num_muscles)
value = self.muscles[time_step][muscle_idx]
new_value = self.change_muscle_value(value, rng=rng)
self.muscles[time_step][muscle_idx] = new_value
elif mutation_type == 2:
raise("Not supported")
pass # SECRET SAUCE and change mutation_type rng
# Pick entire rows from one parent or the other. Each row of the child comes
# from the first parent or the second (with equal likelihood).
# This is the final function that you write yourself.
def mate(self, parent2, rng=np.random):
child = self.clone()
num_time_steps = child.num_time_steps
for time_step in range(num_time_steps):
dna_src = rng.randint(2)
if dna_src == 0:
continue
else:
child.legs[time_step] = parent2.legs[time_step]
child.muscles[time_step] = parent2.muscles[time_step]
return child
def __repr__(self):
output = StringIO.StringIO()
output.write("Body Properties: {}\n".format(self.body_properties))
output.write(" legs | muscles")
for leg_t, musc_t in zip(self.legs, self.muscles):
leg_str = " ".join("{:>1}".format(int(l)) for l in leg_t)
musc_str = " ".join("{:>3}".format(int(m)) for m in musc_t)
output.write("\n{} | {}".format(leg_str, musc_str))
val = output.getvalue()
output.close()
return val
def __eq__(self, other):
return (self.time_step == other.time_step and
np.array_equal(self.legs, other.legs) and
np.array_equal(self.muscles, other.muscles) and
self.body_properties == other.body_properties)
def __ne__(self, other):
return (self.time_step != other.time_step or
not(np.array_equal(self.legs, other.legs)) or
not(np.array_equal(self.muscles, other.muscles) or
self.body_properties != other.body_properties))
def __hash__(self):
leg_str, m_str = self.legs.tostring(), self.muscles.tostring()
t_str = str(self.time_step)
manduca_str = leg_str + m_str + t_str
return hash(manduca_str)
def simulate(self, record_level=0, verbose_record=True, sub_intervals = 10):
""" record_level: Which intervals to record the position/velocity/time
0 -> initial/final only
1 -> once for each interval
2 -> every sub-interval
* -> default to 0
returns recored positions, velocities, and times
"""
L0, k, c, m, muscle_strength = self.body_properties
def calculate_slopes(x, t):
"""This function computes the derivatives of x
x is position of legs and velocites of legs
in the form [x0, x1, ..., xn, v0, v1, ..., vn]
t is an array of time values over which to integrate
in the form [t0, t1, ..., tN]
The forces are as follows:
Back leg: f0' = k(x1-x0-L0) + c(v1-v0) + M01
Middle legs: fi' = k(x{i+1}-xi-L0) - k(xi-x{i-1}-L0) +
c(v{i+1}-vi) + c(v{i-1}-vi) + Mn{n+1} - M{n-1}n
Front Leg: fn' = -k(xn-x{n-1}-L0) + c(v{n-1}-vn) + M{n-1}n
"""
# Pull out the position and velocity vectors
pos, vel = x[0:self.num_legs], x[self.num_legs:2*self.num_legs]
# Manually set the velocity of locked legs to zero
vel[np.where(current_legs==1)] = 0
# The derivative of position is just velocity
d_pos = vel
# An empty array for the derivative of velocity: acceleration
d_vel = np.zeros(self.num_legs)
# Compute the acceleration for each leg
for curr_idx in range(self.num_legs):
prev_idx, next_idx = curr_idx - 1, curr_idx + 1
# Compute the forward forces
if next_idx < self.num_legs: #all but the last leg
M_forwards = current_muscles[curr_idx]
spring_forwards = k*(pos[next_idx] - pos[curr_idx] - L0)
damp_forwards = c*(vel[next_idx] - vel[curr_idx])
else: # This is the last leg; no forward forces
M_forwards = spring_forwards = damp_forwards = 0
# Compute the backward forces
if prev_idx >= 0: # all but the first leg
M_backwards = current_muscles[prev_idx]
spring_backwards = k*(pos[curr_idx] - pos[prev_idx] - L0)
damp_backwards = c*(vel[curr_idx] - vel[prev_idx])
else: # This is the first leg; no backward forces
M_backwards = spring_backwards = damp_backwards = 0
# Compute the combined forces
M_force = M_forwards - M_backwards
spring_force = spring_forwards - spring_backwards
damp_force = damp_forwards - damp_backwards
acceleration = (M_force + spring_force + damp_force) / m
d_vel[curr_idx] = acceleration
# May not be necessary, but set the acceleration of locked legs to 0
d_vel[np.where(current_legs==1)] = 0
derivatives = np.hstack((d_pos, d_vel))
return (derivatives)
# Compute the time steps for each of the simulation intervals
end_time = self.num_time_steps * self.time_step
times = np.linspace(0, end_time, self.num_time_steps + 1)
# Initialize the position and velocity of each leg
position = self.initial_position
velocities = np.zeros((self.num_legs))
# Record the initial position and velocities
recorded_positions, recorded_velocities = [position], [velocities]
recorded_times = [0.0]
# For each time-step, advance the simulation
for time, current_legs, current_muscles in zip(times, self.legs, self.muscles):
end_time = time + self.time_step
# Split the interval up into the desired number of sub-intervals
interval_times = np.linspace(time, end_time, sub_intervals+1)
# Concatenate x and v to send to compute slopes
loc_and_vel = np.hstack ((position, velocities))
ode_values = odeint(calculate_slopes, loc_and_vel, interval_times)
# Extract the positions and velocities for all sub-intervals
# Exclude the initial value: it is a duplicate from the end of last interval!
new_positions = ode_values[1:, 0:self.num_legs]
new_velocities = ode_values[1:, self.num_legs:2*self.num_legs]
new_times = interval_times[1:]
# The current position and velocity are the last values in the list
position = new_positions[-1]
velocities = new_velocities[-1]
# Record the positions if desired
if record_level == 1:
recorded_positions.append(position)
recorded_velocities.append(velocities)
recorded_times.append(end_time)
elif record_level == 2:
recorded_positions.extend(new_positions)
recorded_velocities.extend(new_velocities)
recorded_times.extend(new_times)
# If the record_level is 0, save the final interval as wel
if record_level not in [1,2]:
recorded_positions.append(position)
recorded_velocities.append(velocities)
recorded_times.append(end_time)
return recorded_positions, recorded_velocities, recorded_times
def save(self, file_name, compressed=True):
if compressed:
save_fn = np.savez_compressed
else:
save_fn = np.savez
save_fn(file_name, legs=self.legs, muscles=self.muscles,
time_step=self.time_step, fitness=self._fitness,
body_properties=self.body_properties)
@classmethod
def load(cls, file_name, *other_params, **kwargs):
kwargs_copy = dict(kwargs)
try:
results = np.load(file_name)
legs = results['legs']
muscles = results['muscles']
time_step = results['time_step']
args = []
for param in other_params:
args.append(results[param])
if 'body_properties' not in kwargs_copy or kwargs_copy['body_properties'] is None:
kwargs_copy['body_properties'] = ManducaBodyProperties(*results['body_properties'])
new_manduca = cls(legs, muscles, time_step, *args, **kwargs_copy)
if 'fitness' in results:
new_manduca._fitness = np.asscalar(results['fitness'])
return new_manduca
except (IOError, ValueError) as e:
msg = 'Error loading {} from file {}'
print(msg.format(cls.__name__, file_name))
raise(e)
def clone(self):
return self.__class__(np.copy(self.legs), np.copy(self.muscles),
time_step=self.time_step,
body_properties=self.body_properties)
class SimpleManduca(Manduca):
"""A Manduca that has legs and muscles that are either ON/OFF"""
def __init__(self, legs, muscles, time_step, **kwargs):
super(SimpleManduca, self).__init__(legs, muscles, time_step, **kwargs)
locked_legs, unlocked_legs = self.legs == 1, self.legs == 0
assert (locked_legs | unlocked_legs).all(), "Legs must be 0 or 1"
musc_on, musc_off = self.muscles == 0, self.muscles == self.body_properties.muscle_strength
msg = "Muscles must be 0 or {}".format(self.body_properties.muscle_strength)
assert (musc_on | musc_off).all(), msg
@staticmethod
def random_individual(num_legs, time_segments, time_step, **kwargs):
body_properties = kwargs.pop('body_properties', None)
if body_properties == None:
body_properties = Manduca.default_body_properties
kwargs['body_properties'] = body_properties
muscle_strength = body_properties.muscle_strength
rng = kwargs.pop('rng', np.random)
legs = rng.choice([0, 1], size=(time_segments, num_legs))
muscles = rng.choice([0, muscle_strength],
size=(time_segments, num_legs-1))
return SimpleManduca(legs, muscles, time_step, **kwargs)
def change_muscle_value(self, current_value, rng=np.random):
return current_value^self.body_properties.muscle_strength
import numpy as np
import copy
import tqdm
from collections import namedtuple
class GenerationInfo(namedtuple('GenerationInfo', ['generation_number', 'incoming_fitness', 'outgoing_fitness',
'surviving_ancestors', 'surviving_children',
'surviving_mutants',
'num_ancestors', 'num_matings', 'num_mutants',
'duplicate_population',
'duplicate_children', 'duplicate_mutants'])):
@property
def max_fitness(self):
return max(self.outgoing_fitness)
class EvolutionParameters(object):
def __init__(self, max_population, num_matings, num_mutants, max_mutations, **kwargs):
self.max_population = max_population
self.num_matings = num_matings
self.num_mutants = num_mutants
self.max_mutations = max_mutations
self.num_fittest = kwargs.pop('num_fittest', int(self.max_population/2))
self.num_random = kwargs.pop('num_random', int(self.max_population/5))
def update_parameters(self, history):
"""Basic EvolutionParameters doesn't change as the simulation progresses"""
pass
class EvolutionSimulator(object):
default_evolution_parameters = EvolutionParameters(20, 10, 10, 10)
def __init__(self, population, evolution_parameters=None, random_number_generator=None, random_seed=None, pool=None):
"""Population can be array of manducas or tuple of (f, num_pop), where each call to f()
produces a member of the population"""
if isinstance(population, tuple):
self.population = [population[0]() for _ in range(population[1])]
else:
self.population = population
self._children = []
self._mutants = []
self.history = []
if evolution_parameters is None:
self.evolution_parameters = copy.copy(self.__class__.default_evolution_parameters)
else:
self.evolution_parameters = copy.copy(evolution_parameters)
if random_number_generator is None:
self.random_number_generator = np.random.RandomState()
else:
self.random_number_generator = random_number_generator
if random_seed:
self.random_number_generator.seed(random_seed)
self.pool = pool
def next_generation(self, current_generation):
incoming_fitness = sorted([m.fitness for m in self.population], reverse=True)
# mate: create children of two random parents
self.mate()
# mutate: create mutations of random individuals
self.mutate()
# remove duplicates
num_before_dups = [len(self.population), len(self._children), len(self._mutants)]
# individuals in current population are unique
self.population = list(set(self.population))
# remove children who are clones of others in the population
self._children = [c for c in self._children if c not in self.population]
# remove mutants who are clones of others in the population or of a child
self._mutants = [m for m in self._mutants if m not in self.population+self._children]
num_after_dups = [len(self.population), len(self._children), len(self._mutants)]
duplicate_population = num_before_dups[0] - num_after_dups[0]
duplicate_children = num_before_dups[1] - num_after_dups[1]
duplicate_mutants = num_before_dups[2] - num_after_dups[2]
self.update_fitness()
num_after_survival = self.survive()
surviving_ancestors, surviving_children, surviving_mutants = num_after_survival
outgoing_fitness = sorted([m.fitness for m in self.population], reverse=True)
num_ancestors = self.evolution_parameters.max_population
num_matings = self.evolution_parameters.num_matings
num_mutants = self.evolution_parameters.num_mutants
self.evolution_parameters.update_parameters(self.history)
return GenerationInfo(current_generation, incoming_fitness, outgoing_fitness,
surviving_ancestors, surviving_children,
surviving_mutants, num_ancestors,
num_matings, num_mutants,
duplicate_population,
duplicate_children, duplicate_mutants)
@property
def population_size(self):
return len(self.population)
def mate(self):
self._children = []
rng = self.random_number_generator
for _ in range(self.evolution_parameters.num_matings):
p1 = self.random_number_generator.choice(self.population)
p2 = self.random_number_generator.choice(self.population)
self._children.append(p1.mate(p2, rng=rng))
def mutate(self):
self._mutants = []
for _ in range(self.evolution_parameters.num_mutants):
rng = self.random_number_generator
clone = rng.choice(self.population).clone()
clone.mutate(max_mutations=self.evolution_parameters.max_mutations,rng=rng)
self._mutants.append(clone)
def survive(self):
rng = self.random_number_generator
all_manducas = [(m,0) for m in self.population] + \
[(m,1) for m in self._children] + \
[(m,2) for m in self._mutants]
all_manducas.sort(key=lambda val: val[0].fitness, reverse=True)
# Always keep the best 10 individuals
num_fittest = self.evolution_parameters.num_fittest
fittest_manducas, unfit_manducas = all_manducas[:num_fittest], all_manducas[num_fittest:]
unfit_manducas = np.array(unfit_manducas)
# Pick more completely randomly
num_random = self.evolution_parameters.num_random
if num_random > 0:
idxs = rng.choice(len(unfit_manducas), num_random, replace=False)
randomly_selected_manducas = list(unfit_manducas[idxs])
else:
randomly_selected_manducas = []
# Pick the rest randomly, weighted by fitness
remaining_manducas = self.evolution_parameters.max_population - num_fittest - num_random
if remaining_manducas > 0:
fitnesses = [val[0].fitness for val in unfit_manducas]
fitnesses -= min(fitnesses)
for idx in idxs:
fitnesses[idx] = 0 # Make sure not to duplicate already selected individuals
probs = fitnesses / sum(fitnesses)
idxs = rng.choice(len(unfit_manducas), remaining_manducas, p=probs, replace=False)
remaining_selected_manducas = list(unfit_manducas[idxs])
else:
remaining_selected_manducas = []