import json
import os
import zipfile
from math import atan2, cos, sin
import numpy as np
from numpy import array, empty, sort, unique
from numpy.linalg import norm
from scipy.stats import binned_statistic, bootstrap
from pNeuma_simulator import params
from pNeuma_simulator.gang import decay
from pNeuma_simulator.initialization import ov
[docs]
def loader(permutation, path: str, verbose: bool = True):
"""Loads and returns the items from a JSON file within a zip archive.
Args:
permutation: The permutation to be used in the zip file name.
path (str): The path to the directory containing the zip file.
verbose (bool): Specify whether to print the file name
Returns:
list: The items loaded from the JSON file.
"""
with zipfile.ZipFile(f"{path}{permutation}.zip", "r") as ziph:
# ziph is zipfile handle
for filename in ziph.namelist():
if filename.endswith(").json"):
# Opening JSON file
with ziph.open(filename, "r") as openfile:
# Reading from JSON file
items = json.load(openfile)
if verbose:
print(openfile.name)
return items
[docs]
def aggregate(l_agents, n_cars: int, n_moto: int):
"""Calculate various aggregate metrics based on the given list of agents.
Args:
l_agents (list[Particle]): A list of agents. (#list(list(Particles))?)
n_cars (int): The number of cars.
n_moto (int): The number of motorcycles.
Returns:
VKT_cars (float): Vehicle Kilometers Traveled by cars.
VHT_cars (float): Vehicle Hours Traveled by cars.
VKT_moto (float): Vehicle Kilometers Traveled by motorcycles.
VHT_moto (float): Vehicle Hours Traveled by motorcycles.
"""
l_cars_dx = []
l_moto_dx = []
for t, agents in enumerate(l_agents):
cars_dx = 0
moto_dx = 0
for j, agent in enumerate(agents):
try:
speed = agent["speed"]
theta = agent["theta"]
dt = params.dt
except:
speed = norm(agent["vel"])
theta = atan2(agent["vel"][1], agent["vel"][0])
dt = 0.12
dx = speed * np.cos(theta) * dt
if j <= 2 * n_cars - 1:
cars_dx += dx
else:
moto_dx += dx
# Ignore transient
if t >= int(len(l_agents) * (1 - params.keep)):
l_cars_dx.append(cars_dx)
if n_moto > 0:
l_moto_dx.append(moto_dx)
VKT_cars = 1e-3 * sum(l_cars_dx)
VHT_cars = 2e-3 * n_cars * len(l_cars_dx) * dt / params.factor
if n_moto > 0:
VKT_moto = 1e-3 * sum(l_moto_dx)
VHT_moto = 1e-3 * n_moto * len(l_moto_dx) * dt / params.factor
else:
VKT_moto = None
VHT_moto = None
return VKT_cars, VHT_cars, VKT_moto, VHT_moto
[docs]
def intersect(p1, p2, p3, p4):
"""Calculates the intersection point of two line segments.
This function calculates the intersection point of the line segment
defined by points p1 and p2 with the line segment defined by points p3
and p4. If the line segments are parallel or do not intersect within
their lengths, the function returns None.
Args:
p1 (tuple): The first point of the first line segment, as a tuple (x1, y1).
p2 (tuple): The second point of the first line segment, as a tuple (x2, y2).
p3 (tuple): The first point of the second line segment, as a tuple (x3, y3).
p4 (tuple): The second point of the second line segment, as a tuple (x4, y4).
Returns:
tuple or None: The intersection point as a tuple (x, y) if the line
segments intersect within their lengths, otherwise None.
"""
# intersection between line(p1, p2) and line(p3, p4)
# https://stackoverflow.com/questions/3252194/
x1, y1 = p1
x2, y2 = p2
x3, y3 = p3
x4, y4 = p4
denom = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1)
if denom == 0: # parallel
return None
ua = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / denom
if ua < 0 or ua > 1: # out of range
return None
ub = ((x2 - x1) * (y1 - y3) - (y2 - y1) * (x1 - x3)) / denom
if ub < 0 or ub > 1: # out of range
return None
x = x1 + ua * (x2 - x1)
y = y1 + ua * (y2 - y1)
return (x, y)
[docs]
def normalized(surface, section):
"""Calculate normalized intersection points and values between surface curves and section diagonals.
Calculate the normalized intersection points and their corresponding values between the curves of a surface and the
diagonals of a section.
Args:
surface (object): An object representing the surface, which contains collections of curves and an
array of data values.
section (object): An object representing the section, which contains collections of diagonals.
Returns:
tuple: A tuple containing two lists:
- l_points (list): A list of lists, where each sublist contains the intersection points for each diagonal.
- l_response (list): A list of lists, where each sublist contains the corresponding values at the
intersection points for each diagonal.
"""
data = surface.get_array().data
curves = []
values = []
for n, collection in enumerate(surface.collections):
paths = collection.get_paths()
if len(paths) > 0:
for path in paths:
curves.append(path)
values.append(data[n])
diagonals = []
for collection in section.collections:
path = collection.get_paths()[0]
diagonals.append(path)
# find intersections
l_points = []
l_response = []
for diagonal in diagonals[:]:
points = []
response = []
# https://stackoverflow.com/questions/38151445/
diagonal_segments = list(zip(diagonal.vertices, diagonal.vertices[1:]))
intersecting = []
mask = []
for curve in curves:
if diagonal.intersects_path(curve):
mask.append(False)
intersecting.append(curve)
else:
mask.append(True)
elevations = array(values)[~array(mask)]
for diagonal_segment in diagonal_segments:
p1, p2 = diagonal_segment
for n, curve in enumerate(intersecting):
# https://stackoverflow.com/questions/38151445/
curve_segments = list(zip(curve.vertices, curve.vertices[1:]))
for curve_segment in curve_segments:
p3, p4 = curve_segment
intersection = intersect(p1, p2, p3, p4)
if intersection:
points.append(intersection)
response.append(elevations[n])
l_points.append(points)
l_response.append(response)
return l_points, l_response
[docs]
def percolate(items, n_cars, n_moto, rng, start: int = 1):
"""Analyzes the percolation of vehicles and motorcycles in a given dataset.
Args:
items (list): A list of items where each item is a list of frames. Each frame is a dictionary
containing vehicle data.
n_cars (int): The number of cars in the dataset.
n_moto (int): The number of motorcycles in the dataset.
rng (numpy.random.Generator): A random number generator instance for reproducibility.
start (int, optional): The starting frame index to consider for analysis. Defaults to 1.
Returns:
tuple: A tuple containing four lists:
- x (list): The bin centers for the binned data.
- y (list): The mean difference in velocity between motorcycles and cars for each bin.
- l_T (list): Control data (T).
- l_DPhi (list): Response data (Dphi).
- binder (list): The binder cumulant for each bin.
"""
l_T = []
l_DPhi = []
l_DPhi_2 = []
l_DPhi_4 = []
for item in items:
if isinstance(item[0], list):
n_veh = 2 * n_cars + n_moto
lam = empty(shape=n_veh, dtype=float)
v0 = empty(shape=n_veh, dtype=float)
s0 = empty(shape=n_veh, dtype=float)
for t, frame in enumerate(item[0]):
if t == 0:
for j, _ in enumerate(frame):
v0[j] = frame[j]["v0"]
lam[j] = frame[j]["lam"]
try:
s0[j] = frame[j]["s0"]
except:
s0[j] = frame[j]["d"]
v_max = list(ov(params.d_max, lam, v0, s0))
if t > start:
deg_range = []
vel_car = []
vel_x = []
vel_y = []
for j, _ in enumerate(frame):
try:
speed = item[0][t - 1][j]["speed"]
theta = item[0][t - 1][j]["theta"]
except:
speed = norm(item[0][t - 1][j]["vel"])
theta = atan2(item[0][t - 1][j]["vel"][1], item[0][t - 1][j]["vel"][0])
if j <= 2 * n_cars - 1:
vel_car.append(speed / v_max[j])
else:
alphas = decay(speed, theta)
degs = np.round(np.degrees(alphas), 2)
deg_range.append(degs[0] - degs[-1])
vel_x.append(speed * cos(theta) / v_max[j])
vel_y.append(speed * sin(theta) / v_max[j])
l_T.append(np.mean(deg_range))
phi_cars = np.mean(vel_car)
phi_moto = norm([np.sum(vel_x), np.sum(vel_y)]) / n_moto
DPhi = phi_moto - phi_cars
l_DPhi.append(DPhi)
l_DPhi_2.append(DPhi**2)
l_DPhi_4.append(DPhi**4)
l_T = np.round(l_T) / 2
bins = sort(unique(l_T))
y, bin_edges, _ = binned_statistic(l_T, l_DPhi, statistic="mean", bins=bins)
y_2, _, _ = binned_statistic(l_T, l_DPhi_2, statistic="mean", bins=bins)
y_4, _, _ = binned_statistic(l_T, l_DPhi_4, statistic="mean", bins=bins)
x = (bin_edges[1:] + bin_edges[:-1]) / 2
binder = 1 - y_4 / (3 * (y_2**2))
return list(x), list(y), list(l_T), list(l_DPhi), list(binder)
[docs]
def polarize(items, n_cars, n_moto, rng, start: int = 1):
"""Analyzes the polarization of motorcycles in a given dataset.
Args:
items (list): A list of items where each item is a list of frames. Each frame is a dictionary
containing vehicle data.
n_cars (int): The number of cars in the dataset.
n_moto (int): The number of motorcycles in the dataset.
rng (numpy.random.Generator): A random number generator instance for reproducibility.
start (int, optional): The starting frame index to consider for analysis. Defaults to 1.
Returns:
tuple: A tuple containing four lists:
- x (list): The bin centers for the binned data.
- y (list): The mean difference in velocity between motorcycles and cars for each bin.
- low (list): The lower bound of the confidence interval for each bin.
- high (list): The upper bound of the confidence interval for each bin.
"""
l_T = []
l_phi = []
for item in items:
if isinstance(item[0], list):
for t, frame in enumerate(item[0]):
if t > start:
deg_range = []
direction = np.array([0.0, 0.0])
for j, _ in enumerate(frame):
speed = item[0][t - 1][j]["speed"]
theta = item[0][t - 1][j]["theta"]
if j <= 2 * n_cars - 1:
pass
else:
alphas = decay(speed, theta)
degs = np.round(np.degrees(alphas), 2)
deg_range.append(degs[0] - degs[-1])
direction += np.array([np.cos(theta), np.sin(theta)])
l_T.append(np.mean(deg_range))
phi = norm(direction) / n_moto
l_phi.append(phi)
l_T = np.round(l_T) / 2
bins = np.sort(unique(l_T))
y, bin_edges, _ = binned_statistic(l_T, l_phi, statistic="mean", bins=bins)
x = (bin_edges[1:] + bin_edges[:-1]) / 2
low, _, _ = binned_statistic(l_T, l_phi, statistic=lambda y: confidence_interval(y, rng, "low"), bins=bins)
high, _, _ = binned_statistic(l_T, l_phi, statistic=lambda y: confidence_interval(y, rng, "high"), bins=bins)
return list(x), list(y), list(low), list(high)
[docs]
def zipdir(path: str, permutation, ziph) -> None:
"""Zip the directory at the given path.
Args:
path (str): The path of the directory to be zipped.
permutation (tuple): The permutation considered.
ziph: The zipfile handle.
"""
# ziph is zipfile handle
# https://stackoverflow.com/questions/1855095/
# https://stackoverflow.com/questions/36740683/
for root, _, files in os.walk(path):
for file in files:
if file.endswith(f"{permutation}.jsonl"):
os.chdir(root)
ziph.write(file)
os.remove(file)
path_parent = os.path.dirname(os.getcwd())
os.chdir(path_parent)
[docs]
def confidence_interval(data, rng, setting="sem"):
"""Calculate the confidence interval or standard error of the mean (SEM) for a given dataset.
Args:
data (array-like): The dataset for which the confidence interval or SEM is to be calculated.
rng (numpy.random.Generator): A random number generator instance for reproducibility.
setting (str, optional): The type of result to return. Options are:
- "low": Return the lower bound of the confidence interval.
- "high": Return the upper bound of the confidence interval.
- "sem": Return the standard error of the mean (default).
Returns:
float or None: The requested confidence interval bound or SEM if the dataset has more than one element,
otherwise None.
"""
if len(data) > 1:
# https://github.com/scipy/scipy/issues/14645
res = bootstrap(
(data,),
np.mean,
batch=10,
confidence_level=0.95,
random_state=rng,
method="basic",
)
if setting == "low":
return res.confidence_interval.low
elif setting == "high":
return res.confidence_interval.high
elif setting == "sem":
return res.standard_error
else:
return None