From de1ab00f7186a453dfa4e94cc02b9913ed905dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kr=C3=BCger?= Date: Sun, 31 May 2026 21:07:47 +0200 Subject: [PATCH] Initial implementation of ableton-mcp MCP server Full-featured MCP server exposing 124 tools for complete Ableton Live control via AbletonOSC over OSC/UDP. Covers transport, tracks, clips, clip slots, scenes, devices, view selection, and real-time listeners. Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 40 ++++ pyproject.toml | 19 ++ src/ableton_mcp/__init__.py | 0 src/ableton_mcp/config.py | 6 + src/ableton_mcp/osc_client.py | 145 +++++++++++++ src/ableton_mcp/server.py | 35 ++++ src/ableton_mcp/tools/__init__.py | 0 src/ableton_mcp/tools/clip.py | 181 +++++++++++++++++ src/ableton_mcp/tools/clip_slot.py | 55 +++++ src/ableton_mcp/tools/device.py | 86 ++++++++ src/ableton_mcp/tools/listener.py | 120 +++++++++++ src/ableton_mcp/tools/scene.py | 59 ++++++ src/ableton_mcp/tools/song.py | 314 +++++++++++++++++++++++++++++ src/ableton_mcp/tools/system.py | 47 +++++ src/ableton_mcp/tools/track.py | 161 +++++++++++++++ src/ableton_mcp/tools/view.py | 44 ++++ 16 files changed, 1312 insertions(+) create mode 100644 .gitignore create mode 100644 pyproject.toml create mode 100644 src/ableton_mcp/__init__.py create mode 100644 src/ableton_mcp/config.py create mode 100644 src/ableton_mcp/osc_client.py create mode 100644 src/ableton_mcp/server.py create mode 100644 src/ableton_mcp/tools/__init__.py create mode 100644 src/ableton_mcp/tools/clip.py create mode 100644 src/ableton_mcp/tools/clip_slot.py create mode 100644 src/ableton_mcp/tools/device.py create mode 100644 src/ableton_mcp/tools/listener.py create mode 100644 src/ableton_mcp/tools/scene.py create mode 100644 src/ableton_mcp/tools/song.py create mode 100644 src/ableton_mcp/tools/system.py create mode 100644 src/ableton_mcp/tools/track.py create mode 100644 src/ableton_mcp/tools/view.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ef4bf30 --- /dev/null +++ b/.gitignore @@ -0,0 +1,40 @@ +# Virtual environment +.venv/ + +# Python +__pycache__/ +*.py[cod] +*.pyo +*.pyd +.Python +*.egg-info/ +dist/ +build/ +*.egg +.eggs/ + +# Distribution / packaging +*.whl +*.tar.gz + +# IDE +.idea/ +.vscode/ +*.suo +*.ntvs* +*.njsproj +*.sln +*.sw? + +# OS +.DS_Store +Thumbs.db + +# Testing +.pytest_cache/ +.coverage +htmlcov/ + +# Env files +.env +.env.* diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e674ca3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "ableton-mcp" +version = "0.1.0" +description = "Full-featured MCP server for controlling Ableton Live via AbletonOSC" +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.0", + "python-osc>=1.8", +] + +[project.scripts] +ableton-mcp = "ableton_mcp.server:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/ableton_mcp/__init__.py b/src/ableton_mcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ableton_mcp/config.py b/src/ableton_mcp/config.py new file mode 100644 index 0000000..215adad --- /dev/null +++ b/src/ableton_mcp/config.py @@ -0,0 +1,6 @@ +import os + +ABLETON_HOST = os.getenv("ABLETON_HOST", "127.0.0.1") +ABLETON_SEND_PORT = int(os.getenv("ABLETON_SEND_PORT", "11000")) +ABLETON_RECEIVE_PORT = int(os.getenv("ABLETON_RECEIVE_PORT", "11001")) +ABLETON_TIMEOUT = float(os.getenv("ABLETON_TIMEOUT", "5.0")) diff --git a/src/ableton_mcp/osc_client.py b/src/ableton_mcp/osc_client.py new file mode 100644 index 0000000..dcaf4f2 --- /dev/null +++ b/src/ableton_mcp/osc_client.py @@ -0,0 +1,145 @@ +"""Thread-safe OSC client/server for communicating with AbletonOSC.""" + +import threading +import time +from collections import deque +from typing import Any, Callable + +from pythonosc import udp_client, dispatcher, osc_server + +from ableton_mcp.config import ( + ABLETON_HOST, + ABLETON_SEND_PORT, + ABLETON_RECEIVE_PORT, + ABLETON_TIMEOUT, +) + + +class OSCClient: + """Singleton managing send/receive OSC communication with AbletonOSC.""" + + _instance: "OSCClient | None" = None + _lock = threading.Lock() + + def __new__(cls) -> "OSCClient": + with cls._lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + self._initialized = True + + self._client = udp_client.SimpleUDPClient(ABLETON_HOST, ABLETON_SEND_PORT) + self._pending: dict[str, tuple[threading.Event, list]] = {} + self._pending_lock = threading.Lock() + self._listeners: dict[str, deque] = {} + self._listener_callbacks: dict[str, list[Callable]] = {} + + self._dispatcher = dispatcher.Dispatcher() + self._dispatcher.set_default_handler(self._handle_message) + + self._server = osc_server.ThreadingOSCUDPServer( + ("0.0.0.0", ABLETON_RECEIVE_PORT), self._dispatcher + ) + self._server_thread = threading.Thread( + target=self._server.serve_forever, daemon=True + ) + self._server_thread.start() + + def _handle_message(self, address: str, *args: Any) -> None: + # Resolve pending query + with self._pending_lock: + if address in self._pending: + event, result = self._pending[address] + result.clear() + result.extend(args) + event.set() + + # Push to listeners + if address in self._listeners: + self._listeners[address].append(args) + for cb in self._listener_callbacks.get(address, []): + try: + cb(address, *args) + except Exception: + pass + + def cmd(self, address: str, *args: Any) -> None: + """Send a fire-and-forget OSC command.""" + self._client.send_message(address, list(args) if args else []) + + def query(self, address: str, *args: Any, timeout: float | None = None) -> tuple: + """Send an OSC message and wait synchronously for a response.""" + if timeout is None: + timeout = ABLETON_TIMEOUT + + event = threading.Event() + result: list = [] + + with self._pending_lock: + self._pending[address] = (event, result) + + try: + self._client.send_message(address, list(args) if args else []) + if not event.wait(timeout): + raise TimeoutError( + f"No response from AbletonOSC for '{address}' within {timeout}s. " + "Ensure Ableton Live is running with AbletonOSC enabled." + ) + finally: + with self._pending_lock: + self._pending.pop(address, None) + + return tuple(result) + + def start_listener( + self, + address: str, + callback: Callable | None = None, + maxlen: int = 200, + ) -> None: + """Register a real-time listener for an OSC address.""" + if address not in self._listeners: + self._listeners[address] = deque(maxlen=maxlen) + if callback is not None: + self._listener_callbacks.setdefault(address, []).append(callback) + + def stop_listener(self, address: str) -> None: + """Remove all listeners for an OSC address.""" + self._listeners.pop(address, None) + self._listener_callbacks.pop(address, None) + + def drain_listener(self, address: str, max_events: int = 50) -> list[tuple]: + """Drain and return up to max_events queued listener events.""" + q = self._listeners.get(address) + if q is None: + return [] + events = [] + for _ in range(min(max_events, len(q))): + try: + events.append(q.popleft()) + except IndexError: + break + return events + + def ping(self) -> float: + """Return round-trip latency in ms, or raise TimeoutError.""" + t0 = time.monotonic() + self.query("/live/test") + return (time.monotonic() - t0) * 1000 + + +# Module-level singleton accessor +_client: OSCClient | None = None + + +def get_client() -> OSCClient: + global _client + if _client is None: + _client = OSCClient() + return _client diff --git a/src/ableton_mcp/server.py b/src/ableton_mcp/server.py new file mode 100644 index 0000000..c0d48ca --- /dev/null +++ b/src/ableton_mcp/server.py @@ -0,0 +1,35 @@ +"""ableton-mcp: Full-featured MCP server for controlling Ableton Live via AbletonOSC.""" + +from mcp.server.fastmcp import FastMCP + +from ableton_mcp.tools import ( + system, + song, + track, + clip, + clip_slot, + scene, + device, + view, + listener, +) + +mcp = FastMCP("ableton-mcp") + +system.register(mcp) +song.register(mcp) +track.register(mcp) +clip.register(mcp) +clip_slot.register(mcp) +scene.register(mcp) +device.register(mcp) +view.register(mcp) +listener.register(mcp) + + +def main() -> None: + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/src/ableton_mcp/tools/__init__.py b/src/ableton_mcp/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ableton_mcp/tools/clip.py b/src/ableton_mcp/tools/clip.py new file mode 100644 index 0000000..6452e2d --- /dev/null +++ b/src/ableton_mcp/tools/clip.py @@ -0,0 +1,181 @@ +"""MCP tools for Ableton Live clip control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def clip_fire(track_index: int, clip_index: int) -> None: + """Launch a clip.""" + get_client().cmd("/live/clip/fire", track_index, clip_index) + + @mcp.tool() + def clip_stop(track_index: int, clip_index: int) -> None: + """Stop a playing clip.""" + get_client().cmd("/live/clip/stop", track_index, clip_index) + + @mcp.tool() + def clip_get_info(track_index: int, clip_index: int) -> dict: + """Get all properties of a clip.""" + c = get_client() + return { + "name": c.query("/live/clip/get/name", track_index, clip_index)[0], + "length": c.query("/live/clip/get/length", track_index, clip_index)[0], + "is_playing": bool(c.query("/live/clip/get/is_playing", track_index, clip_index)[0]), + "is_recording": bool(c.query("/live/clip/get/is_recording", track_index, clip_index)[0]), + "is_midi_clip": bool(c.query("/live/clip/get/is_midi_clip", track_index, clip_index)[0]), + "is_audio_clip": bool(c.query("/live/clip/get/is_audio_clip", track_index, clip_index)[0]), + "muted": bool(c.query("/live/clip/get/muted", track_index, clip_index)[0]), + "looping": bool(c.query("/live/clip/get/looping", track_index, clip_index)[0]), + "loop_start": c.query("/live/clip/get/loop_start", track_index, clip_index)[0], + "loop_end": c.query("/live/clip/get/loop_end", track_index, clip_index)[0], + "start_marker": c.query("/live/clip/get/start_marker", track_index, clip_index)[0], + "end_marker": c.query("/live/clip/get/end_marker", track_index, clip_index)[0], + "pitch_coarse": c.query("/live/clip/get/pitch_coarse", track_index, clip_index)[0], + "pitch_fine": c.query("/live/clip/get/pitch_fine", track_index, clip_index)[0], + "gain": c.query("/live/clip/get/gain", track_index, clip_index)[0], + "gain_display_string": c.query("/live/clip/get/gain_display_string", track_index, clip_index)[0], + "color": c.query("/live/clip/get/color", track_index, clip_index)[0], + "launch_mode": c.query("/live/clip/get/launch_mode", track_index, clip_index)[0], + "launch_quantization": c.query("/live/clip/get/launch_quantization", track_index, clip_index)[0], + "playing_position": c.query("/live/clip/get/playing_position", track_index, clip_index)[0], + "will_record_on_start": bool(c.query("/live/clip/get/will_record_on_start", track_index, clip_index)[0]), + } + + @mcp.tool() + def clip_set_name(track_index: int, clip_index: int, name: str) -> None: + """Rename a clip.""" + get_client().cmd("/live/clip/set/name", track_index, clip_index, name) + + @mcp.tool() + def clip_set_color(track_index: int, clip_index: int, color_index: int) -> None: + """Set clip color by palette index (0–69).""" + get_client().cmd("/live/clip/set/color_index", track_index, clip_index, color_index) + + @mcp.tool() + def clip_set_gain(track_index: int, clip_index: int, gain: float) -> None: + """Set clip gain in dB.""" + get_client().cmd("/live/clip/set/gain", track_index, clip_index, gain) + + @mcp.tool() + def clip_set_muted(track_index: int, clip_index: int, muted: bool) -> None: + """Mute or unmute a clip.""" + get_client().cmd("/live/clip/set/muted", track_index, clip_index, int(muted)) + + @mcp.tool() + def clip_set_looping(track_index: int, clip_index: int, looping: bool) -> None: + """Enable or disable looping for a clip.""" + get_client().cmd("/live/clip/set/looping", track_index, clip_index, int(looping)) + + @mcp.tool() + def clip_set_loop_points(track_index: int, clip_index: int, start: float, end: float) -> None: + """Set loop start and end points in beats.""" + c = get_client() + c.cmd("/live/clip/set/loop_start", track_index, clip_index, start) + c.cmd("/live/clip/set/loop_end", track_index, clip_index, end) + + @mcp.tool() + def clip_set_markers(track_index: int, clip_index: int, start: float, end: float) -> None: + """Set the start and end markers of a clip in beats.""" + c = get_client() + c.cmd("/live/clip/set/start_marker", track_index, clip_index, start) + c.cmd("/live/clip/set/end_marker", track_index, clip_index, end) + + @mcp.tool() + def clip_set_pitch(track_index: int, clip_index: int, semitones: int, cents: int = 0) -> None: + """Transpose a clip. semitones: -48 to +48, cents: -50 to +50.""" + c = get_client() + c.cmd("/live/clip/set/pitch_coarse", track_index, clip_index, semitones) + c.cmd("/live/clip/set/pitch_fine", track_index, clip_index, cents) + + @mcp.tool() + def clip_set_warp_mode(track_index: int, clip_index: int, mode: int) -> None: + """Set warp mode: 0=Beats, 1=Tones, 2=Texture, 3=Re-Pitch, 4=Complex, 5=Complex Pro.""" + get_client().cmd("/live/clip/set/warp_mode", track_index, clip_index, mode) + + @mcp.tool() + def clip_set_warping(track_index: int, clip_index: int, enabled: bool) -> None: + """Enable or disable warping for an audio clip.""" + get_client().cmd("/live/clip/set/warping", track_index, clip_index, int(enabled)) + + @mcp.tool() + def clip_set_launch_mode(track_index: int, clip_index: int, mode: int) -> None: + """Set clip launch mode: 0=Trigger, 1=Gate, 2=Toggle, 3=Repeat.""" + get_client().cmd("/live/clip/set/launch_mode", track_index, clip_index, mode) + + @mcp.tool() + def clip_set_launch_quantization(track_index: int, clip_index: int, quantization: int) -> None: + """Set per-clip launch quantization (0–14, same enum as global quantization).""" + get_client().cmd("/live/clip/set/launch_quantization", track_index, clip_index, quantization) + + @mcp.tool() + def clip_set_legato(track_index: int, clip_index: int, legato: bool) -> None: + """Enable or disable legato mode for a clip.""" + get_client().cmd("/live/clip/set/legato", track_index, clip_index, int(legato)) + + @mcp.tool() + def clip_duplicate_loop(track_index: int, clip_index: int) -> None: + """Double the clip loop length by duplicating its content.""" + get_client().cmd("/live/clip/duplicate_loop", track_index, clip_index) + + # --- MIDI Notes --- + + @mcp.tool() + def clip_get_notes( + track_index: int, + clip_index: int, + pitch_start: int = 0, + pitch_span: int = 127, + time_start: float = 0.0, + time_span: float = 1000.0, + ) -> list: + """Get MIDI notes from a clip. Returns list of {pitch, start, duration, velocity, mute}.""" + result = get_client().query( + "/live/clip/get/notes", + track_index, clip_index, + pitch_start, pitch_span, + time_start, time_span, + ) + notes = [] + for i in range(0, len(result), 5): + notes.append({ + "pitch": result[i], + "start": result[i + 1], + "duration": result[i + 2], + "velocity": result[i + 3], + "mute": bool(result[i + 4]), + }) + return notes + + @mcp.tool() + def clip_add_notes(track_index: int, clip_index: int, notes: list) -> None: + """Add MIDI notes to a clip. notes: list of {pitch, start, duration, velocity, mute}. + Example: [{"pitch": 60, "start": 0.0, "duration": 0.5, "velocity": 100, "mute": 0}]""" + args = [track_index, clip_index] + for note in notes: + args.extend([ + int(note["pitch"]), + float(note["start"]), + float(note["duration"]), + int(note["velocity"]), + int(note.get("mute", 0)), + ]) + get_client().cmd("/live/clip/add/notes", *args) + + @mcp.tool() + def clip_remove_notes( + track_index: int, + clip_index: int, + pitch_start: int = 0, + pitch_span: int = 127, + time_start: float = 0.0, + time_span: float = 1000.0, + ) -> None: + """Remove MIDI notes from a clip within the given pitch/time range.""" + get_client().cmd( + "/live/clip/remove/notes", + track_index, clip_index, + pitch_start, pitch_span, + time_start, time_span, + ) diff --git a/src/ableton_mcp/tools/clip_slot.py b/src/ableton_mcp/tools/clip_slot.py new file mode 100644 index 0000000..52883a1 --- /dev/null +++ b/src/ableton_mcp/tools/clip_slot.py @@ -0,0 +1,55 @@ +"""MCP tools for Ableton Live clip slot control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def clip_slot_get_info(track_index: int, slot_index: int) -> dict: + """Get info about a clip slot. Returns {has_clip, is_playing, is_triggered, will_record, is_group_slot}.""" + c = get_client() + return { + "has_clip": bool(c.query("/live/clip_slot/get/has_clip", track_index, slot_index)[0]), + "is_playing": bool(c.query("/live/clip_slot/get/is_playing", track_index, slot_index)[0]), + "is_triggered": bool(c.query("/live/clip_slot/get/is_triggered", track_index, slot_index)[0]), + "will_record_on_start": bool(c.query("/live/clip_slot/get/will_record_on_start", track_index, slot_index)[0]), + "is_group_slot": bool(c.query("/live/clip_slot/get/is_group_slot", track_index, slot_index)[0]), + "controls_other_clips": bool(c.query("/live/clip_slot/get/controls_other_clips", track_index, slot_index)[0]), + "has_stop_button": bool(c.query("/live/clip_slot/get/has_stop_button", track_index, slot_index)[0]), + } + + @mcp.tool() + def clip_slot_fire(track_index: int, slot_index: int) -> None: + """Trigger the clip slot (launch clip or start recording if empty).""" + get_client().cmd("/live/clip_slot/fire", track_index, slot_index) + + @mcp.tool() + def clip_slot_stop(track_index: int, slot_index: int) -> None: + """Stop the clip slot.""" + get_client().cmd("/live/clip_slot/stop", track_index, slot_index) + + @mcp.tool() + def clip_slot_create_clip(track_index: int, slot_index: int, length_beats: float = 4.0) -> None: + """Create a new empty MIDI clip in a slot with the given length in beats.""" + get_client().cmd("/live/clip_slot/create_clip", track_index, slot_index, length_beats) + + @mcp.tool() + def clip_slot_delete_clip(track_index: int, slot_index: int) -> None: + """Delete the clip from a slot.""" + get_client().cmd("/live/clip_slot/delete_clip", track_index, slot_index) + + @mcp.tool() + def clip_slot_duplicate_to( + src_track: int, src_slot: int, dst_track: int, dst_slot: int + ) -> None: + """Copy a clip from one slot to another.""" + get_client().cmd( + "/live/clip_slot/duplicate_clip_to", + src_track, src_slot, dst_track, dst_slot, + ) + + @mcp.tool() + def clip_slot_set_stop_button(track_index: int, slot_index: int, enabled: bool) -> None: + """Enable or disable the stop button for a clip slot.""" + get_client().cmd("/live/clip_slot/set/has_stop_button", track_index, slot_index, int(enabled)) diff --git a/src/ableton_mcp/tools/device.py b/src/ableton_mcp/tools/device.py new file mode 100644 index 0000000..2c9ac07 --- /dev/null +++ b/src/ableton_mcp/tools/device.py @@ -0,0 +1,86 @@ +"""MCP tools for Ableton Live device / plugin parameter control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def device_get_info(track_index: int, device_index: int) -> dict: + """Get info about a device. Returns {name, type, class_name, num_parameters}.""" + c = get_client() + type_map = {1: "audio_effect", 2: "instrument", 4: "midi_effect"} + raw_type = c.query("/live/device/get/type", track_index, device_index)[0] + return { + "name": c.query("/live/device/get/name", track_index, device_index)[0], + "type": type_map.get(raw_type, raw_type), + "class_name": c.query("/live/device/get/class_name", track_index, device_index)[0], + "num_parameters": c.query("/live/device/get/num_parameters", track_index, device_index)[0], + } + + @mcp.tool() + def device_get_parameters(track_index: int, device_index: int) -> list: + """Get all parameters of a device. + Returns list of {index, name, value, min, max, is_quantized, value_string}.""" + c = get_client() + names = c.query("/live/device/get/parameters/name", track_index, device_index) + values = c.query("/live/device/get/parameters/value", track_index, device_index) + mins = c.query("/live/device/get/parameters/min", track_index, device_index) + maxs = c.query("/live/device/get/parameters/max", track_index, device_index) + quantized = c.query("/live/device/get/parameters/is_quantized", track_index, device_index) + return [ + { + "index": i, + "name": n, + "value": v, + "min": mn, + "max": mx, + "is_quantized": bool(q), + } + for i, (n, v, mn, mx, q) in enumerate(zip(names, values, mins, maxs, quantized)) + ] + + @mcp.tool() + def device_get_parameter(track_index: int, device_index: int, param_index: int) -> dict: + """Get a single device parameter. Returns {name, value, min, max, value_string}.""" + c = get_client() + return { + "name": c.query("/live/device/get/parameter/name", track_index, device_index, param_index)[0], + "value": c.query("/live/device/get/parameter/value", track_index, device_index, param_index)[0], + "value_string": c.query("/live/device/get/parameter/value_string", track_index, device_index, param_index)[0], + } + + @mcp.tool() + def device_set_parameter( + track_index: int, device_index: int, param_index: int, value: float + ) -> None: + """Set a device parameter value (normalized 0.0–1.0).""" + get_client().cmd( + "/live/device/set/parameter/value", + track_index, device_index, param_index, value, + ) + + @mcp.tool() + def device_set_parameters_bulk( + track_index: int, device_index: int, params: list + ) -> None: + """Set multiple device parameters in one call. + params: list of {index, value}, e.g. [{"index": 0, "value": 0.5}, ...]""" + args = [track_index, device_index] + for p in params: + args.extend([int(p["index"]), float(p["value"])]) + get_client().cmd("/live/device/set/parameters/value", *args) + + @mcp.tool() + def device_map_midi_cc( + track_index: int, + device_index: int, + param_index: int, + channel: int, + cc: int, + ) -> None: + """Map a MIDI CC to a device parameter. channel: 0–15, cc: 0–127.""" + get_client().cmd( + "/live/midimap/map_cc", + track_index, device_index, param_index, channel, cc, + ) diff --git a/src/ableton_mcp/tools/listener.py b/src/ableton_mcp/tools/listener.py new file mode 100644 index 0000000..7d79c0a --- /dev/null +++ b/src/ableton_mcp/tools/listener.py @@ -0,0 +1,120 @@ +"""MCP tools for subscribing to and polling real-time AbletonOSC property changes.""" + +from ableton_mcp.osc_client import get_client + +# Maps (object_type, property_name) to the OSC start/stop/response addresses. +_LISTENER_MAP: dict[tuple[str, str], tuple[str, str, str]] = { + # (object, property): (start_address, stop_address, response_address) + ("song", "beat"): ("/live/song/start_listen/beat", "/live/song/stop_listen/beat", "/live/song/get/beat"), + ("song", "is_playing"): ("/live/song/start_listen/is_playing", "/live/song/stop_listen/is_playing", "/live/song/get/is_playing"), + ("song", "tempo"): ("/live/song/start_listen/tempo", "/live/song/stop_listen/tempo", "/live/song/get/tempo"), + ("song", "current_song_time"):("/live/song/start_listen/current_song_time","/live/song/stop_listen/current_song_time","/live/song/get/current_song_time"), + ("view", "selected_track"): ("/live/view/start_listen/selected_track", "/live/view/stop_listen/selected_track", "/live/view/get/selected_track"), + ("view", "selected_scene"): ("/live/view/start_listen/selected_scene", "/live/view/stop_listen/selected_scene", "/live/view/get/selected_scene"), +} + +def _track_clip_addresses(object_type: str, property_name: str, track_index: int, clip_index: int | None = None) -> tuple[str, str, str, list]: + """Build start/stop/response OSC addresses for track- or clip-level listeners.""" + if object_type == "track": + start = f"/live/track/start_listen/{property_name}" + stop = f"/live/track/stop_listen/{property_name}" + response = f"/live/track/get/{property_name}" + args = [track_index] + elif object_type == "clip" and clip_index is not None: + start = f"/live/clip/start_listen/{property_name}" + stop = f"/live/clip/stop_listen/{property_name}" + response = f"/live/clip/get/{property_name}" + args = [track_index, clip_index] + elif object_type == "clip_slot" and clip_index is not None: + start = f"/live/clip_slot/start_listen/{property_name}" + stop = f"/live/clip_slot/stop_listen/{property_name}" + response = f"/live/clip_slot/get/{property_name}" + args = [track_index, clip_index] + else: + raise ValueError(f"Unknown object_type '{object_type}' or missing clip_index.") + return start, stop, response, args + + +def register(mcp): + + @mcp.tool() + def listener_start( + object_type: str, + property_name: str, + track_index: int | None = None, + clip_index: int | None = None, + ) -> str: + """Start listening to real-time changes for a property. + + object_type: 'song' | 'view' | 'track' | 'clip' | 'clip_slot' + property_name: e.g. 'beat', 'tempo', 'is_playing', 'volume', 'mute', 'name', etc. + track_index: required for track/clip/clip_slot listeners. + clip_index: required for clip/clip_slot listeners. + + Returns the OSC response address (use this when calling listener_get_events). + """ + c = get_client() + key = (object_type, property_name) + if key in _LISTENER_MAP: + start_addr, _, response_addr = _LISTENER_MAP[key] + c.cmd(start_addr) + c.start_listener(response_addr) + elif object_type in ("track", "clip", "clip_slot"): + if track_index is None: + raise ValueError("track_index is required for track/clip/clip_slot listeners.") + start_addr, _, response_addr, args = _track_clip_addresses( + object_type, property_name, track_index, clip_index + ) + c.cmd(start_addr, *args) + c.start_listener(response_addr) + else: + raise ValueError(f"Unknown object_type '{object_type}'. Use: song, view, track, clip, clip_slot.") + return response_addr + + @mcp.tool() + def listener_stop( + object_type: str, + property_name: str, + track_index: int | None = None, + clip_index: int | None = None, + ) -> None: + """Stop listening to real-time changes for a property.""" + c = get_client() + key = (object_type, property_name) + if key in _LISTENER_MAP: + _, stop_addr, response_addr = _LISTENER_MAP[key] + c.cmd(stop_addr) + c.stop_listener(response_addr) + elif object_type in ("track", "clip", "clip_slot"): + if track_index is None: + raise ValueError("track_index is required.") + _, stop_addr, response_addr, args = _track_clip_addresses( + object_type, property_name, track_index, clip_index + ) + c.cmd(stop_addr, *args) + c.stop_listener(response_addr) + + @mcp.tool() + def listener_get_events( + object_type: str, + property_name: str, + track_index: int | None = None, + clip_index: int | None = None, + max_events: int = 50, + ) -> list: + """Drain and return up to max_events queued listener events. + Each event is a list of the OSC arguments received (e.g. [value] or [beat, index, bar, time]). + Call listener_start first to register the listener.""" + key = (object_type, property_name) + if key in _LISTENER_MAP: + _, _, response_addr = _LISTENER_MAP[key] + elif object_type in ("track", "clip", "clip_slot"): + if track_index is None: + raise ValueError("track_index is required.") + _, _, response_addr, _ = _track_clip_addresses( + object_type, property_name, track_index, clip_index + ) + else: + raise ValueError(f"Unknown object_type '{object_type}'.") + events = get_client().drain_listener(response_addr, max_events) + return [list(e) for e in events] diff --git a/src/ableton_mcp/tools/scene.py b/src/ableton_mcp/tools/scene.py new file mode 100644 index 0000000..a742b13 --- /dev/null +++ b/src/ableton_mcp/tools/scene.py @@ -0,0 +1,59 @@ +"""MCP tools for Ableton Live scene control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def scene_fire(scene_index: int) -> None: + """Launch a scene by index, triggering all clips in that row.""" + get_client().cmd("/live/scene/fire", scene_index) + + @mcp.tool() + def scene_fire_selected() -> None: + """Launch the currently selected scene.""" + get_client().cmd("/live/scene/fire_selected") + + @mcp.tool() + def scene_get_info(scene_index: int) -> dict: + """Get all properties of a scene.""" + c = get_client() + return { + "name": c.query("/live/scene/get/name", scene_index)[0], + "color": c.query("/live/scene/get/color", scene_index)[0], + "tempo": c.query("/live/scene/get/tempo", scene_index)[0], + "tempo_enabled": bool(c.query("/live/scene/get/tempo_enabled", scene_index)[0]), + "time_signature_numerator": c.query("/live/scene/get/time_signature_numerator", scene_index)[0], + "time_signature_denominator": c.query("/live/scene/get/time_signature_denominator", scene_index)[0], + "time_signature_enabled": bool(c.query("/live/scene/get/time_signature_enabled", scene_index)[0]), + "is_empty": bool(c.query("/live/scene/get/is_empty", scene_index)[0]), + "is_triggered": bool(c.query("/live/scene/get/is_triggered", scene_index)[0]), + } + + @mcp.tool() + def scene_set_name(scene_index: int, name: str) -> None: + """Rename a scene.""" + get_client().cmd("/live/scene/set/name", scene_index, name) + + @mcp.tool() + def scene_set_color(scene_index: int, color_index: int) -> None: + """Set scene color by palette index (0–69).""" + get_client().cmd("/live/scene/set/color_index", scene_index, color_index) + + @mcp.tool() + def scene_set_tempo(scene_index: int, tempo: float, enabled: bool = True) -> None: + """Set a scene's tempo override and whether it's enabled.""" + c = get_client() + c.cmd("/live/scene/set/tempo", scene_index, tempo) + c.cmd("/live/scene/set/tempo_enabled", scene_index, int(enabled)) + + @mcp.tool() + def scene_set_time_signature( + scene_index: int, numerator: int, denominator: int, enabled: bool = True + ) -> None: + """Set a scene's time signature override.""" + c = get_client() + c.cmd("/live/scene/set/time_signature_numerator", scene_index, numerator) + c.cmd("/live/scene/set/time_signature_denominator", scene_index, denominator) + c.cmd("/live/scene/set/time_signature_enabled", scene_index, int(enabled)) diff --git a/src/ableton_mcp/tools/song.py b/src/ableton_mcp/tools/song.py new file mode 100644 index 0000000..00635ee --- /dev/null +++ b/src/ableton_mcp/tools/song.py @@ -0,0 +1,314 @@ +"""MCP tools for Ableton Live song / transport control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + # --- Transport --- + + @mcp.tool() + def song_get_state() -> dict: + """Get a comprehensive snapshot of the current song state.""" + c = get_client() + return { + "is_playing": bool(c.query("/live/song/get/is_playing")[0]), + "tempo": c.query("/live/song/get/tempo")[0], + "current_song_time": c.query("/live/song/get/current_song_time")[0], + "song_length": c.query("/live/song/get/song_length")[0], + "loop": bool(c.query("/live/song/get/loop")[0]), + "loop_start": c.query("/live/song/get/loop_start")[0], + "loop_length": c.query("/live/song/get/loop_length")[0], + "metronome": bool(c.query("/live/song/get/metronome")[0]), + "signature_numerator": c.query("/live/song/get/signature_numerator")[0], + "signature_denominator": c.query("/live/song/get/signature_denominator")[0], + "arrangement_overdub": bool(c.query("/live/song/get/arrangement_overdub")[0]), + "session_record": bool(c.query("/live/song/get/session_record")[0]), + "record_mode": bool(c.query("/live/song/get/record_mode")[0]), + "can_undo": bool(c.query("/live/song/get/can_undo")[0]), + "can_redo": bool(c.query("/live/song/get/can_redo")[0]), + "num_tracks": c.query("/live/song/get/num_tracks")[0], + "num_scenes": c.query("/live/song/get/num_scenes")[0], + "groove_amount": c.query("/live/song/get/groove_amount")[0], + "is_ableton_link_enabled": bool(c.query("/live/song/get/is_ableton_link_enabled")[0]), + "root_note": c.query("/live/song/get/root_note")[0], + "scale_name": c.query("/live/song/get/scale_name")[0], + } + + @mcp.tool() + def song_start_playing() -> None: + """Start song playback from the current position.""" + get_client().cmd("/live/song/start_playing") + + @mcp.tool() + def song_stop_playing() -> None: + """Stop song playback.""" + get_client().cmd("/live/song/stop_playing") + + @mcp.tool() + def song_continue_playing() -> None: + """Continue playback without resetting to the start.""" + get_client().cmd("/live/song/continue_playing") + + @mcp.tool() + def song_stop_all_clips() -> None: + """Stop all currently playing clips.""" + get_client().cmd("/live/song/stop_all_clips") + + @mcp.tool() + def song_tap_tempo() -> None: + """Send a tap-tempo pulse to set the BPM by tapping.""" + get_client().cmd("/live/song/tap_tempo") + + @mcp.tool() + def song_undo() -> None: + """Undo the last action in Ableton Live.""" + get_client().cmd("/live/song/undo") + + @mcp.tool() + def song_redo() -> None: + """Redo the last undone action in Ableton Live.""" + get_client().cmd("/live/song/redo") + + @mcp.tool() + def song_capture_midi() -> None: + """Capture incoming MIDI into a new clip.""" + get_client().cmd("/live/song/capture_midi") + + @mcp.tool() + def song_re_enable_automation() -> None: + """Re-enable automation that has been overridden.""" + get_client().cmd("/live/song/re_enable_automation") + + @mcp.tool() + def song_trigger_session_record() -> None: + """Toggle session record mode.""" + get_client().cmd("/live/song/trigger_session_record") + + # --- Tempo / Time --- + + @mcp.tool() + def song_get_tempo() -> float: + """Get the current song tempo in BPM.""" + return get_client().query("/live/song/get/tempo")[0] + + @mcp.tool() + def song_set_tempo(bpm: float) -> None: + """Set the song tempo. bpm: 20.0–300.0.""" + get_client().cmd("/live/song/set/tempo", bpm) + + @mcp.tool() + def song_get_time() -> float: + """Get the current playhead position in beats.""" + return get_client().query("/live/song/get/current_song_time")[0] + + @mcp.tool() + def song_set_time(beats: float) -> None: + """Jump the playhead to a position in beats.""" + get_client().cmd("/live/song/set/current_song_time", beats) + + @mcp.tool() + def song_get_time_signature() -> dict: + """Get the global time signature. Returns {numerator, denominator}.""" + c = get_client() + return { + "numerator": c.query("/live/song/get/signature_numerator")[0], + "denominator": c.query("/live/song/get/signature_denominator")[0], + } + + @mcp.tool() + def song_set_time_signature(numerator: int, denominator: int) -> None: + """Set the global time signature. denominator must be a power of 2 (1-16).""" + c = get_client() + c.cmd("/live/song/set/signature_numerator", numerator) + c.cmd("/live/song/set/signature_denominator", denominator) + + # --- Loop --- + + @mcp.tool() + def song_get_loop() -> dict: + """Get loop settings. Returns {loop, loop_start, loop_length}.""" + c = get_client() + return { + "loop": bool(c.query("/live/song/get/loop")[0]), + "loop_start": c.query("/live/song/get/loop_start")[0], + "loop_length": c.query("/live/song/get/loop_length")[0], + } + + @mcp.tool() + def song_set_loop(enabled: bool, start_beats: float | None = None, length_beats: float | None = None) -> None: + """Set loop on/off and optionally update loop start and length (in beats).""" + c = get_client() + c.cmd("/live/song/set/loop", int(enabled)) + if start_beats is not None: + c.cmd("/live/song/set/loop_start", start_beats) + if length_beats is not None: + c.cmd("/live/song/set/loop_length", length_beats) + + # --- Metronome / Quantization / Groove --- + + @mcp.tool() + def song_get_metronome() -> bool: + """Get metronome state.""" + return bool(get_client().query("/live/song/get/metronome")[0]) + + @mcp.tool() + def song_set_metronome(enabled: bool) -> None: + """Enable or disable the metronome.""" + get_client().cmd("/live/song/set/metronome", int(enabled)) + + @mcp.tool() + def song_set_quantization(value: int) -> None: + """Set clip trigger quantization (0=none, 1=8 bars, 2=4 bars, 3=2 bars, 4=1 bar, 5=1/2, 6=1/2T, 7=1/4, 8=1/4T, 9=1/8, 10=1/8T, 11=1/16, 12=1/16T, 13=1/32).""" + get_client().cmd("/live/song/set/clip_trigger_quantization", value) + + @mcp.tool() + def song_set_midi_recording_quantization(value: int) -> None: + """Set MIDI recording quantization (same enum as clip trigger quantization).""" + get_client().cmd("/live/song/set/midi_recording_quantization", value) + + @mcp.tool() + def song_set_groove(amount: float) -> None: + """Set global groove amount (0.0–1.0).""" + get_client().cmd("/live/song/set/groove_amount", amount) + + # --- Record / Overdub --- + + @mcp.tool() + def song_set_arrangement_overdub(enabled: bool) -> None: + """Enable or disable arrangement overdub.""" + get_client().cmd("/live/song/set/arrangement_overdub", int(enabled)) + + @mcp.tool() + def song_set_session_record(enabled: bool) -> None: + """Enable or disable session record.""" + get_client().cmd("/live/song/set/session_record", int(enabled)) + + @mcp.tool() + def song_set_punch_in(enabled: bool) -> None: + """Enable or disable punch-in recording.""" + get_client().cmd("/live/song/set/punch_in", int(enabled)) + + @mcp.tool() + def song_set_punch_out(enabled: bool) -> None: + """Enable or disable punch-out recording.""" + get_client().cmd("/live/song/set/punch_out", int(enabled)) + + # --- Scale / Root Note --- + + @mcp.tool() + def song_set_root_note(note: int) -> None: + """Set the root note (0=C, 1=C#, ..., 11=B).""" + get_client().cmd("/live/song/set/root_note", note) + + @mcp.tool() + def song_set_scale_name(scale_name: str) -> None: + """Set the scale name, e.g. 'Major', 'Minor', 'Dorian'.""" + get_client().cmd("/live/song/set/scale_name", scale_name) + + # --- Tracks --- + + @mcp.tool() + def song_get_tracks() -> list: + """Get all track names. Returns list of {index, name}.""" + names = get_client().query("/live/song/get/track_names") + return [{"index": i, "name": n} for i, n in enumerate(names)] + + @mcp.tool() + def song_create_audio_track(index: int = -1) -> None: + """Create a new audio track at the given index (-1 = end).""" + get_client().cmd("/live/song/create_audio_track", index) + + @mcp.tool() + def song_create_midi_track(index: int = -1) -> None: + """Create a new MIDI track at the given index (-1 = end).""" + get_client().cmd("/live/song/create_midi_track", index) + + @mcp.tool() + def song_create_return_track() -> None: + """Create a new return/auxiliary track.""" + get_client().cmd("/live/song/create_return_track") + + @mcp.tool() + def song_delete_track(track_index: int) -> None: + """Delete the track at track_index.""" + get_client().cmd("/live/song/delete_track", track_index) + + @mcp.tool() + def song_delete_return_track(track_index: int) -> None: + """Delete the return track at track_index.""" + get_client().cmd("/live/song/delete_return_track", track_index) + + @mcp.tool() + def song_duplicate_track(track_index: int) -> None: + """Duplicate the track at track_index.""" + get_client().cmd("/live/song/duplicate_track", track_index) + + # --- Scenes --- + + @mcp.tool() + def song_get_scenes() -> list: + """Get all scene names. Returns list of {index, name}.""" + names = get_client().query("/live/song/get/scenes/name") + return [{"index": i, "name": n} for i, n in enumerate(names)] + + @mcp.tool() + def song_create_scene(index: int = -1) -> None: + """Create a new scene at the given index (-1 = end).""" + get_client().cmd("/live/song/create_scene", index) + + @mcp.tool() + def song_delete_scene(scene_index: int) -> None: + """Delete the scene at scene_index.""" + get_client().cmd("/live/song/delete_scene", scene_index) + + @mcp.tool() + def song_duplicate_scene(scene_index: int) -> None: + """Duplicate the scene at scene_index.""" + get_client().cmd("/live/song/duplicate_scene", scene_index) + + @mcp.tool() + def song_capture_and_insert_scene() -> None: + """Capture the current session state and insert it as a new scene.""" + get_client().cmd("/live/song/capture_and_insert_scene") + + # --- Cue Points --- + + @mcp.tool() + def song_get_cue_points() -> list: + """Get all cue point names. Returns list of {index, name}.""" + names = get_client().query("/live/song/get/cue_points") + return [{"index": i, "name": n} for i, n in enumerate(names)] + + @mcp.tool() + def song_jump_to_cue(index_or_name: str) -> None: + """Jump to a cue point by index (int as string) or name.""" + try: + val = int(index_or_name) + except ValueError: + val = index_or_name + get_client().cmd("/live/song/cue_point/jump", val) + + @mcp.tool() + def song_add_or_delete_cue() -> None: + """Toggle a cue point at the current playhead position.""" + get_client().cmd("/live/song/set_or_delete_cue") + + @mcp.tool() + def song_set_cue_name(cue_index: int, name: str) -> None: + """Set the name of a cue point by index.""" + get_client().cmd("/live/song/cue_point/set/name", cue_index, name) + + # --- Structure export --- + + @mcp.tool() + def song_export_structure() -> None: + """Export the full session structure to a JSON file in the temp directory.""" + get_client().cmd("/live/song/export/structure") + + # --- Link --- + + @mcp.tool() + def song_set_ableton_link(enabled: bool) -> None: + """Enable or disable Ableton Link.""" + get_client().cmd("/live/song/set/is_ableton_link_enabled", int(enabled)) diff --git a/src/ableton_mcp/tools/system.py b/src/ableton_mcp/tools/system.py new file mode 100644 index 0000000..4d1a34a --- /dev/null +++ b/src/ableton_mcp/tools/system.py @@ -0,0 +1,47 @@ +"""MCP tools for Ableton Live system / application level.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + @mcp.tool() + def system_test_connection() -> dict: + """Test connectivity to AbletonOSC. Returns {connected, latency_ms}.""" + try: + latency = get_client().ping() + return {"connected": True, "latency_ms": round(latency, 2)} + except TimeoutError as e: + return {"connected": False, "error": str(e)} + + @mcp.tool() + def system_get_version() -> dict: + """Get the Ableton Live version. Returns {major, minor}.""" + result = get_client().query("/live/application/get/version") + return {"major": result[0], "minor": result[1]} + + @mcp.tool() + def system_get_cpu_usage() -> float: + """Get Ableton Live average CPU process usage (0.0–1.0).""" + result = get_client().query("/live/application/get/average_process_usage") + return result[0] + + @mcp.tool() + def system_show_message(message: str) -> None: + """Display a message in the Ableton Live status bar.""" + get_client().cmd("/live/api/show_message", message) + + @mcp.tool() + def system_set_log_level(level: str) -> None: + """Set AbletonOSC log level. level: debug | info | warning | error | critical.""" + get_client().cmd("/live/api/set/log_level", level) + + @mcp.tool() + def system_get_log_level() -> str: + """Get the current AbletonOSC log level.""" + result = get_client().query("/live/api/get/log_level") + return result[0] + + @mcp.tool() + def system_reload_api() -> None: + """Reload all AbletonOSC modules (useful after script updates).""" + get_client().cmd("/live/api/reload") diff --git a/src/ableton_mcp/tools/track.py b/src/ableton_mcp/tools/track.py new file mode 100644 index 0000000..3a61173 --- /dev/null +++ b/src/ableton_mcp/tools/track.py @@ -0,0 +1,161 @@ +"""MCP tools for Ableton Live track control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def track_get_info(track_index: int) -> dict: + """Get all properties of a track. Returns a comprehensive dict.""" + c = get_client() + return { + "name": c.query("/live/track/get/name", track_index)[0], + "volume": c.query("/live/track/get/volume", track_index)[0], + "panning": c.query("/live/track/get/panning", track_index)[0], + "mute": bool(c.query("/live/track/get/mute", track_index)[0]), + "solo": bool(c.query("/live/track/get/solo", track_index)[0]), + "arm": bool(c.query("/live/track/get/arm", track_index)[0]), + "can_be_armed": bool(c.query("/live/track/get/can_be_armed", track_index)[0]), + "is_foldable": bool(c.query("/live/track/get/is_foldable", track_index)[0]), + "is_grouped": bool(c.query("/live/track/get/is_grouped", track_index)[0]), + "is_visible": bool(c.query("/live/track/get/is_visible", track_index)[0]), + "has_audio_input": bool(c.query("/live/track/get/has_audio_input", track_index)[0]), + "has_audio_output": bool(c.query("/live/track/get/has_audio_output", track_index)[0]), + "has_midi_input": bool(c.query("/live/track/get/has_midi_input", track_index)[0]), + "has_midi_output": bool(c.query("/live/track/get/has_midi_output", track_index)[0]), + "color": c.query("/live/track/get/color", track_index)[0], + "current_monitoring_state": c.query("/live/track/get/current_monitoring_state", track_index)[0], + "playing_slot_index": c.query("/live/track/get/playing_slot_index", track_index)[0], + "fired_slot_index": c.query("/live/track/get/fired_slot_index", track_index)[0], + "num_devices": c.query("/live/track/get/num_devices", track_index)[0], + } + + @mcp.tool() + def track_set_volume(track_index: int, volume: float) -> None: + """Set track volume (0.0 = -inf, 0.85 ≈ 0 dB, 1.0 = +6 dB).""" + get_client().cmd("/live/track/set/volume", track_index, volume) + + @mcp.tool() + def track_set_pan(track_index: int, pan: float) -> None: + """Set track panning (-1.0 = full left, 0.0 = center, 1.0 = full right).""" + get_client().cmd("/live/track/set/panning", track_index, pan) + + @mcp.tool() + def track_set_mute(track_index: int, muted: bool) -> None: + """Mute or unmute a track.""" + get_client().cmd("/live/track/set/mute", track_index, int(muted)) + + @mcp.tool() + def track_set_solo(track_index: int, solo: bool) -> None: + """Solo or unsolo a track.""" + get_client().cmd("/live/track/set/solo", track_index, int(solo)) + + @mcp.tool() + def track_set_arm(track_index: int, armed: bool) -> None: + """Arm or disarm a track for recording.""" + get_client().cmd("/live/track/set/arm", track_index, int(armed)) + + @mcp.tool() + def track_set_send(track_index: int, send_index: int, value: float) -> None: + """Set send level from a track to a return track (0.0–1.0).""" + get_client().cmd("/live/track/set/send", track_index, send_index, value) + + @mcp.tool() + def track_get_send(track_index: int, send_index: int) -> float: + """Get the send level from a track to a return track.""" + return get_client().query("/live/track/get/send", track_index, send_index)[0] + + @mcp.tool() + def track_set_name(track_index: int, name: str) -> None: + """Rename a track.""" + get_client().cmd("/live/track/set/name", track_index, name) + + @mcp.tool() + def track_set_color(track_index: int, color_index: int) -> None: + """Set track color by palette index (0–69).""" + get_client().cmd("/live/track/set/color_index", track_index, color_index) + + @mcp.tool() + def track_stop_clips(track_index: int) -> None: + """Stop all clips on a track.""" + get_client().cmd("/live/track/stop_all_clips", track_index) + + @mcp.tool() + def track_set_monitoring(track_index: int, state: int) -> None: + """Set monitoring state: 0=Auto, 1=In, 2=Off.""" + get_client().cmd("/live/track/set/current_monitoring_state", track_index, state) + + @mcp.tool() + def track_set_fold(track_index: int, folded: bool) -> None: + """Fold or unfold a group track.""" + get_client().cmd("/live/track/set/fold_state", track_index, int(folded)) + + @mcp.tool() + def track_get_clips(track_index: int) -> list: + """Get all clips on a track. Returns list of {index, name, length, color}.""" + c = get_client() + names = c.query("/live/track/get/clips/name", track_index) + lengths = c.query("/live/track/get/clips/length", track_index) + colors = c.query("/live/track/get/clips/color", track_index) + result = [] + for i, (name, length, color) in enumerate(zip(names, lengths, colors)): + if name: + result.append({"index": i, "name": name, "length": length, "color": color}) + return result + + @mcp.tool() + def track_get_devices(track_index: int) -> list: + """Get all devices on a track. Returns list of {index, name, type, class_name}.""" + c = get_client() + names = c.query("/live/track/get/devices/name", track_index) + types = c.query("/live/track/get/devices/type", track_index) + class_names = c.query("/live/track/get/devices/class_name", track_index) + type_map = {1: "audio_effect", 2: "instrument", 4: "midi_effect"} + return [ + {"index": i, "name": n, "type": type_map.get(t, t), "class_name": cn} + for i, (n, t, cn) in enumerate(zip(names, types, class_names)) + ] + + @mcp.tool() + def track_get_meter(track_index: int) -> dict: + """Get track output meter levels. Returns {level, left, right}.""" + c = get_client() + return { + "level": c.query("/live/track/get/output_meter_level", track_index)[0], + "left": c.query("/live/track/get/output_meter_left", track_index)[0], + "right": c.query("/live/track/get/output_meter_right", track_index)[0], + } + + @mcp.tool() + def track_get_arrangement_clips(track_index: int) -> list: + """Get arrangement clips on a track. Returns list of {index, name, length, start_time}.""" + c = get_client() + names = c.query("/live/track/get/arrangement_clips/name", track_index) + lengths = c.query("/live/track/get/arrangement_clips/length", track_index) + starts = c.query("/live/track/get/arrangement_clips/start_time", track_index) + return [ + {"index": i, "name": n, "length": l, "start_time": s} + for i, (n, l, s) in enumerate(zip(names, lengths, starts)) + ] + + @mcp.tool() + def track_delete_device(track_index: int, device_index: int) -> None: + """Delete a device from a track.""" + get_client().cmd("/live/track/delete_device", track_index, device_index) + + @mcp.tool() + def track_get_available_input_routing_types(track_index: int) -> list: + """Get available input routing types for a track.""" + result = get_client().query("/live/track/get/available_input_routing_types", track_index) + return list(result) + + @mcp.tool() + def track_set_input_routing_type(track_index: int, routing_type: int) -> None: + """Set the input routing type for a track.""" + get_client().cmd("/live/track/set/input_routing_type", track_index, routing_type) + + @mcp.tool() + def track_set_output_routing_type(track_index: int, routing_type: int) -> None: + """Set the output routing type for a track.""" + get_client().cmd("/live/track/set/output_routing_type", track_index, routing_type) diff --git a/src/ableton_mcp/tools/view.py b/src/ableton_mcp/tools/view.py new file mode 100644 index 0000000..7b5d9f7 --- /dev/null +++ b/src/ableton_mcp/tools/view.py @@ -0,0 +1,44 @@ +"""MCP tools for Ableton Live view / selection control.""" + +from ableton_mcp.osc_client import get_client + + +def register(mcp): + + @mcp.tool() + def view_get_selection() -> dict: + """Get the currently selected track, scene, clip, and device. + Returns {selected_track, selected_scene, selected_clip_track, selected_clip_index, selected_device_track, selected_device_index}.""" + c = get_client() + scene = c.query("/live/view/get/selected_scene")[0] + track = c.query("/live/view/get/selected_track")[0] + clip = c.query("/live/view/get/selected_clip") + device = c.query("/live/view/get/selected_device") + return { + "selected_track": track, + "selected_scene": scene, + "selected_clip_track": clip[0] if len(clip) >= 1 else None, + "selected_clip_index": clip[1] if len(clip) >= 2 else None, + "selected_device_track": device[0] if len(device) >= 1 else None, + "selected_device_index": device[1] if len(device) >= 2 else None, + } + + @mcp.tool() + def view_set_selected_track(track_index: int) -> None: + """Select a track in the Ableton UI.""" + get_client().cmd("/live/view/set/selected_track", track_index) + + @mcp.tool() + def view_set_selected_scene(scene_index: int) -> None: + """Select a scene in the Ableton UI.""" + get_client().cmd("/live/view/set/selected_scene", scene_index) + + @mcp.tool() + def view_set_selected_clip(track_index: int, clip_index: int) -> None: + """Select a clip in the Ableton UI.""" + get_client().cmd("/live/view/set/selected_clip", track_index, clip_index) + + @mcp.tool() + def view_set_selected_device(track_index: int, device_index: int) -> None: + """Select a device in the Ableton UI.""" + get_client().cmd("/live/view/set/selected_device", track_index, device_index)