Feeds:
Posts
Comments

In this post we’re going to take what we learned about using nodes as genes in a genetic algorithm in Part 3 and try an interesting problem that came my way from two different sources, an email conversation and this web page. The problem involves reproducing logical operations using other ones. The example given by Gibson is a PDP-8 computer that has AND and NOT operations but does not have OR or XOR operations.

The AND operation takes 2 binary inputs and outputs a 1 if both inputs are 1s, otherwise it outputs 0.

AND operation

The NOT operation takes 1 binary input and outputs the opposite value, 1 if 0, and 0 if 1.

NOT operation

The OR operation takes 2 binary inputs and outputs a 1 if either input is 1, otherwise it outputs 0.

OR operation

We’re going to use some problem specific code and a genetic algorithm via the solver from Part 3 to find a combination of AND and NOT gates that behave like an OR gate. Here’s a representation of the optimal solution:

OR operation built from AND and NOT gates

Our genes will be A and B for inputs, and AND and NOT for the logic gates.

We’ll start with a Node object.

class Node:
    Value = None
    Left = None
    Right = None

    def __init__(self, value, left=None, right=None):
        self.Value = value
        self.Left = left
        self.Right = right

    def isFunction(self):
        return self.Left is not None

    def __str__(self):
        result = self.Value
        if self.isFunction():
            result += "([" + str(self.Left) + "]"
            if self.Right is not None:
                result += ",[" + str(self.Right) + "]"
            result += ")"
        return result + " "

We’ll also use an Operation object.

class Operation:
    Func = None
    HasLeft = None
    HasRight = None

    def __init__(self, func, hasLeft, hasRight):
        self.Func = func
        self.HasLeft = hasLeft
        self.HasRight = hasRight

This will tell us hold a function to perform the operation, and information about how many and which inputs that operation is going to use.

We can use the following truth table to see the possible inputs and expected outputs so we can build the rules we’ll use to evaluate fitness.

OR operation truth table

Here’s the test function we expect to use.

    geneset = None

    @classmethod
    def setUpClass(cls):
        cls.geneset = {'A': Operation(lambda a, b: a, False, False),
                       'B': Operation(lambda a, b: b, False, False),
                       'AND': Operation(lambda a, b: a & b, True, True),
                       'NOT': Operation(lambda a, b: a == 0, True, False)}

    def test_generate_OR(self):
        minNodes = 6  # not( and( not(a), not(b)))
        rules = [[0, 0, 0], [0, 1, 1], [1, 0, 1], [1, 1, 1]]
        maxNodes = 20
        optimalValue = 1000 - minNodes
        startTime = datetime.datetime.now()
        fnDisplay = lambda candidate: displayRaw(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, self.geneset, rules)
        fnCreateGene = lambda index, length: createGene(index, length, self.geneset)
        fnMutate = lambda child: mutate(child, fnCreateGene)
        best = genetic.getBest(fnGetFitness, fnDisplay, minNodes, optimalValue, createGene=fnCreateGene,
                               maxLen=maxNodes, customMutate=fnMutate, customCrossover=crossover)
        self.assertTrue(best.Fitness >= optimalValue)

If you’ve been following along in the previous posts the above should make sense. In Part 3 we passed the optimal number of nodes to getFitness. This time we need to pass it the rules the generated operation needs to pass and the set of operations (geneset) to apply.

We’ll will start by figuring out which indexes are used in the circuit.

def getFitness(candidate, geneset, rules):
    usedIndexes = getUsedIndexes(candidate)
...

def getUsedIndexes(candidate):
    used = {0: [0]}
    if candidate[0].isFunction():
        for i in reversed(range(len(candidate))):
            element = candidate[i]
            iUsed = [i]
            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                if i < leftIndex < len(candidate):
                    iUsed.extend(used[leftIndex])
                if rightIndex is not None:
                    if i < rightIndex < len(candidate):
                        iUsed.extend(used[rightIndex])
            used[i] = iUsed
    return set(used[0])

Then we’ll clear all the nodes that aren’t used so we can save some processing time

    localCopy = candidate[:]
    notUsed = list(set(range(len(candidate))) - usedIndexes)
    for i in notUsed:
        localCopy[i] = None

and check each rule that has to pass.

    fitness = 0
    for rule in rules:
        if getFitnessForRule(localCopy, rule[0], rule[1], geneset) == rule[2]:
            fitness += 1

And just like in Part 3 we’re going to destructively evaluate all the Nodes right-to-left, perform the associated operation, and replace that Node with one containing the result.

def getFitnessForRule(candidate, a, b, geneset):
    if candidate[0].isFunction():
        localCopy = candidate[:]
        for i in reversed(range(len(localCopy))):
            element = localCopy[i]
            if element is None:
                continue
            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                left = None
                if i < leftIndex < len(localCopy):
                    left = localCopy[leftIndex].Value
                right = None
                if rightIndex is not None:
                    if i < rightIndex < len(localCopy):
                        right = localCopy[rightIndex].Value

                value = element.Value
                if isinstance(element.Value, str):
                    gene = geneset[element.Value]
                    value = gene.Func(left if left is not None else 0,
                                      right if right is not None else 0)
                localCopy[i] = Node(value)
            else:
                localCopy[i] = Node(geneset[element.Value].Func(a, b))
        result = localCopy[0].Value
    else:
        result = geneset[candidate[0].Value].Func(a, b)
    return result

Again like in Part 3 we’ll start with a displayRaw function

def displayRaw(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s" %
          ((' '.join(map(str, [str(item) for item in candidate.Genes]))),
           candidate.Fitness,
           str(timeDiff)))

that outputs node details that facilitate debugging.

NOT([4])  NOT([9])  B  NOT([10])  AND([7],[8])  AND([10],[5])  NOT([9])  NOT([10])  NOT([9])  A  B 	994	0:00:06.006344

Next we need a function that creates a gene. Here we take advantage of our knowledge of the parameters used by the particular operation to save some time.

def createGene(index, length, geneset):
    keys = list(geneset.keys())
    key = keys[random.randint(0, len(keys) - 1)]
    op = geneset[key]
    left = random.randint(index, length - 1) if op.HasLeft else None
    right = random.randint(index, length - 1) if op.HasRight else None
    return Node(key, left, right)

Since we’re working with tree nodes, again we can reuse the custom mutate and crossover functions from Part 3 that know how to handle subtrees, q.v.

Let’s run it.

AND([5],[1])  NOT([6])  NOT([4])  AND([4],[8])  NOT([9])  AND([8],[8])  A  AND([7],[8])  AND([8],[9])  NOT([9]) 	1	0:00:00.001000
B  NOT([6])  AND([6],[8])  NOT([5])  B  AND([9],[6])  NOT([9])  B  AND([8],[9])  AND([11],[9])  A  A 	3	0:00:00.014001
NOT([2])  A  AND([4],[8])  A  NOT([6])  A  B  AND([8],[9])  NOT([9])  A  B 	994	0:00:02.630150

It appears to have found the solution. We can use the displayDot implementation from Part 3 with minor changes to let us use GraphViz to visualize the output. Here’s another run with images displayed inline.

0 [label="NOT"];7 -> 0;7 [label="AND"];9 -> 7;10 -> 7;10 [label="B"];9 [label="A"]
fitness: 2	0:00:00	random

0 [label="A"]
fitness: 3	0:00:00.010000	random

0 [label="NOT"];7 -> 0;7 [label="AND"];13 -> 7;17 -> 7;17 [label="NOT"];18 -> 17;18 [label="B"];13 [label="NOT"];16 -> 13;16 [label="A"]
fitness: 994	0:00:02.223127	mutate

Cool! That worked perfectly. Now let’s go after exclusive OR (XOR). The inputs and outputs for XOR are:

truth table for XOR

Here’s a representation of the optimal solution:

XOR operation built from AND and NOT gates

We’ll create a new test with those rules:

    def test_generate_XOR(self):
        minNodes = 9  # and( not( and(a, b)), not( and( not(a), not(b))))
        rules = [[0, 0, 0], [0, 1, 1], [1, 0, 1], [1, 1, 0]]
        maxNodes = 50
        optimalValue = 1000 - minNodes
        startTime = datetime.datetime.now()
        fnDisplay = lambda candidate: displayDot(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, self.geneset, rules)
        fnCreateGene = lambda index, length: createGene(index, length, self.geneset)
        fnMutate = lambda child: mutate(child, fnCreateGene)
        best = genetic.getBest(fnGetFitness, fnDisplay, minNodes, optimalValue, createGene=fnCreateGene,
                               maxLen=maxNodes, customMutate=fnMutate, customCrossover=crossover)
        self.assertTrue(best.Fitness >= optimalValue)

As usual, the first gene sequences aren’t very good.

0 [label="A"]
fitness: 2	0:00:00	random

0 [label="AND"];6 -> 0;3 -> 0;3 [label="NOT"];16 -> 3;16 [label="B"];6 [label="A"]
fitness: 3	0:00:00.025002	mutate

But it quickly finds a non-optimal solution

0 [label="NOT"];3 -> 0;3 [label="AND"];7 -> 3;18 -> 3;18 [label="NOT"];29 -> 18;29 [label="AND"];32 -> 29;30 -> 29;30 [label="NOT"];31 -> 30;31 [label="B"];32 [label="A"];7 [label="NOT"];10 -> 7;10 [label="AND"];21 -> 10;12 -> 10;12 [label="B"];21 [label="NOT"];24 -> 21;24 [label="A"]
fitness: 988	0:00:58.776362	mutate

and it starts optimizing for a smaller number of nodes

0 [label="NOT"];38 -> 0;38 [label="AND"];39 -> 38;42 -> 38;42 [label="NOT"];45 -> 42;45 [label="AND"];48 -> 45;46 -> 45;46 [label="NOT"];47 -> 46;47 [label="B"];48 [label="A"];39 [label="NOT"];40 -> 39;40 [label="AND"];43 -> 40;41 -> 40;41 [label="B"];43 [label="NOT"];48 -> 43
fitness: 989	0:00:58.871367	mutate

0 [label="AND"];2 -> 0;1 -> 0;1 [label="NOT"];3 -> 1;3 [label="AND"];5 -> 3;8 -> 3;8 [label="B"];5 [label="A"];2 [label="NOT"];4 -> 2;4 [label="AND"];6 -> 4;10 -> 4;10 [label="NOT"];12 -> 10;12 [label="A"];6 [label="NOT"];8 -> 6
fitness: 990	0:01:00.549463	mutate

until it finds the optimal solution:

0 [label="AND"];3 -> 0;1 -> 0;1 [label="NOT"];5 -> 1;5 [label="AND"];9 -> 5;11 -> 5;11 [label="NOT"];12 -> 11;12 [label="A"];9 [label="NOT"];10 -> 9;10 [label="B"];3 [label="NOT"];4 -> 3;4 [label="AND"];10 -> 4;12 -> 4
fitness: 991	0:01:24.944859	mutate

That’s awesome!

You can get the code on Github

In Part 2 we added crossover and a 2nd genetic line to the basic genetic solver we built in Part 1 so that it would be able to solve the 8 Queens Puzzle. Now we’re going to take on a problem in the field of genetic programming that will expand the solver’s capabilities in some very useful ways. The problem we’re going to use to drive out the improvements to the solver is that of equation discovery.

