Files

468 lines
19 KiB
Python
Raw Permalink Normal View History

"""Handles /live/device/* and /live/device/chain/* OSC addresses."""
import logging
from typing import Any, List, Optional, Tuple
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
def _find_device(song, track_idx: int, device_idx: int,
                 chain_idx: Optional[int] = None,
                 sub_device_idx: Optional[int] = None) -> Optional[Any]:
    """Resolve a device from track + device index (optionally into a rack chain).

    Args:
        song: Object exposing ``tracks``; each track exposes ``devices``.
        track_idx: Zero-based position in ``song.tracks``.
        device_idx: Zero-based position in the track's ``devices``.
        chain_idx: Zero-based position in the device's ``chains``.  Must be
            given together with ``sub_device_idx``.
        sub_device_idx: Zero-based position in the chain's ``devices``.

    Returns:
        The top-level device when no chain address is given, the device
        inside the chain when both chain indices are given, or ``None`` when
        any index is out of range, the chain address is incomplete, or the
        lookup raises.
    """
    # A chain address needs both halves.  Resolving a half-specified address
    # to the parent device would make callers act on the wrong device.
    if (chain_idx is None) != (sub_device_idx is None):
        return None
    try:
        tracks = list(song.tracks)
        if not 0 <= track_idx < len(tracks):
            return None
        devices = list(tracks[track_idx].devices)
        if not 0 <= device_idx < len(devices):
            return None
        device = devices[device_idx]
        if chain_idx is None:
            return device
        # Devices without a ``chains`` attribute are treated as having none.
        chains = list(getattr(device, "chains", []))
        if not 0 <= chain_idx < len(chains):
            return None
        sub_devices = list(chains[chain_idx].devices)
        if not 0 <= sub_device_idx < len(sub_devices):
            return None
        return sub_devices[sub_device_idx]
    except Exception as e:
        # Deliberately broad: the lookup is best-effort and must never
        # propagate an error from the host object model to the caller.
        logger.warning("find_device: %s", e)
        return None
