Learning Python with Advent of Code Walkthroughs

Dazbo's Advent of Code solutions, written in Python

Pipes and Tunnels

Advent of Code 2022 - Day 16

Day 16: Proboscidea Volcanium

Concepts and Packages Demonstrated

Regex, graphs, memoization, recursion, functools caching (memoize)

Problem Intro

Well, this sucked.

So, we find ourselves in the cave system of a volcano, which contains pipes connected by valves, as well as a bunch of elephants. Standard.

Valves are connected to other valves by tunnels. Each valve starts closed. Opening a valve releases pressure at a specific flow rate, given in units of pressure per minute. We start at valve AA and it takes 1 minute to move from any valve to any adjacent valve. It also takes 1 minute to open any valve we arrive at.

Our input data looks something like this:

Valve AA has flow rate=0; tunnels lead to valves DD, II, BB
Valve BB has flow rate=13; tunnels lead to valves CC, AA
Valve CC has flow rate=2; tunnels lead to valves DD, BB
Valve DD has flow rate=20; tunnels lead to valves CC, AA, EE
Valve EE has flow rate=3; tunnels lead to valves FF, DD
Valve FF has flow rate=0; tunnels lead to valves EE, GG
Valve GG has flow rate=0; tunnels lead to valves FF, HH
Valve HH has flow rate=22; tunnel leads to valve GG
Valve II has flow rate=0; tunnels lead to valves AA, JJ
Valve JJ has flow rate=21; tunnel leads to valve II

Part 1

Work out the optimum steps to take, in order to release the most pressure in 30 minutes. What is the most pressure you can release?

I spent a fair bit of time trying to solve this with a Dijkstra solution, before giving up and looking for inspiration. After reading a couple of hints on Reddit, I settled on a memoization solution. I last used memoization when solving the 2021 Day 21 Dirac Dice problem. It’s basically recursion, caching the result for any state we’ve seen before. Thus, if our recursion ever reaches a state we’ve seen before, we can immediately return the cached result and avoid wasted recursion. This is a top-down form of dynamic programming.
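To make the idea concrete, here is a minimal sketch of memoization on a toy problem that has nothing to do with valves: counting the routes through a grid when you can only move right or down. The function name count_paths and the grid size are assumptions chosen purely for illustration.

```python
import functools

@functools.cache
def count_paths(rows: int, cols: int) -> int:
    """ Count routes across a grid, moving only right or down. """
    if rows == 0 or cols == 0:
        return 1  # Base case: only one straight-line route is left

    # Each distinct (rows, cols) state is computed once.
    # Any repeat call is answered straight from the cache.
    return count_paths(rows - 1, cols) + count_paths(rows, cols - 1)

print(count_paths(16, 16))       # 601080390
print(count_paths.cache_info())  # Shows how many calls were cache hits vs misses
```

Without the decorator, the same call would recurse hundreds of millions of times. With it, there are fewer than 300 distinct states to evaluate.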

Our network of valves and tunnels can be represented using a graph of nodes and edges.
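As a rough sketch of what that graph looks like in Python, here are the first three lines of the sample input written out by hand. The names tunnels and flow_rates are hypothetical, used only for this illustration; the actual solution stores the same information in a Valve class.

```python
# Edges: each valve ID maps to the set of valves it connects to
tunnels = {
    "AA": {"DD", "II", "BB"},
    "BB": {"CC", "AA"},
    "CC": {"DD", "BB"},
}

# Node data: the flow rate of each valve
flow_rates = {"AA": 0, "BB": 13, "CC": 2}

for valve, neighbours in tunnels.items():
    print(f"{valve} (rate={flow_rates[valve]}) -> {sorted(neighbours)}")
```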

First, let’s read in the data:

def parse_input(data) -> dict[str, Valve]:
    pattern = re.compile(r"Valve ([A-Z]{2}) has flow rate=(\d+);.+valves? (.+)")
    valves = {}
    for line in data:
        valve, rate, leads_to = pattern.findall(line)[0]
        valves[valve] = Valve(valve, int(rate), {x.strip() for x in leads_to.split(",")})
    
    return valves

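Some notes about this regex:

  • The first group, ([A-Z]{2}), captures the two-letter valve ID.
  • The second group, (\d+), captures the flow rate.
  • The final group, (.+), captures everything after the word "valve" or "valves", i.e. the comma-separated list of destinations. The optional s means we cope with both "tunnels lead to valves" and "tunnel leads to valve".

We then split the last group at the commas to return a set of all the destination valves.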
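Here is a small, self-contained check of the parsing against two lines of the sample input, one plural and one singular. The pattern in this sketch matches the literal word valve followed by an optional s; the parsed dict is a hypothetical name used only here.

```python
import re

pattern = re.compile(r"Valve ([A-Z]{2}) has flow rate=(\d+);.+valves? (.+)")

sample_lines = [
    "Valve AA has flow rate=0; tunnels lead to valves DD, II, BB",
    "Valve HH has flow rate=22; tunnel leads to valve GG",
]

parsed = {}
for line in sample_lines:
    valve, rate, leads_to = pattern.findall(line)[0]
    destinations = {x.strip() for x in leads_to.split(",")}  # Split at commas, trim spaces
    parsed[valve] = (int(rate), destinations)
    print(valve, int(rate), sorted(destinations))

# AA 0 ['BB', 'DD', 'II']
# HH 22 ['GG']
```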

I then use a Valve class to store what we’ve captured:

@dataclass(frozen=True) # frozen, so a Valve's fields can't be reassigned after creation
class Valve():
    """ Valve has an ID, a flow rate, and valves it is connected to. """
    id: str    # E.g. "AA"
    rate: int  # E.g. 13
    leads_to: set[str]  # E.g. {"DD", "II", "BB"}
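One thing worth knowing about this class: frozen=True stops fields being reassigned, but it does not make a Valve usable as a dict key or cache key, because the generated hash includes the leads_to set. The sketch below demonstrates both behaviours. This may be one reason it is convenient to pass valve IDs (plain strings) around rather than Valve objects, though that is my reading rather than something stated above.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class Valve:
    id: str
    rate: int
    leads_to: set[str]

valve = Valve("BB", 13, {"CC", "AA"})

try:
    valve.rate = 99  # Frozen: fields cannot be reassigned
except FrozenInstanceError:
    print("Cannot reassign fields on a frozen dataclass")

try:
    hash(valve)      # The generated __hash__ hashes every field, including the set
except TypeError as err:
    print(f"Not hashable: {err}")
```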

We can now perform our parsing and check the output:

def main():
    with open(INPUT_FILE, mode="rt") as f:
        data = f.read().splitlines()
        
    valves = parse_input(data)
    print("\n".join(str(valve) for valve in valves.values()))

The output looks like this:

Valve(id='GV', rate=23, leads_to={'WO'})
Valve(id='TS', rate=0, leads_to={'TX', 'IG'})
Valve(id='UC', rate=0, leads_to={'XJ', 'VZ'})
Valve(id='TJ', rate=0, leads_to={'GJ', 'YV'})
Valve(id='KF', rate=0, leads_to={'QY', 'VP'})
Valve(id='PO', rate=0, leads_to={'YF', 'VP'})
Valve(id='CV', rate=0, leads_to={'VB', 'QK'})
Valve(id='NK', rate=6, leads_to={'QY', 'DO', 'YH', 'MI', 'QJ'})
Valve(id='IG', rate=4, leads_to={'TS', 'OP', 'MI', 'FP', 'UV'})
Valve(id='KN', rate=0, leads_to={'RF', 'CY'})
// etc

Here’s the core of the solution:

    @functools.cache
    def calc_max_relief(opened, mins_remaining, curr_valve_id):
        """ Return maximum pressure that can be vented if we start at curr_valve_id,
        with mins_remaining minutes remaining. We need to embed this function to make our valves dict available.
        We can't pass valves to the function, because the dict is not hashable and so can't be cached. """
        
        # Base case
        if mins_remaining <= 0:
            return 0

        most_relief = 0
        current_valve = valves[curr_valve_id]
        for neighbour in current_valve.leads_to:
            # Recurse for each neighbouring position
            most_relief = max(most_relief, calc_max_relief(opened, mins_remaining - 1, neighbour))

        # We only want to open valves that are closed, and where flow rate is > 0
        if curr_valve_id not in opened and current_valve.rate > 0 and mins_remaining > 0:
            opened = set(opened)
            opened.add(curr_valve_id)
            mins_remaining -= 1
            total_released = mins_remaining * current_valve.rate

            for neighbour in current_valve.leads_to:
                # Try each neighbour and recurse in. Save the best one.
                most_relief = max(most_relief, 
                           total_released + calc_max_relief(frozenset(opened), mins_remaining - 1, neighbour))

        return most_relief

It is a recursive function. The base case is to return 0 when we have run out of time. Otherwise, we recurse by calling calc_max_relief() for every possible adjacent valve. There are two things we can do in this recursion:

  1. We can move to an adjacent valve, which costs a minute. However, it does not release any pressure from a new valve.
  2. We can open the current valve, which costs a minute.
    • We can only do this if the current valve is closed.
    • Furthermore, it’s only worth doing if the current valve has a flow rate greater than 0.
    • If we open a valve, then we calculate the total pressure released by this valve over all the remaining minutes, and add this number to the most that can be released by the recursive call.

We simply want the recursive call that returns the largest number.

This function is itself nested in our main() function. The reason is that we want to use the @functools.cache decorator to cache the function’s result for any given set of arguments. For this to work, all the arguments need to be hashable. However, our valves variable is a dict and is not hashable. If our function was not nested in main(), then we would have to pass valves as a parameter to the cached function (which fails, since the cache can’t hash a dict) or convert it to something hashable first. And this would be a PITA.
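The hashability problem is easy to reproduce in isolation. In this sketch, the valves dict is a simplified, hypothetical stand-in (valve ID to flow rate), and the function names are made up for the demonstration.

```python
import functools

valves = {"AA": 0, "BB": 13}  # Hypothetical, simplified: valve ID -> flow rate

@functools.cache
def rate_via_param(valves, valve_id):
    return valves[valve_id]

try:
    rate_via_param(valves, "BB")  # The cache tries to hash every argument
except TypeError as err:
    print(f"Fails: {err}")        # unhashable type: 'dict'

def make_lookup(valves):
    @functools.cache
    def rate_via_closure(valve_id):
        return valves[valve_id]   # valves comes from the enclosing scope
    return rate_via_closure

lookup = make_lookup(valves)
print(lookup("BB"))  # 13
```

Only valve_id forms part of the cache key in the nested version, so the dict never needs to be hashed.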

Caching is effective here, because the total number of states is much smaller than the total number of permutations of actions we could take.
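To put a number on that, here is a condensed sketch of the same recursion run against the sample input from the problem intro. It is not the code used for the real solution: the graph is hard-coded in a module-level dict called SAMPLE (an assumption made so the block is self-contained, which also removes the need for nesting), and the function is renamed max_relief. The sample has only 6 valves with a non-zero flow rate, so there are at most 64 possible sets of opened valves, across 10 positions and roughly 30 time values: on the order of 20,000 states.

```python
import functools

# Sample input, as valve ID -> (flow rate, neighbours)
SAMPLE = {
    "AA": (0, ("DD", "II", "BB")),
    "BB": (13, ("CC", "AA")),
    "CC": (2, ("DD", "BB")),
    "DD": (20, ("CC", "AA", "EE")),
    "EE": (3, ("FF", "DD")),
    "FF": (0, ("EE", "GG")),
    "GG": (0, ("FF", "HH")),
    "HH": (22, ("GG",)),
    "II": (0, ("AA", "JJ")),
    "JJ": (21, ("II",)),
}

@functools.cache
def max_relief(opened: frozenset, mins_remaining: int, valve_id: str) -> int:
    if mins_remaining <= 0:
        return 0

    rate, neighbours = SAMPLE[valve_id]

    # Option 1: move on without opening this valve (1 minute)
    best = max(max_relief(opened, mins_remaining - 1, n) for n in neighbours)

    # Option 2: open this valve (1 minute), then move on (1 more minute)
    if valve_id not in opened and rate > 0:
        released = (mins_remaining - 1) * rate  # Flows for every minute after opening
        now_open = opened | {valve_id}          # Still a frozenset, so still hashable
        best = max(best, released + max(
            max_relief(now_open, mins_remaining - 2, n) for n in neighbours))

    return best

print(max_relief(frozenset(), 30, "AA"))  # 1651, the answer given in the puzzle text
print(max_relief.cache_info().currsize)   # Number of distinct states actually evaluated
```

As a worked example of the released calculation: moving from AA to DD leaves 29 minutes, opening DD leaves 28, and so DD contributes 28 * 20 = 560.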

Part 2

We’re told we get an elephant to help us. Now we spend 4 minutes teaching the elephant how to open valves, leaving 26 minutes for me and the elephant to open valves to release the most pressure.

The only significant changes are to the function signature and the base case:

    @functools.cache
    def calc_max_relief(opened, mins_remaining, curr_valve_id, elephant=False):
        """ Return maximum pressure that can be vented if we start at curr_valve_id,
        with mins_remaining minutes remaining.
        We need to embed this function to make our valves dict available.
        We can't pass valves to the function, because the dict is not hashable and so can't be cached. """
        
        # Base case
        if mins_remaining <= 0:
            if elephant: # Perform again, but for the elephant
                return calc_max_relief(opened, 26, "AA")
            else:
                return 0

One way to think of this help is: I can make all my moves, and then the elephant can spend its own 26 minutes performing its moves. The crucial observation is that either the elephant or I can open any given valve, but a valve only needs to be opened once. So, if the elephant moves after me, it doesn’t need to consider any valves that I’ve already opened.

Our recursive function now takes an elephant parameter, which defaults to False for Part 1.
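The hand-over from me to the elephant is easier to see on a tiny example. The sketch below uses a hypothetical three-valve cave (not from the puzzle) and a made-up TIME_LIMIT of 3 minutes in place of 26, so that each worker only has time to reach and open one valve. The names CAVE, TIME_LIMIT and max_relief are assumptions for this illustration.

```python
import functools

# Hypothetical cave: AA in the middle, with BB and CC one step either side
CAVE = {
    "AA": (0, ("BB", "CC")),
    "BB": (10, ("AA",)),
    "CC": (10, ("AA",)),
}
TIME_LIMIT = 3  # Deliberately tiny: enough to walk to one valve and open it

@functools.cache
def max_relief(opened, mins_remaining, valve_id, elephant=False):
    if mins_remaining <= 0:
        if elephant:
            # My turn is over. The elephant starts afresh from AA,
            # but inherits the set of valves I have already opened.
            return max_relief(opened, TIME_LIMIT, "AA")
        return 0

    rate, neighbours = CAVE[valve_id]
    best = max(max_relief(opened, mins_remaining - 1, n, elephant) for n in neighbours)

    if valve_id not in opened and rate > 0:
        released = (mins_remaining - 1) * rate
        now_open = opened | {valve_id}
        best = max(best, released + max(
            max_relief(now_open, mins_remaining - 2, n, elephant) for n in neighbours))

    return best

print(max_relief(frozenset(), TIME_LIMIT, "AA"))        # 10: alone, I only reach one valve
print(max_relief(frozenset(), TIME_LIMIT, "AA", True))  # 20: the elephant opens the other
```

Because the elephant’s call receives my opened set, it skips the valve I opened and heads for the other one.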

Results

The final code:

from dataclasses import dataclass
from pathlib import Path
import re
import functools
import time

SCRIPT_DIR = Path(__file__).parent
# INPUT_FILE = Path(SCRIPT_DIR, "input/sample_input.txt")
INPUT_FILE = Path(SCRIPT_DIR, "input/input.txt")

# sys.setrecursionlimit(10000)

@dataclass(frozen=True)
class Valve():
    """ Valve has an ID, a flow rate, and valves it is connected to. """
    id: str    # E.g. "AA"
    rate: int  # E.g. 13
    leads_to: set[str]  # E.g. {"DD", "II", "BB"}
        
def main():
    with open(INPUT_FILE, mode="rt") as f:
        data = f.read().splitlines()
        
    valves = parse_input(data)
    print("\n".join(str(valve) for valve in valves.values()))

    @functools.cache
    def calc_max_relief(opened, mins_remaining, curr_valve_id, elephant=False):
        """ Return maximum pressure that can be vented if we start at curr_valve_id,
        with mins_remaining minutes remaining.
        We need to embed this function to make our valves dict available.
        We can't pass valves to the function, because the dict is not hashable and so can't be cached. """
        
        # Base case
        if mins_remaining <= 0:
            if elephant: # Perform again, but for the elephant
                return calc_max_relief(opened, 26, "AA")
            else:
                return 0

        most_relief = 0
        current_valve = valves[curr_valve_id]
        for neighbour in current_valve.leads_to:
            # Recurse for each neighbouring position
            most_relief = max(most_relief, calc_max_relief(opened, mins_remaining - 1, neighbour, elephant))

        # We only want to open valves that are closed, and where flow rate is > 0
        if curr_valve_id not in opened and current_valve.rate > 0 and mins_remaining > 0:
            opened = set(opened)
            opened.add(curr_valve_id)
            mins_remaining -= 1
            total_released = mins_remaining * current_valve.rate

            for neighbour in current_valve.leads_to:
                # Try each neighbour and recurse in. Save the best one.
                most_relief = max(most_relief, 
                           total_released + calc_max_relief(frozenset(opened), mins_remaining - 1, neighbour, elephant))

        return most_relief

    print(f"Part 1: {calc_max_relief(frozenset(), 30, 'AA')}")
    print(f"Part 2: {calc_max_relief(frozenset(), 26, 'AA', True)}")

def parse_input(data) -> dict[str, Valve]:
    pattern = re.compile(r"Valve ([A-Z]{2}) has flow rate=(\d+);.+valves? (.+)")
    valves = {}
    for line in data:
        valve, rate, leads_to = pattern.findall(line)[0]
        valves[valve] = Valve(valve, int(rate), {x.strip() for x in leads_to.split(",")})
    
    return valves

if __name__ == "__main__":
    t1 = time.perf_counter()
    main()
    t2 = time.perf_counter()
    print(f"Execution time: {t2 - t1:0.4f} seconds")

Here’s my output:

Part 1: 1617
Part 2: 2171
Execution time: 68.9002 seconds

Not particularly fast, but also not unbearably slow.