The larger problem we want to solve is this: Given a set of symbols and an implied grammar that connects them, can we generate a sequence of grammatically correct symbols that can be evaluated in order to produce a specific result? That is genetic programming.

Equation discovery will move us in that direction by evolving our solver and giving us experience with symbols and grammar and “program” evaluation. If we can figure out how to evolve an equation then we can open up a whole bunch of really interesting problems for exploration with our solver. So, on to the problem.

Let’s say we have to find some combination of the values 1, 2, 3, 4, 5, 6, 7 and addition and subtraction operations that produces 29. There are many ways to solve this problem. For example: 7+7+7+7+7-6

Using that equation have 11 tokens, 5 ‘7’s, 4 ‘+’s, one ‘6’ and one ‘-‘.

[7][+][7][+][7][+][7][+][7][-][6]
 |  |  |  |  |  |  |  |  |  |  |
 1  2  3  4  5  6  7  8 9  10 11

Which can be visualized like this:

tree view of 7+7+7+7+7-6

Now if we add an additional constraint that we want to use as few tokens as possible we can re-arrange the tree as follows to eliminate the duplicate 7s and end up with only 7 symbols.

tree view of 7+7+7+7+7-6 with '7' node reused

Can we reduce it further? What if node 3 reused node 2 twice? That would be 2*(7+7), which sums to 28, and we’d only have to add 1 to get 29. Which would allow us to collapse nodes 5, 6 and 7, reducing the total node count to 5, an optimal result.

tree view of 2*(7+7)+1 with 5 and 7 reused

We want to figure out how to perform the same process with a genetic algorithm.

As usual, the first thing to consider is what our genes are going to be. We could use a gene set like “1234567+-” but that would not let us model token and subtree reuse. Examining the image above it is clear that our genes need to be tree nodes that represent either an operation (like + and -) with links to the nodes being operated upon, or a numeric value (1 2 3 4 5 6 7).

This implies a Node object as our gene.

class Node:
    Value = None
    Left = None
    Right = None

    def __init__(self, value, left=None, right=None):
        self.Value = value
        self.Left = left
        self.Right = right

    def isFunction(self):
        return self.Left is not None

Next we need to consider how to calculate fitness. We’re trying to accomplish 2 things with the fitness value in this problem. The first goal is the equation, when evaluated, must produce the desired result (29). The second goal is to get the shortest equation possible as measured by total node count. So assuming the equation sums to 29 instead of returning 29 we’ll return a higher value that lets us reward shorter sequences. That value might be 1000 minus the total number of nodes. We also have to consider what the fitness should be if the total is greater than the expected value (like 200 vs 29), and the case where the equation results in a negative value (-99) because these will affect the implementation.

    fitness = total if expectedTotal >= total >= 0 \
        else total - expectedTotal if total < 0 \
        else expectedTotal - total
    if fitness == expectedTotal:
        fitness = 1000 - len(distinctElementsUsed)

with tests

    def test_getFitness_given_total_equals_expectedTotal_should_get_1000_minus_total_Nodes_used(self):
        genes = [Node(1)]
        expectedTotal = 1
        result = getFitness(genes, expectedTotal)
        totalNodesUsed = len(genes)
        self.assertEqual(result, 1000 - totalNodesUsed)

    def test_getFitness_given_expectedTotal_GE_total_and_total_GE_0_should_get_total(self):
        total = 5
        genes = [Node(total)]
        expectedTotal = 9
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, total)

    def test_getFitness_given_total_LT_zero_should_get_total_minus_expectedTotal(self):
        total = -5
        genes = [Node(total)]
        expectedTotal = 9
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, total - expectedTotal)

    def test_getFitness_given_total_GT_expectedTotal_should_get_expectedTotal_minus_total(self):
        total = 14
        genes = [Node(total)]
        expectedTotal = 9
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, expectedTotal - total)

Next we need to consider how to get the total. If we add an implementation constraint that operation nodes can only point to nodes with a higher index then we can eliminate the problem of infinite loops in the equation.

We also need to consider how to handle Left and Right indexes that go out of bounds (beyond the end of the “program”). One solution is to treat them like pointers to an invisible Value-type node containing zero. That way they don’t affect the outcome.

            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                left = 0
                if i < leftIndex < len(localCopy):
                    # evaluate the left node
                right = 0
                if i < rightIndex < len(localCopy):
                    # evaluate the right node

We can perform the addition or subtraction as follows

                op = operator.add
                if element.Value == '-':
                    op = operator.sub
                value = op(left, right)

with tests

    def test_getFitness_given_PLUS_4_7_and_expectedTotal_20_should_get_11(self):
        genes = [Node('+', 1, 2), Node(4), Node(7)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 4 + 7)

    def test_getFitness_given_MINUS_7_4_and_expectedTotal_20_should_get_3(self):
        genes = [Node('-', 1, 2), Node(7), Node(4)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 7 - 4)

    def test_getFitness_given_PLUS_7_and_a_Left_value_GT_the_size_of_genes_and_expectedTotal_20_should_get_7(self):
        genes = [Node('+', 100, 1), Node(7)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 7)

    def test_getFitness_given_PLUS_7_and_a_Right_value_GT_the_size_of_genes_and_expectedTotal_20_should_get_7(self):
        genes = [Node('+', 1, 100), Node(7)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 7)

Now to calculate the values without recursion we can just start at the last gene instead of the first. This means we’re starting at the leaves of the tree. When we encounter a Value-type node there’s nothing to do. But when we encounter a Function-type node we use the above code to calculate its value and replace it in the genes with a Value-type node. The beauty of this is we don’t have to care whether the node was used or not, we just resolve all Function-type nodes all the way to the root. When we get to node zero, it holds the correct result of the equation even if the genes had nodes that weren’t used in the equation. Since we’re changing the genes we need to work from a copy. Here’s the fastest way to copy a list.

    localCopy = candidate[:]

Here’s what we have so far

def getFitness(candidate, expectedTotal):
    localCopy = candidate[:]
    if candidate[0].isFunction():
        for i in reversed(range(len(localCopy))):
            element = localCopy[i]
            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                left = 0
                if i < leftIndex < len(localCopy):
                    left = localCopy[leftIndex].Value
                right = 0
                if i < rightIndex < len(localCopy):
                    right = localCopy[rightIndex].Value
                op = operator.add
                if element.Value == '-':
                    op = operator.sub
                value = op(left, right)
                localCopy[i] = Node(value)

    total = localCopy[0].Value
    fitness = total if expectedTotal >= total >= 0 \
        else total - expectedTotal if total < 0 \
        else expectedTotal - total
    if fitness == expectedTotal:
        fitness = 1000 - len(distinctElementsUsed)

    return fitness

with depth tests

    def test_getFitness_given_PLUS_PLUS_5_4_3_and_expectedTotal_20_should_get_12(self):
        genes = [Node('+', 1, 2), Node('+', 3, 4), Node(5), Node(4), Node(3)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 12)

    def test_getFitness_given_PLUS_PLUS_5_4_3_and_other_unreferenced_nodes_should_ignore_the_unreferenced_nodes(self):
        genes = [Node('+', 2, 4), Node(9), Node('+', 6, 8), Node(8), Node(5), Node(7), Node(4), Node(6), Node(3),
                 Node(2)]
        expectedTotal = 20
        fitness = getFitness(genes, expectedTotal)
        self.assertEqual(fitness, 12)

Now we need to figure out which elements were actually used. We can do that by updating a dictionary of node-index to list of child nodes used by that node as we process the genes.

def getFitness(candidate, expectedTotal):
    used = {0: [0]}
    localCopy = candidate[:]
    if candidate[0].isFunction():
        for i in reversed(range(len(localCopy))):
            element = localCopy[i]
            iUsed = [i]
            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                left = 0
                if i < leftIndex < len(localCopy):
                    left = localCopy[leftIndex].Value
                    iUsed.extend(used[leftIndex])
                right = 0
                if i < rightIndex < len(localCopy):
                    right = localCopy[rightIndex].Value
                    iUsed.extend(used[rightIndex])
                op = operator.add
                if element.Value == '-':
                    op = operator.sub
                value = op(left, right)
                localCopy[i] = Node(value)
            used[i] = iUsed

    total = localCopy[0].Value
    fitness = total if expectedTotal >= total >= 0 \
        else total - expectedTotal if total < 0 \
        else expectedTotal - total
    if fitness == expectedTotal:
        distinctElementsUsed = set(used[0])
        fitness = 1000 - len(distinctElementsUsed)

    return fitness

With passing tests we can turn our attention to the display function. Let’s start with the simplest implementation, which is to just write out the raw Node values.

def displayRaw(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s" %
          ((' '.join(map(str, [item.Value for item in candidate.Genes]))),
           candidate.Fitness,
           str(timeDiff)))

Next we need a main (test) function that glues the parts together. The problem is the solver doesn’t support variable length gene sequences and it certainly doesn’t know how to create a random Node. Let’s start with how we want to call to look:

   best = genetic.getBest(
        fnGetFitness, 
        fnDisplay, 
        minNodes, 
        optimalValue, 
        fnCreateGene, 
        maxNodes)

Python doesn’t have function overloading but it does support default parameter values so…

def getBest(get_fitness, display, minLen, optimalFitness, geneSet=None, createGene=None, maxLen=None):
    random.seed()
    if geneSet is None and createGene is None:
        raise ValueError('must specify geneSet or createGene')
    if geneSet is not None and createGene is not None:
        raise ValueError('cannot specify both geneSet and createGene')
    if maxLen is None:
        maxLen = minLen
    bestParent = generateParent(minLen, maxLen, geneSet, get_fitness, createGene)
    display(bestParent)

    options = {
        0: lambda p: mutate(p, geneSet, get_fitness, createGene),
        1: lambda p: crossover(p, bestParent, get_fitness)
    }

    while bestParent.Fitness < optimalFitness:
        parent = generateParent(minLen, maxLen, geneSet, get_fitness, createGene)
  ...

Now we have the option of passing either a set of genes or a function to call to get a random gene. This cascades to the support functions as follows:

def crossover(parent, parent2, get_fitness):
    destIndex = random.randint(0, len(parent.Genes) - 1)
    srcIndex = destIndex if len(parent2.Genes) > destIndex else random.randint(0, len(parent2.Genes) - 1)
    childGenes = list(parent.Genes)
    childGenes[destIndex] = parent2.Genes[srcIndex]
    fitness = get_fitness(childGenes)
    return Individual(childGenes, fitness)

def mutate(parent, geneSet, get_fitness, createGene):
    index = random.randint(0, len(parent.Genes) - 1)
    childGenes = list(parent.Genes)
    if geneSet is not None:
        geneIndex = random.randint(0, len(geneSet) - 1)
        childGenes[index] = geneSet[geneIndex]
    else:
        childGenes[index] = createGene(index)
    fitness = get_fitness(childGenes)
    return Individual(childGenes, fitness)

