commit 12ebe6728c1e18ebb7ec1ef9b77b1cceccc7f0f6 Author: Sebastian Krüger Date: Mon Jun 1 17:02:23 2026 +0200 Add full-featured MCP server for Bitwig Studio via DrivenByMoss OSC 204 MCP tools across 11 modules (transport, track, master, device, clip, scene, browser, project, layout, midi, misc) covering every OSC command supported by the DrivenByMoss extension. Architecture mirrors ableton-mcp: FastMCP + python-osc singleton with a push-based state store (DrivenByMoss proactively sends updates). Python sends to port 8000 and listens on port 9000. Co-Authored-By: Claude Sonnet 4.6 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ad19eb1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.venv/ +__pycache__/ +*.egg-info/ +dist/ +build/ +*.pyc diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..68d5112 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,19 @@ +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "bitwig-mcp" +version = "0.1.0" +description = "Full-featured MCP server for controlling Bitwig Studio via DrivenByMoss OSC" +requires-python = ">=3.10" +dependencies = [ + "mcp[cli]>=1.0", + "python-osc>=1.8", +] + +[project.scripts] +bitwig-mcp = "bitwig_mcp.server:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/bitwig_mcp/__init__.py b/src/bitwig_mcp/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/bitwig_mcp/config.py b/src/bitwig_mcp/config.py new file mode 100644 index 0000000..ebf4c6d --- /dev/null +++ b/src/bitwig_mcp/config.py @@ -0,0 +1,9 @@ +import os + +# DrivenByMoss receives on 8000 (we send TO this port) +# DrivenByMoss sends to 9000 (we listen ON this port) +BITWIG_HOST = os.getenv("BITWIG_HOST", "127.0.0.1") +BITWIG_SEND_PORT = int(os.getenv("BITWIG_SEND_PORT", "8000")) +BITWIG_RECEIVE_PORT = int(os.getenv("BITWIG_RECEIVE_PORT", "9000")) +BITWIG_TIMEOUT = float(os.getenv("BITWIG_TIMEOUT", "5.0")) +BITWIG_REFRESH_WAIT = float(os.getenv("BITWIG_REFRESH_WAIT", "0.5")) diff --git a/src/bitwig_mcp/osc_client.py b/src/bitwig_mcp/osc_client.py new file mode 100644 index 0000000..9ffc0d1 --- /dev/null +++ b/src/bitwig_mcp/osc_client.py @@ -0,0 +1,143 @@ +"""Thread-safe OSC client/server for communicating with DrivenByMoss in Bitwig Studio.""" + +import threading +import time +from typing import Any + +from pythonosc import udp_client, dispatcher, osc_server + +from bitwig_mcp.config import ( + BITWIG_HOST, + BITWIG_SEND_PORT, + BITWIG_RECEIVE_PORT, + BITWIG_TIMEOUT, + BITWIG_REFRESH_WAIT, +) + + +class OSCClient: + """Singleton managing bidirectional OSC communication with DrivenByMoss. + + DrivenByMoss is push-based: Bitwig proactively sends state updates whenever + values change. This client maintains a state store updated by all incoming + messages. Call refresh() on startup to trigger a full state dump. + """ + + _instance: "OSCClient | None" = None + _lock = threading.Lock() + + def __new__(cls) -> "OSCClient": + with cls._lock: + if cls._instance is None: + instance = super().__new__(cls) + instance._initialized = False + cls._instance = instance + return cls._instance + + def __init__(self) -> None: + if self._initialized: + return + self._initialized = True + + self._client = udp_client.SimpleUDPClient(BITWIG_HOST, BITWIG_SEND_PORT) + + # State store: address → last received args tuple + self._state: dict[str, tuple] = {} + self._state_lock = threading.Lock() + + # Waiters: address → list of Events to set when state arrives + self._waiters: dict[str, list[threading.Event]] = {} + self._waiters_lock = threading.Lock() + + self._dispatcher = dispatcher.Dispatcher() + self._dispatcher.set_default_handler(self._handle_message) + + self._server = osc_server.ThreadingOSCUDPServer( + ("0.0.0.0", BITWIG_RECEIVE_PORT), self._dispatcher + ) + self._server_thread = threading.Thread( + target=self._server.serve_forever, daemon=True + ) + self._server_thread.start() + + def _handle_message(self, address: str, *args: Any) -> None: + with self._state_lock: + self._state[address] = args + + with self._waiters_lock: + for event in self._waiters.get(address, []): + event.set() + + def cmd(self, address: str, *args: Any) -> None: + """Send a fire-and-forget OSC message to Bitwig.""" + self._client.send_message(address, list(args) if args else []) + + def get_state(self, address: str) -> tuple | None: + """Return the last received state for an OSC address, or None.""" + with self._state_lock: + return self._state.get(address) + + def get_state_value(self, address: str, default: Any = None) -> Any: + """Return the first value from the last received state, or default.""" + state = self.get_state(address) + if state is None or len(state) == 0: + return default + return state[0] + + def get_all_state(self) -> dict[str, tuple]: + """Return a snapshot of the full state store.""" + with self._state_lock: + return dict(self._state) + + def wait_for_state(self, address: str, timeout: float = BITWIG_TIMEOUT) -> tuple | None: + """Block until a state update arrives for the given address.""" + event = threading.Event() + with self._waiters_lock: + self._waiters.setdefault(address, []).append(event) + try: + event.wait(timeout) + return self.get_state(address) + finally: + with self._waiters_lock: + waiters = self._waiters.get(address, []) + try: + waiters.remove(event) + except ValueError: + pass + if not waiters: + self._waiters.pop(address, None) + + def refresh(self) -> None: + """Send /refresh to trigger a full state dump from Bitwig, then wait.""" + self.cmd("/refresh") + time.sleep(BITWIG_REFRESH_WAIT) + + def ping(self) -> bool: + """Send /refresh and return True if any state is received within timeout.""" + count_before = len(self.get_all_state()) + self.cmd("/refresh") + deadline = time.monotonic() + BITWIG_TIMEOUT + while time.monotonic() < deadline: + if len(self.get_all_state()) > count_before: + return True + time.sleep(0.05) + return False + + def get_state_dict(self, prefix: str) -> dict[str, Any]: + """Return all state entries whose address starts with prefix.""" + with self._state_lock: + return { + addr: vals + for addr, vals in self._state.items() + if addr.startswith(prefix) + } + + +_client: OSCClient | None = None + + +def get_client() -> OSCClient: + global _client + if _client is None: + _client = OSCClient() + return _client diff --git a/src/bitwig_mcp/server.py b/src/bitwig_mcp/server.py new file mode 100644 index 0000000..d80af0c --- /dev/null +++ b/src/bitwig_mcp/server.py @@ -0,0 +1,39 @@ +"""bitwig-mcp: Full-featured MCP server for controlling Bitwig Studio via DrivenByMoss OSC.""" + +from mcp.server.fastmcp import FastMCP + +from bitwig_mcp.tools import ( + transport, + track, + master, + device, + clip, + scene, + browser, + project, + layout, + midi, + misc, +) + +mcp = FastMCP("bitwig-mcp") + +transport.register(mcp) +track.register(mcp) +master.register(mcp) +device.register(mcp) +clip.register(mcp) +scene.register(mcp) +browser.register(mcp) +project.register(mcp) +layout.register(mcp) +midi.register(mcp) +misc.register(mcp) + + +def main() -> None: + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/src/bitwig_mcp/tools/__init__.py b/src/bitwig_mcp/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/bitwig_mcp/tools/browser.py b/src/bitwig_mcp/tools/browser.py new file mode 100644 index 0000000..74943d9 --- /dev/null +++ b/src/bitwig_mcp/tools/browser.py @@ -0,0 +1,113 @@ +"""MCP tools for Bitwig Studio browser navigation via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def browser_open_preset() -> dict: + """Open the browser to replace the cursor device with a preset.""" + get_client().cmd("/browser/preset") + return {"sent": "/browser/preset"} + + @mcp.tool() + def browser_insert_device_after() -> dict: + """Open the browser to insert a device after the cursor device.""" + get_client().cmd("/browser/device") + return {"sent": "/browser/device"} + + @mcp.tool() + def browser_insert_device_before() -> dict: + """Open the browser to insert a device before the cursor device.""" + get_client().cmd("/browser/device/before") + return {"sent": "/browser/device/before"} + + @mcp.tool() + def browser_navigate_tab(direction: str) -> dict: + """Navigate browser content type tabs. direction: '+' (next) or '-' (prev).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/browser/tab/{direction}") + return {"sent": f"/browser/tab/{direction}"} + + @mcp.tool() + def browser_navigate_filter(column_index: int, direction: str) -> dict: + """Navigate a browser filter column. column_index: 1-indexed. direction: '+' (next) or '-' (prev).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/browser/filter/{column_index}/{direction}") + return {"sent": f"/browser/filter/{column_index}/{direction}"} + + @mcp.tool() + def browser_reset_filter(column_index: int) -> dict: + """Reset a browser filter column to show all results. column_index: 1-indexed.""" + get_client().cmd(f"/browser/filter/{column_index}/reset") + return {"sent": f"/browser/filter/{column_index}/reset"} + + @mcp.tool() + def browser_navigate_results(direction: str) -> dict: + """Navigate the browser results list. direction: '+' (next) or '-' (prev).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/browser/result/{direction}") + return {"sent": f"/browser/result/{direction}"} + + @mcp.tool() + def browser_commit() -> dict: + """Commit the current browser selection (loads the selected preset/device).""" + get_client().cmd("/browser/commit") + return {"sent": "/browser/commit"} + + @mcp.tool() + def browser_cancel() -> dict: + """Cancel the browser and close it without loading anything.""" + get_client().cmd("/browser/cancel") + return {"sent": "/browser/cancel"} + + @mcp.tool() + def browser_get_state() -> dict: + """Get the current browser state including active tab, filters, and results.""" + c = get_client() + filters = [] + for i in range(1, 7): + items = [] + for j in range(1, 17): + item = { + "index": j, + "exists": _v(c.get_state(f"/browser/filter/{i}/item/{j}/exists")), + "name": _v(c.get_state(f"/browser/filter/{i}/item/{j}/name")), + "hits": _v(c.get_state(f"/browser/filter/{i}/item/{j}/hits")), + "selected": _v(c.get_state(f"/browser/filter/{i}/item/{j}/isSelected")), + } + if item["exists"]: + items.append(item) + filters.append({ + "index": i, + "exists": _v(c.get_state(f"/browser/filter/{i}/exists")), + "name": _v(c.get_state(f"/browser/filter/{i}/name")), + "items": items, + }) + results = [] + for i in range(1, 17): + result = { + "index": i, + "exists": _v(c.get_state(f"/browser/result/{i}/exists")), + "name": _v(c.get_state(f"/browser/result/{i}/name")), + "hits": _v(c.get_state(f"/browser/result/{i}/hits")), + "selected": _v(c.get_state(f"/browser/result/{i}/isSelected")), + } + if result["exists"]: + results.append(result) + return { + "active": _v(c.get_state("/browser/isActive")), + "tab": _v(c.get_state("/browser/tab")), + "filters": filters, + "results": results, + } diff --git a/src/bitwig_mcp/tools/clip.py b/src/bitwig_mcp/tools/clip.py new file mode 100644 index 0000000..cb52f79 --- /dev/null +++ b/src/bitwig_mcp/tools/clip.py @@ -0,0 +1,107 @@ +"""MCP tools for Bitwig Studio cursor clip controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def clip_quantize() -> dict: + """Quantize the cursor clip.""" + get_client().cmd("/clip/quantize") + return {"sent": "/clip/quantize"} + + @mcp.tool() + def clip_set_name(name: str) -> dict: + """Set the name of the cursor clip.""" + get_client().cmd("/clip/name", name) + return {"sent": "/clip/name", "name": name} + + @mcp.tool() + def clip_set_color(color: str) -> dict: + """Set the color of the cursor clip. color: RGB string e.g. 'rgb(255,128,0)'.""" + get_client().cmd("/clip/color", color) + return {"sent": "/clip/color", "color": color} + + @mcp.tool() + def clip_toggle_pinned(value: int | None = None) -> dict: + """Toggle or set the cursor clip pin state. Pinned clips stay focused. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/clip/pinned") + else: + c.cmd("/clip/pinned", value) + return {"sent": "/clip/pinned", "value": value} + + @mcp.tool() + def clip_launch() -> dict: + """Launch the cursor clip.""" + get_client().cmd("/clip/launch") + return {"sent": "/clip/launch"} + + @mcp.tool() + def clip_launch_alt() -> dict: + """Launch the cursor clip (alternative mode).""" + get_client().cmd("/clip/launchAlt") + return {"sent": "/clip/launchAlt"} + + @mcp.tool() + def clip_record() -> dict: + """Start recording into the cursor clip.""" + get_client().cmd("/clip/record") + return {"sent": "/clip/record"} + + @mcp.tool() + def clip_create(length_beats: float = 4.0) -> dict: + """Create a new empty cursor clip. length_beats: clip length in beats.""" + get_client().cmd("/clip/create", length_beats) + return {"sent": "/clip/create", "length_beats": length_beats} + + @mcp.tool() + def clip_stop() -> dict: + """Stop clips on the cursor track.""" + get_client().cmd("/clip/stop") + return {"sent": "/clip/stop"} + + @mcp.tool() + def clip_stop_alt() -> dict: + """Stop clips on the cursor track (alternative mode).""" + get_client().cmd("/clip/stopAlt") + return {"sent": "/clip/stopAlt"} + + @mcp.tool() + def clip_stop_all() -> dict: + """Stop all playing clips across all tracks.""" + get_client().cmd("/clip/stopall") + return {"sent": "/clip/stopall"} + + @mcp.tool() + def clip_stop_all_alt() -> dict: + """Stop all playing clips (alternative mode).""" + get_client().cmd("/clip/stopallAlt") + return {"sent": "/clip/stopallAlt"} + + @mcp.tool() + def clip_navigate(direction: str) -> dict: + """Navigate the cursor clip. direction: '+' (next clip) or '-' (previous clip).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/clip/{direction}") + return {"sent": f"/clip/{direction}"} + + @mcp.tool() + def clip_get_state() -> dict: + """Get the current state of the cursor clip.""" + c = get_client() + return { + "exists": _v(c.get_state("/clip/exists")), + "name": _v(c.get_state("/clip/name")), + "color": _v(c.get_state("/clip/color")), + "pinned": _v(c.get_state("/clip/pinned")), + } diff --git a/src/bitwig_mcp/tools/device.py b/src/bitwig_mcp/tools/device.py new file mode 100644 index 0000000..25769f3 --- /dev/null +++ b/src/bitwig_mcp/tools/device.py @@ -0,0 +1,328 @@ +"""MCP tools for Bitwig Studio device controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + # --- Cursor device --- + + @mcp.tool() + def device_toggle_bypass(value: int | None = None) -> dict: + """Toggle or set the cursor device bypass. value=1 bypasses, value=0 enables, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/device/bypass") + else: + c.cmd("/device/bypass", value) + return {"sent": "/device/bypass", "value": value} + + @mcp.tool() + def device_toggle_expand(value: int | None = None) -> dict: + """Toggle or set the cursor device expanded state. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/device/expand") + else: + c.cmd("/device/expand", value) + return {"sent": "/device/expand", "value": value} + + @mcp.tool() + def device_toggle_parameters(value: int | None = None) -> dict: + """Toggle or set the device parameters panel visibility. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/device/parameters") + else: + c.cmd("/device/parameters", value) + return {"sent": "/device/parameters", "value": value} + + @mcp.tool() + def device_toggle_window(value: int | None = None) -> dict: + """Toggle or set the device plugin window. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/device/window") + else: + c.cmd("/device/window", value) + return {"sent": "/device/window", "value": value} + + @mcp.tool() + def device_toggle_pinned(value: int | None = None) -> dict: + """Toggle or set device cursor pin (locks cursor to this device). omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/device/pinned") + else: + c.cmd("/device/pinned", value) + return {"sent": "/device/pinned", "value": value} + + @mcp.tool() + def device_navigate_next() -> dict: + """Move cursor to the next device in the chain.""" + get_client().cmd("/device/+") + return {"sent": "/device/+"} + + @mcp.tool() + def device_navigate_prev() -> dict: + """Move cursor to the previous device in the chain.""" + get_client().cmd("/device/-") + return {"sent": "/device/-"} + + # --- Device parameters --- + + @mcp.tool() + def device_set_param(param_index: int, value: float) -> dict: + """Set a device parameter value. param_index: 1–8, value: 0.0–1.0.""" + get_client().cmd(f"/device/param/{param_index}/value", value) + return {"sent": f"/device/param/{param_index}/value", "value": value} + + @mcp.tool() + def device_reset_param(param_index: int) -> dict: + """Reset a device parameter to its default value. param_index: 1–8.""" + get_client().cmd(f"/device/param/{param_index}/reset") + return {"sent": f"/device/param/{param_index}/reset"} + + @mcp.tool() + def device_navigate_params(direction: str) -> dict: + """Navigate device parameters. direction: '+' (next), '-' (prev), 'page+' (next page), 'page-' (prev page).""" + addr_map = {"+": "/device/param/+", "-": "/device/param/-", + "page+": "/device/param/bank/page/+", "page-": "/device/param/bank/page/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'page+', or 'page-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + # --- Device pages --- + + @mcp.tool() + def device_select_page(page_index: int) -> dict: + """Select a device parameter page. page_index: 1-indexed.""" + get_client().cmd(f"/device/page/{page_index}/selected", 1) + return {"sent": f"/device/page/{page_index}/selected"} + + # --- Sibling devices --- + + @mcp.tool() + def device_select_sibling(sibling_index: int) -> dict: + """Select a sibling device in the chain. sibling_index: 1-indexed.""" + get_client().cmd(f"/device/sibling/{sibling_index}/select") + return {"sent": f"/device/sibling/{sibling_index}/select"} + + # --- Device layers --- + + @mcp.tool() + def device_layer_select(layer_index: int) -> dict: + """Select a device layer (for instrument layers or FX chains). layer_index: 1-indexed.""" + get_client().cmd(f"/device/layer/{layer_index}/select") + return {"sent": f"/device/layer/{layer_index}/select"} + + @mcp.tool() + def device_layer_set_volume(layer_index: int, value: int) -> dict: + """Set volume for a device layer. layer_index: 1-indexed, value: 0–127.""" + get_client().cmd(f"/device/layer/{layer_index}/volume", value) + return {"sent": f"/device/layer/{layer_index}/volume", "value": value} + + @mcp.tool() + def device_layer_set_pan(layer_index: int, value: int) -> dict: + """Set panning for a device layer. layer_index: 1-indexed, value: 0–127 (64 = center).""" + get_client().cmd(f"/device/layer/{layer_index}/pan", value) + return {"sent": f"/device/layer/{layer_index}/pan", "value": value} + + @mcp.tool() + def device_layer_toggle_mute(layer_index: int, value: int | None = None) -> dict: + """Toggle or set mute on a device layer. layer_index: 1-indexed. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/device/layer/{layer_index}/mute") + else: + c.cmd(f"/device/layer/{layer_index}/mute", value) + return {"sent": f"/device/layer/{layer_index}/mute", "value": value} + + @mcp.tool() + def device_layer_set_send_volume(layer_index: int, send_index: int, value: int) -> dict: + """Set a send volume for a device layer. layer_index: 1-indexed, send_index: 1-indexed, value: 0–127.""" + get_client().cmd(f"/device/layer/{layer_index}/send/{send_index}/volume", value) + return {"sent": f"/device/layer/{layer_index}/send/{send_index}/volume", "value": value} + + @mcp.tool() + def device_layer_enter(layer_index: int) -> dict: + """Enter a device layer to view its nested devices. layer_index: 1-indexed.""" + get_client().cmd(f"/device/layer/{layer_index}/enter") + return {"sent": f"/device/layer/{layer_index}/enter"} + + @mcp.tool() + def device_layer_navigate(direction: str) -> dict: + """Navigate device layers. direction: '+' (next), '-' (prev), 'page+' (next page), 'page-' (prev page).""" + addr_map = {"+": "/device/layer/+", "-": "/device/layer/-", + "page+": "/device/layer/page/+", "page-": "/device/layer/page/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'page+', or 'page-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + # --- Drum pads --- + + @mcp.tool() + def device_drumpad_select(pad_index: int) -> dict: + """Select a drum pad. pad_index: 1-indexed (typically 1–16).""" + get_client().cmd(f"/device/drumpad/{pad_index}/select") + return {"sent": f"/device/drumpad/{pad_index}/select"} + + @mcp.tool() + def device_drumpad_set_volume(pad_index: int, value: int) -> dict: + """Set volume for a drum pad. pad_index: 1-indexed, value: 0–127.""" + get_client().cmd(f"/device/drumpad/{pad_index}/volume", value) + return {"sent": f"/device/drumpad/{pad_index}/volume", "value": value} + + @mcp.tool() + def device_drumpad_set_pan(pad_index: int, value: int) -> dict: + """Set panning for a drum pad. pad_index: 1-indexed, value: 0–127 (64 = center).""" + get_client().cmd(f"/device/drumpad/{pad_index}/pan", value) + return {"sent": f"/device/drumpad/{pad_index}/pan", "value": value} + + @mcp.tool() + def device_drumpad_toggle_mute(pad_index: int, value: int | None = None) -> dict: + """Toggle or set mute on a drum pad. pad_index: 1-indexed. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/device/drumpad/{pad_index}/mute") + else: + c.cmd(f"/device/drumpad/{pad_index}/mute", value) + return {"sent": f"/device/drumpad/{pad_index}/mute", "value": value} + + @mcp.tool() + def device_drumpad_toggle_solo(pad_index: int, value: int | None = None) -> dict: + """Toggle or set solo on a drum pad. pad_index: 1-indexed. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/device/drumpad/{pad_index}/solo") + else: + c.cmd(f"/device/drumpad/{pad_index}/solo", value) + return {"sent": f"/device/drumpad/{pad_index}/solo", "value": value} + + # --- Primary instrument --- + + @mcp.tool() + def primary_set_param(param_index: int, value: float) -> dict: + """Set a parameter on the primary instrument device. param_index: 1–8, value: 0.0–1.0.""" + get_client().cmd(f"/primary/param/{param_index}/value", value) + return {"sent": f"/primary/param/{param_index}/value", "value": value} + + @mcp.tool() + def primary_toggle_bypass(value: int | None = None) -> dict: + """Toggle or set bypass on the primary instrument device. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/primary/bypass") + else: + c.cmd("/primary/bypass", value) + return {"sent": "/primary/bypass", "value": value} + + @mcp.tool() + def primary_navigate_params(direction: str) -> dict: + """Navigate primary instrument parameters. direction: '+', '-', 'page+', 'page-'.""" + addr_map = {"+": "/primary/param/+", "-": "/primary/param/-", + "page+": "/primary/param/bank/page/+", "page-": "/primary/param/bank/page/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'page+', or 'page-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + # --- EQ device --- + + @mcp.tool() + def eq_set_type(band_index: int, eq_type: str) -> dict: + """Set the EQ band type. band_index: 1-indexed. eq_type: 'OFF', 'LP1', 'LP2', 'HP1', 'HP2', 'LS', 'HS', 'BELL', 'NOTCH', etc.""" + get_client().cmd(f"/eq/type/{band_index}", eq_type) + return {"sent": f"/eq/type/{band_index}", "type": eq_type} + + @mcp.tool() + def eq_set_gain(band_index: int, value: float) -> dict: + """Set the EQ band gain. band_index: 1-indexed, value: 0.0–1.0.""" + get_client().cmd(f"/eq/gain/{band_index}", value) + return {"sent": f"/eq/gain/{band_index}", "value": value} + + @mcp.tool() + def eq_set_frequency(band_index: int, value: float) -> dict: + """Set the EQ band frequency. band_index: 1-indexed, value: 0.0–1.0 (normalized).""" + get_client().cmd(f"/eq/freq/{band_index}", value) + return {"sent": f"/eq/freq/{band_index}", "value": value} + + @mcp.tool() + def eq_set_q(band_index: int, value: float) -> dict: + """Set the EQ band Q factor. band_index: 1-indexed, value: 0.0–1.0 (normalized).""" + get_client().cmd(f"/eq/q/{band_index}", value) + return {"sent": f"/eq/q/{band_index}", "value": value} + + @mcp.tool() + def eq_add() -> dict: + """Add an EQ+ device to the selected track.""" + get_client().cmd("/eq/add") + return {"sent": "/eq/add"} + + # --- State getters --- + + @mcp.tool() + def device_get_state() -> dict: + """Get the current state of the cursor device including parameters and pages.""" + c = get_client() + params = [] + for i in range(1, 9): + params.append({ + "index": i, + "exists": _v(c.get_state(f"/device/param/{i}/exists")), + "name": _v(c.get_state(f"/device/param/{i}/name")), + "value": _v(c.get_state(f"/device/param/{i}/value")), + "value_str": _v(c.get_state(f"/device/param/{i}/valueStr")), + "modulated_value": _v(c.get_state(f"/device/param/{i}/modulatedValue")), + }) + pages = [] + for i in range(1, 9): + pages.append({ + "index": i, + "exists": _v(c.get_state(f"/device/page/{i}/exists")), + "name": _v(c.get_state(f"/device/page/{i}/name")), + "selected": _v(c.get_state(f"/device/page/{i}/selected")), + }) + siblings = [] + for i in range(1, 9): + siblings.append({ + "index": i, + "exists": _v(c.get_state(f"/device/sibling/{i}/exists")), + "name": _v(c.get_state(f"/device/sibling/{i}/name")), + "bypass": _v(c.get_state(f"/device/sibling/{i}/bypass")), + "selected": _v(c.get_state(f"/device/sibling/{i}/selected")), + }) + layers = [] + for i in range(1, 9): + layers.append({ + "index": i, + "exists": _v(c.get_state(f"/device/layer/{i}/exists")), + "name": _v(c.get_state(f"/device/layer/{i}/name")), + "volume": _v(c.get_state(f"/device/layer/{i}/volume")), + "pan": _v(c.get_state(f"/device/layer/{i}/pan")), + "mute": _v(c.get_state(f"/device/layer/{i}/mute")), + "solo": _v(c.get_state(f"/device/layer/{i}/solo")), + "selected": _v(c.get_state(f"/device/layer/{i}/selected")), + "color": _v(c.get_state(f"/device/layer/{i}/color")), + }) + return { + "exists": _v(c.get_state("/device/exists")), + "name": _v(c.get_state("/device/name")), + "bypass": _v(c.get_state("/device/bypass")), + "expand": _v(c.get_state("/device/expand")), + "parameters_visible": _v(c.get_state("/device/parameters")), + "window_open": _v(c.get_state("/device/window")), + "pinned": _v(c.get_state("/device/pinned")), + "params": params, + "pages": pages, + "siblings": siblings, + "layers": layers, + } diff --git a/src/bitwig_mcp/tools/layout.py b/src/bitwig_mcp/tools/layout.py new file mode 100644 index 0000000..fc98bdb --- /dev/null +++ b/src/bitwig_mcp/tools/layout.py @@ -0,0 +1,233 @@ +"""MCP tools for Bitwig Studio layout and panel controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + # --- Global layout --- + + @mcp.tool() + def layout_set(layout_name: str) -> dict: + """Set the Bitwig panel layout. layout_name: e.g. 'SINGLE', 'DOUBLE', 'THREE', etc.""" + get_client().cmd("/layout", layout_name) + return {"sent": "/layout", "layout": layout_name} + + # --- Panels --- + + @mcp.tool() + def panel_toggle_note_editor(value: int | None = None) -> dict: + """Toggle or set the note editor panel. value=1 shows, value=0 hides, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/panel/noteEditor") + else: + c.cmd("/panel/noteEditor", value) + return {"sent": "/panel/noteEditor", "value": value} + + @mcp.tool() + def panel_toggle_automation_editor(value: int | None = None) -> dict: + """Toggle or set the automation editor panel. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/panel/automationEditor") + else: + c.cmd("/panel/automationEditor", value) + return {"sent": "/panel/automationEditor", "value": value} + + @mcp.tool() + def panel_toggle_devices(value: int | None = None) -> dict: + """Toggle or set the devices panel. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/panel/devices") + else: + c.cmd("/panel/devices", value) + return {"sent": "/panel/devices", "value": value} + + @mcp.tool() + def panel_toggle_mixer(value: int | None = None) -> dict: + """Toggle or set the mixer panel. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/panel/mixer") + else: + c.cmd("/panel/mixer", value) + return {"sent": "/panel/mixer", "value": value} + + @mcp.tool() + def panel_toggle_fullscreen(value: int | None = None) -> dict: + """Toggle or set fullscreen mode. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/panel/fullscreen") + else: + c.cmd("/panel/fullscreen", value) + return {"sent": "/panel/fullscreen", "value": value} + + # --- Arranger --- + + @mcp.tool() + def arranger_toggle_cue_markers(value: int | None = None) -> dict: + """Toggle cue marker visibility in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/cueMarkerVisibility") + else: + c.cmd("/arranger/cueMarkerVisibility", value) + return {"sent": "/arranger/cueMarkerVisibility", "value": value} + + @mcp.tool() + def arranger_toggle_playback_follow(value: int | None = None) -> dict: + """Toggle playback follow (auto-scroll) in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/playbackFollow") + else: + c.cmd("/arranger/playbackFollow", value) + return {"sent": "/arranger/playbackFollow", "value": value} + + @mcp.tool() + def arranger_toggle_track_height(value: int | None = None) -> dict: + """Toggle double track row height in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/trackRowHeight") + else: + c.cmd("/arranger/trackRowHeight", value) + return {"sent": "/arranger/trackRowHeight", "value": value} + + @mcp.tool() + def arranger_toggle_clip_launcher(value: int | None = None) -> dict: + """Toggle the clip launcher section visibility in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/clipLauncherSectionVisibility") + else: + c.cmd("/arranger/clipLauncherSectionVisibility", value) + return {"sent": "/arranger/clipLauncherSectionVisibility", "value": value} + + @mcp.tool() + def arranger_toggle_timeline(value: int | None = None) -> dict: + """Toggle the timeline visibility in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/timeLineVisibility") + else: + c.cmd("/arranger/timeLineVisibility", value) + return {"sent": "/arranger/timeLineVisibility", "value": value} + + @mcp.tool() + def arranger_toggle_io_section(value: int | None = None) -> dict: + """Toggle the I/O section visibility in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/ioSectionVisibility") + else: + c.cmd("/arranger/ioSectionVisibility", value) + return {"sent": "/arranger/ioSectionVisibility", "value": value} + + @mcp.tool() + def arranger_toggle_effect_tracks(value: int | None = None) -> dict: + """Toggle effect tracks visibility in the arranger. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/arranger/effectTracksVisibility") + else: + c.cmd("/arranger/effectTracksVisibility", value) + return {"sent": "/arranger/effectTracksVisibility", "value": value} + + # --- Mixer --- + + @mcp.tool() + def mixer_toggle_clip_launcher(value: int | None = None) -> dict: + """Toggle clip launcher section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/clipLauncherSectionVisibility") + else: + c.cmd("/mixer/clipLauncherSectionVisibility", value) + return {"sent": "/mixer/clipLauncherSectionVisibility", "value": value} + + @mcp.tool() + def mixer_toggle_crossfade_section(value: int | None = None) -> dict: + """Toggle the crossfade section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/crossFadeSectionVisibility") + else: + c.cmd("/mixer/crossFadeSectionVisibility", value) + return {"sent": "/mixer/crossFadeSectionVisibility", "value": value} + + @mcp.tool() + def mixer_toggle_device_section(value: int | None = None) -> dict: + """Toggle the device section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/deviceSectionVisibility") + else: + c.cmd("/mixer/deviceSectionVisibility", value) + return {"sent": "/mixer/deviceSectionVisibility", "value": value} + + @mcp.tool() + def mixer_toggle_sends_section(value: int | None = None) -> dict: + """Toggle the sends section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/sendsSectionVisibility") + else: + c.cmd("/mixer/sendsSectionVisibility", value) + return {"sent": "/mixer/sendsSectionVisibility", "value": value} + + @mcp.tool() + def mixer_toggle_io_section(value: int | None = None) -> dict: + """Toggle the I/O section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/ioSectionVisibility") + else: + c.cmd("/mixer/ioSectionVisibility", value) + return {"sent": "/mixer/ioSectionVisibility", "value": value} + + @mcp.tool() + def mixer_toggle_meter_section(value: int | None = None) -> dict: + """Toggle the meter/VU section in the mixer. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd("/mixer/meterSectionVisibility") + else: + c.cmd("/mixer/meterSectionVisibility", value) + return {"sent": "/mixer/meterSectionVisibility", "value": value} + + # --- State getters --- + + @mcp.tool() + def layout_get_state() -> dict: + """Get the current layout and panel visibility state.""" + c = get_client() + return { + "layout": _v(c.get_state("/layout")), + "arranger": { + "cue_markers": _v(c.get_state("/arranger/cueMarkerVisibility")), + "playback_follow": _v(c.get_state("/arranger/playbackFollow")), + "double_track_height": _v(c.get_state("/arranger/trackRowHeight")), + "clip_launcher_visible": _v(c.get_state("/arranger/clipLauncherSectionVisibility")), + "timeline_visible": _v(c.get_state("/arranger/timeLineVisibility")), + "io_visible": _v(c.get_state("/arranger/ioSectionVisibility")), + "effect_tracks_visible": _v(c.get_state("/arranger/effectTracksVisibility")), + }, + "mixer": { + "clip_launcher_visible": _v(c.get_state("/mixer/clipLauncherSectionVisibility")), + "crossfade_visible": _v(c.get_state("/mixer/crossFadeSectionVisibility")), + "devices_visible": _v(c.get_state("/mixer/deviceSectionVisibility")), + "sends_visible": _v(c.get_state("/mixer/sendsSectionVisibility")), + "io_visible": _v(c.get_state("/mixer/ioSectionVisibility")), + "meters_visible": _v(c.get_state("/mixer/meterSectionVisibility")), + }, + } diff --git a/src/bitwig_mcp/tools/master.py b/src/bitwig_mcp/tools/master.py new file mode 100644 index 0000000..4d96552 --- /dev/null +++ b/src/bitwig_mcp/tools/master.py @@ -0,0 +1,69 @@ +"""MCP tools for Bitwig Studio master track controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def master_set_volume(value: int) -> dict: + """Set the master track volume. value: 0–127 (100 = unity gain).""" + get_client().cmd("/master/volume", value) + return {"sent": "/master/volume", "value": value} + + @mcp.tool() + def master_reset_volume() -> dict: + """Reset the master track volume to the default (unity gain).""" + get_client().cmd("/master/volume/reset") + return {"sent": "/master/volume/reset"} + + @mcp.tool() + def master_set_pan(value: int) -> dict: + """Set the master track panning. value: 0–127 (64 = center).""" + get_client().cmd("/master/pan", value) + return {"sent": "/master/pan", "value": value} + + @mcp.tool() + def master_reset_pan() -> dict: + """Reset the master track panning to center.""" + get_client().cmd("/master/pan/reset") + return {"sent": "/master/pan/reset"} + + @mcp.tool() + def master_toggle_mute(value: int | None = None) -> dict: + """Toggle or set mute on the master track. value=1 mutes, value=0 unmutes, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/master/mute") + else: + c.cmd("/master/mute", value) + return {"sent": "/master/mute", "value": value} + + @mcp.tool() + def master_select() -> dict: + """Select (focus) the master track.""" + get_client().cmd("/master/select") + return {"sent": "/master/select"} + + @mcp.tool() + def master_get_state() -> dict: + """Get the complete current state of the master track.""" + c = get_client() + return { + "exists": _v(c.get_state("/master/exists")), + "name": _v(c.get_state("/master/name")), + "volume": _v(c.get_state("/master/volume")), + "volume_str": _v(c.get_state("/master/volumeStr")), + "pan": _v(c.get_state("/master/pan")), + "pan_str": _v(c.get_state("/master/panStr")), + "mute": _v(c.get_state("/master/mute")), + "selected": _v(c.get_state("/master/selected")), + "vu_left": _v(c.get_state("/master/vu/left")), + "vu_right": _v(c.get_state("/master/vu/right")), + } diff --git a/src/bitwig_mcp/tools/midi.py b/src/bitwig_mcp/tools/midi.py new file mode 100644 index 0000000..6c6f2ca --- /dev/null +++ b/src/bitwig_mcp/tools/midi.py @@ -0,0 +1,102 @@ +"""MCP tools for Bitwig Studio virtual MIDI keyboard via DrivenByMoss OSC.""" + +from bitwig_mcp.osc_client import get_client + + +def register(mcp) -> None: + + @mcp.tool() + def midi_note(channel: int, note: int, velocity: int) -> dict: + """Send a MIDI note on a channel. channel: 1–16, note: 0–127 (60=C4), velocity: 0–127 (0=note off).""" + get_client().cmd(f"/vkb_midi/{channel}/note/{note}", velocity) + return {"sent": f"/vkb_midi/{channel}/note/{note}", "velocity": velocity} + + @mcp.tool() + def midi_note_with_array(channel: int, note: int, velocity: int) -> dict: + """Send a MIDI note using array format. channel: 1–16, note: 0–127, velocity: 0–127.""" + get_client().cmd(f"/vkb_midi/{channel}/note", note, velocity) + return {"sent": f"/vkb_midi/{channel}/note", "note": note, "velocity": velocity} + + @mcp.tool() + def midi_note_octave(channel: int, direction: str) -> dict: + """Shift the note keyboard octave up or down. channel: 1–16, direction: '+' or '-'.""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/vkb_midi/{channel}/note/{direction}") + return {"sent": f"/vkb_midi/{channel}/note/{direction}"} + + @mcp.tool() + def midi_drum(channel: int, note: int, velocity: int) -> dict: + """Send a MIDI drum note. channel: 1–16, note: 0–127, velocity: 0–127 (0=note off).""" + get_client().cmd(f"/vkb_midi/{channel}/drum/{note}", velocity) + return {"sent": f"/vkb_midi/{channel}/drum/{note}", "velocity": velocity} + + @mcp.tool() + def midi_drum_octave(channel: int, direction: str) -> dict: + """Shift the drum pad octave up or down. channel: 1–16, direction: '+' or '-'.""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/vkb_midi/{channel}/drum/{direction}") + return {"sent": f"/vkb_midi/{channel}/drum/{direction}"} + + @mcp.tool() + def midi_cc(channel: int, cc: int, value: int) -> dict: + """Send a MIDI CC (Control Change) message. channel: 1–16, cc: 0–127, value: 0–127.""" + get_client().cmd(f"/vkb_midi/{channel}/cc/{cc}", value) + return {"sent": f"/vkb_midi/{channel}/cc/{cc}", "value": value} + + @mcp.tool() + def midi_channel_aftertouch(channel: int, pressure: int) -> dict: + """Send MIDI channel aftertouch (pressure). channel: 1–16, pressure: 0–127.""" + get_client().cmd(f"/vkb_midi/{channel}/aftertouch", pressure) + return {"sent": f"/vkb_midi/{channel}/aftertouch", "pressure": pressure} + + @mcp.tool() + def midi_poly_aftertouch(channel: int, note: int, pressure: int) -> dict: + """Send MIDI polyphonic aftertouch for a specific note. channel: 1–16, note: 0–127, pressure: 0–127.""" + get_client().cmd(f"/vkb_midi/{channel}/aftertouch/{note}", pressure) + return {"sent": f"/vkb_midi/{channel}/aftertouch/{note}", "pressure": pressure} + + @mcp.tool() + def midi_pitchbend(channel: int, value: int) -> dict: + """Send a MIDI pitch bend message. channel: 1–16, value: 0–127 (64 = center/no bend).""" + get_client().cmd(f"/vkb_midi/{channel}/pitchbend", value) + return {"sent": f"/vkb_midi/{channel}/pitchbend", "value": value} + + @mcp.tool() + def midi_set_fixed_velocity(velocity: int) -> dict: + """Set a fixed accent velocity for MIDI notes. velocity: 1–127. Set to 0 to disable fixed velocity.""" + get_client().cmd("/vkb_midi/velocity", velocity) + return {"sent": "/vkb_midi/velocity", "velocity": velocity} + + @mcp.tool() + def midi_note_repeat_toggle(value: int | None = None) -> dict: + """Toggle or set note repeat mode. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/vkb_midi/noterepeat/isActive") + else: + c.cmd("/vkb_midi/noterepeat/isActive", value) + return {"sent": "/vkb_midi/noterepeat/isActive", "value": value} + + @mcp.tool() + def midi_note_repeat_period(period: str) -> dict: + """Set the note repeat period. period: e.g. '1/32', '1/16', '1/8', '1/4', '1/2', '1'.""" + get_client().cmd("/vkb_midi/noterepeat/period", period) + return {"sent": "/vkb_midi/noterepeat/period", "period": period} + + @mcp.tool() + def midi_note_repeat_length(length: str) -> dict: + """Set the note repeat length. length: e.g. '1/32', '1/16', '1/8', '1/4', '1/2', '1'.""" + get_client().cmd("/vkb_midi/noterepeat/length", length) + return {"sent": "/vkb_midi/noterepeat/length", "length": length} + + @mcp.tool() + def midi_get_state() -> dict: + """Get the current MIDI virtual keyboard state including note repeat settings.""" + c = get_client() + return { + "note_repeat_active": c.get_state_value("/vkb_midi/noterepeat/isActive"), + "note_repeat_period": c.get_state_value("/vkb_midi/noterepeat/period"), + "note_repeat_length": c.get_state_value("/vkb_midi/noterepeat/length"), + } diff --git a/src/bitwig_mcp/tools/misc.py b/src/bitwig_mcp/tools/misc.py new file mode 100644 index 0000000..7ae082e --- /dev/null +++ b/src/bitwig_mcp/tools/misc.py @@ -0,0 +1,84 @@ +"""MCP tools for Bitwig Studio miscellaneous controls: undo/redo, markers, actions, state.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def undo() -> dict: + """Undo the last action in Bitwig Studio.""" + get_client().cmd("/undo") + return {"sent": "/undo"} + + @mcp.tool() + def redo() -> dict: + """Redo the last undone action in Bitwig Studio.""" + get_client().cmd("/redo") + return {"sent": "/redo"} + + @mcp.tool() + def action_execute(action_index: int) -> dict: + """Execute one of the 20 assignable custom actions. action_index: 1–20.""" + if not 1 <= action_index <= 20: + raise ValueError("action_index must be between 1 and 20") + get_client().cmd(f"/action/{action_index}") + return {"sent": f"/action/{action_index}"} + + @mcp.tool() + def marker_launch(marker_index: int) -> dict: + """Jump to and launch from a cue/arrangement marker. marker_index: 1-indexed.""" + get_client().cmd(f"/marker/{marker_index}/launch") + return {"sent": f"/marker/{marker_index}/launch"} + + @mcp.tool() + def marker_navigate(direction: str) -> dict: + """Navigate the marker bank. direction: 'bank+' (next page) or 'bank-' (prev page).""" + addr_map = {"bank+": "/marker/bank/+", "bank-": "/marker/bank/-"} + if direction not in addr_map: + raise ValueError("direction must be 'bank+' or 'bank-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + @mcp.tool() + def marker_get_all() -> list: + """Get the state of all markers in the current marker bank (up to 8).""" + c = get_client() + markers = [] + for i in range(1, 9): + markers.append({ + "index": i, + "exists": _v(c.get_state(f"/marker/{i}/exists")), + "name": _v(c.get_state(f"/marker/{i}/name")), + "color": _v(c.get_state(f"/marker/{i}/color")), + }) + return markers + + @mcp.tool() + def refresh_state() -> dict: + """Send /refresh to Bitwig to trigger a full state dump, then return the complete state.""" + c = get_client() + c.refresh() + return c.get_all_state() + + @mcp.tool() + def state_dump() -> dict: + """Return the entire current OSC state store without triggering a refresh. + Use refresh_state() first if you need up-to-date values.""" + return get_client().get_all_state() + + @mcp.tool() + def ping_bitwig() -> dict: + """Check if Bitwig Studio is running and DrivenByMoss is active. Returns connection status.""" + connected = get_client().ping() + return { + "connected": connected, + "message": "Bitwig + DrivenByMoss OSC is active" if connected + else "No response from Bitwig. Check that DrivenByMoss OSC extension is enabled.", + } diff --git a/src/bitwig_mcp/tools/project.py b/src/bitwig_mcp/tools/project.py new file mode 100644 index 0000000..9224f85 --- /dev/null +++ b/src/bitwig_mcp/tools/project.py @@ -0,0 +1,92 @@ +"""MCP tools for Bitwig Studio project management via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def project_save() -> dict: + """Save the current Bitwig project.""" + get_client().cmd("/project/save") + return {"sent": "/project/save"} + + @mcp.tool() + def project_navigate(direction: str) -> dict: + """Navigate between open projects. direction: '+' (next project) or '-' (previous project).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/project/{direction}") + return {"sent": f"/project/{direction}"} + + @mcp.tool() + def project_toggle_engine(value: int | None = None) -> dict: + """Toggle or set the Bitwig audio engine active state. value=1 activates, value=0 deactivates, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/project/engine") + else: + c.cmd("/project/engine", value) + return {"sent": "/project/engine", "value": value} + + @mcp.tool() + def project_set_param(param_index: int, value: float) -> dict: + """Set a project remote control parameter. param_index: 1–8, value: 0.0–1.0.""" + get_client().cmd(f"/project/param/{param_index}/value", value) + return {"sent": f"/project/param/{param_index}/value", "value": value} + + @mcp.tool() + def project_reset_param(param_index: int) -> dict: + """Reset a project remote control parameter to its default. param_index: 1–8.""" + get_client().cmd(f"/project/param/{param_index}/reset") + return {"sent": f"/project/param/{param_index}/reset"} + + @mcp.tool() + def project_navigate_params(direction: str) -> dict: + """Navigate project remote control parameters. direction: '+', '-', 'page+', 'page-'.""" + addr_map = {"+": "/project/param/+", "-": "/project/param/-", + "page+": "/project/param/bank/page/+", "page-": "/project/param/bank/page/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'page+', or 'page-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + @mcp.tool() + def project_select_page(page_index: int) -> dict: + """Select a project remote control page. page_index: 1-indexed.""" + get_client().cmd(f"/project/page/{page_index}/selected", 1) + return {"sent": f"/project/page/{page_index}/selected"} + + @mcp.tool() + def project_get_state() -> dict: + """Get the current project state including name, engine status, and parameters.""" + c = get_client() + params = [] + for i in range(1, 9): + params.append({ + "index": i, + "exists": _v(c.get_state(f"/project/param/{i}/exists")), + "name": _v(c.get_state(f"/project/param/{i}/name")), + "value": _v(c.get_state(f"/project/param/{i}/value")), + "value_str": _v(c.get_state(f"/project/param/{i}/valueStr")), + }) + pages = [] + for i in range(1, 9): + pages.append({ + "index": i, + "exists": _v(c.get_state(f"/project/page/{i}/exists")), + "name": _v(c.get_state(f"/project/page/{i}/name")), + "selected": _v(c.get_state(f"/project/page/{i}/selected")), + }) + return { + "name": _v(c.get_state("/project/name")), + "engine_active": _v(c.get_state("/project/engine")), + "params": params, + "pages": pages, + } diff --git a/src/bitwig_mcp/tools/scene.py b/src/bitwig_mcp/tools/scene.py new file mode 100644 index 0000000..2131f1e --- /dev/null +++ b/src/bitwig_mcp/tools/scene.py @@ -0,0 +1,91 @@ +"""MCP tools for Bitwig Studio scene controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def scene_launch(scene_index: int) -> dict: + """Launch a scene (triggers all clips in the row). scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/launch") + return {"sent": f"/scene/{scene_index}/launch"} + + @mcp.tool() + def scene_launch_alt(scene_index: int) -> dict: + """Launch a scene (alternative mode). scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/launchAlt") + return {"sent": f"/scene/{scene_index}/launchAlt"} + + @mcp.tool() + def scene_select(scene_index: int) -> dict: + """Select a scene. scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/select") + return {"sent": f"/scene/{scene_index}/select"} + + @mcp.tool() + def scene_duplicate(scene_index: int) -> dict: + """Duplicate a scene. scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/duplicate") + return {"sent": f"/scene/{scene_index}/duplicate"} + + @mcp.tool() + def scene_remove(scene_index: int) -> dict: + """Remove/delete a scene. scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/remove") + return {"sent": f"/scene/{scene_index}/remove"} + + @mcp.tool() + def scene_set_name(scene_index: int, name: str) -> dict: + """Set the name of a scene. scene_index: 1-indexed.""" + get_client().cmd(f"/scene/{scene_index}/name", name) + return {"sent": f"/scene/{scene_index}/name", "name": name} + + @mcp.tool() + def scene_set_color(scene_index: int, color: str) -> dict: + """Set the color of a scene. scene_index: 1-indexed. color: RGB string e.g. 'rgb(255,128,0)'.""" + get_client().cmd(f"/scene/{scene_index}/color", color) + return {"sent": f"/scene/{scene_index}/color", "color": color} + + @mcp.tool() + def scene_navigate(direction: str) -> dict: + """Scroll the scene bank. direction: '+' (next), '-' (prev), 'bank+' (next page), 'bank-' (prev page).""" + addr_map = {"+": "/scene/+", "-": "/scene/-", + "bank+": "/scene/bank/+", "bank-": "/scene/bank/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'bank+', or 'bank-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + @mcp.tool() + def scene_add() -> dict: + """Add a new empty scene.""" + get_client().cmd("/scene/add") + return {"sent": "/scene/add"} + + @mcp.tool() + def scene_create_from_playing() -> dict: + """Create a new scene from all currently playing clips.""" + get_client().cmd("/scene/create") + return {"sent": "/scene/create"} + + @mcp.tool() + def scene_get_all() -> list: + """Get the state of all scenes in the current scene bank (up to 8).""" + c = get_client() + scenes = [] + for i in range(1, 9): + scenes.append({ + "index": i, + "exists": _v(c.get_state(f"/scene/{i}/exists")), + "name": _v(c.get_state(f"/scene/{i}/name")), + "selected": _v(c.get_state(f"/scene/{i}/selected")), + "color": _v(c.get_state(f"/scene/{i}/color")), + }) + return scenes diff --git a/src/bitwig_mcp/tools/track.py b/src/bitwig_mcp/tools/track.py new file mode 100644 index 0000000..9450482 --- /dev/null +++ b/src/bitwig_mcp/tools/track.py @@ -0,0 +1,419 @@ +"""MCP tools for Bitwig Studio track controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def _track_state(c, n: int) -> dict: + """Build a state dict for track number n (1-indexed).""" + t = f"/track/{n}" + clips = [] + for ci in range(1, 9): + clip_state = { + "has_content": _v(c.get_state(f"{t}/clip/{ci}/hasContent")), + "name": _v(c.get_state(f"{t}/clip/{ci}/name")), + "is_playing": _v(c.get_state(f"{t}/clip/{ci}/isPlaying")), + "is_recording": _v(c.get_state(f"{t}/clip/{ci}/isRecording")), + "is_queued": _v(c.get_state(f"{t}/clip/{ci}/isPlayingQueued")), + "is_recording_queued": _v(c.get_state(f"{t}/clip/{ci}/isRecordingQueued")), + "is_stop_queued": _v(c.get_state(f"{t}/clip/{ci}/isStopQueued")), + "is_selected": _v(c.get_state(f"{t}/clip/{ci}/isSelected")), + "color": _v(c.get_state(f"{t}/clip/{ci}/color")), + } + clips.append(clip_state) + sends = [] + for si in range(1, 9): + send_state = { + "volume": _v(c.get_state(f"{t}/send/{si}/volume")), + "volume_str": _v(c.get_state(f"{t}/send/{si}/volumeStr")), + "activated": _v(c.get_state(f"{t}/send/{si}/activated")), + } + sends.append(send_state) + return { + "index": n, + "exists": _v(c.get_state(f"{t}/exists")), + "name": _v(c.get_state(f"{t}/name")), + "type": _v(c.get_state(f"{t}/type")), + "activated": _v(c.get_state(f"{t}/activated")), + "selected": _v(c.get_state(f"{t}/selected")), + "is_group": _v(c.get_state(f"{t}/isGroup")), + "can_hold_notes": _v(c.get_state(f"{t}/canHoldNotes")), + "can_hold_audio": _v(c.get_state(f"{t}/canHoldAudioData")), + "volume": _v(c.get_state(f"{t}/volume")), + "volume_str": _v(c.get_state(f"{t}/volumeStr")), + "pan": _v(c.get_state(f"{t}/pan")), + "pan_str": _v(c.get_state(f"{t}/panStr")), + "mute": _v(c.get_state(f"{t}/mute")), + "solo": _v(c.get_state(f"{t}/solo")), + "recarm": _v(c.get_state(f"{t}/recarm")), + "monitor": _v(c.get_state(f"{t}/monitor")), + "auto_monitor": _v(c.get_state(f"{t}/autoMonitor")), + "color": _v(c.get_state(f"{t}/color")), + "position": _v(c.get_state(f"{t}/position")), + "clips": clips, + "sends": sends, + } + + +def register(mcp) -> None: + + @mcp.tool() + def track_set_volume(track_index: int, value: int) -> dict: + """Set a track's volume. track_index: 1–8. value: 0–127 (100 = unity gain).""" + get_client().cmd(f"/track/{track_index}/volume", value) + return {"sent": f"/track/{track_index}/volume", "value": value} + + @mcp.tool() + def track_reset_volume(track_index: int) -> dict: + """Reset a track's volume to the default (unity gain). track_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/volume/reset") + return {"sent": f"/track/{track_index}/volume/reset"} + + @mcp.tool() + def track_set_pan(track_index: int, value: int) -> dict: + """Set a track's panning. track_index: 1–8. value: 0–127 (64 = center).""" + get_client().cmd(f"/track/{track_index}/pan", value) + return {"sent": f"/track/{track_index}/pan", "value": value} + + @mcp.tool() + def track_reset_pan(track_index: int) -> dict: + """Reset a track's panning to center. track_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/pan/reset") + return {"sent": f"/track/{track_index}/pan/reset"} + + @mcp.tool() + def track_toggle_mute(track_index: int, value: int | None = None) -> dict: + """Toggle or set mute on a track. track_index: 1–8. value=1 mutes, value=0 unmutes, omit to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/mute") + else: + c.cmd(f"/track/{track_index}/mute", value) + return {"sent": f"/track/{track_index}/mute", "value": value} + + @mcp.tool() + def track_toggle_solo(track_index: int, value: int | None = None) -> dict: + """Toggle or set solo on a track. track_index: 1–8. value=1 solos, value=0 unsolos, omit to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/solo") + else: + c.cmd(f"/track/{track_index}/solo", value) + return {"sent": f"/track/{track_index}/solo", "value": value} + + @mcp.tool() + def track_toggle_recarm(track_index: int, value: int | None = None) -> dict: + """Toggle or set record arm on a track. track_index: 1–8. value=1 arms, value=0 disarms, omit to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/recarm") + else: + c.cmd(f"/track/{track_index}/recarm", value) + return {"sent": f"/track/{track_index}/recarm", "value": value} + + @mcp.tool() + def track_toggle_monitor(track_index: int, value: int | None = None) -> dict: + """Toggle or set input monitoring on a track. track_index: 1–8. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/monitor") + else: + c.cmd(f"/track/{track_index}/monitor", value) + return {"sent": f"/track/{track_index}/monitor", "value": value} + + @mcp.tool() + def track_toggle_auto_monitor(track_index: int, value: int | None = None) -> dict: + """Toggle or set auto-input monitoring on a track. track_index: 1–8. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/autoMonitor") + else: + c.cmd(f"/track/{track_index}/autoMonitor", value) + return {"sent": f"/track/{track_index}/autoMonitor", "value": value} + + @mcp.tool() + def track_select(track_index: int) -> dict: + """Select (focus) a track. track_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/select") + return {"sent": f"/track/{track_index}/select"} + + @mcp.tool() + def track_set_crossfade_mode(track_index: int, mode: str) -> dict: + """Set the crossfade assignment for a track. track_index: 1–8. mode: 'A', 'B', or 'AB'.""" + if mode not in ("A", "B", "AB"): + raise ValueError("mode must be 'A', 'B', or 'AB'") + get_client().cmd(f"/track/{track_index}/crossfadeMode/{mode}") + return {"sent": f"/track/{track_index}/crossfadeMode/{mode}"} + + @mcp.tool() + def track_set_send_volume(track_index: int, send_index: int, value: int) -> dict: + """Set a send volume for a track. track_index: 1–8, send_index: 1–8, value: 0–127.""" + get_client().cmd(f"/track/{track_index}/send/{send_index}/volume", value) + return {"sent": f"/track/{track_index}/send/{send_index}/volume", "value": value} + + @mcp.tool() + def track_reset_send_volume(track_index: int, send_index: int) -> dict: + """Reset a send volume to default. track_index: 1–8, send_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/send/{send_index}/volume/reset") + return {"sent": f"/track/{track_index}/send/{send_index}/volume/reset"} + + @mcp.tool() + def track_toggle_send(track_index: int, send_index: int, value: int | None = None) -> dict: + """Toggle or set a send's active state. track_index: 1–8, send_index: 1–8. omit value to toggle.""" + c = get_client() + if value is None: + c.cmd(f"/track/{track_index}/send/{send_index}/activated") + else: + c.cmd(f"/track/{track_index}/send/{send_index}/activated", value) + return {"sent": f"/track/{track_index}/send/{send_index}/activated", "value": value} + + @mcp.tool() + def track_clip_launch(track_index: int, clip_index: int) -> dict: + """Launch a clip slot. track_index: 1–8, clip_index: 1–8 (scene row).""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/launch") + return {"sent": f"/track/{track_index}/clip/{clip_index}/launch"} + + @mcp.tool() + def track_clip_launch_alt(track_index: int, clip_index: int) -> dict: + """Launch a clip slot (alternative mode — behavior set in DrivenByMoss). track_index: 1–8, clip_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/launchAlt") + return {"sent": f"/track/{track_index}/clip/{clip_index}/launchAlt"} + + @mcp.tool() + def track_clip_record(track_index: int, clip_index: int) -> dict: + """Start recording into a clip slot. track_index: 1–8, clip_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/record") + return {"sent": f"/track/{track_index}/clip/{clip_index}/record"} + + @mcp.tool() + def track_clip_create(track_index: int, clip_index: int, length_beats: float = 4.0) -> dict: + """Create a new empty clip in a slot. track_index: 1–8, clip_index: 1–8, length_beats: clip length in beats.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/create", length_beats) + return {"sent": f"/track/{track_index}/clip/{clip_index}/create", "length_beats": length_beats} + + @mcp.tool() + def track_clip_duplicate(track_index: int, clip_index: int) -> dict: + """Duplicate a clip slot. track_index: 1–8, clip_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/duplicate") + return {"sent": f"/track/{track_index}/clip/{clip_index}/duplicate"} + + @mcp.tool() + def track_clip_remove(track_index: int, clip_index: int) -> dict: + """Remove/delete a clip from a slot. track_index: 1–8, clip_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/remove") + return {"sent": f"/track/{track_index}/clip/{clip_index}/remove"} + + @mcp.tool() + def track_clip_set_name(track_index: int, clip_index: int, name: str) -> dict: + """Set the name of a clip. track_index: 1–8, clip_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/name", name) + return {"sent": f"/track/{track_index}/clip/{clip_index}/name", "name": name} + + @mcp.tool() + def track_clip_set_color(track_index: int, clip_index: int, color: str) -> dict: + """Set the color of a clip. track_index: 1–8, clip_index: 1–8. color: RGB string e.g. 'rgb(255,128,0)'.""" + get_client().cmd(f"/track/{track_index}/clip/{clip_index}/color", color) + return {"sent": f"/track/{track_index}/clip/{clip_index}/color", "color": color} + + @mcp.tool() + def track_navigate_next() -> dict: + """Select the next track in the track bank.""" + get_client().cmd("/track/+") + return {"sent": "/track/+"} + + @mcp.tool() + def track_navigate_prev() -> dict: + """Select the previous track in the track bank.""" + get_client().cmd("/track/-") + return {"sent": "/track/-"} + + @mcp.tool() + def track_bank_next() -> dict: + """Scroll the track bank forward by one page.""" + get_client().cmd("/track/bank/+") + return {"sent": "/track/bank/+"} + + @mcp.tool() + def track_bank_prev() -> dict: + """Scroll the track bank backward by one page.""" + get_client().cmd("/track/bank/-") + return {"sent": "/track/bank/-"} + + @mcp.tool() + def track_navigate_parent() -> dict: + """Navigate to the parent group track (if inside a group).""" + get_client().cmd("/track/parent") + return {"sent": "/track/parent"} + + @mcp.tool() + def track_enter_group(track_index: int) -> dict: + """Enter a group track to view its children. track_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/enter") + return {"sent": f"/track/{track_index}/enter"} + + @mcp.tool() + def track_add_audio() -> dict: + """Add a new audio track to the project.""" + get_client().cmd("/track/add/audio") + return {"sent": "/track/add/audio"} + + @mcp.tool() + def track_add_instrument() -> dict: + """Add a new instrument track to the project.""" + get_client().cmd("/track/add/instrument") + return {"sent": "/track/add/instrument"} + + @mcp.tool() + def track_add_effect() -> dict: + """Add a new effect/FX track to the project.""" + get_client().cmd("/track/add/effect") + return {"sent": "/track/add/effect"} + + @mcp.tool() + def track_stop_clips(track_index: int) -> dict: + """Stop all clips playing on a track. track_index: 1–8.""" + get_client().cmd(f"/track/{track_index}/stop") + return {"sent": f"/track/{track_index}/stop"} + + @mcp.tool() + def track_toggle_bank() -> dict: + """Toggle between the main track bank and the effect/return track bank.""" + get_client().cmd("/track/toggleBank") + return {"sent": "/track/toggleBank"} + + @mcp.tool() + def track_set_param(param_index: int, value: float) -> dict: + """Set a remote control parameter for the selected track. param_index: 1–8, value: 0.0–1.0.""" + get_client().cmd(f"/track/param/{param_index}/value", value) + return {"sent": f"/track/param/{param_index}/value", "value": value} + + @mcp.tool() + def track_reset_param(param_index: int) -> dict: + """Reset a remote control parameter for the selected track to default. param_index: 1–8.""" + get_client().cmd(f"/track/param/{param_index}/reset") + return {"sent": f"/track/param/{param_index}/reset"} + + @mcp.tool() + def track_navigate_params(direction: str) -> dict: + """Navigate track remote control parameters. direction: '+' (next), '-' (prev), 'page+' (next page), 'page-' (prev page).""" + addr_map = {"+": "/track/param/+", "-": "/track/param/-", + "page+": "/track/param/bank/page/+", "page-": "/track/param/bank/page/-"} + if direction not in addr_map: + raise ValueError("direction must be '+', '-', 'page+', or 'page-'") + get_client().cmd(addr_map[direction]) + return {"sent": addr_map[direction]} + + @mcp.tool() + def track_indicate_volume(all_tracks: bool = True) -> dict: + """Enable volume indication on tracks (highlights parameter in Bitwig). all_tracks: affect all or just selected.""" + get_client().cmd("/track/indicate/volume", 1 if all_tracks else 0) + return {"sent": "/track/indicate/volume", "all_tracks": all_tracks} + + @mcp.tool() + def track_indicate_pan(all_tracks: bool = True) -> dict: + """Enable pan indication on tracks. all_tracks: affect all or just selected.""" + get_client().cmd("/track/indicate/pan", 1 if all_tracks else 0) + return {"sent": "/track/indicate/pan", "all_tracks": all_tracks} + + @mcp.tool() + def track_indicate_send(send_index: int) -> dict: + """Enable send indication for a specific send slot. send_index: 1–8.""" + get_client().cmd(f"/track/indicate/send/{send_index}") + return {"sent": f"/track/indicate/send/{send_index}"} + + @mcp.tool() + def track_toggle_vu_meters(value: int | None = None) -> dict: + """Toggle or set VU meter feedback. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/track/vu") + else: + c.cmd("/track/vu", value) + return {"sent": "/track/vu", "value": value} + + @mcp.tool() + def track_get_info(track_index: int) -> dict: + """Get the full current state for a single track. track_index: 1–8.""" + return _track_state(get_client(), track_index) + + @mcp.tool() + def track_get_all() -> list: + """Get the current state for all 8 tracks in the current bank.""" + c = get_client() + return [_track_state(c, n) for n in range(1, 9)] + + @mcp.tool() + def track_selected_get_state() -> dict: + """Get the full state of the currently selected (cursor) track.""" + c = get_client() + prefix = "/track/selected" + clips = [] + for ci in range(1, 9): + clips.append({ + "has_content": _v(c.get_state(f"{prefix}/clip/{ci}/hasContent")), + "name": _v(c.get_state(f"{prefix}/clip/{ci}/name")), + "is_playing": _v(c.get_state(f"{prefix}/clip/{ci}/isPlaying")), + "is_recording": _v(c.get_state(f"{prefix}/clip/{ci}/isRecording")), + "color": _v(c.get_state(f"{prefix}/clip/{ci}/color")), + }) + return { + "name": _v(c.get_state(f"{prefix}/name")), + "type": _v(c.get_state(f"{prefix}/type")), + "volume": _v(c.get_state(f"{prefix}/volume")), + "volume_str": _v(c.get_state(f"{prefix}/volumeStr")), + "pan": _v(c.get_state(f"{prefix}/pan")), + "pan_str": _v(c.get_state(f"{prefix}/panStr")), + "mute": _v(c.get_state(f"{prefix}/mute")), + "solo": _v(c.get_state(f"{prefix}/solo")), + "recarm": _v(c.get_state(f"{prefix}/recarm")), + "monitor": _v(c.get_state(f"{prefix}/monitor")), + "color": _v(c.get_state(f"{prefix}/color")), + "clips": clips, + } + + @mcp.tool() + def track_selected_set_volume(value: int) -> dict: + """Set the volume of the currently selected (cursor) track. value: 0–127.""" + get_client().cmd("/track/selected/volume", value) + return {"sent": "/track/selected/volume", "value": value} + + @mcp.tool() + def track_selected_set_pan(value: int) -> dict: + """Set the pan of the currently selected (cursor) track. value: 0–127 (64 = center).""" + get_client().cmd("/track/selected/pan", value) + return {"sent": "/track/selected/pan", "value": value} + + @mcp.tool() + def track_selected_toggle_mute(value: int | None = None) -> dict: + """Toggle or set mute on the currently selected (cursor) track.""" + c = get_client() + if value is None: + c.cmd("/track/selected/mute") + else: + c.cmd("/track/selected/mute", value) + return {"sent": "/track/selected/mute", "value": value} + + @mcp.tool() + def track_selected_toggle_solo(value: int | None = None) -> dict: + """Toggle or set solo on the currently selected (cursor) track.""" + c = get_client() + if value is None: + c.cmd("/track/selected/solo") + else: + c.cmd("/track/selected/solo", value) + return {"sent": "/track/selected/solo", "value": value} + + @mcp.tool() + def track_selected_toggle_recarm(value: int | None = None) -> dict: + """Toggle or set record arm on the currently selected (cursor) track.""" + c = get_client() + if value is None: + c.cmd("/track/selected/recarm") + else: + c.cmd("/track/selected/recarm", value) + return {"sent": "/track/selected/recarm", "value": value} diff --git a/src/bitwig_mcp/tools/transport.py b/src/bitwig_mcp/tools/transport.py new file mode 100644 index 0000000..c804ac2 --- /dev/null +++ b/src/bitwig_mcp/tools/transport.py @@ -0,0 +1,270 @@ +"""MCP tools for Bitwig Studio transport controls via DrivenByMoss OSC.""" + +from typing import Any + +from bitwig_mcp.osc_client import get_client + + +def _v(state: tuple | None, default: Any = None) -> Any: + return state[0] if state else default + + +def register(mcp) -> None: + + @mcp.tool() + def transport_play(value: int = 1) -> dict: + """Start or set Bitwig playback. value=1 plays, value=0 pauses/stops.""" + get_client().cmd("/play", value) + return {"sent": "/play", "value": value} + + @mcp.tool() + def transport_stop() -> dict: + """Stop Bitwig playback. Stops and rewinds if already stopped.""" + get_client().cmd("/stop") + return {"sent": "/stop"} + + @mcp.tool() + def transport_restart() -> dict: + """Restart Bitwig playback from the beginning.""" + get_client().cmd("/restart") + return {"sent": "/restart"} + + @mcp.tool() + def transport_toggle_record(value: int | None = None) -> dict: + """Toggle or set arranger record mode. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/record") + else: + c.cmd("/record", value) + return {"sent": "/record", "value": value} + + @mcp.tool() + def transport_toggle_overdub(value: int | None = None) -> dict: + """Toggle or set arranger overdub. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/overdub") + else: + c.cmd("/overdub", value) + return {"sent": "/overdub", "value": value} + + @mcp.tool() + def transport_toggle_launcher_overdub(value: int | None = None) -> dict: + """Toggle or set launcher (clip) overdub. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/overdub/launcher") + else: + c.cmd("/overdub/launcher", value) + return {"sent": "/overdub/launcher", "value": value} + + @mcp.tool() + def transport_toggle_loop(value: int | None = None) -> dict: + """Toggle or set loop/repeat mode. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/repeat") + else: + c.cmd("/repeat", value) + return {"sent": "/repeat", "value": value} + + @mcp.tool() + def transport_toggle_punch_in(value: int | None = None) -> dict: + """Toggle or set punch-in recording. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/punchIn") + else: + c.cmd("/punchIn", value) + return {"sent": "/punchIn", "value": value} + + @mcp.tool() + def transport_toggle_punch_out(value: int | None = None) -> dict: + """Toggle or set punch-out recording. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/punchOut") + else: + c.cmd("/punchOut", value) + return {"sent": "/punchOut", "value": value} + + @mcp.tool() + def transport_toggle_click(value: int | None = None) -> dict: + """Toggle or set the metronome click. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/click") + else: + c.cmd("/click", value) + return {"sent": "/click", "value": value} + + @mcp.tool() + def transport_set_click_volume(value: int) -> dict: + """Set the metronome click volume. value: 0–100.""" + get_client().cmd("/click/volume", value) + return {"sent": "/click/volume", "value": value} + + @mcp.tool() + def transport_toggle_click_ticks(value: int | None = None) -> dict: + """Toggle or set metronome ticks. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/click/ticks") + else: + c.cmd("/click/ticks", value) + return {"sent": "/click/ticks", "value": value} + + @mcp.tool() + def transport_toggle_click_preroll(value: int | None = None) -> dict: + """Toggle or set metronome pre-roll. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/click/preroll") + else: + c.cmd("/click/preroll", value) + return {"sent": "/click/preroll", "value": value} + + @mcp.tool() + def transport_quantize() -> dict: + """Quantize the current cursor clip.""" + get_client().cmd("/quantize") + return {"sent": "/quantize"} + + @mcp.tool() + def transport_set_tempo(bpm: float) -> dict: + """Set the project tempo in BPM.""" + get_client().cmd("/tempo/raw", bpm) + return {"sent": "/tempo/raw", "bpm": bpm} + + @mcp.tool() + def transport_tap_tempo() -> dict: + """Tap tempo — call repeatedly to set BPM by tapping.""" + get_client().cmd("/tempo/tap") + return {"sent": "/tempo/tap"} + + @mcp.tool() + def transport_adjust_tempo(amount: float, direction: str = "+") -> dict: + """Nudge the tempo up or down. direction: '+' or '-', amount: BPM delta.""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + get_client().cmd(f"/tempo/{direction}", amount) + return {"sent": f"/tempo/{direction}", "amount": amount} + + @mcp.tool() + def transport_set_position(value: float) -> dict: + """Set the timeline playback position as a beat value (float).""" + get_client().cmd("/time", value) + return {"sent": "/time", "value": value} + + @mcp.tool() + def transport_step_position(direction: str, size: str = "small") -> dict: + """Step the playback position forward or backward. + direction: '+' (forward) or '-' (backward). + size: 'small' (single step) or 'large' (bigger jump).""" + if direction not in ("+", "-"): + raise ValueError("direction must be '+' or '-'") + addr = f"/position/{direction}" if size == "small" else f"/position/{direction * 2}" + get_client().cmd(addr) + return {"sent": addr} + + @mcp.tool() + def transport_goto_start() -> dict: + """Move the playback position to the beginning of the arrangement.""" + get_client().cmd("/position/start") + return {"sent": "/position/start"} + + @mcp.tool() + def transport_set_crossfade(value: int) -> dict: + """Set the crossfader position. value: 0–127 (64 = center).""" + get_client().cmd("/crossfade", value) + return {"sent": "/crossfade", "value": value} + + @mcp.tool() + def transport_reset_crossfade() -> dict: + """Reset the crossfader to the center position.""" + get_client().cmd("/crossfade/reset") + return {"sent": "/crossfade/reset"} + + @mcp.tool() + def transport_toggle_autowrite(value: int | None = None) -> dict: + """Toggle or set arranger automation write mode. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/autowrite") + else: + c.cmd("/autowrite", value) + return {"sent": "/autowrite", "value": value} + + @mcp.tool() + def transport_toggle_launcher_autowrite(value: int | None = None) -> dict: + """Toggle or set launcher automation write mode. value=1 on, value=0 off, omit to toggle.""" + c = get_client() + if value is None: + c.cmd("/autowrite/launcher") + else: + c.cmd("/autowrite/launcher", value) + return {"sent": "/autowrite/launcher", "value": value} + + @mcp.tool() + def transport_set_automation_mode(mode: str) -> dict: + """Set the automation write mode. mode: 'OFF', 'LATCH', or 'TOUCH'.""" + if mode not in ("OFF", "LATCH", "TOUCH"): + raise ValueError("mode must be 'OFF', 'LATCH', or 'TOUCH'") + get_client().cmd("/automationWriteMode", mode) + return {"sent": "/automationWriteMode", "mode": mode} + + @mcp.tool() + def transport_set_preroll(measures: int) -> dict: + """Set the pre-roll measure count before recording starts. measures: integer.""" + get_client().cmd("/preroll", measures) + return {"sent": "/preroll", "measures": measures} + + @mcp.tool() + def transport_set_post_recording_action(action: str) -> dict: + """Set the launcher post-recording action (e.g. 'OFF', 'PLAY_RECORDED', 'RECORD_NEXT_FREE_SLOT', 'STOP', 'RETURN_TO_ARRANGEMENT').""" + get_client().cmd("/launcher/postRecordingAction", action) + return {"sent": "/launcher/postRecordingAction", "action": action} + + @mcp.tool() + def transport_set_post_recording_offset(beats: float) -> dict: + """Set the launcher post-recording time offset in beats.""" + get_client().cmd("/launcher/postRecordingTimeOffset", beats) + return {"sent": "/launcher/postRecordingTimeOffset", "beats": beats} + + @mcp.tool() + def transport_set_launch_quantization(quantization: str) -> dict: + """Set the default clip launch quantization (e.g. '1/8', '1/4', '1/2', '1', '2', '4', '8', 'NONE').""" + get_client().cmd("/launcher/defaultQuantization", quantization) + return {"sent": "/launcher/defaultQuantization", "quantization": quantization} + + @mcp.tool() + def transport_get_state() -> dict: + """Get the complete current transport state from Bitwig.""" + c = get_client() + return { + "playing": _v(c.get_state("/play")), + "recording": _v(c.get_state("/record")), + "overdub": _v(c.get_state("/overdub")), + "launcher_overdub": _v(c.get_state("/overdub/launcher")), + "loop": _v(c.get_state("/repeat")), + "punch_in": _v(c.get_state("/punchIn")), + "punch_out": _v(c.get_state("/punchOut")), + "click": _v(c.get_state("/click")), + "click_ticks": _v(c.get_state("/click/ticks")), + "click_volume": _v(c.get_state("/click/volume")), + "click_preroll": _v(c.get_state("/click/preroll")), + "tempo_bpm": _v(c.get_state("/tempo/raw")), + "time": _v(c.get_state("/time")), + "time_str": _v(c.get_state("/time/str")), + "beat_str": _v(c.get_state("/beat/str")), + "time_signature": _v(c.get_state("/time/signature")), + "crossfade": _v(c.get_state("/crossfade")), + "autowrite": _v(c.get_state("/autowrite")), + "launcher_autowrite": _v(c.get_state("/autowrite/launcher")), + "automation_mode": _v(c.get_state("/automationWriteMode")), + "preroll": _v(c.get_state("/preroll")), + "post_recording_action": _v(c.get_state("/launcher/postRecordingAction")), + "post_recording_offset": _v(c.get_state("/launcher/postRecordingTimeOffset")), + "launch_quantization": _v(c.get_state("/launcher/defaultQuantization")), + }