"""Handles /live/track/* OSC addresses.""" import logging from typing import Any, List, Optional, Tuple from .handler import AbletonOSCHandler logger = logging.getLogger(__name__) TRACK_PROPS_RW = [ "arm", "mute", "solo", "name", "color", "color_index", "fold_state", "current_monitoring_state", ] TRACK_PROPS_RO = [ "can_be_armed", "has_audio_input", "has_audio_output", "has_midi_input", "has_midi_output", "is_foldable", "is_grouped", "is_visible", "fired_slot_index", "playing_slot_index", "output_meter_level", "output_meter_left", "output_meter_right", "output_meter_peak_left", "output_meter_peak_right", ] class TrackHandler(AbletonOSCHandler): def init_api(self) -> None: self.clear_listeners() for prop in TRACK_PROPS_RW: self._add(f"/live/track/get/{prop}", self._make_getter(prop)) self._add(f"/live/track/set/{prop}", self._make_setter(prop)) self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop)) self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop)) for prop in TRACK_PROPS_RO: self._add(f"/live/track/get/{prop}", self._make_getter(prop)) self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop)) self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop)) # Mixer device (volume, panning, sends) self._add("/live/track/get/volume", self._get_volume) self._add("/live/track/set/volume", self._set_volume) self._add("/live/track/start_listen/volume", self._start_listen_volume) self._add("/live/track/stop_listen/volume", self._stop_listen_volume) self._add("/live/track/get/panning", self._get_panning) self._add("/live/track/set/panning", self._set_panning) self._add("/live/track/start_listen/panning", self._start_listen_panning) self._add("/live/track/stop_listen/panning", self._stop_listen_panning) self._add("/live/track/get/send", self._get_send) self._add("/live/track/set/send", self._set_send) self._add("/live/track/start_listen/send", self._start_listen_send) self._add("/live/track/stop_listen/send", self._stop_listen_send) # Routing self._add("/live/track/get/available_input_routing_types", self._get_available_input_routing_types) self._add("/live/track/get/available_input_routing_channels", self._get_available_input_routing_channels) self._add("/live/track/get/input_routing_type", self._get_input_routing_type) self._add("/live/track/set/input_routing_type", self._set_input_routing_type) self._add("/live/track/get/input_routing_channel", self._get_input_routing_channel) self._add("/live/track/set/input_routing_channel", self._set_input_routing_channel) self._add("/live/track/get/available_output_routing_types", self._get_available_output_routing_types) self._add("/live/track/get/available_output_routing_channels", self._get_available_output_routing_channels) self._add("/live/track/get/output_routing_type", self._get_output_routing_type) self._add("/live/track/set/output_routing_type", self._set_output_routing_type) self._add("/live/track/get/output_routing_channel", self._get_output_routing_channel) self._add("/live/track/set/output_routing_channel", self._set_output_routing_channel) # Aggregate self._add("/live/track/get/clips/name", self._get_clips_name) self._add("/live/track/get/clips/length", self._get_clips_length) self._add("/live/track/get/clips/color", self._get_clips_color) self._add("/live/track/get/clips/is_playing", self._get_clips_is_playing) self._add("/live/track/get/arrangement_clips/name", self._get_arr_clips_name) self._add("/live/track/get/arrangement_clips/length", self._get_arr_clips_length) self._add("/live/track/get/arrangement_clips/start_time", self._get_arr_clips_start) self._add("/live/track/get/num_devices", self._get_num_devices) self._add("/live/track/get/devices/name", self._get_devices_name) self._add("/live/track/get/devices/type", self._get_devices_type) self._add("/live/track/get/devices/class_name", self._get_devices_class_name) self._add("/live/track/get/devices/can_have_chains", self._get_devices_can_have_chains) # Methods self._add("/live/track/stop_all_clips", self._stop_all_clips) self._add("/live/track/delete_device", self._delete_device) self._add("/live/track/duplicate_device", self._duplicate_device) # ------------------------------------------------------------------ # Track resolution (supports wildcard *) # ------------------------------------------------------------------ def _get_tracks(self, params: tuple) -> List[Tuple[int, Any]]: tracks = list(self.song.tracks) if not params or params[0] == "*": return list(enumerate(tracks)) idx = int(params[0]) if 0 <= idx < len(tracks): return [(idx, tracks[idx])] return [] def _get_track(self, params: tuple) -> Optional[Any]: tracks = list(self.song.tracks) if not params: return None idx = int(params[0]) if 0 <= idx < len(tracks): return tracks[idx] return None # ------------------------------------------------------------------ # Property factories # ------------------------------------------------------------------ def _make_getter(self, prop: str): def handler(params: tuple) -> Optional[tuple]: result = [] for idx, track in self._get_tracks(params): val = self._get_prop(track, prop) if val is not None: result += [idx, val] return tuple(result) if result else None return handler def _make_setter(self, prop: str): def handler(params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) if track: self._set_prop(track, prop, params[1]) return None return handler def _make_start_listen(self, prop: str): def handler(params: tuple) -> None: for idx, track in self._get_tracks(params): key = f"track.{idx}.{prop}" self._register_listener( key, track, prop, f"/live/track/get/{prop}", (idx,) ) return handler def _make_stop_listen(self, prop: str): def handler(params: tuple) -> None: for idx, track in self._get_tracks(params): self._remove_listener(f"track.{idx}.{prop}") return handler # ------------------------------------------------------------------ # Mixer device # ------------------------------------------------------------------ def _get_volume(self, params: tuple) -> Optional[tuple]: result = [] for idx, track in self._get_tracks(params): try: val = track.mixer_device.volume.value result += [idx, val] except Exception: pass return tuple(result) if result else None def _set_volume(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) if track: try: track.mixer_device.volume.value = float(params[1]) except Exception as e: logger.warning("set volume: %s", e) return None def _start_listen_volume(self, params: tuple) -> None: for idx, track in self._get_tracks(params): key = f"track.{idx}.volume" self._remove_listener(key) param = track.mixer_device.volume def make_cb(i, p): def cb(): self._send("/live/track/get/volume", (i, p.value)) return cb cb = make_cb(idx, param) try: param.add_value_listener(cb) self._listener_store[key] = (param.remove_value_listener, cb) cb() except Exception as e: logger.warning("volume listener: %s", e) def _stop_listen_volume(self, params: tuple) -> None: for idx, _ in self._get_tracks(params): self._remove_listener(f"track.{idx}.volume") def _get_panning(self, params: tuple) -> Optional[tuple]: result = [] for idx, track in self._get_tracks(params): try: val = track.mixer_device.panning.value result += [idx, val] except Exception: pass return tuple(result) if result else None def _set_panning(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) if track: try: track.mixer_device.panning.value = float(params[1]) except Exception as e: logger.warning("set panning: %s", e) return None def _start_listen_panning(self, params: tuple) -> None: for idx, track in self._get_tracks(params): key = f"track.{idx}.panning" self._remove_listener(key) param = track.mixer_device.panning def make_cb(i, p): def cb(): self._send("/live/track/get/panning", (i, p.value)) return cb cb = make_cb(idx, param) try: param.add_value_listener(cb) self._listener_store[key] = (param.remove_value_listener, cb) cb() except Exception as e: logger.warning("panning listener: %s", e) def _stop_listen_panning(self, params: tuple) -> None: for idx, _ in self._get_tracks(params): self._remove_listener(f"track.{idx}.panning") # ------------------------------------------------------------------ # Sends # ------------------------------------------------------------------ def _get_send(self, params: tuple) -> Optional[tuple]: if len(params) < 2: return None track = self._get_track(params) send_idx = int(params[1]) if track: try: sends = list(track.mixer_device.sends) if 0 <= send_idx < len(sends): return (int(params[0]), send_idx, sends[send_idx].value) except Exception as e: logger.warning("get send: %s", e) return None def _set_send(self, params: tuple) -> None: if len(params) < 3: return None track = self._get_track(params) send_idx = int(params[1]) value = float(params[2]) if track: try: sends = list(track.mixer_device.sends) if 0 <= send_idx < len(sends): sends[send_idx].value = value except Exception as e: logger.warning("set send: %s", e) return None def _start_listen_send(self, params: tuple) -> None: if len(params) < 2: return track = self._get_track(params) track_idx = int(params[0]) send_idx = int(params[1]) if track: key = f"track.{track_idx}.send.{send_idx}" self._remove_listener(key) try: sends = list(track.mixer_device.sends) if 0 <= send_idx < len(sends): param = sends[send_idx] def make_cb(ti, si, p): def cb(): self._send("/live/track/get/send", (ti, si, p.value)) return cb cb = make_cb(track_idx, send_idx, param) param.add_value_listener(cb) self._listener_store[key] = (param.remove_value_listener, cb) cb() except Exception as e: logger.warning("send listener: %s", e) def _stop_listen_send(self, params: tuple) -> None: if len(params) >= 2: self._remove_listener(f"track.{int(params[0])}.send.{int(params[1])}") # ------------------------------------------------------------------ # Routing # ------------------------------------------------------------------ def _get_available_input_routing_types(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: types = [rt.display_name for rt in track.available_input_routing_types] return tuple(types) except Exception as e: logger.warning("input routing types: %s", e) return None def _get_available_input_routing_channels(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: channels = [rc.display_name for rc in track.available_input_routing_channels] return tuple(channels) except Exception as e: logger.warning("input routing channels: %s", e) return None def _get_input_routing_type(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return (track.input_routing_type.display_name,) except Exception as e: logger.warning("get input_routing_type: %s", e) return None def _set_input_routing_type(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) name = str(params[1]) if track: try: for rt in track.available_input_routing_types: if rt.display_name == name: track.input_routing_type = rt break except Exception as e: logger.warning("set input_routing_type: %s", e) return None def _get_input_routing_channel(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return (track.input_routing_channel.display_name,) except Exception as e: logger.warning("get input_routing_channel: %s", e) return None def _set_input_routing_channel(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) name = str(params[1]) if track: try: for rc in track.available_input_routing_channels: if rc.display_name == name: track.input_routing_channel = rc break except Exception as e: logger.warning("set input_routing_channel: %s", e) return None def _get_available_output_routing_types(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: types = [rt.display_name for rt in track.available_output_routing_types] return tuple(types) except Exception as e: logger.warning("output routing types: %s", e) return None def _get_available_output_routing_channels(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: channels = [rc.display_name for rc in track.available_output_routing_channels] return tuple(channels) except Exception as e: logger.warning("output routing channels: %s", e) return None def _get_output_routing_type(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return (track.output_routing_type.display_name,) except Exception as e: logger.warning("get output_routing_type: %s", e) return None def _set_output_routing_type(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) name = str(params[1]) if track: try: for rt in track.available_output_routing_types: if rt.display_name == name: track.output_routing_type = rt break except Exception as e: logger.warning("set output_routing_type: %s", e) return None def _get_output_routing_channel(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return (track.output_routing_channel.display_name,) except Exception as e: logger.warning("get output_routing_channel: %s", e) return None def _set_output_routing_channel(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) name = str(params[1]) if track: try: for rc in track.available_output_routing_channels: if rc.display_name == name: track.output_routing_channel = rc break except Exception as e: logger.warning("set output_routing_channel: %s", e) return None # ------------------------------------------------------------------ # Aggregate clip info # ------------------------------------------------------------------ def _get_session_clips(self, track): return [slot.clip for slot in track.clip_slots if slot.has_clip] def _get_clips_name(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(c.name for c in self._get_session_clips(track)) return None def _get_clips_length(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(float(c.length) for c in self._get_session_clips(track)) return None def _get_clips_color(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(c.color for c in self._get_session_clips(track)) return None def _get_clips_is_playing(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(bool(c.is_playing) for c in self._get_session_clips(track)) return None def _get_arr_clips_name(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return tuple(c.name for c in track.arrangement_clips) except Exception: return None return None def _get_arr_clips_length(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return tuple(float(c.length) for c in track.arrangement_clips) except Exception: return None return None def _get_arr_clips_start(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: try: return tuple(float(c.start_time) for c in track.arrangement_clips) except Exception: return None return None # ------------------------------------------------------------------ # Device info # ------------------------------------------------------------------ def _get_num_devices(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return (len(track.devices),) return None def _get_devices_name(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(d.name for d in track.devices) return None def _get_devices_type(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(int(d.type) for d in track.devices) return None def _get_devices_class_name(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(d.class_name for d in track.devices) return None def _get_devices_can_have_chains(self, params: tuple) -> Optional[tuple]: track = self._get_track(params) if track: return tuple(bool(getattr(d, "can_have_chains", False)) for d in track.devices) return None # ------------------------------------------------------------------ # Methods # ------------------------------------------------------------------ def _stop_all_clips(self, params: tuple) -> None: for _, track in self._get_tracks(params): try: track.stop_all_clips() except Exception as e: logger.warning("stop_all_clips: %s", e) return None def _delete_device(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) device_idx = int(params[1]) if track: try: track.delete_device(device_idx) except Exception as e: logger.warning("delete_device: %s", e) return None def _duplicate_device(self, params: tuple) -> None: if len(params) < 2: return None track = self._get_track(params) device_idx = int(params[1]) if track: try: track.duplicate_device(device_idx) except Exception as e: logger.warning("duplicate_device: %s", e) return None