Initial implementation of ableton-mcp MCP server
Full-featured MCP server exposing 124 tools for complete Ableton Live control via AbletonOSC over OSC/UDP. Covers transport, tracks, clips, clip slots, scenes, devices, view selection, and real-time listeners. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
+40
@@ -0,0 +1,40 @@
|
||||
# Virtual environment
|
||||
.venv/
|
||||
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*.pyo
|
||||
*.pyd
|
||||
.Python
|
||||
*.egg-info/
|
||||
dist/
|
||||
build/
|
||||
*.egg
|
||||
.eggs/
|
||||
|
||||
# Distribution / packaging
|
||||
*.whl
|
||||
*.tar.gz
|
||||
|
||||
# IDE
|
||||
.idea/
|
||||
.vscode/
|
||||
*.suo
|
||||
*.ntvs*
|
||||
*.njsproj
|
||||
*.sln
|
||||
*.sw?
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
# Testing
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
htmlcov/
|
||||
|
||||
# Env files
|
||||
.env
|
||||
.env.*
|
||||
@@ -0,0 +1,19 @@
|
||||
[build-system]
|
||||
requires = ["setuptools>=68", "wheel"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "ableton-mcp"
|
||||
version = "0.1.0"
|
||||
description = "Full-featured MCP server for controlling Ableton Live via AbletonOSC"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"mcp[cli]>=1.0",
|
||||
"python-osc>=1.8",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
ableton-mcp = "ableton_mcp.server:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
@@ -0,0 +1,6 @@
|
||||
import os
|
||||
|
||||
ABLETON_HOST = os.getenv("ABLETON_HOST", "127.0.0.1")
|
||||
ABLETON_SEND_PORT = int(os.getenv("ABLETON_SEND_PORT", "11000"))
|
||||
ABLETON_RECEIVE_PORT = int(os.getenv("ABLETON_RECEIVE_PORT", "11001"))
|
||||
ABLETON_TIMEOUT = float(os.getenv("ABLETON_TIMEOUT", "5.0"))
|
||||
@@ -0,0 +1,145 @@
|
||||
"""Thread-safe OSC client/server for communicating with AbletonOSC."""
|
||||
|
||||
import threading
|
||||
import time
|
||||
from collections import deque
|
||||
from typing import Any, Callable
|
||||
|
||||
from pythonosc import udp_client, dispatcher, osc_server
|
||||
|
||||
from ableton_mcp.config import (
|
||||
ABLETON_HOST,
|
||||
ABLETON_SEND_PORT,
|
||||
ABLETON_RECEIVE_PORT,
|
||||
ABLETON_TIMEOUT,
|
||||
)
|
||||
|
||||
|
||||
class OSCClient:
|
||||
"""Singleton managing send/receive OSC communication with AbletonOSC."""
|
||||
|
||||
_instance: "OSCClient | None" = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls) -> "OSCClient":
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
instance = super().__new__(cls)
|
||||
instance._initialized = False
|
||||
cls._instance = instance
|
||||
return cls._instance
|
||||
|
||||
def __init__(self) -> None:
|
||||
if self._initialized:
|
||||
return
|
||||
self._initialized = True
|
||||
|
||||
self._client = udp_client.SimpleUDPClient(ABLETON_HOST, ABLETON_SEND_PORT)
|
||||
self._pending: dict[str, tuple[threading.Event, list]] = {}
|
||||
self._pending_lock = threading.Lock()
|
||||
self._listeners: dict[str, deque] = {}
|
||||
self._listener_callbacks: dict[str, list[Callable]] = {}
|
||||
|
||||
self._dispatcher = dispatcher.Dispatcher()
|
||||
self._dispatcher.set_default_handler(self._handle_message)
|
||||
|
||||
self._server = osc_server.ThreadingOSCUDPServer(
|
||||
("0.0.0.0", ABLETON_RECEIVE_PORT), self._dispatcher
|
||||
)
|
||||
self._server_thread = threading.Thread(
|
||||
target=self._server.serve_forever, daemon=True
|
||||
)
|
||||
self._server_thread.start()
|
||||
|
||||
def _handle_message(self, address: str, *args: Any) -> None:
|
||||
# Resolve pending query
|
||||
with self._pending_lock:
|
||||
if address in self._pending:
|
||||
event, result = self._pending[address]
|
||||
result.clear()
|
||||
result.extend(args)
|
||||
event.set()
|
||||
|
||||
# Push to listeners
|
||||
if address in self._listeners:
|
||||
self._listeners[address].append(args)
|
||||
for cb in self._listener_callbacks.get(address, []):
|
||||
try:
|
||||
cb(address, *args)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def cmd(self, address: str, *args: Any) -> None:
|
||||
"""Send a fire-and-forget OSC command."""
|
||||
self._client.send_message(address, list(args) if args else [])
|
||||
|
||||
def query(self, address: str, *args: Any, timeout: float | None = None) -> tuple:
|
||||
"""Send an OSC message and wait synchronously for a response."""
|
||||
if timeout is None:
|
||||
timeout = ABLETON_TIMEOUT
|
||||
|
||||
event = threading.Event()
|
||||
result: list = []
|
||||
|
||||
with self._pending_lock:
|
||||
self._pending[address] = (event, result)
|
||||
|
||||
try:
|
||||
self._client.send_message(address, list(args) if args else [])
|
||||
if not event.wait(timeout):
|
||||
raise TimeoutError(
|
||||
f"No response from AbletonOSC for '{address}' within {timeout}s. "
|
||||
"Ensure Ableton Live is running with AbletonOSC enabled."
|
||||
)
|
||||
finally:
|
||||
with self._pending_lock:
|
||||
self._pending.pop(address, None)
|
||||
|
||||
return tuple(result)
|
||||
|
||||
def start_listener(
|
||||
self,
|
||||
address: str,
|
||||
callback: Callable | None = None,
|
||||
maxlen: int = 200,
|
||||
) -> None:
|
||||
"""Register a real-time listener for an OSC address."""
|
||||
if address not in self._listeners:
|
||||
self._listeners[address] = deque(maxlen=maxlen)
|
||||
if callback is not None:
|
||||
self._listener_callbacks.setdefault(address, []).append(callback)
|
||||
|
||||
def stop_listener(self, address: str) -> None:
|
||||
"""Remove all listeners for an OSC address."""
|
||||
self._listeners.pop(address, None)
|
||||
self._listener_callbacks.pop(address, None)
|
||||
|
||||
def drain_listener(self, address: str, max_events: int = 50) -> list[tuple]:
|
||||
"""Drain and return up to max_events queued listener events."""
|
||||
q = self._listeners.get(address)
|
||||
if q is None:
|
||||
return []
|
||||
events = []
|
||||
for _ in range(min(max_events, len(q))):
|
||||
try:
|
||||
events.append(q.popleft())
|
||||
except IndexError:
|
||||
break
|
||||
return events
|
||||
|
||||
def ping(self) -> float:
|
||||
"""Return round-trip latency in ms, or raise TimeoutError."""
|
||||
t0 = time.monotonic()
|
||||
self.query("/live/test")
|
||||
return (time.monotonic() - t0) * 1000
|
||||
|
||||
|
||||
# Module-level singleton accessor
|
||||
_client: OSCClient | None = None
|
||||
|
||||
|
||||
def get_client() -> OSCClient:
|
||||
global _client
|
||||
if _client is None:
|
||||
_client = OSCClient()
|
||||
return _client
|
||||
@@ -0,0 +1,35 @@
|
||||
"""ableton-mcp: Full-featured MCP server for controlling Ableton Live via AbletonOSC."""
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
from ableton_mcp.tools import (
|
||||
system,
|
||||
song,
|
||||
track,
|
||||
clip,
|
||||
clip_slot,
|
||||
scene,
|
||||
device,
|
||||
view,
|
||||
listener,
|
||||
)
|
||||
|
||||
mcp = FastMCP("ableton-mcp")
|
||||
|
||||
system.register(mcp)
|
||||
song.register(mcp)
|
||||
track.register(mcp)
|
||||
clip.register(mcp)
|
||||
clip_slot.register(mcp)
|
||||
scene.register(mcp)
|
||||
device.register(mcp)
|
||||
view.register(mcp)
|
||||
listener.register(mcp)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
mcp.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,181 @@
|
||||
"""MCP tools for Ableton Live clip control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def clip_fire(track_index: int, clip_index: int) -> None:
|
||||
"""Launch a clip."""
|
||||
get_client().cmd("/live/clip/fire", track_index, clip_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_stop(track_index: int, clip_index: int) -> None:
|
||||
"""Stop a playing clip."""
|
||||
get_client().cmd("/live/clip/stop", track_index, clip_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_get_info(track_index: int, clip_index: int) -> dict:
|
||||
"""Get all properties of a clip."""
|
||||
c = get_client()
|
||||
return {
|
||||
"name": c.query("/live/clip/get/name", track_index, clip_index)[0],
|
||||
"length": c.query("/live/clip/get/length", track_index, clip_index)[0],
|
||||
"is_playing": bool(c.query("/live/clip/get/is_playing", track_index, clip_index)[0]),
|
||||
"is_recording": bool(c.query("/live/clip/get/is_recording", track_index, clip_index)[0]),
|
||||
"is_midi_clip": bool(c.query("/live/clip/get/is_midi_clip", track_index, clip_index)[0]),
|
||||
"is_audio_clip": bool(c.query("/live/clip/get/is_audio_clip", track_index, clip_index)[0]),
|
||||
"muted": bool(c.query("/live/clip/get/muted", track_index, clip_index)[0]),
|
||||
"looping": bool(c.query("/live/clip/get/looping", track_index, clip_index)[0]),
|
||||
"loop_start": c.query("/live/clip/get/loop_start", track_index, clip_index)[0],
|
||||
"loop_end": c.query("/live/clip/get/loop_end", track_index, clip_index)[0],
|
||||
"start_marker": c.query("/live/clip/get/start_marker", track_index, clip_index)[0],
|
||||
"end_marker": c.query("/live/clip/get/end_marker", track_index, clip_index)[0],
|
||||
"pitch_coarse": c.query("/live/clip/get/pitch_coarse", track_index, clip_index)[0],
|
||||
"pitch_fine": c.query("/live/clip/get/pitch_fine", track_index, clip_index)[0],
|
||||
"gain": c.query("/live/clip/get/gain", track_index, clip_index)[0],
|
||||
"gain_display_string": c.query("/live/clip/get/gain_display_string", track_index, clip_index)[0],
|
||||
"color": c.query("/live/clip/get/color", track_index, clip_index)[0],
|
||||
"launch_mode": c.query("/live/clip/get/launch_mode", track_index, clip_index)[0],
|
||||
"launch_quantization": c.query("/live/clip/get/launch_quantization", track_index, clip_index)[0],
|
||||
"playing_position": c.query("/live/clip/get/playing_position", track_index, clip_index)[0],
|
||||
"will_record_on_start": bool(c.query("/live/clip/get/will_record_on_start", track_index, clip_index)[0]),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_name(track_index: int, clip_index: int, name: str) -> None:
|
||||
"""Rename a clip."""
|
||||
get_client().cmd("/live/clip/set/name", track_index, clip_index, name)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_color(track_index: int, clip_index: int, color_index: int) -> None:
|
||||
"""Set clip color by palette index (0–69)."""
|
||||
get_client().cmd("/live/clip/set/color_index", track_index, clip_index, color_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_gain(track_index: int, clip_index: int, gain: float) -> None:
|
||||
"""Set clip gain in dB."""
|
||||
get_client().cmd("/live/clip/set/gain", track_index, clip_index, gain)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_muted(track_index: int, clip_index: int, muted: bool) -> None:
|
||||
"""Mute or unmute a clip."""
|
||||
get_client().cmd("/live/clip/set/muted", track_index, clip_index, int(muted))
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_looping(track_index: int, clip_index: int, looping: bool) -> None:
|
||||
"""Enable or disable looping for a clip."""
|
||||
get_client().cmd("/live/clip/set/looping", track_index, clip_index, int(looping))
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_loop_points(track_index: int, clip_index: int, start: float, end: float) -> None:
|
||||
"""Set loop start and end points in beats."""
|
||||
c = get_client()
|
||||
c.cmd("/live/clip/set/loop_start", track_index, clip_index, start)
|
||||
c.cmd("/live/clip/set/loop_end", track_index, clip_index, end)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_markers(track_index: int, clip_index: int, start: float, end: float) -> None:
|
||||
"""Set the start and end markers of a clip in beats."""
|
||||
c = get_client()
|
||||
c.cmd("/live/clip/set/start_marker", track_index, clip_index, start)
|
||||
c.cmd("/live/clip/set/end_marker", track_index, clip_index, end)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_pitch(track_index: int, clip_index: int, semitones: int, cents: int = 0) -> None:
|
||||
"""Transpose a clip. semitones: -48 to +48, cents: -50 to +50."""
|
||||
c = get_client()
|
||||
c.cmd("/live/clip/set/pitch_coarse", track_index, clip_index, semitones)
|
||||
c.cmd("/live/clip/set/pitch_fine", track_index, clip_index, cents)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_warp_mode(track_index: int, clip_index: int, mode: int) -> None:
|
||||
"""Set warp mode: 0=Beats, 1=Tones, 2=Texture, 3=Re-Pitch, 4=Complex, 5=Complex Pro."""
|
||||
get_client().cmd("/live/clip/set/warp_mode", track_index, clip_index, mode)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_warping(track_index: int, clip_index: int, enabled: bool) -> None:
|
||||
"""Enable or disable warping for an audio clip."""
|
||||
get_client().cmd("/live/clip/set/warping", track_index, clip_index, int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_launch_mode(track_index: int, clip_index: int, mode: int) -> None:
|
||||
"""Set clip launch mode: 0=Trigger, 1=Gate, 2=Toggle, 3=Repeat."""
|
||||
get_client().cmd("/live/clip/set/launch_mode", track_index, clip_index, mode)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_launch_quantization(track_index: int, clip_index: int, quantization: int) -> None:
|
||||
"""Set per-clip launch quantization (0–14, same enum as global quantization)."""
|
||||
get_client().cmd("/live/clip/set/launch_quantization", track_index, clip_index, quantization)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_set_legato(track_index: int, clip_index: int, legato: bool) -> None:
|
||||
"""Enable or disable legato mode for a clip."""
|
||||
get_client().cmd("/live/clip/set/legato", track_index, clip_index, int(legato))
|
||||
|
||||
@mcp.tool()
|
||||
def clip_duplicate_loop(track_index: int, clip_index: int) -> None:
|
||||
"""Double the clip loop length by duplicating its content."""
|
||||
get_client().cmd("/live/clip/duplicate_loop", track_index, clip_index)
|
||||
|
||||
# --- MIDI Notes ---
|
||||
|
||||
@mcp.tool()
|
||||
def clip_get_notes(
|
||||
track_index: int,
|
||||
clip_index: int,
|
||||
pitch_start: int = 0,
|
||||
pitch_span: int = 127,
|
||||
time_start: float = 0.0,
|
||||
time_span: float = 1000.0,
|
||||
) -> list:
|
||||
"""Get MIDI notes from a clip. Returns list of {pitch, start, duration, velocity, mute}."""
|
||||
result = get_client().query(
|
||||
"/live/clip/get/notes",
|
||||
track_index, clip_index,
|
||||
pitch_start, pitch_span,
|
||||
time_start, time_span,
|
||||
)
|
||||
notes = []
|
||||
for i in range(0, len(result), 5):
|
||||
notes.append({
|
||||
"pitch": result[i],
|
||||
"start": result[i + 1],
|
||||
"duration": result[i + 2],
|
||||
"velocity": result[i + 3],
|
||||
"mute": bool(result[i + 4]),
|
||||
})
|
||||
return notes
|
||||
|
||||
@mcp.tool()
|
||||
def clip_add_notes(track_index: int, clip_index: int, notes: list) -> None:
|
||||
"""Add MIDI notes to a clip. notes: list of {pitch, start, duration, velocity, mute}.
|
||||
Example: [{"pitch": 60, "start": 0.0, "duration": 0.5, "velocity": 100, "mute": 0}]"""
|
||||
args = [track_index, clip_index]
|
||||
for note in notes:
|
||||
args.extend([
|
||||
int(note["pitch"]),
|
||||
float(note["start"]),
|
||||
float(note["duration"]),
|
||||
int(note["velocity"]),
|
||||
int(note.get("mute", 0)),
|
||||
])
|
||||
get_client().cmd("/live/clip/add/notes", *args)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_remove_notes(
|
||||
track_index: int,
|
||||
clip_index: int,
|
||||
pitch_start: int = 0,
|
||||
pitch_span: int = 127,
|
||||
time_start: float = 0.0,
|
||||
time_span: float = 1000.0,
|
||||
) -> None:
|
||||
"""Remove MIDI notes from a clip within the given pitch/time range."""
|
||||
get_client().cmd(
|
||||
"/live/clip/remove/notes",
|
||||
track_index, clip_index,
|
||||
pitch_start, pitch_span,
|
||||
time_start, time_span,
|
||||
)
|
||||
@@ -0,0 +1,55 @@
|
||||
"""MCP tools for Ableton Live clip slot control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_get_info(track_index: int, slot_index: int) -> dict:
|
||||
"""Get info about a clip slot. Returns {has_clip, is_playing, is_triggered, will_record, is_group_slot}."""
|
||||
c = get_client()
|
||||
return {
|
||||
"has_clip": bool(c.query("/live/clip_slot/get/has_clip", track_index, slot_index)[0]),
|
||||
"is_playing": bool(c.query("/live/clip_slot/get/is_playing", track_index, slot_index)[0]),
|
||||
"is_triggered": bool(c.query("/live/clip_slot/get/is_triggered", track_index, slot_index)[0]),
|
||||
"will_record_on_start": bool(c.query("/live/clip_slot/get/will_record_on_start", track_index, slot_index)[0]),
|
||||
"is_group_slot": bool(c.query("/live/clip_slot/get/is_group_slot", track_index, slot_index)[0]),
|
||||
"controls_other_clips": bool(c.query("/live/clip_slot/get/controls_other_clips", track_index, slot_index)[0]),
|
||||
"has_stop_button": bool(c.query("/live/clip_slot/get/has_stop_button", track_index, slot_index)[0]),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_fire(track_index: int, slot_index: int) -> None:
|
||||
"""Trigger the clip slot (launch clip or start recording if empty)."""
|
||||
get_client().cmd("/live/clip_slot/fire", track_index, slot_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_stop(track_index: int, slot_index: int) -> None:
|
||||
"""Stop the clip slot."""
|
||||
get_client().cmd("/live/clip_slot/stop", track_index, slot_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_create_clip(track_index: int, slot_index: int, length_beats: float = 4.0) -> None:
|
||||
"""Create a new empty MIDI clip in a slot with the given length in beats."""
|
||||
get_client().cmd("/live/clip_slot/create_clip", track_index, slot_index, length_beats)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_delete_clip(track_index: int, slot_index: int) -> None:
|
||||
"""Delete the clip from a slot."""
|
||||
get_client().cmd("/live/clip_slot/delete_clip", track_index, slot_index)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_duplicate_to(
|
||||
src_track: int, src_slot: int, dst_track: int, dst_slot: int
|
||||
) -> None:
|
||||
"""Copy a clip from one slot to another."""
|
||||
get_client().cmd(
|
||||
"/live/clip_slot/duplicate_clip_to",
|
||||
src_track, src_slot, dst_track, dst_slot,
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
def clip_slot_set_stop_button(track_index: int, slot_index: int, enabled: bool) -> None:
|
||||
"""Enable or disable the stop button for a clip slot."""
|
||||
get_client().cmd("/live/clip_slot/set/has_stop_button", track_index, slot_index, int(enabled))
|
||||
@@ -0,0 +1,86 @@
|
||||
"""MCP tools for Ableton Live device / plugin parameter control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def device_get_info(track_index: int, device_index: int) -> dict:
|
||||
"""Get info about a device. Returns {name, type, class_name, num_parameters}."""
|
||||
c = get_client()
|
||||
type_map = {1: "audio_effect", 2: "instrument", 4: "midi_effect"}
|
||||
raw_type = c.query("/live/device/get/type", track_index, device_index)[0]
|
||||
return {
|
||||
"name": c.query("/live/device/get/name", track_index, device_index)[0],
|
||||
"type": type_map.get(raw_type, raw_type),
|
||||
"class_name": c.query("/live/device/get/class_name", track_index, device_index)[0],
|
||||
"num_parameters": c.query("/live/device/get/num_parameters", track_index, device_index)[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def device_get_parameters(track_index: int, device_index: int) -> list:
|
||||
"""Get all parameters of a device.
|
||||
Returns list of {index, name, value, min, max, is_quantized, value_string}."""
|
||||
c = get_client()
|
||||
names = c.query("/live/device/get/parameters/name", track_index, device_index)
|
||||
values = c.query("/live/device/get/parameters/value", track_index, device_index)
|
||||
mins = c.query("/live/device/get/parameters/min", track_index, device_index)
|
||||
maxs = c.query("/live/device/get/parameters/max", track_index, device_index)
|
||||
quantized = c.query("/live/device/get/parameters/is_quantized", track_index, device_index)
|
||||
return [
|
||||
{
|
||||
"index": i,
|
||||
"name": n,
|
||||
"value": v,
|
||||
"min": mn,
|
||||
"max": mx,
|
||||
"is_quantized": bool(q),
|
||||
}
|
||||
for i, (n, v, mn, mx, q) in enumerate(zip(names, values, mins, maxs, quantized))
|
||||
]
|
||||
|
||||
@mcp.tool()
|
||||
def device_get_parameter(track_index: int, device_index: int, param_index: int) -> dict:
|
||||
"""Get a single device parameter. Returns {name, value, min, max, value_string}."""
|
||||
c = get_client()
|
||||
return {
|
||||
"name": c.query("/live/device/get/parameter/name", track_index, device_index, param_index)[0],
|
||||
"value": c.query("/live/device/get/parameter/value", track_index, device_index, param_index)[0],
|
||||
"value_string": c.query("/live/device/get/parameter/value_string", track_index, device_index, param_index)[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def device_set_parameter(
|
||||
track_index: int, device_index: int, param_index: int, value: float
|
||||
) -> None:
|
||||
"""Set a device parameter value (normalized 0.0–1.0)."""
|
||||
get_client().cmd(
|
||||
"/live/device/set/parameter/value",
|
||||
track_index, device_index, param_index, value,
|
||||
)
|
||||
|
||||
@mcp.tool()
|
||||
def device_set_parameters_bulk(
|
||||
track_index: int, device_index: int, params: list
|
||||
) -> None:
|
||||
"""Set multiple device parameters in one call.
|
||||
params: list of {index, value}, e.g. [{"index": 0, "value": 0.5}, ...]"""
|
||||
args = [track_index, device_index]
|
||||
for p in params:
|
||||
args.extend([int(p["index"]), float(p["value"])])
|
||||
get_client().cmd("/live/device/set/parameters/value", *args)
|
||||
|
||||
@mcp.tool()
|
||||
def device_map_midi_cc(
|
||||
track_index: int,
|
||||
device_index: int,
|
||||
param_index: int,
|
||||
channel: int,
|
||||
cc: int,
|
||||
) -> None:
|
||||
"""Map a MIDI CC to a device parameter. channel: 0–15, cc: 0–127."""
|
||||
get_client().cmd(
|
||||
"/live/midimap/map_cc",
|
||||
track_index, device_index, param_index, channel, cc,
|
||||
)
|
||||
@@ -0,0 +1,120 @@
|
||||
"""MCP tools for subscribing to and polling real-time AbletonOSC property changes."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
# Maps (object_type, property_name) to the OSC start/stop/response addresses.
|
||||
_LISTENER_MAP: dict[tuple[str, str], tuple[str, str, str]] = {
|
||||
# (object, property): (start_address, stop_address, response_address)
|
||||
("song", "beat"): ("/live/song/start_listen/beat", "/live/song/stop_listen/beat", "/live/song/get/beat"),
|
||||
("song", "is_playing"): ("/live/song/start_listen/is_playing", "/live/song/stop_listen/is_playing", "/live/song/get/is_playing"),
|
||||
("song", "tempo"): ("/live/song/start_listen/tempo", "/live/song/stop_listen/tempo", "/live/song/get/tempo"),
|
||||
("song", "current_song_time"):("/live/song/start_listen/current_song_time","/live/song/stop_listen/current_song_time","/live/song/get/current_song_time"),
|
||||
("view", "selected_track"): ("/live/view/start_listen/selected_track", "/live/view/stop_listen/selected_track", "/live/view/get/selected_track"),
|
||||
("view", "selected_scene"): ("/live/view/start_listen/selected_scene", "/live/view/stop_listen/selected_scene", "/live/view/get/selected_scene"),
|
||||
}
|
||||
|
||||
def _track_clip_addresses(object_type: str, property_name: str, track_index: int, clip_index: int | None = None) -> tuple[str, str, str, list]:
|
||||
"""Build start/stop/response OSC addresses for track- or clip-level listeners."""
|
||||
if object_type == "track":
|
||||
start = f"/live/track/start_listen/{property_name}"
|
||||
stop = f"/live/track/stop_listen/{property_name}"
|
||||
response = f"/live/track/get/{property_name}"
|
||||
args = [track_index]
|
||||
elif object_type == "clip" and clip_index is not None:
|
||||
start = f"/live/clip/start_listen/{property_name}"
|
||||
stop = f"/live/clip/stop_listen/{property_name}"
|
||||
response = f"/live/clip/get/{property_name}"
|
||||
args = [track_index, clip_index]
|
||||
elif object_type == "clip_slot" and clip_index is not None:
|
||||
start = f"/live/clip_slot/start_listen/{property_name}"
|
||||
stop = f"/live/clip_slot/stop_listen/{property_name}"
|
||||
response = f"/live/clip_slot/get/{property_name}"
|
||||
args = [track_index, clip_index]
|
||||
else:
|
||||
raise ValueError(f"Unknown object_type '{object_type}' or missing clip_index.")
|
||||
return start, stop, response, args
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def listener_start(
|
||||
object_type: str,
|
||||
property_name: str,
|
||||
track_index: int | None = None,
|
||||
clip_index: int | None = None,
|
||||
) -> str:
|
||||
"""Start listening to real-time changes for a property.
|
||||
|
||||
object_type: 'song' | 'view' | 'track' | 'clip' | 'clip_slot'
|
||||
property_name: e.g. 'beat', 'tempo', 'is_playing', 'volume', 'mute', 'name', etc.
|
||||
track_index: required for track/clip/clip_slot listeners.
|
||||
clip_index: required for clip/clip_slot listeners.
|
||||
|
||||
Returns the OSC response address (use this when calling listener_get_events).
|
||||
"""
|
||||
c = get_client()
|
||||
key = (object_type, property_name)
|
||||
if key in _LISTENER_MAP:
|
||||
start_addr, _, response_addr = _LISTENER_MAP[key]
|
||||
c.cmd(start_addr)
|
||||
c.start_listener(response_addr)
|
||||
elif object_type in ("track", "clip", "clip_slot"):
|
||||
if track_index is None:
|
||||
raise ValueError("track_index is required for track/clip/clip_slot listeners.")
|
||||
start_addr, _, response_addr, args = _track_clip_addresses(
|
||||
object_type, property_name, track_index, clip_index
|
||||
)
|
||||
c.cmd(start_addr, *args)
|
||||
c.start_listener(response_addr)
|
||||
else:
|
||||
raise ValueError(f"Unknown object_type '{object_type}'. Use: song, view, track, clip, clip_slot.")
|
||||
return response_addr
|
||||
|
||||
@mcp.tool()
|
||||
def listener_stop(
|
||||
object_type: str,
|
||||
property_name: str,
|
||||
track_index: int | None = None,
|
||||
clip_index: int | None = None,
|
||||
) -> None:
|
||||
"""Stop listening to real-time changes for a property."""
|
||||
c = get_client()
|
||||
key = (object_type, property_name)
|
||||
if key in _LISTENER_MAP:
|
||||
_, stop_addr, response_addr = _LISTENER_MAP[key]
|
||||
c.cmd(stop_addr)
|
||||
c.stop_listener(response_addr)
|
||||
elif object_type in ("track", "clip", "clip_slot"):
|
||||
if track_index is None:
|
||||
raise ValueError("track_index is required.")
|
||||
_, stop_addr, response_addr, args = _track_clip_addresses(
|
||||
object_type, property_name, track_index, clip_index
|
||||
)
|
||||
c.cmd(stop_addr, *args)
|
||||
c.stop_listener(response_addr)
|
||||
|
||||
@mcp.tool()
|
||||
def listener_get_events(
|
||||
object_type: str,
|
||||
property_name: str,
|
||||
track_index: int | None = None,
|
||||
clip_index: int | None = None,
|
||||
max_events: int = 50,
|
||||
) -> list:
|
||||
"""Drain and return up to max_events queued listener events.
|
||||
Each event is a list of the OSC arguments received (e.g. [value] or [beat, index, bar, time]).
|
||||
Call listener_start first to register the listener."""
|
||||
key = (object_type, property_name)
|
||||
if key in _LISTENER_MAP:
|
||||
_, _, response_addr = _LISTENER_MAP[key]
|
||||
elif object_type in ("track", "clip", "clip_slot"):
|
||||
if track_index is None:
|
||||
raise ValueError("track_index is required.")
|
||||
_, _, response_addr, _ = _track_clip_addresses(
|
||||
object_type, property_name, track_index, clip_index
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unknown object_type '{object_type}'.")
|
||||
events = get_client().drain_listener(response_addr, max_events)
|
||||
return [list(e) for e in events]
|
||||
@@ -0,0 +1,59 @@
|
||||
"""MCP tools for Ableton Live scene control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def scene_fire(scene_index: int) -> None:
|
||||
"""Launch a scene by index, triggering all clips in that row."""
|
||||
get_client().cmd("/live/scene/fire", scene_index)
|
||||
|
||||
@mcp.tool()
|
||||
def scene_fire_selected() -> None:
|
||||
"""Launch the currently selected scene."""
|
||||
get_client().cmd("/live/scene/fire_selected")
|
||||
|
||||
@mcp.tool()
|
||||
def scene_get_info(scene_index: int) -> dict:
|
||||
"""Get all properties of a scene."""
|
||||
c = get_client()
|
||||
return {
|
||||
"name": c.query("/live/scene/get/name", scene_index)[0],
|
||||
"color": c.query("/live/scene/get/color", scene_index)[0],
|
||||
"tempo": c.query("/live/scene/get/tempo", scene_index)[0],
|
||||
"tempo_enabled": bool(c.query("/live/scene/get/tempo_enabled", scene_index)[0]),
|
||||
"time_signature_numerator": c.query("/live/scene/get/time_signature_numerator", scene_index)[0],
|
||||
"time_signature_denominator": c.query("/live/scene/get/time_signature_denominator", scene_index)[0],
|
||||
"time_signature_enabled": bool(c.query("/live/scene/get/time_signature_enabled", scene_index)[0]),
|
||||
"is_empty": bool(c.query("/live/scene/get/is_empty", scene_index)[0]),
|
||||
"is_triggered": bool(c.query("/live/scene/get/is_triggered", scene_index)[0]),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def scene_set_name(scene_index: int, name: str) -> None:
|
||||
"""Rename a scene."""
|
||||
get_client().cmd("/live/scene/set/name", scene_index, name)
|
||||
|
||||
@mcp.tool()
|
||||
def scene_set_color(scene_index: int, color_index: int) -> None:
|
||||
"""Set scene color by palette index (0–69)."""
|
||||
get_client().cmd("/live/scene/set/color_index", scene_index, color_index)
|
||||
|
||||
@mcp.tool()
|
||||
def scene_set_tempo(scene_index: int, tempo: float, enabled: bool = True) -> None:
|
||||
"""Set a scene's tempo override and whether it's enabled."""
|
||||
c = get_client()
|
||||
c.cmd("/live/scene/set/tempo", scene_index, tempo)
|
||||
c.cmd("/live/scene/set/tempo_enabled", scene_index, int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def scene_set_time_signature(
|
||||
scene_index: int, numerator: int, denominator: int, enabled: bool = True
|
||||
) -> None:
|
||||
"""Set a scene's time signature override."""
|
||||
c = get_client()
|
||||
c.cmd("/live/scene/set/time_signature_numerator", scene_index, numerator)
|
||||
c.cmd("/live/scene/set/time_signature_denominator", scene_index, denominator)
|
||||
c.cmd("/live/scene/set/time_signature_enabled", scene_index, int(enabled))
|
||||
@@ -0,0 +1,314 @@
|
||||
"""MCP tools for Ableton Live song / transport control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
# --- Transport ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_state() -> dict:
|
||||
"""Get a comprehensive snapshot of the current song state."""
|
||||
c = get_client()
|
||||
return {
|
||||
"is_playing": bool(c.query("/live/song/get/is_playing")[0]),
|
||||
"tempo": c.query("/live/song/get/tempo")[0],
|
||||
"current_song_time": c.query("/live/song/get/current_song_time")[0],
|
||||
"song_length": c.query("/live/song/get/song_length")[0],
|
||||
"loop": bool(c.query("/live/song/get/loop")[0]),
|
||||
"loop_start": c.query("/live/song/get/loop_start")[0],
|
||||
"loop_length": c.query("/live/song/get/loop_length")[0],
|
||||
"metronome": bool(c.query("/live/song/get/metronome")[0]),
|
||||
"signature_numerator": c.query("/live/song/get/signature_numerator")[0],
|
||||
"signature_denominator": c.query("/live/song/get/signature_denominator")[0],
|
||||
"arrangement_overdub": bool(c.query("/live/song/get/arrangement_overdub")[0]),
|
||||
"session_record": bool(c.query("/live/song/get/session_record")[0]),
|
||||
"record_mode": bool(c.query("/live/song/get/record_mode")[0]),
|
||||
"can_undo": bool(c.query("/live/song/get/can_undo")[0]),
|
||||
"can_redo": bool(c.query("/live/song/get/can_redo")[0]),
|
||||
"num_tracks": c.query("/live/song/get/num_tracks")[0],
|
||||
"num_scenes": c.query("/live/song/get/num_scenes")[0],
|
||||
"groove_amount": c.query("/live/song/get/groove_amount")[0],
|
||||
"is_ableton_link_enabled": bool(c.query("/live/song/get/is_ableton_link_enabled")[0]),
|
||||
"root_note": c.query("/live/song/get/root_note")[0],
|
||||
"scale_name": c.query("/live/song/get/scale_name")[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def song_start_playing() -> None:
|
||||
"""Start song playback from the current position."""
|
||||
get_client().cmd("/live/song/start_playing")
|
||||
|
||||
@mcp.tool()
|
||||
def song_stop_playing() -> None:
|
||||
"""Stop song playback."""
|
||||
get_client().cmd("/live/song/stop_playing")
|
||||
|
||||
@mcp.tool()
|
||||
def song_continue_playing() -> None:
|
||||
"""Continue playback without resetting to the start."""
|
||||
get_client().cmd("/live/song/continue_playing")
|
||||
|
||||
@mcp.tool()
|
||||
def song_stop_all_clips() -> None:
|
||||
"""Stop all currently playing clips."""
|
||||
get_client().cmd("/live/song/stop_all_clips")
|
||||
|
||||
@mcp.tool()
|
||||
def song_tap_tempo() -> None:
|
||||
"""Send a tap-tempo pulse to set the BPM by tapping."""
|
||||
get_client().cmd("/live/song/tap_tempo")
|
||||
|
||||
@mcp.tool()
|
||||
def song_undo() -> None:
|
||||
"""Undo the last action in Ableton Live."""
|
||||
get_client().cmd("/live/song/undo")
|
||||
|
||||
@mcp.tool()
|
||||
def song_redo() -> None:
|
||||
"""Redo the last undone action in Ableton Live."""
|
||||
get_client().cmd("/live/song/redo")
|
||||
|
||||
@mcp.tool()
|
||||
def song_capture_midi() -> None:
|
||||
"""Capture incoming MIDI into a new clip."""
|
||||
get_client().cmd("/live/song/capture_midi")
|
||||
|
||||
@mcp.tool()
|
||||
def song_re_enable_automation() -> None:
|
||||
"""Re-enable automation that has been overridden."""
|
||||
get_client().cmd("/live/song/re_enable_automation")
|
||||
|
||||
@mcp.tool()
|
||||
def song_trigger_session_record() -> None:
|
||||
"""Toggle session record mode."""
|
||||
get_client().cmd("/live/song/trigger_session_record")
|
||||
|
||||
# --- Tempo / Time ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_tempo() -> float:
|
||||
"""Get the current song tempo in BPM."""
|
||||
return get_client().query("/live/song/get/tempo")[0]
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_tempo(bpm: float) -> None:
|
||||
"""Set the song tempo. bpm: 20.0–300.0."""
|
||||
get_client().cmd("/live/song/set/tempo", bpm)
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_time() -> float:
|
||||
"""Get the current playhead position in beats."""
|
||||
return get_client().query("/live/song/get/current_song_time")[0]
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_time(beats: float) -> None:
|
||||
"""Jump the playhead to a position in beats."""
|
||||
get_client().cmd("/live/song/set/current_song_time", beats)
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_time_signature() -> dict:
|
||||
"""Get the global time signature. Returns {numerator, denominator}."""
|
||||
c = get_client()
|
||||
return {
|
||||
"numerator": c.query("/live/song/get/signature_numerator")[0],
|
||||
"denominator": c.query("/live/song/get/signature_denominator")[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_time_signature(numerator: int, denominator: int) -> None:
|
||||
"""Set the global time signature. denominator must be a power of 2 (1-16)."""
|
||||
c = get_client()
|
||||
c.cmd("/live/song/set/signature_numerator", numerator)
|
||||
c.cmd("/live/song/set/signature_denominator", denominator)
|
||||
|
||||
# --- Loop ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_loop() -> dict:
|
||||
"""Get loop settings. Returns {loop, loop_start, loop_length}."""
|
||||
c = get_client()
|
||||
return {
|
||||
"loop": bool(c.query("/live/song/get/loop")[0]),
|
||||
"loop_start": c.query("/live/song/get/loop_start")[0],
|
||||
"loop_length": c.query("/live/song/get/loop_length")[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_loop(enabled: bool, start_beats: float | None = None, length_beats: float | None = None) -> None:
|
||||
"""Set loop on/off and optionally update loop start and length (in beats)."""
|
||||
c = get_client()
|
||||
c.cmd("/live/song/set/loop", int(enabled))
|
||||
if start_beats is not None:
|
||||
c.cmd("/live/song/set/loop_start", start_beats)
|
||||
if length_beats is not None:
|
||||
c.cmd("/live/song/set/loop_length", length_beats)
|
||||
|
||||
# --- Metronome / Quantization / Groove ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_metronome() -> bool:
|
||||
"""Get metronome state."""
|
||||
return bool(get_client().query("/live/song/get/metronome")[0])
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_metronome(enabled: bool) -> None:
|
||||
"""Enable or disable the metronome."""
|
||||
get_client().cmd("/live/song/set/metronome", int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_quantization(value: int) -> None:
|
||||
"""Set clip trigger quantization (0=none, 1=8 bars, 2=4 bars, 3=2 bars, 4=1 bar, 5=1/2, 6=1/2T, 7=1/4, 8=1/4T, 9=1/8, 10=1/8T, 11=1/16, 12=1/16T, 13=1/32)."""
|
||||
get_client().cmd("/live/song/set/clip_trigger_quantization", value)
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_midi_recording_quantization(value: int) -> None:
|
||||
"""Set MIDI recording quantization (same enum as clip trigger quantization)."""
|
||||
get_client().cmd("/live/song/set/midi_recording_quantization", value)
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_groove(amount: float) -> None:
|
||||
"""Set global groove amount (0.0–1.0)."""
|
||||
get_client().cmd("/live/song/set/groove_amount", amount)
|
||||
|
||||
# --- Record / Overdub ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_arrangement_overdub(enabled: bool) -> None:
|
||||
"""Enable or disable arrangement overdub."""
|
||||
get_client().cmd("/live/song/set/arrangement_overdub", int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_session_record(enabled: bool) -> None:
|
||||
"""Enable or disable session record."""
|
||||
get_client().cmd("/live/song/set/session_record", int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_punch_in(enabled: bool) -> None:
|
||||
"""Enable or disable punch-in recording."""
|
||||
get_client().cmd("/live/song/set/punch_in", int(enabled))
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_punch_out(enabled: bool) -> None:
|
||||
"""Enable or disable punch-out recording."""
|
||||
get_client().cmd("/live/song/set/punch_out", int(enabled))
|
||||
|
||||
# --- Scale / Root Note ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_root_note(note: int) -> None:
|
||||
"""Set the root note (0=C, 1=C#, ..., 11=B)."""
|
||||
get_client().cmd("/live/song/set/root_note", note)
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_scale_name(scale_name: str) -> None:
|
||||
"""Set the scale name, e.g. 'Major', 'Minor', 'Dorian'."""
|
||||
get_client().cmd("/live/song/set/scale_name", scale_name)
|
||||
|
||||
# --- Tracks ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_tracks() -> list:
|
||||
"""Get all track names. Returns list of {index, name}."""
|
||||
names = get_client().query("/live/song/get/track_names")
|
||||
return [{"index": i, "name": n} for i, n in enumerate(names)]
|
||||
|
||||
@mcp.tool()
|
||||
def song_create_audio_track(index: int = -1) -> None:
|
||||
"""Create a new audio track at the given index (-1 = end)."""
|
||||
get_client().cmd("/live/song/create_audio_track", index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_create_midi_track(index: int = -1) -> None:
|
||||
"""Create a new MIDI track at the given index (-1 = end)."""
|
||||
get_client().cmd("/live/song/create_midi_track", index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_create_return_track() -> None:
|
||||
"""Create a new return/auxiliary track."""
|
||||
get_client().cmd("/live/song/create_return_track")
|
||||
|
||||
@mcp.tool()
|
||||
def song_delete_track(track_index: int) -> None:
|
||||
"""Delete the track at track_index."""
|
||||
get_client().cmd("/live/song/delete_track", track_index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_delete_return_track(track_index: int) -> None:
|
||||
"""Delete the return track at track_index."""
|
||||
get_client().cmd("/live/song/delete_return_track", track_index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_duplicate_track(track_index: int) -> None:
|
||||
"""Duplicate the track at track_index."""
|
||||
get_client().cmd("/live/song/duplicate_track", track_index)
|
||||
|
||||
# --- Scenes ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_scenes() -> list:
|
||||
"""Get all scene names. Returns list of {index, name}."""
|
||||
names = get_client().query("/live/song/get/scenes/name")
|
||||
return [{"index": i, "name": n} for i, n in enumerate(names)]
|
||||
|
||||
@mcp.tool()
|
||||
def song_create_scene(index: int = -1) -> None:
|
||||
"""Create a new scene at the given index (-1 = end)."""
|
||||
get_client().cmd("/live/song/create_scene", index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_delete_scene(scene_index: int) -> None:
|
||||
"""Delete the scene at scene_index."""
|
||||
get_client().cmd("/live/song/delete_scene", scene_index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_duplicate_scene(scene_index: int) -> None:
|
||||
"""Duplicate the scene at scene_index."""
|
||||
get_client().cmd("/live/song/duplicate_scene", scene_index)
|
||||
|
||||
@mcp.tool()
|
||||
def song_capture_and_insert_scene() -> None:
|
||||
"""Capture the current session state and insert it as a new scene."""
|
||||
get_client().cmd("/live/song/capture_and_insert_scene")
|
||||
|
||||
# --- Cue Points ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_get_cue_points() -> list:
|
||||
"""Get all cue point names. Returns list of {index, name}."""
|
||||
names = get_client().query("/live/song/get/cue_points")
|
||||
return [{"index": i, "name": n} for i, n in enumerate(names)]
|
||||
|
||||
@mcp.tool()
|
||||
def song_jump_to_cue(index_or_name: str) -> None:
|
||||
"""Jump to a cue point by index (int as string) or name."""
|
||||
try:
|
||||
val = int(index_or_name)
|
||||
except ValueError:
|
||||
val = index_or_name
|
||||
get_client().cmd("/live/song/cue_point/jump", val)
|
||||
|
||||
@mcp.tool()
|
||||
def song_add_or_delete_cue() -> None:
|
||||
"""Toggle a cue point at the current playhead position."""
|
||||
get_client().cmd("/live/song/set_or_delete_cue")
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_cue_name(cue_index: int, name: str) -> None:
|
||||
"""Set the name of a cue point by index."""
|
||||
get_client().cmd("/live/song/cue_point/set/name", cue_index, name)
|
||||
|
||||
# --- Structure export ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_export_structure() -> None:
|
||||
"""Export the full session structure to a JSON file in the temp directory."""
|
||||
get_client().cmd("/live/song/export/structure")
|
||||
|
||||
# --- Link ---
|
||||
|
||||
@mcp.tool()
|
||||
def song_set_ableton_link(enabled: bool) -> None:
|
||||
"""Enable or disable Ableton Link."""
|
||||
get_client().cmd("/live/song/set/is_ableton_link_enabled", int(enabled))
|
||||
@@ -0,0 +1,47 @@
|
||||
"""MCP tools for Ableton Live system / application level."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
@mcp.tool()
|
||||
def system_test_connection() -> dict:
|
||||
"""Test connectivity to AbletonOSC. Returns {connected, latency_ms}."""
|
||||
try:
|
||||
latency = get_client().ping()
|
||||
return {"connected": True, "latency_ms": round(latency, 2)}
|
||||
except TimeoutError as e:
|
||||
return {"connected": False, "error": str(e)}
|
||||
|
||||
@mcp.tool()
|
||||
def system_get_version() -> dict:
|
||||
"""Get the Ableton Live version. Returns {major, minor}."""
|
||||
result = get_client().query("/live/application/get/version")
|
||||
return {"major": result[0], "minor": result[1]}
|
||||
|
||||
@mcp.tool()
|
||||
def system_get_cpu_usage() -> float:
|
||||
"""Get Ableton Live average CPU process usage (0.0–1.0)."""
|
||||
result = get_client().query("/live/application/get/average_process_usage")
|
||||
return result[0]
|
||||
|
||||
@mcp.tool()
|
||||
def system_show_message(message: str) -> None:
|
||||
"""Display a message in the Ableton Live status bar."""
|
||||
get_client().cmd("/live/api/show_message", message)
|
||||
|
||||
@mcp.tool()
|
||||
def system_set_log_level(level: str) -> None:
|
||||
"""Set AbletonOSC log level. level: debug | info | warning | error | critical."""
|
||||
get_client().cmd("/live/api/set/log_level", level)
|
||||
|
||||
@mcp.tool()
|
||||
def system_get_log_level() -> str:
|
||||
"""Get the current AbletonOSC log level."""
|
||||
result = get_client().query("/live/api/get/log_level")
|
||||
return result[0]
|
||||
|
||||
@mcp.tool()
|
||||
def system_reload_api() -> None:
|
||||
"""Reload all AbletonOSC modules (useful after script updates)."""
|
||||
get_client().cmd("/live/api/reload")
|
||||
@@ -0,0 +1,161 @@
|
||||
"""MCP tools for Ableton Live track control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_info(track_index: int) -> dict:
|
||||
"""Get all properties of a track. Returns a comprehensive dict."""
|
||||
c = get_client()
|
||||
return {
|
||||
"name": c.query("/live/track/get/name", track_index)[0],
|
||||
"volume": c.query("/live/track/get/volume", track_index)[0],
|
||||
"panning": c.query("/live/track/get/panning", track_index)[0],
|
||||
"mute": bool(c.query("/live/track/get/mute", track_index)[0]),
|
||||
"solo": bool(c.query("/live/track/get/solo", track_index)[0]),
|
||||
"arm": bool(c.query("/live/track/get/arm", track_index)[0]),
|
||||
"can_be_armed": bool(c.query("/live/track/get/can_be_armed", track_index)[0]),
|
||||
"is_foldable": bool(c.query("/live/track/get/is_foldable", track_index)[0]),
|
||||
"is_grouped": bool(c.query("/live/track/get/is_grouped", track_index)[0]),
|
||||
"is_visible": bool(c.query("/live/track/get/is_visible", track_index)[0]),
|
||||
"has_audio_input": bool(c.query("/live/track/get/has_audio_input", track_index)[0]),
|
||||
"has_audio_output": bool(c.query("/live/track/get/has_audio_output", track_index)[0]),
|
||||
"has_midi_input": bool(c.query("/live/track/get/has_midi_input", track_index)[0]),
|
||||
"has_midi_output": bool(c.query("/live/track/get/has_midi_output", track_index)[0]),
|
||||
"color": c.query("/live/track/get/color", track_index)[0],
|
||||
"current_monitoring_state": c.query("/live/track/get/current_monitoring_state", track_index)[0],
|
||||
"playing_slot_index": c.query("/live/track/get/playing_slot_index", track_index)[0],
|
||||
"fired_slot_index": c.query("/live/track/get/fired_slot_index", track_index)[0],
|
||||
"num_devices": c.query("/live/track/get/num_devices", track_index)[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_volume(track_index: int, volume: float) -> None:
|
||||
"""Set track volume (0.0 = -inf, 0.85 ≈ 0 dB, 1.0 = +6 dB)."""
|
||||
get_client().cmd("/live/track/set/volume", track_index, volume)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_pan(track_index: int, pan: float) -> None:
|
||||
"""Set track panning (-1.0 = full left, 0.0 = center, 1.0 = full right)."""
|
||||
get_client().cmd("/live/track/set/panning", track_index, pan)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_mute(track_index: int, muted: bool) -> None:
|
||||
"""Mute or unmute a track."""
|
||||
get_client().cmd("/live/track/set/mute", track_index, int(muted))
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_solo(track_index: int, solo: bool) -> None:
|
||||
"""Solo or unsolo a track."""
|
||||
get_client().cmd("/live/track/set/solo", track_index, int(solo))
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_arm(track_index: int, armed: bool) -> None:
|
||||
"""Arm or disarm a track for recording."""
|
||||
get_client().cmd("/live/track/set/arm", track_index, int(armed))
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_send(track_index: int, send_index: int, value: float) -> None:
|
||||
"""Set send level from a track to a return track (0.0–1.0)."""
|
||||
get_client().cmd("/live/track/set/send", track_index, send_index, value)
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_send(track_index: int, send_index: int) -> float:
|
||||
"""Get the send level from a track to a return track."""
|
||||
return get_client().query("/live/track/get/send", track_index, send_index)[0]
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_name(track_index: int, name: str) -> None:
|
||||
"""Rename a track."""
|
||||
get_client().cmd("/live/track/set/name", track_index, name)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_color(track_index: int, color_index: int) -> None:
|
||||
"""Set track color by palette index (0–69)."""
|
||||
get_client().cmd("/live/track/set/color_index", track_index, color_index)
|
||||
|
||||
@mcp.tool()
|
||||
def track_stop_clips(track_index: int) -> None:
|
||||
"""Stop all clips on a track."""
|
||||
get_client().cmd("/live/track/stop_all_clips", track_index)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_monitoring(track_index: int, state: int) -> None:
|
||||
"""Set monitoring state: 0=Auto, 1=In, 2=Off."""
|
||||
get_client().cmd("/live/track/set/current_monitoring_state", track_index, state)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_fold(track_index: int, folded: bool) -> None:
|
||||
"""Fold or unfold a group track."""
|
||||
get_client().cmd("/live/track/set/fold_state", track_index, int(folded))
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_clips(track_index: int) -> list:
|
||||
"""Get all clips on a track. Returns list of {index, name, length, color}."""
|
||||
c = get_client()
|
||||
names = c.query("/live/track/get/clips/name", track_index)
|
||||
lengths = c.query("/live/track/get/clips/length", track_index)
|
||||
colors = c.query("/live/track/get/clips/color", track_index)
|
||||
result = []
|
||||
for i, (name, length, color) in enumerate(zip(names, lengths, colors)):
|
||||
if name:
|
||||
result.append({"index": i, "name": name, "length": length, "color": color})
|
||||
return result
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_devices(track_index: int) -> list:
|
||||
"""Get all devices on a track. Returns list of {index, name, type, class_name}."""
|
||||
c = get_client()
|
||||
names = c.query("/live/track/get/devices/name", track_index)
|
||||
types = c.query("/live/track/get/devices/type", track_index)
|
||||
class_names = c.query("/live/track/get/devices/class_name", track_index)
|
||||
type_map = {1: "audio_effect", 2: "instrument", 4: "midi_effect"}
|
||||
return [
|
||||
{"index": i, "name": n, "type": type_map.get(t, t), "class_name": cn}
|
||||
for i, (n, t, cn) in enumerate(zip(names, types, class_names))
|
||||
]
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_meter(track_index: int) -> dict:
|
||||
"""Get track output meter levels. Returns {level, left, right}."""
|
||||
c = get_client()
|
||||
return {
|
||||
"level": c.query("/live/track/get/output_meter_level", track_index)[0],
|
||||
"left": c.query("/live/track/get/output_meter_left", track_index)[0],
|
||||
"right": c.query("/live/track/get/output_meter_right", track_index)[0],
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_arrangement_clips(track_index: int) -> list:
|
||||
"""Get arrangement clips on a track. Returns list of {index, name, length, start_time}."""
|
||||
c = get_client()
|
||||
names = c.query("/live/track/get/arrangement_clips/name", track_index)
|
||||
lengths = c.query("/live/track/get/arrangement_clips/length", track_index)
|
||||
starts = c.query("/live/track/get/arrangement_clips/start_time", track_index)
|
||||
return [
|
||||
{"index": i, "name": n, "length": l, "start_time": s}
|
||||
for i, (n, l, s) in enumerate(zip(names, lengths, starts))
|
||||
]
|
||||
|
||||
@mcp.tool()
|
||||
def track_delete_device(track_index: int, device_index: int) -> None:
|
||||
"""Delete a device from a track."""
|
||||
get_client().cmd("/live/track/delete_device", track_index, device_index)
|
||||
|
||||
@mcp.tool()
|
||||
def track_get_available_input_routing_types(track_index: int) -> list:
|
||||
"""Get available input routing types for a track."""
|
||||
result = get_client().query("/live/track/get/available_input_routing_types", track_index)
|
||||
return list(result)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_input_routing_type(track_index: int, routing_type: int) -> None:
|
||||
"""Set the input routing type for a track."""
|
||||
get_client().cmd("/live/track/set/input_routing_type", track_index, routing_type)
|
||||
|
||||
@mcp.tool()
|
||||
def track_set_output_routing_type(track_index: int, routing_type: int) -> None:
|
||||
"""Set the output routing type for a track."""
|
||||
get_client().cmd("/live/track/set/output_routing_type", track_index, routing_type)
|
||||
@@ -0,0 +1,44 @@
|
||||
"""MCP tools for Ableton Live view / selection control."""
|
||||
|
||||
from ableton_mcp.osc_client import get_client
|
||||
|
||||
|
||||
def register(mcp):
|
||||
|
||||
@mcp.tool()
|
||||
def view_get_selection() -> dict:
|
||||
"""Get the currently selected track, scene, clip, and device.
|
||||
Returns {selected_track, selected_scene, selected_clip_track, selected_clip_index, selected_device_track, selected_device_index}."""
|
||||
c = get_client()
|
||||
scene = c.query("/live/view/get/selected_scene")[0]
|
||||
track = c.query("/live/view/get/selected_track")[0]
|
||||
clip = c.query("/live/view/get/selected_clip")
|
||||
device = c.query("/live/view/get/selected_device")
|
||||
return {
|
||||
"selected_track": track,
|
||||
"selected_scene": scene,
|
||||
"selected_clip_track": clip[0] if len(clip) >= 1 else None,
|
||||
"selected_clip_index": clip[1] if len(clip) >= 2 else None,
|
||||
"selected_device_track": device[0] if len(device) >= 1 else None,
|
||||
"selected_device_index": device[1] if len(device) >= 2 else None,
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def view_set_selected_track(track_index: int) -> None:
|
||||
"""Select a track in the Ableton UI."""
|
||||
get_client().cmd("/live/view/set/selected_track", track_index)
|
||||
|
||||
@mcp.tool()
|
||||
def view_set_selected_scene(scene_index: int) -> None:
|
||||
"""Select a scene in the Ableton UI."""
|
||||
get_client().cmd("/live/view/set/selected_scene", scene_index)
|
||||
|
||||
@mcp.tool()
|
||||
def view_set_selected_clip(track_index: int, clip_index: int) -> None:
|
||||
"""Select a clip in the Ableton UI."""
|
||||
get_client().cmd("/live/view/set/selected_clip", track_index, clip_index)
|
||||
|
||||
@mcp.tool()
|
||||
def view_set_selected_device(track_index: int, device_index: int) -> None:
|
||||
"""Select a device in the Ableton UI."""
|
||||
get_client().cmd("/live/view/set/selected_device", track_index, device_index)
|
||||
Reference in New Issue
Block a user