def generateParent(minLength, maxLength, geneSet, get_fitness, createGene):
    childGenes = []
    length = random.randint(minLength, maxLength)
    if geneSet is not None:
        for i in range(0, length):
            geneIndex = random.randint(0, len(geneSet) - 1)
            childGenes.append(geneSet[geneIndex])
    else:
        for i in range(0, length):
            childGenes.append(createGene(i))
    fitness = get_fitness(childGenes)
    return Individual(childGenes, fitness)

Now we can complete our main test function.

    def test(self):
        geneset = [1, 2, 3, 4, 5, 6, 7, '+', '-']
        minNodes = 5  # (+ (+ ->[a] [a](+ 7 [b]5)) [b])
        expectedTotal = 29
        maxNodes = 30
        optimalValue = 1000 - minNodes
        startTime = datetime.datetime.now()
        fnDisplay = lambda candidate: displayRaw(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, expectedTotal)
        fnCreateGene = lambda index: createGene(index, geneset, maxNodes)
        best = genetic.getBest(fnGetFitness, fnDisplay, minNodes, optimalValue, createGene=fnCreateGene,
                               maxLen=maxNodes)
        self.assertTrue(best.Fitness >= optimalValue)

Here’s the output from a run using rawDisplay.

2 1 5 3 5 3 - - 1 5 6 6 5 2 5 6 2 + + 3 1 - 1 6	2	0:00:00
7 7 7 + 7 + 7	7	0:00:00.004000
+ 2 6 1 4 5 6 + 1 3 7 + + 7 7 1 1 4 1 2 3 - 3 2 7 4	28	0:00:00.035002
+ 5 6 6 2 7 4 + - 1 7 + + 7 7 4 - 2 1 6 + - - 3 2 1 4 6	992	0:00:00.535030
+ - 3 7 1 5 7 + + 5 6 + + + 7 4 5 + 5 6 7 3	993	0:00:01.995114
+ 3 1 5 4 4 3 + - 2 2 + + + 7 4	994	0:00:04.943282
+ 6 5 2 6 + 6 + 7 3 2 7 1 + 7 4	995	0:00:23.775360

The fitness value 995 (equals 1000 – 5) tells us it found the optimal result but this output doesn’t give us any idea which nodes are actually used or how to write the equation. Let’s write a display method that outputs prefix notation. For example:

(+ (+ [1](+ 7 7) ->[1]) 1)

Which is equivalent to:

tree view of (7+7)+(7+7)+1 with subtree '7+7' reused

def displayPrefixNotation(candidate, startTime):
    result = createEquation(candidate.Genes)
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s" % (result, candidate.Fitness, str(timeDiff)))

Next we’ll use a recursive function to visit each node and build a prefix notation version.

def visitNode(genes, used, index):
    result = ""
    if used[index] == 0:
        element = genes[index]
        if not element.isFunction():
            result += str(element.Value)
        else:
            used[index] = 1
            result += "[" + str(index) + "]"
            result += "(" + element.Value + " "
            leftIndex = element.Left
            if index < leftIndex < len(genes):
                result += visitNode(genes, used, leftIndex)
            else:
                result += " 0"
            rightIndex = element.Right
            if index < rightIndex < len(genes):
                result += " " + visitNode(genes, used, rightIndex)
            else:
                result += " 0"
            result += ")"
    else:
        used[index] = 2
        result += "->[" + str(index) + "]"
    return result

This puts an id before each subtree node and indicates which ones are calls to a previously referenced node.

[0](+ [1](+ [2](+ 4 7) 7) ->[2])

Then in createEquation we can remove the node ids that aren’t called.

def createEquation(genes):
    used = [0 for i in range(0, len(genes))]
    equationWithReferences = visitNode(genes, used, 0)
    for i in range(len(genes)):
        if used[i] == 0:
            continue
        if used[i] == 1:
            equationWithReferences = equationWithReferences.replace("[" + str(i) + "]", "")
    return equationWithReferences

Now when we run we get output like the following:

1	1	0:00:00
5	5	0:00:00.006000
6	6	0:00:00.008000
(+ [2](- 6 0) ->[2])	12	0:00:00.021001
(+ 7 7)	14	0:00:00.028001
(+ [2](+ 7 7) ->[2])	28	0:00:00.059003
(+ [12](- [19](+ 2 6) (-  0 0)) (+ (- (+ ->[19] ->[19]) 3) ->[12]))	990	0:00:01.479084
(+ [12](+ 5 6) (+ 7 ->[12]))	994	0:00:01.593091
(+ [12](+ 7 7) (+ 1 ->[12]))	995	0:00:04.925281

That is usable but we still can’t visualize Value-type node reuse without a lot of clutter. So we’ll do one more display variation that outputs the nodes and edges in a format that can be fed to graphviz

def displayDot(candidate, startTime):
    result = createDot(candidate.Genes)
    timeDiff = datetime.datetime.now() - startTime
    print("%s\nfitness: %i\t%s" % (";".join(result), candidate.Fitness, str(timeDiff)))

def createDot(genes):
    dotCommands = []
    added = [False for i in range(0, len(genes))]
    stack = [0]
    haveZeroNode = False
    while len(stack) > 0:
        index = stack.pop()
        if added[index]:
            continue
        added[index] = True
        element = genes[index]
        if not element.isFunction():
            dotCommands.append(str(index) + " [label=\"" + str(element.Value) + "\"]")
        else:
            dotCommands.append(str(index) + " [label=\"" + element.Value + "\"]")
            leftIndex = element.Left
            if index < leftIndex < len(genes):
                stack.append(leftIndex)
                dotCommands.append(str(index) + " -> " + str(leftIndex))
            else:
                if not haveZeroNode:
                    dotCommands.append("zero [label=\"0\"]")
                    haveZeroNode = True
                    dotCommands.append(str(index) + " -> zero")
            rightIndex = element.Right
            if index < rightIndex < len(genes):
                stack.append(rightIndex)
                dotCommands.append(str(index) + " -> " + str(rightIndex))
            else:
                if not haveZeroNode:
                    dotCommands.append("zero [label=\"0\"]")
                    haveZeroNode = True
                    dotCommands.append(str(index) + " -> zero")

    return dotCommands

This is a bit messier to look at

fitness: 9	0:00:00.040003
0 [label="+"];0 -> 11;0 -> 2;2 [label="7"];11 [label="7"]
fitness: 14	0:00:00.051003
0 [label="+"];0 -> 11;0 -> 2;2 [label="+"];2 -> 15;2 -> 20;20 [label="+"];20 -> 22;20 -> 25;25 [label="7"];22 [label="7"];15 [label="7"];11 [label="+"];11 -> 27;11 -> 14;14 [label="2"];27 [label="6"]
fitness: 991	0:00:00.911052
0 [label="+"];0 -> 11;0 -> 2;2 [label="+"];2 -> 15;2 -> 20;20 [label="+"];20 -> 26;20 -> 24;24 [label="+"];24 -> 26;24 -> 25;25 [label="7"];26 [label="5"];15 [label="7"];11 [label="5"]
fitness: 992	0:00:02.137122
0 [label="+"];0 -> 20;0 -> 24;24 [label="+"];24 -> 26;24 -> 25;25 [label="7"];26 [label="5"];20 [label="+"];20 -> 26;20 -> 24
fitness: 995	0:00:03.744214

but it is much cooler because but we can copy and paste each recipe into a dot file

digraph finite_state_machine {
	rankdir=LR;
0 [label="+"];0 -> 20;0 -> 24;24 [label="+"];24 -> 26;24 -> 25;25 [label="7"];26 [label="5"];20 [label="+"];20 -> 26;20 -> 24
}

and use graphviz to render them.

tree view of (7+7)

tree view of ((6+2)+(7+(7+7)))

tree view of (5+(7+(5+(7+5)))) with the last two 5s as a single node

tree view of (7+5)+(7+5)+5 with subtree '7+5' and digits 5 and 7 reused

That’s it. You can get the source code from this checkin on Github.


Refactor

Now for some code hygiene.

To start with it might be useful to know which strategy (mutate, crossover, random, etc) produces the better results over time for a particular problem. Implementation here.

Next, displayRaw should provide info about where the Left and Right links go so it can be used for debugging. We can do that by adding a __str__ function to Node:

class Node:
...
    def __str__(self):
        if self.isFunction():
            result = "(" + self.Value + "[" + str(self.Left) + "]" \
                     + " [" + str(self.Right) + "]" \
                     + ") "
        else:
            result = str(self.Value) + " "
        return result

and use that in displayRaw

def displayRaw(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s\t%s" %
          ((' '.join(map(str, [item for item in candidate.Genes]))),
           candidate.Fitness,
           str(timeDiff),
           candidate.Strategy))

Now in output, values in square brackets are the raw indexes

(+[15] [7])  (-[14] [8])  5  5  (+[19] [17])  6  (-[9] [24])  7  6  1  4  3  7  1  4  7  4  4  2  1 	14	0:00:00.016001	crossover

Next we can take on a flaw that wastes a lot of cycles. The genes are intended to be sparse so that we have a lot of latent or potential dna that one random mutation can activate. The problem is much of that latent dna is grabage because the left and right indexes point beyond the end of the array. Take this line for example:

(+[29] [9])  4  6  2  (+[21] [13]) 	0	0:00:00.001000	random

We only have 5 genes and the two Function-type genes point to invalid child nodes. This is because createGene assignes left and right values based on the maximum potential length of a gene sequence.

def createGene(index, geneset, maxNodes):
    value = geneset[random.randint(0, len(geneset) - 1)]
    if isinstance(value, str):
        left = random.randint(index, maxNodes - 1) if index < maxNodes else 0
        right = random.randint(index, maxNodes - 1) if index < maxNodes else 0
        return Node(value, left, right)
    return Node(value)

but in generateParent we decide the length before we start generating genes

def generateParent(minLength, maxLength, geneSet, get_fitness, createGene):
    childGenes = []
    length = random.randint(minLength, maxLength)
...
        for i in range(0, length):
            childGenes.append(createGene(i))

So, if we just pass through the known length we can make the latent genes much more likely to be usable should they be activated.

def createGene(index, length, geneset):
    value = geneset[random.randint(0, len(geneset) - 1)]
    if isinstance(value, str):
        left = random.randint(index, length - 1) if index < length else 0
        right = random.randint(index, length - 1) if index < length else 0
        return Node(value, left, right)
    return Node(value)

    def test(self):
...
        fnCreateGene = lambda index, length: createGene(index, length, geneset)

def mutate(parent, geneSet, get_fitness, createGene, customMutate):
...
        childGenes[index] = createGene(index, len(childGenes))

sample output

6  2  3  4  (+[7] [8])  6  7  6  6  2 	6	0:00:00	random

We have a similar flaw in mutate and crossover. They pick a random location to change but if that location isn’t in the execution path then it won’t affect the fitness of the generated Individual and the time spent will have been wasted. The solution is to only mutate or crossover genes that are being used, that way every change counts. We’ll start by extracting the code from getFitness that figures out which genes are used.

