Source code for pNeuma_simulator.initialization.equilibrium

# from numpy import concatenate, repeat, cos, pi, isfinite, minimum, all
import numpy as np
from scipy.optimize import root_scalar
from scipy.stats import distributions

from pNeuma_simulator import params
from pNeuma_simulator.initialization import budget, f, vo
from pNeuma_simulator.utils import truncated_rvs


[docs] def synthetic_fd(n_veh: int, random_state, mode: str = "Car", distributed: bool = True): """ Generate synthetic fundamental diagram data. Args: n_veh (int): Number of vehicles. random_state (object): Random number generator. mode (str, optional): Mode of transportation. Defaults to "Car". distributed (bool, optional): Flag indicating if the data should be distributed. Defaults to True. Returns: tuple: Tuple containing the following: - list: List of marginal distributions. - numpy.ndarray: Array of lambda values. - numpy.ndarray: Array of desired speeds. - numpy.ndarray: Array of jam spacings. """ factor = params.factor lam, v0, d = 0, 0, 0 marginals = [] marginal_dists = [ distributions.fisk, distributions.lognorm, distributions.mielke, ] if mode == "Car": args = params.car_args bounds = params.car_bounds else: args = params.moto_args bounds = params.moto_bounds for r, marginal_dist in enumerate(marginal_dists): marginal = marginal_dist(*args[r]) marginals.append(marginal) if distributed: synthetic_data = [] for i in range(3): rng = random_state size = n_veh dist = marginal_dists[i] x_min, x_max = bounds[i] if len(args[i]) == 3: s, loc, scale = args[i] k = None else: k, s, loc, scale = args[i] values = truncated_rvs(rng, size, dist, x_min, x_max, k, s, loc, scale) synthetic_data.append(values) # slope at jam spacing s^-1 lam = synthetic_data[0] # desired speed in m/s v0 = synthetic_data[1] / factor # jam spacing m d = synthetic_data[2].copy() else: lam = np.repeat(marginals[0].mean(), n_veh) v0 = np.repeat(marginals[1].mean() / factor, n_veh) d = np.repeat(marginals[2].mean(), n_veh) return marginals, lam, v0, d
[docs] def equilibrium(L: float, lanes: int, n_cars: int, n_moto: int, rng: object, distributed: bool = True) -> tuple: """ Calculate the equilibrium state of a simulation, assuming lane-based equilibrium Args: L (float): Length of the simulation. lanes (int): Number of lanes in the simulation. n_cars (int): Number of cars in the simulation. n_moto (int): Number of motorcycles in the simulation. rng (object): Random number generator. distributed (bool, optional): Whether the vehicles are distributed or not. Defaults to True. Returns: tuple: A tuple for longitundinal dynamics, containing the adaptation time (tau), lambda (lam), initial velocity (v0), and distance (d). Note: The function assumes that the length (L) is large enough for convergence. The function uses synthetic_fd to generate synthetic fundamental diagrams for cars and motorcycles. The equilibrium speed (v_eq) is calculated using root_scalar. The function checks if the total sum of speeds and distances equals the expected value (lanes * L). The function calculates the adaptation time (tau) based on the equilibrium speeds and fundamental diagrams. The adaptation time is limited by the minimum of 2 / lambda and the calculated tau values. """ while True: # Caution !!! If L is too small this will never converge !!! n_veh = lanes * n_cars + n_moto _, c_lam, c_v0, c_d = synthetic_fd(n_cars * lanes, rng, mode="Car", distributed=distributed) _, m_lam, m_v0, m_d = synthetic_fd(n_moto, rng, mode="Moto", distributed=distributed) lam = np.concatenate((c_lam, m_lam)) v0 = np.concatenate((c_v0, m_v0)) d = np.concatenate((c_d, m_d)) lengths = np.concatenate((np.repeat(params.c_l, lanes * n_cars), np.repeat(params.m_l, n_moto))) def g(x): return budget(x, lam, v0, d, lengths, lanes * L) try: sol = root_scalar(g, bracket=[0, min(v0)]) except ValueError: continue # Equilibrium speed v_eq = sol.root s_eq = vo(v_eq, lam, v0, d) total = round(np.sum(s_eq) + np.sum(lengths), 3) if total == lanes * L: f_eq = f(s_eq, lam, v0, d) # Adaptation time prefactor = 1 + np.cos(2 * np.pi / n_veh) tau = 1 / (prefactor * f_eq) if all(np.isfinite(tau)): tau = np.minimum(tau, 1 / lam) break return (tau, lam, v0, d)