b7cec9d24b
Python package naming convention uses snake_case. Update the import in the root __init__.py and the setuptools include pattern in pyproject.toml. Internal relative imports within the package are unaffected. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
226 lines
7.4 KiB
Python
226 lines
7.4 KiB
Python
"""
|
|
UDP OSC server with zero external dependencies.
|
|
Safe for Ableton's embedded Python (no threading, non-blocking sockets).
|
|
"""
|
|
import logging
|
|
import socket
|
|
import struct
|
|
from typing import Any, Callable, Dict, List, Optional, Tuple
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
LISTEN_PORT = 11000
|
|
SEND_PORT = 11001
|
|
MAX_PACKET = 65536
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OSC encoding
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _pad4(n: int) -> int:
|
|
return (n + 3) & ~3
|
|
|
|
|
|
def _encode_string(s: str) -> bytes:
|
|
b = s.encode("utf-8") + b"\x00"
|
|
pad = _pad4(len(b)) - len(b)
|
|
return b + b"\x00" * pad
|
|
|
|
|
|
def _encode_int(i: int) -> bytes:
|
|
return struct.pack(">i", int(i))
|
|
|
|
|
|
def _encode_float(f: float) -> bytes:
|
|
return struct.pack(">f", float(f))
|
|
|
|
|
|
def _encode_blob(b: bytes) -> bytes:
|
|
length = struct.pack(">I", len(b))
|
|
pad = _pad4(len(b)) - len(b)
|
|
return length + b + b"\x00" * pad
|
|
|
|
|
|
def encode_message(address: str, args: tuple) -> bytes:
|
|
type_tags = ","
|
|
arg_bytes = b""
|
|
for arg in args:
|
|
if isinstance(arg, bool):
|
|
type_tags += "T" if arg else "F"
|
|
elif isinstance(arg, int):
|
|
type_tags += "i"
|
|
arg_bytes += _encode_int(arg)
|
|
elif isinstance(arg, float):
|
|
type_tags += "f"
|
|
arg_bytes += _encode_float(arg)
|
|
elif isinstance(arg, str):
|
|
type_tags += "s"
|
|
arg_bytes += _encode_string(arg)
|
|
elif isinstance(arg, bytes):
|
|
type_tags += "b"
|
|
arg_bytes += _encode_blob(arg)
|
|
elif arg is None:
|
|
type_tags += "N"
|
|
return _encode_string(address) + _encode_string(type_tags) + arg_bytes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OSC decoding
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def decode_message(data: bytes) -> Tuple[str, List[Any]]:
|
|
offset = 0
|
|
|
|
# address
|
|
end = data.index(b"\x00", offset)
|
|
address = data[offset:end].decode("utf-8")
|
|
offset = _pad4(end + 1)
|
|
|
|
if offset >= len(data) or data[offset:offset + 1] != b",":
|
|
return address, []
|
|
|
|
# type tags
|
|
end = data.index(b"\x00", offset)
|
|
type_tags = data[offset + 1:end].decode("utf-8")
|
|
offset = _pad4(end + 1)
|
|
|
|
args: List[Any] = []
|
|
for tag in type_tags:
|
|
if tag == "i":
|
|
args.append(struct.unpack(">i", data[offset:offset + 4])[0])
|
|
offset += 4
|
|
elif tag == "f":
|
|
args.append(struct.unpack(">f", data[offset:offset + 4])[0])
|
|
offset += 4
|
|
elif tag == "s":
|
|
end = data.index(b"\x00", offset)
|
|
args.append(data[offset:end].decode("utf-8"))
|
|
offset = _pad4(end + 1)
|
|
elif tag == "b":
|
|
length = struct.unpack(">I", data[offset:offset + 4])[0]
|
|
offset += 4
|
|
args.append(data[offset:offset + length])
|
|
offset += _pad4(length)
|
|
elif tag == "T":
|
|
args.append(True)
|
|
elif tag == "F":
|
|
args.append(False)
|
|
elif tag in ("N", "n"):
|
|
args.append(None)
|
|
elif tag == "I":
|
|
args.append(float("inf"))
|
|
elif tag in ("h", "t"):
|
|
args.append(struct.unpack(">q", data[offset:offset + 8])[0])
|
|
offset += 8
|
|
elif tag == "d":
|
|
args.append(struct.unpack(">d", data[offset:offset + 8])[0])
|
|
offset += 8
|
|
elif tag == "c":
|
|
args.append(chr(struct.unpack(">I", data[offset:offset + 4])[0]))
|
|
offset += 4
|
|
return address, args
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OSC Server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OSCServer:
|
|
def __init__(self, listen_port: int = LISTEN_PORT, send_port: int = SEND_PORT):
|
|
self.listen_port = listen_port
|
|
self.send_port = send_port
|
|
self._handlers: Dict[str, Callable] = {}
|
|
self._sock: Optional[socket.socket] = None
|
|
self._remote_addr: Optional[str] = None
|
|
|
|
# ------------------------------------------------------------------
|
|
def start(self) -> None:
|
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
self._sock.bind(("0.0.0.0", self.listen_port))
|
|
self._sock.setblocking(False)
|
|
logger.info("OSC server listening on port %d", self.listen_port)
|
|
|
|
def stop(self) -> None:
|
|
if self._sock:
|
|
self._sock.close()
|
|
self._sock = None
|
|
|
|
# ------------------------------------------------------------------
|
|
def add_handler(self, address: str, callback: Callable) -> None:
|
|
self._handlers[address] = callback
|
|
|
|
def remove_handler(self, address: str) -> None:
|
|
self._handlers.pop(address, None)
|
|
|
|
# ------------------------------------------------------------------
|
|
def process(self) -> None:
|
|
"""Call from Ableton's update_display tick."""
|
|
if not self._sock:
|
|
return
|
|
try:
|
|
while True:
|
|
data, addr = self._sock.recvfrom(MAX_PACKET)
|
|
self._remote_addr = addr[0]
|
|
self._dispatch(data)
|
|
except BlockingIOError:
|
|
pass
|
|
except Exception as e:
|
|
logger.error("OSC recv error: %s", e)
|
|
|
|
def _dispatch(self, data: bytes) -> None:
|
|
if data[:8] == b"#bundle\x00":
|
|
self._dispatch_bundle(data)
|
|
return
|
|
try:
|
|
address, args = decode_message(data)
|
|
except Exception as e:
|
|
logger.warning("OSC decode error: %s", e)
|
|
return
|
|
self._call(address, args)
|
|
|
|
def _dispatch_bundle(self, data: bytes) -> None:
|
|
offset = 16 # skip "#bundle\0" + timetag
|
|
while offset < len(data):
|
|
size = struct.unpack(">I", data[offset:offset + 4])[0]
|
|
offset += 4
|
|
self._dispatch(data[offset:offset + size])
|
|
offset += size
|
|
|
|
def _call(self, address: str, args: List[Any]) -> None:
|
|
handler = self._handlers.get(address)
|
|
if handler is None:
|
|
# try prefix match (e.g. wildcard handlers)
|
|
for pattern, h in self._handlers.items():
|
|
if pattern.endswith("*") and address.startswith(pattern[:-1]):
|
|
handler = h
|
|
break
|
|
if handler is None:
|
|
logger.debug("No handler for %s", address)
|
|
return
|
|
try:
|
|
result = handler(tuple(args))
|
|
if result is not None:
|
|
self.send(address, result)
|
|
except Exception as e:
|
|
logger.error("Handler error for %s: %s", address, e)
|
|
self.send("/live/error", (str(e),))
|
|
|
|
# ------------------------------------------------------------------
|
|
def send(self, address: str, args: tuple, target_ip: Optional[str] = None) -> None:
|
|
if not self._sock:
|
|
return
|
|
ip = target_ip or self._remote_addr
|
|
if not ip:
|
|
return
|
|
try:
|
|
data = encode_message(address, args)
|
|
self._sock.sendto(data, (ip, self.send_port))
|
|
except Exception as e:
|
|
logger.error("OSC send error: %s", e)
|
|
|
|
def send_all_clients(self, address: str, args: tuple) -> None:
|
|
"""Broadcast to all known clients (currently just the last sender)."""
|
|
self.send(address, args)
|