b7cec9d24b
Python package naming convention uses snake_case. Update the import in the root __init__.py and the setuptools include pattern in pyproject.toml. Internal relative imports within the package are unaffected. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
575 lines
22 KiB
Python
575 lines
22 KiB
Python
"""Handles /live/track/* OSC addresses."""
|
|
import logging
|
|
from typing import Any, List, Optional, Tuple
|
|
from .handler import AbletonOSCHandler
|
|
|
|
# Module-level logger; the handlers below report failures through it
# (mostly as warnings) instead of raising.
logger = logging.getLogger(__name__)
|
|
|
|
# Track properties that TrackHandler.init_api exposes under all four verbs:
# /live/track/get/<prop>, /live/track/set/<prop>,
# /live/track/start_listen/<prop> and /live/track/stop_listen/<prop>.
TRACK_PROPS_RW = [
    "arm",
    "mute",
    "solo",
    "name",
    "color",
    "color_index",
    "fold_state",
    "current_monitoring_state",
]
|
|
|
|
# Track properties that TrackHandler.init_api exposes without a setter:
# only /live/track/get/<prop>, /live/track/start_listen/<prop> and
# /live/track/stop_listen/<prop> are registered for these.
TRACK_PROPS_RO = [
    "can_be_armed",
    "has_audio_input",
    "has_audio_output",
    "has_midi_input",
    "has_midi_output",
    "is_foldable",
    "is_grouped",
    "is_visible",
    "fired_slot_index",
    "playing_slot_index",
    "output_meter_level",
    "output_meter_left",
    "output_meter_right",
    "output_meter_peak_left",
    "output_meter_peak_right",
]
|
|
|
|
|
|
class TrackHandler(AbletonOSCHandler):
    """Registers and serves the ``/live/track/*`` OSC addresses.

    Every handler receives the OSC arguments as a tuple whose first element
    is a track index. Handlers built on ``_get_tracks`` also accept ``"*"``
    (or no arguments at all) to address every track; handlers built on
    ``_get_track`` address exactly one track.

    Getters return a tuple or ``None``; setters and methods return ``None``.
    NOTE(review): what the base class does with a returned tuple (presumably
    sends it back as the reply) is defined in ``handler.py`` - confirm there.

    Malformed arguments and Live API failures are logged and swallowed, in
    line with the setters that already did so, so one bad message does not
    raise out of a handler.
    """

    def init_api(self) -> None:
        """Clear existing listeners, then bind every track address via ``_add``."""
        self.clear_listeners()

        # Read/write properties get all four verbs.
        for prop in TRACK_PROPS_RW:
            self._add(f"/live/track/get/{prop}", self._make_getter(prop))
            self._add(f"/live/track/set/{prop}", self._make_setter(prop))
            self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
            self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))

        # Read-only properties get no setter.
        for prop in TRACK_PROPS_RO:
            self._add(f"/live/track/get/{prop}", self._make_getter(prop))
            self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
            self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))

        # Registration order below matches the original statement order.
        bindings = (
            # Mixer device (volume, panning, sends)
            ("/live/track/get/volume", self._get_volume),
            ("/live/track/set/volume", self._set_volume),
            ("/live/track/start_listen/volume", self._start_listen_volume),
            ("/live/track/stop_listen/volume", self._stop_listen_volume),
            ("/live/track/get/panning", self._get_panning),
            ("/live/track/set/panning", self._set_panning),
            ("/live/track/start_listen/panning", self._start_listen_panning),
            ("/live/track/stop_listen/panning", self._stop_listen_panning),
            ("/live/track/get/send", self._get_send),
            ("/live/track/set/send", self._set_send),
            ("/live/track/start_listen/send", self._start_listen_send),
            ("/live/track/stop_listen/send", self._stop_listen_send),
            # Routing
            ("/live/track/get/available_input_routing_types",
             self._get_available_input_routing_types),
            ("/live/track/get/available_input_routing_channels",
             self._get_available_input_routing_channels),
            ("/live/track/get/input_routing_type", self._get_input_routing_type),
            ("/live/track/set/input_routing_type", self._set_input_routing_type),
            ("/live/track/get/input_routing_channel", self._get_input_routing_channel),
            ("/live/track/set/input_routing_channel", self._set_input_routing_channel),
            ("/live/track/get/available_output_routing_types",
             self._get_available_output_routing_types),
            ("/live/track/get/available_output_routing_channels",
             self._get_available_output_routing_channels),
            ("/live/track/get/output_routing_type", self._get_output_routing_type),
            ("/live/track/set/output_routing_type", self._set_output_routing_type),
            ("/live/track/get/output_routing_channel", self._get_output_routing_channel),
            ("/live/track/set/output_routing_channel", self._set_output_routing_channel),
            # Aggregate
            ("/live/track/get/clips/name", self._get_clips_name),
            ("/live/track/get/clips/length", self._get_clips_length),
            ("/live/track/get/clips/color", self._get_clips_color),
            ("/live/track/get/clips/is_playing", self._get_clips_is_playing),
            ("/live/track/get/arrangement_clips/name", self._get_arr_clips_name),
            ("/live/track/get/arrangement_clips/length", self._get_arr_clips_length),
            ("/live/track/get/arrangement_clips/start_time", self._get_arr_clips_start),
            ("/live/track/get/num_devices", self._get_num_devices),
            ("/live/track/get/devices/name", self._get_devices_name),
            ("/live/track/get/devices/type", self._get_devices_type),
            ("/live/track/get/devices/class_name", self._get_devices_class_name),
            ("/live/track/get/devices/can_have_chains", self._get_devices_can_have_chains),
            # Methods
            ("/live/track/stop_all_clips", self._stop_all_clips),
            ("/live/track/delete_device", self._delete_device),
            ("/live/track/duplicate_device", self._duplicate_device),
        )
        for address, handler in bindings:
            self._add(address, handler)

    # ------------------------------------------------------------------
    # Track resolution (supports wildcard *)
    # ------------------------------------------------------------------

    @staticmethod
    def _as_index(raw: Any, label: str) -> Optional[int]:
        """Convert an OSC argument to ``int``; log and return ``None`` if it cannot be."""
        try:
            return int(raw)
        except (TypeError, ValueError):
            logger.warning("%s: invalid index %r", label, raw)
            return None

    def _get_tracks(self, params: tuple) -> List[Tuple[int, Any]]:
        """Resolve ``params`` to a list of ``(index, track)`` pairs.

        Empty ``params`` or a first argument of ``"*"`` selects every track in
        ``self.song.tracks``. Otherwise the first argument is taken as a
        track index; an unparsable or out-of-range index yields ``[]``.
        """
        tracks = list(self.song.tracks)
        if not params or params[0] == "*":
            return list(enumerate(tracks))
        idx = self._as_index(params[0], "track")
        if idx is not None and 0 <= idx < len(tracks):
            return [(idx, tracks[idx])]
        return []

    def _get_track(self, params: tuple) -> Optional[Any]:
        """Resolve ``params[0]`` to a single track.

        Returns ``None`` when ``params`` is empty or the index is unparsable
        or out of range. ``"*"`` is not accepted here, so it also yields
        ``None`` (with a logged warning) rather than raising.
        """
        if not params:
            return None
        idx = self._as_index(params[0], "track")
        if idx is None:
            return None
        tracks = list(self.song.tracks)
        if 0 <= idx < len(tracks):
            return tracks[idx]
        return None

    # ------------------------------------------------------------------
    # Property factories
    # ------------------------------------------------------------------

    def _make_getter(self, prop: str):
        """Build a handler returning a flat ``(idx, value, idx, value, ...)`` tuple.

        Tracks for which ``_get_prop`` yields ``None`` are skipped; the
        handler returns ``None`` when nothing was collected.
        """
        def handler(params: tuple) -> Optional[tuple]:
            result: List[Any] = []
            for idx, track in self._get_tracks(params):
                val = self._get_prop(track, prop)
                if val is not None:
                    result.extend((idx, val))
            return tuple(result) if result else None
        return handler

    def _make_setter(self, prop: str):
        """Build a handler that passes ``params[1]`` to ``_set_prop`` for one track."""
        def handler(params: tuple) -> None:
            if len(params) < 2:
                return None
            track = self._get_track(params)
            if track is not None:
                self._set_prop(track, prop, params[1])
            return None
        return handler

    def _make_start_listen(self, prop: str):
        """Build a handler registering a listener on ``prop`` for each resolved track."""
        def handler(params: tuple) -> None:
            for idx, track in self._get_tracks(params):
                self._register_listener(
                    f"track.{idx}.{prop}", track, prop,
                    f"/live/track/get/{prop}", (idx,),
                )
        return handler

    def _make_stop_listen(self, prop: str):
        """Build a handler removing the ``prop`` listener of each resolved track."""
        def handler(params: tuple) -> None:
            for idx, _ in self._get_tracks(params):
                self._remove_listener(f"track.{idx}.{prop}")
        return handler

    # ------------------------------------------------------------------
    # Mixer device
    # ------------------------------------------------------------------

    def _attach_value_listener(self, key: str, param: Any, address: str,
                               prefix: tuple, label: str) -> None:
        """Add a value listener to ``param`` and immediately emit its value.

        The callback sends ``prefix + (param.value,)`` to ``address``. The
        pair ``(param.remove_value_listener, callback)`` is stored in
        ``self._listener_store[key]``. Failures are logged as
        ``"<label> listener: ..."``.
        """
        def notify() -> None:
            self._send(address, prefix + (param.value,))

        try:
            param.add_value_listener(notify)
            self._listener_store[key] = (param.remove_value_listener, notify)
            notify()  # push the current value right away
        except Exception as exc:
            logger.warning("%s listener: %s", label, exc)

    def _mixer_get(self, params: tuple, name: str) -> Optional[tuple]:
        """Collect ``mixer_device.<name>.value`` as ``(idx, value, ...)``, or ``None``."""
        result: List[Any] = []
        for idx, track in self._get_tracks(params):
            try:
                result += [idx, getattr(track.mixer_device, name).value]
            except Exception as exc:
                # Previously swallowed silently; keep going but leave a trace.
                logger.warning("get %s: %s", name, exc)
        return tuple(result) if result else None

    def _mixer_set(self, params: tuple, name: str) -> None:
        """Assign ``float(params[1])`` to ``mixer_device.<name>.value`` of one track."""
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            getattr(track.mixer_device, name).value = float(params[1])
        except Exception as exc:
            logger.warning("set %s: %s", name, exc)
        return None

    def _mixer_start_listen(self, params: tuple, name: str) -> None:
        """(Re)attach a value listener to ``mixer_device.<name>`` of each resolved track."""
        for idx, track in self._get_tracks(params):
            key = f"track.{idx}.{name}"
            self._remove_listener(key)  # avoid stacking duplicate listeners
            try:
                param = getattr(track.mixer_device, name)
            except Exception as exc:
                logger.warning("%s listener: %s", name, exc)
                continue
            self._attach_value_listener(
                key, param, f"/live/track/get/{name}", (idx,), name
            )

    def _mixer_stop_listen(self, params: tuple, name: str) -> None:
        """Remove the ``track.<idx>.<name>`` listener of each resolved track."""
        for idx, _ in self._get_tracks(params):
            self._remove_listener(f"track.{idx}.{name}")

    def _get_volume(self, params: tuple) -> Optional[tuple]:
        """Return ``(idx, volume, ...)`` for the resolved tracks, or ``None``."""
        return self._mixer_get(params, "volume")

    def _set_volume(self, params: tuple) -> None:
        """Set the volume of track ``params[0]`` to ``float(params[1])``."""
        return self._mixer_set(params, "volume")

    def _start_listen_volume(self, params: tuple) -> None:
        """Start sending volume changes to ``/live/track/get/volume``."""
        self._mixer_start_listen(params, "volume")

    def _stop_listen_volume(self, params: tuple) -> None:
        """Stop sending volume changes for the resolved tracks."""
        self._mixer_stop_listen(params, "volume")

    def _get_panning(self, params: tuple) -> Optional[tuple]:
        """Return ``(idx, panning, ...)`` for the resolved tracks, or ``None``."""
        return self._mixer_get(params, "panning")

    def _set_panning(self, params: tuple) -> None:
        """Set the panning of track ``params[0]`` to ``float(params[1])``."""
        return self._mixer_set(params, "panning")

    def _start_listen_panning(self, params: tuple) -> None:
        """Start sending panning changes to ``/live/track/get/panning``."""
        self._mixer_start_listen(params, "panning")

    def _stop_listen_panning(self, params: tuple) -> None:
        """Stop sending panning changes for the resolved tracks."""
        self._mixer_stop_listen(params, "panning")

    # ------------------------------------------------------------------
    # Sends
    # ------------------------------------------------------------------

    def _get_send(self, params: tuple) -> Optional[tuple]:
        """Return ``(track_idx, send_idx, value)`` for ``params[0]``/``params[1]``.

        Returns ``None`` when arguments are missing or malformed, the track
        does not resolve, or the send index is out of range.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            send_idx = int(params[1])
            sends = list(track.mixer_device.sends)
            if 0 <= send_idx < len(sends):
                # params[0] already parsed successfully inside _get_track.
                return (int(params[0]), send_idx, sends[send_idx].value)
        except Exception as exc:
            logger.warning("get send: %s", exc)
        return None

    def _set_send(self, params: tuple) -> None:
        """Set send ``params[1]`` of track ``params[0]`` to ``float(params[2])``."""
        if len(params) < 3:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            send_idx = int(params[1])
            value = float(params[2])
            sends = list(track.mixer_device.sends)
            if 0 <= send_idx < len(sends):
                sends[send_idx].value = value
        except Exception as exc:
            logger.warning("set send: %s", exc)
        return None

    def _start_listen_send(self, params: tuple) -> None:
        """Start sending changes of one send to ``/live/track/get/send``.

        Any listener already stored under the same key is removed first,
        even if the send index later turns out to be out of range.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            track_idx, send_idx = int(params[0]), int(params[1])
        except (TypeError, ValueError) as exc:
            logger.warning("send listener: %s", exc)
            return None

        key = f"track.{track_idx}.send.{send_idx}"
        self._remove_listener(key)
        try:
            sends = list(track.mixer_device.sends)
        except Exception as exc:
            logger.warning("send listener: %s", exc)
            return None
        if 0 <= send_idx < len(sends):
            self._attach_value_listener(
                key, sends[send_idx], "/live/track/get/send",
                (track_idx, send_idx), "send",
            )
        return None

    def _stop_listen_send(self, params: tuple) -> None:
        """Remove the listener for send ``params[1]`` of track ``params[0]``."""
        if len(params) < 2:
            return None
        try:
            key = f"track.{int(params[0])}.send.{int(params[1])}"
        except (TypeError, ValueError) as exc:
            logger.warning("stop send listener: %s", exc)
            return None
        self._remove_listener(key)
        return None

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------

    def _routing_options(self, params: tuple, attr: str, label: str) -> Optional[tuple]:
        """Return the ``display_name`` of every entry in ``track.<attr>``, or ``None``."""
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return tuple(option.display_name for option in getattr(track, attr))
        except Exception as exc:
            logger.warning("%s: %s", label, exc)
            return None

    def _routing_current(self, params: tuple, attr: str) -> Optional[tuple]:
        """Return ``(track.<attr>.display_name,)`` for one track, or ``None``."""
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return (getattr(track, attr).display_name,)
        except Exception as exc:
            logger.warning("get %s: %s", attr, exc)
            return None

    def _routing_select(self, params: tuple, attr: str, options_attr: str) -> None:
        """Assign to ``track.<attr>`` the option whose ``display_name`` equals ``params[1]``.

        Candidates come from ``track.<options_attr>``; the first match wins.
        A name that matches nothing is logged instead of being ignored
        silently.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        wanted = str(params[1])
        try:
            for option in getattr(track, options_attr):
                if option.display_name == wanted:
                    setattr(track, attr, option)
                    break
            else:
                logger.warning("set %s: no option named %r", attr, wanted)
        except Exception as exc:
            logger.warning("set %s: %s", attr, exc)
        return None

    def _get_available_input_routing_types(self, params: tuple) -> Optional[tuple]:
        """Return display names of the track's available input routing types."""
        return self._routing_options(
            params, "available_input_routing_types", "input routing types"
        )

    def _get_available_input_routing_channels(self, params: tuple) -> Optional[tuple]:
        """Return display names of the track's available input routing channels."""
        return self._routing_options(
            params, "available_input_routing_channels", "input routing channels"
        )

    def _get_input_routing_type(self, params: tuple) -> Optional[tuple]:
        """Return the display name of the track's input routing type."""
        return self._routing_current(params, "input_routing_type")

    def _set_input_routing_type(self, params: tuple) -> None:
        """Select the input routing type named ``params[1]``."""
        return self._routing_select(
            params, "input_routing_type", "available_input_routing_types"
        )

    def _get_input_routing_channel(self, params: tuple) -> Optional[tuple]:
        """Return the display name of the track's input routing channel."""
        return self._routing_current(params, "input_routing_channel")

    def _set_input_routing_channel(self, params: tuple) -> None:
        """Select the input routing channel named ``params[1]``."""
        return self._routing_select(
            params, "input_routing_channel", "available_input_routing_channels"
        )

    def _get_available_output_routing_types(self, params: tuple) -> Optional[tuple]:
        """Return display names of the track's available output routing types."""
        return self._routing_options(
            params, "available_output_routing_types", "output routing types"
        )

    def _get_available_output_routing_channels(self, params: tuple) -> Optional[tuple]:
        """Return display names of the track's available output routing channels."""
        return self._routing_options(
            params, "available_output_routing_channels", "output routing channels"
        )

    def _get_output_routing_type(self, params: tuple) -> Optional[tuple]:
        """Return the display name of the track's output routing type."""
        return self._routing_current(params, "output_routing_type")

    def _set_output_routing_type(self, params: tuple) -> None:
        """Select the output routing type named ``params[1]``."""
        return self._routing_select(
            params, "output_routing_type", "available_output_routing_types"
        )

    def _get_output_routing_channel(self, params: tuple) -> Optional[tuple]:
        """Return the display name of the track's output routing channel."""
        return self._routing_current(params, "output_routing_channel")

    def _set_output_routing_channel(self, params: tuple) -> None:
        """Select the output routing channel named ``params[1]``."""
        return self._routing_select(
            params, "output_routing_channel", "available_output_routing_channels"
        )

    # ------------------------------------------------------------------
    # Aggregate clip info
    # ------------------------------------------------------------------

    def _get_session_clips(self, track):
        """Return the clips of every slot in ``track.clip_slots`` that has one."""
        return [slot.clip for slot in track.clip_slots if slot.has_clip]

    def _session_clip_values(self, params: tuple, extract) -> Optional[tuple]:
        """Apply ``extract`` to each session clip of one track; ``None`` if no track."""
        track = self._get_track(params)
        if track is None:
            return None
        return tuple(extract(clip) for clip in self._get_session_clips(track))

    def _arrangement_clip_values(self, params: tuple, extract, label: str) -> Optional[tuple]:
        """Apply ``extract`` to each of ``track.arrangement_clips``; ``None`` on failure.

        NOTE(review): the original returned ``None`` silently on any error,
        presumably as best-effort for tracks or Live versions that do not
        expose ``arrangement_clips`` - confirm. The failure is therefore
        logged at debug level only.
        """
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return tuple(extract(clip) for clip in track.arrangement_clips)
        except Exception as exc:
            logger.debug("%s: %s", label, exc)
            return None

    def _get_clips_name(self, params: tuple) -> Optional[tuple]:
        """Return the names of the track's session clips."""
        return self._session_clip_values(params, lambda clip: clip.name)

    def _get_clips_length(self, params: tuple) -> Optional[tuple]:
        """Return the lengths (as ``float``) of the track's session clips."""
        return self._session_clip_values(params, lambda clip: float(clip.length))

    def _get_clips_color(self, params: tuple) -> Optional[tuple]:
        """Return the ``color`` of each of the track's session clips."""
        return self._session_clip_values(params, lambda clip: clip.color)

    def _get_clips_is_playing(self, params: tuple) -> Optional[tuple]:
        """Return ``bool(is_playing)`` for each of the track's session clips."""
        return self._session_clip_values(params, lambda clip: bool(clip.is_playing))

    def _get_arr_clips_name(self, params: tuple) -> Optional[tuple]:
        """Return the names of the track's arrangement clips."""
        return self._arrangement_clip_values(
            params, lambda clip: clip.name, "arrangement_clips name"
        )

    def _get_arr_clips_length(self, params: tuple) -> Optional[tuple]:
        """Return the lengths (as ``float``) of the track's arrangement clips."""
        return self._arrangement_clip_values(
            params, lambda clip: float(clip.length), "arrangement_clips length"
        )

    def _get_arr_clips_start(self, params: tuple) -> Optional[tuple]:
        """Return the ``start_time`` (as ``float``) of the track's arrangement clips."""
        return self._arrangement_clip_values(
            params, lambda clip: float(clip.start_time), "arrangement_clips start_time"
        )

    # ------------------------------------------------------------------
    # Device info
    # ------------------------------------------------------------------

    def _device_values(self, params: tuple, extract) -> Optional[tuple]:
        """Apply ``extract`` to each of ``track.devices``; ``None`` if no track."""
        track = self._get_track(params)
        if track is None:
            return None
        return tuple(extract(device) for device in track.devices)

    def _get_num_devices(self, params: tuple) -> Optional[tuple]:
        """Return ``(number_of_devices,)`` for one track."""
        track = self._get_track(params)
        if track is None:
            return None
        return (len(track.devices),)

    def _get_devices_name(self, params: tuple) -> Optional[tuple]:
        """Return the name of each device on the track."""
        return self._device_values(params, lambda device: device.name)

    def _get_devices_type(self, params: tuple) -> Optional[tuple]:
        """Return ``int(device.type)`` for each device on the track."""
        return self._device_values(params, lambda device: int(device.type))

    def _get_devices_class_name(self, params: tuple) -> Optional[tuple]:
        """Return the ``class_name`` of each device on the track."""
        return self._device_values(params, lambda device: device.class_name)

    def _get_devices_can_have_chains(self, params: tuple) -> Optional[tuple]:
        """Return ``can_have_chains`` per device, ``False`` where the attribute is absent."""
        return self._device_values(
            params, lambda device: bool(getattr(device, "can_have_chains", False))
        )

    # ------------------------------------------------------------------
    # Methods
    # ------------------------------------------------------------------

    def _stop_all_clips(self, params: tuple) -> None:
        """Call ``stop_all_clips()`` on each resolved track, logging failures per track."""
        for _, track in self._get_tracks(params):
            try:
                track.stop_all_clips()
            except Exception as exc:
                logger.warning("stop_all_clips: %s", exc)
        return None

    def _invoke_on_device(self, params: tuple, method_name: str) -> None:
        """Call ``track.<method_name>(int(params[1]))`` on one track.

        The index conversion happens inside the ``try`` so a malformed device
        index is logged like any other failure instead of raising.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            getattr(track, method_name)(int(params[1]))
        except Exception as exc:
            logger.warning("%s: %s", method_name, exc)
        return None

    def _delete_device(self, params: tuple) -> None:
        """Delete device ``params[1]`` from track ``params[0]``."""
        return self._invoke_on_device(params, "delete_device")

    def _duplicate_device(self, params: tuple) -> None:
        """Duplicate device ``params[1]`` on track ``params[0]``."""
        return self._invoke_on_device(params, "duplicate_device")
|