409 lines
13 KiB
Python
409 lines
13 KiB
Python
# ------------------------------------------------------------------------------
|
|
# Part 1 - Parse a bushfire scenario file
|
|
|
|
# Sample solution 1
|
|
|
|
def parse_scenario_solution_1(filename):
    '''
    Parse a bushfire scenario file into a dictionary of model inputs.

    The file is read in this order: the grid size, that many comma-separated
    rows of fuel loads, that many comma-separated rows of heights, the
    ignition threshold, the wind direction, and finally one "row,column"
    pair per line for every initially burning cell.

    Returns a dictionary with the keys 'f_grid', 'h_grid', 'i_threshold',
    'w_direction' and 'burn_seeds', or None when the ignition threshold is
    outside 1..8, the wind direction is not recognised, or a burn seed is
    off the grid or sits on a cell with no fuel.
    '''
    # The text 'None' in the file is converted to the value None below, so
    # the value None (not the string) is what must be accepted here.
    valid_winds = ('N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', None)

    with open(filename) as f:
        # get size
        grid_size = int(f.readline())

        # get fuel load grid
        f_grid = [[int(x) for x in f.readline().strip().split(',')]
                  for _ in range(grid_size)]

        # get height grid
        h_grid = [[int(x) for x in f.readline().strip().split(',')]
                  for _ in range(grid_size)]

        # get ignition threshold
        i_threshold = int(f.readline())

        # get wind direction
        w_direction = f.readline().strip()
        if w_direction == 'None':
            w_direction = None

        # get initial burning cells
        burn_seeds = []
        for row in f:
            row = row.strip()
            # tolerate blank lines (e.g. a trailing newline at end of file)
            if not row:
                continue
            x, y = [int(v) for v in row.split(',')]
            burn_seeds.append((x, y))

    # validate values
    if i_threshold > 8 or i_threshold < 1:
        return None
    if w_direction not in valid_winds:
        return None
    for i, j in burn_seeds:
        # negative indices would silently wrap around, so reject them too
        if not (0 <= i < grid_size and 0 <= j < grid_size):
            return None
        if f_grid[i][j] < 1:
            return None

    return {'f_grid': f_grid, 'h_grid': h_grid,
            'i_threshold': i_threshold, 'w_direction': w_direction,
            'burn_seeds': burn_seeds}
|
|
|
|
# Sample solution 2
|
|
|
|
WIND_DIRECTIONS = {'N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', None}


def is_valid(data):
    '''
    Check a parsed scenario dictionary for consistency.

    Returns False when the ignition threshold is outside 1..8, when the wind
    direction is not a member of WIND_DIRECTIONS, or when any burn seed lies
    off the fuel grid or on a cell whose fuel load is below 1. Returns True
    otherwise.
    '''
    size = len(data['f_grid'])

    threshold = data['i_threshold']
    if threshold > 8 or threshold < 1:
        return False

    if data['w_direction'] not in WIND_DIRECTIONS:
        return False

    for row, col in data['burn_seeds']:
        on_grid = 0 <= row < size and 0 <= col < size
        # only look up the fuel load once the seed is known to be on the grid
        if not on_grid or data['f_grid'][row][col] < 1:
            return False

    return True
|
|
|
|
def parse_scenario_solution_2(filename):
    '''
    Read a bushfire scenario file and return its contents as a dictionary.

    Lines are consumed from the top of the file: the grid size, one
    comma-separated row per grid line for the fuel loads, the same for the
    heights, the ignition threshold, the wind direction, and then every
    remaining line as a comma-separated burning cell location.

    Returns the dictionary (keys 'f_grid', 'h_grid', 'i_threshold',
    'w_direction', 'burn_seeds') when is_valid accepts it, otherwise None.
    '''
    # Read the whole file up front; lines are then taken from the front.
    with open(filename) as f:
        pending = f.readlines()

    def next_numbers():
        # Consume one line and convert its comma-separated fields to ints.
        return [int(field) for field in pending.pop(0).split(',')]

    size = int(pending.pop(0))
    fuels = [next_numbers() for _ in range(size)]
    heights = [next_numbers() for _ in range(size)]
    ignition_threshold = int(pending.pop(0))

    # The text 'None' in the file stands for the value None.
    wind_text = pending.pop(0).strip()
    wind_direction = None if wind_text == 'None' else wind_text

    # Whatever is left describes the initially burning cells.
    burning_cells = [tuple(int(field) for field in line.split(','))
                     for line in pending]

    data = {
        'f_grid': fuels,
        'h_grid': heights,
        'i_threshold': ignition_threshold,
        'w_direction': wind_direction,
        'burn_seeds': burning_cells,
    }
    if is_valid(data):
        return data
    return None
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Part 2 - Determine if a cell starts burning
|
|
|
|
# Sample solution 1
|
|
|
|
# ignition factors
|
|
UPHILL = 2
DOWNHILL = 0.5