def getUsedIndexes(candidate):
    used = {0: [0]}
    if candidate[0].isFunction():
        for i in reversed(range(len(candidate))):
            element = candidate[i]
            iUsed = [i]
            if element.isFunction():
                leftIndex = element.Left
                rightIndex = element.Right
                if i < leftIndex < len(candidate):
                    iUsed.extend(used[leftIndex])
                if rightIndex is not None:
                    if i < rightIndex < len(candidate):
                        iUsed.extend(used[rightIndex])
            used[i] = iUsed
    return set(used[0])

def getFitness(candidate, expectedTotal):
    usedIndexes = list(sorted(getUsedIndexes(candidate)))
    localCopy = candidate[:]
    if candidate[0].isFunction():
        for i in reversed(usedIndexes):
 

This lets us create a custom mutate function that only mutates indexes that are in use.

def mutate(childGenes, fnCreateGene):
    childIndexesUsed = list(getUsedIndexes(childGenes))
    index = childIndexesUsed[random.randint(0, len(childIndexesUsed) - 1)]
    childGenes[index] = fnCreateGene(index, len(childGenes))

And we modify the solver to let us have the option to use a custom mutate function

def getBest(get_fitness, display, minLen, optimalFitness,
            geneSet=None, createGene=None, maxLen=None,
            customMutate=None):
...
        0: lambda p: mutate(p, geneSet, get_fitness, createGene, customMutate),

def mutate(parent, geneSet, get_fitness, createGene, customMutate):
    childGenes = parent.Genes[:]
    if customMutate is not None:
        customMutate(childGenes)
    else:
        index = random.randint(0, len(parent.Genes) - 1)

and pass it in from the test.

    def test(self):
...
        fnMutate = lambda child: mutate(child, fnCreateGene)
        best = genetic.getBest(fnGetFitness, fnDisplay, minNodes, optimalValue,
                               createGene=fnCreateGene, maxLen=maxNodes,
                               customMutate=fnMutate)

Note that almost all of the progress is now made by mutate.

7  5  4  5  1  (-[8] [11])  1  4  5  (-[13] [9])  4  6  5  5 	7	0:00:00	random
(-[1] [15])  (+[18] [4])  (+[7] [19])  1  7  (+[13] [17])  7  (-[18] [18])  4  3  7  4  2  (-[22] [15])  6  (-[19] [16])  (+[20] [21])  7  7  (-[22] [20])  7  5  4 	990	0:00:00.032002	mutate
(+[3] [13])  1  6  (+[16] [7])  (-[12] [17])  (-[15] [7])  2  (-[12] [10])  7  (-[16] [15])  (-[10] [12])  7  4  (+[15] [16])  5  7  7  (+[17] [17]) 	992	0:00:00.090005	mutate
(+[7] [5])  (+[25] [11])  3  4  (-[25] [20])  (+[24] [7])  1  (+[26] [8])  7  2  6  7  3  (-[22] [15])  3  6  (-[23] [26])  (-[24] [25])  (-[24] [24])  (-[19] [20])  4  (+[3] [13])  6  4  1  (-[27] [27])  7  (+[27] [27]) 	994	0:00:00.267015	mutate
(+[7] [9])  (-[10] [1])  5  (+[3] [9])  4  3  4  (+[8] [8])  (+[9] [10])  7  4 	995	0:00:00.933053	mutate

We need to do something similar for crossover because moving a Function-type node only moves the function and its references. Moving (+ [2],[3]) for example would have a completely different meaning if the nodes at [2] and [3] have different values in the child. Also, that node would become dead code if moved to an index greater than [3]. So where we previously crossed over just the one node, now we will cross over that node and all its children, rewriting the indexes as necessary to preserve the relationships. We could transfer them to the same index in the child but that might overwrite other nodes that are in use, likely reducing the fitness. So we’ll only crossover the child elements into unused node locations and tack on to the end if necessary.

Start by getting the list of used nodes in the child and parent

def crossover(child, parent):
    usedParentIndexes = list(sorted(getUsedIndexes(parent)))
    usedChildIndexes = list(getUsedIndexes(child))

If there’s only one of each then both point to the root and there are no child elements, just copy the node.

    if len(usedParentIndexes) == 1 and len(usedChildIndexes) == 1:
        # node 0 has no child nodes, just copy it
        child[0] = parent[0]
        return

Otherwise, we never want to copy the root to the root and we know that either the child or the parent has more than 1 node to choose from, so make sure we map subtree-to-root or root-to-subtree or subtree-to-subtree.

    while True:
        parentIndex = usedParentIndexes[random.randint(0, len(usedParentIndexes) - 1)]
        childIndex = usedChildIndexes[random.randint(0, len(usedChildIndexes) - 1)]
        if parentIndex != 0 or childIndex != 0:
            # don't copy the root to the root
            break

Next get the list of child nodes we can overwrite, including the 1 used node we’re replacing, in order from lowest to highest index.

    unusedChildIndexes = list(sorted(set(range(childIndex, len(child))) - set(usedChildIndexes)))
    unusedChildIndexes.insert(0, childIndex)

Then remap the indexes from the parent nodes to ones in the child. Just index them all… anything we don’t use will become latent dna.

    mappedIndexes = {}
    nextIndex = 0
    for pIndex in usedParentIndexes:
        if pIndex < parentIndex:
            continue
        if len(unusedChildIndexes) > nextIndex:
            mappedIndexes[pIndex] = unusedChildIndexes[nextIndex]
        else:
            mappedIndexes[pIndex] = len(child) + nextIndex - len(unusedChildIndexes)
        nextIndex += 1

Now transfer them all and rewrite the child indexes as necessary.

    for parentIndex in mappedIndexes.keys():
        node = parent[parentIndex]
        childIndex = mappedIndexes[parentIndex]
        childNode = Node(node.Value, node.Left, node.Right)
        if childIndex < len(child):
            child[childIndex] = childNode
        else:
            child.append(childNode)
        left = node.Left
        if left is not None:
            childNode.Left = mappedIndexes[left] if left in mappedIndexes else 0
        right = node.Right
        if right is not None:
            childNode.Right = mappedIndexes[right] if right in mappedIndexes else 0

We also need to add support for a custom crossover function to the solver:

def getBest(get_fitness, display, minLen, optimalFitness,
            geneSet=None, createGene=None, maxLen=None,
            customMutate=None, customCrossover=None):
...
        1: lambda p: crossover(p, bestParent, get_fitness, customCrossover)

def crossover(parent, parent2, get_fitness, customCrossover):
    childGenes = parent.Genes[:]
    if customCrossover is not None:
        customCrossover(childGenes, parent2.Genes[:])
    else:

Now when we run the test both crossover and mutate provide improvements.

5  1  2  (+[5] [10])  1  5  7  6  3  1  3  5  1  5  1  1  2  6  1  6  (-[22] [24])  3  4  1  4 	5	0:00:00.007001	random
(+[16] [17])  (+[3] [24])  7  3  5  7  7  1  6  5  1  2  7  1  2  4  7  7  (-[21] [24])  1  (-[24] [22])  4  4  2  (-[24] [24]) 	14	0:00:00.020001	mutate
(+[7] [5])  7  (+[7] [8])  1  (-[13] [12])  (+[6] [10])  7  (+[8] [9])  7  7  7  (-[11] [14])  (-[12] [14])  6  (+[14] [14]) 	28	0:00:00.036002	crossover
(+[7] [14])  7  5  4  7  7  4  (+[15] [11])  4  (+[13] [12])  4  3  6  4  (-[17] [18])  7  3  (+[19] [23])  2  (+[20] [25])  7  (+[22] [24])  7  7  7  7 	989	0:00:00.272016	crossover
(-[2] [3])  7  (+[4] [6])  2  (+[5] [7])  7  (+[7] [8])  (+[11] [9])  4  3  (-[12] [13])  7  7  7  7 	990	0:00:00.375022	mutate
(+[1] [3])  7  (+[3] [4])  (+[6] [5])  4  (+[12] [9])  (+[10] [8])  4  4  7  7  4  4  3  7 	991	0:00:00.416024	crossover
(+[2] [7])  2  (+[7] [9])  1  6  5  (-[9] [12])  4  2  (+[12] [10])  (+[16] [14])  (+[15] [13])  7  4  7  7  7  (+[19] [18])  (+[23] [21])  (+[22] [20])  4  7  7  4 	992	0:00:00.489028	crossover
(+[1] [2])  7  (+[7] [8])  (+[6] [5])  7  7  7  (+[8] [9])  4  (+[12] [12])  7  7  7 	993	0:00:00.631036	mutate
(+[1] [2])  7  (+[3] [3])  (+[8] [6])  6  3  7  (+[9] [10])  4  (+[10] [11])  4  (+[12] [12])  7 	994	0:00:00.815047	crossover
(+[2] [1])  (+[3] [2])  (+[3] [5])  7  7  4  4 	995	0:00:02.651152	mutate

The final code is available from this checkin.

Graph coloring is an interesting problem. Variations involve using the fewest number of colors while making each node a unique color, trying to use an equal number of each color, etc. Take for example this map of the United States.

multi colored U.S. map

How can we keep the constraint that adjacent states do not have the same color while reducing the number of colors to only four? If you try this by hand you start by picking a state and coloring the states around it alternating colors. You can do this with 3 colors if there an even number of adjacent states.

Tennessee and adjacent states 3-colored

But if there are an odd number then you need 4 colors.

Kentucky and adjacent states 4-colored

Then you continue the pattern to other states and introduce a new color only when necessary.

We’re going to do this for the map of 50 U.S. states using the genetic solver from the previous post.

We don’t care about the visual representation per se. We only care about the physical relationships between the states, which we can represent with a graph, or more simply as a set of rules indicating which states cannot have the same color.

We start off with a file containing a list of states and the set of states that are adjacent to each. The row for Tennessee might be as follows:

TN,AL;AR;GA;KY;MO;MS;NC;VA

Next we need to read that file.

def loadData(localFileName):
    # expects: AA,BB;CC;DD where BB, CC and DD are the initial column values in other rows
    with open(localFileName, mode='r') as infile:
        reader = csv.reader(infile)
        mydict = {row[0]: row[1].split(';') for row in reader if row}
        return mydict

Then we need to build the rules. A Rule connects two states indicating that they are adjacent. We want to be able to put rules in a dictionary and find them in a list so we need to define __hash__ and __eq__. We might also want to be able to display a rule so we’ll add a __str__ implementation.

class Rule:
    Item = None
    Other = None
    Stringified = None

    def __init__(self, item, other, stringified):
        self.Item = item
        self.Other = other
        self.Stringified = stringified

    def __eq__(self, another):
        return hasattr(another, 'Item') and \
               hasattr(another, 'Other') and \
               self.Item == another.Item and \
               self.Other == another.Other

    def __hash__(self):
        return hash(self.Item) * 397 ^ hash(self.Other)

    def __str__(self):
        return self.Stringified

Next we’re going to build the set of rules. While we’re doing so we’re going to perform a sanity check on the data. Whenever a state says it is adjacent to another state, the adjacent state should also say it is adjacent to the first state.

def buildLookup(items):
    itemToIndex = {}
    index = 0
    for key in sorted(items):
        itemToIndex[key] = index
        index += 1
    return itemToIndex


