"""Handles /live/device/* and /live/device/chain/* OSC addresses.""" import logging from typing import Any, List, Optional, Tuple from .handler import AbletonOSCHandler logger = logging.getLogger(__name__) def _find_device(song, track_idx: int, device_idx: int, chain_idx: Optional[int] = None, sub_device_idx: Optional[int] = None) -> Optional[Any]: """Resolve a device from track + device index (optionally into a rack chain).""" try: tracks = list(song.tracks) if not (0 <= track_idx < len(tracks)): return None devices = list(tracks[track_idx].devices) if not (0 <= device_idx < len(devices)): return None device = devices[device_idx] if chain_idx is not None and sub_device_idx is not None: chains = list(getattr(device, "chains", [])) if not (0 <= chain_idx < len(chains)): return None sub_devices = list(chains[chain_idx].devices) if not (0 <= sub_device_idx < len(sub_devices)): return None return sub_devices[sub_device_idx] return device except Exception as e: logger.warning("find_device: %s", e) return None class DeviceHandler(AbletonOSCHandler): def init_api(self) -> None: self.clear_listeners() # --- device-level --- self._add("/live/device/get/name", self._get_name) self._add("/live/device/get/class_name", self._get_class_name) self._add("/live/device/get/type", self._get_type) self._add("/live/device/get/is_active", self._get_is_active) self._add("/live/device/set/is_active", self._set_is_active) self._add("/live/device/start_listen/is_active", self._start_listen_is_active) self._add("/live/device/stop_listen/is_active", self._stop_listen_is_active) # --- parameter count --- self._add("/live/device/get/num_parameters", self._get_num_parameters) # --- bulk parameter operations --- self._add("/live/device/get/parameters/name", self._get_params_name) self._add("/live/device/get/parameters/value", self._get_params_value) self._add("/live/device/get/parameters/min", self._get_params_min) self._add("/live/device/get/parameters/max", self._get_params_max) self._add("/live/device/get/parameters/default_value", self._get_params_default) self._add("/live/device/get/parameters/is_quantized", self._get_params_is_quantized) self._add("/live/device/set/parameters/value", self._set_params_value) # --- individual parameter --- self._add("/live/device/get/parameter/name", self._get_param_name) self._add("/live/device/get/parameter/value", self._get_param_value) self._add("/live/device/get/parameter/value_string", self._get_param_value_string) self._add("/live/device/get/parameter/min", self._get_param_min) self._add("/live/device/get/parameter/max", self._get_param_max) self._add("/live/device/get/parameter/default_value", self._get_param_default) self._add("/live/device/set/parameter/value", self._set_param_value) self._add("/live/device/start_listen/parameter/value", self._start_listen_param) self._add("/live/device/stop_listen/parameter/value", self._stop_listen_param) # --- rack-specific --- self._add("/live/device/get/num_chains", self._get_num_chains) self._add("/live/device/get/chains/name", self._get_chains_name) self._add("/live/device/get/chains/num_devices", self._get_chains_num_devices) self._add("/live/device/get/chains/devices/name", self._get_chains_devices_name) self._add("/live/device/randomize_macros", self._randomize_macros) # --- drum pad --- self._add("/live/device/get/drum_pads/name", self._get_drum_pads_name) self._add("/live/device/get/drum_pads/note", self._get_drum_pads_note) self._add("/live/device/get/drum_pads/mute", self._get_drum_pads_mute) self._add("/live/device/set/drum_pads/mute", self._set_drum_pad_mute) self._add("/live/device/get/drum_pads/solo", self._get_drum_pads_solo) self._add("/live/device/set/drum_pads/solo", self._set_drum_pad_solo) # --- chain devices (sub-devices inside racks) --- self._add("/live/device/chain/get/parameter/value", self._get_chain_param_value) self._add("/live/device/chain/set/parameter/value", self._set_chain_param_value) self._add("/live/device/chain/start_listen/parameter/value", self._start_listen_chain_param) self._add("/live/device/chain/stop_listen/parameter/value", self._stop_listen_chain_param) # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _get_device(self, params: tuple) -> Optional[Any]: if len(params) < 2: return None return _find_device(self.song, int(params[0]), int(params[1])) def _get_param(self, params: tuple) -> Optional[Any]: """params: track_idx, device_idx, param_idx""" device = self._get_device(params) if device is None or len(params) < 3: return None param_idx = int(params[2]) try: params_list = list(device.parameters) if 0 <= param_idx < len(params_list): return params_list[param_idx] except Exception as e: logger.warning("get_param: %s", e) return None def _get_all_params(self, params: tuple) -> List[Any]: device = self._get_device(params) if device is None: return [] try: return list(device.parameters) except Exception: return [] # ------------------------------------------------------------------ # Device properties # ------------------------------------------------------------------ def _get_name(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) return (int(params[0]), int(params[1]), d.name) if d else None def _get_class_name(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) return (int(params[0]), int(params[1]), d.class_name) if d else None def _get_type(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) return (int(params[0]), int(params[1]), int(d.type)) if d else None def _get_is_active(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) return (int(params[0]), int(params[1]), bool(d.is_active)) if d else None def _set_is_active(self, params: tuple) -> None: if len(params) < 3: return None d = self._get_device(params) if d: try: d.is_active = bool(params[2]) except Exception as e: logger.warning("set is_active: %s", e) return None def _start_listen_is_active(self, params: tuple) -> None: d = self._get_device(params) if d: key = f"device.{params[0]}.{params[1]}.is_active" prefix = (int(params[0]), int(params[1])) self._register_listener(key, d, "is_active", "/live/device/get/is_active", prefix) def _stop_listen_is_active(self, params: tuple) -> None: if len(params) >= 2: self._remove_listener(f"device.{params[0]}.{params[1]}.is_active") def _get_num_parameters(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) return (int(params[0]), int(params[1]), len(list(d.parameters))) if d else None # ------------------------------------------------------------------ # Bulk parameter operations # ------------------------------------------------------------------ def _get_params_name(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(p.name for p in ps) if ps else None def _get_params_value(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(float(p.value) for p in ps) if ps else None def _get_params_min(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(float(p.min) for p in ps) if ps else None def _get_params_max(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(float(p.max) for p in ps) if ps else None def _get_params_default(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(float(p.default_value) for p in ps) if ps else None def _get_params_is_quantized(self, params: tuple) -> Optional[tuple]: ps = self._get_all_params(params) return (int(params[0]), int(params[1])) + tuple(bool(p.is_quantized) for p in ps) if ps else None def _set_params_value(self, params: tuple) -> None: """params: track_idx, device_idx, val0, val1, ...""" if len(params) < 3: return None ps = self._get_all_params(params) values = params[2:] for i, val in enumerate(values): if i < len(ps): try: ps[i].value = float(val) except Exception as e: logger.warning("set param %d: %s", i, e) return None # ------------------------------------------------------------------ # Individual parameter # ------------------------------------------------------------------ def _get_param_name(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) return (int(params[0]), int(params[1]), int(params[2]), p.name) if p else None def _get_param_value(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) return (int(params[0]), int(params[1]), int(params[2]), float(p.value)) if p else None def _get_param_value_string(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) if p: try: s = p.str_for_value(p.value) return (int(params[0]), int(params[1]), int(params[2]), s) except Exception: return (int(params[0]), int(params[1]), int(params[2]), str(p.value)) return None def _get_param_min(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) return (int(params[0]), int(params[1]), int(params[2]), float(p.min)) if p else None def _get_param_max(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) return (int(params[0]), int(params[1]), int(params[2]), float(p.max)) if p else None def _get_param_default(self, params: tuple) -> Optional[tuple]: p = self._get_param(params) return (int(params[0]), int(params[1]), int(params[2]), float(p.default_value)) if p else None def _set_param_value(self, params: tuple) -> None: """params: track_idx, device_idx, param_idx, value""" if len(params) < 4: return None p = self._get_param(params) if p: try: p.value = float(params[3]) except Exception as e: logger.warning("set param value: %s", e) return None def _start_listen_param(self, params: tuple) -> None: """params: track_idx, device_idx, param_idx""" p = self._get_param(params) if p is None: return key = f"device.{params[0]}.{params[1]}.param.{params[2]}" self._remove_listener(key) prefix = (int(params[0]), int(params[1]), int(params[2])) def make_cb(pr, pfx): def cb(): self._send("/live/device/get/parameter/value", pfx + (float(pr.value),)) return cb cb = make_cb(p, prefix) try: p.add_value_listener(cb) self._listener_store[key] = (p.remove_value_listener, cb) cb() except Exception as e: logger.warning("param listener: %s", e) def _stop_listen_param(self, params: tuple) -> None: if len(params) >= 3: self._remove_listener(f"device.{params[0]}.{params[1]}.param.{params[2]}") # ------------------------------------------------------------------ # Rack-specific # ------------------------------------------------------------------ def _get_num_chains(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) if d: try: return (int(params[0]), int(params[1]), len(list(d.chains))) except Exception: return (int(params[0]), int(params[1]), 0) return None def _get_chains_name(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) if d: try: return (int(params[0]), int(params[1])) + tuple(c.name for c in d.chains) except Exception: pass return None def _get_chains_num_devices(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) if d: try: return (int(params[0]), int(params[1])) + tuple(len(c.devices) for c in d.chains) except Exception: pass return None def _get_chains_devices_name(self, params: tuple) -> Optional[tuple]: d = self._get_device(params) if d: try: result = [] for chain in d.chains: result += [dev.name for dev in chain.devices] return (int(params[0]), int(params[1])) + tuple(result) except Exception: pass return None def _randomize_macros(self, params: tuple) -> None: d = self._get_device(params) if d: try: d.randomize_macros() except Exception as e: logger.warning("randomize_macros: %s", e) return None # ------------------------------------------------------------------ # Drum pads # ------------------------------------------------------------------ def _get_drum_pads(self, params: tuple): d = self._get_device(params) if d: try: return list(d.drum_pads) except Exception: pass return [] def _get_drum_pads_name(self, params: tuple) -> Optional[tuple]: pads = self._get_drum_pads(params) if pads: return (int(params[0]), int(params[1])) + tuple(p.name for p in pads) return None def _get_drum_pads_note(self, params: tuple) -> Optional[tuple]: pads = self._get_drum_pads(params) if pads: return (int(params[0]), int(params[1])) + tuple(int(p.note) for p in pads) return None def _get_drum_pads_mute(self, params: tuple) -> Optional[tuple]: pads = self._get_drum_pads(params) if pads: return (int(params[0]), int(params[1])) + tuple(bool(p.mute) for p in pads) return None def _set_drum_pad_mute(self, params: tuple) -> None: """params: track_idx, device_idx, pad_idx, mute""" if len(params) < 4: return None pads = self._get_drum_pads(params) pad_idx = int(params[2]) if 0 <= pad_idx < len(pads): try: pads[pad_idx].mute = bool(params[3]) except Exception as e: logger.warning("set drum_pad mute: %s", e) return None def _get_drum_pads_solo(self, params: tuple) -> Optional[tuple]: pads = self._get_drum_pads(params) if pads: return (int(params[0]), int(params[1])) + tuple(bool(p.solo) for p in pads) return None def _set_drum_pad_solo(self, params: tuple) -> None: """params: track_idx, device_idx, pad_idx, solo""" if len(params) < 4: return None pads = self._get_drum_pads(params) pad_idx = int(params[2]) if 0 <= pad_idx < len(pads): try: pads[pad_idx].solo = bool(params[3]) except Exception as e: logger.warning("set drum_pad solo: %s", e) return None # ------------------------------------------------------------------ # Chain sub-device parameters # ------------------------------------------------------------------ def _get_chain_device(self, params: tuple) -> Optional[Any]: """params: track_idx, device_idx, chain_idx, sub_device_idx""" if len(params) < 4: return None return _find_device(self.song, int(params[0]), int(params[1]), int(params[2]), int(params[3])) def _get_chain_param(self, params: tuple) -> Optional[Any]: """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx""" d = self._get_chain_device(params) if d is None or len(params) < 5: return None param_idx = int(params[4]) try: ps = list(d.parameters) return ps[param_idx] if 0 <= param_idx < len(ps) else None except Exception: return None def _get_chain_param_value(self, params: tuple) -> Optional[tuple]: p = self._get_chain_param(params) if p: return tuple(int(x) for x in params[:5]) + (float(p.value),) return None def _set_chain_param_value(self, params: tuple) -> None: """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx, value""" if len(params) < 6: return None p = self._get_chain_param(params) if p: try: p.value = float(params[5]) except Exception as e: logger.warning("set chain param: %s", e) return None def _start_listen_chain_param(self, params: tuple) -> None: p = self._get_chain_param(params) if p is None: return key = f"device.chain.{'.'.join(str(x) for x in params[:5])}" self._remove_listener(key) prefix = tuple(int(x) for x in params[:5]) def make_cb(pr, pfx): def cb(): self._send("/live/device/chain/get/parameter/value", pfx + (float(pr.value),)) return cb cb = make_cb(p, prefix) try: p.add_value_listener(cb) self._listener_store[key] = (p.remove_value_listener, cb) cb() except Exception as e: logger.warning("chain param listener: %s", e) def _stop_listen_chain_param(self, params: tuple) -> None: if len(params) >= 5: key = f"device.chain.{'.'.join(str(x) for x in params[:5])}" self._remove_listener(key)