Source code for pNeuma_simulator.simulate

import warnings
from copy import deepcopy
from math import isinf, radians
from typing import Callable

import numpy as np
from joblib import Parallel, delayed
from numba import jit
from numpy.linalg import norm

from pNeuma_simulator import params
from pNeuma_simulator.contact_distance import ellipses
from pNeuma_simulator.gang import navigate
from pNeuma_simulator.gang.neighborhood import neighborhood
from pNeuma_simulator.initialization import PoissonDisc, equilibrium, ov
from pNeuma_simulator.shadowcasting import shadowcasting
from pNeuma_simulator.utils import direction, projection, tangent_dist


[docs] def main(n_cars: int, n_moto: int, seed: int, parallel: Callable, COUNT: int = 500, distributed: bool = True): """ Simulates the main loop of a pNeuma simulator. Args: n_cars (int): Number of cars. n_moto (int): Number of motorcycles. seed (int): Seed for the random number generator. parallel (Callable): Callable object for parallel execution. COUNT (int, optional): Number of iterations in the main loop. Defaults to 500. distributed (bool, optional): Flag indicating if the simulation is distributed. Defaults to True. Returns: Tuple: A tuple containing the list of serialized agents at each iteration and an empty list. """ # Code implementation... rng = np.random.default_rng(seed) ############################################### # Main loop ############################################### E = np.zeros((COUNT, 2 * n_cars + n_moto)) sampler = PoissonDisc( n_cars, n_moto, cell=params.cell, L=params.L, W=params.cell * 3, k=params.k, clearance=params.clearance, rng=rng ) # [car, car, ..., moto] samples, _ = sampler.sample(rng) agents = samples[: 2 * n_cars] if n_moto > 0: agents.extend(rng.choice(samples[2 * n_cars :], n_moto, replace=False)) l_agents = [] tau, lam, v0, d = equilibrium( params.L, params.lanes, n_cars, n_moto, rng, distributed=distributed, ) l_a = [] l_b = [] l_A = np.repeat(params.A, len(agents)) l_B = np.repeat(params.B, len(agents)) for n, agent in enumerate(agents): # Reassign IDs agent.ID = n + 1 agent.image = None agent.styles = None agent.tau = tau[n] agent.lam = lam[n] agent.v0 = v0[n] agent.d = d[n] l_a.append(agent.a) l_b.append(agent.b) for t in range(COUNT - 1): ###################### # Periodic boundary ###################### images = [] serial_agents = [] for agent in agents: if agent.x < -(params.L / 2 - (params.d_max + agent.l)): image = deepcopy(agent) image.x += params.L images.append(image) agent.image = image elif agent.x > params.L / 2 - (params.d_max + agent.l): image = deepcopy(agent) image.x -= params.L images.append(image) agent.image = image serial_agent = deepcopy(agent) serial_agent.pos = serial_agent.pos.tolist() serial_agent.vel = serial_agent.vel.tolist() serial_agents.append(serial_agent.encode()) l_agents.append(serial_agents) ############################## # Field of View analysis ############################## for image in agents + images: cos_angle = np.cos(np.pi - image.theta) sin_angle = np.sin(np.pi - image.theta) xc = params.xv - image.x yc = params.yv - image.y # https://stackoverflow.com/questions/37031356/ xct = xc * cos_angle - yc * sin_angle yct = xc * sin_angle + yc * cos_angle rad = xct**2 / image.l**2 + yct**2 / image.w**2 image.rad = rad matrices = [] origins = [] for agent in agents: matrix = np.zeros(params.shape) matrix[[0, -1]] = 1 for image in agents + images: if image.ID != agent.ID: matrix = identify(matrix, image.rad, image.ID) matrices.append(matrix) origin = np.unravel_index(agent.rad.argmin(), params.shape) origins.append(origin) tuples = parallel( delayed(shadowcasting)(i, j, params.grid, params.L, params.d_max) for i, j in zip(matrices, origins) ) for n, agent in enumerate(agents): interactions = tuples[n] agent.interactions = interactions.tolist() ################################################## # Navigation module ################################################## navigators = [] for agent in agents: interactions = agent.interactions if len(interactions) > 0: navigators.append(agent) if len(navigators) > 0: integers = rng.integers(1e8, size=len(navigators)) tuples = parallel( delayed(navigate)(navigator, agents, integer, params.d_max) for integer, navigator in zip(integers, navigators) ) for n, agent in enumerate(navigators): a0, a_des, f_a, ttc = tuples[n] agent.ttc = ttc if agent.mode == "Moto": agent.a0 = a0 agent.a_des = a_des agent.f_a = f_a.tolist() ################################ # Longitudinal dynamics ################################ l_theta = [] l_gap = [] l_pseudottc = [] l_vel = [] for agent in agents: l_vel.append(agent.vel) # Updated direction of i if agent.mode == "Moto": new_theta = agent.theta + params.dt * (agent.a_des - agent.theta) / params.adaptation_time theta_i = new_theta else: theta_i = agent.theta l_theta.append(theta_i) # Semiaxis dimensions of i l_i, w_i = agent.l, agent.w # Absolute position of i pos_i = agent.pos x_i, y_i = pos_i # Distance from walls k_w = tangent_dist(theta_i, 0, l_i, w_i) if theta_i >= radians(0.5): gap_w = (params.lane - y_i - k_w) / np.sin(theta_i) elif theta_i <= -radians(0.5): gap_w = (params.lane + y_i - k_w) / np.sin(-theta_i) else: gap_w = np.inf interactions = agent.interactions if len(interactions) > 0: gaps = [] neighbors = neighborhood(agent, agents) # Direction vector and its normal e_i, e_i_n = direction(theta_i) for neighbor in neighbors: # Semiaxis dimensions of j l_j, w_j = neighbor.l, neighbor.w # Absolute position of j pos_j = neighbor.pos x_j, y_j = pos_j # Direction of j theta_j = neighbor.theta # Check if neighbor is in front front, e_i_j, s_i_j = infront(e_i, pos_i, pos_j) if front: # Distance from tangent parallel to i k_h = tangent_dist(theta_j, theta_i, l_j, w_j) proj = projection(e_i_n, e_i_j, s_i_j) if proj <= params.scaling * w_i + k_h: # This is extremely important!!! # Distance of closest approach between i and j if proj == 0: min_d = l_i + l_j else: min_d = ellipses( l_j, w_j, l_i, w_i, x_j, y_j, x_i, y_i, theta_j, theta_i, ) gap = s_i_j - min_d else: gap = np.inf else: gap = np.inf gaps.append(gap) if np.isfinite(gaps).sum() > 0: leader = neighbors[np.argmin(gaps)] gap = min(gaps) agent.leader = leader.ID agent.gap = gap else: agent.leader = None if isinf(gap_w): agent.gap = params.d_max else: agent.gap = min([gap_w, params.d_max]) else: agent.leader = None if isinf(gap_w): agent.gap = params.d_max else: agent.gap = min([gap_w, params.d_max]) if agent.gap <= 0: return tuple(agent.pos) raise CollisionException("Accident occurred") # Retrieve inverse ttc if agent.ttc is None: pseudottc = 0 else: pseudottc = -1 / agent.ttc l_pseudottc.append(pseudottc) l_gap.append(agent.gap) dW = params.sqrtdt * rng.standard_normal(len(agents)) E[t + 1] = (1 - params.dt / np.array(l_b)) * E[t] + np.array(l_a) * dW V = norm(l_vel, axis=1) OV = ov(np.array(l_gap), lam, v0, d) + E[t + 1] V_des = OV * (0.5 * (1 + np.tanh(l_A * (np.array(l_pseudottc) + l_B)))) new_V = V + ((V_des - V) / tau) * params.dt new_V = np.maximum(new_V, 0) new_theta = np.array(l_theta) ################################## # Advance the simulation ################################## for n, agent in enumerate(agents): agent.advance(params.dt, new_V[n], new_theta[n]) agent.image = None return (l_agents, [])
[docs] def batch(seed: int, permutation: tuple): """ Run a batch simulation with the given seed and permutation. Args: seed (int): The seed for random number generation. permutation (tuple): A tuple containing the number of cars and motorcycles. Returns: tuple: A tuple containing the simulation results for cars and motorcycles. """ warnings.filterwarnings("ignore", category=RuntimeWarning) n_cars, n_moto = permutation with Parallel(n_jobs=-1, prefer="processes") as parallel: try: item = main(n_cars, n_moto, seed, parallel, params.COUNT) except CollisionException: item = (None, None) return item
[docs] class CollisionException(Exception): """Raised when agents collide""" # https://stackoverflow.com/questions/1319615 def __init__(self, message, payload=None): self.message = message self.payload = payload def __str__(self): return str(self.message)
[docs] @jit(nopython=True) def identify(matrix: np.ndarray, image_rad: np.ndarray, ID: int) -> np.ndarray: """ Identifies and replaces pixels in the matrix with the given ID based on a condition. Args: matrix (ndarray): The input matrix. image_rad (ndarray): The flattened array of image radii. ID (int): The ID to replace the pixels with. Returns: ndarray: The modified matrix with replaced pixels. """ # matrix to array ar_matrix = matrix.flatten() ar_rad = image_rad.flatten() ar_matrix[np.where(ar_rad < 1)] = ID matrix = ar_matrix.reshape(matrix.shape) return matrix
[docs] @jit(nopython=True) def infront(e_i, pos_i, pos_j): """ Determines if a neighbor is in front of a given position. Args: e_i: The direction vector of the current position. pos_i: The current position. pos_j: The position of the neighbor. Returns: front: A boolean indicating if the neighbor is in front of the current position. e_i_j: The unit vector from the current position to the neighbor. s_i_j: The distance from the current position to the neighbor. """ # Distance from i to j s_i_j = norm(pos_j - pos_i) # Unit vector from i to j e_i_j = (pos_j - pos_i) / s_i_j # Check if neighbor is in front front = np.dot(e_i, e_i_j) > 0 return front, e_i_j, s_i_j