|
|
|
import csv |
|
import functools |
|
import math |
|
import os |
|
|
|
from scipy.io.wavfile import read as wav_read |
|
|
|
from .categories import NodeCategories |
|
from .shared import hashed_as_strings |
|
from .dreamtypes import SharedTypes, FrameCounter |
|
|
|
|
|
def _linear_value_calc(x, x_start, x_end, y_start, y_end): |
|
if x <= x_start: |
|
return y_start |
|
if x >= x_end: |
|
return y_end |
|
dx = max(x_end - x_start, 0.0001) |
|
n = (x - x_start) / dx |
|
return (y_end - y_start) * n + y_start |
|
|
|
|
|
def _curve_result(f: float): |
|
return (f, int(round(f))) |
|
|
|
|
|
class DreamSineWave: |
|
NODE_NAME = "Sine Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"max_value": ("FLOAT", {"default": 1.0, "multiline": False}), |
|
"min_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}), |
|
"phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase): |
|
x = frame_counter.current_time_in_seconds |
|
a = (max_value - min_value) * 0.5 |
|
c = phase |
|
b = 2 * math.pi / periodicity_seconds |
|
d = (max_value + min_value) / 2 |
|
y = a * math.sin(b * (x + c)) + d |
|
return _curve_result(y) |
|
|
|
|
|
class DreamSawWave: |
|
NODE_NAME = "Saw Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"max_value": ("FLOAT", {"default": 1.0, "multiline": False}), |
|
"min_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}), |
|
"phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase): |
|
x = frame_counter.current_time_in_seconds |
|
x = ((x + periodicity_seconds * phase) % periodicity_seconds) / periodicity_seconds |
|
y = x * (max_value - min_value) + min_value |
|
return _curve_result(y) |
|
|
|
|
|
class DreamTriangleWave: |
|
NODE_NAME = "Triangle Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"max_value": ("FLOAT", {"default": 1.0, "multiline": False}), |
|
"min_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"periodicity_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.01}), |
|
"phase": ("FLOAT", {"default": 0.0, "multiline": False, "min": -1, "max": 1}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, max_value, min_value, periodicity_seconds, phase): |
|
x = frame_counter.current_time_in_seconds |
|
x = ((x + periodicity_seconds * phase) % periodicity_seconds) / periodicity_seconds |
|
if x <= 0.5: |
|
x *= 2 |
|
y = x * (max_value - min_value) + min_value |
|
else: |
|
x = (x - 0.5) * 2 |
|
y = max_value - x * (max_value - min_value) |
|
return _curve_result(y) |
|
|
|
|
|
class WavData: |
|
def __init__(self, sampling_rate: float, single_channel_samples, fps: float): |
|
self._length_in_seconds = len(single_channel_samples) / sampling_rate |
|
self._num_buckets = round(self._length_in_seconds * fps * 3) |
|
self._bucket_size = len(single_channel_samples) / float(self._num_buckets) |
|
self._buckets = list() |
|
self._rate = sampling_rate |
|
self._max_bucket_value = 0 |
|
for i in range(self._num_buckets): |
|
start_index = round(i * self._bucket_size) |
|
end_index = round((i + 1) * self._bucket_size) - 1 |
|
samples = list(map(lambda n: abs(n), single_channel_samples[start_index:end_index])) |
|
bucket_total = sum(samples) |
|
self._buckets.append(bucket_total) |
|
self._max_bucket_value=max(bucket_total, self._max_bucket_value) |
|
|
|
for i in range(self._num_buckets): |
|
self._buckets[i] = float(self._buckets[i]) / self._max_bucket_value |
|
|
|
def value_at_time(self, second: float) -> float: |
|
if second < 0.0 or second > self._length_in_seconds: |
|
return 0.0 |
|
nsample = second * self._rate |
|
nbucket = min(max(0, round(nsample / self._bucket_size)), self._num_buckets - 1) |
|
return self._buckets[nbucket] |
|
|
|
|
|
@functools.lru_cache(4) |
|
def _wav_loader(filepath, fps): |
|
sampling_rate, samples = wav_read(filepath) |
|
single_channel = samples[:, 0] |
|
return WavData(sampling_rate, single_channel, fps) |
|
|
|
|
|
class DreamWavCurve: |
|
NODE_NAME = "WAV Curve" |
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
ICON = "∿" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"wav_path": ("STRING", {"default": "audio.wav"}), |
|
"scale": ("FLOAT", {"default": 1.0, "multiline": False}) |
|
}, |
|
} |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, wav_path, scale): |
|
if not os.path.isfile(wav_path): |
|
return (0.0, 0) |
|
data = _wav_loader(wav_path, frame_counter.frames_per_second) |
|
frame_counter.current_time_in_seconds |
|
v = data.value_at_time(frame_counter.current_time_in_seconds) |
|
return (v * scale, round(v * scale)) |
|
|
|
|
|
class DreamTriangleEvent: |
|
NODE_NAME = "Triangle Event Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"max_value": ("FLOAT", {"default": 1.0, "multiline": False}), |
|
"min_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"width_seconds": ("FLOAT", {"default": 1.0, "multiline": False, "min": 0.1}), |
|
"center_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.0}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, max_value, min_value, width_seconds, center_seconds): |
|
x = frame_counter.current_time_in_seconds |
|
start = center_seconds - width_seconds * 0.5 |
|
end = center_seconds + width_seconds * 0.5 |
|
if start <= x <= center_seconds: |
|
y = _linear_value_calc(x, start, center_seconds, min_value, max_value) |
|
elif center_seconds < x <= end: |
|
y = _linear_value_calc(x, center_seconds, end, max_value, min_value) |
|
else: |
|
y = min_value |
|
return _curve_result(y) |
|
|
|
|
|
class DreamSmoothEvent: |
|
NODE_NAME = "Smooth Event Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"max_value": ("FLOAT", {"default": 1.0, "multiline": False}), |
|
"min_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"width_seconds": ("FLOAT", {"default": 1.0, "multiline": False, "min": 0.1}), |
|
"center_seconds": ("FLOAT", {"default": 10.0, "multiline": False, "min": 0.0}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, frame_counter: FrameCounter, max_value, min_value, width_seconds, center_seconds): |
|
x = frame_counter.current_time_in_seconds |
|
start = center_seconds - width_seconds * 0.5 |
|
end = center_seconds + width_seconds * 0.5 |
|
if start <= x <= center_seconds: |
|
y = _linear_value_calc(x, start, center_seconds, 0.0, 1.0) |
|
elif center_seconds < x <= end: |
|
y = _linear_value_calc(x, center_seconds, end, 1.0, 0.0) |
|
else: |
|
y = 0.0 |
|
if y < 0.5: |
|
y = ((y + y) * (y + y)) * 0.5 |
|
else: |
|
a = (y - 0.5) * 2 |
|
y = math.pow(a, 0.25) * 0.5 + 0.5 |
|
return _curve_result(y * (max_value - min_value) + min_value) |
|
|
|
|
|
class DreamBeatCurve: |
|
NODE_NAME = "Beat Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"bpm": ("FLOAT", {"default": 100.0, "multiline": False}), |
|
"time_offset": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"measure_length": ("INT", {"default": 4, "min": 1}), |
|
"low_value": ("FLOAT", {"default": 0.0}), |
|
"high_value": ("FLOAT", {"default": 1.0}), |
|
"invert": (["no", "yes"],), |
|
"power": ("FLOAT", {"default": 2.0, "min": 0.25, "max": 4}), |
|
"accent_1": ("INT", {"default": 1, "min": 1, "max": 24}), |
|
}, |
|
"optional": { |
|
"accent_2": ("INT", {"default": 3, "min": 1, "max": 24}), |
|
"accent_3": ("INT", {"default": 0}), |
|
"accent_4": ("INT", {"default": 0}), |
|
} |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def _get_value_for_accent(self, accent, measure_length, bpm, frame_counter: FrameCounter, frame_offset): |
|
current_frame = frame_counter.current_frame + frame_offset |
|
frames_per_minute = frame_counter.frames_per_second * 60.0 |
|
frames_per_beat = frames_per_minute / bpm |
|
frames_per_measure = frames_per_beat * measure_length |
|
frame = (current_frame % frames_per_measure) |
|
accent_start = (accent - 1) * frames_per_beat |
|
accent_end = accent * frames_per_beat |
|
if frame >= accent_start and frame < accent_end: |
|
return 1.0 - ((frame - accent_start) / frames_per_beat) |
|
return 0 |
|
|
|
def result(self, bpm, frame_counter: FrameCounter, measure_length, low_value, high_value, power, invert, |
|
time_offset, **accents): |
|
frame_offset = int(round(time_offset * frame_counter.frames_per_second)) |
|
accents_set = set(filter(lambda v: v >= 1 and v <= measure_length, |
|
map(lambda i: accents.get("accent_" + str(i), -1), range(30)))) |
|
v = 0.0 |
|
for a in accents_set: |
|
v += math.pow(self._get_value_for_accent(a, measure_length, bpm, frame_counter, frame_offset), power) |
|
if invert == "yes": |
|
v = 1.0 - v |
|
|
|
r = low_value + v * (high_value - low_value) |
|
return _curve_result(r) |
|
|
|
|
|
class DreamLinear: |
|
NODE_NAME = "Linear Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"initial_value": ("FLOAT", {"default": 0.0, "multiline": False}), |
|
"final_value": ("FLOAT", {"default": 100.0, "multiline": False}), |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def result(self, initial_value, final_value, frame_counter: FrameCounter): |
|
d = final_value - initial_value |
|
v = initial_value + frame_counter.progress * d |
|
return (v, int(round(v))) |
|
|
|
|
|
def _is_as_float(s: str): |
|
try: |
|
float(s) |
|
return True |
|
except ValueError: |
|
return False |
|
|
|
|
|
class DreamCSVGenerator: |
|
NODE_NAME = "CSV Generator" |
|
ICON = "⌗" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"value": ("FLOAT", {"forceInput": True, "default": 0.0}), |
|
"csvfile": ("STRING", {"default": "", "multiline": False}), |
|
"csv_dialect": (csv.list_dialects(),) |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = () |
|
RETURN_NAMES = () |
|
FUNCTION = "write" |
|
OUTPUT_NODE = True |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def write(self, csvfile, frame_counter: FrameCounter, value, csv_dialect): |
|
if frame_counter.is_first_frame and csvfile: |
|
with open(csvfile, 'w', newline='') as csvfile: |
|
csvwriter = csv.writer(csvfile, dialect=csv_dialect) |
|
csvwriter.writerow(['Frame', 'Value']) |
|
csvwriter.writerow([frame_counter.current_frame, str(value)]) |
|
else: |
|
with open(csvfile, 'a', newline='') as csvfile: |
|
csvwriter = csv.writer(csvfile, dialect=csv_dialect) |
|
csvwriter.writerow([frame_counter.current_frame, str(value)]) |
|
return () |
|
|
|
|
|
class DreamCSVCurve: |
|
NODE_NAME = "CSV Curve" |
|
|
|
@classmethod |
|
def INPUT_TYPES(cls): |
|
return { |
|
"required": SharedTypes.frame_counter | { |
|
"csvfile": ("STRING", {"default": "", "multiline": False}), |
|
"first_column_type": (["seconds", "frames"],), |
|
"interpolate": (["true", "false"],), |
|
"csv_dialect": (csv.list_dialects(),) |
|
}, |
|
} |
|
|
|
CATEGORY = NodeCategories.ANIMATION_CURVES |
|
RETURN_TYPES = ("FLOAT", "INT") |
|
RETURN_NAMES = ("FLOAT", "INT") |
|
FUNCTION = "result" |
|
|
|
@classmethod |
|
def IS_CHANGED(cls, *values): |
|
return hashed_as_strings(*values) |
|
|
|
def _row_yield(self, file, csv_dialect): |
|
prev_row = None |
|
for row in csv.reader(file, dialect=csv_dialect): |
|
if len(row) == 2 and _is_as_float(row[0]) and _is_as_float(row[1]): |
|
row = list(map(float, row)) |
|
yield (prev_row, row) |
|
prev_row = row |
|
if prev_row is not None: |
|
yield (prev_row, None) |
|
|
|
def result(self, csvfile, frame_counter: FrameCounter, first_column_type, interpolate, csv_dialect): |
|
interpolate = interpolate == "true" |
|
|
|
def _first_col_to_frame(v: float): |
|
if first_column_type == "frames": |
|
return round(v) |
|
else: |
|
return round(v * frame_counter.frames_per_second) |
|
|
|
with open(csvfile) as f: |
|
for (prev, current) in self._row_yield(f, csv_dialect): |
|
if prev is None and frame_counter.current_frame < _first_col_to_frame(current[0]): |
|
|
|
return (current[1], int(round(current[1]))) |
|
if current is None: |
|
|
|
return (prev[1], int(round(prev[1]))) |
|
if prev is not None and current is not None: |
|
frame1 = _first_col_to_frame(prev[0]) |
|
value1 = prev[1] |
|
frame2 = _first_col_to_frame(current[0]) |
|
value2 = current[1] |
|
if frame1 <= frame_counter.current_frame and interpolate and frame2 > frame_counter.current_frame: |
|
offset = (frame_counter.current_frame - frame1) / float(frame2 - frame1) |
|
v = value1 * (1.0 - offset) + value2 * offset |
|
return (v, int(round(v))) |
|
elif frame1 <= frame_counter.current_frame and frame2 > frame_counter.current_frame: |
|
return (value1, int(round(value1))) |
|
return (0.0, 0) |
|
|