from copy import deepcopy
from math import cos, inf, isinf, pi, radians, sin
from typing import Callable
import numpy as np
from joblib import Parallel, delayed, parallel_backend
from numba import jit
from numpy.linalg import norm
from pNeuma_simulator import params
from pNeuma_simulator.contact_distance import ellipses
from pNeuma_simulator.gang import navigate
from pNeuma_simulator.gang.neighborhood import neighborhood
from pNeuma_simulator.initialization import PoissonDisc, equilibrium, ov
from pNeuma_simulator.shadowcasting import shadowcasting
from pNeuma_simulator.utils import direction, projection, tangent_dist
[docs]
def main(
n_cars: int,
n_moto: int,
seed: int,
parallel: Callable,
COUNT: int = 500,
distributed: bool = True,
stochastic: bool = True,
):
"""
Simulates the main loop of a pNeuma simulator.
Args:
n_cars (int): Number of cars.
n_moto (int): Number of motorcycles.
seed (int): Seed for the random number generator.
parallel (Callable): Callable object for parallel execution.
COUNT (int, optional): Number of iterations in the main loop. Defaults to 500.
distributed (bool, optional): Flag indicating if the simulation is distributed. Defaults to True.
stochastic (bool, optional): Flag indicating if the simulation is stochastic. Defaults to True.
Returns:
Tuple: A tuple containing the list of serialized agents at each iteration and an empty list.
"""
# Code implementation...
rng = np.random.default_rng(seed)
###############################################
# Main loop
###############################################
E = np.zeros((COUNT, 2 * n_cars + n_moto))
sampler = PoissonDisc(
n_cars, n_moto, cell=params.cell, L=params.L, W=params.cell * 3, k=params.k, clearance=params.clearance, rng=rng
)
# [car, car, ..., moto]
samples, _ = sampler.sample(rng)
agents = samples[: 2 * n_cars]
if n_moto > 0:
agents.extend(rng.choice(samples[2 * n_cars :], n_moto, replace=False))
l_agents = []
tau, lam, v0, s0 = equilibrium(
params.L,
params.lanes,
n_cars,
n_moto,
rng,
distributed=distributed,
)
l_a = []
l_b = []
l_A = np.repeat(params.A, len(agents))
l_B = np.repeat(params.B, len(agents))
for n, agent in enumerate(agents):
# Reassign IDs
agent.ID = n + 1
agent.image = None
agent.styles = None
agent.tau = tau[n]
agent.lam = lam[n]
agent.v0 = v0[n]
agent.s0 = s0[n]
l_a.append(agent.a)
l_b.append(agent.b)
for t in range(COUNT - 1):
######################
# Periodic boundary
######################
images = []
serial_agents = []
for agent in agents:
if agent.x < -(params.L / 2 - (params.d_max + agent.l)):
image = deepcopy(agent)
image.x += params.L
images.append(image)
agent.image = image
elif agent.x > params.L / 2 - (params.d_max + agent.l):
image = deepcopy(agent)
image.x -= params.L
images.append(image)
agent.image = image
serial_agent = deepcopy(agent)
serial_agent.pos = serial_agent.pos.tolist()
serial_agent.vel = serial_agent.vel.tolist()
serial_agents.append(serial_agent.encode(t))
l_agents.append(serial_agents)
##############################
# Field of View analysis
##############################
for image in agents + images:
cos_angle = cos(pi - image.theta)
sin_angle = sin(pi - image.theta)
xc = params.xv - image.x
yc = params.yv - image.y
# https://stackoverflow.com/questions/37031356/
xct = xc * cos_angle - yc * sin_angle
yct = xc * sin_angle + yc * cos_angle
rad = xct**2 / image.l**2 + yct**2 / image.w**2
image.rad = rad
matrices = []
origins = []
for agent in agents:
matrix = np.zeros(params.shape)
matrix[[0, -1]] = 1
for image in agents + images:
if image.ID != agent.ID:
matrix = identify(matrix, image.rad, image.ID)
matrices.append(matrix)
origin = np.unravel_index(agent.rad.argmin(), params.shape)
origins.append(origin)
tuples = parallel(
delayed(shadowcasting)(i, j, params.grid, params.L, params.d_max) for i, j in zip(matrices, origins)
)
for n, agent in enumerate(agents):
interactions = tuples[n]
agent.interactions = interactions.tolist()
##################################################
# Navigation module
##################################################
navigators = []
for agent in agents:
interactions = agent.interactions
if len(interactions) > 0:
navigators.append(agent)
if len(navigators) > 0:
tuples = parallel(delayed(navigate)(navigator, agents) for navigator in navigators)
for n, agent in enumerate(navigators):
a0, f_a, ttc = tuples[n]
agent.ttc = ttc
if agent.mode == "Moto":
agent.a0 = a0
agent.f_a = f_a.tolist()
################################
# Longitudinal dynamics
################################
l_theta = []
l_speed = []
l_gap = []
l_pseudottc = []
for agent in agents:
l_speed.append(agent.speed)
# Updated direction of i
if agent.mode == "Moto":
new_theta = agent.theta + params.dt * (agent.a0 - agent.theta) / params.tau
theta_i = new_theta
else:
theta_i = agent.theta
l_theta.append(theta_i)
# Semiaxis dimensions of i
l_i, w_i = agent.l, agent.w
# Absolute position of i
pos_i = agent.pos
x_i, y_i = pos_i
# Distance from walls
k_w = tangent_dist(theta_i, 0, l_i, w_i)
if theta_i >= radians(params.da):
gap_w = (params.lane - y_i - k_w) / sin(theta_i)
elif theta_i <= -radians(params.da):
gap_w = (params.lane + y_i - k_w) / sin(-theta_i)
else:
gap_w = inf
interactions = agent.interactions
if len(interactions) > 0:
gaps = []
neighbors = neighborhood(agent, agents)
# Direction vector and its normal
e_i, e_i_n = direction(theta_i)
for neighbor in neighbors:
# Semiaxis dimensions of j
l_j, w_j = neighbor.l, neighbor.w
# Absolute position of j
pos_j = neighbor.pos
x_j, y_j = pos_j
# Direction of j
theta_j = neighbor.theta
# Check if neighbor is in front
front, e_i_j, s_i_j = infront(e_i, pos_i, pos_j)
if front:
# Distance from tangent parallel to i
k_h = tangent_dist(theta_j, theta_i, l_j, w_j)
proj = projection(e_i_n, e_i_j, s_i_j)
# This is extremely important!!!
if proj <= params.scaling * w_i + k_h:
# Distance of closest approach between i and j
if proj == 0:
min_d = l_i + l_j
else:
min_d = ellipses(
l_j,
w_j,
l_i,
w_i,
x_j,
y_j,
x_i,
y_i,
theta_j,
theta_i,
)
gap = s_i_j - min_d
else:
gap = inf
else:
gap = inf
gaps.append(gap)
if np.isfinite(gaps).sum() > 0:
leader = neighbors[np.argmin(gaps)]
gap = min(gaps)
agent.leader = leader.ID
agent.gap = gap
else:
agent.leader = None
if isinf(gap_w):
agent.gap = params.d_max
else:
agent.gap = min([gap_w, params.d_max])
else:
agent.leader = None
if isinf(gap_w):
agent.gap = params.d_max
else:
agent.gap = min([gap_w, params.d_max])
if agent.gap <= 0:
return tuple(agent.pos)
raise CollisionException("Accident occurred")
# Retrieve inverse ttc
if agent.ttc is None:
pseudottc = 0
else:
pseudottc = -1 / agent.ttc
l_pseudottc.append(pseudottc)
l_gap.append(agent.gap)
if stochastic:
dW = params.sqrtdt * rng.standard_normal(len(agents))
E[t + 1] = (1 - params.dt / np.array(l_b)) * E[t] + np.array(l_a) * dW
OV = ov(np.array(l_gap), lam, v0, s0) + E[t + 1]
else:
OV = ov(np.array(l_gap), lam, v0, s0)
V_des = OV * (0.5 * (1 + np.tanh(l_A * (np.array(l_pseudottc) + l_B))))
V = np.array(l_speed)
new_V = V + ((V_des - V) / tau) * params.dt
new_V = np.maximum(new_V, 0)
new_theta = np.array(l_theta)
##################################
# Advance the simulation
##################################
for n, agent in enumerate(agents):
agent.advance(params.dt, new_V[n], new_theta[n])
agent.image = None
return (l_agents, [])
[docs]
def batch(seed: int, permutation: tuple, n_jobs: int, distributed: bool = True, stochastic: bool = True):
"""
Run a batch simulation with the given seed and permutation.
Args:
seed (int): The seed for random number generation.
permutation (tuple): A tuple containing the number of cars and motorcycles.
n_jobs (int): Number of parallel jobs.
distributed (bool, optional): Flag indicating if the simulation is distributed. Defaults to True.
stochastic (bool, optional): Flag indicating if the simulation is stochastic. Defaults to True.
Returns:
tuple: A tuple containing the simulation results for cars and motorcycles.
"""
n_cars, n_moto = permutation
with parallel_backend("loky", inner_max_num_threads=n_jobs):
with Parallel(n_jobs=n_jobs) as parallel:
try:
item = main(n_cars, n_moto, seed, parallel, params.COUNT, distributed, stochastic)
except CollisionException:
item = (None, None)
return item
[docs]
class CollisionException(Exception):
"""Raised when agents collide"""
# https://stackoverflow.com/questions/1319615
def __init__(self, message, payload=None):
self.message = message
self.payload = payload
def __str__(self):
return str(self.message)
[docs]
@jit(nopython=True)
def identify(matrix: np.ndarray, image_rad: np.ndarray, ID: int) -> np.ndarray:
"""
Identifies and replaces pixels in the matrix with the given ID based on a condition.
Args:
matrix (ndarray): The input matrix.
image_rad (ndarray): The flattened array of image radii.
ID (int): The ID to replace the pixels with.
Returns:
ndarray: The modified matrix with replaced pixels.
"""
# matrix to array
ar_matrix = matrix.flatten()
ar_rad = image_rad.flatten()
ar_matrix[np.where(ar_rad < 1)] = ID
matrix = ar_matrix.reshape(matrix.shape)
return matrix
[docs]
@jit(nopython=True)
def infront(e_i, pos_i, pos_j):
"""
Determines if a neighbor is in front of a given position.
Args:
e_i: The direction vector of the current position.
pos_i: The current position.
pos_j: The position of the neighbor.
Returns:
front: A boolean indicating if the neighbor is in front of the current position.
e_i_j: The unit vector from the current position to the neighbor.
s_i_j: The distance from the current position to the neighbor.
"""
# Distance from i to j
s_i_j = norm(pos_j - pos_i)
# Unit vector from i to j
e_i_j = (pos_j - pos_i) / s_i_j
# Check if neighbor is in front
front = np.dot(e_i, e_i_j) > 0
return front, e_i_j, s_i_j