""" Live integration test for bitwig-mcp against DrivenByMoss OSC in Bitwig Studio. Run with: .venv\Scripts\python test_live.py """ import sys import time # Ensure we use the project venv sys.path.insert(0, "src") # Force UTF-8 output on Windows if sys.platform == "win32": sys.stdout.reconfigure(encoding="utf-8") from bitwig_mcp.osc_client import OSCClient, get_client from bitwig_mcp.config import BITWIG_HOST, BITWIG_SEND_PORT, BITWIG_RECEIVE_PORT PASS = "PASS" FAIL = "FAIL" WARN = "WARN" results = [] def check(label, condition, detail=""): status = PASS if condition else FAIL results.append((label, condition)) icon = "[OK]" if condition else "[!!]" print(f" {icon} {label}") if detail: print(f" {detail}") return condition def section(title): print(f"\n{'='*60}") print(f" {title}") print(f"{'='*60}") print(f"\n{'#'*60}") print(" bitwig-mcp Live OSC Integration Test") print(f"{'#'*60}") print(f" Target: {BITWIG_HOST} send→{BITWIG_SEND_PORT} recv←{BITWIG_RECEIVE_PORT}") c = get_client() print(f" OSC client started, listener on port {BITWIG_RECEIVE_PORT}") # ───────────────────────────────────────────────────────────── section("1. Connection / Ping") print(" Sending /refresh, waiting up to 5s for any state...") count_before = len(c.get_all_state()) c.cmd("/refresh") deadline = time.monotonic() + 5.0 while time.monotonic() < deadline: if len(c.get_all_state()) > count_before: break time.sleep(0.05) state_count = len(c.get_all_state()) connected = state_count > count_before check("Bitwig + DrivenByMoss reachable (state received)", connected, f"State entries received: {state_count}") if not connected: print(f"\n [{WARN}] No state received. Make sure Bitwig is running with DrivenByMoss OSC") print(f" configured to receive on port {BITWIG_SEND_PORT} and send to {BITWIG_RECEIVE_PORT}.") print("\n Aborting remaining tests.") sys.exit(1) # ───────────────────────────────────────────────────────────── section("2. State Store — full refresh dump") c.refresh() all_state = c.get_all_state() check("State store has entries after refresh", len(all_state) > 0, f"Total state keys: {len(all_state)}") print(f"\n Sample state (first 20 entries):") for addr, val in sorted(all_state.items())[:20]: print(f" {addr:<50} {val}") # ───────────────────────────────────────────────────────────── section("3. Transport State") play = c.get_state("/play") record = c.get_state("/record") tempo = c.get_state("/tempo/raw") loop = c.get_state("/repeat") time_s = c.get_state("/time/str") beat_s = c.get_state("/beat/str") sig = c.get_state("/time/signature") click = c.get_state("/click") check("/play state received", play is not None, f"value={play}") check("/record state received", record is not None, f"value={record}") check("/tempo/raw received", tempo is not None, f"value={tempo}") check("/repeat (loop) received", loop is not None, f"value={loop}") print(f"\n Transport snapshot:") print(f" playing = {play[0] if play else 'N/A'}") print(f" recording = {record[0] if record else 'N/A'}") print(f" tempo (BPM) = {tempo[0] if tempo else 'N/A'}") print(f" loop = {loop[0] if loop else 'N/A'}") print(f" time str = {time_s[0] if time_s else 'N/A'}") print(f" beat str = {beat_s[0] if beat_s else 'N/A'}") print(f" time signature = {sig[0] if sig else 'N/A'}") print(f" click (metro) = {click[0] if click else 'N/A'}") # ───────────────────────────────────────────────────────────── section("4. Track State") track1_exists = c.get_state("/track/1/exists") track1_name = c.get_state("/track/1/name") track1_vol = c.get_state("/track/1/volume") track1_mute = c.get_state("/track/1/mute") track1_type = c.get_state("/track/1/type") check("/track/1/exists received", track1_exists is not None, f"value={track1_exists}") check("/track/1/name received", track1_name is not None, f"value={track1_name}") check("/track/1/volume received", track1_vol is not None, f"value={track1_vol}") check("/track/1/mute received", track1_mute is not None, f"value={track1_mute}") print(f"\n Track bank snapshot:") for n in range(1, 9): exists = c.get_state(f"/track/{n}/exists") if exists and exists[0]: name = c.get_state(f"/track/{n}/name") vol = c.get_state(f"/track/{n}/volume") pan = c.get_state(f"/track/{n}/pan") mute = c.get_state(f"/track/{n}/mute") solo = c.get_state(f"/track/{n}/solo") recarm = c.get_state(f"/track/{n}/recarm") ttype = c.get_state(f"/track/{n}/type") print(f" Track {n}: {str(name[0] if name else '?'):<20} " f"vol={vol[0] if vol else '?':>3} pan={pan[0] if pan else '?':>3} " f"mute={mute[0] if mute else '?'} solo={solo[0] if solo else '?'} " f"arm={recarm[0] if recarm else '?'} type={ttype[0] if ttype else '?'}") else: print(f" Track {n}: (no track)") # ───────────────────────────────────────────────────────────── section("5. Transport Command — Play toggle") was_playing = c.get_state_value("/play", 0) print(f" Current play state: {was_playing}") print(" Sending /play 1 ...") c.cmd("/play", 1) time.sleep(0.3) after_play = c.get_state_value("/play", None) print(f" Play state after cmd: {after_play}") time.sleep(0.5) print(" Sending /stop ...") c.cmd("/stop") time.sleep(0.3) after_stop = c.get_state_value("/play", None) print(f" Play state after stop: {after_stop}") play_changed = (after_play is not None and after_play == 1) stop_changed = (after_stop is not None and after_stop == 0) check("/play command → Bitwig reported playing=1", play_changed, f"state after play={after_play}") check("/stop command → Bitwig reported playing=0", stop_changed, f"state after stop={after_stop}") # ───────────────────────────────────────────────────────────── section("6. Track Command — Mute toggle on track 1") exists = c.get_state_value("/track/1/exists", 0) if not exists: print(f" [{WARN}] Track 1 does not exist — skipping mute test") results.append(("Track 1 mute toggle", None)) else: mute_before = c.get_state_value("/track/1/mute", None) print(f" Track 1 mute before: {mute_before}") print(" Sending /track/1/mute (toggle) ...") c.cmd("/track/1/mute") time.sleep(0.3) mute_after = c.get_state_value("/track/1/mute", None) print(f" Track 1 mute after toggle: {mute_after}") toggled = (mute_before is not None and mute_after is not None and mute_before != mute_after) check("Mute toggle changes state", toggled, f"before={mute_before} → after={mute_after}") if toggled: print(" Restoring original mute state ...") c.cmd("/track/1/mute") time.sleep(0.3) mute_restored = c.get_state_value("/track/1/mute", None) check("Mute restored to original value", mute_restored == mute_before, f"restored={mute_restored}") # ───────────────────────────────────────────────────────────── section("7. Device State") dev_exists = c.get_state("/device/exists") dev_name = c.get_state("/device/name") dev_bypass = c.get_state("/device/bypass") check("/device/exists received", dev_exists is not None, f"value={dev_exists}") print(f"\n Cursor device:") print(f" exists = {dev_exists[0] if dev_exists else 'N/A'}") print(f" name = {dev_name[0] if dev_name else 'N/A'}") print(f" bypass = {dev_bypass[0] if dev_bypass else 'N/A'}") params_received = sum(1 for i in range(1, 9) if c.get_state(f"/device/param/{i}/name") is not None) print(f" params with names: {params_received}/8") # ───────────────────────────────────────────────────────────── section("8. Scene State") scenes_with_content = 0 for i in range(1, 9): exists = c.get_state(f"/scene/{i}/exists") if exists and exists[0]: scenes_with_content += 1 name = c.get_state(f"/scene/{i}/name") print(f" Scene {i}: {name[0] if name else '?'}") check("Scene state received", c.get_state("/scene/1/exists") is not None, f"Scenes with content: {scenes_with_content}") # ───────────────────────────────────────────────────────────── section("9. clip_record_notes — MIDI note recording") # Build the tool function directly (same process, same OSC singleton) _tools = {} class _FakeMCP: def tool(self): def decorator(fn): _tools[fn.__name__] = fn return fn return decorator import bitwig_mcp.tools.clip as _clip_mod _clip_mod.register(_FakeMCP()) _record_fn = _tools["clip_record_notes"] # Check track 1 exists before trying t1_exists = c.get_state_value("/track/1/exists", 0) if not t1_exists: print(" [WARN] Track 1 does not exist — skipping note recording test") results.append(("clip_record_notes writes notes into clip", None)) else: tempo_val = c.get_state_value("/tempo/raw", 120.0) print(f" Recording C major arpeggio at {tempo_val} BPM into track 1, slot 3...") try: result = _record_fn( track_index=1, clip_index=3, notes=[ {"note": 60, "velocity": 100, "start_beat": 0.0, "duration_beats": 0.45}, {"note": 64, "velocity": 90, "start_beat": 1.0, "duration_beats": 0.45}, {"note": 67, "velocity": 95, "start_beat": 2.0, "duration_beats": 0.45}, {"note": 72, "velocity": 85, "start_beat": 3.0, "duration_beats": 0.9}, ], length_beats=4.0, channel=1, ) print(f" Result: {result}") # Verify clip now has content time.sleep(0.3) has_content = c.get_state_value("/track/1/clip/3/hasContent", 0) check("clip_record_notes writes notes into clip", bool(has_content), f"hasContent={has_content}, result={result}") except Exception as e: check("clip_record_notes writes notes into clip", False, f"Exception: {e}") # ───────────────────────────────────────────────────────────── section("10. Probe — invalid/edge commands") print(" Sending /refresh (should flood state, not crash) ...") count_a = len(c.get_all_state()) c.cmd("/refresh") time.sleep(0.5) count_b = len(c.get_all_state()) check("/refresh accepted without crash", True, f"state before={count_a} after={count_b}") print(" Sending /track/99/volume 64 (out-of-range track, should be silently ignored) ...") c.cmd("/track/99/volume", 64) time.sleep(0.2) check("Out-of-range track command silently ignored (no crash)", True) print(" Sending /undo ...") c.cmd("/undo") time.sleep(0.2) check("/undo accepted", True) print(" Sending /redo ...") c.cmd("/redo") time.sleep(0.2) check("/redo accepted", True) # ───────────────────────────────────────────────────────────── section("Summary") passed = sum(1 for _, r in results if r is True) failed = sum(1 for _, r in results if r is False) skipped = sum(1 for _, r in results if r is None) total = len(results) for label, r in results: icon = "[OK]" if r is True else ("[!!]" if r is False else "[--]") print(f" {icon} {label}") print(f"\n Total: {total} Passed: {passed} Failed: {failed} Skipped: {skipped}") print(f"\n {'PASS' if failed == 0 else 'FAIL'} — bitwig-mcp live OSC integration\n") sys.exit(0 if failed == 0 else 1)