def check_ignition_solution_1(b_grid, f_grid, h_grid, i_threshold, w_direction, i, j):
    '''
    Decide whether cell (i, j) starts burning.

    Returns False straight away if the cell is already burning or has no
    fuel. Otherwise every on-grid neighbour contributes its b_grid value,
    scaled by UPHILL when the neighbour is lower than (i, j), by DOWNHILL
    when it is higher, and unscaled when level. The eight adjacent cells are
    always considered; a recognised wind direction adds three more cells two
    steps away. Returns True when the total reaches i_threshold.
    '''
    if b_grid[i][j]:
        return False
    if not f_grid[i][j]:
        return False

    # extra cells considered for each wind direction
    wind_offsets = {
        'N': [(-2, -1), (-2, 0), (-2, 1)],
        'NE': [(-2, 1), (-2, 2), (-1, 2)],
        'E': [(-1, 2), (0, 2), (1, 2)],
        'SE': [(1, 2), (2, 2), (2, 1)],
        'S': [(2, 1), (2, 0), (2, -1)],
        'SW': [(2, -1), (2, -2), (1, -2)],
        'W': [(-1, -2), (0, -2), (1, -2)],
        'NW': [(-1, -2), (-2, -2), (-2, -1)],
    }

    # the eight adjacent cells, then any wind cells (none for an
    # unrecognised or absent wind direction)
    offsets = [(-1, -1), (-1, 0), (-1, 1), (0, -1),
               (0, 1), (1, -1), (1, 0), (1, 1)]
    offsets.extend(wind_offsets.get(w_direction, []))

    size = len(b_grid)
    total = 0
    for d_row, d_col in offsets:
        row, col = i + d_row, j + d_col

        # ignore neighbours that fall off the grid
        if row < 0 or row >= size or col < 0 or col >= size:
            continue

        if h_grid[row][col] < h_grid[i][j]:
            # fire spreading uphill
            weight = UPHILL
        elif h_grid[row][col] > h_grid[i][j]:
            # fire spreading downhill
            weight = DOWNHILL
        else:
            # fire spreading on level ground
            weight = 1
        total += weight * b_grid[row][col]

    return total >= i_threshold
|
|
|
|
# Sample solution 2
|
|
|
|
# Extra (row, column) offsets, two steps away, considered for each wind
# direction. None (no wind) adds nothing.
WIND_DIRECTION_DELTAS = {
    'N': [(-2, -1), (-2, 0), (-2, 1)],
    'NE': [(-2, 1), (-2, 2), (-1, 2)],
    'E': [(-1, 2), (0, 2), (1, 2)],
    'SE': [(1, 2), (2, 2), (2, 1)],
    'S': [(2, 1), (2, 0), (2, -1)],
    'SW': [(1, -2), (2, -2), (2, -1)],
    'W': [(-1, -2), (0, -2), (1, -2)],
    'NW': [(-2, -1), (-2, -2), (-1, -2)],
    None: [],
}

# Offsets of the eight cells adjacent to a cell.
MOVE_DELTAS = [
    (-1, -1), (-1, 0), (-1, 1),
    (0, -1), (0, 1),
    (1, -1), (1, 0), (1, 1),
]

# ignition factors
UPHILL = 2.0
LEVEL = 1.0
DOWNHILL = 0.5


