From 2a493541004d2e8760e14f3584860f522e431275 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kr=C3=BCger?= Date: Tue, 2 Jun 2026 18:53:40 +0200 Subject: [PATCH] Add clip_write_midi and clip_insert_file tools; fix ping() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - clip_write_midi: generates a type-0 MIDI file from note dicts (note, start_beat, duration_beats, velocity, channel) and inserts it into a clip slot via /track/N/clip/N/insertFile. No new deps — MIDI serialisation uses struct + stdlib only. - clip_insert_file: raw wrapper for inserting any .mid or .bwclip. - Both tools accept track/clip params (1-indexed) to target a specific slot; omitting them falls back to /clip/insertFile (cursor clip). - Fix ping(): old impl returned False when state cache was already fully populated. Now waits on /update which Bitwig always emits in response to /refresh. Co-Authored-By: Claude Sonnet 4.6 --- src/bitwig_mcp/osc_client.py | 25 +++++++---- src/bitwig_mcp/tools/clip.py | 81 ++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 9 deletions(-) diff --git a/src/bitwig_mcp/osc_client.py b/src/bitwig_mcp/osc_client.py index 9ffc0d1..adc6143 100644 --- a/src/bitwig_mcp/osc_client.py +++ b/src/bitwig_mcp/osc_client.py @@ -113,15 +113,22 @@ class OSCClient: time.sleep(BITWIG_REFRESH_WAIT) def ping(self) -> bool: - """Send /refresh and return True if any state is received within timeout.""" - count_before = len(self.get_all_state()) - self.cmd("/refresh") - deadline = time.monotonic() + BITWIG_TIMEOUT - while time.monotonic() < deadline: - if len(self.get_all_state()) > count_before: - return True - time.sleep(0.05) - return False + """Send /refresh and return True if Bitwig responds within timeout.""" + event = threading.Event() + with self._waiters_lock: + self._waiters.setdefault("/update", []).append(event) + try: + self.cmd("/refresh") + return event.wait(BITWIG_TIMEOUT) + finally: + with self._waiters_lock: + waiters = self._waiters.get("/update", []) + try: + waiters.remove(event) + except ValueError: + pass + if not waiters: + self._waiters.pop("/update", None) def get_state_dict(self, prefix: str) -> dict[str, Any]: """Return all state entries whose address starts with prefix.""" diff --git a/src/bitwig_mcp/tools/clip.py b/src/bitwig_mcp/tools/clip.py index cb52f79..6b9d1b7 100644 --- a/src/bitwig_mcp/tools/clip.py +++ b/src/bitwig_mcp/tools/clip.py @@ -1,10 +1,51 @@ """MCP tools for Bitwig Studio cursor clip controls via DrivenByMoss OSC.""" +import struct +import tempfile from typing import Any from bitwig_mcp.osc_client import get_client +def _encode_varlen(value: int) -> bytes: + result = [value & 0x7F] + value >>= 7 + while value: + result.append((value & 0x7F) | 0x80) + value >>= 7 + result.reverse() + return bytes(result) + + +def _build_midi(notes: list[dict], ticks_per_beat: int = 960) -> bytes: + events = [] + for n in notes: + ch = int(n.get("channel", 0)) & 0x0F + note = int(n["note"]) & 0x7F + vel = int(n.get("velocity", 100)) & 0x7F + start = round(float(n["start_beat"]) * ticks_per_beat) + end = round((float(n["start_beat"]) + float(n["duration_beats"])) * ticks_per_beat) + events.append((start, 0x90 | ch, note, vel)) + events.append((end, 0x80 | ch, note, 0)) + + events.sort(key=lambda e: e[0]) + + track_data = bytearray() + # 120 BPM = 500000 µs/beat + track_data += _encode_varlen(0) + bytes([0xFF, 0x51, 0x03, 0x07, 0xA1, 0x20]) + + current_tick = 0 + for tick, status, d1, d2 in events: + track_data += _encode_varlen(tick - current_tick) + bytes([status, d1, d2]) + current_tick = tick + + track_data += _encode_varlen(0) + bytes([0xFF, 0x2F, 0x00]) + + header = b"MThd" + struct.pack(">I", 6) + struct.pack(">HHH", 0, 1, ticks_per_beat) + track = b"MTrk" + struct.pack(">I", len(track_data)) + bytes(track_data) + return header + track + + def _v(state: tuple | None, default: Any = None) -> Any: return state[0] if state else default @@ -105,3 +146,43 @@ def register(mcp) -> None: "color": _v(c.get_state("/clip/color")), "pinned": _v(c.get_state("/clip/pinned")), } + + @mcp.tool() + def clip_insert_file(path: str, track: int = 0, clip: int = 0) -> dict: + """Insert a file (MIDI .mid or .bwclip) into a clip slot. + path: absolute path to the file on disk. + track/clip: 1-indexed slot address. Omit both to target the cursor clip (requires a slot selected in the UI). + """ + if track and clip: + address = f"/track/{track}/clip/{clip}/insertFile" + else: + address = "/clip/insertFile" + get_client().cmd(address, path) + return {"sent": address, "path": path} + + @mcp.tool() + def clip_write_midi(notes: list[dict], track: int = 0, clip: int = 0, length_beats: float = 4.0) -> dict: + """Generate a MIDI file from note data and insert it into a clip slot. + + Each note dict requires: note (int 0-127), start_beat (float), duration_beats (float). + Optional per note: velocity (int 1-127, default 100), channel (int 0-15, default 0). + track/clip: 1-indexed slot address. Omit both to target the cursor clip (requires a slot selected in Bitwig). + + Example — kick on every beat of a 4/4 bar into track 1 slot 1: + clip_write_midi(track=1, clip=1, notes=[ + {"note": 36, "start_beat": 0.0, "duration_beats": 0.25}, + {"note": 36, "start_beat": 1.0, "duration_beats": 0.25}, + {"note": 36, "start_beat": 2.0, "duration_beats": 0.25}, + {"note": 36, "start_beat": 3.0, "duration_beats": 0.25}, + ]) + """ + midi_bytes = _build_midi(notes) + with tempfile.NamedTemporaryFile(suffix=".mid", delete=False) as f: + f.write(midi_bytes) + path = f.name + if track and clip: + address = f"/track/{track}/clip/{clip}/insertFile" + else: + address = "/clip/insertFile" + get_client().cmd(address, path) + return {"sent": address, "path": path, "notes_written": len(notes)}