Files
valknar b7cec9d24b Rename package directory AbletonOSC/ -> ableton_osc/
Python package naming convention uses snake_case. Update the import in
the root __init__.py and the setuptools include pattern in pyproject.toml.
Internal relative imports within the package are unaffected.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-01 13:46:34 +02:00

575 lines
22 KiB
Python

"""Handles /live/track/* OSC addresses."""
import logging
from typing import Any, List, Optional, Tuple
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
# Track properties that are both readable and writable: init_api registers
# get/, set/, start_listen/ and stop_listen/ addresses for each name.
TRACK_PROPS_RW = [
    "arm",
    "mute",
    "solo",
    "name",
    "color",
    "color_index",
    "fold_state",
    "current_monitoring_state",
]
# Track properties exposed without a set/ address: init_api registers only
# get/, start_listen/ and stop_listen/ addresses for each name.
TRACK_PROPS_RO = [
    # Capabilities
    "can_be_armed",
    "has_audio_input",
    "has_audio_output",
    "has_midi_input",
    "has_midi_output",
    # Layout / grouping
    "is_foldable",
    "is_grouped",
    "is_visible",
    # Clip slot state
    "fired_slot_index",
    "playing_slot_index",
    # Metering
    "output_meter_level",
    "output_meter_left",
    "output_meter_right",
    "output_meter_peak_left",
    "output_meter_peak_right",
]
class TrackHandler(AbletonOSCHandler):
    """Registers and serves the ``/live/track/*`` OSC addresses.

    Every handler receives the OSC arguments as a tuple whose first element
    is the track index.  Handlers built on :meth:`_get_tracks` (property
    getters and listeners, volume/panning getters and listeners,
    ``stop_all_clips``) also accept ``"*"`` or no arguments, meaning "all
    tracks".  Handlers return a tuple of values or ``None``.

    Malformed arguments (non-numeric indices or values) are logged and
    ignored instead of raising out of the handler.

    NOTE(review): the registration, listener and property helpers used here
    (``_add``, ``_send``, ``_get_prop``, ``_register_listener``, ...) live in
    ``AbletonOSCHandler``; what the base class does with handler return
    values is not visible from this file.
    """

    def init_api(self) -> None:
        """Clear existing listeners and register every track address."""
        self.clear_listeners()

        for prop in TRACK_PROPS_RW:
            self._add(f"/live/track/get/{prop}", self._make_getter(prop))
            self._add(f"/live/track/set/{prop}", self._make_setter(prop))
            self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
            self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))

        for prop in TRACK_PROPS_RO:
            self._add(f"/live/track/get/{prop}", self._make_getter(prop))
            self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
            self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))

        routes = (
            # Mixer device (volume, panning, sends)
            ("/live/track/get/volume", self._get_volume),
            ("/live/track/set/volume", self._set_volume),
            ("/live/track/start_listen/volume", self._start_listen_volume),
            ("/live/track/stop_listen/volume", self._stop_listen_volume),
            ("/live/track/get/panning", self._get_panning),
            ("/live/track/set/panning", self._set_panning),
            ("/live/track/start_listen/panning", self._start_listen_panning),
            ("/live/track/stop_listen/panning", self._stop_listen_panning),
            ("/live/track/get/send", self._get_send),
            ("/live/track/set/send", self._set_send),
            ("/live/track/start_listen/send", self._start_listen_send),
            ("/live/track/stop_listen/send", self._stop_listen_send),
            # Routing
            ("/live/track/get/available_input_routing_types",
             self._get_available_input_routing_types),
            ("/live/track/get/available_input_routing_channels",
             self._get_available_input_routing_channels),
            ("/live/track/get/input_routing_type", self._get_input_routing_type),
            ("/live/track/set/input_routing_type", self._set_input_routing_type),
            ("/live/track/get/input_routing_channel", self._get_input_routing_channel),
            ("/live/track/set/input_routing_channel", self._set_input_routing_channel),
            ("/live/track/get/available_output_routing_types",
             self._get_available_output_routing_types),
            ("/live/track/get/available_output_routing_channels",
             self._get_available_output_routing_channels),
            ("/live/track/get/output_routing_type", self._get_output_routing_type),
            ("/live/track/set/output_routing_type", self._set_output_routing_type),
            ("/live/track/get/output_routing_channel", self._get_output_routing_channel),
            ("/live/track/set/output_routing_channel", self._set_output_routing_channel),
            # Aggregate
            ("/live/track/get/clips/name", self._get_clips_name),
            ("/live/track/get/clips/length", self._get_clips_length),
            ("/live/track/get/clips/color", self._get_clips_color),
            ("/live/track/get/clips/is_playing", self._get_clips_is_playing),
            ("/live/track/get/arrangement_clips/name", self._get_arr_clips_name),
            ("/live/track/get/arrangement_clips/length", self._get_arr_clips_length),
            ("/live/track/get/arrangement_clips/start_time", self._get_arr_clips_start),
            ("/live/track/get/num_devices", self._get_num_devices),
            ("/live/track/get/devices/name", self._get_devices_name),
            ("/live/track/get/devices/type", self._get_devices_type),
            ("/live/track/get/devices/class_name", self._get_devices_class_name),
            ("/live/track/get/devices/can_have_chains", self._get_devices_can_have_chains),
            # Methods
            ("/live/track/stop_all_clips", self._stop_all_clips),
            ("/live/track/delete_device", self._delete_device),
            ("/live/track/duplicate_device", self._duplicate_device),
        )
        for address, handler in routes:
            self._add(address, handler)

    # ------------------------------------------------------------------
    # Track resolution (supports wildcard *)
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_index(value: Any, what: str) -> Optional[int]:
        """Convert an OSC argument to ``int``; log and return ``None`` on failure."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            logger.warning("invalid %s: %r", what, value)
            return None

    def _get_tracks(self, params: tuple) -> List[Tuple[int, Any]]:
        """Resolve ``params[0]`` to a list of ``(index, track)`` pairs.

        No params or ``"*"`` selects every track in ``self.song.tracks``.
        An out-of-range or non-numeric index yields an empty list.
        """
        tracks = list(self.song.tracks)
        if not params or params[0] == "*":
            return list(enumerate(tracks))
        idx = self._parse_index(params[0], "track index")
        if idx is not None and 0 <= idx < len(tracks):
            return [(idx, tracks[idx])]
        return []

    def _get_track(self, params: tuple) -> Optional[Any]:
        """Resolve ``params[0]`` to a single track, or ``None``.

        The wildcard is not supported here: ``"*"``, like any other
        non-numeric or out-of-range index, resolves to ``None``.
        """
        if not params:
            return None
        idx = self._parse_index(params[0], "track index")
        if idx is None:
            return None
        tracks = list(self.song.tracks)
        if 0 <= idx < len(tracks):
            return tracks[idx]
        return None

    # ------------------------------------------------------------------
    # Property factories
    # ------------------------------------------------------------------
    def _make_getter(self, prop: str):
        """Build a handler returning ``(idx, value, idx, value, ...)`` for *prop*."""
        def handler(params: tuple) -> Optional[tuple]:
            result = []
            for idx, track in self._get_tracks(params):
                val = self._get_prop(track, prop)
                # Tracks for which the property yields None are left out.
                if val is not None:
                    result += [idx, val]
            return tuple(result) if result else None
        return handler

    def _make_setter(self, prop: str):
        """Build a handler that sets *prop* on one track to ``params[1]``."""
        def handler(params: tuple) -> None:
            if len(params) < 2:
                return None
            track = self._get_track(params)
            if track is not None:
                self._set_prop(track, prop, params[1])
            return None
        return handler

    def _make_start_listen(self, prop: str):
        """Build a handler that registers a *prop* listener per addressed track."""
        def handler(params: tuple) -> None:
            for idx, track in self._get_tracks(params):
                self._register_listener(
                    f"track.{idx}.{prop}", track, prop,
                    f"/live/track/get/{prop}", (idx,)
                )
        return handler

    def _make_stop_listen(self, prop: str):
        """Build a handler that removes the *prop* listener per addressed track."""
        def handler(params: tuple) -> None:
            for idx, _ in self._get_tracks(params):
                self._remove_listener(f"track.{idx}.{prop}")
        return handler

    # ------------------------------------------------------------------
    # Shared listener plumbing for DeviceParameter-like objects
    # ------------------------------------------------------------------
    def _bind_value_listener(self, key: str, param: Any, address: str, prefix: tuple) -> None:
        """Attach a value listener to *param* and store its remover under *key*.

        The callback sends ``prefix + (param.value,)`` to *address*; it is
        also invoked once immediately so the client gets the current value.
        Exceptions propagate to the caller, which logs them.
        """
        def on_change() -> None:
            self._send(address, prefix + (param.value,))

        param.add_value_listener(on_change)
        self._listener_store[key] = (param.remove_value_listener, on_change)
        on_change()

    # ------------------------------------------------------------------
    # Mixer device
    # ------------------------------------------------------------------
    def _mixer_param_get(self, params: tuple, name: str) -> Optional[tuple]:
        """Return ``(idx, value, ...)`` of ``mixer_device.<name>`` per addressed track."""
        result = []
        for idx, track in self._get_tracks(params):
            try:
                val = getattr(track.mixer_device, name).value
            except Exception as e:
                # Best effort: skip this track but leave a trace for debugging.
                logger.debug("get %s (track %s): %s", name, idx, e)
                continue
            result += [idx, val]
        return tuple(result) if result else None

    def _mixer_param_set(self, params: tuple, name: str) -> None:
        """Set ``mixer_device.<name>.value`` of one track to ``float(params[1])``."""
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            getattr(track.mixer_device, name).value = float(params[1])
        except Exception as e:
            logger.warning("set %s: %s", name, e)
        return None

    def _mixer_param_listen(self, params: tuple, name: str) -> None:
        """(Re-)register a value listener on ``mixer_device.<name>`` per addressed track."""
        for idx, track in self._get_tracks(params):
            key = f"track.{idx}.{name}"
            self._remove_listener(key)
            try:
                # Parameter lookup is inside the try so that one failing
                # track does not abort a wildcard subscription.
                param = getattr(track.mixer_device, name)
                self._bind_value_listener(key, param, f"/live/track/get/{name}", (idx,))
            except Exception as e:
                logger.warning("%s listener: %s", name, e)

    def _mixer_param_unlisten(self, params: tuple, name: str) -> None:
        """Remove the ``mixer_device.<name>`` listener per addressed track."""
        for idx, _ in self._get_tracks(params):
            self._remove_listener(f"track.{idx}.{name}")

    def _get_volume(self, params: tuple) -> Optional[tuple]:
        """Return ``(idx, volume, ...)`` for the addressed track(s)."""
        return self._mixer_param_get(params, "volume")

    def _set_volume(self, params: tuple) -> None:
        """Set the volume of track ``params[0]`` to ``params[1]``."""
        return self._mixer_param_set(params, "volume")

    def _start_listen_volume(self, params: tuple) -> None:
        """Start sending ``/live/track/get/volume`` updates for the addressed track(s)."""
        self._mixer_param_listen(params, "volume")

    def _stop_listen_volume(self, params: tuple) -> None:
        """Stop volume updates for the addressed track(s)."""
        self._mixer_param_unlisten(params, "volume")

    def _get_panning(self, params: tuple) -> Optional[tuple]:
        """Return ``(idx, panning, ...)`` for the addressed track(s)."""
        return self._mixer_param_get(params, "panning")

    def _set_panning(self, params: tuple) -> None:
        """Set the panning of track ``params[0]`` to ``params[1]``."""
        return self._mixer_param_set(params, "panning")

    def _start_listen_panning(self, params: tuple) -> None:
        """Start sending ``/live/track/get/panning`` updates for the addressed track(s)."""
        self._mixer_param_listen(params, "panning")

    def _stop_listen_panning(self, params: tuple) -> None:
        """Stop panning updates for the addressed track(s)."""
        self._mixer_param_unlisten(params, "panning")

    # ------------------------------------------------------------------
    # Sends
    # ------------------------------------------------------------------
    def _get_send(self, params: tuple) -> Optional[tuple]:
        """Return ``(track_idx, send_idx, value)`` for ``params = (track, send)``.

        Returns ``None`` when the track or send does not exist or an
        argument is malformed.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            send_idx = int(params[1])
            sends = list(track.mixer_device.sends)
            if 0 <= send_idx < len(sends):
                return (int(params[0]), send_idx, sends[send_idx].value)
        except Exception as e:
            logger.warning("get send: %s", e)
        return None

    def _set_send(self, params: tuple) -> None:
        """Set a send level; ``params = (track, send, value)``."""
        if len(params) < 3:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            send_idx = int(params[1])
            value = float(params[2])
            sends = list(track.mixer_device.sends)
            if 0 <= send_idx < len(sends):
                sends[send_idx].value = value
        except Exception as e:
            logger.warning("set send: %s", e)
        return None

    def _start_listen_send(self, params: tuple) -> None:
        """(Re-)register a value listener on one send; ``params = (track, send)``."""
        if len(params) < 2:
            return
        track = self._get_track(params)
        if track is None:
            return
        try:
            track_idx = int(params[0])
            send_idx = int(params[1])
        except (TypeError, ValueError, OverflowError) as e:
            logger.warning("send listener: %s", e)
            return
        key = f"track.{track_idx}.send.{send_idx}"
        # Any previous listener under this key is dropped even if the send
        # index turns out to be out of range below.
        self._remove_listener(key)
        try:
            sends = list(track.mixer_device.sends)
            if 0 <= send_idx < len(sends):
                self._bind_value_listener(
                    key, sends[send_idx], "/live/track/get/send", (track_idx, send_idx)
                )
        except Exception as e:
            logger.warning("send listener: %s", e)

    def _stop_listen_send(self, params: tuple) -> None:
        """Remove the listener of one send; ``params = (track, send)``."""
        if len(params) < 2:
            return
        track_idx = self._parse_index(params[0], "track index")
        send_idx = self._parse_index(params[1], "send index")
        if track_idx is None or send_idx is None:
            return
        self._remove_listener(f"track.{track_idx}.send.{send_idx}")

    # ------------------------------------------------------------------
    # Routing
    # ------------------------------------------------------------------
    def _routing_options(self, params: tuple, attr: str, label: str) -> Optional[tuple]:
        """Return the ``display_name`` of every option in ``track.<attr>``.

        *label* is the prefix used when logging a failure.
        """
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return tuple(option.display_name for option in getattr(track, attr))
        except Exception as e:
            logger.warning("%s: %s", label, e)
            return None

    def _routing_current(self, params: tuple, attr: str) -> Optional[tuple]:
        """Return ``(display_name,)`` of the track's current ``<attr>`` routing."""
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return (getattr(track, attr).display_name,)
        except Exception as e:
            logger.warning("get %s: %s", attr, e)
            return None

    def _routing_select(self, params: tuple, attr: str) -> None:
        """Assign ``track.<attr>`` the option whose ``display_name`` is ``params[1]``.

        Candidates are read from ``track.available_<attr>s``; the first
        exact name match wins.  An unknown name is logged and ignored.
        """
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        name = str(params[1])
        try:
            for option in getattr(track, f"available_{attr}s"):
                if option.display_name == name:
                    setattr(track, attr, option)
                    break
            else:
                logger.warning("set %s: no option named %r", attr, name)
        except Exception as e:
            logger.warning("set %s: %s", attr, e)
        return None

    def _get_available_input_routing_types(self, params: tuple) -> Optional[tuple]:
        """Return the display names of the track's available input routing types."""
        return self._routing_options(
            params, "available_input_routing_types", "input routing types")

    def _get_available_input_routing_channels(self, params: tuple) -> Optional[tuple]:
        """Return the display names of the track's available input routing channels."""
        return self._routing_options(
            params, "available_input_routing_channels", "input routing channels")

    def _get_input_routing_type(self, params: tuple) -> Optional[tuple]:
        """Return ``(display_name,)`` of the track's input routing type."""
        return self._routing_current(params, "input_routing_type")

    def _set_input_routing_type(self, params: tuple) -> None:
        """Select the input routing type named ``params[1]``."""
        return self._routing_select(params, "input_routing_type")

    def _get_input_routing_channel(self, params: tuple) -> Optional[tuple]:
        """Return ``(display_name,)`` of the track's input routing channel."""
        return self._routing_current(params, "input_routing_channel")

    def _set_input_routing_channel(self, params: tuple) -> None:
        """Select the input routing channel named ``params[1]``."""
        return self._routing_select(params, "input_routing_channel")

    def _get_available_output_routing_types(self, params: tuple) -> Optional[tuple]:
        """Return the display names of the track's available output routing types."""
        return self._routing_options(
            params, "available_output_routing_types", "output routing types")

    def _get_available_output_routing_channels(self, params: tuple) -> Optional[tuple]:
        """Return the display names of the track's available output routing channels."""
        return self._routing_options(
            params, "available_output_routing_channels", "output routing channels")

    def _get_output_routing_type(self, params: tuple) -> Optional[tuple]:
        """Return ``(display_name,)`` of the track's output routing type."""
        return self._routing_current(params, "output_routing_type")

    def _set_output_routing_type(self, params: tuple) -> None:
        """Select the output routing type named ``params[1]``."""
        return self._routing_select(params, "output_routing_type")

    def _get_output_routing_channel(self, params: tuple) -> Optional[tuple]:
        """Return ``(display_name,)`` of the track's output routing channel."""
        return self._routing_current(params, "output_routing_channel")

    def _set_output_routing_channel(self, params: tuple) -> None:
        """Select the output routing channel named ``params[1]``."""
        return self._routing_select(params, "output_routing_channel")

    # ------------------------------------------------------------------
    # Aggregate clip info
    # ------------------------------------------------------------------
    def _get_session_clips(self, track):
        """Return the clips of every slot in ``track.clip_slots`` that has one."""
        return [slot.clip for slot in track.clip_slots if slot.has_clip]

    def _map_session_clips(self, params: tuple, convert) -> Optional[tuple]:
        """Apply *convert* to each session clip of one track (``None`` if no track)."""
        track = self._get_track(params)
        if track is None:
            return None
        return tuple(convert(clip) for clip in self._get_session_clips(track))

    def _map_arrangement_clips(self, params: tuple, convert) -> Optional[tuple]:
        """Apply *convert* to each clip in ``track.arrangement_clips``.

        Returns ``None`` when the track is missing or the lookup fails.
        """
        track = self._get_track(params)
        if track is None:
            return None
        try:
            return tuple(convert(clip) for clip in track.arrangement_clips)
        except Exception as e:
            # Failure stays non-fatal (as before) but is no longer invisible.
            logger.debug("arrangement_clips: %s", e)
            return None

    def _get_clips_name(self, params: tuple) -> Optional[tuple]:
        """Return the names of the track's session clips."""
        return self._map_session_clips(params, lambda clip: clip.name)

    def _get_clips_length(self, params: tuple) -> Optional[tuple]:
        """Return the lengths (as floats) of the track's session clips."""
        return self._map_session_clips(params, lambda clip: float(clip.length))

    def _get_clips_color(self, params: tuple) -> Optional[tuple]:
        """Return the colors of the track's session clips."""
        return self._map_session_clips(params, lambda clip: clip.color)

    def _get_clips_is_playing(self, params: tuple) -> Optional[tuple]:
        """Return the playing state (as bools) of the track's session clips."""
        return self._map_session_clips(params, lambda clip: bool(clip.is_playing))

    def _get_arr_clips_name(self, params: tuple) -> Optional[tuple]:
        """Return the names of the track's arrangement clips."""
        return self._map_arrangement_clips(params, lambda clip: clip.name)

    def _get_arr_clips_length(self, params: tuple) -> Optional[tuple]:
        """Return the lengths (as floats) of the track's arrangement clips."""
        return self._map_arrangement_clips(params, lambda clip: float(clip.length))

    def _get_arr_clips_start(self, params: tuple) -> Optional[tuple]:
        """Return the start times (as floats) of the track's arrangement clips."""
        return self._map_arrangement_clips(params, lambda clip: float(clip.start_time))

    # ------------------------------------------------------------------
    # Device info
    # ------------------------------------------------------------------
    def _map_devices(self, params: tuple, convert) -> Optional[tuple]:
        """Apply *convert* to each device of one track (``None`` if no track)."""
        track = self._get_track(params)
        if track is None:
            return None
        return tuple(convert(device) for device in track.devices)

    def _get_num_devices(self, params: tuple) -> Optional[tuple]:
        """Return ``(device_count,)`` for the track."""
        track = self._get_track(params)
        if track is None:
            return None
        return (len(track.devices),)

    def _get_devices_name(self, params: tuple) -> Optional[tuple]:
        """Return the names of the track's devices."""
        return self._map_devices(params, lambda device: device.name)

    def _get_devices_type(self, params: tuple) -> Optional[tuple]:
        """Return the ``type`` of each device, converted to ``int``."""
        return self._map_devices(params, lambda device: int(device.type))

    def _get_devices_class_name(self, params: tuple) -> Optional[tuple]:
        """Return the ``class_name`` of each of the track's devices."""
        return self._map_devices(params, lambda device: device.class_name)

    def _get_devices_can_have_chains(self, params: tuple) -> Optional[tuple]:
        """Return ``can_have_chains`` per device; ``False`` when the attribute is absent."""
        return self._map_devices(
            params, lambda device: bool(getattr(device, "can_have_chains", False)))

    # ------------------------------------------------------------------
    # Methods
    # ------------------------------------------------------------------
    def _stop_all_clips(self, params: tuple) -> None:
        """Call ``stop_all_clips()`` on each addressed track, logging failures."""
        for _, track in self._get_tracks(params):
            try:
                track.stop_all_clips()
            except Exception as e:
                logger.warning("stop_all_clips: %s", e)
        return None

    def _invoke_device_method(self, params: tuple, method: str) -> None:
        """Call ``track.<method>(int(params[1]))`` on one track, logging failures."""
        if len(params) < 2:
            return None
        track = self._get_track(params)
        if track is None:
            return None
        try:
            getattr(track, method)(int(params[1]))
        except Exception as e:
            logger.warning("%s: %s", method, e)
        return None

    def _delete_device(self, params: tuple) -> None:
        """Delete device ``params[1]`` from track ``params[0]``."""
        return self._invoke_device_method(params, "delete_device")

    def _duplicate_device(self, params: tuple) -> None:
        """Duplicate device ``params[1]`` on track ``params[0]``."""
        return self._invoke_device_method(params, "duplicate_device")