def check_ignition_solution_2(b_grid, f_grid, h_grid, i_threshold, w_direction, i, j):
    '''
    Decide whether cell (i, j) starts burning.

    Returns False if the cell is already burning or its fuel load is 0.
    Otherwise each burning, on-grid cell among the eight adjacent cells and
    the wind cells for w_direction adds UPHILL, LEVEL or DOWNHILL to the
    ignition factor, depending on whether that cell is lower than, level
    with, or higher than (i, j). Returns True when the factor reaches
    i_threshold.

    A wind direction that is not a key of WIND_DIRECTION_DELTAS contributes
    no extra cells, i.e. it is treated the same as no wind.
    '''
    # If the cell is currently burning, bail.
    if b_grid[i][j]:
        return False

    # If the cell has no fuel, bail.
    if f_grid[i][j] == 0:
        return False

    # Work out the cells we need to check relative to the given cell.
    # Building a new list keeps the module-level tables untouched; .get()
    # makes an unrecognised direction behave like no wind instead of
    # raising KeyError.
    deltas = MOVE_DELTAS + WIND_DIRECTION_DELTAS.get(w_direction, [])

    # Compute the ignition factor.
    i_factor = 0.0
    size = len(b_grid)
    for di, dj in deltas:
        # Compute the i, j value of the new cell.
        ni, nj = i + di, j + dj

        # Ensure that new cell is on the grid and that it's currently burning.
        if ni < 0 or ni >= size or nj < 0 or nj >= size:
            continue
        if not b_grid[ni][nj]:
            continue

        # Account for the height differential between cells.
        dh = h_grid[ni][nj] - h_grid[i][j]
        if dh == 0:
            i_factor += LEVEL
        elif dh < 0:
            # The burning cell is lower, so the fire is travelling uphill.
            i_factor += UPHILL
        else:
            i_factor += DOWNHILL

    return i_factor >= i_threshold
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Part 3 - Run the model
|
|
|
|
# Sample solution 1
|
|
|
|
def update_state(b_grid, f_grid, h_grid, i_threshold, w_direction):
    '''
    Advance the burn and fuel grids by one time step.

    Burning cells lose one unit of fuel and stop burning when that leaves
    them with none. Every other cell is passed to check_ignition (evaluated
    against the grids as they were at the start of the step) and starts
    burning if it returns a true value.

    Returns a tuple of the new burn grid, the new fuel grid, and the number
    of cells that ignited during the step. The input grids are not modified.

    NOTE(review): check_ignition is not defined under that name in the
    visible part of this file - confirm it is supplied elsewhere.
    '''
    # work on copies so every cell is judged against the current state
    next_burning = [list(row) for row in b_grid]
    next_fuel = [list(row) for row in f_grid]
    new_fires = 0

    size = len(b_grid)
    for i in range(size):
        for j in range(size):
            if not b_grid[i][j]:
                # not burning yet: see whether the neighbours set it alight
                if check_ignition(b_grid, f_grid, h_grid,
                                  i_threshold, w_direction, i, j):
                    new_fires += 1
                    next_burning[i][j] = True
                continue

            # burning: use up fuel, and go out once it is exhausted
            next_fuel[i][j] -= 1
            if next_fuel[i][j] == 0:
                next_burning[i][j] = False

    return next_burning, next_fuel, new_fires
|
|
|
|
def burning(b_grid):
    '''
    Return True if at least one cell of b_grid holds a true value,
    otherwise False.
    '''
    return any(map(any, b_grid))
|
|
|
|
def run_model(f_grid, h_grid, i_threshold, w_direction, seed_cells):
    '''
    Run the fire model until nothing is burning.

    The distinct seed cells that have fuel are set alight, then update_state
    is applied repeatedly while burning reports that any cell is on fire.

    Returns a tuple of the final fuel grid and the number of cells that
    burnt (lit seeds plus every later ignition).
    '''
    size = len(f_grid)

    # duplicates are dropped and seeds without fuel are ignored
    lit_seeds = [(i, j) for i, j in set(seed_cells) if f_grid[i][j] > 0]

    # initialise burn grid
    b_grid = [[False] * size for _ in range(size)]
    for i, j in lit_seeds:
        b_grid[i][j] = 1
    burnt_cells = len(lit_seeds)

    # repeat updates while there is still fire present
    while burning(b_grid):
        step = update_state(b_grid, f_grid, h_grid, i_threshold, w_direction)
        b_grid, f_grid, ignitions = step
        burnt_cells += ignitions

    return f_grid, burnt_cells
