Think about you’re on a trip in Honolulu, Hawaii, and also you get an alert saying the hurricane/tsunami/earthquake is coming in 20 minutes, what would you do? To what extent are you able to belief the route plans out of your cellular apps?
Right here the same sceniro is being studied, utilizing instruments like python packages, and couple of APIs (Directions API). Two (fairly primitive) strategies will likely be launched right here to get the most effective route planning, extra strategies are on the way in which (together with utilizing LLM).
Very first thing first, let’s get a way of honolulu and design a diseaster zone, I designed the middle of diseaster to be at 21°17’48.8″N 157°50’57.8″W, (longitude is -157.8494, latitude is 21.2969), and it extends 0.05 levels (about 5.55km) outwards. So contained in the diseaster circle (blue), it’s harmful.
import osmnx as ox
import geopandas as gpd
import folium
import random
from shapely.geometry import Leveldef initialize_map(center_latitude, center_longitude, radius=0.05):
"""
Initialize a Folium map centered across the catastrophe zone and add the catastrophe circle.
"""
# Create a map centered across the catastrophe zone
m = folium.Map(location=[center_latitude, center_longitude], zoom_start=14)
# Create and add the catastrophe zone circle
center_point = Level(center_longitude, center_latitude)
disaster_zone_circle = center_point.buffer(radius)
folium.GeoJson(disaster_zone_circle).add_to(m)
return m
longitude = -157.8494
latitude = 21.2969
# Initialize map as soon as
m = initialize_map(latitude, longitude, radius=0.05)
m
Now think about we discovered 10 individuals (ranging from a small dataset) which might be contained in the diseaster zone, and:
- these persons are randomly generated and keep contained in the ‘harmful zone’
- these persons are guarnteed to be someplace on the street
- they’re of their automobiles
def generate_random_points_inside_circle(m, center_longitude, center_latitude, num_points=10, radius=0.05):
"""
Generate random evacuation factors on roads inside a round catastrophe zone.
"""
# Create catastrophe zone circle
center_point = Level(center_longitude, center_latitude)
disaster_zone_circle = center_point.buffer(radius)# Load the street community across the catastrophe zone
G = ox.graph_from_point((center_latitude, center_longitude), dist=5000, network_type="drive")
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)
# Filter nodes inside the catastrophe zone
nodes_in_zone = nodes[nodes.geometry.within(disaster_zone_circle)]
# Randomly choose nodes inside the catastrophe zone
selected_nodes = nodes_in_zone.pattern(n=num_points)
points_inside = []
# Add chosen nodes as evacuation factors
for idx, node in selected_nodes.iterrows():
folium.Marker(
location=[node.geometry.y, node.geometry.x],
icon=folium.Icon(colour="pink"),
popup=f"Evacuation Level {idx} (Inside)"
).add_to(m)
points_inside.append([node.geometry.y, node.geometry.x])
return m, points_inside
# Add factors contained in the catastrophe zone to the map
m, points_inside= generate_random_points_inside_circle(m, longitude, latitude, num_points=10, radius=0.05)
m
So, the place ought to they escape to? Effectively, they had been advised, by the announcement earlier, that they will go to five shelters which might be shut (inside 5km) outdoors of the diseaster zone, let’s simulate these 5 factors as nicely.
def generate_evacuation_points_outside_circle(m, center_longitude, center_latitude, num_points=5, radius=0.05):
"""
Generate evacuation factors on roads outdoors a round catastrophe zone.
"""
# Create catastrophe zone circle
center_point = Level(center_longitude, center_latitude)
disaster_zone_circle = center_point.buffer(radius)# Load the street community across the catastrophe zone
G = ox.graph_from_point((center_latitude, center_longitude), dist=5000, network_type="drive")
nodes, edges = ox.graph_to_gdfs(G, nodes=True, edges=True)
# Filter nodes outdoors the catastrophe zone
nodes_outside_zone = nodes[~nodes.geometry.within(disaster_zone_circle)]
# Randomly choose nodes outdoors the catastrophe zone
selected_nodes = nodes_outside_zone.pattern(n=num_points)
points_outside = []
# Add chosen nodes as evacuation factors
for idx, node in selected_nodes.iterrows():
folium.Marker(
location=[node.geometry.y, node.geometry.x],
icon=folium.Icon(colour="blue"),
popup=f"Evacuation Level {idx} (Exterior)"
).add_to(m)
points_outside.append([node.geometry.y, node.geometry.x])
return m, points_outside
# Add factors outdoors the catastrophe zone to the map
m, points_outside = generate_evacuation_points_outside_circle(m, longitude, latitude, num_points=5, radius=0.05)
m
Okay nice, now we have now the diseaster zone to flee from, and secure factors to go to, what does that sound like? Sure, begin and finish factors, which implies we will construct routes!
Now with easy dijkstra’s algorithm, we will add the routes to the map
def get_nearest_nodes(G, factors):
"""
Given a listing of factors (lat, lon), discover the closest node within the graph G for every level.
"""
nearest_nodes = []
for level in factors:
nearest_node = ox.distance.nearest_nodes(G, level[1], level[0]) # notice order (lat, lon)
nearest_nodes.append(nearest_node)
return nearest_nodesdef find_best_routes(G, start_points, end_points):
"""
Calculate the shortest path from every begin level to the closest finish level utilizing Dijkstra's algorithm.
"""
routes = []
start_nodes = get_nearest_nodes(G, start_points)
end_nodes = get_nearest_nodes(G, end_points)
for start_node in start_nodes:
shortest_route = None
shortest_length = float('inf')
for end_node in end_nodes:
strive:
size = nx.shortest_path_length(G, start_node, end_node, weight="size")
if size < shortest_length:
shortest_length = size
shortest_route = nx.shortest_path(G, start_node, end_node, weight="size")
besides nx.NetworkXNoPath:
proceed # Skip if no path is discovered
if shortest_route:
routes.append(shortest_route)
return routes
def add_routes_to_map(G, m, routes):
"""
Add the calculated routes to the Folium map.
"""
for route in routes:
route_coords = [(G.nodes[node]['y'], G.nodes[node]['x']) for node in route]
folium.PolyLine(route_coords, colour="inexperienced", weight=2.5, opacity=0.8).add_to(m)
# Discover finest routes from factors inside to nearest factors outdoors
routes = find_best_routes(G, points_inside, points_outside)
# Add routes to map
add_routes_to_map(G, m, routes)
# Show the map
m
That is essentially the most primitive mannequin, how can we enhance upon it? Effectively, we discover that within the dijstra’s algo we calculate the shortest path by this:
nx.shortest_path_length(G, start_node, end_node, weight="size")
It merely compute the shortest path primarily based on size of the graph. That is very static, we will enhance through the use of some API to present us actual time knowledge, and get the most effective path by the shortest travelling time in actual time. However earlier than that, let’s erease the paths we simply created.
# Perform to initialize/reset the map
def reset_map(center_latitude, center_longitude, points_inside, points_outside, radius=0.05):
"""
Initializes a brand new map, re-plotting the catastrophe zone and evacuation factors.
"""
# Create the map centered on the catastrophe zone
m = folium.Map(location=[center_latitude, center_longitude], zoom_start=14)# Create and add the catastrophe zone circle
center_point = Level(center_longitude, center_latitude)
disaster_zone_circle = center_point.buffer(radius)
folium.GeoJson(disaster_zone_circle).add_to(m)
# Plot factors contained in the catastrophe zone (evacuation factors)
for idx, level in enumerate(points_inside):
folium.Marker(
location=[point[0], level[1]],
icon=folium.Icon(colour="pink"),
popup=f"Evacuation Level {idx} (Inside)"
).add_to(m)
# Plot factors outdoors the catastrophe zone (secure zones)
for idx, level in enumerate(points_outside):
folium.Marker(
location=[point[0], level[1]],
icon=folium.Icon(colour="blue"),
popup=f"Protected Zone {idx} (Exterior)"
).add_to(m)
return m
m = reset_map(latitude, longitude, points_inside, points_outside, radius=0.05)
m
First initialize API:
import json
import requests# Exchange 'your_new_api_key' together with your precise API key
api_key_data = {
"distance_api_key": "your_API_Key"
}
# Save the API key in a JSON file
with open("google_api_key.json", "w") as json_file:
json.dump(api_key_data, json_file)
print("API key saved to google_api_key.json")
# Load the API key from the JSON file
with open("google_api_key.json", "r") as json_file:
key_data = json.load(json_file)
# Get the API key from the dictionary
DIST_API_KEY = key_data['distance_api_key']
Then apply the strategy, notice this time we don’t use networkx, we seize the encoded polyline knowledge for a route between the origin and vacation spot utilizing the Google Instructions API.
import requests
import folium
import polyline
import math# Haversine method to calculate the gap between two factors in kilometers
def calculate_distance(point1, point2):
"""
Calculate the great-circle distance between two factors
given in (latitude, longitude) format utilizing the Haversine method.
"""
lat1, lon1 = point1
lat2, lon2 = point2
radius = 6371 # Earth radius in kilometers
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
distance = radius * c
return distance
# Perform to get the route knowledge from Google Instructions API
def get_directions_route(origin, vacation spot, api_key):
"""
Fetches route knowledge from Google Instructions API.
Returns a listing of (lat, lng) tuples if profitable, or None if there may be an error.
"""
url = f"https://maps.googleapis.com/maps/api/instructions/json?origin={origin}&vacation spot={vacation spot}&key={api_key}"
response = requests.get(url)
knowledge = response.json()
if knowledge['status'] == 'OK':
strive:
# Extract polyline knowledge for the route
route_polyline = knowledge['routes'][0]['overview_polyline']['points']
# Decode polyline to get particular person latitude/longitude factors
route_points = polyline.decode(route_polyline)
return route_points
besides (KeyError, IndexError):
print(f"Route knowledge unavailable for {origin} to {vacation spot}")
return None
else:
print(f"Error fetching route knowledge: {knowledge['status']}")
return None
# Perform to seek out the most effective route primarily based on journey occasions and plot it on the map
def plot_best_routes(m, points_inside, points_outside, api_key):
"""
For every evacuation level contained in the catastrophe zone, discover the closest path to a secure zone and plot it on the map.
Additionally, save every chosen route for analysis.
"""
routes = [] # Retailer all finest routes for analysis
for start_point in points_inside:
best_route_points = None
shortest_distance = float('inf') # Initialize with a big worth
for end_point in points_outside:
origin = f"{start_point[0]},{start_point[1]}"
vacation spot = f"{end_point[0]},{end_point[1]}"
# Get the detailed route factors from Google API
route_points = get_directions_route(origin, vacation spot, api_key)
if route_points:
# Calculate straight-line distance between factors
distance = calculate_distance(start_point, end_point)
# Select the route with the shortest distance
if distance < shortest_distance:
shortest_distance = distance
best_route_points = route_points
# Plot the most effective route for this begin level on the map
if best_route_points:
folium.PolyLine(
places=[(point[0], level[1]) for level in best_route_points],
colour="inexperienced",
weight=2.5,
opacity=0.8,
tooltip="Greatest Route Based mostly on Google Instructions API"
).add_to(m)
routes.append(best_route_points) # Save the most effective route for this begin level
return m, routes
# Assuming you have already got an initialized folium map `m`, hazard zone factors `points_inside`,
# secure zone factors `points_outside`, and your API key `DIST_API_KEY`
# Plot the most effective routes and likewise retrieve the listing of chosen routes
m, routes = plot_best_routes(m, points_inside, points_outside, DIST_API_KEY)
The polyline knowledge is decoded into particular person latitude and longitude factors, which represents the trail Google recommends for that journey. Google gives what it considers the “finest” route, usually primarily based on elements corresponding to journey time, distance, street situations, and visitors knowledge. Nonetheless, when you don’t specify any preferences, this “finest” route is normally the default (usually the quickest route).
Now evaluate the 2 set of route plans:
We are able to see certainly they’re completely different. Additionally since Instructions API enable actual time data, it allows the chance to do significant re-run and re-route because the evacuees are evacuating.
However evaluating them facet by facet does not likely inform us how good or unhealthy every one is, we have to consider some analysis mechanisms to perform that.
I’ll primarily embody the next standards (with weights), one can change these primarily based on the exhausting and comfortable standards for their very own venture. Right here
weights = {
'total_distance': 0.2,
'total_travel_time': 0.3,
'average_travel_time': 0.1,
'unique_safe_zones_covered': 0.1,
'route_redundancy': 0.1,
'distance_from_danger': 0.1,
'estimated_emissions': 0.05,
'congestion_likelihood': 0.05
}
Listed here are the capabilities that carry out the analysis:
# Complete Distance for a set of routes
def total_distance(routes):
return sum(sum(calculate_distance(routes[i][j], routes[i][j + 1]) for j in vary(len(routes[i]) - 1)) for i in vary(len(routes)))# Complete Journey Time for a set of routes (requires real-time API knowledge)
def total_travel_time(routes, api_key):
total_time = 0
for route in routes:
origin = f"{route[0][0]},{route[0][1]}"
vacation spot = f"{route[-1][0]},{route[-1][1]}"
url = f"https://maps.googleapis.com/maps/api/instructions/json?origin={origin}&vacation spot={vacation spot}&key={api_key}"
response = requests.get(url)
knowledge = response.json()
if knowledge['status'] == 'OK':
route_time = knowledge['routes'][0]['legs'][0]['duration']['value'] / 60 # in minutes
total_time += route_time
else:
print(f"Error fetching time for route: {knowledge['status']}")
return total_time
# Common Journey Time
def average_travel_time(routes, api_key):
return total_travel_time(routes, api_key) / len(routes)
# Distinctive Protected Zones Coated
def unique_safe_zones_covered(routes, safe_zones, tolerance_km=0.1):
"""
Calculate the variety of distinctive secure zones lined by checking if the route vacation spot
is inside a specified tolerance (in km) of any secure zone.
"""
covered_zones = set()
for route in routes:
vacation spot = route[-1]
for safe_zone in safe_zones:
# Calculate the gap between the route vacation spot and the secure zone
distance = calculate_distance(vacation spot, safe_zone)
# If the gap is inside the tolerance, think about it as masking the secure zone
if distance <= tolerance_km:
covered_zones.add(tuple(safe_zone))
break # No must examine different secure zones for this route
return len(covered_zones)
# Route Redundancy (overlap rely)
def route_redundancy(routes):
overlap_count = 0
for i in vary(len(routes)):
for j in vary(i + 1, len(routes)):
overlap_count += len(set(routes[i]) & set(routes[j])) # Depend overlapping segments
return overlap_count
# Security Proximity - distance from the catastrophe zone
def average_distance_from_danger(routes, danger_zone_center, danger_radius):
def point_in_circle(level, middle, radius):
return calculate_distance(level, middle) <= radius
total_distance_from_danger = 0
for route in routes:
for level in route:
if point_in_circle(level, danger_zone_center, danger_radius):
dist = calculate_distance(level, danger_zone_center)
total_distance_from_danger += dist
return total_distance_from_danger / len(routes)
# Congestion Probability primarily based on visitors density (use API knowledge if out there)
def congestion_likelihood(routes, api_key):
congestion_score = 0
for route in routes:
origin = f"{route[0][0]},{route[0][1]}"
vacation spot = f"{route[-1][0]},{route[-1][1]}"
url = f"https://maps.googleapis.com/maps/api/instructions/json?origin={origin}&vacation spot={vacation spot}&departure_time=now&key={api_key}"
response = requests.get(url)
knowledge = response.json()
if knowledge['status'] == 'OK':
traffic_time = knowledge['routes'][0]['legs'][0]['duration_in_traffic']['value'] / 60 # in minutes
regular_time = knowledge['routes'][0]['legs'][0]['duration']['value'] / 60
congestion_score += (traffic_time - regular_time) / regular_time # % improve
else:
print(f"Error fetching visitors knowledge for route: {knowledge['status']}")
return congestion_score / len(routes)
We are able to put them collectively on this name
def evaluate_routes(routes, safe_zones, danger_zone_center, danger_radius, api_key, weights):
# Calculate uncooked metric values with out models
scores = {
'total_distance': total_distance(routes),
'total_travel_time': total_travel_time(routes, api_key),
'average_travel_time': average_travel_time(routes, api_key),
'unique_safe_zones_covered': unique_safe_zones_covered(routes, safe_zones),
'route_redundancy': route_redundancy(routes),
'distance_from_danger': average_distance_from_danger(routes, danger_zone_center, danger_radius),
'congestion_likelihood': congestion_likelihood(routes, api_key) * 100 # Convert to share
}# Apply weights to every rating to calculate a remaining weighted rating
final_score = sum(scores[metric] * weights[metric] for metric in scores)
# Put together show model with models
scores_with_units = {
'total_distance': f"{scores['total_distance']} km",
'total_travel_time': f"{scores['total_travel_time']} minutes",
'average_travel_time': f"{scores['average_travel_time']} minutes",
'unique_safe_zones_covered': scores['unique_safe_zones_covered'], # No unit
'route_redundancy': scores['route_redundancy'], # No unit
'distance_from_danger': f"{scores['distance_from_danger']} km",
'congestion_likelihood': f"{scores['congestion_likelihood']} %"
}
# Return the scores with models for show and the ultimate weighted rating
return scores_with_units, final_score
Then assign weights and run!
# Outline weights (modify in line with precedence)
weights = {
'total_distance': 0.2,
'total_travel_time': 0.3,
'average_travel_time': 0.1,
'unique_safe_zones_covered': 0.1,
'route_redundancy': 0.1,
'distance_from_danger': 0.1,
'estimated_emissions': 0.05,
'congestion_likelihood': 0.05
}# Protected zones and hazard zone data
safe_zones = points_outside # Instance secure zones (Tokyo, LA)
danger_zone_center = (21.2969, -157.8494) # Instance catastrophe location (Tokyo)
danger_radius = 5.55 # in km
# Consider the routes
scores, final_score = evaluate_routes(routes, safe_zones, danger_zone_center, danger_radius, DIST_API_KEY, weights)
print("Particular person Scores for Metrics:", scores)
print("Ultimate Weighted Rating:", final_score)
It offers the next output:
Particular person Scores for Metrics:
total_distance: 41.54628599546889 km total_travel_time: 90.41666666666667 minutes
average_travel_time: 9.041666666666668 minutes
unique_safe_zones_covered: 4
route_redundancy: 131
distance_from_danger: 240.15978651315126 km
congestion_likelihood: 2.169499016140182 %
Ultimate Weighted Rating: 73.96287746788258
The remaining weighted rating is the infinity stone that can be utilized in LLM megascript (use the routes and the rating as immediate for the LLM).
Subsequently the analysis perform will likely be vital, and my group will improve these as we do extra Exploratory Spatial Information Evaluation (ESDA).
Now for an entire run, I’ll put the analysis name after every run, after rerunning every little thing, for the 2 strategies:
I get two scores respectively:
Now we will do extra enchancment on the scoring perform, or in higher phrases: the target perform.
For instance because the knowledge have completely different models, so doing a normalization could possibly be useful, I’ve additionally adjusted the weights, I need to prioritize the optimization of journey occasions, not a lot emphasis on route_redundancy and congestion_likelihood. After all sooner or later our group might present extra sophiscated analysis capabilities primarily based on the street situations and geospatial knowledge to make it higher.
def normalize(worth, min_val, max_val, maximize=True):
"""
Normalize a metric to a 0-1 vary. If maximize is True, 1 corresponds to larger values.
"""
if maximize:
return (worth - min_val) / (max_val - min_val)
else:
return (max_val - worth) / (max_val - min_val)def evaluate_routes_v2(routes, safe_zones, danger_zone_center, danger_radius, api_key):
# Calculate uncooked metrics
raw_scores = {
'total_distance': total_distance(routes),
'total_travel_time': total_travel_time(routes, api_key),
'average_travel_time': average_travel_time(routes, api_key),
'unique_safe_zones_covered': unique_safe_zones_covered(routes, safe_zones),
'route_redundancy': route_redundancy(routes),
'distance_from_danger': average_distance_from_danger(routes, danger_zone_center, danger_radius),
'congestion_likelihood': congestion_likelihood(routes, api_key)
}
# Set ranges (these values ought to ideally be primarily based on noticed or sensible ranges)
ranges = {
'total_distance': (0, 100), # e.g., 0 to 100 km
'total_travel_time': (0, 120), # e.g., 0 to 120 minutes
'average_travel_time': (0, 30), # e.g., 0 to 30 minutes
'unique_safe_zones_covered': (0, len(safe_zones)), # e.g., 0 to the variety of secure zones
'route_redundancy': (0, 50), # e.g., 0 to 50 redundant factors
'distance_from_danger': (0, 50), # e.g., 0 to 50 km
'congestion_likelihood': (0, 1) # e.g., 0% to 100% as a fraction
}
# Normalized scores with their optimization course
normalized_scores = {
'total_distance': normalize(raw_scores['total_distance'], *ranges['total_distance'], maximize=False),
'total_travel_time': normalize(raw_scores['total_travel_time'], *ranges['total_travel_time'], maximize=False),
'average_travel_time': normalize(raw_scores['average_travel_time'], *ranges['average_travel_time'], maximize=False),
'unique_safe_zones_covered': normalize(raw_scores['unique_safe_zones_covered'], *ranges['unique_safe_zones_covered'], maximize=True),
'route_redundancy': normalize(raw_scores['route_redundancy'], *ranges['route_redundancy'], maximize=False),
'distance_from_danger': normalize(raw_scores['distance_from_danger'], *ranges['distance_from_danger'], maximize=True),
'congestion_likelihood': normalize(raw_scores['congestion_likelihood'], *ranges['congestion_likelihood'], maximize=False)
}
# Weights
weights = {
'total_distance': 0.15,
'total_travel_time': 0.25,
'average_travel_time': 0.15,
'unique_safe_zones_covered': 0.2,
'route_redundancy': 0.05,
'distance_from_danger': 0.1,
'congestion_likelihood': 0.05
}
# Calculate remaining weighted rating
final_score = sum(normalized_scores[metric] * weights[metric] for metric in normalized_scores)
return normalized_scores, final_score
Additionally added penalty primarily based on thresholds:
def apply_penalties(scores, thresholds):
penalties = {
'total_travel_time': 0 if scores['total_travel_time'] <= thresholds['total_travel_time'] else scores['total_travel_time'] - thresholds['total_travel_time'],
'route_redundancy': 0 if scores['route_redundancy'] <= thresholds['route_redundancy'] else scores['route_redundancy'] - thresholds['route_redundancy'],
'congestion_likelihood': 0 if scores['congestion_likelihood'] <= thresholds['congestion_likelihood'] else scores['congestion_likelihood'] - thresholds['congestion_likelihood']
}
return penalties# Thresholds and weights
thresholds = {
'total_travel_time': 60, # Instance threshold for whole journey time in minutes
'route_redundancy': 20, # Instance threshold for route redundancy
'congestion_likelihood': 0.5 # Instance threshold for congestion probability as fraction
}
Here’s what the output appears like for the newer model:
Keep tuned for once I use LLM to do the route planning!