advent-of-code/2022/python/day16.py

227 lines
6.8 KiB
Python

import sys
import operator
import matrix
import shared
import scanf
from dataclasses import dataclass
from collections import defaultdict, deque
from typing import List, Dict
from pprint import pprint
import networkx as nx
from IPython.display import Image, display
@dataclass
class Valve:
label: str
rate: int
tunnels: List[str]
opened_at: int = -1
potential: Dict[str, int] = None
def set_potential(self, valves):
self.potential = {}
for tunnel in self.tunnels:
self.potential[tunnel] = valves[tunnel].rate
def highest_potential(self):
return max(self.potential, key=self.potential.get)
def parse(rows):
valves = {}
for row in rows:
left, right = row.split(" valve")
right = right.replace("s ", "").lstrip()
valve, rate = scanf.scanf("Valve %s has flow rate=%d; %*s %*s to", left)
tunnels = right.split(", ")
valves[valve] = Valve(label=valve, rate=rate, tunnels=tunnels)
for _, v in valves.items():
v.set_potential(valves)
return valves
def part1(rows, sample=False):
p1 = Part1(rows, sample, 30)
p1.run()
class Part1:
def __init__(self, rows, sample, minutes):
self.rows = rows
self.sample = sample
self.valves = parse(rows)
self.nonzero = {v.label: v for _, v in self.valves.items() if v.rate > 0}
self.cur = "AA"
self.tick = 1
self.minutes = minutes
self.g = nx.DiGraph()
self.path_distances = defaultdict(dict)
self.set_up_graph()
def draw(self):
pdot = nx.drawing.nx_pydot.to_pydot(self.g)
pdot.write_png("15.png")
def set_up_graph(self):
for lbl, v in self.valves.items():
for t in v.tunnels:
# self.g.add_edge(lbl, t, {'weight':self.valves[t].rate})
self.g.add_edge(lbl, t, weight=self.valves[t].rate)
all_keys = self.valves.keys()
l = dict(nx.all_pairs_shortest_path_length(self.g))
for lbl, _ in self.valves.items():
for other in all_keys:
if other == lbl:
continue
self.path_distances[lbl][other] = l[lbl][other]
# min(
# [len(x) for x in nx.shortest_simple_paths(self.g, lbl, other)]
# )
def do_tick(self, minute):
pressure = 0
opened = []
for _, valve in self.valves.items():
if valve.opened_at >= 0:
pressure += valve.rate
opened.append(valve.label)
print(f"== Min {minute+1}:: {len(opened)} Valves {', '.join(opened)} are open, releasing {pressure} pressure")
def calculate_total_flow(self):
total = 0
for label, valve in self.valves.items():
if valve.opened_at > 0:
total += valve.rate * (30 - valve.opened_at)
return total
def run(self):
paths = defaultdict(lambda: -1)
# lbl, flow, time_left, visited
q = deque([('AA', 0, self.minutes, set())])
x = -1
while q:
x+=1
# get recent nodes
pos, flow, minutes_left, cur_path = q.popleft()
# find who we can reach in time
reachable = [n for n in self.path_distances[pos]
if n not in cur_path # not visited
and self.path_distances[pos][n] < minutes_left]
hashable = frozenset(cur_path)
if paths[hashable] < flow:
paths[hashable] = flow
for r in reachable:
d = self.path_distances[pos][r]
r_flow = (minutes_left - d - 1) * self.valves[r].rate
# add neighbor
cur_path.add(r)
q.append((r, flow + r_flow, minutes_left - d -1, cur_path))
print("added",x,r)
print(paths.values())
print(max(paths.values()))
def _run(self):
# Construct the graph with vertices & edges from the input
# Call a function to compute the distances between every pair of vertices
# Create a closed set containing all the valves with non-zero rates
# At each step, iterate over the remaining set of closed, non-zero valves
# - Subtract the distance from remaining minutes
# - Calculate the flow (rate * remaining minutes)
# - Remove the recently opened valve from the closed set (functionally), so the deeper levels won't consider it
def priority(remaining):
print("REMAINING", remaining)
_pris = []
for _,n in self.nonzero.items():
# (time_remaining - distance_to_valve - 1) * flow rate
d = self.path_distances[self.cur][n.label]
pri = (remaining - d) * n.rate
_pris.append((n.label, pri, d))
_pris = list(sorted(_pris, key=operator.itemgetter(2,1)))
print(self.cur, end=' ')
pprint(_pris)
return _pris
remaining = self.minutes
open_order = []
while len(self.nonzero):
if remaining <= 0:
print("ran out of time")
break
self.do_tick(30-remaining)
# CALCULATE PRIORITIES
pris = priority(remaining)
#print(pris)
# GET HIGHEST PRIORITY label
nxt, _, distance = pris.pop(0)
#distance *= -1
# GET HIGHEST PRIORITY VALVE
n = self.nonzero[nxt]
# remove valve from dict
del self.nonzero[nxt]
# keep track of which order opened
open_order.append(n.label)
print("\tMoving to", nxt)
print("\tOpening", n.label)
#distance = self.path_distances[self.cur][n.label]
self.cur = n.label
self.valves[self.cur].opened_at = self.minutes - (remaining - 1)
# Tick tick tick
remaining -= distance # Move
print("\t\tMoved", distance,"distance/minutes")
remaining -= 1 # open
print("\t\tOpened",nxt,"1 minute")
print("total flow:", self.calculate_total_flow())
self.do_tick(30)
print(remaining)
print(open_order)
print("sample: 1651")
print("total flow:", self.calculate_total_flow())
def main():
sample = False
if sys.argv[-1] == "--sample":
sample = True
rows = [row for row in shared.load_rows(16)]
with shared.elapsed_timer() as elapsed:
part1(rows, sample)
print("🕒", elapsed())
# with shared.elapsed_timer() as elapsed:
# part2(rows, sample)
# print("🕒", elapsed())
print("The result for solution 1 is: 1820")
print("The result for solution 2 is: 2602")
if __name__ == "__main__":
main()