advent-of-code/2022/python/day16.py

227 lines
6.8 KiB
Python
Raw Normal View History

2022-12-16 06:13:24 +00:00
import sys
2022-12-17 04:44:57 +00:00
import operator
2022-12-16 06:13:24 +00:00
import matrix
import shared
import scanf
from dataclasses import dataclass
2022-12-18 06:36:15 +00:00
from collections import defaultdict, deque
2022-12-16 06:13:24 +00:00
from typing import List, Dict
from pprint import pprint
2022-12-17 03:07:17 +00:00
import networkx as nx
2022-12-16 06:13:24 +00:00
2022-12-17 03:07:17 +00:00
from IPython.display import Image, display
2022-12-16 06:13:24 +00:00
@dataclass
class Valve:
label: str
rate: int
tunnels: List[str]
opened_at: int = -1
2022-12-17 03:07:17 +00:00
potential: Dict[str, int] = None
2022-12-16 06:13:24 +00:00
def set_potential(self, valves):
self.potential = {}
for tunnel in self.tunnels:
self.potential[tunnel] = valves[tunnel].rate
def highest_potential(self):
return max(self.potential, key=self.potential.get)
2022-12-17 03:07:17 +00:00
2022-12-16 06:13:24 +00:00
def parse(rows):
valves = {}
for row in rows:
2022-12-16 06:19:30 +00:00
left, right = row.split(" valve")
right = right.replace("s ", "").lstrip()
2022-12-16 06:13:24 +00:00
valve, rate = scanf.scanf("Valve %s has flow rate=%d; %*s %*s to", left)
2022-12-16 06:19:30 +00:00
tunnels = right.split(", ")
2022-12-17 03:07:17 +00:00
valves[valve] = Valve(label=valve, rate=rate, tunnels=tunnels)
2022-12-16 06:13:24 +00:00
2022-12-17 03:07:17 +00:00
for _, v in valves.items():
2022-12-16 06:13:24 +00:00
v.set_potential(valves)
return valves
def part1(rows, sample=False):
2022-12-17 03:07:17 +00:00
p1 = Part1(rows, sample, 30)
2022-12-16 06:13:24 +00:00
p1.run()
class Part1:
def __init__(self, rows, sample, minutes):
self.rows = rows
self.sample = sample
self.valves = parse(rows)
2022-12-17 03:07:17 +00:00
self.nonzero = {v.label: v for _, v in self.valves.items() if v.rate > 0}
2022-12-17 04:44:57 +00:00
self.cur = "AA"
2022-12-16 06:13:24 +00:00
self.tick = 1
self.minutes = minutes
2022-12-17 03:07:17 +00:00
self.g = nx.DiGraph()
self.path_distances = defaultdict(dict)
self.set_up_graph()
def draw(self):
pdot = nx.drawing.nx_pydot.to_pydot(self.g)
pdot.write_png("15.png")
def set_up_graph(self):
for lbl, v in self.valves.items():
for t in v.tunnels:
# self.g.add_edge(lbl, t, {'weight':self.valves[t].rate})
self.g.add_edge(lbl, t, weight=self.valves[t].rate)
all_keys = self.valves.keys()
2022-12-17 04:44:57 +00:00
l = dict(nx.all_pairs_shortest_path_length(self.g))
2022-12-17 03:07:17 +00:00
for lbl, _ in self.valves.items():
for other in all_keys:
if other == lbl:
continue
2022-12-17 04:44:57 +00:00
self.path_distances[lbl][other] = l[lbl][other]
# min(
# [len(x) for x in nx.shortest_simple_paths(self.g, lbl, other)]
# )
2022-12-16 06:13:24 +00:00
def do_tick(self, minute):
2022-12-17 03:07:17 +00:00
pressure = 0
2022-12-16 06:13:24 +00:00
opened = []
for _, valve in self.valves.items():
2022-12-17 04:44:57 +00:00
if valve.opened_at >= 0:
2022-12-17 03:07:17 +00:00
pressure += valve.rate
opened.append(valve.label)
2022-12-18 06:36:15 +00:00
print(f"== Min {minute+1}:: {len(opened)} Valves {', '.join(opened)} are open, releasing {pressure} pressure")
2022-12-16 06:13:24 +00:00
def calculate_total_flow(self):
total = 0
for label, valve in self.valves.items():
2022-12-17 03:07:17 +00:00
if valve.opened_at > 0:
2022-12-16 06:13:24 +00:00
total += valve.rate * (30 - valve.opened_at)
return total
2022-12-18 06:36:15 +00:00
2022-12-16 06:13:24 +00:00
def run(self):
2022-12-18 06:36:15 +00:00
paths = defaultdict(lambda: -1)
# lbl, flow, time_left, visited
q = deque([('AA', 0, self.minutes, set())])
x = -1
while q:
x+=1
# get recent nodes
pos, flow, minutes_left, cur_path = q.popleft()
# find who we can reach in time
reachable = [n for n in self.path_distances[pos]
if n not in cur_path # not visited
and self.path_distances[pos][n] < minutes_left]
hashable = frozenset(cur_path)
if paths[hashable] < flow:
paths[hashable] = flow
for r in reachable:
d = self.path_distances[pos][r]
r_flow = (minutes_left - d - 1) * self.valves[r].rate
# add neighbor
cur_path.add(r)
q.append((r, flow + r_flow, minutes_left - d -1, cur_path))
print("added",x,r)
print(paths.values())
print(max(paths.values()))
def _run(self):
2022-12-17 03:07:17 +00:00
# Construct the graph with vertices & edges from the input
# Call a function to compute the distances between every pair of vertices
# Create a closed set containing all the valves with non-zero rates
# At each step, iterate over the remaining set of closed, non-zero valves
# - Subtract the distance from remaining minutes
# - Calculate the flow (rate * remaining minutes)
# - Remove the recently opened valve from the closed set (functionally), so the deeper levels won't consider it
def priority(remaining):
2022-12-17 04:44:57 +00:00
print("REMAINING", remaining)
2022-12-17 03:07:17 +00:00
_pris = []
for _,n in self.nonzero.items():
# (time_remaining - distance_to_valve - 1) * flow rate
2022-12-17 04:44:57 +00:00
d = self.path_distances[self.cur][n.label]
pri = (remaining - d) * n.rate
_pris.append((n.label, pri, d))
_pris = list(sorted(_pris, key=operator.itemgetter(2,1)))
print(self.cur, end=' ')
pprint(_pris)
2022-12-17 03:07:17 +00:00
return _pris
remaining = self.minutes
open_order = []
while len(self.nonzero):
if remaining <= 0:
print("ran out of time")
break
2022-12-18 06:36:15 +00:00
self.do_tick(30-remaining)
2022-12-17 04:44:57 +00:00
# CALCULATE PRIORITIES
2022-12-17 03:07:17 +00:00
pris = priority(remaining)
2022-12-17 04:44:57 +00:00
#print(pris)
# GET HIGHEST PRIORITY label
nxt, _, distance = pris.pop(0)
#distance *= -1
# GET HIGHEST PRIORITY VALVE
n = self.nonzero[nxt]
# remove valve from dict
del self.nonzero[nxt]
# keep track of which order opened
2022-12-17 03:07:17 +00:00
open_order.append(n.label)
2022-12-17 04:44:57 +00:00
print("\tMoving to", nxt)
print("\tOpening", n.label)
#distance = self.path_distances[self.cur][n.label]
self.cur = n.label
self.valves[self.cur].opened_at = self.minutes - (remaining - 1)
# Tick tick tick
2022-12-17 03:07:17 +00:00
remaining -= distance # Move
2022-12-17 04:44:57 +00:00
print("\t\tMoved", distance,"distance/minutes")
2022-12-17 03:07:17 +00:00
remaining -= 1 # open
2022-12-17 04:44:57 +00:00
print("\t\tOpened",nxt,"1 minute")
print("total flow:", self.calculate_total_flow())
self.do_tick(30)
2022-12-17 03:07:17 +00:00
print(remaining)
print(open_order)
print("sample: 1651")
print("total flow:", self.calculate_total_flow())
2022-12-16 06:13:24 +00:00
def main():
sample = False
if sys.argv[-1] == "--sample":
sample = True
rows = [row for row in shared.load_rows(16)]
with shared.elapsed_timer() as elapsed:
part1(rows, sample)
print("🕒", elapsed())
2022-12-17 03:07:17 +00:00
# with shared.elapsed_timer() as elapsed:
2022-12-16 06:13:24 +00:00
# part2(rows, sample)
# print("🕒", elapsed())
2022-12-17 04:44:57 +00:00
print("The result for solution 1 is: 1820")
print("The result for solution 2 is: 2602")
2022-12-16 06:13:24 +00:00
if __name__ == "__main__":
main()