def buildRules(items):
    itemToIndex = buildLookup(items.keys())
    rulesAdded = {}
    rules = []
    keys = sorted(list(items.keys()))

    for key in sorted(items.keys()):
        keyIndex = itemToIndex[key]
        adjacentKeys = items[key]
        for adjacentKey in adjacentKeys:
            if adjacentKey == '':
                continue
            adjacentIndex = itemToIndex[adjacentKey]
            temp = keyIndex
            if adjacentIndex &lt; temp:
                temp, adjacentIndex = adjacentIndex, temp
            ruleKey = keys[temp] + "-&gt;" + keys[adjacentIndex]
            rule = Rule(temp, adjacentIndex, ruleKey)
            if rule in rulesAdded:
                rulesAdded[rule] += 1
            else:
                rulesAdded[rule] = 1
                rules.append(rule)

    for k, v in rulesAdded.items():
        if v == 1:
            print("rule %s is not bidirectional" % k)

    return rules

Now we have the ability to convert a file of node relationships to a set of adjacency rules. Next we need to build the code used by the genetic solver. We’ll start by determining what our genes will be. In this case since we want to four-color the 50 states our geneset will be four color codes.

        colors = ["Orange", "Yellow", "Green", "Blue"]
        colorLookup = {}
        for color in colors:
            colorLookup[color[0]] = color
        geneset = list(colorLookup.keys())

Our Individuals will have 50 genes, one for each state, in alphabetical order. This lets us use the index into the genes as an index into the set of sorted state codes.

Since the expected optimal situation will be that all adjacent states have different colors we can set the optimal value to the number of rules.

At the end we’ll write out the color of each state.

class GraphColoringTests(unittest.TestCase):
    def test(self):
        states = loadData("adjacent_states.csv")
        rules = buildRules(states)
        colors = ["Orange", "Yellow", "Green", "Blue"]
        colorLookup = {}
        for color in colors:
            colorLookup[color[0]] = color
        geneset = list(colorLookup.keys())
        optimalValue = len(rules)
        startTime = datetime.datetime.now()
        fnDisplay = lambda candidate: display(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, rules)
        best = genetic.getBest(fnGetFitness, fnDisplay, len(states), optimalValue, geneset)
        self.assertEqual(best.Fitness, optimalValue)

        keys = sorted(states.keys())

        for index in range(len(states)):
            print(keys[index] + " is " + colorLookup[best.Genes[index]])

As for display, it should be sufficient to output the color codes.

