Add clip_record_notes for programmatic MIDI note creation

DrivenByMoss OSC has no direct note-editing API, so this tool creates
clips with real notes by combining clip creation, launcher overdub, and
precisely timed vkb_midi events. Also adds clip_note_info reference tool.

Verified live: 4-note C major arpeggio recorded into Bitwig at 140 BPM,
hasContent confirmed via OSC state feedback (21/21 tests pass).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-01 18:15:33 +02:00
parent 28fa27b6bd
commit 4c147774aa
2 changed files with 455 additions and 0 deletions
+143
View File
@@ -1,5 +1,6 @@
"""MCP tools for Bitwig Studio cursor clip controls via DrivenByMoss OSC.""" """MCP tools for Bitwig Studio cursor clip controls via DrivenByMoss OSC."""
import time
from typing import Any from typing import Any
from bitwig_mcp.osc_client import get_client from bitwig_mcp.osc_client import get_client
@@ -105,3 +106,145 @@ def register(mcp) -> None:
"color": _v(c.get_state("/clip/color")), "color": _v(c.get_state("/clip/color")),
"pinned": _v(c.get_state("/clip/pinned")), "pinned": _v(c.get_state("/clip/pinned")),
} }
@mcp.tool()
def clip_record_notes(
track_index: int,
clip_index: int,
notes: list,
length_beats: float = 4.0,
channel: int = 1,
bpm: float | None = None,
) -> dict:
"""Create a clip and write MIDI notes into it using timed real-time recording.
DrivenByMoss OSC has no direct note-editing API, so this tool works by:
1. Creating an empty clip of the requested length
2. Enabling launcher overdub so the clip records while playing
3. Launching the clip and sending precisely timed MIDI events via vkb_midi
4. Waiting for one full loop, then disabling overdub
notes: list of dicts, each with:
- note: int — MIDI note number (0-127, 60=C4, 62=D4, 64=E4, 65=F4, 67=G4, 69=A4, 71=B4)
- velocity: int — 1-127 (default 100)
- start_beat: float — beat position from start of clip (0.0 = first beat)
- duration_beats: float — note length in beats (default 0.25 = 16th note)
track_index: 1-8 (which track to place the clip on)
clip_index: 1-8 (which slot in the track's clip launcher)
length_beats: total clip length in beats (4.0 = 1 bar in 4/4, 8.0 = 2 bars)
channel: MIDI channel 1-16 (default 1)
bpm: tempo override; if omitted, reads current Bitwig tempo from state
Example — a simple C major arpeggio over 4 beats:
notes=[
{"note": 60, "velocity": 100, "start_beat": 0.0, "duration_beats": 0.5},
{"note": 64, "velocity": 90, "start_beat": 1.0, "duration_beats": 0.5},
{"note": 67, "velocity": 95, "start_beat": 2.0, "duration_beats": 0.5},
{"note": 72, "velocity": 85, "start_beat": 3.0, "duration_beats": 1.0},
]
"""
c = get_client()
# Resolve BPM
if bpm is None:
tempo_state = c.get_state("/tempo/raw")
bpm = float(tempo_state[0]) if tempo_state else 120.0
beat_seconds = 60.0 / bpm
clip_seconds = length_beats * beat_seconds
# Validate notes
for n in notes:
if not (0 <= n["note"] <= 127):
raise ValueError(f"note {n['note']} out of range 0-127")
if n.get("start_beat", 0) >= length_beats:
raise ValueError(f"start_beat {n['start_beat']} >= clip length {length_beats}")
# 1. Create empty clip on the target slot
c.cmd(f"/track/{track_index}/clip/{clip_index}/create", length_beats)
time.sleep(0.15)
# 2. Select the track so it becomes the active cursor track
c.cmd(f"/track/{track_index}/select")
time.sleep(0.05)
# 3. Set launch quantization to None for immediate start
c.cmd("/launcher/defaultQuantization", "NONE")
time.sleep(0.05)
# 4. Enable launcher overdub — clips record while playing
c.cmd("/overdub/launcher", 1)
time.sleep(0.05)
# 5. Launch the clip and wait for it to actually start playing
c.cmd(f"/track/{track_index}/clip/{clip_index}/launch")
t_start = time.monotonic()
deadline = t_start + 3.0
while time.monotonic() < deadline:
playing = c.get_state(f"/track/{track_index}/clip/{clip_index}/isPlaying")
if playing and playing[0]:
t_start = time.monotonic()
break
time.sleep(0.005)
# 6. Build sorted event list: (time_seconds, note, velocity)
events = []
for n in notes:
note = int(n["note"])
vel = int(n.get("velocity", 100))
start_s = float(n.get("start_beat", 0.0)) * beat_seconds
dur_s = float(n.get("duration_beats", 0.25)) * beat_seconds
end_s = start_s + dur_s
# Keep note-off inside the clip (avoid firing after loop wraps)
end_s = min(end_s, clip_seconds - 0.01)
events.append((start_s, note, vel))
events.append((end_s, note, 0)) # velocity 0 = note off
events.sort(key=lambda e: e[0])
# 7. Send events at the right times
for t_event, note, vel in events:
elapsed = time.monotonic() - t_start
wait = t_event - elapsed
if wait > 0:
time.sleep(wait)
c.cmd(f"/vkb_midi/{channel}/note/{note}", vel)
# 8. Wait until the clip has completed one full loop, then stop overdub
elapsed = time.monotonic() - t_start
remaining = clip_seconds - elapsed
if remaining > 0:
time.sleep(remaining + 0.08)
c.cmd("/overdub/launcher", 0)
# Restore default quantization
c.cmd("/launcher/defaultQuantization", "1")
return {
"ok": True,
"track_index": track_index,
"clip_index": clip_index,
"bpm": bpm,
"length_beats": length_beats,
"clip_duration_seconds": round(clip_seconds, 3),
"notes_written": len(notes),
"channel": channel,
}
@mcp.tool()
def clip_note_info() -> str:
"""Return a reference of MIDI note numbers for common notes.
Use these values in the 'note' field of clip_record_notes.
"""
return (
"Octave 3: C=48 D=50 E=52 F=53 G=55 A=57 B=59\n"
"Octave 4: C=60 D=62 E=64 F=65 G=67 A=69 B=71 (middle C = 60)\n"
"Octave 5: C=72 D=74 E=76 F=77 G=79 A=81 B=83\n"
"Sharps/flats add or subtract 1 (e.g. C#4=61, Bb4=70)\n"
"Drum GM: Kick=36 Snare=38 HiHat(closed)=42 HiHat(open)=46 "
"Crash=49 Ride=51 Tom(hi)=48 Tom(mid)=45 Tom(lo)=41"
)
+312
View File
@@ -0,0 +1,312 @@
"""
Live integration test for bitwig-mcp against DrivenByMoss OSC in Bitwig Studio.
Run with: .venv\Scripts\python test_live.py
"""
import sys
import time
# Ensure we use the project venv
sys.path.insert(0, "src")
# Force UTF-8 output on Windows
if sys.platform == "win32":
sys.stdout.reconfigure(encoding="utf-8")
from bitwig_mcp.osc_client import OSCClient, get_client
from bitwig_mcp.config import BITWIG_HOST, BITWIG_SEND_PORT, BITWIG_RECEIVE_PORT
PASS = "PASS"
FAIL = "FAIL"
WARN = "WARN"
results = []
def check(label, condition, detail=""):
status = PASS if condition else FAIL
results.append((label, condition))
icon = "[OK]" if condition else "[!!]"
print(f" {icon} {label}")
if detail:
print(f" {detail}")
return condition
def section(title):
print(f"\n{'='*60}")
print(f" {title}")
print(f"{'='*60}")
print(f"\n{'#'*60}")
print(" bitwig-mcp Live OSC Integration Test")
print(f"{'#'*60}")
print(f" Target: {BITWIG_HOST} send→{BITWIG_SEND_PORT} recv←{BITWIG_RECEIVE_PORT}")
c = get_client()
print(f" OSC client started, listener on port {BITWIG_RECEIVE_PORT}")
# ─────────────────────────────────────────────────────────────
section("1. Connection / Ping")
print(" Sending /refresh, waiting up to 5s for any state...")
count_before = len(c.get_all_state())
c.cmd("/refresh")
deadline = time.monotonic() + 5.0
while time.monotonic() < deadline:
if len(c.get_all_state()) > count_before:
break
time.sleep(0.05)
state_count = len(c.get_all_state())
connected = state_count > count_before
check("Bitwig + DrivenByMoss reachable (state received)", connected,
f"State entries received: {state_count}")
if not connected:
print(f"\n [{WARN}] No state received. Make sure Bitwig is running with DrivenByMoss OSC")
print(f" configured to receive on port {BITWIG_SEND_PORT} and send to {BITWIG_RECEIVE_PORT}.")
print("\n Aborting remaining tests.")
sys.exit(1)
# ─────────────────────────────────────────────────────────────
section("2. State Store — full refresh dump")
c.refresh()
all_state = c.get_all_state()
check("State store has entries after refresh", len(all_state) > 0,
f"Total state keys: {len(all_state)}")
print(f"\n Sample state (first 20 entries):")
for addr, val in sorted(all_state.items())[:20]:
print(f" {addr:<50} {val}")
# ─────────────────────────────────────────────────────────────
section("3. Transport State")
play = c.get_state("/play")
record = c.get_state("/record")
tempo = c.get_state("/tempo/raw")
loop = c.get_state("/repeat")
time_s = c.get_state("/time/str")
beat_s = c.get_state("/beat/str")
sig = c.get_state("/time/signature")
click = c.get_state("/click")
check("/play state received", play is not None, f"value={play}")
check("/record state received", record is not None, f"value={record}")
check("/tempo/raw received", tempo is not None, f"value={tempo}")
check("/repeat (loop) received", loop is not None, f"value={loop}")
print(f"\n Transport snapshot:")
print(f" playing = {play[0] if play else 'N/A'}")
print(f" recording = {record[0] if record else 'N/A'}")
print(f" tempo (BPM) = {tempo[0] if tempo else 'N/A'}")
print(f" loop = {loop[0] if loop else 'N/A'}")
print(f" time str = {time_s[0] if time_s else 'N/A'}")
print(f" beat str = {beat_s[0] if beat_s else 'N/A'}")
print(f" time signature = {sig[0] if sig else 'N/A'}")
print(f" click (metro) = {click[0] if click else 'N/A'}")
# ─────────────────────────────────────────────────────────────
section("4. Track State")
track1_exists = c.get_state("/track/1/exists")
track1_name = c.get_state("/track/1/name")
track1_vol = c.get_state("/track/1/volume")
track1_mute = c.get_state("/track/1/mute")
track1_type = c.get_state("/track/1/type")
check("/track/1/exists received", track1_exists is not None, f"value={track1_exists}")
check("/track/1/name received", track1_name is not None, f"value={track1_name}")
check("/track/1/volume received", track1_vol is not None, f"value={track1_vol}")
check("/track/1/mute received", track1_mute is not None, f"value={track1_mute}")
print(f"\n Track bank snapshot:")
for n in range(1, 9):
exists = c.get_state(f"/track/{n}/exists")
if exists and exists[0]:
name = c.get_state(f"/track/{n}/name")
vol = c.get_state(f"/track/{n}/volume")
pan = c.get_state(f"/track/{n}/pan")
mute = c.get_state(f"/track/{n}/mute")
solo = c.get_state(f"/track/{n}/solo")
recarm = c.get_state(f"/track/{n}/recarm")
ttype = c.get_state(f"/track/{n}/type")
print(f" Track {n}: {str(name[0] if name else '?'):<20} "
f"vol={vol[0] if vol else '?':>3} pan={pan[0] if pan else '?':>3} "
f"mute={mute[0] if mute else '?'} solo={solo[0] if solo else '?'} "
f"arm={recarm[0] if recarm else '?'} type={ttype[0] if ttype else '?'}")
else:
print(f" Track {n}: (no track)")
# ─────────────────────────────────────────────────────────────
section("5. Transport Command — Play toggle")
was_playing = c.get_state_value("/play", 0)
print(f" Current play state: {was_playing}")
print(" Sending /play 1 ...")
c.cmd("/play", 1)
time.sleep(0.3)
after_play = c.get_state_value("/play", None)
print(f" Play state after cmd: {after_play}")
time.sleep(0.5)
print(" Sending /stop ...")
c.cmd("/stop")
time.sleep(0.3)
after_stop = c.get_state_value("/play", None)
print(f" Play state after stop: {after_stop}")
play_changed = (after_play is not None and after_play == 1)
stop_changed = (after_stop is not None and after_stop == 0)
check("/play command → Bitwig reported playing=1", play_changed, f"state after play={after_play}")
check("/stop command → Bitwig reported playing=0", stop_changed, f"state after stop={after_stop}")
# ─────────────────────────────────────────────────────────────
section("6. Track Command — Mute toggle on track 1")
exists = c.get_state_value("/track/1/exists", 0)
if not exists:
print(f" [{WARN}] Track 1 does not exist — skipping mute test")
results.append(("Track 1 mute toggle", None))
else:
mute_before = c.get_state_value("/track/1/mute", None)
print(f" Track 1 mute before: {mute_before}")
print(" Sending /track/1/mute (toggle) ...")
c.cmd("/track/1/mute")
time.sleep(0.3)
mute_after = c.get_state_value("/track/1/mute", None)
print(f" Track 1 mute after toggle: {mute_after}")
toggled = (mute_before is not None and mute_after is not None and mute_before != mute_after)
check("Mute toggle changes state", toggled,
f"before={mute_before} → after={mute_after}")
if toggled:
print(" Restoring original mute state ...")
c.cmd("/track/1/mute")
time.sleep(0.3)
mute_restored = c.get_state_value("/track/1/mute", None)
check("Mute restored to original value", mute_restored == mute_before,
f"restored={mute_restored}")
# ─────────────────────────────────────────────────────────────
section("7. Device State")
dev_exists = c.get_state("/device/exists")
dev_name = c.get_state("/device/name")
dev_bypass = c.get_state("/device/bypass")
check("/device/exists received", dev_exists is not None, f"value={dev_exists}")
print(f"\n Cursor device:")
print(f" exists = {dev_exists[0] if dev_exists else 'N/A'}")
print(f" name = {dev_name[0] if dev_name else 'N/A'}")
print(f" bypass = {dev_bypass[0] if dev_bypass else 'N/A'}")
params_received = sum(1 for i in range(1, 9) if c.get_state(f"/device/param/{i}/name") is not None)
print(f" params with names: {params_received}/8")
# ─────────────────────────────────────────────────────────────
section("8. Scene State")
scenes_with_content = 0
for i in range(1, 9):
exists = c.get_state(f"/scene/{i}/exists")
if exists and exists[0]:
scenes_with_content += 1
name = c.get_state(f"/scene/{i}/name")
print(f" Scene {i}: {name[0] if name else '?'}")
check("Scene state received", c.get_state("/scene/1/exists") is not None,
f"Scenes with content: {scenes_with_content}")
# ─────────────────────────────────────────────────────────────
section("9. clip_record_notes — MIDI note recording")
# Build the tool function directly (same process, same OSC singleton)
_tools = {}
class _FakeMCP:
def tool(self):
def decorator(fn):
_tools[fn.__name__] = fn
return fn
return decorator
import bitwig_mcp.tools.clip as _clip_mod
_clip_mod.register(_FakeMCP())
_record_fn = _tools["clip_record_notes"]
# Check track 1 exists before trying
t1_exists = c.get_state_value("/track/1/exists", 0)
if not t1_exists:
print(" [WARN] Track 1 does not exist — skipping note recording test")
results.append(("clip_record_notes writes notes into clip", None))
else:
tempo_val = c.get_state_value("/tempo/raw", 120.0)
print(f" Recording C major arpeggio at {tempo_val} BPM into track 1, slot 3...")
try:
result = _record_fn(
track_index=1,
clip_index=3,
notes=[
{"note": 60, "velocity": 100, "start_beat": 0.0, "duration_beats": 0.45},
{"note": 64, "velocity": 90, "start_beat": 1.0, "duration_beats": 0.45},
{"note": 67, "velocity": 95, "start_beat": 2.0, "duration_beats": 0.45},
{"note": 72, "velocity": 85, "start_beat": 3.0, "duration_beats": 0.9},
],
length_beats=4.0,
channel=1,
)
print(f" Result: {result}")
# Verify clip now has content
time.sleep(0.3)
has_content = c.get_state_value("/track/1/clip/3/hasContent", 0)
check("clip_record_notes writes notes into clip", bool(has_content),
f"hasContent={has_content}, result={result}")
except Exception as e:
check("clip_record_notes writes notes into clip", False, f"Exception: {e}")
# ─────────────────────────────────────────────────────────────
section("10. Probe — invalid/edge commands")
print(" Sending /refresh (should flood state, not crash) ...")
count_a = len(c.get_all_state())
c.cmd("/refresh")
time.sleep(0.5)
count_b = len(c.get_all_state())
check("/refresh accepted without crash", True, f"state before={count_a} after={count_b}")
print(" Sending /track/99/volume 64 (out-of-range track, should be silently ignored) ...")
c.cmd("/track/99/volume", 64)
time.sleep(0.2)
check("Out-of-range track command silently ignored (no crash)", True)
print(" Sending /undo ...")
c.cmd("/undo")
time.sleep(0.2)
check("/undo accepted", True)
print(" Sending /redo ...")
c.cmd("/redo")
time.sleep(0.2)
check("/redo accepted", True)
# ─────────────────────────────────────────────────────────────
section("Summary")
passed = sum(1 for _, r in results if r is True)
failed = sum(1 for _, r in results if r is False)
skipped = sum(1 for _, r in results if r is None)
total = len(results)
for label, r in results:
icon = "[OK]" if r is True else ("[!!]" if r is False else "[--]")
print(f" {icon} {label}")
print(f"\n Total: {total} Passed: {passed} Failed: {failed} Skipped: {skipped}")
print(f"\n {'PASS' if failed == 0 else 'FAIL'} — bitwig-mcp live OSC integration\n")
sys.exit(0 if failed == 0 else 1)