416 lines
16 KiB
Python
416 lines
16 KiB
Python
|
|
"""Handles /live/clip/* OSC addresses."""
|
||
|
|
import logging
|
||
|
|
from typing import Any, Optional
|
||
|
|
from .handler import AbletonOSCHandler
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
CLIP_PROPS_RW = [
|
||
|
|
"name", "color", "color_index",
|
||
|
|
"muted", "gain",
|
||
|
|
"pitch_coarse", "pitch_fine", "velocity_amount",
|
||
|
|
"looping", "loop_start", "loop_end",
|
||
|
|
"start_marker", "end_marker",
|
||
|
|
"warping", "warp_mode",
|
||
|
|
"launch_mode", "launch_quantization",
|
||
|
|
"ram_mode",
|
||
|
|
]
|
||
|
|
|
||
|
|
CLIP_PROPS_RO = [
|
||
|
|
"is_playing", "is_recording", "is_midi_clip", "is_audio_clip",
|
||
|
|
"is_overdubbing", "is_triggered", "will_record_on_start",
|
||
|
|
"has_groove",
|
||
|
|
"length", "end_time", "start_time",
|
||
|
|
"playing_position", "sample_length",
|
||
|
|
"gain_display_string",
|
||
|
|
"file_path",
|
||
|
|
]
|
||
|
|
|
||
|
|
|
||
|
|
class ClipHandler(AbletonOSCHandler):
|
||
|
|
def init_api(self) -> None:
|
||
|
|
self.clear_listeners()
|
||
|
|
|
||
|
|
for prop in CLIP_PROPS_RW:
|
||
|
|
self._add(f"/live/clip/get/{prop}", self._make_getter(prop))
|
||
|
|
self._add(f"/live/clip/set/{prop}", self._make_setter(prop))
|
||
|
|
self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop))
|
||
|
|
self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop))
|
||
|
|
|
||
|
|
for prop in CLIP_PROPS_RO:
|
||
|
|
self._add(f"/live/clip/get/{prop}", self._make_getter(prop))
|
||
|
|
self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop))
|
||
|
|
self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop))
|
||
|
|
|
||
|
|
# Playback
|
||
|
|
self._add("/live/clip/fire", self._fire)
|
||
|
|
self._add("/live/clip/stop", self._stop)
|
||
|
|
self._add("/live/clip/duplicate_loop", self._duplicate_loop)
|
||
|
|
self._add("/live/clip/quantize", self._quantize)
|
||
|
|
|
||
|
|
# MIDI notes
|
||
|
|
self._add("/live/clip/get/notes", self._get_notes)
|
||
|
|
self._add("/live/clip/add/notes", self._add_notes)
|
||
|
|
self._add("/live/clip/remove/notes", self._remove_notes)
|
||
|
|
self._add("/live/clip/get/notes_extended", self._get_notes_extended)
|
||
|
|
self._add("/live/clip/apply_note_modifications", self._apply_note_modifications)
|
||
|
|
|
||
|
|
# Automation envelopes
|
||
|
|
self._add("/live/clip/get/automation_envelope", self._get_automation_envelope)
|
||
|
|
self._add("/live/clip/create_automation_envelope", self._create_automation_envelope)
|
||
|
|
self._add("/live/clip/clear_envelope", self._clear_envelope)
|
||
|
|
self._add("/live/clip/clear_all_envelopes", self._clear_all_envelopes)
|
||
|
|
|
||
|
|
# Warp markers
|
||
|
|
self._add("/live/clip/get/warp_markers", self._get_warp_markers)
|
||
|
|
|
||
|
|
# Groove
|
||
|
|
self._add("/live/clip/get/groove", self._get_groove)
|
||
|
|
self._add("/live/clip/set/groove", self._set_groove)
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _get_clip(self, params: tuple) -> Optional[Any]:
|
||
|
|
if len(params) < 2:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
track_idx = int(params[0])
|
||
|
|
clip_idx = int(params[1])
|
||
|
|
tracks = list(self.song.tracks)
|
||
|
|
if 0 <= track_idx < len(tracks):
|
||
|
|
slots = list(tracks[track_idx].clip_slots)
|
||
|
|
if 0 <= clip_idx < len(slots) and slots[clip_idx].has_clip:
|
||
|
|
return slots[clip_idx].clip
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get_clip(%s): %s", params, e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _clip_key(self, params: tuple) -> str:
|
||
|
|
return f"clip.{params[0]}.{params[1]}"
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _make_getter(self, prop: str):
|
||
|
|
def handler(params: tuple) -> Optional[tuple]:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None:
|
||
|
|
return None
|
||
|
|
val = self._get_prop(clip, prop)
|
||
|
|
if val is None:
|
||
|
|
return None
|
||
|
|
prefix = (int(params[0]), int(params[1]))
|
||
|
|
if isinstance(val, (list, tuple)):
|
||
|
|
return prefix + tuple(val)
|
||
|
|
return prefix + (val,)
|
||
|
|
return handler
|
||
|
|
|
||
|
|
def _make_setter(self, prop: str):
|
||
|
|
def handler(params: tuple) -> None:
|
||
|
|
if len(params) < 3:
|
||
|
|
return None
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
self._set_prop(clip, prop, params[2])
|
||
|
|
return None
|
||
|
|
return handler
|
||
|
|
|
||
|
|
def _make_start_listen(self, prop: str):
|
||
|
|
def handler(params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
key = f"{self._clip_key(params)}.{prop}"
|
||
|
|
prefix = (int(params[0]), int(params[1]))
|
||
|
|
self._register_listener(
|
||
|
|
key, clip, prop, f"/live/clip/get/{prop}", prefix
|
||
|
|
)
|
||
|
|
return handler
|
||
|
|
|
||
|
|
def _make_stop_listen(self, prop: str):
|
||
|
|
def handler(params: tuple) -> None:
|
||
|
|
if len(params) >= 2:
|
||
|
|
self._remove_listener(f"{self._clip_key(params)}.{prop}")
|
||
|
|
return handler
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Playback methods
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _fire(self, params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
try:
|
||
|
|
clip.fire()
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clip.fire: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _stop(self, params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
try:
|
||
|
|
clip.stop()
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clip.stop: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _duplicate_loop(self, params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
try:
|
||
|
|
clip.duplicate_loop()
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clip.duplicate_loop: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _quantize(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx, quantization, strength"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
try:
|
||
|
|
quant = int(params[2]) if len(params) > 2 else 4
|
||
|
|
strength = float(params[3]) if len(params) > 3 else 1.0
|
||
|
|
clip.quantize(quant, strength)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clip.quantize: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# MIDI notes
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _get_notes(self, params: tuple) -> Optional[tuple]:
|
||
|
|
"""params: track_idx, clip_idx [, pitch, pitch_span, time, time_span]"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_midi_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
if len(params) >= 6:
|
||
|
|
pitch = int(params[2])
|
||
|
|
pitch_span = int(params[3])
|
||
|
|
time = float(params[4])
|
||
|
|
time_span = float(params[5])
|
||
|
|
notes = clip.get_notes(time, pitch, time_span, pitch_span)
|
||
|
|
else:
|
||
|
|
notes = clip.get_notes(0, 0, clip.length, 128)
|
||
|
|
result = []
|
||
|
|
for note in notes:
|
||
|
|
result += [note[0], note[1], note[2], note[3], int(note[4])]
|
||
|
|
return (int(params[0]), int(params[1])) + tuple(result)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get_notes: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _get_notes_extended(self, params: tuple) -> Optional[tuple]:
|
||
|
|
"""Returns notes using the extended API (with IDs)."""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_midi_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
if len(params) >= 6:
|
||
|
|
notes = clip.get_notes_extended(
|
||
|
|
int(params[2]), int(params[3]),
|
||
|
|
float(params[4]), float(params[5])
|
||
|
|
)
|
||
|
|
else:
|
||
|
|
notes = clip.get_notes_extended(0, 128, 0, clip.length)
|
||
|
|
result = []
|
||
|
|
for note in notes:
|
||
|
|
result += [
|
||
|
|
note.pitch, float(note.start_time), float(note.duration),
|
||
|
|
note.velocity, int(note.mute), note.note_id,
|
||
|
|
float(note.release_velocity), float(note.probability)
|
||
|
|
]
|
||
|
|
return (int(params[0]), int(params[1])) + tuple(result)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get_notes_extended: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _add_notes(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx, pitch, start_time, duration, velocity, mute, [...]"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_midi_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
note_data = params[2:]
|
||
|
|
if len(note_data) % 5 != 0:
|
||
|
|
logger.warning("add_notes: expected multiple of 5 args after indices")
|
||
|
|
return None
|
||
|
|
notes = []
|
||
|
|
for i in range(0, len(note_data), 5):
|
||
|
|
notes.append((
|
||
|
|
int(note_data[i]), # pitch
|
||
|
|
float(note_data[i+1]), # start_time
|
||
|
|
float(note_data[i+2]), # duration
|
||
|
|
int(note_data[i+3]), # velocity
|
||
|
|
bool(note_data[i+4]), # mute
|
||
|
|
))
|
||
|
|
clip.set_notes(tuple(notes))
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("add_notes: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _remove_notes(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx [, time, time_span, pitch, pitch_span]"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_midi_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
if len(params) >= 6:
|
||
|
|
time = float(params[2])
|
||
|
|
time_span = float(params[3])
|
||
|
|
pitch = int(params[4])
|
||
|
|
pitch_span = int(params[5])
|
||
|
|
else:
|
||
|
|
time, time_span, pitch, pitch_span = 0, clip.length, 0, 128
|
||
|
|
clip.remove_notes(time, pitch, time_span, pitch_span)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("remove_notes: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _apply_note_modifications(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx, note_id, pitch, start_time, duration, velocity, mute, ..."""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_midi_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
note_data = params[2:]
|
||
|
|
# 8 values per note: note_id, pitch, start_time, duration, velocity, mute,
|
||
|
|
# release_velocity, probability
|
||
|
|
if len(note_data) % 8 != 0:
|
||
|
|
logger.warning("apply_note_modifications: expected multiple of 8 args")
|
||
|
|
return None
|
||
|
|
notes = []
|
||
|
|
for i in range(0, len(note_data), 8):
|
||
|
|
note = clip.get_notes_extended(0, 128, 0, clip.length)
|
||
|
|
# Build modification by note_id
|
||
|
|
notes.append({
|
||
|
|
"note_id": int(note_data[i]),
|
||
|
|
"pitch": int(note_data[i+1]),
|
||
|
|
"start_time": float(note_data[i+2]),
|
||
|
|
"duration": float(note_data[i+3]),
|
||
|
|
"velocity": int(note_data[i+4]),
|
||
|
|
"mute": bool(note_data[i+5]),
|
||
|
|
"release_velocity": float(note_data[i+6]),
|
||
|
|
"probability": float(note_data[i+7]),
|
||
|
|
})
|
||
|
|
clip.apply_note_modifications(notes)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("apply_note_modifications: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Automation envelopes
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _get_automation_envelope(self, params: tuple) -> Optional[tuple]:
|
||
|
|
"""params: track_idx, clip_idx, device_idx, param_idx"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or len(params) < 4:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
track = list(self.song.tracks)[int(params[0])]
|
||
|
|
device = list(track.devices)[int(params[2])]
|
||
|
|
param = list(device.parameters)[int(params[3])]
|
||
|
|
env = clip.automation_envelope(param)
|
||
|
|
if env is None:
|
||
|
|
return None
|
||
|
|
points = []
|
||
|
|
for step in range(env.count):
|
||
|
|
time = step * (clip.length / env.count)
|
||
|
|
points += [float(time), float(env.value_at_time(time))]
|
||
|
|
return (int(params[0]), int(params[1]), int(params[2]), int(params[3])) + tuple(points)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get_automation_envelope: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _create_automation_envelope(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx, device_idx, param_idx"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or len(params) < 4:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
track = list(self.song.tracks)[int(params[0])]
|
||
|
|
device = list(track.devices)[int(params[2])]
|
||
|
|
param = list(device.parameters)[int(params[3])]
|
||
|
|
clip.create_automation_envelope(param)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("create_automation_envelope: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _clear_envelope(self, params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or len(params) < 4:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
track = list(self.song.tracks)[int(params[0])]
|
||
|
|
device = list(track.devices)[int(params[2])]
|
||
|
|
param = list(device.parameters)[int(params[3])]
|
||
|
|
env = clip.automation_envelope(param)
|
||
|
|
if env:
|
||
|
|
env.clear_all_events()
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clear_envelope: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _clear_all_envelopes(self, params: tuple) -> None:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip:
|
||
|
|
try:
|
||
|
|
clip.clear_all_envelopes()
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("clear_all_envelopes: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Warp markers
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _get_warp_markers(self, params: tuple) -> Optional[tuple]:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or not clip.is_audio_clip:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
markers = list(clip.warp_markers)
|
||
|
|
result = []
|
||
|
|
for m in markers:
|
||
|
|
result += [float(m.beat_time), float(m.sample_time)]
|
||
|
|
return (int(params[0]), int(params[1])) + tuple(result)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get_warp_markers: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
# Groove
|
||
|
|
# ------------------------------------------------------------------
|
||
|
|
|
||
|
|
def _get_groove(self, params: tuple) -> Optional[tuple]:
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None:
|
||
|
|
return None
|
||
|
|
try:
|
||
|
|
groove = clip.groove
|
||
|
|
if groove is None:
|
||
|
|
return (int(params[0]), int(params[1]), "")
|
||
|
|
return (int(params[0]), int(params[1]), groove.name)
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("get groove: %s", e)
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _set_groove(self, params: tuple) -> None:
|
||
|
|
"""params: track_idx, clip_idx, groove_name"""
|
||
|
|
clip = self._get_clip(params)
|
||
|
|
if clip is None or len(params) < 3:
|
||
|
|
return None
|
||
|
|
groove_name = str(params[2])
|
||
|
|
try:
|
||
|
|
if groove_name == "":
|
||
|
|
clip.groove = None
|
||
|
|
else:
|
||
|
|
for g in self.song.groove_pool.grooves:
|
||
|
|
if g.name == groove_name:
|
||
|
|
clip.groove = g
|
||
|
|
break
|
||
|
|
except Exception as e:
|
||
|
|
logger.warning("set groove: %s", e)
|
||
|
|
return None
|