Initial implementation: full Ableton Live 11 OSC remote script

Covers all major Live API objects with get/set/listen/method handlers:
song, track, return_track, clip, clip_slot, device (incl. rack chains
and drum pads), scene, view, application, browser, groove.

Zero external runtime dependencies — OSC encoded/decoded in osc_server.py.
Wildcard * support for track/scene indices. Listener callbacks fire to
matching /get/ addresses for bidirectional state sync.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-01 12:27:47 +02:00
commit aad042650e
17 changed files with 3159 additions and 0 deletions
+8
View File
@@ -0,0 +1,8 @@
.venv/
__pycache__/
*.pyc
*.pyo
*.egg-info/
dist/
build/
.pytest_cache/
+9
View File
@@ -0,0 +1,9 @@
"""
AbletonOSC — full Ableton Live 11 OSC remote script.
Ableton calls create_instance() to instantiate the control surface.
"""
from .manager import Manager
def create_instance(c_instance):
return Manager(c_instance)
+51
View File
@@ -0,0 +1,51 @@
"""Handles /live/application/* OSC addresses."""
import logging
from typing import Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
class ApplicationHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
self._add("/live/application/get/version", self._get_version)
self._add("/live/application/get/average_process_usage",
self._get_average_process_usage)
self._add("/live/application/get/peak_process_usage",
self._get_peak_process_usage)
def _app(self):
try:
return self.manager.application()
except Exception:
return None
def _get_version(self, params: tuple) -> Optional[tuple]:
app = self._app()
if app:
try:
major, minor = app.get_major_version(), app.get_minor_version()
build = app.get_bugfix_version()
return (major, minor, build)
except Exception as e:
logger.warning("get version: %s", e)
return None
def _get_average_process_usage(self, params: tuple) -> Optional[tuple]:
app = self._app()
if app:
try:
return (float(app.average_process_usage),)
except Exception as e:
logger.warning("get avg process usage: %s", e)
return None
def _get_peak_process_usage(self, params: tuple) -> Optional[tuple]:
app = self._app()
if app:
try:
return (float(app.peak_process_usage),)
except Exception as e:
logger.warning("get peak process usage: %s", e)
return None
+149
View File
@@ -0,0 +1,149 @@
"""Handles /live/browser/* OSC addresses."""
import logging
from typing import Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
def _iter_items(node, path_parts, depth=0):
"""Recursively find a browser item by path."""
if not path_parts:
return node
name = path_parts[0]
try:
children = list(node.children)
except Exception:
return None
for child in children:
if child.name == name:
return _iter_items(child, path_parts[1:], depth + 1)
return None
class BrowserHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
# Category listings (returns flat list of names)
for category in [
"audio_effects", "instruments", "midi_effects",
"samples", "sounds", "clips", "packs", "plugins",
]:
self._add(f"/live/browser/get/{category}",
self._make_category_lister(category))
# Load by path (e.g., "Instruments/Wavetable")
self._add("/live/browser/load_item", self._load_item)
self._add("/live/browser/preview_item", self._preview_item)
self._add("/live/browser/stop_preview", self._stop_preview)
# Hotswap
self._add("/live/browser/get/hotswap_target", self._get_hotswap_target)
self._add("/live/browser/begin_hotswap", self._begin_hotswap)
# ------------------------------------------------------------------
def _browser(self):
try:
return self.manager.application().browser
except Exception:
return None
def _make_category_lister(self, category: str):
def handler(params: tuple) -> Optional[tuple]:
browser = self._browser()
if browser is None:
return None
try:
root = getattr(browser, category)
names = [item.name for item in root.children]
return tuple(names)
except Exception as e:
logger.warning("browser.%s: %s", category, e)
return None
return handler
def _load_item(self, params: tuple) -> None:
"""params: path_part1 [, path_part2, ...] -- e.g. 'Instruments' 'Wavetable'"""
if not params:
return None
browser = self._browser()
if browser is None:
return None
path = [str(p) for p in params]
# Try to find within all top-level categories
for category in [
"audio_effects", "instruments", "midi_effects",
"samples", "sounds", "clips", "packs", "plugins",
]:
try:
root = getattr(browser, category)
item = _iter_items(root, path)
if item is not None:
browser.load_item(item)
return None
except Exception:
continue
logger.warning("browser.load_item: item not found: %s", path)
return None
def _preview_item(self, params: tuple) -> None:
if not params:
return None
browser = self._browser()
if browser is None:
return None
path = [str(p) for p in params]
for category in [
"audio_effects", "instruments", "midi_effects",
"samples", "sounds", "clips",
]:
try:
root = getattr(browser, category)
item = _iter_items(root, path)
if item is not None:
browser.preview_item(item)
return None
except Exception:
continue
return None
def _stop_preview(self, params: tuple) -> None:
browser = self._browser()
if browser:
try:
browser.stop_preview()
except Exception as e:
logger.warning("stop_preview: %s", e)
return None
def _get_hotswap_target(self, params: tuple) -> Optional[tuple]:
browser = self._browser()
if browser:
try:
target = browser.hotswap_target
if target is not None:
return (target.name,)
return ("",)
except Exception as e:
logger.warning("hotswap_target: %s", e)
return None
def _begin_hotswap(self, params: tuple) -> None:
"""params: track_idx, device_idx"""
if len(params) < 2:
return None
try:
track_idx = int(params[0])
device_idx = int(params[1])
tracks = list(self.song.tracks)
if 0 <= track_idx < len(tracks):
devices = list(tracks[track_idx].devices)
if 0 <= device_idx < len(devices):
browser = self._browser()
if browser:
browser.hotswap_target = devices[device_idx]
except Exception as e:
logger.warning("begin_hotswap: %s", e)
return None
+415
View File
@@ -0,0 +1,415 @@
"""Handles /live/clip/* OSC addresses."""
import logging
from typing import Any, Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
CLIP_PROPS_RW = [
"name", "color", "color_index",
"muted", "gain",
"pitch_coarse", "pitch_fine", "velocity_amount",
"looping", "loop_start", "loop_end",
"start_marker", "end_marker",
"warping", "warp_mode",
"launch_mode", "launch_quantization",
"ram_mode",
]
CLIP_PROPS_RO = [
"is_playing", "is_recording", "is_midi_clip", "is_audio_clip",
"is_overdubbing", "is_triggered", "will_record_on_start",
"has_groove",
"length", "end_time", "start_time",
"playing_position", "sample_length",
"gain_display_string",
"file_path",
]
class ClipHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
for prop in CLIP_PROPS_RW:
self._add(f"/live/clip/get/{prop}", self._make_getter(prop))
self._add(f"/live/clip/set/{prop}", self._make_setter(prop))
self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop))
for prop in CLIP_PROPS_RO:
self._add(f"/live/clip/get/{prop}", self._make_getter(prop))
self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop))
# Playback
self._add("/live/clip/fire", self._fire)
self._add("/live/clip/stop", self._stop)
self._add("/live/clip/duplicate_loop", self._duplicate_loop)
self._add("/live/clip/quantize", self._quantize)
# MIDI notes
self._add("/live/clip/get/notes", self._get_notes)
self._add("/live/clip/add/notes", self._add_notes)
self._add("/live/clip/remove/notes", self._remove_notes)
self._add("/live/clip/get/notes_extended", self._get_notes_extended)
self._add("/live/clip/apply_note_modifications", self._apply_note_modifications)
# Automation envelopes
self._add("/live/clip/get/automation_envelope", self._get_automation_envelope)
self._add("/live/clip/create_automation_envelope", self._create_automation_envelope)
self._add("/live/clip/clear_envelope", self._clear_envelope)
self._add("/live/clip/clear_all_envelopes", self._clear_all_envelopes)
# Warp markers
self._add("/live/clip/get/warp_markers", self._get_warp_markers)
# Groove
self._add("/live/clip/get/groove", self._get_groove)
self._add("/live/clip/set/groove", self._set_groove)
# ------------------------------------------------------------------
def _get_clip(self, params: tuple) -> Optional[Any]:
if len(params) < 2:
return None
try:
track_idx = int(params[0])
clip_idx = int(params[1])
tracks = list(self.song.tracks)
if 0 <= track_idx < len(tracks):
slots = list(tracks[track_idx].clip_slots)
if 0 <= clip_idx < len(slots) and slots[clip_idx].has_clip:
return slots[clip_idx].clip
except Exception as e:
logger.warning("get_clip(%s): %s", params, e)
return None
def _clip_key(self, params: tuple) -> str:
return f"clip.{params[0]}.{params[1]}"
# ------------------------------------------------------------------
def _make_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
clip = self._get_clip(params)
if clip is None:
return None
val = self._get_prop(clip, prop)
if val is None:
return None
prefix = (int(params[0]), int(params[1]))
if isinstance(val, (list, tuple)):
return prefix + tuple(val)
return prefix + (val,)
return handler
def _make_setter(self, prop: str):
def handler(params: tuple) -> None:
if len(params) < 3:
return None
clip = self._get_clip(params)
if clip:
self._set_prop(clip, prop, params[2])
return None
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
clip = self._get_clip(params)
if clip:
key = f"{self._clip_key(params)}.{prop}"
prefix = (int(params[0]), int(params[1]))
self._register_listener(
key, clip, prop, f"/live/clip/get/{prop}", prefix
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
if len(params) >= 2:
self._remove_listener(f"{self._clip_key(params)}.{prop}")
return handler
# ------------------------------------------------------------------
# Playback methods
# ------------------------------------------------------------------
def _fire(self, params: tuple) -> None:
clip = self._get_clip(params)
if clip:
try:
clip.fire()
except Exception as e:
logger.warning("clip.fire: %s", e)
return None
def _stop(self, params: tuple) -> None:
clip = self._get_clip(params)
if clip:
try:
clip.stop()
except Exception as e:
logger.warning("clip.stop: %s", e)
return None
def _duplicate_loop(self, params: tuple) -> None:
clip = self._get_clip(params)
if clip:
try:
clip.duplicate_loop()
except Exception as e:
logger.warning("clip.duplicate_loop: %s", e)
return None
def _quantize(self, params: tuple) -> None:
"""params: track_idx, clip_idx, quantization, strength"""
clip = self._get_clip(params)
if clip:
try:
quant = int(params[2]) if len(params) > 2 else 4
strength = float(params[3]) if len(params) > 3 else 1.0
clip.quantize(quant, strength)
except Exception as e:
logger.warning("clip.quantize: %s", e)
return None
# ------------------------------------------------------------------
# MIDI notes
# ------------------------------------------------------------------
def _get_notes(self, params: tuple) -> Optional[tuple]:
"""params: track_idx, clip_idx [, pitch, pitch_span, time, time_span]"""
clip = self._get_clip(params)
if clip is None or not clip.is_midi_clip:
return None
try:
if len(params) >= 6:
pitch = int(params[2])
pitch_span = int(params[3])
time = float(params[4])
time_span = float(params[5])
notes = clip.get_notes(time, pitch, time_span, pitch_span)
else:
notes = clip.get_notes(0, 0, clip.length, 128)
result = []
for note in notes:
result += [note[0], note[1], note[2], note[3], int(note[4])]
return (int(params[0]), int(params[1])) + tuple(result)
except Exception as e:
logger.warning("get_notes: %s", e)
return None
def _get_notes_extended(self, params: tuple) -> Optional[tuple]:
"""Returns notes using the extended API (with IDs)."""
clip = self._get_clip(params)
if clip is None or not clip.is_midi_clip:
return None
try:
if len(params) >= 6:
notes = clip.get_notes_extended(
int(params[2]), int(params[3]),
float(params[4]), float(params[5])
)
else:
notes = clip.get_notes_extended(0, 128, 0, clip.length)
result = []
for note in notes:
result += [
note.pitch, float(note.start_time), float(note.duration),
note.velocity, int(note.mute), note.note_id,
float(note.release_velocity), float(note.probability)
]
return (int(params[0]), int(params[1])) + tuple(result)
except Exception as e:
logger.warning("get_notes_extended: %s", e)
return None
def _add_notes(self, params: tuple) -> None:
"""params: track_idx, clip_idx, pitch, start_time, duration, velocity, mute, [...]"""
clip = self._get_clip(params)
if clip is None or not clip.is_midi_clip:
return None
try:
note_data = params[2:]
if len(note_data) % 5 != 0:
logger.warning("add_notes: expected multiple of 5 args after indices")
return None
notes = []
for i in range(0, len(note_data), 5):
notes.append((
int(note_data[i]), # pitch
float(note_data[i+1]), # start_time
float(note_data[i+2]), # duration
int(note_data[i+3]), # velocity
bool(note_data[i+4]), # mute
))
clip.set_notes(tuple(notes))
except Exception as e:
logger.warning("add_notes: %s", e)
return None
def _remove_notes(self, params: tuple) -> None:
"""params: track_idx, clip_idx [, time, time_span, pitch, pitch_span]"""
clip = self._get_clip(params)
if clip is None or not clip.is_midi_clip:
return None
try:
if len(params) >= 6:
time = float(params[2])
time_span = float(params[3])
pitch = int(params[4])
pitch_span = int(params[5])
else:
time, time_span, pitch, pitch_span = 0, clip.length, 0, 128
clip.remove_notes(time, pitch, time_span, pitch_span)
except Exception as e:
logger.warning("remove_notes: %s", e)
return None
def _apply_note_modifications(self, params: tuple) -> None:
"""params: track_idx, clip_idx, note_id, pitch, start_time, duration, velocity, mute, ..."""
clip = self._get_clip(params)
if clip is None or not clip.is_midi_clip:
return None
try:
note_data = params[2:]
# 8 values per note: note_id, pitch, start_time, duration, velocity, mute,
# release_velocity, probability
if len(note_data) % 8 != 0:
logger.warning("apply_note_modifications: expected multiple of 8 args")
return None
notes = []
for i in range(0, len(note_data), 8):
note = clip.get_notes_extended(0, 128, 0, clip.length)
# Build modification by note_id
notes.append({
"note_id": int(note_data[i]),
"pitch": int(note_data[i+1]),
"start_time": float(note_data[i+2]),
"duration": float(note_data[i+3]),
"velocity": int(note_data[i+4]),
"mute": bool(note_data[i+5]),
"release_velocity": float(note_data[i+6]),
"probability": float(note_data[i+7]),
})
clip.apply_note_modifications(notes)
except Exception as e:
logger.warning("apply_note_modifications: %s", e)
return None
# ------------------------------------------------------------------
# Automation envelopes
# ------------------------------------------------------------------
def _get_automation_envelope(self, params: tuple) -> Optional[tuple]:
"""params: track_idx, clip_idx, device_idx, param_idx"""
clip = self._get_clip(params)
if clip is None or len(params) < 4:
return None
try:
track = list(self.song.tracks)[int(params[0])]
device = list(track.devices)[int(params[2])]
param = list(device.parameters)[int(params[3])]
env = clip.automation_envelope(param)
if env is None:
return None
points = []
for step in range(env.count):
time = step * (clip.length / env.count)
points += [float(time), float(env.value_at_time(time))]
return (int(params[0]), int(params[1]), int(params[2]), int(params[3])) + tuple(points)
except Exception as e:
logger.warning("get_automation_envelope: %s", e)
return None
def _create_automation_envelope(self, params: tuple) -> None:
"""params: track_idx, clip_idx, device_idx, param_idx"""
clip = self._get_clip(params)
if clip is None or len(params) < 4:
return None
try:
track = list(self.song.tracks)[int(params[0])]
device = list(track.devices)[int(params[2])]
param = list(device.parameters)[int(params[3])]
clip.create_automation_envelope(param)
except Exception as e:
logger.warning("create_automation_envelope: %s", e)
return None
def _clear_envelope(self, params: tuple) -> None:
clip = self._get_clip(params)
if clip is None or len(params) < 4:
return None
try:
track = list(self.song.tracks)[int(params[0])]
device = list(track.devices)[int(params[2])]
param = list(device.parameters)[int(params[3])]
env = clip.automation_envelope(param)
if env:
env.clear_all_events()
except Exception as e:
logger.warning("clear_envelope: %s", e)
return None
def _clear_all_envelopes(self, params: tuple) -> None:
clip = self._get_clip(params)
if clip:
try:
clip.clear_all_envelopes()
except Exception as e:
logger.warning("clear_all_envelopes: %s", e)
return None
# ------------------------------------------------------------------
# Warp markers
# ------------------------------------------------------------------
def _get_warp_markers(self, params: tuple) -> Optional[tuple]:
clip = self._get_clip(params)
if clip is None or not clip.is_audio_clip:
return None
try:
markers = list(clip.warp_markers)
result = []
for m in markers:
result += [float(m.beat_time), float(m.sample_time)]
return (int(params[0]), int(params[1])) + tuple(result)
except Exception as e:
logger.warning("get_warp_markers: %s", e)
return None
# ------------------------------------------------------------------
# Groove
# ------------------------------------------------------------------
def _get_groove(self, params: tuple) -> Optional[tuple]:
clip = self._get_clip(params)
if clip is None:
return None
try:
groove = clip.groove
if groove is None:
return (int(params[0]), int(params[1]), "")
return (int(params[0]), int(params[1]), groove.name)
except Exception as e:
logger.warning("get groove: %s", e)
return None
def _set_groove(self, params: tuple) -> None:
"""params: track_idx, clip_idx, groove_name"""
clip = self._get_clip(params)
if clip is None or len(params) < 3:
return None
groove_name = str(params[2])
try:
if groove_name == "":
clip.groove = None
else:
for g in self.song.groove_pool.grooves:
if g.name == groove_name:
clip.groove = g
break
except Exception as e:
logger.warning("set groove: %s", e)
return None
+158
View File
@@ -0,0 +1,158 @@
"""Handles /live/clip_slot/* OSC addresses."""
import logging
from typing import Any, Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
SLOT_PROPS_RO = [
"has_clip", "is_playing", "is_triggered", "is_recording",
"is_group_slot", "controls_other_clips", "playing_status",
"will_record_on_start",
]
SLOT_PROPS_RW = ["has_stop_button"]
class ClipSlotHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
for prop in SLOT_PROPS_RO:
self._add(f"/live/clip_slot/get/{prop}", self._make_getter(prop))
self._add(f"/live/clip_slot/start_listen/{prop}",
self._make_start_listen(prop))
self._add(f"/live/clip_slot/stop_listen/{prop}",
self._make_stop_listen(prop))
for prop in SLOT_PROPS_RW:
self._add(f"/live/clip_slot/get/{prop}", self._make_getter(prop))
self._add(f"/live/clip_slot/set/{prop}", self._make_setter(prop))
self._add(f"/live/clip_slot/start_listen/{prop}",
self._make_start_listen(prop))
self._add(f"/live/clip_slot/stop_listen/{prop}",
self._make_stop_listen(prop))
self._add("/live/clip_slot/fire", self._fire)
self._add("/live/clip_slot/stop", self._stop)
self._add("/live/clip_slot/create_clip", self._create_clip)
self._add("/live/clip_slot/delete_clip", self._delete_clip)
self._add("/live/clip_slot/duplicate_clip_to", self._duplicate_clip_to)
# ------------------------------------------------------------------
def _get_slot(self, params: tuple) -> Optional[Any]:
if len(params) < 2:
return None
try:
track_idx = int(params[0])
slot_idx = int(params[1])
tracks = list(self.song.tracks)
if 0 <= track_idx < len(tracks):
slots = list(tracks[track_idx].clip_slots)
if 0 <= slot_idx < len(slots):
return slots[slot_idx]
except Exception as e:
logger.warning("get_slot(%s): %s", params, e)
return None
def _slot_key(self, params: tuple) -> str:
return f"clip_slot.{params[0]}.{params[1]}"
# ------------------------------------------------------------------
def _make_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
slot = self._get_slot(params)
if slot is None:
return None
val = self._get_prop(slot, prop)
if val is None:
return None
return (int(params[0]), int(params[1]), val)
return handler
def _make_setter(self, prop: str):
def handler(params: tuple) -> None:
if len(params) < 3:
return None
slot = self._get_slot(params)
if slot:
self._set_prop(slot, prop, params[2])
return None
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
slot = self._get_slot(params)
if slot:
key = f"{self._slot_key(params)}.{prop}"
prefix = (int(params[0]), int(params[1]))
self._register_listener(
key, slot, prop, f"/live/clip_slot/get/{prop}", prefix
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
if len(params) >= 2:
self._remove_listener(f"{self._slot_key(params)}.{prop}")
return handler
# ------------------------------------------------------------------
def _fire(self, params: tuple) -> None:
slot = self._get_slot(params)
if slot:
try:
slot.fire()
except Exception as e:
logger.warning("clip_slot.fire: %s", e)
return None
def _stop(self, params: tuple) -> None:
slot = self._get_slot(params)
if slot:
try:
slot.stop()
except Exception as e:
logger.warning("clip_slot.stop: %s", e)
return None
def _create_clip(self, params: tuple) -> None:
"""params: track_idx, slot_idx [, length_in_beats]"""
slot = self._get_slot(params)
if slot:
try:
length = float(params[2]) if len(params) > 2 else 4.0
slot.create_clip(length)
except Exception as e:
logger.warning("clip_slot.create_clip: %s", e)
return None
def _delete_clip(self, params: tuple) -> None:
slot = self._get_slot(params)
if slot and slot.has_clip:
try:
slot.delete_clip()
except Exception as e:
logger.warning("clip_slot.delete_clip: %s", e)
return None
def _duplicate_clip_to(self, params: tuple) -> None:
"""params: src_track_idx, src_slot_idx, dst_track_idx, dst_slot_idx"""
if len(params) < 4:
return None
src_slot = self._get_slot(params)
if src_slot and src_slot.has_clip:
try:
dst_track_idx = int(params[2])
dst_slot_idx = int(params[3])
tracks = list(self.song.tracks)
if 0 <= dst_track_idx < len(tracks):
dst_slots = list(tracks[dst_track_idx].clip_slots)
if 0 <= dst_slot_idx < len(dst_slots):
src_slot.duplicate_clip_to(dst_slots[dst_slot_idx])
except Exception as e:
logger.warning("clip_slot.duplicate_clip_to: %s", e)
return None
+467
View File
@@ -0,0 +1,467 @@
"""Handles /live/device/* and /live/device/chain/* OSC addresses."""
import logging
from typing import Any, List, Optional, Tuple
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
def _find_device(song, track_idx: int, device_idx: int,
chain_idx: Optional[int] = None,
sub_device_idx: Optional[int] = None) -> Optional[Any]:
"""Resolve a device from track + device index (optionally into a rack chain)."""
try:
tracks = list(song.tracks)
if not (0 <= track_idx < len(tracks)):
return None
devices = list(tracks[track_idx].devices)
if not (0 <= device_idx < len(devices)):
return None
device = devices[device_idx]
if chain_idx is not None and sub_device_idx is not None:
chains = list(getattr(device, "chains", []))
if not (0 <= chain_idx < len(chains)):
return None
sub_devices = list(chains[chain_idx].devices)
if not (0 <= sub_device_idx < len(sub_devices)):
return None
return sub_devices[sub_device_idx]
return device
except Exception as e:
logger.warning("find_device: %s", e)
return None
class DeviceHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
# --- device-level ---
self._add("/live/device/get/name", self._get_name)
self._add("/live/device/get/class_name", self._get_class_name)
self._add("/live/device/get/type", self._get_type)
self._add("/live/device/get/is_active", self._get_is_active)
self._add("/live/device/set/is_active", self._set_is_active)
self._add("/live/device/start_listen/is_active", self._start_listen_is_active)
self._add("/live/device/stop_listen/is_active", self._stop_listen_is_active)
# --- parameter count ---
self._add("/live/device/get/num_parameters", self._get_num_parameters)
# --- bulk parameter operations ---
self._add("/live/device/get/parameters/name", self._get_params_name)
self._add("/live/device/get/parameters/value", self._get_params_value)
self._add("/live/device/get/parameters/min", self._get_params_min)
self._add("/live/device/get/parameters/max", self._get_params_max)
self._add("/live/device/get/parameters/default_value", self._get_params_default)
self._add("/live/device/get/parameters/is_quantized", self._get_params_is_quantized)
self._add("/live/device/set/parameters/value", self._set_params_value)
# --- individual parameter ---
self._add("/live/device/get/parameter/name", self._get_param_name)
self._add("/live/device/get/parameter/value", self._get_param_value)
self._add("/live/device/get/parameter/value_string", self._get_param_value_string)
self._add("/live/device/get/parameter/min", self._get_param_min)
self._add("/live/device/get/parameter/max", self._get_param_max)
self._add("/live/device/get/parameter/default_value", self._get_param_default)
self._add("/live/device/set/parameter/value", self._set_param_value)
self._add("/live/device/start_listen/parameter/value", self._start_listen_param)
self._add("/live/device/stop_listen/parameter/value", self._stop_listen_param)
# --- rack-specific ---
self._add("/live/device/get/num_chains", self._get_num_chains)
self._add("/live/device/get/chains/name", self._get_chains_name)
self._add("/live/device/get/chains/num_devices", self._get_chains_num_devices)
self._add("/live/device/get/chains/devices/name", self._get_chains_devices_name)
self._add("/live/device/randomize_macros", self._randomize_macros)
# --- drum pad ---
self._add("/live/device/get/drum_pads/name", self._get_drum_pads_name)
self._add("/live/device/get/drum_pads/note", self._get_drum_pads_note)
self._add("/live/device/get/drum_pads/mute", self._get_drum_pads_mute)
self._add("/live/device/set/drum_pads/mute", self._set_drum_pad_mute)
self._add("/live/device/get/drum_pads/solo", self._get_drum_pads_solo)
self._add("/live/device/set/drum_pads/solo", self._set_drum_pad_solo)
# --- chain devices (sub-devices inside racks) ---
self._add("/live/device/chain/get/parameter/value", self._get_chain_param_value)
self._add("/live/device/chain/set/parameter/value", self._set_chain_param_value)
self._add("/live/device/chain/start_listen/parameter/value",
self._start_listen_chain_param)
self._add("/live/device/chain/stop_listen/parameter/value",
self._stop_listen_chain_param)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _get_device(self, params: tuple) -> Optional[Any]:
if len(params) < 2:
return None
return _find_device(self.song, int(params[0]), int(params[1]))
def _get_param(self, params: tuple) -> Optional[Any]:
"""params: track_idx, device_idx, param_idx"""
device = self._get_device(params)
if device is None or len(params) < 3:
return None
param_idx = int(params[2])
try:
params_list = list(device.parameters)
if 0 <= param_idx < len(params_list):
return params_list[param_idx]
except Exception as e:
logger.warning("get_param: %s", e)
return None
def _get_all_params(self, params: tuple) -> List[Any]:
device = self._get_device(params)
if device is None:
return []
try:
return list(device.parameters)
except Exception:
return []
# ------------------------------------------------------------------
# Device properties
# ------------------------------------------------------------------
def _get_name(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
return (int(params[0]), int(params[1]), d.name) if d else None
def _get_class_name(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
return (int(params[0]), int(params[1]), d.class_name) if d else None
def _get_type(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
return (int(params[0]), int(params[1]), int(d.type)) if d else None
def _get_is_active(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
return (int(params[0]), int(params[1]), bool(d.is_active)) if d else None
def _set_is_active(self, params: tuple) -> None:
if len(params) < 3:
return None
d = self._get_device(params)
if d:
try:
d.is_active = bool(params[2])
except Exception as e:
logger.warning("set is_active: %s", e)
return None
def _start_listen_is_active(self, params: tuple) -> None:
d = self._get_device(params)
if d:
key = f"device.{params[0]}.{params[1]}.is_active"
prefix = (int(params[0]), int(params[1]))
self._register_listener(key, d, "is_active",
"/live/device/get/is_active", prefix)
def _stop_listen_is_active(self, params: tuple) -> None:
if len(params) >= 2:
self._remove_listener(f"device.{params[0]}.{params[1]}.is_active")
def _get_num_parameters(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
return (int(params[0]), int(params[1]), len(list(d.parameters))) if d else None
# ------------------------------------------------------------------
# Bulk parameter operations
# ------------------------------------------------------------------
def _get_params_name(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(p.name for p in ps) if ps else None
def _get_params_value(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(float(p.value) for p in ps) if ps else None
def _get_params_min(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(float(p.min) for p in ps) if ps else None
def _get_params_max(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(float(p.max) for p in ps) if ps else None
def _get_params_default(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(float(p.default_value) for p in ps) if ps else None
def _get_params_is_quantized(self, params: tuple) -> Optional[tuple]:
ps = self._get_all_params(params)
return (int(params[0]), int(params[1])) + tuple(bool(p.is_quantized) for p in ps) if ps else None
def _set_params_value(self, params: tuple) -> None:
"""params: track_idx, device_idx, val0, val1, ..."""
if len(params) < 3:
return None
ps = self._get_all_params(params)
values = params[2:]
for i, val in enumerate(values):
if i < len(ps):
try:
ps[i].value = float(val)
except Exception as e:
logger.warning("set param %d: %s", i, e)
return None
# ------------------------------------------------------------------
# Individual parameter
# ------------------------------------------------------------------
def _get_param_name(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
return (int(params[0]), int(params[1]), int(params[2]), p.name) if p else None
def _get_param_value(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
return (int(params[0]), int(params[1]), int(params[2]), float(p.value)) if p else None
def _get_param_value_string(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
if p:
try:
s = p.str_for_value(p.value)
return (int(params[0]), int(params[1]), int(params[2]), s)
except Exception:
return (int(params[0]), int(params[1]), int(params[2]), str(p.value))
return None
def _get_param_min(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
return (int(params[0]), int(params[1]), int(params[2]), float(p.min)) if p else None
def _get_param_max(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
return (int(params[0]), int(params[1]), int(params[2]), float(p.max)) if p else None
def _get_param_default(self, params: tuple) -> Optional[tuple]:
p = self._get_param(params)
return (int(params[0]), int(params[1]), int(params[2]), float(p.default_value)) if p else None
def _set_param_value(self, params: tuple) -> None:
"""params: track_idx, device_idx, param_idx, value"""
if len(params) < 4:
return None
p = self._get_param(params)
if p:
try:
p.value = float(params[3])
except Exception as e:
logger.warning("set param value: %s", e)
return None
def _start_listen_param(self, params: tuple) -> None:
"""params: track_idx, device_idx, param_idx"""
p = self._get_param(params)
if p is None:
return
key = f"device.{params[0]}.{params[1]}.param.{params[2]}"
self._remove_listener(key)
prefix = (int(params[0]), int(params[1]), int(params[2]))
def make_cb(pr, pfx):
def cb():
self._send("/live/device/get/parameter/value", pfx + (float(pr.value),))
return cb
cb = make_cb(p, prefix)
try:
p.add_value_listener(cb)
self._listener_store[key] = (p.remove_value_listener, cb)
cb()
except Exception as e:
logger.warning("param listener: %s", e)
def _stop_listen_param(self, params: tuple) -> None:
if len(params) >= 3:
self._remove_listener(f"device.{params[0]}.{params[1]}.param.{params[2]}")
# ------------------------------------------------------------------
# Rack-specific
# ------------------------------------------------------------------
def _get_num_chains(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
if d:
try:
return (int(params[0]), int(params[1]), len(list(d.chains)))
except Exception:
return (int(params[0]), int(params[1]), 0)
return None
def _get_chains_name(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
if d:
try:
return (int(params[0]), int(params[1])) + tuple(c.name for c in d.chains)
except Exception:
pass
return None
def _get_chains_num_devices(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
if d:
try:
return (int(params[0]), int(params[1])) + tuple(len(c.devices) for c in d.chains)
except Exception:
pass
return None
def _get_chains_devices_name(self, params: tuple) -> Optional[tuple]:
d = self._get_device(params)
if d:
try:
result = []
for chain in d.chains:
result += [dev.name for dev in chain.devices]
return (int(params[0]), int(params[1])) + tuple(result)
except Exception:
pass
return None
def _randomize_macros(self, params: tuple) -> None:
d = self._get_device(params)
if d:
try:
d.randomize_macros()
except Exception as e:
logger.warning("randomize_macros: %s", e)
return None
# ------------------------------------------------------------------
# Drum pads
# ------------------------------------------------------------------
def _get_drum_pads(self, params: tuple):
d = self._get_device(params)
if d:
try:
return list(d.drum_pads)
except Exception:
pass
return []
def _get_drum_pads_name(self, params: tuple) -> Optional[tuple]:
pads = self._get_drum_pads(params)
if pads:
return (int(params[0]), int(params[1])) + tuple(p.name for p in pads)
return None
def _get_drum_pads_note(self, params: tuple) -> Optional[tuple]:
pads = self._get_drum_pads(params)
if pads:
return (int(params[0]), int(params[1])) + tuple(int(p.note) for p in pads)
return None
def _get_drum_pads_mute(self, params: tuple) -> Optional[tuple]:
pads = self._get_drum_pads(params)
if pads:
return (int(params[0]), int(params[1])) + tuple(bool(p.mute) for p in pads)
return None
def _set_drum_pad_mute(self, params: tuple) -> None:
"""params: track_idx, device_idx, pad_idx, mute"""
if len(params) < 4:
return None
pads = self._get_drum_pads(params)
pad_idx = int(params[2])
if 0 <= pad_idx < len(pads):
try:
pads[pad_idx].mute = bool(params[3])
except Exception as e:
logger.warning("set drum_pad mute: %s", e)
return None
def _get_drum_pads_solo(self, params: tuple) -> Optional[tuple]:
pads = self._get_drum_pads(params)
if pads:
return (int(params[0]), int(params[1])) + tuple(bool(p.solo) for p in pads)
return None
def _set_drum_pad_solo(self, params: tuple) -> None:
"""params: track_idx, device_idx, pad_idx, solo"""
if len(params) < 4:
return None
pads = self._get_drum_pads(params)
pad_idx = int(params[2])
if 0 <= pad_idx < len(pads):
try:
pads[pad_idx].solo = bool(params[3])
except Exception as e:
logger.warning("set drum_pad solo: %s", e)
return None
# ------------------------------------------------------------------
# Chain sub-device parameters
# ------------------------------------------------------------------
def _get_chain_device(self, params: tuple) -> Optional[Any]:
"""params: track_idx, device_idx, chain_idx, sub_device_idx"""
if len(params) < 4:
return None
return _find_device(self.song, int(params[0]), int(params[1]),
int(params[2]), int(params[3]))
def _get_chain_param(self, params: tuple) -> Optional[Any]:
"""params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx"""
d = self._get_chain_device(params)
if d is None or len(params) < 5:
return None
param_idx = int(params[4])
try:
ps = list(d.parameters)
return ps[param_idx] if 0 <= param_idx < len(ps) else None
except Exception:
return None
def _get_chain_param_value(self, params: tuple) -> Optional[tuple]:
p = self._get_chain_param(params)
if p:
return tuple(int(x) for x in params[:5]) + (float(p.value),)
return None
def _set_chain_param_value(self, params: tuple) -> None:
"""params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx, value"""
if len(params) < 6:
return None
p = self._get_chain_param(params)
if p:
try:
p.value = float(params[5])
except Exception as e:
logger.warning("set chain param: %s", e)
return None
def _start_listen_chain_param(self, params: tuple) -> None:
p = self._get_chain_param(params)
if p is None:
return
key = f"device.chain.{'.'.join(str(x) for x in params[:5])}"
self._remove_listener(key)
prefix = tuple(int(x) for x in params[:5])
def make_cb(pr, pfx):
def cb():
self._send("/live/device/chain/get/parameter/value", pfx + (float(pr.value),))
return cb
cb = make_cb(p, prefix)
try:
p.add_value_listener(cb)
self._listener_store[key] = (p.remove_value_listener, cb)
cb()
except Exception as e:
logger.warning("chain param listener: %s", e)
def _stop_listen_chain_param(self, params: tuple) -> None:
if len(params) >= 5:
key = f"device.chain.{'.'.join(str(x) for x in params[:5])}"
self._remove_listener(key)
+60
View File
@@ -0,0 +1,60 @@
"""Handles /live/groove/* OSC addresses."""
import logging
from typing import Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
class GrooveHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
self._add("/live/groove/get/grooves", self._get_grooves)
self._add("/live/groove/get/amount", self._get_groove_amount)
self._add("/live/groove/set/amount", self._set_groove_amount)
def _pool(self):
try:
return self.song.groove_pool
except Exception:
return None
def _get_grooves(self, params: tuple) -> Optional[tuple]:
pool = self._pool()
if pool is None:
return None
try:
return tuple(g.name for g in pool.grooves)
except Exception as e:
logger.warning("get grooves: %s", e)
return None
def _get_groove_amount(self, params: tuple) -> Optional[tuple]:
"""params: groove_name"""
pool = self._pool()
if pool is None or not params:
return None
name = str(params[0])
try:
for g in pool.grooves:
if g.name == name:
return (name, float(g.amount))
except Exception as e:
logger.warning("get groove amount: %s", e)
return None
def _set_groove_amount(self, params: tuple) -> None:
"""params: groove_name, amount"""
pool = self._pool()
if pool is None or len(params) < 2:
return None
name = str(params[0])
amount = float(params[1])
try:
for g in pool.grooves:
if g.name == name:
g.amount = amount
break
except Exception as e:
logger.warning("set groove amount: %s", e)
return None
+129
View File
@@ -0,0 +1,129 @@
"""Base handler with get/set/listen/call infrastructure."""
import logging
from functools import partial
from typing import Any, Callable, Dict, Optional, Tuple
logger = logging.getLogger(__name__)
class AbletonOSCHandler:
def __init__(self, manager):
self.manager = manager
self.osc_server = manager.osc_server
self._listener_store: Dict[str, Callable] = {} # key -> (target, remove_fn)
@property
def song(self):
return self.manager.song()
# ------------------------------------------------------------------
# Registration helpers
# ------------------------------------------------------------------
def _add(self, address: str, callback: Callable) -> None:
self.osc_server.add_handler(address, callback)
def _send(self, address: str, args: tuple) -> None:
self.osc_server.send_all_clients(address, args)
# ------------------------------------------------------------------
# Generic property get
# ------------------------------------------------------------------
def _get_prop(self, target: Any, prop: str) -> Any:
try:
return getattr(target, prop)
except Exception as e:
logger.warning("get %s: %s", prop, e)
return None
def _set_prop(self, target: Any, prop: str, value: Any) -> None:
try:
setattr(target, prop, value)
except Exception as e:
logger.warning("set %s=%s: %s", prop, value, e)
# ------------------------------------------------------------------
# Listener management
# ------------------------------------------------------------------
def _register_listener(self, key: str, target: Any, prop: str,
notify_address: str, notify_args_prefix: tuple = ()) -> None:
"""Attach a Live listener and remember it so we can remove it later."""
self._remove_listener(key)
add_fn_name = f"add_{prop}_listener"
remove_fn_name = f"remove_{prop}_listener"
add_fn = getattr(target, add_fn_name, None)
remove_fn = getattr(target, remove_fn_name, None)
if add_fn is None:
logger.warning("No listener support for %s", prop)
return
def callback():
val = self._get_prop(target, prop)
if val is None:
return
if isinstance(val, (list, tuple)):
self._send(notify_address, notify_args_prefix + tuple(val))
else:
self._send(notify_address, notify_args_prefix + (val,))
add_fn(callback)
self._listener_store[key] = (remove_fn, callback)
# Send current value immediately
callback()
def _remove_listener(self, key: str) -> None:
entry = self._listener_store.pop(key, None)
if entry is None:
return
remove_fn, callback = entry
try:
if remove_fn and remove_fn(callback):
pass
elif remove_fn:
remove_fn(callback)
except Exception as e:
logger.warning("remove listener %s: %s", key, e)
def clear_listeners(self) -> None:
for key in list(self._listener_store.keys()):
self._remove_listener(key)
# ------------------------------------------------------------------
# Handler factories for the common get/set/listen pattern
# ------------------------------------------------------------------
def _make_getter(self, get_target: Callable, prop: str,
prefix_from_params: Callable = None) -> Callable:
"""Returns a handler fn that reads target.prop and returns the value."""
def handler(params: tuple) -> Optional[tuple]:
target = get_target(params)
if target is None:
return None
val = self._get_prop(target, prop)
if val is None:
return None
prefix = prefix_from_params(params) if prefix_from_params else ()
if isinstance(val, (list, tuple)):
return prefix + tuple(val)
return prefix + (val,)
return handler
def _make_setter(self, get_target: Callable, prop: str,
value_index: int = 0) -> Callable:
"""Returns a handler fn that sets target.prop from params."""
def handler(params: tuple) -> None:
target = get_target(params)
if target is None:
return None
value = params[value_index]
self._set_prop(target, prop, value)
return None
return handler
def init_api(self) -> None:
"""Override in subclasses to register all OSC handlers."""
pass
+127
View File
@@ -0,0 +1,127 @@
"""
Main ControlSurface entry point for Ableton Live.
Manages OSC server lifecycle and all handler registrations.
"""
import logging
import os
import sys
try:
from ableton.v2.control_surface import ControlSurface
except ImportError:
# Allow importing outside of Ableton for tooling
class ControlSurface: # type: ignore
def __init__(self, c_instance=None):
self._c_instance = c_instance
def song(self):
return None
def application(self):
return None
def disconnect(self):
pass
def update_display(self):
pass
def log_message(self, msg):
print(msg)
from .osc_server import OSCServer
from .song import SongHandler
from .track import TrackHandler
from .return_track import ReturnTrackHandler
from .clip import ClipHandler
from .clip_slot import ClipSlotHandler
from .device import DeviceHandler
from .scene import SceneHandler
from .view import ViewHandler
from .application import ApplicationHandler
from .browser import BrowserHandler
from .groove import GrooveHandler
logger = logging.getLogger(__name__)
LISTEN_PORT = int(os.environ.get("ABLETON_OSC_LISTEN_PORT", 11000))
SEND_PORT = int(os.environ.get("ABLETON_OSC_SEND_PORT", 11001))
class Manager(ControlSurface):
def __init__(self, c_instance=None):
super().__init__(c_instance)
self._setup_logging()
self.osc_server = OSCServer(listen_port=LISTEN_PORT, send_port=SEND_PORT)
self._handlers = []
self._setup_handlers()
self.osc_server.start()
logger.info("AbletonOSC started (listen=%d, send=%d)", LISTEN_PORT, SEND_PORT)
self.osc_server.send("/live/startup", ())
# ------------------------------------------------------------------
def _setup_logging(self) -> None:
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
def _setup_handlers(self) -> None:
handler_classes = [
ApplicationHandler,
SongHandler,
TrackHandler,
ReturnTrackHandler,
ClipHandler,
ClipSlotHandler,
DeviceHandler,
SceneHandler,
ViewHandler,
BrowserHandler,
GrooveHandler,
]
for cls in handler_classes:
h = cls(self)
h.init_api()
self._handlers.append(h)
# Control endpoints
self.osc_server.add_handler("/live/test", self._handle_test)
self.osc_server.add_handler("/live/api/reload", self._handle_reload)
self.osc_server.add_handler("/live/api/show_message", self._handle_show_message)
# ------------------------------------------------------------------
# ControlSurface interface
# ------------------------------------------------------------------
def update_display(self) -> None:
"""Called by Live ~100ms; drives the OSC poll loop."""
self.osc_server.process()
def disconnect(self) -> None:
for h in self._handlers:
h.clear_listeners()
self.osc_server.send("/live/shutdown", ())
self.osc_server.stop()
super().disconnect()
logger.info("AbletonOSC disconnected")
# ------------------------------------------------------------------
# Built-in handlers
# ------------------------------------------------------------------
def _handle_test(self, params: tuple):
return ("AbletonOSC active",)
def _handle_reload(self, params: tuple):
# Re-init all handlers (clears listeners and re-registers)
for h in self._handlers:
h.clear_listeners()
h.init_api()
return ("reloaded",)
def _handle_show_message(self, params: tuple):
if params:
self.log_message(str(params[0]))
return None
+225
View File
@@ -0,0 +1,225 @@
"""
UDP OSC server with zero external dependencies.
Safe for Ableton's embedded Python (no threading, non-blocking sockets).
"""
import logging
import socket
import struct
from typing import Any, Callable, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
LISTEN_PORT = 11000
SEND_PORT = 11001
MAX_PACKET = 65536
# ---------------------------------------------------------------------------
# OSC encoding
# ---------------------------------------------------------------------------
def _pad4(n: int) -> int:
return (n + 3) & ~3
def _encode_string(s: str) -> bytes:
b = s.encode("utf-8") + b"\x00"
pad = _pad4(len(b)) - len(b)
return b + b"\x00" * pad
def _encode_int(i: int) -> bytes:
return struct.pack(">i", int(i))
def _encode_float(f: float) -> bytes:
return struct.pack(">f", float(f))
def _encode_blob(b: bytes) -> bytes:
length = struct.pack(">I", len(b))
pad = _pad4(len(b)) - len(b)
return length + b + b"\x00" * pad
def encode_message(address: str, args: tuple) -> bytes:
type_tags = ","
arg_bytes = b""
for arg in args:
if isinstance(arg, bool):
type_tags += "T" if arg else "F"
elif isinstance(arg, int):
type_tags += "i"
arg_bytes += _encode_int(arg)
elif isinstance(arg, float):
type_tags += "f"
arg_bytes += _encode_float(arg)
elif isinstance(arg, str):
type_tags += "s"
arg_bytes += _encode_string(arg)
elif isinstance(arg, bytes):
type_tags += "b"
arg_bytes += _encode_blob(arg)
elif arg is None:
type_tags += "N"
return _encode_string(address) + _encode_string(type_tags) + arg_bytes
# ---------------------------------------------------------------------------
# OSC decoding
# ---------------------------------------------------------------------------
def decode_message(data: bytes) -> Tuple[str, List[Any]]:
offset = 0
# address
end = data.index(b"\x00", offset)
address = data[offset:end].decode("utf-8")
offset = _pad4(end + 1)
if offset >= len(data) or data[offset:offset + 1] != b",":
return address, []
# type tags
end = data.index(b"\x00", offset)
type_tags = data[offset + 1:end].decode("utf-8")
offset = _pad4(end + 1)
args: List[Any] = []
for tag in type_tags:
if tag == "i":
args.append(struct.unpack(">i", data[offset:offset + 4])[0])
offset += 4
elif tag == "f":
args.append(struct.unpack(">f", data[offset:offset + 4])[0])
offset += 4
elif tag == "s":
end = data.index(b"\x00", offset)
args.append(data[offset:end].decode("utf-8"))
offset = _pad4(end + 1)
elif tag == "b":
length = struct.unpack(">I", data[offset:offset + 4])[0]
offset += 4
args.append(data[offset:offset + length])
offset += _pad4(length)
elif tag == "T":
args.append(True)
elif tag == "F":
args.append(False)
elif tag in ("N", "n"):
args.append(None)
elif tag == "I":
args.append(float("inf"))
elif tag in ("h", "t"):
args.append(struct.unpack(">q", data[offset:offset + 8])[0])
offset += 8
elif tag == "d":
args.append(struct.unpack(">d", data[offset:offset + 8])[0])
offset += 8
elif tag == "c":
args.append(chr(struct.unpack(">I", data[offset:offset + 4])[0]))
offset += 4
return address, args
# ---------------------------------------------------------------------------
# OSC Server
# ---------------------------------------------------------------------------
class OSCServer:
def __init__(self, listen_port: int = LISTEN_PORT, send_port: int = SEND_PORT):
self.listen_port = listen_port
self.send_port = send_port
self._handlers: Dict[str, Callable] = {}
self._sock: Optional[socket.socket] = None
self._remote_addr: Optional[str] = None
# ------------------------------------------------------------------
def start(self) -> None:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(("0.0.0.0", self.listen_port))
self._sock.setblocking(False)
logger.info("OSC server listening on port %d", self.listen_port)
def stop(self) -> None:
if self._sock:
self._sock.close()
self._sock = None
# ------------------------------------------------------------------
def add_handler(self, address: str, callback: Callable) -> None:
self._handlers[address] = callback
def remove_handler(self, address: str) -> None:
self._handlers.pop(address, None)
# ------------------------------------------------------------------
def process(self) -> None:
"""Call from Ableton's update_display tick."""
if not self._sock:
return
try:
while True:
data, addr = self._sock.recvfrom(MAX_PACKET)
self._remote_addr = addr[0]
self._dispatch(data)
except BlockingIOError:
pass
except Exception as e:
logger.error("OSC recv error: %s", e)
def _dispatch(self, data: bytes) -> None:
if data[:8] == b"#bundle\x00":
self._dispatch_bundle(data)
return
try:
address, args = decode_message(data)
except Exception as e:
logger.warning("OSC decode error: %s", e)
return
self._call(address, args)
def _dispatch_bundle(self, data: bytes) -> None:
offset = 16 # skip "#bundle\0" + timetag
while offset < len(data):
size = struct.unpack(">I", data[offset:offset + 4])[0]
offset += 4
self._dispatch(data[offset:offset + size])
offset += size
def _call(self, address: str, args: List[Any]) -> None:
handler = self._handlers.get(address)
if handler is None:
# try prefix match (e.g. wildcard handlers)
for pattern, h in self._handlers.items():
if pattern.endswith("*") and address.startswith(pattern[:-1]):
handler = h
break
if handler is None:
logger.debug("No handler for %s", address)
return
try:
result = handler(tuple(args))
if result is not None:
self.send(address, result)
except Exception as e:
logger.error("Handler error for %s: %s", address, e)
self.send("/live/error", (str(e),))
# ------------------------------------------------------------------
def send(self, address: str, args: tuple, target_ip: Optional[str] = None) -> None:
if not self._sock:
return
ip = target_ip or self._remote_addr
if not ip:
return
try:
data = encode_message(address, args)
self._sock.sendto(data, (ip, self.send_port))
except Exception as e:
logger.error("OSC send error: %s", e)
def send_all_clients(self, address: str, args: tuple) -> None:
"""Broadcast to all known clients (currently just the last sender)."""
self.send(address, args)
+165
View File
@@ -0,0 +1,165 @@
"""Handles /live/return_track/* OSC addresses (mirrors TrackHandler for return tracks)."""
import logging
from typing import Any, List, Optional, Tuple
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
PROPS_RW = ["name", "mute", "solo", "color", "color_index"]
PROPS_RO = ["output_meter_level", "output_meter_left", "output_meter_right"]
class ReturnTrackHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
for prop in PROPS_RW:
self._add(f"/live/return_track/get/{prop}", self._make_getter(prop))
self._add(f"/live/return_track/set/{prop}", self._make_setter(prop))
self._add(f"/live/return_track/start_listen/{prop}",
self._make_start_listen(prop))
self._add(f"/live/return_track/stop_listen/{prop}",
self._make_stop_listen(prop))
for prop in PROPS_RO:
self._add(f"/live/return_track/get/{prop}", self._make_getter(prop))
self._add(f"/live/return_track/start_listen/{prop}",
self._make_start_listen(prop))
self._add(f"/live/return_track/stop_listen/{prop}",
self._make_stop_listen(prop))
self._add("/live/return_track/get/volume", self._get_volume)
self._add("/live/return_track/set/volume", self._set_volume)
self._add("/live/return_track/get/panning", self._get_panning)
self._add("/live/return_track/set/panning", self._set_panning)
self._add("/live/return_track/get/send", self._get_send)
self._add("/live/return_track/set/send", self._set_send)
self._add("/live/return_track/get/num_devices", self._get_num_devices)
self._add("/live/return_track/get/devices/name", self._get_devices_name)
# ------------------------------------------------------------------
def _get_return_tracks(self, params) -> List[Tuple[int, Any]]:
tracks = list(self.song.return_tracks)
if not params or params[0] == "*":
return list(enumerate(tracks))
idx = int(params[0])
return [(idx, tracks[idx])] if 0 <= idx < len(tracks) else []
def _get_return_track(self, params) -> Optional[Any]:
tracks = list(self.song.return_tracks)
if not params:
return None
idx = int(params[0])
return tracks[idx] if 0 <= idx < len(tracks) else None
def _make_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_return_tracks(params):
val = self._get_prop(track, prop)
if val is not None:
result += [idx, val]
return tuple(result) if result else None
return handler
def _make_setter(self, prop: str):
def handler(params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_return_track(params)
if track:
self._set_prop(track, prop, params[1])
return None
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
for idx, track in self._get_return_tracks(params):
self._register_listener(
f"return_track.{idx}.{prop}", track, prop,
f"/live/return_track/get/{prop}", (idx,)
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
for idx, _ in self._get_return_tracks(params):
self._remove_listener(f"return_track.{idx}.{prop}")
return handler
def _get_volume(self, params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_return_tracks(params):
try:
result += [idx, track.mixer_device.volume.value]
except Exception:
pass
return tuple(result) if result else None
def _set_volume(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_return_track(params)
if track:
try:
track.mixer_device.volume.value = float(params[1])
except Exception as e:
logger.warning("return_track set volume: %s", e)
return None
def _get_panning(self, params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_return_tracks(params):
try:
result += [idx, track.mixer_device.panning.value]
except Exception:
pass
return tuple(result) if result else None
def _set_panning(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_return_track(params)
if track:
try:
track.mixer_device.panning.value = float(params[1])
except Exception as e:
logger.warning("return_track set panning: %s", e)
return None
def _get_send(self, params: tuple) -> Optional[tuple]:
if len(params) < 2:
return None
track = self._get_return_track(params)
send_idx = int(params[1])
if track:
try:
sends = list(track.mixer_device.sends)
if 0 <= send_idx < len(sends):
return (int(params[0]), send_idx, sends[send_idx].value)
except Exception as e:
logger.warning("return_track get send: %s", e)
return None
def _set_send(self, params: tuple) -> None:
if len(params) < 3:
return None
track = self._get_return_track(params)
send_idx = int(params[1])
if track:
try:
sends = list(track.mixer_device.sends)
if 0 <= send_idx < len(sends):
sends[send_idx].value = float(params[2])
except Exception as e:
logger.warning("return_track set send: %s", e)
return None
def _get_num_devices(self, params: tuple) -> Optional[tuple]:
track = self._get_return_track(params)
return (len(track.devices),) if track else None
def _get_devices_name(self, params: tuple) -> Optional[tuple]:
track = self._get_return_track(params)
return tuple(d.name for d in track.devices) if track else None
+111
View File
@@ -0,0 +1,111 @@
"""Handles /live/scene/* OSC addresses."""
import logging
from typing import Any, Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
SCENE_PROPS_RW = [
"name", "color", "color_index",
"tempo", "tempo_enabled",
"time_signature_numerator", "time_signature_denominator",
"time_signature_enabled",
]
SCENE_PROPS_RO = ["is_empty", "is_triggered"]
class SceneHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
for prop in SCENE_PROPS_RW:
self._add(f"/live/scene/get/{prop}", self._make_getter(prop))
self._add(f"/live/scene/set/{prop}", self._make_setter(prop))
self._add(f"/live/scene/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/scene/stop_listen/{prop}", self._make_stop_listen(prop))
for prop in SCENE_PROPS_RO:
self._add(f"/live/scene/get/{prop}", self._make_getter(prop))
self._add(f"/live/scene/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/scene/stop_listen/{prop}", self._make_stop_listen(prop))
self._add("/live/scene/fire", self._fire)
self._add("/live/scene/fire_as_selected", self._fire_as_selected)
self._add("/live/scene/get/num_clips", self._get_num_clips)
self._add("/live/scene/get/clip_slots/has_clip", self._get_has_clips)
# ------------------------------------------------------------------
def _get_scene(self, params: tuple) -> Optional[Any]:
if not params:
return None
idx = int(params[0])
scenes = list(self.song.scenes)
return scenes[idx] if 0 <= idx < len(scenes) else None
def _make_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
scene = self._get_scene(params)
if scene is None:
return None
val = self._get_prop(scene, prop)
return (int(params[0]), val) if val is not None else None
return handler
def _make_setter(self, prop: str):
def handler(params: tuple) -> None:
if len(params) < 2:
return None
scene = self._get_scene(params)
if scene:
self._set_prop(scene, prop, params[1])
return None
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
scene = self._get_scene(params)
if scene:
self._register_listener(
f"scene.{params[0]}.{prop}", scene, prop,
f"/live/scene/get/{prop}", (int(params[0]),)
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
if params:
self._remove_listener(f"scene.{params[0]}.{prop}")
return handler
def _fire(self, params: tuple) -> None:
scene = self._get_scene(params)
if scene:
try:
scene.fire()
except Exception as e:
logger.warning("scene.fire: %s", e)
return None
def _fire_as_selected(self, params: tuple) -> None:
scene = self._get_scene(params)
if scene:
try:
scene.fire_as_selected()
except Exception as e:
logger.warning("scene.fire_as_selected: %s", e)
return None
def _get_num_clips(self, params: tuple) -> Optional[tuple]:
scene = self._get_scene(params)
if scene is None:
return None
count = sum(1 for slot in scene.clip_slots if slot.has_clip)
return (int(params[0]), count)
def _get_has_clips(self, params: tuple) -> Optional[tuple]:
scene = self._get_scene(params)
if scene is None:
return None
return (int(params[0]),) + tuple(bool(slot.has_clip) for slot in scene.clip_slots)
+275
View File
@@ -0,0 +1,275 @@
"""Handles /live/song/* OSC addresses."""
import logging
from typing import Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
# Properties that are readable and writable
RW_PROPS = [
"tempo",
"time_signature_numerator",
"time_signature_denominator",
"loop",
"loop_start",
"loop_length",
"current_song_time",
"metronome",
"record_mode",
"arrangement_overdub",
"session_record",
"overdub",
"groove_amount",
"swing_amount",
"clip_trigger_quantization",
"midi_recording_quantization",
"punch_in",
"punch_out",
"exclusive_arm",
"exclusive_solo",
]
# Properties that are read-only
RO_PROPS = [
"is_playing",
"can_undo",
"can_redo",
"song_length",
"session_record_status",
]
class SongHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
song = self.song
# --- get ---
for prop in RW_PROPS + RO_PROPS:
self._add(f"/live/song/get/{prop}", self._make_prop_getter(prop))
# --- set ---
for prop in RW_PROPS:
self._add(f"/live/song/set/{prop}", self._make_prop_setter(prop))
# --- listeners ---
for prop in RW_PROPS + RO_PROPS:
self._add(f"/live/song/start_listen/{prop}",
self._make_start_listen(prop))
self._add(f"/live/song/stop_listen/{prop}",
self._make_stop_listen(prop))
# --- aggregate queries ---
self._add("/live/song/get/num_tracks", self._get_num_tracks)
self._add("/live/song/get/num_scenes", self._get_num_scenes)
self._add("/live/song/get/num_return_tracks", self._get_num_return_tracks)
self._add("/live/song/get/track_names", self._get_track_names)
self._add("/live/song/get/scene_names", self._get_scene_names)
self._add("/live/song/get/return_track_names", self._get_return_track_names)
self._add("/live/song/get/cue_points", self._get_cue_points)
# --- method calls ---
self._add("/live/song/start_playing", lambda p: self._call("start_playing"))
self._add("/live/song/stop_playing", lambda p: self._call("stop_playing"))
self._add("/live/song/continue_playing", lambda p: self._call("continue_playing"))
self._add("/live/song/stop_all_clips", lambda p: self._call("stop_all_clips"))
self._add("/live/song/undo", lambda p: self._call("undo"))
self._add("/live/song/redo", lambda p: self._call("redo"))
self._add("/live/song/tap_tempo", lambda p: self._call("tap_tempo"))
self._add("/live/song/trigger_session_record",
lambda p: self._call("trigger_session_record"))
self._add("/live/song/re_enable_automation",
lambda p: self._call("re_enable_automation"))
self._add("/live/song/jump_by", self._jump_by)
self._add("/live/song/jump_to_next_cue", lambda p: self._call("jump_to_next_cue"))
self._add("/live/song/jump_to_prev_cue", lambda p: self._call("jump_to_prev_cue"))
self._add("/live/song/capture_midi", lambda p: self._call("capture_midi"))
# --- track/scene creation and deletion ---
self._add("/live/song/create_audio_track", self._create_audio_track)
self._add("/live/song/create_midi_track", self._create_midi_track)
self._add("/live/song/create_return_track",
lambda p: self._call("create_return_track"))
self._add("/live/song/create_scene", self._create_scene)
self._add("/live/song/delete_track", self._delete_track)
self._add("/live/song/delete_scene", self._delete_scene)
self._add("/live/song/delete_return_track", self._delete_return_track)
self._add("/live/song/duplicate_track", self._duplicate_track)
self._add("/live/song/duplicate_scene", self._duplicate_scene)
# --- beat listener (special) ---
self._add("/live/song/start_listen/beat", self._start_beat_listen)
self._add("/live/song/stop_listen/beat", self._stop_beat_listen)
# ------------------------------------------------------------------
# Factories
# ------------------------------------------------------------------
def _make_prop_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
val = self._get_prop(self.song, prop)
return (val,) if val is not None else None
return handler
def _make_prop_setter(self, prop: str):
def handler(params: tuple) -> None:
if params:
self._set_prop(self.song, prop, params[0])
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
self._register_listener(
f"song.{prop}", self.song, prop, f"/live/song/get/{prop}"
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
self._remove_listener(f"song.{prop}")
return handler
# ------------------------------------------------------------------
# Aggregate queries
# ------------------------------------------------------------------
def _get_num_tracks(self, params: tuple) -> tuple:
return (len(self.song.tracks),)
def _get_num_scenes(self, params: tuple) -> tuple:
return (len(self.song.scenes),)
def _get_num_return_tracks(self, params: tuple) -> tuple:
return (len(self.song.return_tracks),)
def _get_track_names(self, params: tuple) -> tuple:
start = int(params[0]) if len(params) > 0 else 0
end = int(params[1]) if len(params) > 1 else len(self.song.tracks)
tracks = list(self.song.tracks)[start:end]
return tuple(t.name for t in tracks)
def _get_scene_names(self, params: tuple) -> tuple:
start = int(params[0]) if len(params) > 0 else 0
end = int(params[1]) if len(params) > 1 else len(self.song.scenes)
scenes = list(self.song.scenes)[start:end]
return tuple(s.name for s in scenes)
def _get_return_track_names(self, params: tuple) -> tuple:
return tuple(t.name for t in self.song.return_tracks)
def _get_cue_points(self, params: tuple) -> tuple:
result = []
for cp in self.song.cue_points:
result.append(cp.name)
result.append(float(cp.time))
return tuple(result)
# ------------------------------------------------------------------
# Methods
# ------------------------------------------------------------------
def _call(self, method: str) -> None:
try:
getattr(self.song, method)()
except Exception as e:
logger.warning("song.%s(): %s", method, e)
return None
def _jump_by(self, params: tuple) -> None:
if params:
try:
self.song.jump_by(float(params[0]))
except Exception as e:
logger.warning("song.jump_by: %s", e)
return None
# --- track/scene CRUD ---
def _create_audio_track(self, params: tuple) -> tuple:
idx = int(params[0]) if params else -1
try:
self.song.create_audio_track(idx)
except Exception as e:
logger.warning("create_audio_track: %s", e)
return (len(self.song.tracks),)
def _create_midi_track(self, params: tuple) -> tuple:
idx = int(params[0]) if params else -1
try:
self.song.create_midi_track(idx)
except Exception as e:
logger.warning("create_midi_track: %s", e)
return (len(self.song.tracks),)
def _create_scene(self, params: tuple) -> tuple:
idx = int(params[0]) if params else -1
try:
self.song.create_scene(idx)
except Exception as e:
logger.warning("create_scene: %s", e)
return (len(self.song.scenes),)
def _delete_track(self, params: tuple) -> None:
if params:
try:
self.song.delete_track(int(params[0]))
except Exception as e:
logger.warning("delete_track: %s", e)
return None
def _delete_scene(self, params: tuple) -> None:
if params:
try:
self.song.delete_scene(int(params[0]))
except Exception as e:
logger.warning("delete_scene: %s", e)
return None
def _delete_return_track(self, params: tuple) -> None:
if params:
try:
self.song.delete_return_track(int(params[0]))
except Exception as e:
logger.warning("delete_return_track: %s", e)
return None
def _duplicate_track(self, params: tuple) -> None:
if params:
try:
self.song.duplicate_track(int(params[0]))
except Exception as e:
logger.warning("duplicate_track: %s", e)
return None
def _duplicate_scene(self, params: tuple) -> None:
if params:
try:
self.song.duplicate_scene(int(params[0]))
except Exception as e:
logger.warning("duplicate_scene: %s", e)
return None
# ------------------------------------------------------------------
# Beat listener
# ------------------------------------------------------------------
def _start_beat_listen(self, params: tuple) -> None:
self._remove_listener("song.beat")
self._beat_count = -1
def on_beat():
beat = int(self.song.get_current_beats_song_time().beats)
if beat != self._beat_count:
self._beat_count = beat
self._send("/live/song/get/beat", (beat,))
try:
self.song.add_current_song_time_listener(on_beat)
self._listener_store["song.beat"] = (
self.song.remove_current_song_time_listener, on_beat
)
except Exception as e:
logger.warning("beat listener: %s", e)
def _stop_beat_listen(self, params: tuple) -> None:
self._remove_listener("song.beat")
+574
View File
@@ -0,0 +1,574 @@
"""Handles /live/track/* OSC addresses."""
import logging
from typing import Any, List, Optional, Tuple
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
TRACK_PROPS_RW = [
"arm", "mute", "solo", "name", "color", "color_index",
"fold_state", "current_monitoring_state",
]
TRACK_PROPS_RO = [
"can_be_armed", "has_audio_input", "has_audio_output",
"has_midi_input", "has_midi_output",
"is_foldable", "is_grouped", "is_visible",
"fired_slot_index", "playing_slot_index",
"output_meter_level", "output_meter_left", "output_meter_right",
"output_meter_peak_left", "output_meter_peak_right",
]
class TrackHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
for prop in TRACK_PROPS_RW:
self._add(f"/live/track/get/{prop}", self._make_getter(prop))
self._add(f"/live/track/set/{prop}", self._make_setter(prop))
self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))
for prop in TRACK_PROPS_RO:
self._add(f"/live/track/get/{prop}", self._make_getter(prop))
self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop))
self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop))
# Mixer device (volume, panning, sends)
self._add("/live/track/get/volume", self._get_volume)
self._add("/live/track/set/volume", self._set_volume)
self._add("/live/track/start_listen/volume", self._start_listen_volume)
self._add("/live/track/stop_listen/volume", self._stop_listen_volume)
self._add("/live/track/get/panning", self._get_panning)
self._add("/live/track/set/panning", self._set_panning)
self._add("/live/track/start_listen/panning", self._start_listen_panning)
self._add("/live/track/stop_listen/panning", self._stop_listen_panning)
self._add("/live/track/get/send", self._get_send)
self._add("/live/track/set/send", self._set_send)
self._add("/live/track/start_listen/send", self._start_listen_send)
self._add("/live/track/stop_listen/send", self._stop_listen_send)
# Routing
self._add("/live/track/get/available_input_routing_types",
self._get_available_input_routing_types)
self._add("/live/track/get/available_input_routing_channels",
self._get_available_input_routing_channels)
self._add("/live/track/get/input_routing_type", self._get_input_routing_type)
self._add("/live/track/set/input_routing_type", self._set_input_routing_type)
self._add("/live/track/get/input_routing_channel", self._get_input_routing_channel)
self._add("/live/track/set/input_routing_channel", self._set_input_routing_channel)
self._add("/live/track/get/available_output_routing_types",
self._get_available_output_routing_types)
self._add("/live/track/get/available_output_routing_channels",
self._get_available_output_routing_channels)
self._add("/live/track/get/output_routing_type", self._get_output_routing_type)
self._add("/live/track/set/output_routing_type", self._set_output_routing_type)
self._add("/live/track/get/output_routing_channel", self._get_output_routing_channel)
self._add("/live/track/set/output_routing_channel", self._set_output_routing_channel)
# Aggregate
self._add("/live/track/get/clips/name", self._get_clips_name)
self._add("/live/track/get/clips/length", self._get_clips_length)
self._add("/live/track/get/clips/color", self._get_clips_color)
self._add("/live/track/get/clips/is_playing", self._get_clips_is_playing)
self._add("/live/track/get/arrangement_clips/name", self._get_arr_clips_name)
self._add("/live/track/get/arrangement_clips/length", self._get_arr_clips_length)
self._add("/live/track/get/arrangement_clips/start_time", self._get_arr_clips_start)
self._add("/live/track/get/num_devices", self._get_num_devices)
self._add("/live/track/get/devices/name", self._get_devices_name)
self._add("/live/track/get/devices/type", self._get_devices_type)
self._add("/live/track/get/devices/class_name", self._get_devices_class_name)
self._add("/live/track/get/devices/can_have_chains", self._get_devices_can_have_chains)
# Methods
self._add("/live/track/stop_all_clips", self._stop_all_clips)
self._add("/live/track/delete_device", self._delete_device)
self._add("/live/track/duplicate_device", self._duplicate_device)
# ------------------------------------------------------------------
# Track resolution (supports wildcard *)
# ------------------------------------------------------------------
def _get_tracks(self, params: tuple) -> List[Tuple[int, Any]]:
tracks = list(self.song.tracks)
if not params or params[0] == "*":
return list(enumerate(tracks))
idx = int(params[0])
if 0 <= idx < len(tracks):
return [(idx, tracks[idx])]
return []
def _get_track(self, params: tuple) -> Optional[Any]:
tracks = list(self.song.tracks)
if not params:
return None
idx = int(params[0])
if 0 <= idx < len(tracks):
return tracks[idx]
return None
# ------------------------------------------------------------------
# Property factories
# ------------------------------------------------------------------
def _make_getter(self, prop: str):
def handler(params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_tracks(params):
val = self._get_prop(track, prop)
if val is not None:
result += [idx, val]
return tuple(result) if result else None
return handler
def _make_setter(self, prop: str):
def handler(params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
if track:
self._set_prop(track, prop, params[1])
return None
return handler
def _make_start_listen(self, prop: str):
def handler(params: tuple) -> None:
for idx, track in self._get_tracks(params):
key = f"track.{idx}.{prop}"
self._register_listener(
key, track, prop,
f"/live/track/get/{prop}", (idx,)
)
return handler
def _make_stop_listen(self, prop: str):
def handler(params: tuple) -> None:
for idx, track in self._get_tracks(params):
self._remove_listener(f"track.{idx}.{prop}")
return handler
# ------------------------------------------------------------------
# Mixer device
# ------------------------------------------------------------------
def _get_volume(self, params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_tracks(params):
try:
val = track.mixer_device.volume.value
result += [idx, val]
except Exception:
pass
return tuple(result) if result else None
def _set_volume(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
if track:
try:
track.mixer_device.volume.value = float(params[1])
except Exception as e:
logger.warning("set volume: %s", e)
return None
def _start_listen_volume(self, params: tuple) -> None:
for idx, track in self._get_tracks(params):
key = f"track.{idx}.volume"
self._remove_listener(key)
param = track.mixer_device.volume
def make_cb(i, p):
def cb():
self._send("/live/track/get/volume", (i, p.value))
return cb
cb = make_cb(idx, param)
try:
param.add_value_listener(cb)
self._listener_store[key] = (param.remove_value_listener, cb)
cb()
except Exception as e:
logger.warning("volume listener: %s", e)
def _stop_listen_volume(self, params: tuple) -> None:
for idx, _ in self._get_tracks(params):
self._remove_listener(f"track.{idx}.volume")
def _get_panning(self, params: tuple) -> Optional[tuple]:
result = []
for idx, track in self._get_tracks(params):
try:
val = track.mixer_device.panning.value
result += [idx, val]
except Exception:
pass
return tuple(result) if result else None
def _set_panning(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
if track:
try:
track.mixer_device.panning.value = float(params[1])
except Exception as e:
logger.warning("set panning: %s", e)
return None
def _start_listen_panning(self, params: tuple) -> None:
for idx, track in self._get_tracks(params):
key = f"track.{idx}.panning"
self._remove_listener(key)
param = track.mixer_device.panning
def make_cb(i, p):
def cb():
self._send("/live/track/get/panning", (i, p.value))
return cb
cb = make_cb(idx, param)
try:
param.add_value_listener(cb)
self._listener_store[key] = (param.remove_value_listener, cb)
cb()
except Exception as e:
logger.warning("panning listener: %s", e)
def _stop_listen_panning(self, params: tuple) -> None:
for idx, _ in self._get_tracks(params):
self._remove_listener(f"track.{idx}.panning")
# ------------------------------------------------------------------
# Sends
# ------------------------------------------------------------------
def _get_send(self, params: tuple) -> Optional[tuple]:
if len(params) < 2:
return None
track = self._get_track(params)
send_idx = int(params[1])
if track:
try:
sends = list(track.mixer_device.sends)
if 0 <= send_idx < len(sends):
return (int(params[0]), send_idx, sends[send_idx].value)
except Exception as e:
logger.warning("get send: %s", e)
return None
def _set_send(self, params: tuple) -> None:
if len(params) < 3:
return None
track = self._get_track(params)
send_idx = int(params[1])
value = float(params[2])
if track:
try:
sends = list(track.mixer_device.sends)
if 0 <= send_idx < len(sends):
sends[send_idx].value = value
except Exception as e:
logger.warning("set send: %s", e)
return None
def _start_listen_send(self, params: tuple) -> None:
if len(params) < 2:
return
track = self._get_track(params)
track_idx = int(params[0])
send_idx = int(params[1])
if track:
key = f"track.{track_idx}.send.{send_idx}"
self._remove_listener(key)
try:
sends = list(track.mixer_device.sends)
if 0 <= send_idx < len(sends):
param = sends[send_idx]
def make_cb(ti, si, p):
def cb():
self._send("/live/track/get/send", (ti, si, p.value))
return cb
cb = make_cb(track_idx, send_idx, param)
param.add_value_listener(cb)
self._listener_store[key] = (param.remove_value_listener, cb)
cb()
except Exception as e:
logger.warning("send listener: %s", e)
def _stop_listen_send(self, params: tuple) -> None:
if len(params) >= 2:
self._remove_listener(f"track.{int(params[0])}.send.{int(params[1])}")
# ------------------------------------------------------------------
# Routing
# ------------------------------------------------------------------
def _get_available_input_routing_types(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
types = [rt.display_name for rt in track.available_input_routing_types]
return tuple(types)
except Exception as e:
logger.warning("input routing types: %s", e)
return None
def _get_available_input_routing_channels(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
channels = [rc.display_name for rc in track.available_input_routing_channels]
return tuple(channels)
except Exception as e:
logger.warning("input routing channels: %s", e)
return None
def _get_input_routing_type(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return (track.input_routing_type.display_name,)
except Exception as e:
logger.warning("get input_routing_type: %s", e)
return None
def _set_input_routing_type(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
name = str(params[1])
if track:
try:
for rt in track.available_input_routing_types:
if rt.display_name == name:
track.input_routing_type = rt
break
except Exception as e:
logger.warning("set input_routing_type: %s", e)
return None
def _get_input_routing_channel(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return (track.input_routing_channel.display_name,)
except Exception as e:
logger.warning("get input_routing_channel: %s", e)
return None
def _set_input_routing_channel(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
name = str(params[1])
if track:
try:
for rc in track.available_input_routing_channels:
if rc.display_name == name:
track.input_routing_channel = rc
break
except Exception as e:
logger.warning("set input_routing_channel: %s", e)
return None
def _get_available_output_routing_types(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
types = [rt.display_name for rt in track.available_output_routing_types]
return tuple(types)
except Exception as e:
logger.warning("output routing types: %s", e)
return None
def _get_available_output_routing_channels(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
channels = [rc.display_name for rc in track.available_output_routing_channels]
return tuple(channels)
except Exception as e:
logger.warning("output routing channels: %s", e)
return None
def _get_output_routing_type(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return (track.output_routing_type.display_name,)
except Exception as e:
logger.warning("get output_routing_type: %s", e)
return None
def _set_output_routing_type(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
name = str(params[1])
if track:
try:
for rt in track.available_output_routing_types:
if rt.display_name == name:
track.output_routing_type = rt
break
except Exception as e:
logger.warning("set output_routing_type: %s", e)
return None
def _get_output_routing_channel(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return (track.output_routing_channel.display_name,)
except Exception as e:
logger.warning("get output_routing_channel: %s", e)
return None
def _set_output_routing_channel(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
name = str(params[1])
if track:
try:
for rc in track.available_output_routing_channels:
if rc.display_name == name:
track.output_routing_channel = rc
break
except Exception as e:
logger.warning("set output_routing_channel: %s", e)
return None
# ------------------------------------------------------------------
# Aggregate clip info
# ------------------------------------------------------------------
def _get_session_clips(self, track):
return [slot.clip for slot in track.clip_slots if slot.has_clip]
def _get_clips_name(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(c.name for c in self._get_session_clips(track))
return None
def _get_clips_length(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(float(c.length) for c in self._get_session_clips(track))
return None
def _get_clips_color(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(c.color for c in self._get_session_clips(track))
return None
def _get_clips_is_playing(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(bool(c.is_playing) for c in self._get_session_clips(track))
return None
def _get_arr_clips_name(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return tuple(c.name for c in track.arrangement_clips)
except Exception:
return None
return None
def _get_arr_clips_length(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return tuple(float(c.length) for c in track.arrangement_clips)
except Exception:
return None
return None
def _get_arr_clips_start(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
try:
return tuple(float(c.start_time) for c in track.arrangement_clips)
except Exception:
return None
return None
# ------------------------------------------------------------------
# Device info
# ------------------------------------------------------------------
def _get_num_devices(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return (len(track.devices),)
return None
def _get_devices_name(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(d.name for d in track.devices)
return None
def _get_devices_type(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(int(d.type) for d in track.devices)
return None
def _get_devices_class_name(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(d.class_name for d in track.devices)
return None
def _get_devices_can_have_chains(self, params: tuple) -> Optional[tuple]:
track = self._get_track(params)
if track:
return tuple(bool(getattr(d, "can_have_chains", False)) for d in track.devices)
return None
# ------------------------------------------------------------------
# Methods
# ------------------------------------------------------------------
def _stop_all_clips(self, params: tuple) -> None:
for _, track in self._get_tracks(params):
try:
track.stop_all_clips()
except Exception as e:
logger.warning("stop_all_clips: %s", e)
return None
def _delete_device(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
device_idx = int(params[1])
if track:
try:
track.delete_device(device_idx)
except Exception as e:
logger.warning("delete_device: %s", e)
return None
def _duplicate_device(self, params: tuple) -> None:
if len(params) < 2:
return None
track = self._get_track(params)
device_idx = int(params[1])
if track:
try:
track.duplicate_device(device_idx)
except Exception as e:
logger.warning("duplicate_device: %s", e)
return None
+216
View File
@@ -0,0 +1,216 @@
"""Handles /live/view/* OSC addresses (UI navigation)."""
import logging
from typing import Optional
from .handler import AbletonOSCHandler
logger = logging.getLogger(__name__)
class ViewHandler(AbletonOSCHandler):
def init_api(self) -> None:
self.clear_listeners()
view = self._song_view()
# selected scene
self._add("/live/view/get/selected_scene", self._get_selected_scene)
self._add("/live/view/set/selected_scene", self._set_selected_scene)
self._add("/live/view/start_listen/selected_scene", self._start_listen_selected_scene)
self._add("/live/view/stop_listen/selected_scene", self._stop_listen_selected_scene)
# selected track
self._add("/live/view/get/selected_track", self._get_selected_track)
self._add("/live/view/set/selected_track", self._set_selected_track)
self._add("/live/view/start_listen/selected_track", self._start_listen_selected_track)
self._add("/live/view/stop_listen/selected_track", self._stop_listen_selected_track)
# selected clip
self._add("/live/view/get/selected_clip", self._get_selected_clip)
self._add("/live/view/set/selected_clip", self._set_selected_clip)
# selected device
self._add("/live/view/get/selected_device", self._get_selected_device)
# focus
self._add("/live/view/show_clip_detail_view", self._show_clip_detail_view)
self._add("/live/view/show_device_detail_view", self._show_device_detail_view)
self._add("/live/view/focus_browser", self._focus_browser)
# arrangement / session toggle
self._add("/live/view/get/focused_document_view", self._get_focused_document_view)
# ------------------------------------------------------------------
def _song_view(self):
try:
return self.song.view
except Exception:
return None
def _get_selected_scene(self, params: tuple) -> Optional[tuple]:
try:
view = self._song_view()
scenes = list(self.song.scenes)
scene = view.selected_scene
idx = scenes.index(scene)
return (idx,)
except Exception as e:
logger.warning("get selected_scene: %s", e)
return None
def _set_selected_scene(self, params: tuple) -> None:
if not params:
return None
try:
idx = int(params[0])
scenes = list(self.song.scenes)
if 0 <= idx < len(scenes):
self._song_view().selected_scene = scenes[idx]
except Exception as e:
logger.warning("set selected_scene: %s", e)
return None
def _start_listen_selected_scene(self, params: tuple) -> None:
view = self._song_view()
if view is None:
return
key = "view.selected_scene"
self._remove_listener(key)
def cb():
result = self._get_selected_scene(())
if result:
self._send("/live/view/get/selected_scene", result)
try:
view.add_selected_scene_listener(cb)
self._listener_store[key] = (view.remove_selected_scene_listener, cb)
cb()
except Exception as e:
logger.warning("selected_scene listener: %s", e)
def _stop_listen_selected_scene(self, params: tuple) -> None:
self._remove_listener("view.selected_scene")
def _get_selected_track(self, params: tuple) -> Optional[tuple]:
try:
view = self._song_view()
all_tracks = list(self.song.tracks) + list(self.song.return_tracks)
track = view.selected_track
idx = all_tracks.index(track)
return (idx,)
except Exception as e:
logger.warning("get selected_track: %s", e)
return None
def _set_selected_track(self, params: tuple) -> None:
if not params:
return None
try:
idx = int(params[0])
all_tracks = list(self.song.tracks) + list(self.song.return_tracks)
if 0 <= idx < len(all_tracks):
self._song_view().selected_track = all_tracks[idx]
except Exception as e:
logger.warning("set selected_track: %s", e)
return None
def _start_listen_selected_track(self, params: tuple) -> None:
view = self._song_view()
if view is None:
return
key = "view.selected_track"
self._remove_listener(key)
def cb():
result = self._get_selected_track(())
if result:
self._send("/live/view/get/selected_track", result)
try:
view.add_selected_track_listener(cb)
self._listener_store[key] = (view.remove_selected_track_listener, cb)
cb()
except Exception as e:
logger.warning("selected_track listener: %s", e)
def _stop_listen_selected_track(self, params: tuple) -> None:
self._remove_listener("view.selected_track")
def _get_selected_clip(self, params: tuple) -> Optional[tuple]:
try:
view = self._song_view()
clip_view = view.detail_clip
if clip_view is None:
return None
tracks = list(self.song.tracks)
for ti, track in enumerate(tracks):
for ci, slot in enumerate(track.clip_slots):
if slot.has_clip and slot.clip == clip_view:
return (ti, ci)
except Exception as e:
logger.warning("get selected_clip: %s", e)
return None
def _set_selected_clip(self, params: tuple) -> None:
if len(params) < 2:
return None
try:
track_idx = int(params[0])
clip_idx = int(params[1])
tracks = list(self.song.tracks)
if 0 <= track_idx < len(tracks):
slots = list(tracks[track_idx].clip_slots)
if 0 <= clip_idx < len(slots) and slots[clip_idx].has_clip:
self._song_view().detail_clip = slots[clip_idx].clip
except Exception as e:
logger.warning("set selected_clip: %s", e)
return None
def _get_selected_device(self, params: tuple) -> Optional[tuple]:
try:
view = self._song_view()
track = view.selected_track
tracks = list(self.song.tracks) + list(self.song.return_tracks)
track_idx = tracks.index(track)
selected = track.view.selected_device
if selected is None:
return None
devices = list(track.devices)
device_idx = devices.index(selected)
return (track_idx, device_idx)
except Exception as e:
logger.warning("get selected_device: %s", e)
return None
def _show_clip_detail_view(self, params: tuple) -> None:
try:
app = self.manager.application()
app.view.show_view("Detail/Clip")
except Exception as e:
logger.warning("show_clip_detail_view: %s", e)
return None
def _show_device_detail_view(self, params: tuple) -> None:
try:
app = self.manager.application()
app.view.show_view("Detail/DeviceChain")
except Exception as e:
logger.warning("show_device_detail_view: %s", e)
return None
def _focus_browser(self, params: tuple) -> None:
try:
app = self.manager.application()
app.view.show_view("Browser")
except Exception as e:
logger.warning("focus_browser: %s", e)
return None
def _get_focused_document_view(self, params: tuple) -> Optional[tuple]:
try:
app = self.manager.application()
view = app.view.focused_document_view
return (view,)
except Exception as e:
logger.warning("get focused_document_view: %s", e)
return None
+20
View File
@@ -0,0 +1,20 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.backends.legacy:build"
[project]
name = "ableton-osc"
version = "1.0.0"
description = "Full Ableton Live 11 OSC remote script"
requires-python = ">=3.8"
dependencies = []
[project.optional-dependencies]
dev = [
"python-osc>=1.8",
"pytest>=7",
]
[tool.setuptools.packages.find]
where = ["."]
include = ["AbletonOSC*"]