class DeviceHandler(AbletonOSCHandler):
    """OSC handler for ``/live/device/*`` and ``/live/device/chain/*``.

    Every handler receives the OSC arguments as a tuple whose leading items
    are indices (track, device, and where applicable parameter / chain /
    sub-device / pad).  Getters return a tuple that echoes those indices as
    ints followed by the requested value(s), or ``None`` when the target
    cannot be resolved.  Setters and listener controls always return
    ``None``.

    NOTE(review): what the base class does with a handler's return value is
    not visible here -- presumably a non-``None`` tuple is sent back on the
    same address; confirm in ``AbletonOSCHandler``.
    """

    def init_api(self) -> None:
        """Clear existing listeners and register every device address."""
        self.clear_listeners()
        routes = (
            # --- device-level ---
            ("/live/device/get/name", self._get_name),
            ("/live/device/get/class_name", self._get_class_name),
            ("/live/device/get/type", self._get_type),
            ("/live/device/get/is_active", self._get_is_active),
            ("/live/device/set/is_active", self._set_is_active),
            ("/live/device/start_listen/is_active", self._start_listen_is_active),
            ("/live/device/stop_listen/is_active", self._stop_listen_is_active),
            # --- parameter count ---
            ("/live/device/get/num_parameters", self._get_num_parameters),
            # --- bulk parameter operations ---
            ("/live/device/get/parameters/name", self._get_params_name),
            ("/live/device/get/parameters/value", self._get_params_value),
            ("/live/device/get/parameters/min", self._get_params_min),
            ("/live/device/get/parameters/max", self._get_params_max),
            ("/live/device/get/parameters/default_value", self._get_params_default),
            ("/live/device/get/parameters/is_quantized", self._get_params_is_quantized),
            ("/live/device/set/parameters/value", self._set_params_value),
            # --- individual parameter ---
            ("/live/device/get/parameter/name", self._get_param_name),
            ("/live/device/get/parameter/value", self._get_param_value),
            ("/live/device/get/parameter/value_string", self._get_param_value_string),
            ("/live/device/get/parameter/min", self._get_param_min),
            ("/live/device/get/parameter/max", self._get_param_max),
            ("/live/device/get/parameter/default_value", self._get_param_default),
            ("/live/device/set/parameter/value", self._set_param_value),
            ("/live/device/start_listen/parameter/value", self._start_listen_param),
            ("/live/device/stop_listen/parameter/value", self._stop_listen_param),
            # --- rack-specific ---
            ("/live/device/get/num_chains", self._get_num_chains),
            ("/live/device/get/chains/name", self._get_chains_name),
            ("/live/device/get/chains/num_devices", self._get_chains_num_devices),
            ("/live/device/get/chains/devices/name", self._get_chains_devices_name),
            ("/live/device/randomize_macros", self._randomize_macros),
            # --- drum pad ---
            ("/live/device/get/drum_pads/name", self._get_drum_pads_name),
            ("/live/device/get/drum_pads/note", self._get_drum_pads_note),
            ("/live/device/get/drum_pads/mute", self._get_drum_pads_mute),
            ("/live/device/set/drum_pads/mute", self._set_drum_pad_mute),
            ("/live/device/get/drum_pads/solo", self._get_drum_pads_solo),
            ("/live/device/set/drum_pads/solo", self._set_drum_pad_solo),
            # --- chain devices (sub-devices inside racks) ---
            ("/live/device/chain/get/parameter/value", self._get_chain_param_value),
            ("/live/device/chain/set/parameter/value", self._set_chain_param_value),
            ("/live/device/chain/start_listen/parameter/value",
             self._start_listen_chain_param),
            ("/live/device/chain/stop_listen/parameter/value",
             self._stop_listen_chain_param),
        )
        for address, handler in routes:
            self._add(address, handler)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _parse_indices(params: tuple, count: int) -> Optional[Tuple[int, ...]]:
        """Return the first ``count`` arguments as ints.

        Returns ``None`` when fewer than ``count`` arguments were supplied or
        when any of them cannot be converted with ``int()``; the latter is
        logged so a malformed message never raises out of a handler.
        """
        if len(params) < count:
            return None
        try:
            return tuple(int(value) for value in params[:count])
        except (TypeError, ValueError, OverflowError) as e:
            logger.warning("invalid index arguments %r: %s", params[:count], e)
            return None

    def _get_device(self, params: tuple) -> Optional[Any]:
        """params: track_idx, device_idx -> top-level device or None."""
        idx = self._parse_indices(params, 2)
        if idx is None:
            return None
        return _find_device(self.song, idx[0], idx[1])

    def _get_param(self, params: tuple) -> Optional[Any]:
        """params: track_idx, device_idx, param_idx -> parameter or None."""
        idx = self._parse_indices(params, 3)
        if idx is None:
            return None
        device = self._get_device(params)
        if device is None:
            return None
        try:
            parameters = list(device.parameters)
        except Exception as e:
            logger.warning("get_param: %s", e)
            return None
        param_idx = idx[2]
        if 0 <= param_idx < len(parameters):
            return parameters[param_idx]
        return None

    def _get_all_params(self, params: tuple) -> List[Any]:
        """params: track_idx, device_idx -> list of parameters ([] on failure)."""
        device = self._get_device(params)
        if device is None:
            return []
        try:
            return list(device.parameters)
        except Exception as e:
            logger.debug("get_all_params: %s", e)
            return []

    def _reply_for_device(self, params: tuple, read) -> Optional[tuple]:
        """Build ``(track, device, read(device))``; None if unresolved."""
        device = self._get_device(params)
        if device is None:
            return None
        return (int(params[0]), int(params[1]), read(device))

    def _reply_for_params(self, params: tuple, read) -> Optional[tuple]:
        """Build ``(track, device, read(p0), read(p1), ...)``.

        Returns None when the device is unresolved or has no parameters.
        """
        parameters = self._get_all_params(params)
        if not parameters:
            return None
        head = (int(params[0]), int(params[1]))
        return head + tuple(read(p) for p in parameters)

    def _reply_for_param(self, params: tuple, read) -> Optional[tuple]:
        """Build ``(track, device, param, read(parameter))``; None if unresolved."""
        parameter = self._get_param(params)
        if parameter is None:
            return None
        return tuple(int(x) for x in params[:3]) + (read(parameter),)

    def _reply_for_chains(self, params: tuple, build) -> Optional[tuple]:
        """Build ``(track, device, *build(device.chains))``.

        Returns None when the device is unresolved or reading its chains
        raises (presumably because it is not a rack).
        """
        device = self._get_device(params)
        if device is None:
            return None
        try:
            return (int(params[0]), int(params[1])) + tuple(build(device.chains))
        except Exception as e:
            logger.debug("chains query: %s", e)
            return None

    def _reply_for_drum_pads(self, params: tuple, read) -> Optional[tuple]:
        """Build ``(track, device, read(pad0), ...)``; None when no pads."""
        pads = self._get_drum_pads(params)
        if not pads:
            return None
        head = (int(params[0]), int(params[1]))
        return head + tuple(read(pad) for pad in pads)

    def _listen_to_param_value(self, key: str, parameter: Any, address: str,
                               prefix: tuple, label: str) -> None:
        """Install a value listener on ``parameter`` under ``key``.

        Any listener already stored under ``key`` is removed first.  The
        callback sends ``prefix + (value,)`` to ``address`` and is invoked
        once immediately after registration.  Failures are logged using
        ``label`` as the message prefix.
        """
        self._remove_listener(key)

        def on_change():
            self._send(address, prefix + (float(parameter.value),))

        try:
            parameter.add_value_listener(on_change)
            self._listener_store[key] = (parameter.remove_value_listener, on_change)
            on_change()
        except Exception as e:
            logger.warning("%s: %s", label, e)

    # ------------------------------------------------------------------
    # Device properties
    # ------------------------------------------------------------------
    def _get_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, name)."""
        return self._reply_for_device(params, lambda d: d.name)

    def _get_class_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, class_name)."""
        return self._reply_for_device(params, lambda d: d.class_name)

    def _get_type(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, type as int)."""
        return self._reply_for_device(params, lambda d: int(d.type))

    def _get_is_active(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, is_active as bool)."""
        return self._reply_for_device(params, lambda d: bool(d.is_active))

    def _set_is_active(self, params: tuple) -> None:
        """params: track_idx, device_idx, is_active"""
        if len(params) < 3:
            return None
        device = self._get_device(params)
        if device is not None:
            try:
                device.is_active = bool(params[2])
            except Exception as e:
                logger.warning("set is_active: %s", e)
        return None

    def _start_listen_is_active(self, params: tuple) -> None:
        """params: track_idx, device_idx"""
        device = self._get_device(params)
        if device is None:
            return
        # Build the key from normalised ints so that start/stop requests
        # sent with different numeric types (1 vs 1.0) address the same entry.
        prefix = (int(params[0]), int(params[1]))
        key = f"device.{prefix[0]}.{prefix[1]}.is_active"
        self._register_listener(key, device, "is_active",
                                "/live/device/get/is_active", prefix)

    def _stop_listen_is_active(self, params: tuple) -> None:
        """params: track_idx, device_idx"""
        idx = self._parse_indices(params, 2)
        if idx is not None:
            self._remove_listener(f"device.{idx[0]}.{idx[1]}.is_active")

    def _get_num_parameters(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, number of parameters)."""
        return self._reply_for_device(params, lambda d: len(list(d.parameters)))

    # ------------------------------------------------------------------
    # Bulk parameter operations
    # ------------------------------------------------------------------
    def _get_params_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, name0, name1, ...)."""
        return self._reply_for_params(params, lambda p: p.name)

    def _get_params_value(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, value0, value1, ...) as floats."""
        return self._reply_for_params(params, lambda p: float(p.value))

    def _get_params_min(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, min0, min1, ...) as floats."""
        return self._reply_for_params(params, lambda p: float(p.min))

    def _get_params_max(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, max0, max1, ...) as floats."""
        return self._reply_for_params(params, lambda p: float(p.max))

    def _get_params_default(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, default0, default1, ...) as floats."""
        return self._reply_for_params(params, lambda p: float(p.default_value))

    def _get_params_is_quantized(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, quantized0, quantized1, ...) as bools."""
        return self._reply_for_params(params, lambda p: bool(p.is_quantized))

    def _set_params_value(self, params: tuple) -> None:
        """params: track_idx, device_idx, val0, val1, ..."""
        if len(params) < 3:
            return None
        parameters = self._get_all_params(params)
        # zip() stops at the shorter sequence, so surplus values are ignored.
        for i, (parameter, val) in enumerate(zip(parameters, params[2:])):
            # Per-item try: one rejected value must not block the rest.
            try:
                parameter.value = float(val)
            except Exception as e:
                logger.warning("set param %d: %s", i, e)
        return None

    # ------------------------------------------------------------------
    # Individual parameter
    # ------------------------------------------------------------------
    def _get_param_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, name)."""
        return self._reply_for_param(params, lambda p: p.name)

    def _get_param_value(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, value as float)."""
        return self._reply_for_param(params, lambda p: float(p.value))

    def _get_param_value_string(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, display string for the current value)."""
        def describe(parameter):
            try:
                return parameter.str_for_value(parameter.value)
            except Exception:
                # Fall back to the plain value when formatting fails.
                return str(parameter.value)

        return self._reply_for_param(params, describe)

    def _get_param_min(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, min as float)."""
        return self._reply_for_param(params, lambda p: float(p.min))

    def _get_param_max(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, max as float)."""
        return self._reply_for_param(params, lambda p: float(p.max))

    def _get_param_default(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, param, default_value as float)."""
        return self._reply_for_param(params, lambda p: float(p.default_value))

    def _set_param_value(self, params: tuple) -> None:
        """params: track_idx, device_idx, param_idx, value"""
        if len(params) < 4:
            return None
        parameter = self._get_param(params)
        if parameter is not None:
            try:
                parameter.value = float(params[3])
            except Exception as e:
                logger.warning("set param value: %s", e)
        return None

    def _start_listen_param(self, params: tuple) -> None:
        """params: track_idx, device_idx, param_idx"""
        parameter = self._get_param(params)
        if parameter is None:
            return
        prefix = tuple(int(x) for x in params[:3])
        key = f"device.{prefix[0]}.{prefix[1]}.param.{prefix[2]}"
        self._listen_to_param_value(key, parameter,
                                    "/live/device/get/parameter/value",
                                    prefix, "param listener")

    def _stop_listen_param(self, params: tuple) -> None:
        """params: track_idx, device_idx, param_idx"""
        idx = self._parse_indices(params, 3)
        if idx is not None:
            self._remove_listener(f"device.{idx[0]}.{idx[1]}.param.{idx[2]}")

    # ------------------------------------------------------------------
    # Rack-specific
    # ------------------------------------------------------------------
    def _get_num_chains(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, number of chains)."""
        device = self._get_device(params)
        if device is None:
            return None
        try:
            count = len(list(device.chains))
        except Exception:
            # Any failure reading ``chains`` (presumably a non-rack device)
            # is reported as zero chains rather than as an error.
            count = 0
        return (int(params[0]), int(params[1]), count)

    def _get_chains_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, chain0 name, chain1 name, ...)."""
        return self._reply_for_chains(
            params, lambda chains: (chain.name for chain in chains))

    def _get_chains_num_devices(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, device count of chain0, of chain1, ...)."""
        return self._reply_for_chains(
            params, lambda chains: (len(chain.devices) for chain in chains))

    def _get_chains_devices_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, names of all chain devices, flattened in chain order)."""
        return self._reply_for_chains(
            params,
            lambda chains: (dev.name for chain in chains for dev in chain.devices))

    def _randomize_macros(self, params: tuple) -> None:
        """params: track_idx, device_idx"""
        device = self._get_device(params)
        if device is not None:
            try:
                device.randomize_macros()
            except Exception as e:
                logger.warning("randomize_macros: %s", e)
        return None

    # ------------------------------------------------------------------
    # Drum pads
    # ------------------------------------------------------------------
    def _get_drum_pads(self, params: tuple):
        """params: track_idx, device_idx -> list of drum pads ([] on failure)."""
        device = self._get_device(params)
        if device is None:
            return []
        try:
            return list(device.drum_pads)
        except Exception as e:
            logger.debug("drum_pads: %s", e)
            return []

    def _set_drum_pad_flag(self, params: tuple, attr: str) -> None:
        """Set boolean attribute ``attr`` on one pad.

        params: track_idx, device_idx, pad_idx, flag
        """
        if len(params) < 4:
            return None
        idx = self._parse_indices(params, 3)
        if idx is None:
            return None
        pads = self._get_drum_pads(params)
        pad_idx = idx[2]
        if 0 <= pad_idx < len(pads):
            try:
                setattr(pads[pad_idx], attr, bool(params[3]))
            except Exception as e:
                logger.warning("set drum_pad %s: %s", attr, e)
        return None

    def _get_drum_pads_name(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, pad0 name, pad1 name, ...)."""
        return self._reply_for_drum_pads(params, lambda pad: pad.name)

    def _get_drum_pads_note(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, pad0 note, pad1 note, ...) as ints."""
        return self._reply_for_drum_pads(params, lambda pad: int(pad.note))

    def _get_drum_pads_mute(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, pad0 mute, pad1 mute, ...) as bools."""
        return self._reply_for_drum_pads(params, lambda pad: bool(pad.mute))

    def _set_drum_pad_mute(self, params: tuple) -> None:
        """params: track_idx, device_idx, pad_idx, mute"""
        return self._set_drum_pad_flag(params, "mute")

    def _get_drum_pads_solo(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, pad0 solo, pad1 solo, ...) as bools."""
        return self._reply_for_drum_pads(params, lambda pad: bool(pad.solo))

    def _set_drum_pad_solo(self, params: tuple) -> None:
        """params: track_idx, device_idx, pad_idx, solo"""
        return self._set_drum_pad_flag(params, "solo")

    # ------------------------------------------------------------------
    # Chain sub-device parameters
    # ------------------------------------------------------------------
    def _get_chain_device(self, params: tuple) -> Optional[Any]:
        """params: track_idx, device_idx, chain_idx, sub_device_idx"""
        idx = self._parse_indices(params, 4)
        if idx is None:
            return None
        return _find_device(self.song, idx[0], idx[1], idx[2], idx[3])

    def _get_chain_param(self, params: tuple) -> Optional[Any]:
        """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx"""
        idx = self._parse_indices(params, 5)
        if idx is None:
            return None
        device = self._get_chain_device(params)
        if device is None:
            return None
        try:
            parameters = list(device.parameters)
        except Exception as e:
            logger.warning("get_chain_param: %s", e)
            return None
        param_idx = idx[4]
        if 0 <= param_idx < len(parameters):
            return parameters[param_idx]
        return None

    def _get_chain_param_value(self, params: tuple) -> Optional[tuple]:
        """Reply (track, device, chain, sub_device, param, value as float)."""
        parameter = self._get_chain_param(params)
        if parameter is None:
            return None
        return tuple(int(x) for x in params[:5]) + (float(parameter.value),)

    def _set_chain_param_value(self, params: tuple) -> None:
        """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx, value"""
        if len(params) < 6:
            return None
        parameter = self._get_chain_param(params)
        if parameter is not None:
            try:
                parameter.value = float(params[5])
            except Exception as e:
                logger.warning("set chain param: %s", e)
        return None

    def _start_listen_chain_param(self, params: tuple) -> None:
        """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx"""
        parameter = self._get_chain_param(params)
        if parameter is None:
            return
        prefix = tuple(int(x) for x in params[:5])
        key = "device.chain." + ".".join(str(i) for i in prefix)
        self._listen_to_param_value(key, parameter,
                                    "/live/device/chain/get/parameter/value",
                                    prefix, "chain param listener")

    def _stop_listen_chain_param(self, params: tuple) -> None:
        """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx"""
        idx = self._parse_indices(params, 5)
        if idx is not None:
            self._remove_listener("device.chain." + ".".join(str(i) for i in idx))