risk_biased_prediction / scripts /scripts_utils /generate_dataset_waymo.py
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import math
import os
import fire
import numpy as np
import pickle
import tensorflow as tf
from tqdm import tqdm
from waymo_open_dataset.protos import scenario_pb2
def scalar_to_one_hot(length, index, has_zero=False):
if has_zero:
offset = 1
offset = 0
assert 0 <= index < length + offset
if index + 1 - offset > 0:
one_hot_type = np.eye(length)[index - offset]
one_hot_type = np.zeros(length)
return one_hot_type
def group_tracks(tracks):
object_types = {
state_size = 11
traj = np.zeros((len(tracks), len(tracks[0].states), state_size))
mask_traj = np.zeros((len(tracks), len(tracks[0].states)), dtype=bool)
traj_type = np.zeros((len(tracks), len(object_types) - 1))
id_to_idx = {}
for i_track, track in enumerate(tracks):
traj_type[i_track, :] = scalar_to_one_hot(
len(object_types) - 1, track.object_type, has_zero=True
id_to_idx[track.id] = i_track
for i_time, state in enumerate(track.states):
if state.valid:
traj[i_track, i_time, 0] = state.center_x
traj[i_track, i_time, 1] = state.center_y
traj[i_track, i_time, 2] = state.heading
traj[i_track, i_time, 3] = state.velocity_x
traj[i_track, i_time, 4] = state.velocity_y
traj[i_track, i_time, 5] = state.width
traj[i_track, i_time, 6] = state.length
traj[i_track, i_time, 7:11] = traj_type[i_track, :]
mask_traj[i_track, i_time] = state.center_x != 0 or state.center_y != 0
mask_traj[i_track, i_time] = False
# Remove trajectories that are masked for the whole time
mask_any_time = mask_traj.any(-1)
to_delete = []
for key, value in id_to_idx.items():
if not mask_any_time[value]:
id_to_idx[key] = np.sum(mask_any_time[:value])
for key in to_delete:
del id_to_idx[key]
traj = traj[mask_any_time]
traj_type = traj_type[mask_any_time]
mask_traj = mask_traj[mask_any_time]
# traj:(n_agents, seq_time, features), mask:(n_agents, seq_time), traj_type:(n_agents, features)
assert (traj[..., :2][mask_traj] != 0).any(-1).all()
return traj, mask_traj, traj_type, id_to_idx
def filter_tracks(
distances2 = ((trajs[:, :, :2] - pos[None, None, :]) ** 2).sum(-1).min(1)
first_non_0_pos = np.take_along_axis(
trajs, np.argmax(mask_trajs, 1)[:, None, None], axis=1
is_moving = (
np.abs((trajs[:, :, :2] - first_non_0_pos[:, 0:1, :2]) * mask_trajs[:, :, None])
> 1
filtered = np.zeros_like(distances2, dtype=bool)
filtered[is_moving] = distances2[is_moving] < max_moving_distance**2
filtered[np.logical_not(is_moving)] = (
distances2[np.logical_not(is_moving)] < max_static_distance**2
filtered = np.logical_or(filtered, mask_keep)
# Filter out trajectories
to_delete = []
idx_to_id = {}
for key, value in id_to_idx.items():
if not filtered[value]:
new_value = np.sum(filtered[:value])
idx_to_id[new_value] = key
id_to_idx[key] = new_value
for key in to_delete:
del id_to_idx[key]
trajs = trajs[filtered]
trajs_type = trajs_type[filtered]
mask_trajs = mask_trajs[filtered]
to_predict = to_predict[filtered]
if mask_keep.all():
return trajs, mask_trajs, trajs_type, to_predict, id_to_idx
# Sort entries from closest to furthest to input pos
distances2 = distances2[filtered]
distance_sort = np.argsort(distances2)
copy_trajs = trajs.copy()
copy_mask_trajs = mask_trajs.copy()
copy_trajs_type = trajs_type.copy()
copy_to_predict = to_predict.copy()
skip = np.argmin(mask_keep)
assert skip > 1
offset = skip
for i, idx in enumerate(distance_sort[skip:]):
if idx > skip:
ii = i + offset
trajs[ii] = copy_trajs[idx]
trajs_type[ii] = copy_trajs_type[idx]
mask_trajs[ii] = copy_mask_trajs[idx]
to_predict[ii] = copy_to_predict[idx]
id_to_idx[idx_to_id[idx]] = ii
offset -= 1
assert (trajs[..., :2][mask_trajs] != 0).any(-1).all()
return trajs, mask_trajs, trajs_type, to_predict, id_to_idx
def cut_lane(lane, pos, max_len):
center_idx = np.argmin(((lane - pos[None, :]) ** 2).sum(-1))
start = max(0, center_idx - max_len // 2)
return lane[start : start + max_len, :]
def group_lanes(map, center, max_lane_len, max_lane_distance):
all_objects = []
all_types = []
max_len = 0
id_to_idx = {}
stride = 2
max_lane_len = max_lane_len * stride
for object in map:
# Type one_hot encoding is as follows: 0: lane, 1: stop_sign, 2: cross_walk, 3: speed_bump
lane = object.lane.polyline
is_cut_lane = len(lane) > max_lane_len
len_lane = min(len(lane), max_lane_len)
len_cross_walk = len(object.crosswalk.polygon)
len_speed_bump = len(object.speed_bump.polygon)
num_obj_types = 4
max_len = max(max_len, len_lane)
max_len = max(max_len, len_cross_walk)
max_len = max(max_len, len_speed_bump)
if len_lane > 0:
current_lane = np.zeros((len(lane), 2))
for i_point, cw in enumerate(lane):
current_lane[i_point, 0] = cw.x
current_lane[i_point, 1] = cw.y
if is_cut_lane:
current_lane = cut_lane(current_lane, center, max_lane_len)
min_distance2 = np.min(((current_lane - center[None, :]) ** 2).sum(-1))
if min_distance2 < max_lane_distance**2:
id_to_idx[object.id] = len(all_objects)
all_types.append(scalar_to_one_hot(num_obj_types, 0))
# elif len_cross_walk > 0:
# current_cross_walk = np.zeros((len_cross_walk, 2))
# for i_point, cw in enumerate(object.crosswalk.polygon):
# current_cross_walk[i_point, 0] = cw.x
# current_cross_walk[i_point, 1] = cw.y
# all_objects.append(current_cross_walk)
# all_types.append(scalar_to_one_hot(num_obj_types, 2))
# elif len_speed_bump > 0:
# current_speed_bump = np.zeros((len_speed_bump, 2))
# for i_point, cw in enumerate(object.speed_bump.polygon):
# current_speed_bump[i_point, 0] = cw.x
# current_speed_bump[i_point, 1] = cw.y
# all_objects.append(current_speed_bump)
# all_types.append(scalar_to_one_hot(num_obj_types, 3))
# elif not (object.stop_sign.position.x == 0 and object.stop_sign.position.y == 0):
# all_objects.append([np.array([object.stop_sign.position.x, object.stop_sign.position.y])])
# all_types.append(scalar_to_one_hot(num_obj_types, 1))
object_array = np.zeros((len(all_objects), (max_len + 1) // stride, 2))
mask_object_array = np.zeros(
(len(all_objects), (max_len + 1) // stride), dtype=bool
object_types_array = np.zeros((len(all_types), num_obj_types))
for i_object, object in enumerate(all_objects):
len_object = (len(object) + 1) // stride
object_array[i_object, :len_object, :] = object[::2]
mask_object_array[i_object, :len_object] = True
object_types_array[i_object] = all_types[i_object]
# for i, lane in enumerate(object_array):
# plt.plot(lane[mask_object_array[i, :], 0], lane[mask_object_array[i, :], 1], alpha=0.3)
idx_to_id = {value: key for key, value in id_to_idx.items()}
# Sort entries from closest to furthest to input center
distances2 = np.min(((object_array - center[None, None, :]) ** 2).sum(-1), 1)
distance_sort = np.argsort(distances2)
copy_object = object_array.copy()
copy_mask_object = mask_object_array.copy()
copy_type = object_types_array.copy()
for i, idx in enumerate(distance_sort):
object_array[i] = copy_object[idx]
mask_object_array[i] = copy_mask_object[idx]
object_types_array[i] = copy_type[idx]
id_to_idx[idx_to_id[idx]] = i
return object_array, mask_object_array, object_types_array, id_to_idx
def group_light_signals(light_signals, id_to_idx, n_map_objects):
state_to_idx = {
len_time = len(light_signals)
all_lanes_states = np.zeros((n_map_objects, len_time, len(state_to_idx) - 1))
for t, lanes_states in enumerate(light_signals):
for lane in lanes_states.lane_states:
if lane.lane in id_to_idx.keys():
all_lanes_states[id_to_idx[lane.lane], t, :] = scalar_to_one_hot(
len(state_to_idx) - 1, lane.state, True
# (n_objects, seq_time, features)
return all_lanes_states
def normalize_all(traj, map, pos, angle):
c = math.cos(angle)
s = math.sin(angle)
rotation_mat = np.array([[c, s], [-s, c]])
traj_clone = traj.clone()
traj_clone[..., :2] = (
traj_clone[..., :2] - pos.reshape(([1] * (traj.ndim - 1)) + [2])
) @ rotation_mat
traj_clone[..., 2] = (traj_clone[..., 2] + angle + np.pi) % (2 * np.pi) - np.pi
if traj.shape[-1] >= 5:
traj_clone[..., 3:5] = traj_clone[..., 3:5] @ rotation_mat
map_clone = (map.clone() - pos.reshape(([1] * (map.ndim - 1)) + [2])) @ rotation_mat
return traj_clone, map_clone
def fill_gaps(trajs, mask_in):
If trajectories are partially observed with gaps (observed then not then observed again), fill the gaps with interpolations.
trajs: size (n_agents, time, features) features are organized as [x, y, angle, vx, vy, other_features ]
mask = mask_in.copy()
first_non_zeros = np.argmax(mask, 1)
last_non_zeros = mask.shape[1] - np.argmax(np.flip(mask, 1), 1)
has_gaps = np.logical_and(
last_non_zeros - first_non_zeros > np.maximum(mask.sum(1), 1), mask.sum(1) > 1
if not has_gaps.any():
# No gap to fill, returning the input
return trajs
# iterate over agents
for i in range(trajs.shape[0]):
if has_gaps[i]:
left = first_non_zeros[i]
right = first_non_zeros[i]
for t in range(first_non_zeros[i], last_non_zeros[i]):
if mask[i, t] and left == right:
left += 1
elif mask[i, t]:
mask[i, t] = True
right += 1
# Linear filling for positions:
trajs[i, left:right, :2] = (np.arange(right - left) / (right - left))[
:, None
] * (trajs[i, right, :2] - trajs[i, left - 1, :2])[None, :] + trajs[
i, left - 1 : left, :2
# Linear filling for velocities and the rest:
trajs[i, left:right, 3:] = (np.arange(right - left) / (right - left))[
:, None
] * (trajs[i, right, 3:] - trajs[i, left - 1, 3:])[None, :] + trajs[
i, left - 1 : left, 3:
# Linear filling for angles (periodicity doesn't allow direct interpolation):
cos_traj = np.cos(trajs[i, left - 1 : right + 1, 2])
sin_traj = np.sin(trajs[i, left - 1 : right + 1, 2])
cos_traj = (np.arange(right - left) / (right - left)) * (
cos_traj[-1] - cos_traj[0]
) + cos_traj[0]
sin_traj = (np.arange(right - left) / (right - left)) * (
sin_traj[-1] - sin_traj[0]
) + sin_traj[0]
trajs[i, left:right, 2] = np.arctan2(sin_traj, cos_traj)
# Only the first gap was filled, recursive call to complete others
return fill_gaps(trajs, mask)
def group_scenario(scenario):
ids_of_interest = list(set(scenario.objects_of_interest))
# Only gather scenario with a pair of interacting vehicles
if len(ids_of_interest) != 2:
return None
traj, mask_traj, traj_type, id_to_idx = group_tracks(scenario.tracks)
assert (traj[..., :2][mask_traj] != 0).any(-1).all()
to_predict = np.zeros(traj.shape[0], dtype=bool)
for idx in scenario.tracks_to_predict:
to_predict[idx.track_index] = True
# # Set ego as the first agent in the list of trajectories
# index_ego = scenario.sdc_track_index
# if index_ego != 0:
# for key, value in id_to_idx.items():
# if value == 0:
# id_0 = key
# traj[[0, index_ego]] = traj[[index_ego, 0]]
# mask_traj[[0, index_ego]] = mask_traj[[index_ego, 0]]
# traj_type[[0, index_ego]] = traj_type[[index_ego, 0]]
# to_predict[[0, index_ego]] = to_predict[[index_ego, 0]]
# id_to_idx[id_0] = index_ego
# id_to_idx[scenario.sdc_track_index] = 0
# Set the agents of interest as the first agents in the list of trajectories
for key, value in id_to_idx.items():
if value == 0:
id_0 = key
elif value == 1:
id_1 = key
indices_of_interest = sorted(
[id_to_idx[ids_of_interest[0]], id_to_idx[ids_of_interest[1]]]
traj[[0, indices_of_interest[0]]] = traj[
mask_traj[[0, indices_of_interest[0]]] = mask_traj[
traj_type[[0, indices_of_interest[0]]] = traj_type[
to_predict[[0, indices_of_interest[0]]] = to_predict[
traj[[1, indices_of_interest[1]]] = traj[[indices_of_interest[1], 1]]
mask_traj[[1, indices_of_interest[1]]] = mask_traj[[indices_of_interest[1], 1]]
traj_type[[1, indices_of_interest[1]]] = traj_type[[indices_of_interest[1], 1]]
to_predict[[1, indices_of_interest[1]]] = to_predict[[indices_of_interest[1], 1]]
id_to_idx[id_0] = id_to_idx[ids_of_interest[0]]
id_to_idx[ids_of_interest[0]] = 0
id_to_idx[id_1] = id_to_idx[ids_of_interest[1]]
id_to_idx[ids_of_interest[1]] = 1
assert (traj[..., :2][mask_traj] != 0).any(-1).all()
# ego_current_state = scenario.tracks[scenario.sdc_track_index].states[scenario.current_time_index]
# angle = ego_current_state.heading
traj = fill_gaps(traj, mask_traj)
pos = traj[0, scenario.current_time_index, :2]
angle = traj[0, scenario.current_time_index, 2]
# mask_agent_of_interest = np.zeros((traj.shape[0]), dtype=bool)
# idx_of_interest = [id_to_idx[id] for id in scenario.objects_of_interest]
# mask_agent_of_interest[idx_of_interest] = True
traj, mask_traj, traj_type, to_predict, id_to_idx = filter_tracks(
assert (traj[..., :2][mask_traj] != 0).any(-1).all()
if traj.shape[0] > 100:
map, mask_map, map_type, map_id_to_idx = group_lanes(
scenario.map_features, pos, max_lane_len=50, max_lane_distance=50
lane_states = group_light_signals(
scenario.dynamic_map_states, map_id_to_idx, map.shape[0]
traj, map = normalize_all(traj, map, pos, -angle)
assert (
traj[0, scenario.current_time_index + 1 :, :2][
mask_traj[0, scenario.current_time_index + 1 :]
!= 0
assert (
traj[0, : scenario.current_time_index, :2][
mask_traj[0, : scenario.current_time_index]
!= 0
assert (traj[1:, :, :2][mask_traj[1:, :]] != 0).any(-1).all()
len_pred = traj.shape[1] - scenario.current_time_index - 1
traj = traj.transpose((1, 0, 2))
mask_traj = mask_traj.transpose((1, 0))
map = map.transpose((1, 0, 2))
mask_map = mask_map.transpose((1, 0))
assert (
traj[scenario.current_time_index + 1 :, 0, :2][
mask_traj[scenario.current_time_index + 1 :, 0]
!= 0
assert (
traj[: scenario.current_time_index, 0, :2][
mask_traj[: scenario.current_time_index, 0]
!= 0
assert (traj[:, 1:, :2][mask_traj[:, 1:]] != 0).any(-1).all()
# Mask futures for trajectories that are not to be predicted
traj = traj * mask_traj[:, :, None]
# to_predict[0] = True
# to_predict[1] = True
# mask_traj[scenario.current_time_index+1:, np.logical_not(to_predict)] = 0
mask_to_predict = mask_traj.copy()
mask_to_predict[:, np.logical_not(to_predict)] = False
assert (
traj[scenario.current_time_index + 1 :, 0, :2][
mask_to_predict[scenario.current_time_index + 1 :, 0]
!= 0
assert (
traj[: scenario.current_time_index, 0, :2][
mask_to_predict[: scenario.current_time_index, 0]
!= 0
assert (traj[:, 1:, :2][mask_to_predict[:, 1:]] != 0).any(-1).all()
return {
"traj": traj,
"mask_traj": mask_traj,
"mask_to_predict": mask_to_predict,
"lanes": map,
"lane_states": lane_states,
"mask_lanes": mask_map,
"len_pred": len_pred,
"mean_pos": pos,
def preprocess_scenario(data, output_dir):
scenario = scenario_pb2.Scenario()
scenario_id = scenario.scenario_id
scenario = group_scenario(scenario)
if scenario is not None:
with open(os.path.join(output_dir, scenario_id), "wb") as handle:
pickle.dump(scenario, handle)
def preprocess_scenarios(scenario_dir, output_dir, debug_size=None, num_parallel=8):
"""Preprocesses waymo motion data in scenario file format.
scenario_dir: Directory containing scenario files.
output_dir: Directory in which to output preprocessed samples
debug_size: If provided, limit to this number of output samples.
This is the _max_ number of samples, but fewer may result.
num_parallel: Number of processes to run in parallel.
Recommend to set this to number of cores - 1.
assert os.path.exists(scenario_dir)
filenames = os.listdir(scenario_dir)
print(f"Saving files in {output_dir}")
filepaths = [os.path.join(scenario_dir, f) for f in filenames]
dataset = tf.data.TFRecordDataset(filepaths)
os.makedirs(output_dir, exist_ok=True)
pool = ProcessPoolExecutor(num_parallel)
futures = []
for i, data in enumerate(tqdm(dataset)):
future = pool.submit(preprocess_scenario, data=data, output_dir=output_dir)
# future = preprocess_scenario(data=data, output_dir=output_dir)
if debug_size is not None and i >= debug_size:
if __name__ == "__main__":
The way this works is it provides a command line interface to the function
where you just pass whatever arguments the function takes to the script.
You can get a help message with:
$ python scripts/interaction_utils/generate_dataset_waymo.py -h
An example you might call with:
$ python scripts/interaction_utils/generate_dataset_waymo.py \
/path/to/scenarios/training/ /path/to/output/training --debug_size=1000 --num_parallel=48