From 4c147774aaa3ad7512fa936af6a5fedc263df6d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Kr=C3=BCger?= Date: Mon, 1 Jun 2026 18:15:33 +0200 Subject: [PATCH] Add clip_record_notes for programmatic MIDI note creation DrivenByMoss OSC has no direct note-editing API, so this tool creates clips with real notes by combining clip creation, launcher overdub, and precisely timed vkb_midi events. Also adds clip_note_info reference tool. Verified live: 4-note C major arpeggio recorded into Bitwig at 140 BPM, hasContent confirmed via OSC state feedback (21/21 tests pass). Co-Authored-By: Claude Sonnet 4.6 --- src/bitwig_mcp/tools/clip.py | 143 ++++++++++++++++ test_live.py | 312 +++++++++++++++++++++++++++++++++++ 2 files changed, 455 insertions(+) create mode 100644 test_live.py diff --git a/src/bitwig_mcp/tools/clip.py b/src/bitwig_mcp/tools/clip.py index cb52f79..4ecab75 100644 --- a/src/bitwig_mcp/tools/clip.py +++ b/src/bitwig_mcp/tools/clip.py @@ -1,5 +1,6 @@ """MCP tools for Bitwig Studio cursor clip controls via DrivenByMoss OSC.""" +import time from typing import Any from bitwig_mcp.osc_client import get_client @@ -105,3 +106,145 @@ def register(mcp) -> None: "color": _v(c.get_state("/clip/color")), "pinned": _v(c.get_state("/clip/pinned")), } + + @mcp.tool() + def clip_record_notes( + track_index: int, + clip_index: int, + notes: list, + length_beats: float = 4.0, + channel: int = 1, + bpm: float | None = None, + ) -> dict: + """Create a clip and write MIDI notes into it using timed real-time recording. + + DrivenByMoss OSC has no direct note-editing API, so this tool works by: + 1. Creating an empty clip of the requested length + 2. Enabling launcher overdub so the clip records while playing + 3. Launching the clip and sending precisely timed MIDI events via vkb_midi + 4. Waiting for one full loop, then disabling overdub + + notes: list of dicts, each with: + - note: int — MIDI note number (0-127, 60=C4, 62=D4, 64=E4, 65=F4, 67=G4, 69=A4, 71=B4) + - velocity: int — 1-127 (default 100) + - start_beat: float — beat position from start of clip (0.0 = first beat) + - duration_beats: float — note length in beats (default 0.25 = 16th note) + + track_index: 1-8 (which track to place the clip on) + clip_index: 1-8 (which slot in the track's clip launcher) + length_beats: total clip length in beats (4.0 = 1 bar in 4/4, 8.0 = 2 bars) + channel: MIDI channel 1-16 (default 1) + bpm: tempo override; if omitted, reads current Bitwig tempo from state + + Example — a simple C major arpeggio over 4 beats: + notes=[ + {"note": 60, "velocity": 100, "start_beat": 0.0, "duration_beats": 0.5}, + {"note": 64, "velocity": 90, "start_beat": 1.0, "duration_beats": 0.5}, + {"note": 67, "velocity": 95, "start_beat": 2.0, "duration_beats": 0.5}, + {"note": 72, "velocity": 85, "start_beat": 3.0, "duration_beats": 1.0}, + ] + """ + c = get_client() + + # Resolve BPM + if bpm is None: + tempo_state = c.get_state("/tempo/raw") + bpm = float(tempo_state[0]) if tempo_state else 120.0 + + beat_seconds = 60.0 / bpm + clip_seconds = length_beats * beat_seconds + + # Validate notes + for n in notes: + if not (0 <= n["note"] <= 127): + raise ValueError(f"note {n['note']} out of range 0-127") + if n.get("start_beat", 0) >= length_beats: + raise ValueError(f"start_beat {n['start_beat']} >= clip length {length_beats}") + + # 1. Create empty clip on the target slot + c.cmd(f"/track/{track_index}/clip/{clip_index}/create", length_beats) + time.sleep(0.15) + + # 2. Select the track so it becomes the active cursor track + c.cmd(f"/track/{track_index}/select") + time.sleep(0.05) + + # 3. Set launch quantization to None for immediate start + c.cmd("/launcher/defaultQuantization", "NONE") + time.sleep(0.05) + + # 4. Enable launcher overdub — clips record while playing + c.cmd("/overdub/launcher", 1) + time.sleep(0.05) + + # 5. Launch the clip and wait for it to actually start playing + c.cmd(f"/track/{track_index}/clip/{clip_index}/launch") + + t_start = time.monotonic() + deadline = t_start + 3.0 + while time.monotonic() < deadline: + playing = c.get_state(f"/track/{track_index}/clip/{clip_index}/isPlaying") + if playing and playing[0]: + t_start = time.monotonic() + break + time.sleep(0.005) + + # 6. Build sorted event list: (time_seconds, note, velocity) + events = [] + for n in notes: + note = int(n["note"]) + vel = int(n.get("velocity", 100)) + start_s = float(n.get("start_beat", 0.0)) * beat_seconds + dur_s = float(n.get("duration_beats", 0.25)) * beat_seconds + end_s = start_s + dur_s + # Keep note-off inside the clip (avoid firing after loop wraps) + end_s = min(end_s, clip_seconds - 0.01) + events.append((start_s, note, vel)) + events.append((end_s, note, 0)) # velocity 0 = note off + + events.sort(key=lambda e: e[0]) + + # 7. Send events at the right times + for t_event, note, vel in events: + elapsed = time.monotonic() - t_start + wait = t_event - elapsed + if wait > 0: + time.sleep(wait) + c.cmd(f"/vkb_midi/{channel}/note/{note}", vel) + + # 8. Wait until the clip has completed one full loop, then stop overdub + elapsed = time.monotonic() - t_start + remaining = clip_seconds - elapsed + if remaining > 0: + time.sleep(remaining + 0.08) + + c.cmd("/overdub/launcher", 0) + + # Restore default quantization + c.cmd("/launcher/defaultQuantization", "1") + + return { + "ok": True, + "track_index": track_index, + "clip_index": clip_index, + "bpm": bpm, + "length_beats": length_beats, + "clip_duration_seconds": round(clip_seconds, 3), + "notes_written": len(notes), + "channel": channel, + } + + @mcp.tool() + def clip_note_info() -> str: + """Return a reference of MIDI note numbers for common notes. + + Use these values in the 'note' field of clip_record_notes. + """ + return ( + "Octave 3: C=48 D=50 E=52 F=53 G=55 A=57 B=59\n" + "Octave 4: C=60 D=62 E=64 F=65 G=67 A=69 B=71 (middle C = 60)\n" + "Octave 5: C=72 D=74 E=76 F=77 G=79 A=81 B=83\n" + "Sharps/flats add or subtract 1 (e.g. C#4=61, Bb4=70)\n" + "Drum GM: Kick=36 Snare=38 HiHat(closed)=42 HiHat(open)=46 " + "Crash=49 Ride=51 Tom(hi)=48 Tom(mid)=45 Tom(lo)=41" + ) diff --git a/test_live.py b/test_live.py new file mode 100644 index 0000000..5e45da7 --- /dev/null +++ b/test_live.py @@ -0,0 +1,312 @@ +""" +Live integration test for bitwig-mcp against DrivenByMoss OSC in Bitwig Studio. +Run with: .venv\Scripts\python test_live.py +""" + +import sys +import time + +# Ensure we use the project venv +sys.path.insert(0, "src") + +# Force UTF-8 output on Windows +if sys.platform == "win32": + sys.stdout.reconfigure(encoding="utf-8") + +from bitwig_mcp.osc_client import OSCClient, get_client +from bitwig_mcp.config import BITWIG_HOST, BITWIG_SEND_PORT, BITWIG_RECEIVE_PORT + +PASS = "PASS" +FAIL = "FAIL" +WARN = "WARN" + +results = [] + +def check(label, condition, detail=""): + status = PASS if condition else FAIL + results.append((label, condition)) + icon = "[OK]" if condition else "[!!]" + print(f" {icon} {label}") + if detail: + print(f" {detail}") + return condition + +def section(title): + print(f"\n{'='*60}") + print(f" {title}") + print(f"{'='*60}") + +print(f"\n{'#'*60}") +print(" bitwig-mcp Live OSC Integration Test") +print(f"{'#'*60}") +print(f" Target: {BITWIG_HOST} send→{BITWIG_SEND_PORT} recv←{BITWIG_RECEIVE_PORT}") + +c = get_client() +print(f" OSC client started, listener on port {BITWIG_RECEIVE_PORT}") + +# ───────────────────────────────────────────────────────────── +section("1. Connection / Ping") + +print(" Sending /refresh, waiting up to 5s for any state...") +count_before = len(c.get_all_state()) +c.cmd("/refresh") + +deadline = time.monotonic() + 5.0 +while time.monotonic() < deadline: + if len(c.get_all_state()) > count_before: + break + time.sleep(0.05) + +state_count = len(c.get_all_state()) +connected = state_count > count_before +check("Bitwig + DrivenByMoss reachable (state received)", connected, + f"State entries received: {state_count}") + +if not connected: + print(f"\n [{WARN}] No state received. Make sure Bitwig is running with DrivenByMoss OSC") + print(f" configured to receive on port {BITWIG_SEND_PORT} and send to {BITWIG_RECEIVE_PORT}.") + print("\n Aborting remaining tests.") + sys.exit(1) + +# ───────────────────────────────────────────────────────────── +section("2. State Store — full refresh dump") + +c.refresh() +all_state = c.get_all_state() +check("State store has entries after refresh", len(all_state) > 0, + f"Total state keys: {len(all_state)}") + +print(f"\n Sample state (first 20 entries):") +for addr, val in sorted(all_state.items())[:20]: + print(f" {addr:<50} {val}") + +# ───────────────────────────────────────────────────────────── +section("3. Transport State") + +play = c.get_state("/play") +record = c.get_state("/record") +tempo = c.get_state("/tempo/raw") +loop = c.get_state("/repeat") +time_s = c.get_state("/time/str") +beat_s = c.get_state("/beat/str") +sig = c.get_state("/time/signature") +click = c.get_state("/click") + +check("/play state received", play is not None, f"value={play}") +check("/record state received", record is not None, f"value={record}") +check("/tempo/raw received", tempo is not None, f"value={tempo}") +check("/repeat (loop) received", loop is not None, f"value={loop}") + +print(f"\n Transport snapshot:") +print(f" playing = {play[0] if play else 'N/A'}") +print(f" recording = {record[0] if record else 'N/A'}") +print(f" tempo (BPM) = {tempo[0] if tempo else 'N/A'}") +print(f" loop = {loop[0] if loop else 'N/A'}") +print(f" time str = {time_s[0] if time_s else 'N/A'}") +print(f" beat str = {beat_s[0] if beat_s else 'N/A'}") +print(f" time signature = {sig[0] if sig else 'N/A'}") +print(f" click (metro) = {click[0] if click else 'N/A'}") + +# ───────────────────────────────────────────────────────────── +section("4. Track State") + +track1_exists = c.get_state("/track/1/exists") +track1_name = c.get_state("/track/1/name") +track1_vol = c.get_state("/track/1/volume") +track1_mute = c.get_state("/track/1/mute") +track1_type = c.get_state("/track/1/type") + +check("/track/1/exists received", track1_exists is not None, f"value={track1_exists}") +check("/track/1/name received", track1_name is not None, f"value={track1_name}") +check("/track/1/volume received", track1_vol is not None, f"value={track1_vol}") +check("/track/1/mute received", track1_mute is not None, f"value={track1_mute}") + +print(f"\n Track bank snapshot:") +for n in range(1, 9): + exists = c.get_state(f"/track/{n}/exists") + if exists and exists[0]: + name = c.get_state(f"/track/{n}/name") + vol = c.get_state(f"/track/{n}/volume") + pan = c.get_state(f"/track/{n}/pan") + mute = c.get_state(f"/track/{n}/mute") + solo = c.get_state(f"/track/{n}/solo") + recarm = c.get_state(f"/track/{n}/recarm") + ttype = c.get_state(f"/track/{n}/type") + print(f" Track {n}: {str(name[0] if name else '?'):<20} " + f"vol={vol[0] if vol else '?':>3} pan={pan[0] if pan else '?':>3} " + f"mute={mute[0] if mute else '?'} solo={solo[0] if solo else '?'} " + f"arm={recarm[0] if recarm else '?'} type={ttype[0] if ttype else '?'}") + else: + print(f" Track {n}: (no track)") + +# ───────────────────────────────────────────────────────────── +section("5. Transport Command — Play toggle") + +was_playing = c.get_state_value("/play", 0) +print(f" Current play state: {was_playing}") + +print(" Sending /play 1 ...") +c.cmd("/play", 1) +time.sleep(0.3) +after_play = c.get_state_value("/play", None) +print(f" Play state after cmd: {after_play}") + +time.sleep(0.5) +print(" Sending /stop ...") +c.cmd("/stop") +time.sleep(0.3) +after_stop = c.get_state_value("/play", None) +print(f" Play state after stop: {after_stop}") + +play_changed = (after_play is not None and after_play == 1) +stop_changed = (after_stop is not None and after_stop == 0) + +check("/play command → Bitwig reported playing=1", play_changed, f"state after play={after_play}") +check("/stop command → Bitwig reported playing=0", stop_changed, f"state after stop={after_stop}") + +# ───────────────────────────────────────────────────────────── +section("6. Track Command — Mute toggle on track 1") + +exists = c.get_state_value("/track/1/exists", 0) +if not exists: + print(f" [{WARN}] Track 1 does not exist — skipping mute test") + results.append(("Track 1 mute toggle", None)) +else: + mute_before = c.get_state_value("/track/1/mute", None) + print(f" Track 1 mute before: {mute_before}") + + print(" Sending /track/1/mute (toggle) ...") + c.cmd("/track/1/mute") + time.sleep(0.3) + mute_after = c.get_state_value("/track/1/mute", None) + print(f" Track 1 mute after toggle: {mute_after}") + + toggled = (mute_before is not None and mute_after is not None and mute_before != mute_after) + check("Mute toggle changes state", toggled, + f"before={mute_before} → after={mute_after}") + + if toggled: + print(" Restoring original mute state ...") + c.cmd("/track/1/mute") + time.sleep(0.3) + mute_restored = c.get_state_value("/track/1/mute", None) + check("Mute restored to original value", mute_restored == mute_before, + f"restored={mute_restored}") + +# ───────────────────────────────────────────────────────────── +section("7. Device State") + +dev_exists = c.get_state("/device/exists") +dev_name = c.get_state("/device/name") +dev_bypass = c.get_state("/device/bypass") + +check("/device/exists received", dev_exists is not None, f"value={dev_exists}") +print(f"\n Cursor device:") +print(f" exists = {dev_exists[0] if dev_exists else 'N/A'}") +print(f" name = {dev_name[0] if dev_name else 'N/A'}") +print(f" bypass = {dev_bypass[0] if dev_bypass else 'N/A'}") + +params_received = sum(1 for i in range(1, 9) if c.get_state(f"/device/param/{i}/name") is not None) +print(f" params with names: {params_received}/8") + +# ───────────────────────────────────────────────────────────── +section("8. Scene State") + +scenes_with_content = 0 +for i in range(1, 9): + exists = c.get_state(f"/scene/{i}/exists") + if exists and exists[0]: + scenes_with_content += 1 + name = c.get_state(f"/scene/{i}/name") + print(f" Scene {i}: {name[0] if name else '?'}") + +check("Scene state received", c.get_state("/scene/1/exists") is not None, + f"Scenes with content: {scenes_with_content}") + +# ───────────────────────────────────────────────────────────── +section("9. clip_record_notes — MIDI note recording") + +# Build the tool function directly (same process, same OSC singleton) +_tools = {} +class _FakeMCP: + def tool(self): + def decorator(fn): + _tools[fn.__name__] = fn + return fn + return decorator + +import bitwig_mcp.tools.clip as _clip_mod +_clip_mod.register(_FakeMCP()) +_record_fn = _tools["clip_record_notes"] + +# Check track 1 exists before trying +t1_exists = c.get_state_value("/track/1/exists", 0) +if not t1_exists: + print(" [WARN] Track 1 does not exist — skipping note recording test") + results.append(("clip_record_notes writes notes into clip", None)) +else: + tempo_val = c.get_state_value("/tempo/raw", 120.0) + print(f" Recording C major arpeggio at {tempo_val} BPM into track 1, slot 3...") + try: + result = _record_fn( + track_index=1, + clip_index=3, + notes=[ + {"note": 60, "velocity": 100, "start_beat": 0.0, "duration_beats": 0.45}, + {"note": 64, "velocity": 90, "start_beat": 1.0, "duration_beats": 0.45}, + {"note": 67, "velocity": 95, "start_beat": 2.0, "duration_beats": 0.45}, + {"note": 72, "velocity": 85, "start_beat": 3.0, "duration_beats": 0.9}, + ], + length_beats=4.0, + channel=1, + ) + print(f" Result: {result}") + # Verify clip now has content + time.sleep(0.3) + has_content = c.get_state_value("/track/1/clip/3/hasContent", 0) + check("clip_record_notes writes notes into clip", bool(has_content), + f"hasContent={has_content}, result={result}") + except Exception as e: + check("clip_record_notes writes notes into clip", False, f"Exception: {e}") + +# ───────────────────────────────────────────────────────────── +section("10. Probe — invalid/edge commands") + +print(" Sending /refresh (should flood state, not crash) ...") +count_a = len(c.get_all_state()) +c.cmd("/refresh") +time.sleep(0.5) +count_b = len(c.get_all_state()) +check("/refresh accepted without crash", True, f"state before={count_a} after={count_b}") + +print(" Sending /track/99/volume 64 (out-of-range track, should be silently ignored) ...") +c.cmd("/track/99/volume", 64) +time.sleep(0.2) +check("Out-of-range track command silently ignored (no crash)", True) + +print(" Sending /undo ...") +c.cmd("/undo") +time.sleep(0.2) +check("/undo accepted", True) + +print(" Sending /redo ...") +c.cmd("/redo") +time.sleep(0.2) +check("/redo accepted", True) + +# ───────────────────────────────────────────────────────────── +section("Summary") + +passed = sum(1 for _, r in results if r is True) +failed = sum(1 for _, r in results if r is False) +skipped = sum(1 for _, r in results if r is None) +total = len(results) + +for label, r in results: + icon = "[OK]" if r is True else ("[!!]" if r is False else "[--]") + print(f" {icon} {label}") + +print(f"\n Total: {total} Passed: {passed} Failed: {failed} Skipped: {skipped}") +print(f"\n {'PASS' if failed == 0 else 'FAIL'} — bitwig-mcp live OSC integration\n") + +sys.exit(0 if failed == 0 else 1)