commit aad042650e006f1955318f4dd4560466fbb34d28 Author: Sebastian Krüger Date: Mon Jun 1 12:27:47 2026 +0200 Initial implementation: full Ableton Live 11 OSC remote script Covers all major Live API objects with get/set/listen/method handlers: song, track, return_track, clip, clip_slot, device (incl. rack chains and drum pads), scene, view, application, browser, groove. Zero external runtime dependencies — OSC encoded/decoded in osc_server.py. Wildcard * support for track/scene indices. Listener callbacks fire to matching /get/ addresses for bidirectional state sync. Co-Authored-By: Claude Sonnet 4.6 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3ba3dd3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +.venv/ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +dist/ +build/ +.pytest_cache/ diff --git a/AbletonOSC/__init__.py b/AbletonOSC/__init__.py new file mode 100644 index 0000000..4d64c26 --- /dev/null +++ b/AbletonOSC/__init__.py @@ -0,0 +1,9 @@ +""" +AbletonOSC — full Ableton Live 11 OSC remote script. +Ableton calls create_instance() to instantiate the control surface. +""" +from .manager import Manager + + +def create_instance(c_instance): + return Manager(c_instance) diff --git a/AbletonOSC/application.py b/AbletonOSC/application.py new file mode 100644 index 0000000..3318e75 --- /dev/null +++ b/AbletonOSC/application.py @@ -0,0 +1,51 @@ +"""Handles /live/application/* OSC addresses.""" +import logging +from typing import Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + + +class ApplicationHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + self._add("/live/application/get/version", self._get_version) + self._add("/live/application/get/average_process_usage", + self._get_average_process_usage) + self._add("/live/application/get/peak_process_usage", + self._get_peak_process_usage) + + def _app(self): + try: + return self.manager.application() + except Exception: + return None + + def _get_version(self, params: tuple) -> Optional[tuple]: + app = self._app() + if app: + try: + major, minor = app.get_major_version(), app.get_minor_version() + build = app.get_bugfix_version() + return (major, minor, build) + except Exception as e: + logger.warning("get version: %s", e) + return None + + def _get_average_process_usage(self, params: tuple) -> Optional[tuple]: + app = self._app() + if app: + try: + return (float(app.average_process_usage),) + except Exception as e: + logger.warning("get avg process usage: %s", e) + return None + + def _get_peak_process_usage(self, params: tuple) -> Optional[tuple]: + app = self._app() + if app: + try: + return (float(app.peak_process_usage),) + except Exception as e: + logger.warning("get peak process usage: %s", e) + return None diff --git a/AbletonOSC/browser.py b/AbletonOSC/browser.py new file mode 100644 index 0000000..7c3cfb1 --- /dev/null +++ b/AbletonOSC/browser.py @@ -0,0 +1,149 @@ +"""Handles /live/browser/* OSC addresses.""" +import logging +from typing import Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + + +def _iter_items(node, path_parts, depth=0): + """Recursively find a browser item by path.""" + if not path_parts: + return node + name = path_parts[0] + try: + children = list(node.children) + except Exception: + return None + for child in children: + if child.name == name: + return _iter_items(child, path_parts[1:], depth + 1) + return None + + +class BrowserHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + # Category listings (returns flat list of names) + for category in [ + "audio_effects", "instruments", "midi_effects", + "samples", "sounds", "clips", "packs", "plugins", + ]: + self._add(f"/live/browser/get/{category}", + self._make_category_lister(category)) + + # Load by path (e.g., "Instruments/Wavetable") + self._add("/live/browser/load_item", self._load_item) + self._add("/live/browser/preview_item", self._preview_item) + self._add("/live/browser/stop_preview", self._stop_preview) + + # Hotswap + self._add("/live/browser/get/hotswap_target", self._get_hotswap_target) + self._add("/live/browser/begin_hotswap", self._begin_hotswap) + + # ------------------------------------------------------------------ + + def _browser(self): + try: + return self.manager.application().browser + except Exception: + return None + + def _make_category_lister(self, category: str): + def handler(params: tuple) -> Optional[tuple]: + browser = self._browser() + if browser is None: + return None + try: + root = getattr(browser, category) + names = [item.name for item in root.children] + return tuple(names) + except Exception as e: + logger.warning("browser.%s: %s", category, e) + return None + return handler + + def _load_item(self, params: tuple) -> None: + """params: path_part1 [, path_part2, ...] -- e.g. 'Instruments' 'Wavetable'""" + if not params: + return None + browser = self._browser() + if browser is None: + return None + path = [str(p) for p in params] + # Try to find within all top-level categories + for category in [ + "audio_effects", "instruments", "midi_effects", + "samples", "sounds", "clips", "packs", "plugins", + ]: + try: + root = getattr(browser, category) + item = _iter_items(root, path) + if item is not None: + browser.load_item(item) + return None + except Exception: + continue + logger.warning("browser.load_item: item not found: %s", path) + return None + + def _preview_item(self, params: tuple) -> None: + if not params: + return None + browser = self._browser() + if browser is None: + return None + path = [str(p) for p in params] + for category in [ + "audio_effects", "instruments", "midi_effects", + "samples", "sounds", "clips", + ]: + try: + root = getattr(browser, category) + item = _iter_items(root, path) + if item is not None: + browser.preview_item(item) + return None + except Exception: + continue + return None + + def _stop_preview(self, params: tuple) -> None: + browser = self._browser() + if browser: + try: + browser.stop_preview() + except Exception as e: + logger.warning("stop_preview: %s", e) + return None + + def _get_hotswap_target(self, params: tuple) -> Optional[tuple]: + browser = self._browser() + if browser: + try: + target = browser.hotswap_target + if target is not None: + return (target.name,) + return ("",) + except Exception as e: + logger.warning("hotswap_target: %s", e) + return None + + def _begin_hotswap(self, params: tuple) -> None: + """params: track_idx, device_idx""" + if len(params) < 2: + return None + try: + track_idx = int(params[0]) + device_idx = int(params[1]) + tracks = list(self.song.tracks) + if 0 <= track_idx < len(tracks): + devices = list(tracks[track_idx].devices) + if 0 <= device_idx < len(devices): + browser = self._browser() + if browser: + browser.hotswap_target = devices[device_idx] + except Exception as e: + logger.warning("begin_hotswap: %s", e) + return None diff --git a/AbletonOSC/clip.py b/AbletonOSC/clip.py new file mode 100644 index 0000000..02888c4 --- /dev/null +++ b/AbletonOSC/clip.py @@ -0,0 +1,415 @@ +"""Handles /live/clip/* OSC addresses.""" +import logging +from typing import Any, Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +CLIP_PROPS_RW = [ + "name", "color", "color_index", + "muted", "gain", + "pitch_coarse", "pitch_fine", "velocity_amount", + "looping", "loop_start", "loop_end", + "start_marker", "end_marker", + "warping", "warp_mode", + "launch_mode", "launch_quantization", + "ram_mode", +] + +CLIP_PROPS_RO = [ + "is_playing", "is_recording", "is_midi_clip", "is_audio_clip", + "is_overdubbing", "is_triggered", "will_record_on_start", + "has_groove", + "length", "end_time", "start_time", + "playing_position", "sample_length", + "gain_display_string", + "file_path", +] + + +class ClipHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + for prop in CLIP_PROPS_RW: + self._add(f"/live/clip/get/{prop}", self._make_getter(prop)) + self._add(f"/live/clip/set/{prop}", self._make_setter(prop)) + self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop)) + + for prop in CLIP_PROPS_RO: + self._add(f"/live/clip/get/{prop}", self._make_getter(prop)) + self._add(f"/live/clip/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/clip/stop_listen/{prop}", self._make_stop_listen(prop)) + + # Playback + self._add("/live/clip/fire", self._fire) + self._add("/live/clip/stop", self._stop) + self._add("/live/clip/duplicate_loop", self._duplicate_loop) + self._add("/live/clip/quantize", self._quantize) + + # MIDI notes + self._add("/live/clip/get/notes", self._get_notes) + self._add("/live/clip/add/notes", self._add_notes) + self._add("/live/clip/remove/notes", self._remove_notes) + self._add("/live/clip/get/notes_extended", self._get_notes_extended) + self._add("/live/clip/apply_note_modifications", self._apply_note_modifications) + + # Automation envelopes + self._add("/live/clip/get/automation_envelope", self._get_automation_envelope) + self._add("/live/clip/create_automation_envelope", self._create_automation_envelope) + self._add("/live/clip/clear_envelope", self._clear_envelope) + self._add("/live/clip/clear_all_envelopes", self._clear_all_envelopes) + + # Warp markers + self._add("/live/clip/get/warp_markers", self._get_warp_markers) + + # Groove + self._add("/live/clip/get/groove", self._get_groove) + self._add("/live/clip/set/groove", self._set_groove) + + # ------------------------------------------------------------------ + + def _get_clip(self, params: tuple) -> Optional[Any]: + if len(params) < 2: + return None + try: + track_idx = int(params[0]) + clip_idx = int(params[1]) + tracks = list(self.song.tracks) + if 0 <= track_idx < len(tracks): + slots = list(tracks[track_idx].clip_slots) + if 0 <= clip_idx < len(slots) and slots[clip_idx].has_clip: + return slots[clip_idx].clip + except Exception as e: + logger.warning("get_clip(%s): %s", params, e) + return None + + def _clip_key(self, params: tuple) -> str: + return f"clip.{params[0]}.{params[1]}" + + # ------------------------------------------------------------------ + + def _make_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + clip = self._get_clip(params) + if clip is None: + return None + val = self._get_prop(clip, prop) + if val is None: + return None + prefix = (int(params[0]), int(params[1])) + if isinstance(val, (list, tuple)): + return prefix + tuple(val) + return prefix + (val,) + return handler + + def _make_setter(self, prop: str): + def handler(params: tuple) -> None: + if len(params) < 3: + return None + clip = self._get_clip(params) + if clip: + self._set_prop(clip, prop, params[2]) + return None + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + clip = self._get_clip(params) + if clip: + key = f"{self._clip_key(params)}.{prop}" + prefix = (int(params[0]), int(params[1])) + self._register_listener( + key, clip, prop, f"/live/clip/get/{prop}", prefix + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + if len(params) >= 2: + self._remove_listener(f"{self._clip_key(params)}.{prop}") + return handler + + # ------------------------------------------------------------------ + # Playback methods + # ------------------------------------------------------------------ + + def _fire(self, params: tuple) -> None: + clip = self._get_clip(params) + if clip: + try: + clip.fire() + except Exception as e: + logger.warning("clip.fire: %s", e) + return None + + def _stop(self, params: tuple) -> None: + clip = self._get_clip(params) + if clip: + try: + clip.stop() + except Exception as e: + logger.warning("clip.stop: %s", e) + return None + + def _duplicate_loop(self, params: tuple) -> None: + clip = self._get_clip(params) + if clip: + try: + clip.duplicate_loop() + except Exception as e: + logger.warning("clip.duplicate_loop: %s", e) + return None + + def _quantize(self, params: tuple) -> None: + """params: track_idx, clip_idx, quantization, strength""" + clip = self._get_clip(params) + if clip: + try: + quant = int(params[2]) if len(params) > 2 else 4 + strength = float(params[3]) if len(params) > 3 else 1.0 + clip.quantize(quant, strength) + except Exception as e: + logger.warning("clip.quantize: %s", e) + return None + + # ------------------------------------------------------------------ + # MIDI notes + # ------------------------------------------------------------------ + + def _get_notes(self, params: tuple) -> Optional[tuple]: + """params: track_idx, clip_idx [, pitch, pitch_span, time, time_span]""" + clip = self._get_clip(params) + if clip is None or not clip.is_midi_clip: + return None + try: + if len(params) >= 6: + pitch = int(params[2]) + pitch_span = int(params[3]) + time = float(params[4]) + time_span = float(params[5]) + notes = clip.get_notes(time, pitch, time_span, pitch_span) + else: + notes = clip.get_notes(0, 0, clip.length, 128) + result = [] + for note in notes: + result += [note[0], note[1], note[2], note[3], int(note[4])] + return (int(params[0]), int(params[1])) + tuple(result) + except Exception as e: + logger.warning("get_notes: %s", e) + return None + + def _get_notes_extended(self, params: tuple) -> Optional[tuple]: + """Returns notes using the extended API (with IDs).""" + clip = self._get_clip(params) + if clip is None or not clip.is_midi_clip: + return None + try: + if len(params) >= 6: + notes = clip.get_notes_extended( + int(params[2]), int(params[3]), + float(params[4]), float(params[5]) + ) + else: + notes = clip.get_notes_extended(0, 128, 0, clip.length) + result = [] + for note in notes: + result += [ + note.pitch, float(note.start_time), float(note.duration), + note.velocity, int(note.mute), note.note_id, + float(note.release_velocity), float(note.probability) + ] + return (int(params[0]), int(params[1])) + tuple(result) + except Exception as e: + logger.warning("get_notes_extended: %s", e) + return None + + def _add_notes(self, params: tuple) -> None: + """params: track_idx, clip_idx, pitch, start_time, duration, velocity, mute, [...]""" + clip = self._get_clip(params) + if clip is None or not clip.is_midi_clip: + return None + try: + note_data = params[2:] + if len(note_data) % 5 != 0: + logger.warning("add_notes: expected multiple of 5 args after indices") + return None + notes = [] + for i in range(0, len(note_data), 5): + notes.append(( + int(note_data[i]), # pitch + float(note_data[i+1]), # start_time + float(note_data[i+2]), # duration + int(note_data[i+3]), # velocity + bool(note_data[i+4]), # mute + )) + clip.set_notes(tuple(notes)) + except Exception as e: + logger.warning("add_notes: %s", e) + return None + + def _remove_notes(self, params: tuple) -> None: + """params: track_idx, clip_idx [, time, time_span, pitch, pitch_span]""" + clip = self._get_clip(params) + if clip is None or not clip.is_midi_clip: + return None + try: + if len(params) >= 6: + time = float(params[2]) + time_span = float(params[3]) + pitch = int(params[4]) + pitch_span = int(params[5]) + else: + time, time_span, pitch, pitch_span = 0, clip.length, 0, 128 + clip.remove_notes(time, pitch, time_span, pitch_span) + except Exception as e: + logger.warning("remove_notes: %s", e) + return None + + def _apply_note_modifications(self, params: tuple) -> None: + """params: track_idx, clip_idx, note_id, pitch, start_time, duration, velocity, mute, ...""" + clip = self._get_clip(params) + if clip is None or not clip.is_midi_clip: + return None + try: + note_data = params[2:] + # 8 values per note: note_id, pitch, start_time, duration, velocity, mute, + # release_velocity, probability + if len(note_data) % 8 != 0: + logger.warning("apply_note_modifications: expected multiple of 8 args") + return None + notes = [] + for i in range(0, len(note_data), 8): + note = clip.get_notes_extended(0, 128, 0, clip.length) + # Build modification by note_id + notes.append({ + "note_id": int(note_data[i]), + "pitch": int(note_data[i+1]), + "start_time": float(note_data[i+2]), + "duration": float(note_data[i+3]), + "velocity": int(note_data[i+4]), + "mute": bool(note_data[i+5]), + "release_velocity": float(note_data[i+6]), + "probability": float(note_data[i+7]), + }) + clip.apply_note_modifications(notes) + except Exception as e: + logger.warning("apply_note_modifications: %s", e) + return None + + # ------------------------------------------------------------------ + # Automation envelopes + # ------------------------------------------------------------------ + + def _get_automation_envelope(self, params: tuple) -> Optional[tuple]: + """params: track_idx, clip_idx, device_idx, param_idx""" + clip = self._get_clip(params) + if clip is None or len(params) < 4: + return None + try: + track = list(self.song.tracks)[int(params[0])] + device = list(track.devices)[int(params[2])] + param = list(device.parameters)[int(params[3])] + env = clip.automation_envelope(param) + if env is None: + return None + points = [] + for step in range(env.count): + time = step * (clip.length / env.count) + points += [float(time), float(env.value_at_time(time))] + return (int(params[0]), int(params[1]), int(params[2]), int(params[3])) + tuple(points) + except Exception as e: + logger.warning("get_automation_envelope: %s", e) + return None + + def _create_automation_envelope(self, params: tuple) -> None: + """params: track_idx, clip_idx, device_idx, param_idx""" + clip = self._get_clip(params) + if clip is None or len(params) < 4: + return None + try: + track = list(self.song.tracks)[int(params[0])] + device = list(track.devices)[int(params[2])] + param = list(device.parameters)[int(params[3])] + clip.create_automation_envelope(param) + except Exception as e: + logger.warning("create_automation_envelope: %s", e) + return None + + def _clear_envelope(self, params: tuple) -> None: + clip = self._get_clip(params) + if clip is None or len(params) < 4: + return None + try: + track = list(self.song.tracks)[int(params[0])] + device = list(track.devices)[int(params[2])] + param = list(device.parameters)[int(params[3])] + env = clip.automation_envelope(param) + if env: + env.clear_all_events() + except Exception as e: + logger.warning("clear_envelope: %s", e) + return None + + def _clear_all_envelopes(self, params: tuple) -> None: + clip = self._get_clip(params) + if clip: + try: + clip.clear_all_envelopes() + except Exception as e: + logger.warning("clear_all_envelopes: %s", e) + return None + + # ------------------------------------------------------------------ + # Warp markers + # ------------------------------------------------------------------ + + def _get_warp_markers(self, params: tuple) -> Optional[tuple]: + clip = self._get_clip(params) + if clip is None or not clip.is_audio_clip: + return None + try: + markers = list(clip.warp_markers) + result = [] + for m in markers: + result += [float(m.beat_time), float(m.sample_time)] + return (int(params[0]), int(params[1])) + tuple(result) + except Exception as e: + logger.warning("get_warp_markers: %s", e) + return None + + # ------------------------------------------------------------------ + # Groove + # ------------------------------------------------------------------ + + def _get_groove(self, params: tuple) -> Optional[tuple]: + clip = self._get_clip(params) + if clip is None: + return None + try: + groove = clip.groove + if groove is None: + return (int(params[0]), int(params[1]), "") + return (int(params[0]), int(params[1]), groove.name) + except Exception as e: + logger.warning("get groove: %s", e) + return None + + def _set_groove(self, params: tuple) -> None: + """params: track_idx, clip_idx, groove_name""" + clip = self._get_clip(params) + if clip is None or len(params) < 3: + return None + groove_name = str(params[2]) + try: + if groove_name == "": + clip.groove = None + else: + for g in self.song.groove_pool.grooves: + if g.name == groove_name: + clip.groove = g + break + except Exception as e: + logger.warning("set groove: %s", e) + return None diff --git a/AbletonOSC/clip_slot.py b/AbletonOSC/clip_slot.py new file mode 100644 index 0000000..93c3dbb --- /dev/null +++ b/AbletonOSC/clip_slot.py @@ -0,0 +1,158 @@ +"""Handles /live/clip_slot/* OSC addresses.""" +import logging +from typing import Any, Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +SLOT_PROPS_RO = [ + "has_clip", "is_playing", "is_triggered", "is_recording", + "is_group_slot", "controls_other_clips", "playing_status", + "will_record_on_start", +] + +SLOT_PROPS_RW = ["has_stop_button"] + + +class ClipSlotHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + for prop in SLOT_PROPS_RO: + self._add(f"/live/clip_slot/get/{prop}", self._make_getter(prop)) + self._add(f"/live/clip_slot/start_listen/{prop}", + self._make_start_listen(prop)) + self._add(f"/live/clip_slot/stop_listen/{prop}", + self._make_stop_listen(prop)) + + for prop in SLOT_PROPS_RW: + self._add(f"/live/clip_slot/get/{prop}", self._make_getter(prop)) + self._add(f"/live/clip_slot/set/{prop}", self._make_setter(prop)) + self._add(f"/live/clip_slot/start_listen/{prop}", + self._make_start_listen(prop)) + self._add(f"/live/clip_slot/stop_listen/{prop}", + self._make_stop_listen(prop)) + + self._add("/live/clip_slot/fire", self._fire) + self._add("/live/clip_slot/stop", self._stop) + self._add("/live/clip_slot/create_clip", self._create_clip) + self._add("/live/clip_slot/delete_clip", self._delete_clip) + self._add("/live/clip_slot/duplicate_clip_to", self._duplicate_clip_to) + + # ------------------------------------------------------------------ + + def _get_slot(self, params: tuple) -> Optional[Any]: + if len(params) < 2: + return None + try: + track_idx = int(params[0]) + slot_idx = int(params[1]) + tracks = list(self.song.tracks) + if 0 <= track_idx < len(tracks): + slots = list(tracks[track_idx].clip_slots) + if 0 <= slot_idx < len(slots): + return slots[slot_idx] + except Exception as e: + logger.warning("get_slot(%s): %s", params, e) + return None + + def _slot_key(self, params: tuple) -> str: + return f"clip_slot.{params[0]}.{params[1]}" + + # ------------------------------------------------------------------ + + def _make_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + slot = self._get_slot(params) + if slot is None: + return None + val = self._get_prop(slot, prop) + if val is None: + return None + return (int(params[0]), int(params[1]), val) + return handler + + def _make_setter(self, prop: str): + def handler(params: tuple) -> None: + if len(params) < 3: + return None + slot = self._get_slot(params) + if slot: + self._set_prop(slot, prop, params[2]) + return None + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + slot = self._get_slot(params) + if slot: + key = f"{self._slot_key(params)}.{prop}" + prefix = (int(params[0]), int(params[1])) + self._register_listener( + key, slot, prop, f"/live/clip_slot/get/{prop}", prefix + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + if len(params) >= 2: + self._remove_listener(f"{self._slot_key(params)}.{prop}") + return handler + + # ------------------------------------------------------------------ + + def _fire(self, params: tuple) -> None: + slot = self._get_slot(params) + if slot: + try: + slot.fire() + except Exception as e: + logger.warning("clip_slot.fire: %s", e) + return None + + def _stop(self, params: tuple) -> None: + slot = self._get_slot(params) + if slot: + try: + slot.stop() + except Exception as e: + logger.warning("clip_slot.stop: %s", e) + return None + + def _create_clip(self, params: tuple) -> None: + """params: track_idx, slot_idx [, length_in_beats]""" + slot = self._get_slot(params) + if slot: + try: + length = float(params[2]) if len(params) > 2 else 4.0 + slot.create_clip(length) + except Exception as e: + logger.warning("clip_slot.create_clip: %s", e) + return None + + def _delete_clip(self, params: tuple) -> None: + slot = self._get_slot(params) + if slot and slot.has_clip: + try: + slot.delete_clip() + except Exception as e: + logger.warning("clip_slot.delete_clip: %s", e) + return None + + def _duplicate_clip_to(self, params: tuple) -> None: + """params: src_track_idx, src_slot_idx, dst_track_idx, dst_slot_idx""" + if len(params) < 4: + return None + src_slot = self._get_slot(params) + if src_slot and src_slot.has_clip: + try: + dst_track_idx = int(params[2]) + dst_slot_idx = int(params[3]) + tracks = list(self.song.tracks) + if 0 <= dst_track_idx < len(tracks): + dst_slots = list(tracks[dst_track_idx].clip_slots) + if 0 <= dst_slot_idx < len(dst_slots): + src_slot.duplicate_clip_to(dst_slots[dst_slot_idx]) + except Exception as e: + logger.warning("clip_slot.duplicate_clip_to: %s", e) + return None diff --git a/AbletonOSC/device.py b/AbletonOSC/device.py new file mode 100644 index 0000000..d74967b --- /dev/null +++ b/AbletonOSC/device.py @@ -0,0 +1,467 @@ +"""Handles /live/device/* and /live/device/chain/* OSC addresses.""" +import logging +from typing import Any, List, Optional, Tuple +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + + +def _find_device(song, track_idx: int, device_idx: int, + chain_idx: Optional[int] = None, + sub_device_idx: Optional[int] = None) -> Optional[Any]: + """Resolve a device from track + device index (optionally into a rack chain).""" + try: + tracks = list(song.tracks) + if not (0 <= track_idx < len(tracks)): + return None + devices = list(tracks[track_idx].devices) + if not (0 <= device_idx < len(devices)): + return None + device = devices[device_idx] + if chain_idx is not None and sub_device_idx is not None: + chains = list(getattr(device, "chains", [])) + if not (0 <= chain_idx < len(chains)): + return None + sub_devices = list(chains[chain_idx].devices) + if not (0 <= sub_device_idx < len(sub_devices)): + return None + return sub_devices[sub_device_idx] + return device + except Exception as e: + logger.warning("find_device: %s", e) + return None + + +class DeviceHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + # --- device-level --- + self._add("/live/device/get/name", self._get_name) + self._add("/live/device/get/class_name", self._get_class_name) + self._add("/live/device/get/type", self._get_type) + self._add("/live/device/get/is_active", self._get_is_active) + self._add("/live/device/set/is_active", self._set_is_active) + self._add("/live/device/start_listen/is_active", self._start_listen_is_active) + self._add("/live/device/stop_listen/is_active", self._stop_listen_is_active) + + # --- parameter count --- + self._add("/live/device/get/num_parameters", self._get_num_parameters) + + # --- bulk parameter operations --- + self._add("/live/device/get/parameters/name", self._get_params_name) + self._add("/live/device/get/parameters/value", self._get_params_value) + self._add("/live/device/get/parameters/min", self._get_params_min) + self._add("/live/device/get/parameters/max", self._get_params_max) + self._add("/live/device/get/parameters/default_value", self._get_params_default) + self._add("/live/device/get/parameters/is_quantized", self._get_params_is_quantized) + self._add("/live/device/set/parameters/value", self._set_params_value) + + # --- individual parameter --- + self._add("/live/device/get/parameter/name", self._get_param_name) + self._add("/live/device/get/parameter/value", self._get_param_value) + self._add("/live/device/get/parameter/value_string", self._get_param_value_string) + self._add("/live/device/get/parameter/min", self._get_param_min) + self._add("/live/device/get/parameter/max", self._get_param_max) + self._add("/live/device/get/parameter/default_value", self._get_param_default) + self._add("/live/device/set/parameter/value", self._set_param_value) + self._add("/live/device/start_listen/parameter/value", self._start_listen_param) + self._add("/live/device/stop_listen/parameter/value", self._stop_listen_param) + + # --- rack-specific --- + self._add("/live/device/get/num_chains", self._get_num_chains) + self._add("/live/device/get/chains/name", self._get_chains_name) + self._add("/live/device/get/chains/num_devices", self._get_chains_num_devices) + self._add("/live/device/get/chains/devices/name", self._get_chains_devices_name) + self._add("/live/device/randomize_macros", self._randomize_macros) + + # --- drum pad --- + self._add("/live/device/get/drum_pads/name", self._get_drum_pads_name) + self._add("/live/device/get/drum_pads/note", self._get_drum_pads_note) + self._add("/live/device/get/drum_pads/mute", self._get_drum_pads_mute) + self._add("/live/device/set/drum_pads/mute", self._set_drum_pad_mute) + self._add("/live/device/get/drum_pads/solo", self._get_drum_pads_solo) + self._add("/live/device/set/drum_pads/solo", self._set_drum_pad_solo) + + # --- chain devices (sub-devices inside racks) --- + self._add("/live/device/chain/get/parameter/value", self._get_chain_param_value) + self._add("/live/device/chain/set/parameter/value", self._set_chain_param_value) + self._add("/live/device/chain/start_listen/parameter/value", + self._start_listen_chain_param) + self._add("/live/device/chain/stop_listen/parameter/value", + self._stop_listen_chain_param) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _get_device(self, params: tuple) -> Optional[Any]: + if len(params) < 2: + return None + return _find_device(self.song, int(params[0]), int(params[1])) + + def _get_param(self, params: tuple) -> Optional[Any]: + """params: track_idx, device_idx, param_idx""" + device = self._get_device(params) + if device is None or len(params) < 3: + return None + param_idx = int(params[2]) + try: + params_list = list(device.parameters) + if 0 <= param_idx < len(params_list): + return params_list[param_idx] + except Exception as e: + logger.warning("get_param: %s", e) + return None + + def _get_all_params(self, params: tuple) -> List[Any]: + device = self._get_device(params) + if device is None: + return [] + try: + return list(device.parameters) + except Exception: + return [] + + # ------------------------------------------------------------------ + # Device properties + # ------------------------------------------------------------------ + + def _get_name(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + return (int(params[0]), int(params[1]), d.name) if d else None + + def _get_class_name(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + return (int(params[0]), int(params[1]), d.class_name) if d else None + + def _get_type(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + return (int(params[0]), int(params[1]), int(d.type)) if d else None + + def _get_is_active(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + return (int(params[0]), int(params[1]), bool(d.is_active)) if d else None + + def _set_is_active(self, params: tuple) -> None: + if len(params) < 3: + return None + d = self._get_device(params) + if d: + try: + d.is_active = bool(params[2]) + except Exception as e: + logger.warning("set is_active: %s", e) + return None + + def _start_listen_is_active(self, params: tuple) -> None: + d = self._get_device(params) + if d: + key = f"device.{params[0]}.{params[1]}.is_active" + prefix = (int(params[0]), int(params[1])) + self._register_listener(key, d, "is_active", + "/live/device/get/is_active", prefix) + + def _stop_listen_is_active(self, params: tuple) -> None: + if len(params) >= 2: + self._remove_listener(f"device.{params[0]}.{params[1]}.is_active") + + def _get_num_parameters(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + return (int(params[0]), int(params[1]), len(list(d.parameters))) if d else None + + # ------------------------------------------------------------------ + # Bulk parameter operations + # ------------------------------------------------------------------ + + def _get_params_name(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(p.name for p in ps) if ps else None + + def _get_params_value(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(float(p.value) for p in ps) if ps else None + + def _get_params_min(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(float(p.min) for p in ps) if ps else None + + def _get_params_max(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(float(p.max) for p in ps) if ps else None + + def _get_params_default(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(float(p.default_value) for p in ps) if ps else None + + def _get_params_is_quantized(self, params: tuple) -> Optional[tuple]: + ps = self._get_all_params(params) + return (int(params[0]), int(params[1])) + tuple(bool(p.is_quantized) for p in ps) if ps else None + + def _set_params_value(self, params: tuple) -> None: + """params: track_idx, device_idx, val0, val1, ...""" + if len(params) < 3: + return None + ps = self._get_all_params(params) + values = params[2:] + for i, val in enumerate(values): + if i < len(ps): + try: + ps[i].value = float(val) + except Exception as e: + logger.warning("set param %d: %s", i, e) + return None + + # ------------------------------------------------------------------ + # Individual parameter + # ------------------------------------------------------------------ + + def _get_param_name(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + return (int(params[0]), int(params[1]), int(params[2]), p.name) if p else None + + def _get_param_value(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + return (int(params[0]), int(params[1]), int(params[2]), float(p.value)) if p else None + + def _get_param_value_string(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + if p: + try: + s = p.str_for_value(p.value) + return (int(params[0]), int(params[1]), int(params[2]), s) + except Exception: + return (int(params[0]), int(params[1]), int(params[2]), str(p.value)) + return None + + def _get_param_min(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + return (int(params[0]), int(params[1]), int(params[2]), float(p.min)) if p else None + + def _get_param_max(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + return (int(params[0]), int(params[1]), int(params[2]), float(p.max)) if p else None + + def _get_param_default(self, params: tuple) -> Optional[tuple]: + p = self._get_param(params) + return (int(params[0]), int(params[1]), int(params[2]), float(p.default_value)) if p else None + + def _set_param_value(self, params: tuple) -> None: + """params: track_idx, device_idx, param_idx, value""" + if len(params) < 4: + return None + p = self._get_param(params) + if p: + try: + p.value = float(params[3]) + except Exception as e: + logger.warning("set param value: %s", e) + return None + + def _start_listen_param(self, params: tuple) -> None: + """params: track_idx, device_idx, param_idx""" + p = self._get_param(params) + if p is None: + return + key = f"device.{params[0]}.{params[1]}.param.{params[2]}" + self._remove_listener(key) + prefix = (int(params[0]), int(params[1]), int(params[2])) + + def make_cb(pr, pfx): + def cb(): + self._send("/live/device/get/parameter/value", pfx + (float(pr.value),)) + return cb + + cb = make_cb(p, prefix) + try: + p.add_value_listener(cb) + self._listener_store[key] = (p.remove_value_listener, cb) + cb() + except Exception as e: + logger.warning("param listener: %s", e) + + def _stop_listen_param(self, params: tuple) -> None: + if len(params) >= 3: + self._remove_listener(f"device.{params[0]}.{params[1]}.param.{params[2]}") + + # ------------------------------------------------------------------ + # Rack-specific + # ------------------------------------------------------------------ + + def _get_num_chains(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + if d: + try: + return (int(params[0]), int(params[1]), len(list(d.chains))) + except Exception: + return (int(params[0]), int(params[1]), 0) + return None + + def _get_chains_name(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + if d: + try: + return (int(params[0]), int(params[1])) + tuple(c.name for c in d.chains) + except Exception: + pass + return None + + def _get_chains_num_devices(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + if d: + try: + return (int(params[0]), int(params[1])) + tuple(len(c.devices) for c in d.chains) + except Exception: + pass + return None + + def _get_chains_devices_name(self, params: tuple) -> Optional[tuple]: + d = self._get_device(params) + if d: + try: + result = [] + for chain in d.chains: + result += [dev.name for dev in chain.devices] + return (int(params[0]), int(params[1])) + tuple(result) + except Exception: + pass + return None + + def _randomize_macros(self, params: tuple) -> None: + d = self._get_device(params) + if d: + try: + d.randomize_macros() + except Exception as e: + logger.warning("randomize_macros: %s", e) + return None + + # ------------------------------------------------------------------ + # Drum pads + # ------------------------------------------------------------------ + + def _get_drum_pads(self, params: tuple): + d = self._get_device(params) + if d: + try: + return list(d.drum_pads) + except Exception: + pass + return [] + + def _get_drum_pads_name(self, params: tuple) -> Optional[tuple]: + pads = self._get_drum_pads(params) + if pads: + return (int(params[0]), int(params[1])) + tuple(p.name for p in pads) + return None + + def _get_drum_pads_note(self, params: tuple) -> Optional[tuple]: + pads = self._get_drum_pads(params) + if pads: + return (int(params[0]), int(params[1])) + tuple(int(p.note) for p in pads) + return None + + def _get_drum_pads_mute(self, params: tuple) -> Optional[tuple]: + pads = self._get_drum_pads(params) + if pads: + return (int(params[0]), int(params[1])) + tuple(bool(p.mute) for p in pads) + return None + + def _set_drum_pad_mute(self, params: tuple) -> None: + """params: track_idx, device_idx, pad_idx, mute""" + if len(params) < 4: + return None + pads = self._get_drum_pads(params) + pad_idx = int(params[2]) + if 0 <= pad_idx < len(pads): + try: + pads[pad_idx].mute = bool(params[3]) + except Exception as e: + logger.warning("set drum_pad mute: %s", e) + return None + + def _get_drum_pads_solo(self, params: tuple) -> Optional[tuple]: + pads = self._get_drum_pads(params) + if pads: + return (int(params[0]), int(params[1])) + tuple(bool(p.solo) for p in pads) + return None + + def _set_drum_pad_solo(self, params: tuple) -> None: + """params: track_idx, device_idx, pad_idx, solo""" + if len(params) < 4: + return None + pads = self._get_drum_pads(params) + pad_idx = int(params[2]) + if 0 <= pad_idx < len(pads): + try: + pads[pad_idx].solo = bool(params[3]) + except Exception as e: + logger.warning("set drum_pad solo: %s", e) + return None + + # ------------------------------------------------------------------ + # Chain sub-device parameters + # ------------------------------------------------------------------ + + def _get_chain_device(self, params: tuple) -> Optional[Any]: + """params: track_idx, device_idx, chain_idx, sub_device_idx""" + if len(params) < 4: + return None + return _find_device(self.song, int(params[0]), int(params[1]), + int(params[2]), int(params[3])) + + def _get_chain_param(self, params: tuple) -> Optional[Any]: + """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx""" + d = self._get_chain_device(params) + if d is None or len(params) < 5: + return None + param_idx = int(params[4]) + try: + ps = list(d.parameters) + return ps[param_idx] if 0 <= param_idx < len(ps) else None + except Exception: + return None + + def _get_chain_param_value(self, params: tuple) -> Optional[tuple]: + p = self._get_chain_param(params) + if p: + return tuple(int(x) for x in params[:5]) + (float(p.value),) + return None + + def _set_chain_param_value(self, params: tuple) -> None: + """params: track_idx, device_idx, chain_idx, sub_device_idx, param_idx, value""" + if len(params) < 6: + return None + p = self._get_chain_param(params) + if p: + try: + p.value = float(params[5]) + except Exception as e: + logger.warning("set chain param: %s", e) + return None + + def _start_listen_chain_param(self, params: tuple) -> None: + p = self._get_chain_param(params) + if p is None: + return + key = f"device.chain.{'.'.join(str(x) for x in params[:5])}" + self._remove_listener(key) + prefix = tuple(int(x) for x in params[:5]) + + def make_cb(pr, pfx): + def cb(): + self._send("/live/device/chain/get/parameter/value", pfx + (float(pr.value),)) + return cb + + cb = make_cb(p, prefix) + try: + p.add_value_listener(cb) + self._listener_store[key] = (p.remove_value_listener, cb) + cb() + except Exception as e: + logger.warning("chain param listener: %s", e) + + def _stop_listen_chain_param(self, params: tuple) -> None: + if len(params) >= 5: + key = f"device.chain.{'.'.join(str(x) for x in params[:5])}" + self._remove_listener(key) diff --git a/AbletonOSC/groove.py b/AbletonOSC/groove.py new file mode 100644 index 0000000..0987062 --- /dev/null +++ b/AbletonOSC/groove.py @@ -0,0 +1,60 @@ +"""Handles /live/groove/* OSC addresses.""" +import logging +from typing import Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + + +class GrooveHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + self._add("/live/groove/get/grooves", self._get_grooves) + self._add("/live/groove/get/amount", self._get_groove_amount) + self._add("/live/groove/set/amount", self._set_groove_amount) + + def _pool(self): + try: + return self.song.groove_pool + except Exception: + return None + + def _get_grooves(self, params: tuple) -> Optional[tuple]: + pool = self._pool() + if pool is None: + return None + try: + return tuple(g.name for g in pool.grooves) + except Exception as e: + logger.warning("get grooves: %s", e) + return None + + def _get_groove_amount(self, params: tuple) -> Optional[tuple]: + """params: groove_name""" + pool = self._pool() + if pool is None or not params: + return None + name = str(params[0]) + try: + for g in pool.grooves: + if g.name == name: + return (name, float(g.amount)) + except Exception as e: + logger.warning("get groove amount: %s", e) + return None + + def _set_groove_amount(self, params: tuple) -> None: + """params: groove_name, amount""" + pool = self._pool() + if pool is None or len(params) < 2: + return None + name = str(params[0]) + amount = float(params[1]) + try: + for g in pool.grooves: + if g.name == name: + g.amount = amount + break + except Exception as e: + logger.warning("set groove amount: %s", e) + return None diff --git a/AbletonOSC/handler.py b/AbletonOSC/handler.py new file mode 100644 index 0000000..dc87a6c --- /dev/null +++ b/AbletonOSC/handler.py @@ -0,0 +1,129 @@ +"""Base handler with get/set/listen/call infrastructure.""" +import logging +from functools import partial +from typing import Any, Callable, Dict, Optional, Tuple + +logger = logging.getLogger(__name__) + + +class AbletonOSCHandler: + def __init__(self, manager): + self.manager = manager + self.osc_server = manager.osc_server + self._listener_store: Dict[str, Callable] = {} # key -> (target, remove_fn) + + @property + def song(self): + return self.manager.song() + + # ------------------------------------------------------------------ + # Registration helpers + # ------------------------------------------------------------------ + + def _add(self, address: str, callback: Callable) -> None: + self.osc_server.add_handler(address, callback) + + def _send(self, address: str, args: tuple) -> None: + self.osc_server.send_all_clients(address, args) + + # ------------------------------------------------------------------ + # Generic property get + # ------------------------------------------------------------------ + + def _get_prop(self, target: Any, prop: str) -> Any: + try: + return getattr(target, prop) + except Exception as e: + logger.warning("get %s: %s", prop, e) + return None + + def _set_prop(self, target: Any, prop: str, value: Any) -> None: + try: + setattr(target, prop, value) + except Exception as e: + logger.warning("set %s=%s: %s", prop, value, e) + + # ------------------------------------------------------------------ + # Listener management + # ------------------------------------------------------------------ + + def _register_listener(self, key: str, target: Any, prop: str, + notify_address: str, notify_args_prefix: tuple = ()) -> None: + """Attach a Live listener and remember it so we can remove it later.""" + self._remove_listener(key) + + add_fn_name = f"add_{prop}_listener" + remove_fn_name = f"remove_{prop}_listener" + + add_fn = getattr(target, add_fn_name, None) + remove_fn = getattr(target, remove_fn_name, None) + if add_fn is None: + logger.warning("No listener support for %s", prop) + return + + def callback(): + val = self._get_prop(target, prop) + if val is None: + return + if isinstance(val, (list, tuple)): + self._send(notify_address, notify_args_prefix + tuple(val)) + else: + self._send(notify_address, notify_args_prefix + (val,)) + + add_fn(callback) + self._listener_store[key] = (remove_fn, callback) + # Send current value immediately + callback() + + def _remove_listener(self, key: str) -> None: + entry = self._listener_store.pop(key, None) + if entry is None: + return + remove_fn, callback = entry + try: + if remove_fn and remove_fn(callback): + pass + elif remove_fn: + remove_fn(callback) + except Exception as e: + logger.warning("remove listener %s: %s", key, e) + + def clear_listeners(self) -> None: + for key in list(self._listener_store.keys()): + self._remove_listener(key) + + # ------------------------------------------------------------------ + # Handler factories for the common get/set/listen pattern + # ------------------------------------------------------------------ + + def _make_getter(self, get_target: Callable, prop: str, + prefix_from_params: Callable = None) -> Callable: + """Returns a handler fn that reads target.prop and returns the value.""" + def handler(params: tuple) -> Optional[tuple]: + target = get_target(params) + if target is None: + return None + val = self._get_prop(target, prop) + if val is None: + return None + prefix = prefix_from_params(params) if prefix_from_params else () + if isinstance(val, (list, tuple)): + return prefix + tuple(val) + return prefix + (val,) + return handler + + def _make_setter(self, get_target: Callable, prop: str, + value_index: int = 0) -> Callable: + """Returns a handler fn that sets target.prop from params.""" + def handler(params: tuple) -> None: + target = get_target(params) + if target is None: + return None + value = params[value_index] + self._set_prop(target, prop, value) + return None + return handler + + def init_api(self) -> None: + """Override in subclasses to register all OSC handlers.""" + pass diff --git a/AbletonOSC/manager.py b/AbletonOSC/manager.py new file mode 100644 index 0000000..d6d78c2 --- /dev/null +++ b/AbletonOSC/manager.py @@ -0,0 +1,127 @@ +""" +Main ControlSurface entry point for Ableton Live. +Manages OSC server lifecycle and all handler registrations. +""" +import logging +import os +import sys + +try: + from ableton.v2.control_surface import ControlSurface +except ImportError: + # Allow importing outside of Ableton for tooling + class ControlSurface: # type: ignore + def __init__(self, c_instance=None): + self._c_instance = c_instance + + def song(self): + return None + + def application(self): + return None + + def disconnect(self): + pass + + def update_display(self): + pass + + def log_message(self, msg): + print(msg) + +from .osc_server import OSCServer +from .song import SongHandler +from .track import TrackHandler +from .return_track import ReturnTrackHandler +from .clip import ClipHandler +from .clip_slot import ClipSlotHandler +from .device import DeviceHandler +from .scene import SceneHandler +from .view import ViewHandler +from .application import ApplicationHandler +from .browser import BrowserHandler +from .groove import GrooveHandler + +logger = logging.getLogger(__name__) + +LISTEN_PORT = int(os.environ.get("ABLETON_OSC_LISTEN_PORT", 11000)) +SEND_PORT = int(os.environ.get("ABLETON_OSC_SEND_PORT", 11001)) + + +class Manager(ControlSurface): + def __init__(self, c_instance=None): + super().__init__(c_instance) + self._setup_logging() + self.osc_server = OSCServer(listen_port=LISTEN_PORT, send_port=SEND_PORT) + self._handlers = [] + self._setup_handlers() + self.osc_server.start() + logger.info("AbletonOSC started (listen=%d, send=%d)", LISTEN_PORT, SEND_PORT) + self.osc_server.send("/live/startup", ()) + + # ------------------------------------------------------------------ + + def _setup_logging(self) -> None: + logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + def _setup_handlers(self) -> None: + handler_classes = [ + ApplicationHandler, + SongHandler, + TrackHandler, + ReturnTrackHandler, + ClipHandler, + ClipSlotHandler, + DeviceHandler, + SceneHandler, + ViewHandler, + BrowserHandler, + GrooveHandler, + ] + for cls in handler_classes: + h = cls(self) + h.init_api() + self._handlers.append(h) + + # Control endpoints + self.osc_server.add_handler("/live/test", self._handle_test) + self.osc_server.add_handler("/live/api/reload", self._handle_reload) + self.osc_server.add_handler("/live/api/show_message", self._handle_show_message) + + # ------------------------------------------------------------------ + # ControlSurface interface + # ------------------------------------------------------------------ + + def update_display(self) -> None: + """Called by Live ~100ms; drives the OSC poll loop.""" + self.osc_server.process() + + def disconnect(self) -> None: + for h in self._handlers: + h.clear_listeners() + self.osc_server.send("/live/shutdown", ()) + self.osc_server.stop() + super().disconnect() + logger.info("AbletonOSC disconnected") + + # ------------------------------------------------------------------ + # Built-in handlers + # ------------------------------------------------------------------ + + def _handle_test(self, params: tuple): + return ("AbletonOSC active",) + + def _handle_reload(self, params: tuple): + # Re-init all handlers (clears listeners and re-registers) + for h in self._handlers: + h.clear_listeners() + h.init_api() + return ("reloaded",) + + def _handle_show_message(self, params: tuple): + if params: + self.log_message(str(params[0])) + return None diff --git a/AbletonOSC/osc_server.py b/AbletonOSC/osc_server.py new file mode 100644 index 0000000..efae9e6 --- /dev/null +++ b/AbletonOSC/osc_server.py @@ -0,0 +1,225 @@ +""" +UDP OSC server with zero external dependencies. +Safe for Ableton's embedded Python (no threading, non-blocking sockets). +""" +import logging +import socket +import struct +from typing import Any, Callable, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +LISTEN_PORT = 11000 +SEND_PORT = 11001 +MAX_PACKET = 65536 + + +# --------------------------------------------------------------------------- +# OSC encoding +# --------------------------------------------------------------------------- + +def _pad4(n: int) -> int: + return (n + 3) & ~3 + + +def _encode_string(s: str) -> bytes: + b = s.encode("utf-8") + b"\x00" + pad = _pad4(len(b)) - len(b) + return b + b"\x00" * pad + + +def _encode_int(i: int) -> bytes: + return struct.pack(">i", int(i)) + + +def _encode_float(f: float) -> bytes: + return struct.pack(">f", float(f)) + + +def _encode_blob(b: bytes) -> bytes: + length = struct.pack(">I", len(b)) + pad = _pad4(len(b)) - len(b) + return length + b + b"\x00" * pad + + +def encode_message(address: str, args: tuple) -> bytes: + type_tags = "," + arg_bytes = b"" + for arg in args: + if isinstance(arg, bool): + type_tags += "T" if arg else "F" + elif isinstance(arg, int): + type_tags += "i" + arg_bytes += _encode_int(arg) + elif isinstance(arg, float): + type_tags += "f" + arg_bytes += _encode_float(arg) + elif isinstance(arg, str): + type_tags += "s" + arg_bytes += _encode_string(arg) + elif isinstance(arg, bytes): + type_tags += "b" + arg_bytes += _encode_blob(arg) + elif arg is None: + type_tags += "N" + return _encode_string(address) + _encode_string(type_tags) + arg_bytes + + +# --------------------------------------------------------------------------- +# OSC decoding +# --------------------------------------------------------------------------- + +def decode_message(data: bytes) -> Tuple[str, List[Any]]: + offset = 0 + + # address + end = data.index(b"\x00", offset) + address = data[offset:end].decode("utf-8") + offset = _pad4(end + 1) + + if offset >= len(data) or data[offset:offset + 1] != b",": + return address, [] + + # type tags + end = data.index(b"\x00", offset) + type_tags = data[offset + 1:end].decode("utf-8") + offset = _pad4(end + 1) + + args: List[Any] = [] + for tag in type_tags: + if tag == "i": + args.append(struct.unpack(">i", data[offset:offset + 4])[0]) + offset += 4 + elif tag == "f": + args.append(struct.unpack(">f", data[offset:offset + 4])[0]) + offset += 4 + elif tag == "s": + end = data.index(b"\x00", offset) + args.append(data[offset:end].decode("utf-8")) + offset = _pad4(end + 1) + elif tag == "b": + length = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + args.append(data[offset:offset + length]) + offset += _pad4(length) + elif tag == "T": + args.append(True) + elif tag == "F": + args.append(False) + elif tag in ("N", "n"): + args.append(None) + elif tag == "I": + args.append(float("inf")) + elif tag in ("h", "t"): + args.append(struct.unpack(">q", data[offset:offset + 8])[0]) + offset += 8 + elif tag == "d": + args.append(struct.unpack(">d", data[offset:offset + 8])[0]) + offset += 8 + elif tag == "c": + args.append(chr(struct.unpack(">I", data[offset:offset + 4])[0])) + offset += 4 + return address, args + + +# --------------------------------------------------------------------------- +# OSC Server +# --------------------------------------------------------------------------- + +class OSCServer: + def __init__(self, listen_port: int = LISTEN_PORT, send_port: int = SEND_PORT): + self.listen_port = listen_port + self.send_port = send_port + self._handlers: Dict[str, Callable] = {} + self._sock: Optional[socket.socket] = None + self._remote_addr: Optional[str] = None + + # ------------------------------------------------------------------ + def start(self) -> None: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._sock.bind(("0.0.0.0", self.listen_port)) + self._sock.setblocking(False) + logger.info("OSC server listening on port %d", self.listen_port) + + def stop(self) -> None: + if self._sock: + self._sock.close() + self._sock = None + + # ------------------------------------------------------------------ + def add_handler(self, address: str, callback: Callable) -> None: + self._handlers[address] = callback + + def remove_handler(self, address: str) -> None: + self._handlers.pop(address, None) + + # ------------------------------------------------------------------ + def process(self) -> None: + """Call from Ableton's update_display tick.""" + if not self._sock: + return + try: + while True: + data, addr = self._sock.recvfrom(MAX_PACKET) + self._remote_addr = addr[0] + self._dispatch(data) + except BlockingIOError: + pass + except Exception as e: + logger.error("OSC recv error: %s", e) + + def _dispatch(self, data: bytes) -> None: + if data[:8] == b"#bundle\x00": + self._dispatch_bundle(data) + return + try: + address, args = decode_message(data) + except Exception as e: + logger.warning("OSC decode error: %s", e) + return + self._call(address, args) + + def _dispatch_bundle(self, data: bytes) -> None: + offset = 16 # skip "#bundle\0" + timetag + while offset < len(data): + size = struct.unpack(">I", data[offset:offset + 4])[0] + offset += 4 + self._dispatch(data[offset:offset + size]) + offset += size + + def _call(self, address: str, args: List[Any]) -> None: + handler = self._handlers.get(address) + if handler is None: + # try prefix match (e.g. wildcard handlers) + for pattern, h in self._handlers.items(): + if pattern.endswith("*") and address.startswith(pattern[:-1]): + handler = h + break + if handler is None: + logger.debug("No handler for %s", address) + return + try: + result = handler(tuple(args)) + if result is not None: + self.send(address, result) + except Exception as e: + logger.error("Handler error for %s: %s", address, e) + self.send("/live/error", (str(e),)) + + # ------------------------------------------------------------------ + def send(self, address: str, args: tuple, target_ip: Optional[str] = None) -> None: + if not self._sock: + return + ip = target_ip or self._remote_addr + if not ip: + return + try: + data = encode_message(address, args) + self._sock.sendto(data, (ip, self.send_port)) + except Exception as e: + logger.error("OSC send error: %s", e) + + def send_all_clients(self, address: str, args: tuple) -> None: + """Broadcast to all known clients (currently just the last sender).""" + self.send(address, args) diff --git a/AbletonOSC/return_track.py b/AbletonOSC/return_track.py new file mode 100644 index 0000000..85361c1 --- /dev/null +++ b/AbletonOSC/return_track.py @@ -0,0 +1,165 @@ +"""Handles /live/return_track/* OSC addresses (mirrors TrackHandler for return tracks).""" +import logging +from typing import Any, List, Optional, Tuple +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +PROPS_RW = ["name", "mute", "solo", "color", "color_index"] +PROPS_RO = ["output_meter_level", "output_meter_left", "output_meter_right"] + + +class ReturnTrackHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + for prop in PROPS_RW: + self._add(f"/live/return_track/get/{prop}", self._make_getter(prop)) + self._add(f"/live/return_track/set/{prop}", self._make_setter(prop)) + self._add(f"/live/return_track/start_listen/{prop}", + self._make_start_listen(prop)) + self._add(f"/live/return_track/stop_listen/{prop}", + self._make_stop_listen(prop)) + + for prop in PROPS_RO: + self._add(f"/live/return_track/get/{prop}", self._make_getter(prop)) + self._add(f"/live/return_track/start_listen/{prop}", + self._make_start_listen(prop)) + self._add(f"/live/return_track/stop_listen/{prop}", + self._make_stop_listen(prop)) + + self._add("/live/return_track/get/volume", self._get_volume) + self._add("/live/return_track/set/volume", self._set_volume) + self._add("/live/return_track/get/panning", self._get_panning) + self._add("/live/return_track/set/panning", self._set_panning) + self._add("/live/return_track/get/send", self._get_send) + self._add("/live/return_track/set/send", self._set_send) + self._add("/live/return_track/get/num_devices", self._get_num_devices) + self._add("/live/return_track/get/devices/name", self._get_devices_name) + + # ------------------------------------------------------------------ + + def _get_return_tracks(self, params) -> List[Tuple[int, Any]]: + tracks = list(self.song.return_tracks) + if not params or params[0] == "*": + return list(enumerate(tracks)) + idx = int(params[0]) + return [(idx, tracks[idx])] if 0 <= idx < len(tracks) else [] + + def _get_return_track(self, params) -> Optional[Any]: + tracks = list(self.song.return_tracks) + if not params: + return None + idx = int(params[0]) + return tracks[idx] if 0 <= idx < len(tracks) else None + + def _make_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_return_tracks(params): + val = self._get_prop(track, prop) + if val is not None: + result += [idx, val] + return tuple(result) if result else None + return handler + + def _make_setter(self, prop: str): + def handler(params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_return_track(params) + if track: + self._set_prop(track, prop, params[1]) + return None + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + for idx, track in self._get_return_tracks(params): + self._register_listener( + f"return_track.{idx}.{prop}", track, prop, + f"/live/return_track/get/{prop}", (idx,) + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + for idx, _ in self._get_return_tracks(params): + self._remove_listener(f"return_track.{idx}.{prop}") + return handler + + def _get_volume(self, params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_return_tracks(params): + try: + result += [idx, track.mixer_device.volume.value] + except Exception: + pass + return tuple(result) if result else None + + def _set_volume(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_return_track(params) + if track: + try: + track.mixer_device.volume.value = float(params[1]) + except Exception as e: + logger.warning("return_track set volume: %s", e) + return None + + def _get_panning(self, params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_return_tracks(params): + try: + result += [idx, track.mixer_device.panning.value] + except Exception: + pass + return tuple(result) if result else None + + def _set_panning(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_return_track(params) + if track: + try: + track.mixer_device.panning.value = float(params[1]) + except Exception as e: + logger.warning("return_track set panning: %s", e) + return None + + def _get_send(self, params: tuple) -> Optional[tuple]: + if len(params) < 2: + return None + track = self._get_return_track(params) + send_idx = int(params[1]) + if track: + try: + sends = list(track.mixer_device.sends) + if 0 <= send_idx < len(sends): + return (int(params[0]), send_idx, sends[send_idx].value) + except Exception as e: + logger.warning("return_track get send: %s", e) + return None + + def _set_send(self, params: tuple) -> None: + if len(params) < 3: + return None + track = self._get_return_track(params) + send_idx = int(params[1]) + if track: + try: + sends = list(track.mixer_device.sends) + if 0 <= send_idx < len(sends): + sends[send_idx].value = float(params[2]) + except Exception as e: + logger.warning("return_track set send: %s", e) + return None + + def _get_num_devices(self, params: tuple) -> Optional[tuple]: + track = self._get_return_track(params) + return (len(track.devices),) if track else None + + def _get_devices_name(self, params: tuple) -> Optional[tuple]: + track = self._get_return_track(params) + return tuple(d.name for d in track.devices) if track else None diff --git a/AbletonOSC/scene.py b/AbletonOSC/scene.py new file mode 100644 index 0000000..829acc5 --- /dev/null +++ b/AbletonOSC/scene.py @@ -0,0 +1,111 @@ +"""Handles /live/scene/* OSC addresses.""" +import logging +from typing import Any, Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +SCENE_PROPS_RW = [ + "name", "color", "color_index", + "tempo", "tempo_enabled", + "time_signature_numerator", "time_signature_denominator", + "time_signature_enabled", +] + +SCENE_PROPS_RO = ["is_empty", "is_triggered"] + + +class SceneHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + for prop in SCENE_PROPS_RW: + self._add(f"/live/scene/get/{prop}", self._make_getter(prop)) + self._add(f"/live/scene/set/{prop}", self._make_setter(prop)) + self._add(f"/live/scene/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/scene/stop_listen/{prop}", self._make_stop_listen(prop)) + + for prop in SCENE_PROPS_RO: + self._add(f"/live/scene/get/{prop}", self._make_getter(prop)) + self._add(f"/live/scene/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/scene/stop_listen/{prop}", self._make_stop_listen(prop)) + + self._add("/live/scene/fire", self._fire) + self._add("/live/scene/fire_as_selected", self._fire_as_selected) + self._add("/live/scene/get/num_clips", self._get_num_clips) + self._add("/live/scene/get/clip_slots/has_clip", self._get_has_clips) + + # ------------------------------------------------------------------ + + def _get_scene(self, params: tuple) -> Optional[Any]: + if not params: + return None + idx = int(params[0]) + scenes = list(self.song.scenes) + return scenes[idx] if 0 <= idx < len(scenes) else None + + def _make_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + scene = self._get_scene(params) + if scene is None: + return None + val = self._get_prop(scene, prop) + return (int(params[0]), val) if val is not None else None + return handler + + def _make_setter(self, prop: str): + def handler(params: tuple) -> None: + if len(params) < 2: + return None + scene = self._get_scene(params) + if scene: + self._set_prop(scene, prop, params[1]) + return None + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + scene = self._get_scene(params) + if scene: + self._register_listener( + f"scene.{params[0]}.{prop}", scene, prop, + f"/live/scene/get/{prop}", (int(params[0]),) + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + if params: + self._remove_listener(f"scene.{params[0]}.{prop}") + return handler + + def _fire(self, params: tuple) -> None: + scene = self._get_scene(params) + if scene: + try: + scene.fire() + except Exception as e: + logger.warning("scene.fire: %s", e) + return None + + def _fire_as_selected(self, params: tuple) -> None: + scene = self._get_scene(params) + if scene: + try: + scene.fire_as_selected() + except Exception as e: + logger.warning("scene.fire_as_selected: %s", e) + return None + + def _get_num_clips(self, params: tuple) -> Optional[tuple]: + scene = self._get_scene(params) + if scene is None: + return None + count = sum(1 for slot in scene.clip_slots if slot.has_clip) + return (int(params[0]), count) + + def _get_has_clips(self, params: tuple) -> Optional[tuple]: + scene = self._get_scene(params) + if scene is None: + return None + return (int(params[0]),) + tuple(bool(slot.has_clip) for slot in scene.clip_slots) diff --git a/AbletonOSC/song.py b/AbletonOSC/song.py new file mode 100644 index 0000000..747265e --- /dev/null +++ b/AbletonOSC/song.py @@ -0,0 +1,275 @@ +"""Handles /live/song/* OSC addresses.""" +import logging +from typing import Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +# Properties that are readable and writable +RW_PROPS = [ + "tempo", + "time_signature_numerator", + "time_signature_denominator", + "loop", + "loop_start", + "loop_length", + "current_song_time", + "metronome", + "record_mode", + "arrangement_overdub", + "session_record", + "overdub", + "groove_amount", + "swing_amount", + "clip_trigger_quantization", + "midi_recording_quantization", + "punch_in", + "punch_out", + "exclusive_arm", + "exclusive_solo", +] + +# Properties that are read-only +RO_PROPS = [ + "is_playing", + "can_undo", + "can_redo", + "song_length", + "session_record_status", +] + + +class SongHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + song = self.song + + # --- get --- + for prop in RW_PROPS + RO_PROPS: + self._add(f"/live/song/get/{prop}", self._make_prop_getter(prop)) + + # --- set --- + for prop in RW_PROPS: + self._add(f"/live/song/set/{prop}", self._make_prop_setter(prop)) + + # --- listeners --- + for prop in RW_PROPS + RO_PROPS: + self._add(f"/live/song/start_listen/{prop}", + self._make_start_listen(prop)) + self._add(f"/live/song/stop_listen/{prop}", + self._make_stop_listen(prop)) + + # --- aggregate queries --- + self._add("/live/song/get/num_tracks", self._get_num_tracks) + self._add("/live/song/get/num_scenes", self._get_num_scenes) + self._add("/live/song/get/num_return_tracks", self._get_num_return_tracks) + self._add("/live/song/get/track_names", self._get_track_names) + self._add("/live/song/get/scene_names", self._get_scene_names) + self._add("/live/song/get/return_track_names", self._get_return_track_names) + self._add("/live/song/get/cue_points", self._get_cue_points) + + # --- method calls --- + self._add("/live/song/start_playing", lambda p: self._call("start_playing")) + self._add("/live/song/stop_playing", lambda p: self._call("stop_playing")) + self._add("/live/song/continue_playing", lambda p: self._call("continue_playing")) + self._add("/live/song/stop_all_clips", lambda p: self._call("stop_all_clips")) + self._add("/live/song/undo", lambda p: self._call("undo")) + self._add("/live/song/redo", lambda p: self._call("redo")) + self._add("/live/song/tap_tempo", lambda p: self._call("tap_tempo")) + self._add("/live/song/trigger_session_record", + lambda p: self._call("trigger_session_record")) + self._add("/live/song/re_enable_automation", + lambda p: self._call("re_enable_automation")) + self._add("/live/song/jump_by", self._jump_by) + self._add("/live/song/jump_to_next_cue", lambda p: self._call("jump_to_next_cue")) + self._add("/live/song/jump_to_prev_cue", lambda p: self._call("jump_to_prev_cue")) + self._add("/live/song/capture_midi", lambda p: self._call("capture_midi")) + + # --- track/scene creation and deletion --- + self._add("/live/song/create_audio_track", self._create_audio_track) + self._add("/live/song/create_midi_track", self._create_midi_track) + self._add("/live/song/create_return_track", + lambda p: self._call("create_return_track")) + self._add("/live/song/create_scene", self._create_scene) + self._add("/live/song/delete_track", self._delete_track) + self._add("/live/song/delete_scene", self._delete_scene) + self._add("/live/song/delete_return_track", self._delete_return_track) + self._add("/live/song/duplicate_track", self._duplicate_track) + self._add("/live/song/duplicate_scene", self._duplicate_scene) + + # --- beat listener (special) --- + self._add("/live/song/start_listen/beat", self._start_beat_listen) + self._add("/live/song/stop_listen/beat", self._stop_beat_listen) + + # ------------------------------------------------------------------ + # Factories + # ------------------------------------------------------------------ + + def _make_prop_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + val = self._get_prop(self.song, prop) + return (val,) if val is not None else None + return handler + + def _make_prop_setter(self, prop: str): + def handler(params: tuple) -> None: + if params: + self._set_prop(self.song, prop, params[0]) + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + self._register_listener( + f"song.{prop}", self.song, prop, f"/live/song/get/{prop}" + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + self._remove_listener(f"song.{prop}") + return handler + + # ------------------------------------------------------------------ + # Aggregate queries + # ------------------------------------------------------------------ + + def _get_num_tracks(self, params: tuple) -> tuple: + return (len(self.song.tracks),) + + def _get_num_scenes(self, params: tuple) -> tuple: + return (len(self.song.scenes),) + + def _get_num_return_tracks(self, params: tuple) -> tuple: + return (len(self.song.return_tracks),) + + def _get_track_names(self, params: tuple) -> tuple: + start = int(params[0]) if len(params) > 0 else 0 + end = int(params[1]) if len(params) > 1 else len(self.song.tracks) + tracks = list(self.song.tracks)[start:end] + return tuple(t.name for t in tracks) + + def _get_scene_names(self, params: tuple) -> tuple: + start = int(params[0]) if len(params) > 0 else 0 + end = int(params[1]) if len(params) > 1 else len(self.song.scenes) + scenes = list(self.song.scenes)[start:end] + return tuple(s.name for s in scenes) + + def _get_return_track_names(self, params: tuple) -> tuple: + return tuple(t.name for t in self.song.return_tracks) + + def _get_cue_points(self, params: tuple) -> tuple: + result = [] + for cp in self.song.cue_points: + result.append(cp.name) + result.append(float(cp.time)) + return tuple(result) + + # ------------------------------------------------------------------ + # Methods + # ------------------------------------------------------------------ + + def _call(self, method: str) -> None: + try: + getattr(self.song, method)() + except Exception as e: + logger.warning("song.%s(): %s", method, e) + return None + + def _jump_by(self, params: tuple) -> None: + if params: + try: + self.song.jump_by(float(params[0])) + except Exception as e: + logger.warning("song.jump_by: %s", e) + return None + + # --- track/scene CRUD --- + + def _create_audio_track(self, params: tuple) -> tuple: + idx = int(params[0]) if params else -1 + try: + self.song.create_audio_track(idx) + except Exception as e: + logger.warning("create_audio_track: %s", e) + return (len(self.song.tracks),) + + def _create_midi_track(self, params: tuple) -> tuple: + idx = int(params[0]) if params else -1 + try: + self.song.create_midi_track(idx) + except Exception as e: + logger.warning("create_midi_track: %s", e) + return (len(self.song.tracks),) + + def _create_scene(self, params: tuple) -> tuple: + idx = int(params[0]) if params else -1 + try: + self.song.create_scene(idx) + except Exception as e: + logger.warning("create_scene: %s", e) + return (len(self.song.scenes),) + + def _delete_track(self, params: tuple) -> None: + if params: + try: + self.song.delete_track(int(params[0])) + except Exception as e: + logger.warning("delete_track: %s", e) + return None + + def _delete_scene(self, params: tuple) -> None: + if params: + try: + self.song.delete_scene(int(params[0])) + except Exception as e: + logger.warning("delete_scene: %s", e) + return None + + def _delete_return_track(self, params: tuple) -> None: + if params: + try: + self.song.delete_return_track(int(params[0])) + except Exception as e: + logger.warning("delete_return_track: %s", e) + return None + + def _duplicate_track(self, params: tuple) -> None: + if params: + try: + self.song.duplicate_track(int(params[0])) + except Exception as e: + logger.warning("duplicate_track: %s", e) + return None + + def _duplicate_scene(self, params: tuple) -> None: + if params: + try: + self.song.duplicate_scene(int(params[0])) + except Exception as e: + logger.warning("duplicate_scene: %s", e) + return None + + # ------------------------------------------------------------------ + # Beat listener + # ------------------------------------------------------------------ + + def _start_beat_listen(self, params: tuple) -> None: + self._remove_listener("song.beat") + self._beat_count = -1 + + def on_beat(): + beat = int(self.song.get_current_beats_song_time().beats) + if beat != self._beat_count: + self._beat_count = beat + self._send("/live/song/get/beat", (beat,)) + + try: + self.song.add_current_song_time_listener(on_beat) + self._listener_store["song.beat"] = ( + self.song.remove_current_song_time_listener, on_beat + ) + except Exception as e: + logger.warning("beat listener: %s", e) + + def _stop_beat_listen(self, params: tuple) -> None: + self._remove_listener("song.beat") diff --git a/AbletonOSC/track.py b/AbletonOSC/track.py new file mode 100644 index 0000000..5038b71 --- /dev/null +++ b/AbletonOSC/track.py @@ -0,0 +1,574 @@ +"""Handles /live/track/* OSC addresses.""" +import logging +from typing import Any, List, Optional, Tuple +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + +TRACK_PROPS_RW = [ + "arm", "mute", "solo", "name", "color", "color_index", + "fold_state", "current_monitoring_state", +] + +TRACK_PROPS_RO = [ + "can_be_armed", "has_audio_input", "has_audio_output", + "has_midi_input", "has_midi_output", + "is_foldable", "is_grouped", "is_visible", + "fired_slot_index", "playing_slot_index", + "output_meter_level", "output_meter_left", "output_meter_right", + "output_meter_peak_left", "output_meter_peak_right", +] + + +class TrackHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + + for prop in TRACK_PROPS_RW: + self._add(f"/live/track/get/{prop}", self._make_getter(prop)) + self._add(f"/live/track/set/{prop}", self._make_setter(prop)) + self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop)) + + for prop in TRACK_PROPS_RO: + self._add(f"/live/track/get/{prop}", self._make_getter(prop)) + self._add(f"/live/track/start_listen/{prop}", self._make_start_listen(prop)) + self._add(f"/live/track/stop_listen/{prop}", self._make_stop_listen(prop)) + + # Mixer device (volume, panning, sends) + self._add("/live/track/get/volume", self._get_volume) + self._add("/live/track/set/volume", self._set_volume) + self._add("/live/track/start_listen/volume", self._start_listen_volume) + self._add("/live/track/stop_listen/volume", self._stop_listen_volume) + + self._add("/live/track/get/panning", self._get_panning) + self._add("/live/track/set/panning", self._set_panning) + self._add("/live/track/start_listen/panning", self._start_listen_panning) + self._add("/live/track/stop_listen/panning", self._stop_listen_panning) + + self._add("/live/track/get/send", self._get_send) + self._add("/live/track/set/send", self._set_send) + self._add("/live/track/start_listen/send", self._start_listen_send) + self._add("/live/track/stop_listen/send", self._stop_listen_send) + + # Routing + self._add("/live/track/get/available_input_routing_types", + self._get_available_input_routing_types) + self._add("/live/track/get/available_input_routing_channels", + self._get_available_input_routing_channels) + self._add("/live/track/get/input_routing_type", self._get_input_routing_type) + self._add("/live/track/set/input_routing_type", self._set_input_routing_type) + self._add("/live/track/get/input_routing_channel", self._get_input_routing_channel) + self._add("/live/track/set/input_routing_channel", self._set_input_routing_channel) + self._add("/live/track/get/available_output_routing_types", + self._get_available_output_routing_types) + self._add("/live/track/get/available_output_routing_channels", + self._get_available_output_routing_channels) + self._add("/live/track/get/output_routing_type", self._get_output_routing_type) + self._add("/live/track/set/output_routing_type", self._set_output_routing_type) + self._add("/live/track/get/output_routing_channel", self._get_output_routing_channel) + self._add("/live/track/set/output_routing_channel", self._set_output_routing_channel) + + # Aggregate + self._add("/live/track/get/clips/name", self._get_clips_name) + self._add("/live/track/get/clips/length", self._get_clips_length) + self._add("/live/track/get/clips/color", self._get_clips_color) + self._add("/live/track/get/clips/is_playing", self._get_clips_is_playing) + self._add("/live/track/get/arrangement_clips/name", self._get_arr_clips_name) + self._add("/live/track/get/arrangement_clips/length", self._get_arr_clips_length) + self._add("/live/track/get/arrangement_clips/start_time", self._get_arr_clips_start) + self._add("/live/track/get/num_devices", self._get_num_devices) + self._add("/live/track/get/devices/name", self._get_devices_name) + self._add("/live/track/get/devices/type", self._get_devices_type) + self._add("/live/track/get/devices/class_name", self._get_devices_class_name) + self._add("/live/track/get/devices/can_have_chains", self._get_devices_can_have_chains) + + # Methods + self._add("/live/track/stop_all_clips", self._stop_all_clips) + self._add("/live/track/delete_device", self._delete_device) + self._add("/live/track/duplicate_device", self._duplicate_device) + + # ------------------------------------------------------------------ + # Track resolution (supports wildcard *) + # ------------------------------------------------------------------ + + def _get_tracks(self, params: tuple) -> List[Tuple[int, Any]]: + tracks = list(self.song.tracks) + if not params or params[0] == "*": + return list(enumerate(tracks)) + idx = int(params[0]) + if 0 <= idx < len(tracks): + return [(idx, tracks[idx])] + return [] + + def _get_track(self, params: tuple) -> Optional[Any]: + tracks = list(self.song.tracks) + if not params: + return None + idx = int(params[0]) + if 0 <= idx < len(tracks): + return tracks[idx] + return None + + # ------------------------------------------------------------------ + # Property factories + # ------------------------------------------------------------------ + + def _make_getter(self, prop: str): + def handler(params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_tracks(params): + val = self._get_prop(track, prop) + if val is not None: + result += [idx, val] + return tuple(result) if result else None + return handler + + def _make_setter(self, prop: str): + def handler(params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + if track: + self._set_prop(track, prop, params[1]) + return None + return handler + + def _make_start_listen(self, prop: str): + def handler(params: tuple) -> None: + for idx, track in self._get_tracks(params): + key = f"track.{idx}.{prop}" + self._register_listener( + key, track, prop, + f"/live/track/get/{prop}", (idx,) + ) + return handler + + def _make_stop_listen(self, prop: str): + def handler(params: tuple) -> None: + for idx, track in self._get_tracks(params): + self._remove_listener(f"track.{idx}.{prop}") + return handler + + # ------------------------------------------------------------------ + # Mixer device + # ------------------------------------------------------------------ + + def _get_volume(self, params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_tracks(params): + try: + val = track.mixer_device.volume.value + result += [idx, val] + except Exception: + pass + return tuple(result) if result else None + + def _set_volume(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + if track: + try: + track.mixer_device.volume.value = float(params[1]) + except Exception as e: + logger.warning("set volume: %s", e) + return None + + def _start_listen_volume(self, params: tuple) -> None: + for idx, track in self._get_tracks(params): + key = f"track.{idx}.volume" + self._remove_listener(key) + param = track.mixer_device.volume + + def make_cb(i, p): + def cb(): + self._send("/live/track/get/volume", (i, p.value)) + return cb + + cb = make_cb(idx, param) + try: + param.add_value_listener(cb) + self._listener_store[key] = (param.remove_value_listener, cb) + cb() + except Exception as e: + logger.warning("volume listener: %s", e) + + def _stop_listen_volume(self, params: tuple) -> None: + for idx, _ in self._get_tracks(params): + self._remove_listener(f"track.{idx}.volume") + + def _get_panning(self, params: tuple) -> Optional[tuple]: + result = [] + for idx, track in self._get_tracks(params): + try: + val = track.mixer_device.panning.value + result += [idx, val] + except Exception: + pass + return tuple(result) if result else None + + def _set_panning(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + if track: + try: + track.mixer_device.panning.value = float(params[1]) + except Exception as e: + logger.warning("set panning: %s", e) + return None + + def _start_listen_panning(self, params: tuple) -> None: + for idx, track in self._get_tracks(params): + key = f"track.{idx}.panning" + self._remove_listener(key) + param = track.mixer_device.panning + + def make_cb(i, p): + def cb(): + self._send("/live/track/get/panning", (i, p.value)) + return cb + + cb = make_cb(idx, param) + try: + param.add_value_listener(cb) + self._listener_store[key] = (param.remove_value_listener, cb) + cb() + except Exception as e: + logger.warning("panning listener: %s", e) + + def _stop_listen_panning(self, params: tuple) -> None: + for idx, _ in self._get_tracks(params): + self._remove_listener(f"track.{idx}.panning") + + # ------------------------------------------------------------------ + # Sends + # ------------------------------------------------------------------ + + def _get_send(self, params: tuple) -> Optional[tuple]: + if len(params) < 2: + return None + track = self._get_track(params) + send_idx = int(params[1]) + if track: + try: + sends = list(track.mixer_device.sends) + if 0 <= send_idx < len(sends): + return (int(params[0]), send_idx, sends[send_idx].value) + except Exception as e: + logger.warning("get send: %s", e) + return None + + def _set_send(self, params: tuple) -> None: + if len(params) < 3: + return None + track = self._get_track(params) + send_idx = int(params[1]) + value = float(params[2]) + if track: + try: + sends = list(track.mixer_device.sends) + if 0 <= send_idx < len(sends): + sends[send_idx].value = value + except Exception as e: + logger.warning("set send: %s", e) + return None + + def _start_listen_send(self, params: tuple) -> None: + if len(params) < 2: + return + track = self._get_track(params) + track_idx = int(params[0]) + send_idx = int(params[1]) + if track: + key = f"track.{track_idx}.send.{send_idx}" + self._remove_listener(key) + try: + sends = list(track.mixer_device.sends) + if 0 <= send_idx < len(sends): + param = sends[send_idx] + + def make_cb(ti, si, p): + def cb(): + self._send("/live/track/get/send", (ti, si, p.value)) + return cb + + cb = make_cb(track_idx, send_idx, param) + param.add_value_listener(cb) + self._listener_store[key] = (param.remove_value_listener, cb) + cb() + except Exception as e: + logger.warning("send listener: %s", e) + + def _stop_listen_send(self, params: tuple) -> None: + if len(params) >= 2: + self._remove_listener(f"track.{int(params[0])}.send.{int(params[1])}") + + # ------------------------------------------------------------------ + # Routing + # ------------------------------------------------------------------ + + def _get_available_input_routing_types(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + types = [rt.display_name for rt in track.available_input_routing_types] + return tuple(types) + except Exception as e: + logger.warning("input routing types: %s", e) + return None + + def _get_available_input_routing_channels(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + channels = [rc.display_name for rc in track.available_input_routing_channels] + return tuple(channels) + except Exception as e: + logger.warning("input routing channels: %s", e) + return None + + def _get_input_routing_type(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return (track.input_routing_type.display_name,) + except Exception as e: + logger.warning("get input_routing_type: %s", e) + return None + + def _set_input_routing_type(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + name = str(params[1]) + if track: + try: + for rt in track.available_input_routing_types: + if rt.display_name == name: + track.input_routing_type = rt + break + except Exception as e: + logger.warning("set input_routing_type: %s", e) + return None + + def _get_input_routing_channel(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return (track.input_routing_channel.display_name,) + except Exception as e: + logger.warning("get input_routing_channel: %s", e) + return None + + def _set_input_routing_channel(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + name = str(params[1]) + if track: + try: + for rc in track.available_input_routing_channels: + if rc.display_name == name: + track.input_routing_channel = rc + break + except Exception as e: + logger.warning("set input_routing_channel: %s", e) + return None + + def _get_available_output_routing_types(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + types = [rt.display_name for rt in track.available_output_routing_types] + return tuple(types) + except Exception as e: + logger.warning("output routing types: %s", e) + return None + + def _get_available_output_routing_channels(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + channels = [rc.display_name for rc in track.available_output_routing_channels] + return tuple(channels) + except Exception as e: + logger.warning("output routing channels: %s", e) + return None + + def _get_output_routing_type(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return (track.output_routing_type.display_name,) + except Exception as e: + logger.warning("get output_routing_type: %s", e) + return None + + def _set_output_routing_type(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + name = str(params[1]) + if track: + try: + for rt in track.available_output_routing_types: + if rt.display_name == name: + track.output_routing_type = rt + break + except Exception as e: + logger.warning("set output_routing_type: %s", e) + return None + + def _get_output_routing_channel(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return (track.output_routing_channel.display_name,) + except Exception as e: + logger.warning("get output_routing_channel: %s", e) + return None + + def _set_output_routing_channel(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + name = str(params[1]) + if track: + try: + for rc in track.available_output_routing_channels: + if rc.display_name == name: + track.output_routing_channel = rc + break + except Exception as e: + logger.warning("set output_routing_channel: %s", e) + return None + + # ------------------------------------------------------------------ + # Aggregate clip info + # ------------------------------------------------------------------ + + def _get_session_clips(self, track): + return [slot.clip for slot in track.clip_slots if slot.has_clip] + + def _get_clips_name(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(c.name for c in self._get_session_clips(track)) + return None + + def _get_clips_length(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(float(c.length) for c in self._get_session_clips(track)) + return None + + def _get_clips_color(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(c.color for c in self._get_session_clips(track)) + return None + + def _get_clips_is_playing(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(bool(c.is_playing) for c in self._get_session_clips(track)) + return None + + def _get_arr_clips_name(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return tuple(c.name for c in track.arrangement_clips) + except Exception: + return None + return None + + def _get_arr_clips_length(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return tuple(float(c.length) for c in track.arrangement_clips) + except Exception: + return None + return None + + def _get_arr_clips_start(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + try: + return tuple(float(c.start_time) for c in track.arrangement_clips) + except Exception: + return None + return None + + # ------------------------------------------------------------------ + # Device info + # ------------------------------------------------------------------ + + def _get_num_devices(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return (len(track.devices),) + return None + + def _get_devices_name(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(d.name for d in track.devices) + return None + + def _get_devices_type(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(int(d.type) for d in track.devices) + return None + + def _get_devices_class_name(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(d.class_name for d in track.devices) + return None + + def _get_devices_can_have_chains(self, params: tuple) -> Optional[tuple]: + track = self._get_track(params) + if track: + return tuple(bool(getattr(d, "can_have_chains", False)) for d in track.devices) + return None + + # ------------------------------------------------------------------ + # Methods + # ------------------------------------------------------------------ + + def _stop_all_clips(self, params: tuple) -> None: + for _, track in self._get_tracks(params): + try: + track.stop_all_clips() + except Exception as e: + logger.warning("stop_all_clips: %s", e) + return None + + def _delete_device(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + device_idx = int(params[1]) + if track: + try: + track.delete_device(device_idx) + except Exception as e: + logger.warning("delete_device: %s", e) + return None + + def _duplicate_device(self, params: tuple) -> None: + if len(params) < 2: + return None + track = self._get_track(params) + device_idx = int(params[1]) + if track: + try: + track.duplicate_device(device_idx) + except Exception as e: + logger.warning("duplicate_device: %s", e) + return None diff --git a/AbletonOSC/view.py b/AbletonOSC/view.py new file mode 100644 index 0000000..9608741 --- /dev/null +++ b/AbletonOSC/view.py @@ -0,0 +1,216 @@ +"""Handles /live/view/* OSC addresses (UI navigation).""" +import logging +from typing import Optional +from .handler import AbletonOSCHandler + +logger = logging.getLogger(__name__) + + +class ViewHandler(AbletonOSCHandler): + def init_api(self) -> None: + self.clear_listeners() + view = self._song_view() + + # selected scene + self._add("/live/view/get/selected_scene", self._get_selected_scene) + self._add("/live/view/set/selected_scene", self._set_selected_scene) + self._add("/live/view/start_listen/selected_scene", self._start_listen_selected_scene) + self._add("/live/view/stop_listen/selected_scene", self._stop_listen_selected_scene) + + # selected track + self._add("/live/view/get/selected_track", self._get_selected_track) + self._add("/live/view/set/selected_track", self._set_selected_track) + self._add("/live/view/start_listen/selected_track", self._start_listen_selected_track) + self._add("/live/view/stop_listen/selected_track", self._stop_listen_selected_track) + + # selected clip + self._add("/live/view/get/selected_clip", self._get_selected_clip) + self._add("/live/view/set/selected_clip", self._set_selected_clip) + + # selected device + self._add("/live/view/get/selected_device", self._get_selected_device) + + # focus + self._add("/live/view/show_clip_detail_view", self._show_clip_detail_view) + self._add("/live/view/show_device_detail_view", self._show_device_detail_view) + self._add("/live/view/focus_browser", self._focus_browser) + + # arrangement / session toggle + self._add("/live/view/get/focused_document_view", self._get_focused_document_view) + + # ------------------------------------------------------------------ + + def _song_view(self): + try: + return self.song.view + except Exception: + return None + + def _get_selected_scene(self, params: tuple) -> Optional[tuple]: + try: + view = self._song_view() + scenes = list(self.song.scenes) + scene = view.selected_scene + idx = scenes.index(scene) + return (idx,) + except Exception as e: + logger.warning("get selected_scene: %s", e) + return None + + def _set_selected_scene(self, params: tuple) -> None: + if not params: + return None + try: + idx = int(params[0]) + scenes = list(self.song.scenes) + if 0 <= idx < len(scenes): + self._song_view().selected_scene = scenes[idx] + except Exception as e: + logger.warning("set selected_scene: %s", e) + return None + + def _start_listen_selected_scene(self, params: tuple) -> None: + view = self._song_view() + if view is None: + return + key = "view.selected_scene" + self._remove_listener(key) + + def cb(): + result = self._get_selected_scene(()) + if result: + self._send("/live/view/get/selected_scene", result) + + try: + view.add_selected_scene_listener(cb) + self._listener_store[key] = (view.remove_selected_scene_listener, cb) + cb() + except Exception as e: + logger.warning("selected_scene listener: %s", e) + + def _stop_listen_selected_scene(self, params: tuple) -> None: + self._remove_listener("view.selected_scene") + + def _get_selected_track(self, params: tuple) -> Optional[tuple]: + try: + view = self._song_view() + all_tracks = list(self.song.tracks) + list(self.song.return_tracks) + track = view.selected_track + idx = all_tracks.index(track) + return (idx,) + except Exception as e: + logger.warning("get selected_track: %s", e) + return None + + def _set_selected_track(self, params: tuple) -> None: + if not params: + return None + try: + idx = int(params[0]) + all_tracks = list(self.song.tracks) + list(self.song.return_tracks) + if 0 <= idx < len(all_tracks): + self._song_view().selected_track = all_tracks[idx] + except Exception as e: + logger.warning("set selected_track: %s", e) + return None + + def _start_listen_selected_track(self, params: tuple) -> None: + view = self._song_view() + if view is None: + return + key = "view.selected_track" + self._remove_listener(key) + + def cb(): + result = self._get_selected_track(()) + if result: + self._send("/live/view/get/selected_track", result) + + try: + view.add_selected_track_listener(cb) + self._listener_store[key] = (view.remove_selected_track_listener, cb) + cb() + except Exception as e: + logger.warning("selected_track listener: %s", e) + + def _stop_listen_selected_track(self, params: tuple) -> None: + self._remove_listener("view.selected_track") + + def _get_selected_clip(self, params: tuple) -> Optional[tuple]: + try: + view = self._song_view() + clip_view = view.detail_clip + if clip_view is None: + return None + tracks = list(self.song.tracks) + for ti, track in enumerate(tracks): + for ci, slot in enumerate(track.clip_slots): + if slot.has_clip and slot.clip == clip_view: + return (ti, ci) + except Exception as e: + logger.warning("get selected_clip: %s", e) + return None + + def _set_selected_clip(self, params: tuple) -> None: + if len(params) < 2: + return None + try: + track_idx = int(params[0]) + clip_idx = int(params[1]) + tracks = list(self.song.tracks) + if 0 <= track_idx < len(tracks): + slots = list(tracks[track_idx].clip_slots) + if 0 <= clip_idx < len(slots) and slots[clip_idx].has_clip: + self._song_view().detail_clip = slots[clip_idx].clip + except Exception as e: + logger.warning("set selected_clip: %s", e) + return None + + def _get_selected_device(self, params: tuple) -> Optional[tuple]: + try: + view = self._song_view() + track = view.selected_track + tracks = list(self.song.tracks) + list(self.song.return_tracks) + track_idx = tracks.index(track) + selected = track.view.selected_device + if selected is None: + return None + devices = list(track.devices) + device_idx = devices.index(selected) + return (track_idx, device_idx) + except Exception as e: + logger.warning("get selected_device: %s", e) + return None + + def _show_clip_detail_view(self, params: tuple) -> None: + try: + app = self.manager.application() + app.view.show_view("Detail/Clip") + except Exception as e: + logger.warning("show_clip_detail_view: %s", e) + return None + + def _show_device_detail_view(self, params: tuple) -> None: + try: + app = self.manager.application() + app.view.show_view("Detail/DeviceChain") + except Exception as e: + logger.warning("show_device_detail_view: %s", e) + return None + + def _focus_browser(self, params: tuple) -> None: + try: + app = self.manager.application() + app.view.show_view("Browser") + except Exception as e: + logger.warning("focus_browser: %s", e) + return None + + def _get_focused_document_view(self, params: tuple) -> Optional[tuple]: + try: + app = self.manager.application() + view = app.view.focused_document_view + return (view,) + except Exception as e: + logger.warning("get focused_document_view: %s", e) + return None diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..651a183 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.backends.legacy:build" + +[project] +name = "ableton-osc" +version = "1.0.0" +description = "Full Ableton Live 11 OSC remote script" +requires-python = ">=3.8" +dependencies = [] + +[project.optional-dependencies] +dev = [ + "python-osc>=1.8", + "pytest>=7", +] + +[tool.setuptools.packages.find] +where = ["."] +include = ["AbletonOSC*"]