|
|
|
|
# Sample solution 2
|
|
|
|
import copy
|
|
import itertools
|
|
|
|
def run_model(f_grid, h_grid, i_threshold, w_direction, burn_seeds):
    '''
    Run the fire model until nothing is burning.

    Seed cells that have fuel are set alight. On every step, each cell for
    which check_ignition returns a true value starts burning, and each cell
    that was burning at the start of the step loses one unit of fuel and
    goes out when none is left. The caller's f_grid is not modified.

    Returns a tuple of the final fuel grid and the number of distinct cells
    that burnt.

    NOTE(review): check_ignition is not defined under that name in the
    visible part of this file - confirm it is supplied elsewhere.
    '''
    size = len(f_grid)

    # Keep a set of all (i, j) pairs that have been burnt by fire.
    burnt = set()

    # Compute the initial burn grid.
    b_grid = [[False] * size for _ in range(size)]
    for i, j in burn_seeds:
        # A cell without fuel cannot burn. Lighting it anyway would push
        # its fuel below zero, the "== 0" test below would never fire, and
        # the simulation would never terminate.
        if f_grid[i][j] > 0:
            b_grid[i][j] = True
            burnt.add((i, j))

    # Run the simulation while at least one cell is burning.
    while any(itertools.chain.from_iterable(b_grid)):
        # Both loops below read the state at the start of the step and
        # write to these copies.
        new_b_grid = copy.deepcopy(b_grid)
        new_f_grid = copy.deepcopy(f_grid)

        # Work out what new cell should ignite.
        for i in range(size):
            for j in range(size):
                if check_ignition(b_grid, f_grid, h_grid,
                                  i_threshold, w_direction, i, j):
                    new_b_grid[i][j] = True
                    burnt.add((i, j))

        # Decrease fuel for all currently burning cells.
        for i in range(size):
            for j in range(size):
                if b_grid[i][j]:
                    new_f_grid[i][j] -= 1
                    if new_f_grid[i][j] == 0:
                        new_b_grid[i][j] = False

        # Update our burn state for the next iteration.
        b_grid = new_b_grid
        f_grid = new_f_grid

    return (f_grid, len(burnt))
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Part 4 - Test cases
|
|
|
|
# No solution provided.
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Part 5 - Bonus
|
|
|
|
from collections import defaultdict
|
|
|
|
# valid wind directions
|
|
# test_fire runs one simulation per entry for every starting cell: the eight
# compass directions plus None.
w_dirs = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW', None]
|
|
|
|
def test_burn(f_grid, h_grid, i_threshold, town_cell):
    '''
    Try a prescribed burn from every candidate cell of a landscape that
    contains a town.

    Each burn is simulated with run_model using double the ignition
    threshold, no wind, and a single seed cell. The town cell and cells with
    zero fuel load are never used as seeds.

    Returns a dictionary mapping each seed whose burn left the town cell's
    fuel load unchanged to the fuel load grid that resulted from that burn.
    '''
    size = len(f_grid)
    town_row, town_col = town_cell

    # every grid location, visited row by row
    locations = ((i, j) for i in range(size) for j in range(size))

    safe_burns = {}
    for seed in locations:
        row, col = seed

        # only cells other than the town that have fuel can be burnt
        if seed != town_cell and f_grid[row][col] != 0:
            fuel_after, _ = run_model(f_grid, h_grid,
                                      i_threshold * 2, None, [seed])

            # keep the seed only if the town's fuel was left untouched
            if fuel_after[town_row][town_col] == f_grid[town_row][town_col]:
                safe_burns[seed] = fuel_after

    return safe_burns
|
|
|
|
def test_fire(f_grid, h_grid, i_threshold, town_cell):
    '''
    Simulate a bushfire from every starting cell (other than the town and
    cells with zero fuel load) under every entry of w_dirs.

    Returns the proportion of those simulations in which the town cell
    finished with less fuel than it started with, or 0 if no simulation
    was run.
    '''
    size = len(f_grid)
    town_row, town_col = town_cell

    scenarios = 0
    town_losses = 0

    for i in range(size):
        for j in range(size):
            # only cells other than the town that have fuel can start a fire
            if (i, j) != town_cell and f_grid[i][j] != 0:
                for wind in w_dirs:
                    fuel_after, _ = run_model(f_grid, h_grid,
                                              i_threshold, wind, [(i, j)])
                    scenarios += 1
                    # the town burnt if its fuel load dropped
                    if fuel_after[town_row][town_col] < f_grid[town_row][town_col]:
                        town_losses += 1

    if scenarios == 0:
        return 0
    return town_losses / scenarios
|
|
|
|
|
|
def plan_burn(f_grid, h_grid, i_threshold, town_cell):
    '''
    Choose the best cells in which to conduct a prescribed burn.

    Every burn accepted by test_burn is scored by calling test_fire on the
    fuel load it leaves behind. Returns the sorted list of seed cells that
    share the lowest score, or an empty list when there is no valid burn.
    '''
    # group the valid burn seeds by the score of the landscape they leave
    seeds_by_score = defaultdict(list)
    candidates = test_burn(f_grid, h_grid, i_threshold, town_cell)
    for seed, fuel_after in candidates.items():
        score = test_fire(fuel_after, h_grid, i_threshold, town_cell)
        seeds_by_score[score].append(seed)

    if not seeds_by_score:
        return []

    # the lowest score means the town burns in the fewest scenarios
    return sorted(seeds_by_score[min(seeds_by_score)])
|