def display(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s" % (''.join(map(str, candidate.Genes)), candidate.Fitness, str(timeDiff)))

This gets output like the following. The number to the right of the gene sequence will indicate how many rules this gene sequence satisfies.

YGGBOOGOOBBYGGYYYYGBGYOOGBOYGGOOOYBOYBBGGOBYOGOGOGG	74	0:00:00.001000

Finally we need a fitness function that checks all the rules assuming the states are colored according to the gene sequence.

def getFitness(candidate, rules):
    rulesThatPass = 0
    for rule in rules:
        if candidate[rule.Item] != candidate[rule.Other]:
            rulesThatPass += 1

    return rulesThatPass

That’s it. Now when we run our main test function we get the following output:

OOYYOBBGYOOYGBBYOOOBOGYGYGGBBYGOGGOYOYGYBBOBOBGOBBG	82	0:00:00
YYBOYGGGGOBYOYBGBOOBOOBBGGBGGYGBBGOBOBYYOGYYBBYOYGO	102	0:00:00.016001
BOOGOGGOGBGYGGBGOOYBYOBYGBBOGBGBBBOYYYGYYOYOOGYBOBY	103	0:00:00.316018
GOBOGGOYGBGOBGOGYBYBOOYYGGBBGOYBYYYOOBGYYOYGBGGOGYY	104	0:00:01.602092
BBBBGGBYGOGYBGOGBBYGOGYYYGYBBOOBYYYOOOGOYGOGOGBBGYB	105	0:00:04.933282
AK is Blue
AL is Blue
AR is Blue
AZ is Blue
CA is Green
CO is Green
CT is Blue
DC is Yellow
DE is Green
FL is Orange
GA is Green
HI is Yellow
IA is Blue
ID is Green
IL is Orange
IN is Green
KS is Blue
KY is Blue
LA is Yellow
MA is Green
MD is Orange
ME is Green
MI is Yellow
MN is Yellow
MO is Yellow
MS is Green
MT is Yellow
NC is Blue
ND is Blue
NE is Orange
NH is Orange
NJ is Blue
NM is Yellow
NV is Yellow
NY is Yellow
OH is Orange
OK is Orange
OR is Orange
PA is Green
RI is Orange
SC is Yellow
SD is Green
TN is Orange
TX is Green
UT is Orange
VA is Green
VT is Blue
WA is Blue
WI is Green
WV is Yellow
WY is Blue

Which looks like this:

4-color us states

You can get the source code on Github.

In Part 1 we built a basic genetic solver that used mutation to solve problems. In this part we’re going to tackle a slightly more complex problem, the 8 Queens Puzzle, and then expand the solver as necessary.

The 8 Queens Puzzle involves putting 8 queens on a standard chess board such that none are under attack. According to WikiPedia there are only 92 solutions to this puzzle and once we remove mirrorings and rotations there are only 12 unique solutions.

To start with we need to define a genome. The chess board conveniently has the same number of rows as columns (8) so we’ll use the digits 1-8 for our genes.

geneset = '12345678'

Array indexes are zero based however, so we’ll need to convert gene symbols to row and column values. To do that we’ll find the gene’s index into the set of gene symbols then use that index as a row or column, and combine those to make a Point.

class Point:
    Row = None
    Col = None

    def __init__(self, row, col):
        self.Row = row
        self.Col = col

def getPoint(rowGene, colGene, gene_set):
    rowIndex = gene_set.index(rowGene)
    if rowIndex == -1:
        raise ValueError("'" + rowGene + "' is an invalid gene")
    colIndex = gene_set.index(colGene)
    if colIndex == -1:
        raise ValueError("'" + colGene + "' is an invalid gene")
    return Point(rowIndex, colIndex)

We also need to be able plot the Points on a board.

def getBoard(candidate, gene_set):
    board = [['.'] * 8 for i in range(8)]
    for index in range(0, len(candidate), 2):
        point = getPoint(candidate[index], candidate[index + 1], gene_set)
        board[point.Row][point.Col] = 'Q'
    return board

And we want to be able to visualize the board.

def display(candidate, gene_set, startTime):
    timeDiff = datetime.datetime.now() - startTime
    board = getBoard(candidate.Genes, gene_set)
    for i in range(8):
        print(board[i][0],
              board[i][1],
              board[i][2],
              board[i][3],
              board[i][4],
              board[i][5],
              board[i][6],
              board[i][7]
              )
    print("%s\t%i\t%s" % (candidate.Genes, candidate.Fitness, str(timeDiff)))

This will give us output like the following

. Q . . . . . .
. . Q . . . . .
. . . . Q Q . .
. . . . . . . Q
. . . . . . . .
Q . . . . . . .
. . . . . . Q .
. . . . Q . . .
7761124823353685	29	0:00:00.013001

The row of digits under the board is the set of genes that created the board layout. The number to the right will be the fitness, a measure of how close this set of genes is to the desired result. To drive improvement we’ll want to increase the fitness value whenever the related board position lets more queens coexist on the board. Let’s think about how we can do that.

We’ll start with counting the number of columns that have a queen. Here’s a layout that gets an optimal score but is undesirable

Q Q Q Q Q Q Q Q
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .
. . . . . . . .

We’ll also count the number of rows that have queens. Here’s a revised board where both counts are optimal but the layout still allows queens to attack each other.

Q . . . . . . .
. Q . . . . . .
. . Q . . . . .
. . . Q . . . .
. . . . Q . . .
. . . . . Q . .
. . . . . . Q .
. . . . . . . Q

To fix this problem we’ll include the count of the number of southeast diagonals that have a queen. Again we can find a corner case as follows:

. . . . . . . Q
. . . . . . Q .
. . . . . Q . .
. . . . Q . . .
. . . Q . . . .
. . Q . . . . .
. Q . . . . . .
Q . . . . . . .

To fix this final problem we’ll include the count of the number of northeast diagonals that have a queen.

Our fitness value will be the sum of those four counts, 8+8+8+8 being optimal.

Side note on implementation, we can calculate the northeast diagonals in Excel using the formula =$A2+B$1 which results in a grid as follows

    0   1   2   3   4   5   6   7
0   0   1   2   3   4   5   6   7
1   1   2   3   4   5   6   7   8
2   2   3   4   5   6   7   8   9
3   3   4   5   6   7   8   9   10
4   4   5   6   7   8   9   10  11
5   5   6   7   8   9   10  11  12
6   6   7   8   9   10  11  12  13
7   7   8   9   10  11  12  13  14

The southeast diagonals can be calculated using =(8-1-$A2)+B$1 which we can visualize as follows:

    0   1   2   3   4   5   6   7
0   7   8   9   10  11  12  13  14
1   6   7   8   9   10  11  12  13
2   5   6   7   8   9   10  11  12
3   4   5   6   7   8   9   10  11
4   3   4   5   6   7   8   9   10
5   2   3   4   5   6   7   8   9
6   1   2   3   4   5   6   7   8
7   0   1   2   3   4   5   6   7

Using the above 2 formulas along with the row and column values lets us touch each board position exactly once, which makes our fitness function run fast.

def getFitness(candidate, gene_set):
    board = getBoard(candidate, gene_set)
    rowsWithQueens = {}
    colsWithQueens = {}
    northEastDiagonalsWithQueens = {}
    southEastDiagonalsWithQueens = {}
    for row in range(8):
        for col in range(8):
            if board[row][col] == 'Q':
                rowsWithQueens[row] = 1
                colsWithQueens[col] = 1
                northEastDiagonalsWithQueens[row + col] = 1
                southEastDiagonalsWithQueens[8 - 1 - row + col] = 1

    return len(rowsWithQueens) \
           + len(colsWithQueens) \
           + len(northEastDiagonalsWithQueens) \
           + len(southEastDiagonalsWithQueens)

Finally our main (integration test) method will bring all the parts together.

class EightQueensTests(unittest.TestCase):
    def test(self):
        geneset = '12345678'
        startTime = datetime.datetime.now()
        fnDisplay = lambda candidate: display(candidate, geneset, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, geneset)
        best = genetic.getBest(fnGetFitness, fnDisplay, 16, 8 + 8 + 8 + 8, geneset)
        self.assertEqual(best.Fitness, 8 + 8 + 8 + 8)

Note that we’ve added a parameter to getBest. This is because in Part 1 the solver determined the optimal fitness internally but now we want pass it in.

def getBest(get_fitness, display, targetLen, optimalFitness, geneSet):
...
    while bestParent.Fitness < optimalFitness:

We can run our test to see if the solver can find a solution.

Q . . . . . . .
. . . . . . . .
. . . . Q . . .
. . . . Q . . .
. Q . . Q . . .
. . . . . . . .
. . . Q . . . .
Q . Q . . . . .
4574811152355583	23	0:00:00
Q . . . . . . .
. . . . . . . .
. . . . Q . . .
. . . . Q . . .
. . . . Q . . Q
. . . . . . . .
. . . Q . . . .
Q . Q . . . . .
4574811158355583	24	0:00:00
Q . . . . . . .
. . . . . . . .
. . . . Q . . .
. . Q . Q . . .
. . . . Q . . Q
. . . . . . . .
. . . Q . . . .
Q . . . . . . .
4574811158355543	25	0:00:00.001000
Q . . . . . . .
. . . . . . . .
. . . . Q . . .
. . Q . Q . . .
. . . . . . Q Q
. . . . . . . .
. . . Q . . . .
Q . . . . . . .
4574811158355743	26	0:00:00.002000
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . Q . Q . . .
. . . . . . Q Q
. . . . . . . .
. . . Q . . . .
Q . . . . . . .
4574811158315743	27	0:00:00.003000
Q . . . . . . .
. . . . . . . Q
Q . . . . . . .
. . Q . Q . . .
. . . . . . Q .
. . . . . . . .
. . . Q . . . .
Q . . . . . . .
4574811128315743	28	0:00:00.004000
Q . . . . . . .
. . . . . . . Q
Q . . . . . . .
. . Q . Q . . .
. . . . . . Q .
. . . . . . . .
. . . Q . . . .
. Q . . . . . .
4574821128315743	29	0:00:00.005001
Q . . . . . . .
. . . . . Q . .
Q . . . . . . .
. . Q . Q . . .
. . . . . . Q .
. . . . . . . .
. . . Q . . . .
. Q . . . . . .
4574821126315743	30	0:00:00.009001

After several test runs we find that most of the time it gets close but can’t get all the way to the optimal value of 32. We need to enhance the solver’s capabilities for it to be able to handle this problem.

If at first you don’t succeed, try, try again!

We’re going to do that by introducing a second genetic line. We’ll mutate the 2nd line as long as it is improving. If it ends up with a better fitness than bestParent then it will become the bestParent. Otherwise, we’ll start a new genetic line again with a random gene sequence. We repeat this process over and over until we find an optimal result. Here’s the updated solver loop:

def getBest(get_fitness, display, targetLen, optimalFitness, geneSet):
    random.seed()
    bestParent = generateParent(targetLen, geneSet, get_fitness)
    display(bestParent)

    while bestParent.Fitness < optimalFitness:
        parent = generateParent(targetLen, geneSet, get_fitness)
        attemptsSinceLastImprovement = 0
        while attemptsSinceLastImprovement < 128:
            child = mutate(parent, geneSet, get_fitness)
            if child.Fitness > parent.Fitness:
                parent = child
                attemptsSinceLastImprovement = 0
            attemptsSinceLastImprovement += 1

        if bestParent.Fitness < parent.Fitness:
            bestParent, parent = parent, bestParent
            display(bestParent)

    return bestParent

Now when we run the 8 queens test we find an optimal solution every time.

. Q . . Q . . .
. . . . Q . . .
. . . . . . . Q
. . . . . . . Q
. . . . . . . .
. . . . . . . .
. . . . . . . .
Q . . . . . . .
8148381525122525	20	0:00:00
. . Q . . . . .
. . Q . . . . .
. . . . . . Q .
. . Q . . . . .
. . . . . . . .
. . . Q . . . .
Q . . . . . . .
. . . . Q . . Q
2385884313647137	28	0:00:00.013001
. . . . . . Q .
. . . . . . . .
. . . Q . . . .
. . . . . Q . Q
Q . . . . . . .
. . . . Q . . .
. Q . . . . . .
. . . . Q . . .
3417486546857251	30	0:00:00.041003
. . Q . . . . .
. . . . Q . . .
. Q . . . . . .
. . . Q . . . .
Q . . . . . . .
. . Q . . . . .
. . . . . . . Q
. . . . . Q . .
5178258613324463	31	0:00:00.143008
. . . . Q . . .
. . Q . . . . .
Q . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . Q
. . . . . Q . .
. . . Q . . . .
5247312368841576	32	0:00:00.308018

However, when we run the string duplication test from Part 1 it now struggles to find a solution. This is because we ignore our best line completely once we find it and only try to improve by mutating a new genetic line.

JoyEMlECsatqzfBVFkvIHduMg!HAFtnWcC	3	0:00:00
Nj!HBLwEHvdse wGGOKbJctg AriqlNDPP	8	0:00:00.022001
NBiBRlE thoHfFWhW !tnnFyuHAv wputM	11	0:00:00.062004
NUt alx xmusGiwTmEwprder are lKsdG	19	0:00:00.112007
Not allkthosbHnhoCHinvBr OQe losmR	21	0:00:00.852049
NotW!lT NhoJe VhRFwaDderiaCe most.	22	0:00:02.510144
Not alF tQose S!o wDndUZ yre lostA	25	0:00:05.847335
NotXLll those whopwWnder are lost.	30	0:01:33.040322

We need a way to continue to take advantage of the genes in bestParent. One way nature handles this is through crossbreeding so we’ll introduce a crossover strategy where we take one random gene from a 2nd (the best) parent.

def crossover(parent, parent2, get_fitness):
    index = random.randint(0, len(parent.Genes) - 1)
    genes = list(parent.Genes)
    genes[index] = parent2.Genes[index]
    childGenes = (''.join(genes))
    fitness = get_fitness(childGenes)
    return Individual(childGenes, fitness)

And update the main loop to randomly choose whether to use mutation or crossover to improve the 2nd genetic line.

    options = {
        0: lambda p: mutate(p, geneSet, get_fitness),
        1: lambda p: crossover(p, bestParent, get_fitness)
    }

    while bestParent.Fitness < optimalFitness:
        parent = generateParent(targetLen, geneSet, get_fitness)
        attemptsSinceLastImprovement = 0
        while attemptsSinceLastImprovement < 128:
            child = options[random.randint(0, len(options) - 1)](parent)

Now when we run the tests both are able to achieve optimal results every time.

You can get the source code to this point from Github.


Refactor

Now that everything works we’re going to do some code hygiene. In solving this problem we used genes “12345678” but it would have been more convenient, and faster, if we could have used raw integers 0-7. So let’s make that change. I’ll show selected changes below but you can get the full set from github.

class EightQueensTests(unittest.TestCase):
    def test(self):
        geneset = [0, 1, 2, 3, 4, 5, 6, 7]
    ...
        fnDisplay = lambda candidate: display(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate)
    ...

This means both Point and getPoint can go away, simplifying getBoard to:

def getBoard(candidate):
    board = [['.'] * 8 for i in range(8)]
    for index in range(0, len(candidate), 2):
        board[candidate[index]][candidate[index + 1]] = 'Q'
    return board

In display we go the other way so we can show all the genes as a string.

def display(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    board = getBoard(candidate.Genes)
    for i in range(8):
        print(board[i][0],
              board[i][1],
              board[i][2],
              board[i][3],
              board[i][4],
              board[i][5],
              board[i][6],
              board[i][7]
              )
    print("%s\t%i\t%s" % (''.join(map(str, candidate.Genes)), candidate.Fitness, str(timeDiff)))

To support the geneset being an array in the solver we only have to eliminate the line in crossover, mutate and generateParent that converts the child gene sequence to a string.

Finally, in the string duplication test code we have to change the code to convert the array to a string for display and the final assertion.

def display(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print("%s\t%i\t%s" % ((''.join(candidate.Genes)), candidate.Fitness, str(timeDiff)))

class StringDuplicationTests(unittest.TestCase):
    def test(self):
        geneset = ' abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!.'
        startTime = datetime.datetime.now()
        target = "Not all those who wander are lost."
        fnDisplay = lambda candidate: display(candidate, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, target)
        best = genetic.getBest(fnGetFitness, fnDisplay, len(target), len(target), geneset)
        self.assertEqual((''.join(best.Genes)), target)

Pretty painless really. The updated code is available in this changeset.

Go on to Part 3 in which we evolve the solver to let us discover equations that evaluate to a specific value.

The goal of this, my first program in Python, is to reproduce a target string (like Hello World!) without looking directly at it. I’ll do this with a simple genetic algorithm that randomly generates an initial sequence of characters and then mutates one random character in that sequence at a time until it matches the target. Think of this like playing a Hangman variant where you pass a letter sequence to the person who knows the target word, and the only feedback you get is how many of your letters are correct. It is also reminiscent of the game Hotter Colder, except we’re doing it with code.

We start off with a standard set of letters for genes and a target string:

    geneset = " abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!."
    target = "Not all those who wander are lost."

Next we need a way to generate a random gene sequence from the gene set.

def generateParent(length):
    genes = list("")
    for i in range(0,length):
        geneIndex = random.randint(0, len(geneset) -1);
        genes.append(geneset[geneIndex])
    return(''.join(genes))

There are many ways to calculate a fitness value (how close the guess is to the target) for the generated string. For this particular problem we’ll simply count the number of characters that are the same between the candidate string and the target string.

def getFitness(candidate, target):
   fitness = 0
   for i in range(0, len(candidate)):
       if target[i] == candidate[i]:
           fitness += 1
   return(fitness)

We also need a way to produce a new child gene sequence by mutating the existing (parent) string.  The point is to create a copy of the parent then replace 1 letter/gene in the copy with a randomly selected one from the set of all possible genes/letters.

def mutate(parent):
   geneIndex = random.randint(0, len(geneset) -1);
   index = random.randint(0, len(parent) - 1)
   genes = list(parent)
   genes[index] = geneset[geneIndex]
   return(''.join(genes))

Next we need a way to display a gene sequence, its fitness value and how much time has elapsed.

def display(candidate, startTime):
    timeDiff = datetime.datetime.now() - startTime
    fitness = getFitness(candidate, target)
    print ("%s\t%i\t%s" % (candidate, fitness, str(timeDiff)))

The heart of the genetic solver is a loop that uses the functions above to generate a candidate gene sequence, compare it to the previous best, and randomly mutate it until all the genes match those in the target.

bestParent = generateParent(len(target))
bestFitness = getFitness(bestParent, target)
display(bestParent, startTime)

while bestFitness < len(bestParent):
   child = mutate(bestParent)
   childFitness = getFitness(child, target)

   if childFitness > bestFitness:
      bestFitness = childFitness
      bestParent = child
      display(bestParent, startTime)

Get the source here.

Sample output:

Python 3.4.3 (v3.4.3:9b73f1c3e601, Feb 24 2015, 22:44:40) [MSC v.1600 64 bit (AMD64)] on win32
Type "copyright", "credits" or "license()" for more information.
<<< ================================ RESTART ================================
<<<
iEjRmPrBUUDosXupcEw.Ji.qTCtRYawlry	1	0:00:00
iEjRmPrBUUDosXupcEw.Ji.qTCtRYaolry	2	0:00:00.004001
iEjRmPr UUDosXupcEw.Ji.qTCtRYaolry	3	0:00:00.007001
iEjRmPr UUDosXupcEw.JieqTCtRYaolry	4	0:00:00.011001
iEjRmPr UUDosXupcEw.nieqTCtRYaolry	5	0:00:00.014002
iEjRmPl UUDosXupcEw.nieqTCtRYaolry	6	0:00:00.016002
iEjRmPl UhDosXupcEw.nieqTCtRYaolry	7	0:00:00.023003
iEjRmPl UhoosXupcEw.nieqTCtRYaolry	8	0:00:00.026003
iEjRmPl UhossXupcEw.nieqTCtRYaolry	9	0:00:00.029003
iEjRmPl UhossXupcEw.nierTCtRYaolry	10	0:00:00.032003
iEj mPl UhossXupcEw.nierTCtRYaolry	11	0:00:00.036004
iEj mPl UhossXupcEw.nierTCrRYaolry	12	0:00:00.038004
iEj aPl UhossXupcEw.nierTCrRYaolry	13	0:00:00.041004
iEj aPl UhossXupcEw.nderTCrRYaolry	14	0:00:00.044005
iEj aPl UhossXupcEw.nderTCreYaolry	15	0:00:00.045005
iEj aPl UhossXupoEw.nderTCreYaolry	16	0:00:00.047005
iEj aPl UhoseXupoEw.nderTCreYaolry	17	0:00:00.051005
iEj aPl UhoseXupoEw.nderTCreYlolry	18	0:00:00.054006
iEj aPl UhoseXupoEw.nderTCreYlolr.	19	0:00:00.057006
iEj aPl thoseXupoEw.nderTCreYlolr.	20	0:00:00.059006
iEj aPl thoseXupoEwanderTCreYlolr.	21	0:00:00.065007
iEj aPl thoseXupo wanderTCreYlolr.	22	0:00:00.068007
NEj aPl thoseXupo wanderTCreYlolr.	23	0:00:00.073008
NEj aPl thoseXupo wander CreYlolr.	24	0:00:00.075008
NEj aPl thoseXupo wander CreYlolt.	25	0:00:00.081008
NEj aPl thoseXwpo wander CreYlolt.	26	0:00:00.083009
Noj aPl thoseXwpo wander CreYlolt.	27	0:00:00.090009
Noj aPl thoseXwho wander CreYlolt.	28	0:00:00.093010
Noj aPl thoseXwho wander CreYlost.	29	0:00:00.097010
Not aPl thoseXwho wander CreYlost.	30	0:00:00.106011
Not all thoseXwho wander CreYlost.	31	0:00:00.117012
Not all thoseXwho wander areYlost.	32	0:00:00.120012
Not all thoseXwho wander are lost.	33	0:00:00.123013
Not all those who wander are lost.	34	0:00:00.151015


Refactor

Good, it works. Now we need to separate the solver code from that specific to the string duplication problem so we can use it to solve other problems. We’ll start by moving our main loop into a function called getBest. That function’s parameters will include the functions it should call to get the candidate’s fitness and to display a new best sequence.

def getBest(get_fitness, display, targetLen, geneSet):
    random.seed()
    bestParent = generateParent(targetLen, geneSet)
    bestFitness = get_fitness(bestParent)
    display(bestParent)

    while bestFitness < len(bestParent):
       child = mutate(bestParent, geneSet)
       childFitness = get_fitness(child)

       if childFitness > bestFitness:
          bestFitness = childFitness
          bestParent = child
          display(bestParent)
    return bestParent

We also need to pass the gene set to generateParent and mutate so they aren’t using global variables.

def mutate(parent, geneSet):
   geneIndex = random.randint(0, len(geneSet) -1);
   index = random.randint(0, len(parent) - 1)
   genes = list(parent)
   genes[index] = geneSet[geneIndex]
   return(''.join(genes))

def generateParent(length, geneSet):
    genes = list("")
    for i in range(0,length):
        geneIndex = random.randint(0, len(geneSet) -1);
        genes.append(geneSet[geneIndex])
    return(''.join(genes))

Now move the rest of the code to a new file named stringDuplicationTests.py. Notice that display and getFitness no longer have the same number of parameters as those above.

def getFitness(candidate, target):
   fitness = 0
   for i in range(0, len(candidate)):
       if target[i] == candidate[i]:
           fitness += 1
   return(fitness)

def display(candidate, target, startTime):
    timeDiff = datetime.datetime.now() - startTime
    fitness = getFitness(candidate, target)
    print ("%s\t%i\t%s" % (candidate, fitness, str(timeDiff)))

In our main test method we’ll pass lambdas that take the candidate gene sequence passed by the solver as a parameter and call the local methods with additional required parameters as necessary.

def test_string_duplication():
    geneset = " abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!."
    startTime = datetime.datetime.now()
    target = "Not all those who wander are lost."
    fnDisplay = lambda candidate: display(candidate, target, startTime)
    fnGetFitness = lambda candidate: getFitness(candidate, target)
    best = genetic.getBest(fnGetFitness, fnDisplay, len(target), geneset)

Next we’ll make it so that executing this file calls our test function.

if __name__ == '__main__':
    test_string_duplication()

That’s it. We now have separation of concerns and when we run the test code the program still works. You can get the updated code from this checkin on Github.

Next, we’ll modify the test file to make it compatible with Python’s built in test framework.

import unittest
import datetime
import genetic

def getFitness(candidate, target):
   fitness = 0
   for i in range(0, len(candidate)):
       if target[i] == candidate[i]:
           fitness += 1
   return(fitness)

def display(candidate, target, startTime):
    timeDiff = datetime.datetime.now() - startTime
    fitness = getFitness(candidate, target)
    print ("%s\t%i\t%s" % (candidate, fitness, str(timeDiff)))

class StringDuplicationTests(unittest.TestCase):
    def test(self):
        geneset = " abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!."
        startTime = datetime.datetime.now()
        target = "Not all those who wander are lost."
        fnDisplay = lambda candidate: display(candidate, target, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, target)
        best = genetic.getBest(fnGetFitness, fnDisplay, len(target), geneset)
        self.assertEqual(best, target)

if __name__ == '__main__':
    unittest.main()

This allows us to run the tests from the commandline and without writing the output, which, by the way, makes it run twice as fast.

python -m unittest -b geneticTests.py
.
----------------------------------------------------------------------
Ran 1 test in 0.108s

OK

The final refactoring will be to introduce an Individual object that has both Genes and Fitness attributes.

class Individual:
   Genes = None
   Fitness = None
   def __init__(self, genes, fitness):
        self.Genes = genes
        self.Fitness = fitness

This lets us pass those values around as a unit and reduces some double work in the display function.

import random

def mutate(parent, geneSet, get_fitness):
   geneIndex = random.randint(0, len(geneSet) -1);
   index = random.randint(0, len(parent.Genes) - 1)
   genes = list(parent.Genes)
   genes[index] = geneSet[geneIndex]
   childGenes = (''.join(genes))
   fitness = get_fitness(childGenes)
   return Individual(childGenes, fitness)

def generateParent(length, geneSet, get_fitness):
    genes = list("")
    for i in range(0,length):
        geneIndex = random.randint(0, len(geneSet) -1);
        genes.append(geneSet[geneIndex])
    childGenes = (''.join(genes))
    fitness = get_fitness(childGenes)
    return Individual(childGenes,fitness)

def getBest(get_fitness, display, targetLen, geneSet):
    random.seed()
    bestParent = generateParent(targetLen, geneSet, get_fitness)
    display(bestParent)

    while bestParent.Fitness < targetLen:
       child = mutate(bestParent, geneSet, get_fitness)

       if child.Fitness > bestParent.Fitness:
          bestParent = child
          display(bestParent)
    return bestParent

We also have to make compensating changes to the test file methods.

def display(candidate, target, startTime):
    timeDiff = datetime.datetime.now() - startTime
    print ("%s\t%i\t%s" % (candidate.Genes, candidate.Fitness, str(timeDiff)))

class StringDuplicationTests(unittest.TestCase):
    def test(self):
        geneset = " abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!."
        startTime = datetime.datetime.now()
        target = "Not all those who wander are lost."
        fnDisplay = lambda candidate: display(candidate, target, startTime)
        fnGetFitness = lambda candidate: getFitness(candidate, target)
        best = genetic.getBest(fnGetFitness, fnDisplay, len(target), geneset)
        self.assertEqual(best.Genes, target)

The final code for this project is available on Github.

Go on to Part 2 in which we evolve the solver to handle the 8 Queens Puzzle.

In Part 3 we added an integration test to verify that the solver can find a solution to the 8 Queens puzzle. However, Mutation is the only strategy currently implemented in the solver and it fails to find a solution to that problem 80% of the time. We need to introduce a new strategy from nature – crossover from another successful genetic line.

But first, to simplify some code sections I’m going to introduce a container so we can pass both the genes and their fitness as a single object.

    pub struct Individual {
        pub genes: String,
        pub fitness: usize
    }

This means we’ll call the display function with an Individual instead of a String, but also that we don’t have to call get_fitness in our tests in order to display the interim results. There are no surprises in this change so I won’t repeat them here. You can browse these code changes on Github.

We need to make two improvements to the solver in order for it to be able to solve this problem every time. The first is to add crossover as a genetic strategy. This is similar to mutation in that we’re going to replace 1 gene at a specific location. But this time instead of substituting a random gene we’re going to get the same gene location from a second successful parent.

    fn crossover<F>(parent1: &Individual, parent2: &Individual, get_fitness: F) -> Individual
    where F : Fn(&String)->usize
    {
        let mut rng = thread_rng();
        let parent_index = rng.gen::<usize>() % parent1.genes.len();
        let mut candidate = String::with_capacity(parent1.genes.len());

        if parent_index > 0 {
            candidate.push_str(&parent1.genes[..parent_index]);
        }
        candidate.push_str(&parent2.genes[parent_index..(1+parent_index)]); // replace 1 gene
        if parent_index+1 < parent1.genes.len() {
            candidate.push_str(&parent1.genes[parent_index+1..]);
        }

        let fitness = get_fitness(&candidate);
        Individual { genes: candidate, fitness: fitness }
    } 

The second improvement is to have an additional genetic line. We start with a random gene sequence and use our strategies to improve it until we fail to improve it 100 times in a row. Then we compare it to the current best parent and keep the better of the two. This process repeats until we hit the optimal fitness value.

    pub fn get_best<F,D>(
        get_fitness: F, 
        display: D, 
        length: usize, 
        optimal_fitness_value: usize,
        gene_set: &str) -> Individual
        where F : Fn(&String)->usize,
        D : Fn(&Individual)
    {
        let mut best_parent = generate_parent(&get_fitness, gene_set, length);
        display(&best_parent);

        while best_parent.fitness < optimal_fitness_value {
            let mut candidate = generate_parent(&get_fitness, gene_set, length);
            let mut attempts_since_last_improvement = 0;
            while attempts_since_last_improvement < 100 {
                let child = match attempts_since_last_improvement % 3 {
                    0 => generate_parent(&get_fitness, gene_set, length),
                    1 => mutate_parent(&candidate, &get_fitness, gene_set),
                    _ => crossover(&candidate, &best_parent, &get_fitness)
                };

                if child.fitness > candidate.fitness { 
                    candidate = child;
                    attempts_since_last_improvement = 0;
                }
                attempts_since_last_improvement = attempts_since_last_improvement + 1;
            }
            if candidate.fitness > best_parent.fitness { 
                best_parent = candidate;
                display(&best_parent);
            }
        }

        best_parent 
    }

Now when we run the integration tests they all pass 100% of the time, though some rounds take longer than others. Explore this code change on Github.

In Part 2 of this series we converted our code to a library, made our initial puzzle into an integration test, and extracted test related parameters and methods from the library.

Now we’re ready to try a new puzzle. This time we’ll expand our solver to handle a slightly more difficult problem – the 8 Queens puzzle.

In the 8 Queens puzzle we wan’t to place 8 chess Queens on a Chess board such that none of them are under attack.

7 Chess Queens in safe positions on a Chess board

According to WikiPedia there are only 92 solutions to this puzzle and once we remove mirrorings and rotations there are only 12 unique solutions.

We start off by figuring out how we’re going to map genes to the problem. One solution that I’ve used before is to assign each square on the 8×8 Chess board a symbol from the 64 symbol set ([a-z][A-Z][0-9]@#) as follows:

board positions labelled first rows then columns

We need to be able to convert a symbol (gene) to a board position. To do that we’ll find its index in the set of genes then convert that index to a row and column, or Point.

    fn to_point(gene: char, gene_set: &str) -> Point {
        let location = gene_set.find(gene);
        assert_eq!(location.is_some(), true);
        let index = location.unwrap() as i32;
        let row = index / 8i32;
        let column = index % 8i32;
        return Point{row: row, col: column};
    }

    struct Point {
        row: i32,
        col: i32
    }

We also need a way to visualize a set of Points (Queens) on a Chess board.

    fn get_board(candidate: &String, gene_set: &str) -> [[char; 8]; 8] {
        let mut board:[[char; 8]; 8] = [['.'; 8]; 8];
        
        for point in candidate.chars().map(|c| to_point(c, gene_set)) {
            board[point.row as usize][point.col as usize] = 'Q';
        }
        board
    }

    fn display_8_queens(candidate: &String, gene_set: &str, start: PreciseTime) {
        let now = PreciseTime::now();
        let elapsed = start.to(now);
        let board:[[char; 8]; 8] = get_board(candidate, gene_set);
        for i in 0..8 {
            let mut row = "".to_string();
            for j in 0..8 {
                row.push(board[i][j]);
                row.push(' ');
            }
            println!("{}", row);
        }
        
        println!("{}\t{}\t{}", candidate, get_8_queens_fitness(&candidate, gene_set), elapsed);
    }

The output of display_8_queens looks like this:

. . Q . . . . .
. . . . . Q . .
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. . . . Q . . .
. Q . . . . . .
0ncJ5yUx        4096    PT0.192306457S

Next we need a way to see 1) if we have 8 Queens on the board, we might not if the same gene appears twice in the generated sequence, and 2) if each Queen is on its own row, column and diagonals.

First the row and column checks. We’ll keep a count of how many rows and columns have exactly one Queen.

    fn get_8_queens_fitness(candidate: &String, gene_set: &str) -> usize {
        let board = get_board(candidate, gene_set);
        
        // count rows with 1 queen
        let indexes: Vec<i32> = (0..8).collect();
        let mut correct_queens_in_row = 0; 
        for i in 0..8 {
            let row_count = indexes.iter()
                .cloned()
                .map(|col| board[i][col as usize])
                .filter(|ch|'Q' == *ch)
                .count();
            if row_count == 1 {
                correct_queens_in_row = correct_queens_in_row + 1;
            }
        }
        let mut correct_queens_in_column = 0; 
        for i in 0..8 {
            let column_count = indexes.iter()
                .cloned()
                .map(|row| board[row as usize][i])
                .filter(|ch|'Q' == *ch)
                .count();
            if column_count == 1 {
                correct_queens_in_column = correct_queens_in_column + 1;
            }
        }

To count the number of diagonals that have exactly one Queen we’ll introduce a generator that creates Points starting from an initial position and then moving by a given row and column offset. First the generator and some tests for it.

    struct Diagonal {
        row: i32,
        col: i32,
        row_offset: i32,
        col_offset: i32
    }

    impl Iterator for Diagonal {
        type Item = Point;
        fn next(&mut self) -> Option<Point> {
            let prev_row = self.row;
            let prev_col = self.col;
            self.row = prev_row + self.row_offset;
            self.col = prev_col + self.col_offset;
            
            // this is an infinite value generator
            Some(Point{row:prev_row,col:prev_col})
        }
    }

    #[test]
    fn test_diagonal_iterator_first_value_returned_should_be_the_start_state() {
        let mut diag = Diagonal {row:5,col:6,row_offset:1,col_offset:1};
        let first = diag.next();
        assert_eq!(true,first.is_some());
        let point = first.unwrap() as Point;
        assert_eq!(point.row,5);
        assert_eq!(point.col,6);
    }
    
    #[test]
    fn test_diagonal_iterator_second_value_returned_should_be_with_offsets_added_to_start_state() {
        let diag = Diagonal {row:5,col:4,row_offset:1,col_offset:-1};
        let first = diag.skip(1).next();
        assert_eq!(true,first.is_some());
        let point = first.unwrap() as Point;
        assert_eq!(point.row,6);
        assert_eq!(point.col,3);
    }

Note: As of this 5/29 the above code does not work with the 1.0.0 release of rustc due to a bug in the compiler… however the nightly build no longer has the bug. Try it on Playpen

And now the diagonal checks…

    let mut correct_queens_in_northeast_diagonal = 0; 
    for i in 0..15 {
        let diag = Diagonal {row:i,col:0,row_offset:-1,col_offset:1};
        let diagonal_count = diag
        .take_while(|point|point.row >= 0 && point.col >= 0)
        .skip_while(|point|point.row >=8 || point.col >=8)
        .take_while(|point|point.col < 8)
        .map(|point| board[point.row as usize][point.col as usize])
        .filter(|ch| 'Q' == *ch)
        .count();
        if diagonal_count == 1 {
        correct_queens_in_northeast_diagonal = correct_queens_in_northeast_diagonal + 1;
        }
    }    

    let mut correct_queens_in_southeast_diagonal = 0; 
    for i in -8..8 {
        let diag = Diagonal {row:i,col:0,row_offset:1,col_offset:1};
        let diagonal_count = diag
        .skip_while(|point|point.row < 0 || point.col < 0)
        .take_while(|point|point.col < 8 && point.row < 8)
        .map(|point| board[point.row as usize][point.col as usize])
        .filter(|ch| 'Q' == *ch)
        .count();
        if diagonal_count == 1 {
        correct_queens_in_southeast_diagonal = correct_queens_in_southeast_diagonal + 1;
        }
    }    

At the end of the method we’ll multiply all the row count values to get the fitness value. Given this the best possible solution would be 8*8*8*8.

    (if correct_queens_in_row == 0 { 1 } else { correct_queens_in_row })
    * (if correct_queens_in_column == 0 { 1 } else { correct_queens_in_column })
    * (if correct_queens_in_northeast_diagonal == 0 { 1 } else { correct_queens_in_northeast_diagonal })
    * (if correct_queens_in_southeast_diagonal == 0 { 1 } else { correct_queens_in_southeast_diagonal })
}

Language aside, rust doesn’t have a trinary operator but it does let you use an if statement inline.

Finally we need a main method to call the solver and verify the results. Once again we’ll create it as an integration test.

    #[test]
    fn test_8_queens() {
        let start = PreciseTime::now();
        let gene_set = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789@#";

        let wrapped_display = |candidate: &String| {display_8_queens(&candidate, gene_set, start);};
        let wrapped_get_fitness = |candidate: &String| -> usize {get_8_queens_fitness(&candidate, gene_set)};
    
        let best = genetic::get_best(wrapped_get_fitness, wrapped_display, 8, 8*8*8*8, gene_set);
        println!("Total time: {}", start.to(PreciseTime::now()));
        let best_fitness = get_8_queens_fitness(&best, gene_set);
        assert_eq!(best_fitness,8*8*8*8);
    }

When we run that test we may get a successful result like the following.

cargo test test_8_queens -- --nocapture
     Running target\debug\genetic-debe16ff205aab6a.exe

running 0 tests

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured

     Running target\debug\lib_8_queens_tests-afc3deed67cad86e.exe

running 1 test
. . Q . Q . . .
Q Q . . . . . .
Q . . . Q . . .
. . . . . . . .
. . . . . . . .
. . . . . . Q .
. . . . . . . .
. . Q . . . . .
ujcieqU6        120     PT0.003442522S
. . Q . Q . . .
Q . . . . . . .
Q . . . Q . . .
. . . . . . . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . Q . . . . .
uXcieqU6        192     PT0.004552093S
. . Q . Q . . .
Q . . . . . . .
Q . . . Q . . .
. . . . . Q . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . .
uXcieqUD        480     PT0.005474583S
. . Q . Q . . .
Q . . . . . . .
. . . . Q . . .
Q . . . . Q . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . .
uXcieyUD        640     PT0.006346324S
. . Q . Q . . .
Q . . . . . . .
. . . . Q . . .
Q . . . . . . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
Q . . . . . . .
uXcieyU4        648     PT0.007207255S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
Q . . . . . . .
eXcieyU4        700     PT0.009275650S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . . . . . . .
. . . . . Q Q .
. Q . . . . . .
. . . . . . . .
eXcieyUT        735     PT0.010128473S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . . . . . . .
. Q . . . Q Q .
. Q . . . . . .
. . . . . . . .
eXciPyUT        768     PT0.010938355S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . . . . . . .
. . . . . Q Q .
. Q . . . . . .
. . . . . . . Q
eXci#yUT        1152    PT0.011762050S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . .
Q . . . . . . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . Q
eXci#yU#        1225    PT0.013189927S
. . Q . Q . . .
Q . . . . . . .
. . . . . . . Q
Q . . . . . . .
. . . . . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . Q
eXci#yUx        1536    PT0.014965841S
. . Q . Q . . .
. . . . . . . .
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. Q . . . . . .
. . . . . . . Q
eXcJ#yUx        1728    PT0.021601343S
. . Q . Q . . .
. . . . . . . Q
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. . . . . . . .
. . . . . . . Q
epcJ#yUx        1920    PT0.023504280S
. . Q . . . . .
. . . . . . . Q
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. . . . Q . . .
. . . . . . . Q
0pcJ#yUx        2560    PT0.030906422S
. . Q . . . . .
. . . . . Q . .
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. . . . Q . . .
. . . . . . . Q
0ncJ#yUx        3072    PT0.166375236S
. . Q . . . . .
. . . . . Q . .
. . . . . . . Q
Q . . . . . . .
. . . Q . . . .
. . . . . . Q .
. . . . Q . . .
. Q . . . . . .
0ncJ5yUx        4096    PT0.192306457S
Total time: PT0.193881176S
test tests::test_8_queens ... ok

But the odds are against it… That’s because the Mutation genetic strategy can’t always solve this problem. For our solver to be able to find a solution every time we’re going to have to introduce a new strategy. That is the subject of Part 4.

The source code to this point is available on Github if you want to experiment.

Follow

Get every new post delivered to your Inbox.

%d bloggers like this: