""" UDP OSC server with zero external dependencies. Safe for Ableton's embedded Python (no threading, non-blocking sockets). """ import logging import socket import struct from typing import Any, Callable, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) LISTEN_PORT = 11000 SEND_PORT = 11001 MAX_PACKET = 65536 # --------------------------------------------------------------------------- # OSC encoding # --------------------------------------------------------------------------- def _pad4(n: int) -> int: return (n + 3) & ~3 def _encode_string(s: str) -> bytes: b = s.encode("utf-8") + b"\x00" pad = _pad4(len(b)) - len(b) return b + b"\x00" * pad def _encode_int(i: int) -> bytes: return struct.pack(">i", int(i)) def _encode_float(f: float) -> bytes: return struct.pack(">f", float(f)) def _encode_blob(b: bytes) -> bytes: length = struct.pack(">I", len(b)) pad = _pad4(len(b)) - len(b) return length + b + b"\x00" * pad def encode_message(address: str, args: tuple) -> bytes: type_tags = "," arg_bytes = b"" for arg in args: if isinstance(arg, bool): type_tags += "T" if arg else "F" elif isinstance(arg, int): type_tags += "i" arg_bytes += _encode_int(arg) elif isinstance(arg, float): type_tags += "f" arg_bytes += _encode_float(arg) elif isinstance(arg, str): type_tags += "s" arg_bytes += _encode_string(arg) elif isinstance(arg, bytes): type_tags += "b" arg_bytes += _encode_blob(arg) elif arg is None: type_tags += "N" return _encode_string(address) + _encode_string(type_tags) + arg_bytes # --------------------------------------------------------------------------- # OSC decoding # --------------------------------------------------------------------------- def decode_message(data: bytes) -> Tuple[str, List[Any]]: offset = 0 # address end = data.index(b"\x00", offset) address = data[offset:end].decode("utf-8") offset = _pad4(end + 1) if offset >= len(data) or data[offset:offset + 1] != b",": return address, [] # type tags end = data.index(b"\x00", offset) type_tags = data[offset + 1:end].decode("utf-8") offset = _pad4(end + 1) args: List[Any] = [] for tag in type_tags: if tag == "i": args.append(struct.unpack(">i", data[offset:offset + 4])[0]) offset += 4 elif tag == "f": args.append(struct.unpack(">f", data[offset:offset + 4])[0]) offset += 4 elif tag == "s": end = data.index(b"\x00", offset) args.append(data[offset:end].decode("utf-8")) offset = _pad4(end + 1) elif tag == "b": length = struct.unpack(">I", data[offset:offset + 4])[0] offset += 4 args.append(data[offset:offset + length]) offset += _pad4(length) elif tag == "T": args.append(True) elif tag == "F": args.append(False) elif tag in ("N", "n"): args.append(None) elif tag == "I": args.append(float("inf")) elif tag in ("h", "t"): args.append(struct.unpack(">q", data[offset:offset + 8])[0]) offset += 8 elif tag == "d": args.append(struct.unpack(">d", data[offset:offset + 8])[0]) offset += 8 elif tag == "c": args.append(chr(struct.unpack(">I", data[offset:offset + 4])[0])) offset += 4 return address, args # --------------------------------------------------------------------------- # OSC Server # --------------------------------------------------------------------------- class OSCServer: def __init__(self, listen_port: int = LISTEN_PORT, send_port: int = SEND_PORT): self.listen_port = listen_port self.send_port = send_port self._handlers: Dict[str, Callable] = {} self._sock: Optional[socket.socket] = None self._remote_addr: Optional[str] = None # ------------------------------------------------------------------ def start(self) -> None: self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._sock.bind(("0.0.0.0", self.listen_port)) self._sock.setblocking(False) logger.info("OSC server listening on port %d", self.listen_port) def stop(self) -> None: if self._sock: self._sock.close() self._sock = None # ------------------------------------------------------------------ def add_handler(self, address: str, callback: Callable) -> None: self._handlers[address] = callback def remove_handler(self, address: str) -> None: self._handlers.pop(address, None) # ------------------------------------------------------------------ def process(self) -> None: """Call from Ableton's update_display tick.""" if not self._sock: return try: while True: data, addr = self._sock.recvfrom(MAX_PACKET) self._remote_addr = addr[0] self._dispatch(data) except BlockingIOError: pass except Exception as e: logger.error("OSC recv error: %s", e) def _dispatch(self, data: bytes) -> None: if data[:8] == b"#bundle\x00": self._dispatch_bundle(data) return try: address, args = decode_message(data) except Exception as e: logger.warning("OSC decode error: %s", e) return self._call(address, args) def _dispatch_bundle(self, data: bytes) -> None: offset = 16 # skip "#bundle\0" + timetag while offset < len(data): size = struct.unpack(">I", data[offset:offset + 4])[0] offset += 4 self._dispatch(data[offset:offset + size]) offset += size def _call(self, address: str, args: List[Any]) -> None: handler = self._handlers.get(address) if handler is None: # try prefix match (e.g. wildcard handlers) for pattern, h in self._handlers.items(): if pattern.endswith("*") and address.startswith(pattern[:-1]): handler = h break if handler is None: logger.debug("No handler for %s", address) return try: result = handler(tuple(args)) if result is not None: self.send(address, result) except Exception as e: logger.error("Handler error for %s: %s", address, e) self.send("/live/error", (str(e),)) # ------------------------------------------------------------------ def send(self, address: str, args: tuple, target_ip: Optional[str] = None) -> None: if not self._sock: return ip = target_ip or self._remote_addr if not ip: return try: data = encode_message(address, args) self._sock.sendto(data, (ip, self.send_port)) except Exception as e: logger.error("OSC send error: %s", e) def send_all_clients(self, address: str, args: tuple) -> None: """Broadcast to all known clients (currently just the last sender).""" self.send(address, args)