Skip to content

API Reference

This page provides a comprehensive reference for all modules in the teleop_xr package.

Core Module

teleop_xr

teleop_xr

Teleop

Teleop class for controlling a robot remotely using FastAPI and WebSockets.

Parameters:

Name Type Description Default
settings TeleopSettings

Configuration settings for the teleop server.

required
video_sources Optional[dict[str, VideoSource]]

Dictionary of video sources.

None
Source code in teleop_xr/__init__.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
class Teleop:
    """
    Teleop class for controlling a robot remotely using FastAPI and WebSockets.

    Args:
        settings (TeleopSettings): Configuration settings for the teleop server.
        video_sources (Optional[dict[str, VideoSource]]): Dictionary of video sources.
    """

    def __init__(
        self,
        settings: TeleopSettings,
        video_sources: Optional[dict[str, VideoSource]] = None,
    ):
        self.__logger = logging.getLogger("teleop")
        self.__logger.setLevel(logging.INFO)
        if not self.__logger.handlers:
            self.__logger.addHandler(logging.StreamHandler())

        self.__settings = settings
        self.__camera_views = self.__settings.camera_views

        self.__relative_pose_init = None
        self.__absolute_pose_init = None
        self.__previous_received_pose = None
        self.__callbacks = []
        self.__pose = np.eye(4)
        self.__last_joint_state: Optional[Dict[str, float]] = None

        euler = self.__settings.natural_phone_orientation_euler
        self.__natural_phone_pose = t3d.affines.compose(
            self.__settings.natural_phone_position,
            t3d.euler.euler2mat(euler[0], euler[1], euler[2]),
            [1, 1, 1],
        )

        self.__app = FastAPI()
        self.__app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_methods=["*"],
            allow_headers=["*"],
        )
        self.__manager = ConnectionManager()

        self.__ws_connect_lock = asyncio.Lock()
        self.__control_lock = asyncio.Lock()
        self.__controller_client_id: Optional[str] = None
        self.__controller_ws: Optional[WebSocket] = None
        self.__controller_last_seen_s: float = 0.0
        self.__ws_client_ids: dict[WebSocket, str] = {}

        self.__video_streams: list[VideoStreamConfig] = []
        self.__video_sources: dict[str, VideoSource] = video_sources or {}
        self.__video_sessions: dict[WebSocket, VideoStreamManager] = {}

        self.robot_vis: Optional[RobotVisModule] = None
        if self.__settings.robot_vis:
            self.robot_vis = RobotVisModule(self.__app, self.__settings.robot_vis)

        if self.__camera_views is not None and not self.__video_streams:
            self.set_video_streams(build_video_streams(self.__camera_views))

        # Configure logging
        logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
        self.__setup_routes()

    @property
    def input_mode(self):
        return self.__settings.input_mode

    @property
    def app(self) -> FastAPI:
        return self.__app

    def set_pose(self, pose: np.ndarray) -> None:
        """
        Set the current pose of the end-effector.

        Parameters:
        - pose (np.ndarray): A 4x4 transformation matrix representing the pose.
        """
        self.__pose = pose

    def __convert_pose_to_ros(self, pose: dict[str, Any] | None) -> None:
        if not pose or "position" not in pose or "orientation" not in pose:
            return

        position = pose["position"]
        orientation = pose["orientation"]
        position_arr = np.array([position["x"], position["y"], position["z"]])
        quat = np.array(
            [
                orientation["w"],
                orientation["x"],
                orientation["y"],
                orientation["z"],
            ]
        )

        pose_rub = t3d.affines.compose(
            position_arr, t3d.quaternions.quat2mat(quat), [1, 1, 1]
        )
        pose_flu = TF_RUB2FLU @ pose_rub
        pose_flu[:3, :3] = pose_flu[:3, :3] @ TF_RUB2FLU_ROT_INV

        position_flu = pose_flu[:3, 3]
        quat_flu = t3d.quaternions.mat2quat(pose_flu[:3, :3])

        pose["position"] = {
            "x": float(position_flu[0]),
            "y": float(position_flu[1]),
            "z": float(position_flu[2]),
        }
        pose["orientation"] = {
            "w": float(quat_flu[0]),
            "x": float(quat_flu[1]),
            "y": float(quat_flu[2]),
            "z": float(quat_flu[3]),
        }

    def __convert_devices_to_ros(self, devices: list[dict[str, Any]]) -> None:
        for device in devices:
            self.__convert_pose_to_ros(device.get("pose"))
            self.__convert_pose_to_ros(device.get("gripPose"))

            joints = device.get("joints")
            if isinstance(joints, dict):
                for joint_pose in joints.values():
                    self.__convert_pose_to_ros(joint_pose)

    def subscribe(self, callback: Callable[[np.ndarray, Any], None]) -> None:
        """
        Subscribe to receive updates from the teleop module.

        Parameters:
            callback (Callable[[np.ndarray, dict[str, Any]], None]): A callback function that will be called when pose updates are received.
                The callback function should take two arguments:
                    - np.ndarray: A 4x4 transformation matrix representing the end-effector target pose.
                    - dict[str, Any]: A dictionary containing additional information.
        """
        self.__callbacks.append(callback)

    def __notify_subscribers(self, pose, message):
        for callback in self.__callbacks:
            callback(pose, message)

    def set_video_streams(self, payload: Any) -> None:
        self.__video_streams = parse_video_config(payload)

    def clear_video_streams(self) -> None:
        self.__video_streams = []

    async def _start_video_session(self, websocket: WebSocket) -> None:
        sources = self.__video_sources
        if not sources and self.__video_streams:
            sources = build_sources(self.__video_streams)

        manager = VideoStreamManager(sources)
        self.__video_sessions[websocket] = manager
        offer = await manager.create_offer()
        await self.__manager.send_personal_message(
            json.dumps(
                {"type": "video_offer", "data": {"sdp": offer.sdp, "type": offer.type}}
            ),
            websocket,
        )

    async def _handle_video_message(self, websocket: WebSocket, message: Any) -> None:
        msg_type = message.get("type")
        self.__logger.info(f"Received video message: {msg_type}")

        if msg_type == "video_request":
            self.__logger.info("Starting video session")
            await self._start_video_session(websocket)
            return

        session = self.__video_sessions.get(websocket)
        if session:
            self.__logger.debug(f"Routing {msg_type} to session")
            await route_video_message(session, message)
        else:
            self.__logger.warning(
                f"No active video session for websocket, ignoring {msg_type}"
            )

    def apply(self, position, orientation, move=True, scale=1.0, info=None):
        if info is None:
            info = {}

        position_arr = np.array([position["x"], position["y"], position["z"]])
        quat = np.array(
            [orientation["w"], orientation["x"], orientation["y"], orientation["z"]]
        )

        if not move:
            self.__relative_pose_init = None
            self.__absolute_pose_init = None
            self.__notify_subscribers(self.__pose, info)
            return

        received_pose = t3d.affines.compose(
            position_arr, t3d.quaternions.quat2mat(quat), [1, 1, 1]
        )
        received_pose = received_pose @ self.__natural_phone_pose

        # Pose jump protection
        if self.__previous_received_pose is not None:
            if not are_close(
                received_pose,
                self.__previous_received_pose,
                lin_tol=0.05,
                ang_tol=math.radians(35),
            ):
                self.__logger.warning("Pose jump detected, resetting the pose")
                self.__relative_pose_init = None
                self.__previous_received_pose = received_pose
                return
        self.__previous_received_pose = received_pose

        # Accumulate the pose and publish
        if self.__relative_pose_init is None:
            self.__relative_pose_init = received_pose
            self.__absolute_pose_init = self.__pose
            self.__previous_received_pose = None

        assert self.__absolute_pose_init is not None
        relative_position = received_pose[:3, 3] - self.__relative_pose_init[:3, 3]
        relative_orientation = received_pose[:3, :3] @ np.linalg.inv(
            self.__relative_pose_init[:3, :3]
        )

        if scale > 1.0:
            relative_position *= scale

        self.__pose = np.eye(4)
        self.__pose[:3, 3] = self.__absolute_pose_init[:3, 3] + relative_position
        self.__pose[:3, :3] = relative_orientation @ self.__absolute_pose_init[:3, :3]

        # Apply scale
        if scale < 1.0:
            self.__pose = interpolate_transforms(
                self.__absolute_pose_init, self.__pose, scale
            )

        # Notify the subscribers
        self.__notify_subscribers(self.__pose, info)

    def __handle_xr_state(self, message):
        input_mode = self.__settings.input_mode
        devices = message.get("devices", [])

        self.__convert_devices_to_ros(devices)

        # Log fetch latency if present
        fetch_latency = message.get("fetch_latency_ms")
        if fetch_latency is not None:
            self.__logger.debug(f"XR input fetch latency: {fetch_latency:.2f}ms")

        filtered_devices = []
        for d in devices:
            role = d.get("role")
            if role == "head":
                filtered_devices.append(d)
            elif role == "controller" and input_mode in ["controller", "auto"]:
                filtered_devices.append(d)

        message["devices"] = filtered_devices

        # Derive pose from controller device (prefer right then left)
        target_device = None
        for handedness in ["right", "left"]:
            for d in filtered_devices:
                if d.get("role") == "controller" and d.get("handedness") == handedness:
                    target_device = d
                    break
            if target_device:
                break

        if target_device:
            pose_data = target_device.get("gripPose")
            if pose_data:
                self.apply(
                    pose_data["position"],
                    pose_data["orientation"],
                    move=True,
                    scale=1.0,
                    info=message,
                )
                return

        # Fallback to head pose
        for d in filtered_devices:
            if d.get("role") == "head":
                pose_data = d.get("pose")
                if pose_data:
                    self.apply(
                        pose_data["position"],
                        pose_data["orientation"],
                        move=True,
                        scale=1.0,
                        info=message,
                    )
                    return

    async def publish_joint_state(self, joints: Dict[str, float]):
        self.__last_joint_state = joints
        if self.robot_vis:
            await self.robot_vis.broadcast_state(self.__manager, joints)

    def __setup_routes(self):
        static_dir, index_path, mount_path, mount_name = _resolve_frontend_paths(
            THIS_DIR
        )

        @self.__app.get("/")
        async def index():
            return FileResponse(index_path)

        @self.__app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            control_timeout_s = 5.0

            await websocket.accept()

            try:
                await asyncio.wait_for(self.__ws_connect_lock.acquire(), timeout=0.05)
            except asyncio.TimeoutError:
                await websocket.send_text(
                    json.dumps(
                        {
                            "type": "connection_error",
                            "data": {
                                "reason": "connecting",
                                "message": "WebSocket busy: another client is connecting.",
                            },
                        }
                    )
                )
                await websocket.close()
                return

            try:
                await self.__manager.register(websocket)

                # Send config message on connect
                await websocket.send_text(
                    json.dumps(
                        {
                            "type": "config",
                            "data": self.__settings.model_dump(),
                        }
                    )
                )

                if self.robot_vis:
                    await websocket.send_text(
                        json.dumps(
                            {
                                "type": "robot_config",
                                "data": self.robot_vis.get_frontend_config(),
                            }
                        )
                    )
                    if self.__last_joint_state:
                        await websocket.send_text(
                            json.dumps(
                                {
                                    "type": "robot_state",
                                    "data": {"joints": self.__last_joint_state},
                                }
                            )
                        )

                if self.__video_streams:
                    await self.__manager.send_personal_message(
                        json.dumps(
                            {
                                "type": "video_config",
                                "data": {
                                    "streams": [
                                        s.__dict__ for s in self.__video_streams
                                    ]
                                },
                            }
                        ),
                        websocket,
                    )
            finally:
                self.__ws_connect_lock.release()

            self.__logger.info("Client connected")

            async def close_video_sessions_for_client_id(
                client_id_to_close: str,
            ) -> None:
                to_close: list[tuple[WebSocket, VideoStreamManager]] = []
                for ws, session in list(self.__video_sessions.items()):
                    ws_id = self.__ws_client_ids.get(ws)
                    if ws_id == client_id_to_close:
                        to_close.append((ws, session))

                for ws, session in to_close:
                    await session.close()
                    self.__video_sessions.pop(ws, None)

            async def close_video_sessions_not_matching(controller_id: str) -> None:
                to_close: list[tuple[WebSocket, VideoStreamManager]] = []
                for ws, session in list(self.__video_sessions.items()):
                    ws_id = self.__ws_client_ids.get(ws)
                    if ws_id != controller_id:
                        to_close.append((ws, session))

                for ws, session in to_close:
                    await session.close()
                    self.__video_sessions.pop(ws, None)

            async def check_or_claim_control(
                claimed_client_id: str,
            ) -> tuple[bool, Optional[str]]:
                expired_controller: Optional[str] = None
                newly_claimed: Optional[str] = None

                async with self.__control_lock:
                    now = time.monotonic()
                    if (
                        self.__controller_client_id is not None
                        and (now - self.__controller_last_seen_s) > control_timeout_s
                    ):
                        expired_controller = self.__controller_client_id
                        self.__controller_client_id = None
                        self.__controller_ws = None
                        self.__controller_last_seen_s = 0.0

                    if self.__controller_client_id is None:
                        self.__controller_client_id = claimed_client_id
                        newly_claimed = claimed_client_id

                    in_control = self.__controller_client_id == claimed_client_id
                    if in_control:
                        self.__controller_ws = websocket
                        self.__controller_last_seen_s = now

                    controller_id = self.__controller_client_id

                if expired_controller is not None:
                    await close_video_sessions_for_client_id(expired_controller)

                if newly_claimed is not None:
                    await close_video_sessions_not_matching(newly_claimed)

                return in_control, controller_id

            try:
                while True:
                    data = await websocket.receive_text()
                    message = json.loads(data)

                    msg_type = message.get("type")
                    client_id = message.get("client_id")
                    if isinstance(client_id, str) and client_id:
                        self.__ws_client_ids[websocket] = client_id

                    if msg_type == "control_check":
                        if not isinstance(client_id, str) or not client_id:
                            await websocket.send_text(
                                json.dumps(
                                    {
                                        "type": "control_status",
                                        "data": {
                                            "in_control": False,
                                            "controller_client_id": self.__controller_client_id,
                                        },
                                    }
                                )
                            )
                            continue

                        in_control, controller_id = await check_or_claim_control(
                            client_id
                        )

                        await websocket.send_text(
                            json.dumps(
                                {
                                    "type": "control_status",
                                    "data": {
                                        "in_control": in_control,
                                        "controller_client_id": controller_id,
                                    },
                                }
                            )
                        )
                        continue

                    if msg_type == "xr_state":
                        if not isinstance(client_id, str) or not client_id:
                            await websocket.send_text(
                                json.dumps(
                                    {
                                        "type": "deny",
                                        "data": {
                                            "reason": "missing_client_id",
                                            "controller_client_id": self.__controller_client_id,
                                        },
                                    }
                                )
                            )
                            continue

                        allowed, controller_id = await check_or_claim_control(client_id)
                        if not allowed:
                            await websocket.send_text(
                                json.dumps(
                                    {
                                        "type": "deny",
                                        "data": {
                                            "reason": "not_in_control",
                                            "controller_client_id": controller_id,
                                        },
                                    }
                                )
                            )
                            continue

                        self.__handle_xr_state(message["data"])
                        continue

                    if msg_type in {"video_request", "video_answer", "video_ice"}:
                        if not isinstance(client_id, str) or not client_id:
                            await websocket.send_text(
                                json.dumps(
                                    {
                                        "type": "deny",
                                        "data": {
                                            "reason": "missing_client_id",
                                            "controller_client_id": self.__controller_client_id,
                                        },
                                    }
                                )
                            )
                            continue

                        allowed, controller_id = await check_or_claim_control(client_id)
                        if not allowed:
                            await websocket.send_text(
                                json.dumps(
                                    {
                                        "type": "deny",
                                        "data": {
                                            "reason": "not_in_control",
                                            "controller_client_id": controller_id,
                                        },
                                    }
                                )
                            )
                            continue

                        await self._handle_video_message(websocket, message)
                        continue

                    if msg_type == "console_log":
                        # Stream console logs from WebXR to terminal
                        log_data = message.get("data", {})
                        level = log_data.get("level", "log")
                        msg = log_data.get("message", "")

                        # Suppress spammy WS errors from WebXR
                        if "WS Error" in msg and "isTrusted" in msg:
                            continue

                        self.__logger.info(f"[WebXR:{level}] {msg}")

            except WebSocketDisconnect:
                pass
            finally:
                disconnected_client_id = self.__ws_client_ids.pop(websocket, None)
                if disconnected_client_id is not None:
                    async with self.__control_lock:
                        if (
                            self.__controller_ws is websocket
                            and self.__controller_client_id == disconnected_client_id
                        ):
                            self.__controller_client_id = None
                            self.__controller_ws = None
                            self.__controller_last_seen_s = 0.0

                session = self.__video_sessions.pop(websocket, None)
                if session:
                    await session.close()
                await self.__manager.disconnect(websocket)
                self.__logger.info("Client disconnected")

        self.__app.mount(mount_path, StaticFiles(directory=static_dir), name=mount_name)

    def run(self) -> None:
        """
        Runs the teleop server. This method is blocking.
        """
        self.__logger.info(self.__settings.model_dump_json())
        self.__logger.info(
            f"Server started at {self.__settings.host}:{self.__settings.port}"
        )
        self.__logger.info(
            f"The phone web app should be available at https://{get_local_ip()}:{self.__settings.port}"
        )

        ssl_keyfile = os.path.join(THIS_DIR, "key.pem")
        ssl_certfile = os.path.join(THIS_DIR, "cert.pem")

        uvicorn.run(
            self.__app,
            host=self.__settings.host,
            port=self.__settings.port,
            ssl_keyfile=ssl_keyfile,
            ssl_certfile=ssl_certfile,
            log_level="warning",
        )

    def stop(self) -> None:
        """
        Stops the teleop server.
        """
        # FastAPI/uvicorn handles shutdown automatically
        pass

run()

Runs the teleop server. This method is blocking.

Source code in teleop_xr/__init__.py
def run(self) -> None:
    """
    Runs the teleop server. This method is blocking.
    """
    self.__logger.info(self.__settings.model_dump_json())
    self.__logger.info(
        f"Server started at {self.__settings.host}:{self.__settings.port}"
    )
    self.__logger.info(
        f"The phone web app should be available at https://{get_local_ip()}:{self.__settings.port}"
    )

    ssl_keyfile = os.path.join(THIS_DIR, "key.pem")
    ssl_certfile = os.path.join(THIS_DIR, "cert.pem")

    uvicorn.run(
        self.__app,
        host=self.__settings.host,
        port=self.__settings.port,
        ssl_keyfile=ssl_keyfile,
        ssl_certfile=ssl_certfile,
        log_level="warning",
    )

set_pose(pose)

Set the current pose of the end-effector.

Parameters: - pose (np.ndarray): A 4x4 transformation matrix representing the pose.

Source code in teleop_xr/__init__.py
def set_pose(self, pose: np.ndarray) -> None:
    """
    Set the current pose of the end-effector.

    Parameters:
    - pose (np.ndarray): A 4x4 transformation matrix representing the pose.
    """
    self.__pose = pose

stop()

Stops the teleop server.

Source code in teleop_xr/__init__.py
def stop(self) -> None:
    """
    Stops the teleop server.
    """
    # FastAPI/uvicorn handles shutdown automatically
    pass

subscribe(callback)

Subscribe to receive updates from the teleop module.

Parameters:

Name Type Description Default
callback Callable[[ndarray, dict[str, Any]], None]

A callback function that will be called when pose updates are received. The callback function should take two arguments: - np.ndarray: A 4x4 transformation matrix representing the end-effector target pose. - dict[str, Any]: A dictionary containing additional information.

required
Source code in teleop_xr/__init__.py
def subscribe(self, callback: Callable[[np.ndarray, Any], None]) -> None:
    """
    Subscribe to receive updates from the teleop module.

    Parameters:
        callback (Callable[[np.ndarray, dict[str, Any]], None]): A callback function that will be called when pose updates are received.
            The callback function should take two arguments:
                - np.ndarray: A 4x4 transformation matrix representing the end-effector target pose.
                - dict[str, Any]: A dictionary containing additional information.
    """
    self.__callbacks.append(callback)

are_close(a, b=None, lin_tol=1e-09, ang_tol=1e-09)

Check if two transformation matrices are close to each other within specified tolerances.

Parameters:

Name Type Description Default
a ndarray

The first transformation matrix.

required
b ndarray

The second transformation matrix. If not provided, it defaults to the identity matrix.

None
lin_tol float

The linear tolerance for closeness. Defaults to 1e-9.

1e-09
ang_tol float

The angular tolerance for closeness. Defaults to 1e-9.

1e-09

Returns:

Name Type Description
bool bool

True if the matrices are close, False otherwise.

Source code in teleop_xr/__init__.py
def are_close(a, b=None, lin_tol=1e-9, ang_tol=1e-9) -> bool:
    """
    Check if two transformation matrices are close to each other within specified tolerances.

    Parameters:
        a (numpy.ndarray): The first transformation matrix.
        b (numpy.ndarray, optional): The second transformation matrix. If not provided, it defaults to the identity matrix.
        lin_tol (float, optional): The linear tolerance for closeness. Defaults to 1e-9.
        ang_tol (float, optional): The angular tolerance for closeness. Defaults to 1e-9.

    Returns:
        bool: True if the matrices are close, False otherwise.
    """
    if b is None:
        b = np.eye(4)
    d = np.linalg.inv(a) @ b
    if not np.allclose(d[:3, 3], np.zeros(3), atol=lin_tol):
        return False
    yaw = math.atan2(d[1, 0], d[0, 0])
    pitch = math.asin(-d[2, 0])
    roll = math.atan2(d[2, 1], d[2, 2])
    rpy = np.array([roll, pitch, yaw])
    return np.allclose(rpy, np.zeros(3), atol=ang_tol)

interpolate_transforms(T1, T2, alpha)

Interpolate between two 4x4 transformation matrices using SLERP + linear translation.

Parameters:

Name Type Description Default
T1 ndarray

Start transform (4x4)

required
T2 ndarray

End transform (4x4)

required
alpha float

Interpolation factor [0, 1]

required

Returns:

Type Description
ndarray

np.ndarray: Interpolated transform (4x4)

Source code in teleop_xr/__init__.py
def interpolate_transforms(T1, T2, alpha) -> np.ndarray:
    """
    Interpolate between two 4x4 transformation matrices using SLERP + linear translation.

    Args:
        T1 (np.ndarray): Start transform (4x4)
        T2 (np.ndarray): End transform (4x4)
        alpha (float): Interpolation factor [0, 1]

    Returns:
        np.ndarray: Interpolated transform (4x4)
    """
    assert T1.shape == (4, 4) and T2.shape == (4, 4)
    assert 0.0 <= alpha <= 1.0

    # Translation
    t1 = T1[:3, 3]
    t2 = T2[:3, 3]
    t_interp = (1 - alpha) * t1 + alpha * t2

    # Rotation
    R1 = T1[:3, :3]
    R2 = T2[:3, :3]
    q1 = t3d.quaternions.mat2quat(R1)
    q2 = t3d.quaternions.mat2quat(R2)

    # SLERP
    q_interp = slerp(q1, q2, alpha)
    R_interp = t3d.quaternions.quat2mat(q_interp)

    # Final transform
    T_interp = np.eye(4)
    T_interp[:3, :3] = R_interp
    T_interp[:3, 3] = t_interp

    return T_interp

slerp(q1, q2, t)

Spherical linear interpolation between two quaternions.

Source code in teleop_xr/__init__.py
def slerp(q1, q2, t) -> np.ndarray:
    """Spherical linear interpolation between two quaternions."""
    q1 = q1 / np.linalg.norm(q1)
    q2 = q2 / np.linalg.norm(q2)

    dot = np.dot(q1, q2)

    # If the dot product is negative, use the shortest path
    if dot < 0.0:
        q2 = -q2
        dot = -dot

    DOT_THRESHOLD = 0.9995
    if dot > DOT_THRESHOLD:
        # Linear interpolation fallback for nearly identical quaternions
        result = q1 + t * (q2 - q1)
        return result / np.linalg.norm(result)

    theta_0 = np.arccos(dot)
    theta = theta_0 * t

    q3 = q2 - q1 * dot
    q3 = q3 / np.linalg.norm(q3)

    return q1 * np.cos(theta) + q3 * np.sin(theta)

teleop_xr.config

teleop_xr.config

teleop_xr.messages

teleop_xr.messages

Inverse Kinematics (IK)

teleop_xr.ik.controller

teleop_xr.ik.controller

IKController

High-level controller for teleoperation using IK.

This class manages the transition between idle and active teleoperation, handles XR device snapshots for relative motion, and coordinates between the robot model, IK solver, and optional output filtering.

Source code in teleop_xr/ik/controller.py
class IKController:
    """
    High-level controller for teleoperation using IK.

    This class manages the transition between idle and active teleoperation,
    handles XR device snapshots for relative motion, and coordinates between
    the robot model, IK solver, and optional output filtering.
    """

    robot: BaseRobot
    solver: PyrokiSolver | None
    active: bool
    snapshot_xr: dict[str, jaxlie.SE3]
    snapshot_robot: dict[str, jaxlie.SE3]
    filter: WeightedMovingFilter | None
    _warned_unsupported: set[str]

    def __init__(
        self,
        robot: BaseRobot,
        solver: PyrokiSolver | None = None,
        filter_weights: np.ndarray | None = None,
    ):
        """
        Initialize the IK controller.

        Args:
            robot: The robot model.
            solver: The IK solver. If None, step() will return current_config.
            filter_weights: Optional weights for a WeightedMovingFilter on joint outputs.
        """
        self.robot = robot
        self.solver = solver
        self.active = False
        self._warned_unsupported = set()

        # Snapshots
        self.snapshot_xr = {}
        self.snapshot_robot = {}

        # Filter for joint configuration
        self.filter = None
        if filter_weights is not None:
            # We'll initialize the filter when we know the data size (from default config)
            default_config = self.robot.get_default_config()
            self.filter = WeightedMovingFilter(
                filter_weights, data_size=len(default_config)
            )

    def xr_pose_to_se3(self, pose: XRPose) -> jaxlie.SE3:
        """
        Convert an XRPose to a jaxlie SE3 object.

        Args:
            pose: The XR pose to convert.

        Returns:
            jaxlie.SE3: The converted pose.
        """
        translation = jnp.array(
            [pose.position["x"], pose.position["y"], pose.position["z"]]
        )
        rotation = jaxlie.SO3(
            wxyz=jnp.array(
                [
                    pose.orientation["w"],
                    pose.orientation["x"],
                    pose.orientation["y"],
                    pose.orientation["z"],
                ]
            )
        )
        return jaxlie.SE3.from_rotation_and_translation(rotation, translation)

    def compute_teleop_transform(
        self, t_ctrl_curr: jaxlie.SE3, t_ctrl_init: jaxlie.SE3, t_ee_init: jaxlie.SE3
    ) -> jaxlie.SE3:
        """
        Compute the target robot pose based on XR controller motion.

        Args:
            t_ctrl_curr: Current XR controller pose.
            t_ctrl_init: XR controller pose at the start of teleoperation.
            t_ee_init: Robot end-effector pose at the start of teleoperation.

        Returns:
            jaxlie.SE3: The calculated target pose for the robot end-effector.
        """
        t_delta_ros = t_ctrl_curr.translation() - t_ctrl_init.translation()
        t_delta_robot = self.robot.ros_to_base @ t_delta_ros

        q_delta_ros = t_ctrl_curr.rotation() @ t_ctrl_init.rotation().inverse()
        q_delta_robot = self.robot.ros_to_base @ q_delta_ros @ self.robot.base_to_ros

        t_new = t_ee_init.translation() + t_delta_robot
        q_new = q_delta_robot @ t_ee_init.rotation()

        return jaxlie.SE3.from_rotation_and_translation(q_new, t_new)

    def _get_device_poses(self, state: XRState) -> dict[str, jaxlie.SE3]:
        """
        Extract poses for relevant XR devices from the current state.
        """
        poses = {}
        supported = self.robot.supported_frames
        for device in state.devices:
            frame_name = None
            pose_data = None
            if device.role == XRDeviceRole.CONTROLLER:
                if device.handedness == XRHandedness.LEFT and device.gripPose:
                    frame_name = "left"
                    pose_data = device.gripPose
                elif device.handedness == XRHandedness.RIGHT and device.gripPose:
                    frame_name = "right"
                    pose_data = device.gripPose
            elif device.role == XRDeviceRole.HEAD and device.pose:
                frame_name = "head"
                pose_data = device.pose

            if frame_name and pose_data is not None:
                if frame_name in supported:
                    poses[frame_name] = self.xr_pose_to_se3(pose_data)
                elif frame_name not in self._warned_unsupported:
                    logger.warning(
                        f"[IKController] Warning: Frame '{frame_name}' is available in XRState but not supported by robot. Skipping."
                    )
                    self._warned_unsupported.add(frame_name)
        return poses

    def _check_deadman(self, state: XRState) -> bool:
        """
        Check if the deadman switch (usually trigger or grip) is engaged on both controllers.
        """
        left_squeezed = False
        right_squeezed = False
        for device in state.devices:
            if device.role == XRDeviceRole.CONTROLLER:
                is_squeezed = (
                    device.gamepad is not None
                    and len(device.gamepad.buttons) > 1
                    and device.gamepad.buttons[1].pressed
                )
                if device.handedness == XRHandedness.LEFT:
                    left_squeezed = is_squeezed
                elif device.handedness == XRHandedness.RIGHT:
                    right_squeezed = is_squeezed
        return left_squeezed and right_squeezed

    def reset(self) -> None:
        """
        Resets the controller state, forcing it to re-take snapshots on the next step.
        """
        self.active = False
        self.snapshot_xr = {}
        self.snapshot_robot = {}
        if self.filter is not None:
            self.filter.reset()
        logger.info("[IKController] Reset triggered")

    def step(self, state: XRState, q_current: np.ndarray) -> np.ndarray:
        """
        Execute one control step: update targets and solve for new joint configuration.

        Args:
            state: The current XR state from the headset.
            q_current: The current joint configuration of the robot.

        Returns:
            np.ndarray: The new (possibly filtered) joint configuration.
        """
        is_deadman_active = self._check_deadman(state)
        curr_xr_poses = self._get_device_poses(state)

        # Check if we have all necessary poses
        required_keys = self.robot.supported_frames
        has_all_poses = all(k in curr_xr_poses for k in required_keys)

        if is_deadman_active and has_all_poses:
            if not self.active:
                # Engagement transition: take snapshots
                self.active = True
                self.snapshot_xr = curr_xr_poses

                # Get initial robot FK poses
                # Cast q_current to jnp.ndarray for JAX-based robot models
                fk_poses = self.robot.forward_kinematics(jnp.asarray(q_current))
                self.snapshot_robot = {k: fk_poses[k] for k in required_keys}

                logger.info(f"[IKController] Initial Robot FK: {self.snapshot_robot}")
                return q_current

            # Active control
            target_L: jaxlie.SE3 | None = None
            target_R: jaxlie.SE3 | None = None
            target_Head: jaxlie.SE3 | None = None

            if "left" in required_keys:
                target_L = self.compute_teleop_transform(
                    curr_xr_poses["left"],
                    self.snapshot_xr["left"],
                    self.snapshot_robot["left"],
                )
            if "right" in required_keys:
                target_R = self.compute_teleop_transform(
                    curr_xr_poses["right"],
                    self.snapshot_xr["right"],
                    self.snapshot_robot["right"],
                )
            if "head" in required_keys:
                target_Head = self.compute_teleop_transform(
                    curr_xr_poses["head"],
                    self.snapshot_xr["head"],
                    self.snapshot_robot["head"],
                )

            if self.solver is not None:
                # Solve for new configuration using target poses and current config
                # Cast inputs to JAX arrays and output back to numpy
                new_config_jax = self.solver.solve(
                    target_L,
                    target_R,
                    target_Head,
                    jnp.asarray(q_current),
                )
                new_config = np.array(new_config_jax)

                if self.filter is not None:
                    self.filter.add_data(new_config)
                    if self.filter.data_ready():
                        return self.filter.filtered_data

                logger.debug(f"[IKController] New Config: {new_config}")
                return new_config

            return q_current
        else:
            if self.active:
                # Disengagement transition
                self.active = False
                if self.filter is not None:
                    self.filter.reset()
            return q_current

__init__(robot, solver=None, filter_weights=None)

Initialize the IK controller.

Parameters:

Name Type Description Default
robot BaseRobot

The robot model.

required
solver PyrokiSolver | None

The IK solver. If None, step() will return current_config.

None
filter_weights ndarray | None

Optional weights for a WeightedMovingFilter on joint outputs.

None
Source code in teleop_xr/ik/controller.py
def __init__(
    self,
    robot: BaseRobot,
    solver: PyrokiSolver | None = None,
    filter_weights: np.ndarray | None = None,
):
    """
    Initialize the IK controller.

    Args:
        robot: The robot model.
        solver: The IK solver. If None, step() will return current_config.
        filter_weights: Optional weights for a WeightedMovingFilter on joint outputs.
    """
    self.robot = robot
    self.solver = solver
    self.active = False
    self._warned_unsupported = set()

    # Snapshots
    self.snapshot_xr = {}
    self.snapshot_robot = {}

    # Filter for joint configuration
    self.filter = None
    if filter_weights is not None:
        # We'll initialize the filter when we know the data size (from default config)
        default_config = self.robot.get_default_config()
        self.filter = WeightedMovingFilter(
            filter_weights, data_size=len(default_config)
        )

compute_teleop_transform(t_ctrl_curr, t_ctrl_init, t_ee_init)

Compute the target robot pose based on XR controller motion.

Parameters:

Name Type Description Default
t_ctrl_curr SE3

Current XR controller pose.

required
t_ctrl_init SE3

XR controller pose at the start of teleoperation.

required
t_ee_init SE3

Robot end-effector pose at the start of teleoperation.

required

Returns:

Type Description
SE3

jaxlie.SE3: The calculated target pose for the robot end-effector.

Source code in teleop_xr/ik/controller.py
def compute_teleop_transform(
    self, t_ctrl_curr: jaxlie.SE3, t_ctrl_init: jaxlie.SE3, t_ee_init: jaxlie.SE3
) -> jaxlie.SE3:
    """
    Compute the target robot pose based on XR controller motion.

    Args:
        t_ctrl_curr: Current XR controller pose.
        t_ctrl_init: XR controller pose at the start of teleoperation.
        t_ee_init: Robot end-effector pose at the start of teleoperation.

    Returns:
        jaxlie.SE3: The calculated target pose for the robot end-effector.
    """
    t_delta_ros = t_ctrl_curr.translation() - t_ctrl_init.translation()
    t_delta_robot = self.robot.ros_to_base @ t_delta_ros

    q_delta_ros = t_ctrl_curr.rotation() @ t_ctrl_init.rotation().inverse()
    q_delta_robot = self.robot.ros_to_base @ q_delta_ros @ self.robot.base_to_ros

    t_new = t_ee_init.translation() + t_delta_robot
    q_new = q_delta_robot @ t_ee_init.rotation()

    return jaxlie.SE3.from_rotation_and_translation(q_new, t_new)

reset()

Resets the controller state, forcing it to re-take snapshots on the next step.

Source code in teleop_xr/ik/controller.py
def reset(self) -> None:
    """
    Resets the controller state, forcing it to re-take snapshots on the next step.
    """
    self.active = False
    self.snapshot_xr = {}
    self.snapshot_robot = {}
    if self.filter is not None:
        self.filter.reset()
    logger.info("[IKController] Reset triggered")

step(state, q_current)

Execute one control step: update targets and solve for new joint configuration.

Parameters:

Name Type Description Default
state XRState

The current XR state from the headset.

required
q_current ndarray

The current joint configuration of the robot.

required

Returns:

Type Description
ndarray

np.ndarray: The new (possibly filtered) joint configuration.

Source code in teleop_xr/ik/controller.py
def step(self, state: XRState, q_current: np.ndarray) -> np.ndarray:
    """
    Execute one control step: update targets and solve for new joint configuration.

    Args:
        state: The current XR state from the headset.
        q_current: The current joint configuration of the robot.

    Returns:
        np.ndarray: The new (possibly filtered) joint configuration.
    """
    is_deadman_active = self._check_deadman(state)
    curr_xr_poses = self._get_device_poses(state)

    # Check if we have all necessary poses
    required_keys = self.robot.supported_frames
    has_all_poses = all(k in curr_xr_poses for k in required_keys)

    if is_deadman_active and has_all_poses:
        if not self.active:
            # Engagement transition: take snapshots
            self.active = True
            self.snapshot_xr = curr_xr_poses

            # Get initial robot FK poses
            # Cast q_current to jnp.ndarray for JAX-based robot models
            fk_poses = self.robot.forward_kinematics(jnp.asarray(q_current))
            self.snapshot_robot = {k: fk_poses[k] for k in required_keys}

            logger.info(f"[IKController] Initial Robot FK: {self.snapshot_robot}")
            return q_current

        # Active control
        target_L: jaxlie.SE3 | None = None
        target_R: jaxlie.SE3 | None = None
        target_Head: jaxlie.SE3 | None = None

        if "left" in required_keys:
            target_L = self.compute_teleop_transform(
                curr_xr_poses["left"],
                self.snapshot_xr["left"],
                self.snapshot_robot["left"],
            )
        if "right" in required_keys:
            target_R = self.compute_teleop_transform(
                curr_xr_poses["right"],
                self.snapshot_xr["right"],
                self.snapshot_robot["right"],
            )
        if "head" in required_keys:
            target_Head = self.compute_teleop_transform(
                curr_xr_poses["head"],
                self.snapshot_xr["head"],
                self.snapshot_robot["head"],
            )

        if self.solver is not None:
            # Solve for new configuration using target poses and current config
            # Cast inputs to JAX arrays and output back to numpy
            new_config_jax = self.solver.solve(
                target_L,
                target_R,
                target_Head,
                jnp.asarray(q_current),
            )
            new_config = np.array(new_config_jax)

            if self.filter is not None:
                self.filter.add_data(new_config)
                if self.filter.data_ready():
                    return self.filter.filtered_data

            logger.debug(f"[IKController] New Config: {new_config}")
            return new_config

        return q_current
    else:
        if self.active:
            # Disengagement transition
            self.active = False
            if self.filter is not None:
                self.filter.reset()
        return q_current

xr_pose_to_se3(pose)

Convert an XRPose to a jaxlie SE3 object.

Parameters:

Name Type Description Default
pose XRPose

The XR pose to convert.

required

Returns:

Type Description
SE3

jaxlie.SE3: The converted pose.

Source code in teleop_xr/ik/controller.py
def xr_pose_to_se3(self, pose: XRPose) -> jaxlie.SE3:
    """
    Convert an XRPose to a jaxlie SE3 object.

    Args:
        pose: The XR pose to convert.

    Returns:
        jaxlie.SE3: The converted pose.
    """
    translation = jnp.array(
        [pose.position["x"], pose.position["y"], pose.position["z"]]
    )
    rotation = jaxlie.SO3(
        wxyz=jnp.array(
            [
                pose.orientation["w"],
                pose.orientation["x"],
                pose.orientation["y"],
                pose.orientation["z"],
            ]
        )
    )
    return jaxlie.SE3.from_rotation_and_translation(rotation, translation)

teleop_xr.ik.solver

teleop_xr.ik.solver

PyrokiSolver

Inverse Kinematics (IK) solver using Pyroki and jaxls.

This solver uses optimization (Least Squares) to find joint configurations that satisfy target poses for the robot's end-effectors and head. It leverages JAX for high-performance, JIT-compiled solving.

Source code in teleop_xr/ik/solver.py
class PyrokiSolver:
    """
    Inverse Kinematics (IK) solver using Pyroki and jaxls.

    This solver uses optimization (Least Squares) to find joint configurations
    that satisfy target poses for the robot's end-effectors and head.
    It leverages JAX for high-performance, JIT-compiled solving.
    """

    robot: BaseRobot
    _jit_solve: Callable[
        [jaxlie.SE3 | None, jaxlie.SE3 | None, jaxlie.SE3 | None, jnp.ndarray],
        jnp.ndarray,
    ]

    def __init__(self, robot: BaseRobot):
        """
        Initialize the solver with a robot model.

        Args:
            robot: The robot model providing kinematic info and costs.
        """
        self.robot = robot

        # JIT compile the solve function
        self._jit_solve = jax.jit(self._solve_internal)

        # Warmup to trigger JIT compilation
        self._warmup()

    def _solve_internal(
        self,
        target_L: jaxlie.SE3 | None,
        target_R: jaxlie.SE3 | None,
        target_Head: jaxlie.SE3 | None,
        q_current: jnp.ndarray,
    ) -> jnp.ndarray:
        """
        Internal solve function that will be JIT-compiled.

        Args:
            target_L: Target pose for the left end-effector.
            target_R: Target pose for the right end-effector.
            target_Head: Target pose for the head.
            q_current: Current joint configuration (initial guess).

        Returns:
            jnp.ndarray: Optimized joint configuration.
        """
        # 1. Build costs from the robot
        costs = self.robot.build_costs(target_L, target_R, target_Head, q_current)

        # 2. Get the joint variable (assuming single timestep index 0)
        # The robot is expected to have joint_var_cls.
        # We use a single timestep for standard IK.
        var_joints = self.robot.joint_var_cls(jnp.array([0]))

        # 3. Construct initial values
        # q_current is expected to be (num_joints,)
        # Variable values usually expect (timesteps, joint_dims)
        initial_vals = jaxls.VarValues.make(
            [var_joints.with_value(q_current[jnp.newaxis, :])]
        )

        # 4. Construct and solve the LeastSquaresProblem
        # We optimize over joint variables.
        # Note: If the robot defines more variables in costs, they should be added here.
        # For a generic solver, we assume joint variables are the primary optimization targets.
        problem = jaxls.LeastSquaresProblem(costs, [var_joints])

        # solve() returns a VarValues object containing the solution
        solution = problem.analyze().solve(
            initial_vals=initial_vals,
            verbose=False,
            linear_solver="dense_cholesky",
            termination=jaxls.TerminationConfig(max_iterations=15),
        )

        # Return the optimized joint configuration for the first (and only) timestep
        return solution[var_joints][0]

    def solve(
        self,
        target_L: jaxlie.SE3 | None,
        target_R: jaxlie.SE3 | None,
        target_Head: jaxlie.SE3 | None,
        q_current: jnp.ndarray,
    ) -> jnp.ndarray:
        """
        Solve the IK problem for the given targets and current configuration.

        Args:
            target_L: Target pose for the left end-effector.
            target_R: Target pose for the right end-effector.
            target_Head: Target pose for the head.
            q_current: Current joint configuration (initial guess).

        Returns:
            jnp.ndarray: Optimized joint configuration.
        """
        return self._jit_solve(target_L, target_R, target_Head, q_current)

    def _warmup(self) -> None:
        """
        Triggers JIT compilation by running a solve with dummy data.
        """
        try:
            q_dummy = self.robot.get_default_config()
            target_dummy = jaxlie.SE3.identity()

            self.solve(target_dummy, target_dummy, target_dummy, q_dummy)

            self.solve(target_dummy, None, None, q_dummy)
            self.solve(None, target_dummy, None, q_dummy)
            self.solve(None, None, target_dummy, q_dummy)
        except Exception:
            # Warmup might fail if robot is a mock or not fully implemented yet.
            # We don't want to crash during initialization in that case.
            pass

__init__(robot)

Initialize the solver with a robot model.

Parameters:

Name Type Description Default
robot BaseRobot

The robot model providing kinematic info and costs.

required
Source code in teleop_xr/ik/solver.py
def __init__(self, robot: BaseRobot):
    """
    Initialize the solver with a robot model.

    Args:
        robot: The robot model providing kinematic info and costs.
    """
    self.robot = robot

    # JIT compile the solve function
    self._jit_solve = jax.jit(self._solve_internal)

    # Warmup to trigger JIT compilation
    self._warmup()

solve(target_L, target_R, target_Head, q_current)

Solve the IK problem for the given targets and current configuration.

Parameters:

Name Type Description Default
target_L SE3 | None

Target pose for the left end-effector.

required
target_R SE3 | None

Target pose for the right end-effector.

required
target_Head SE3 | None

Target pose for the head.

required
q_current ndarray

Current joint configuration (initial guess).

required

Returns:

Type Description
ndarray

jnp.ndarray: Optimized joint configuration.

Source code in teleop_xr/ik/solver.py
def solve(
    self,
    target_L: jaxlie.SE3 | None,
    target_R: jaxlie.SE3 | None,
    target_Head: jaxlie.SE3 | None,
    q_current: jnp.ndarray,
) -> jnp.ndarray:
    """
    Solve the IK problem for the given targets and current configuration.

    Args:
        target_L: Target pose for the left end-effector.
        target_R: Target pose for the right end-effector.
        target_Head: Target pose for the head.
        q_current: Current joint configuration (initial guess).

    Returns:
        jnp.ndarray: Optimized joint configuration.
    """
    return self._jit_solve(target_L, target_R, target_Head, q_current)

teleop_xr.ik.robot

teleop_xr.ik.robot

BaseRobot

Bases: ABC

Abstract base class for robot models used in IK optimization.

This class defines the interface required by the IK solver and controller to compute kinematics and optimization costs.

Source code in teleop_xr/ik/robot.py
class BaseRobot(ABC):
    """
    Abstract base class for robot models used in IK optimization.

    This class defines the interface required by the IK solver and controller
    to compute kinematics and optimization costs.
    """

    @property
    def orientation(self) -> jaxlie.SO3:
        """
        Rotation from the robot's base frame to the canonical ROS2 frame (X-forward, Z-up).
        """
        return jaxlie.SO3.identity()

    @property
    def base_to_ros(self) -> jaxlie.SO3:
        """Rotation from the robot base frame to the canonical ROS2 frame."""
        return self.orientation

    @property
    def ros_to_base(self) -> jaxlie.SO3:
        """Rotation from the canonical ROS2 frame to the robot base frame."""
        return self.orientation.inverse()

    @abstractmethod
    def get_vis_config(self) -> RobotVisConfig | None:
        """
        Get the visualization configuration for this robot.

        Returns:
            RobotVisConfig | None: Configuration for rendering the robot, or None if not supported.

        Note:
            Subclasses should use `self.orientation` to populate
            `RobotVisConfig.initial_rotation_euler`.
        """
        pass  # pragma: no cover

    @property
    @abstractmethod
    def actuated_joint_names(self) -> list[str]:
        """
        Get the names of the actuated joints.

        Returns:
            list[str]: A list of joint names.
        """
        pass  # pragma: no cover

    @property
    @abstractmethod
    def joint_var_cls(self) -> Any:
        """
        The jaxls.Var class used for joint configurations.

        Returns:
            Type[jaxls.Var]: The variable class to use for optimization.
        """
        pass  # pragma: no cover

    @property
    def supported_frames(self) -> set[str]:
        """
        Get the set of supported frames for this robot.

        Returns:
            set[str]: A set of frame names (e.g., {"left", "right", "head"}).
        """
        return {"left", "right", "head"}

    @property
    def default_speed_ratio(self) -> float:
        """
        Get the default teleop speed ratio for this robot.

        Returns:
            float: The default speed ratio (e.g., 1.0 for 100%, 1.2 for 120%).
        """
        return 1.0

    @abstractmethod
    def forward_kinematics(self, config: jnp.ndarray) -> dict[str, jaxlie.SE3]:
        """
        Compute the forward kinematics for the given configuration.

        Args:
            config: The robot configuration (e.g., joint angles) as a JAX array.

        Returns:
            dict[str, jaxlie.SE3]: A dictionary mapping link names (e.g., "left", "right", "head")
                                  to their respective SE3 poses.
        """
        pass  # pragma: no cover

    @abstractmethod
    def get_default_config(self) -> jnp.ndarray:
        """
        Get the default configuration for the robot.

        Returns:
            jnp.ndarray: The default configuration as a JAX array.
        """
        pass  # pragma: no cover

    @abstractmethod
    def build_costs(
        self,
        target_L: jaxlie.SE3 | None,
        target_R: jaxlie.SE3 | None,
        target_Head: jaxlie.SE3 | None,
        q_current: jnp.ndarray | None = None,
    ) -> list[Cost]:
        """
        Build a list of costs for the robot-specific formulation.

        Args:
            target_L: Target pose for the left end-effector.
            target_R: Target pose for the right end-effector.
            target_Head: Target pose for the head.
            q_current: Current joint configuration (initial guess).

        Returns:
            list[Cost]: A list of jaxls Cost objects representing the optimization objectives.
        """
        pass  # pragma: no cover

actuated_joint_names abstractmethod property

Get the names of the actuated joints.

Returns:

Type Description
list[str]

list[str]: A list of joint names.

base_to_ros property

Rotation from the robot base frame to the canonical ROS2 frame.

default_speed_ratio property

Get the default teleop speed ratio for this robot.

Returns:

Name Type Description
float float

The default speed ratio (e.g., 1.0 for 100%, 1.2 for 120%).

joint_var_cls abstractmethod property

The jaxls.Var class used for joint configurations.

Returns:

Type Description
Any

Type[jaxls.Var]: The variable class to use for optimization.

orientation property

Rotation from the robot's base frame to the canonical ROS2 frame (X-forward, Z-up).

ros_to_base property

Rotation from the canonical ROS2 frame to the robot base frame.

supported_frames property

Get the set of supported frames for this robot.

Returns:

Type Description
set[str]

set[str]: A set of frame names (e.g., {"left", "right", "head"}).

build_costs(target_L, target_R, target_Head, q_current=None) abstractmethod

Build a list of costs for the robot-specific formulation.

Parameters:

Name Type Description Default
target_L SE3 | None

Target pose for the left end-effector.

required
target_R SE3 | None

Target pose for the right end-effector.

required
target_Head SE3 | None

Target pose for the head.

required
q_current ndarray | None

Current joint configuration (initial guess).

None

Returns:

Type Description
list[Cost]

list[Cost]: A list of jaxls Cost objects representing the optimization objectives.

Source code in teleop_xr/ik/robot.py
@abstractmethod
def build_costs(
    self,
    target_L: jaxlie.SE3 | None,
    target_R: jaxlie.SE3 | None,
    target_Head: jaxlie.SE3 | None,
    q_current: jnp.ndarray | None = None,
) -> list[Cost]:
    """
    Build a list of costs for the robot-specific formulation.

    Args:
        target_L: Target pose for the left end-effector.
        target_R: Target pose for the right end-effector.
        target_Head: Target pose for the head.
        q_current: Current joint configuration (initial guess).

    Returns:
        list[Cost]: A list of jaxls Cost objects representing the optimization objectives.
    """
    pass  # pragma: no cover

forward_kinematics(config) abstractmethod

Compute the forward kinematics for the given configuration.

Parameters:

Name Type Description Default
config ndarray

The robot configuration (e.g., joint angles) as a JAX array.

required

Returns:

Type Description
dict[str, SE3]

dict[str, jaxlie.SE3]: A dictionary mapping link names (e.g., "left", "right", "head") to their respective SE3 poses.

Source code in teleop_xr/ik/robot.py
@abstractmethod
def forward_kinematics(self, config: jnp.ndarray) -> dict[str, jaxlie.SE3]:
    """
    Compute the forward kinematics for the given configuration.

    Args:
        config: The robot configuration (e.g., joint angles) as a JAX array.

    Returns:
        dict[str, jaxlie.SE3]: A dictionary mapping link names (e.g., "left", "right", "head")
                              to their respective SE3 poses.
    """
    pass  # pragma: no cover

get_default_config() abstractmethod

Get the default configuration for the robot.

Returns:

Type Description
ndarray

jnp.ndarray: The default configuration as a JAX array.

Source code in teleop_xr/ik/robot.py
@abstractmethod
def get_default_config(self) -> jnp.ndarray:
    """
    Get the default configuration for the robot.

    Returns:
        jnp.ndarray: The default configuration as a JAX array.
    """
    pass  # pragma: no cover

get_vis_config() abstractmethod

Get the visualization configuration for this robot.

Returns:

Type Description
RobotVisConfig | None

RobotVisConfig | None: Configuration for rendering the robot, or None if not supported.

Note

Subclasses should use self.orientation to populate RobotVisConfig.initial_rotation_euler.

Source code in teleop_xr/ik/robot.py
@abstractmethod
def get_vis_config(self) -> RobotVisConfig | None:
    """
    Get the visualization configuration for this robot.

    Returns:
        RobotVisConfig | None: Configuration for rendering the robot, or None if not supported.

    Note:
        Subclasses should use `self.orientation` to populate
        `RobotVisConfig.initial_rotation_euler`.
    """
    pass  # pragma: no cover

teleop_xr.ik.robots.h1_2

teleop_xr.ik.robots.h1_2

UnitreeH1Robot

Bases: BaseRobot

Unitree H1_2 robot implementation for IK.

Source code in teleop_xr/ik/robots/h1_2.py
class UnitreeH1Robot(BaseRobot):
    """
    Unitree H1_2 robot implementation for IK.
    """

    def __init__(self) -> None:
        # Load URDF from external repository via RAM
        self.urdf_path = str(
            ram.get_resource(
                repo_url="https://github.com/unitreerobotics/xr_teleoperate.git",
                path_inside_repo="assets/h1_2/h1_2.urdf",
            )
        )
        self.mesh_path = os.path.dirname(self.urdf_path)
        urdf = yourdfpy.URDF.load(self.urdf_path)

        # Identify leg joints names to freeze
        self.leg_joint_names = [
            "left_hip_yaw_joint",
            "left_hip_pitch_joint",
            "left_hip_roll_joint",
            "left_knee_joint",
            "left_ankle_pitch_joint",
            "left_ankle_roll_joint",
            "right_hip_yaw_joint",
            "right_hip_pitch_joint",
            "right_hip_roll_joint",
            "right_knee_joint",
            "right_ankle_pitch_joint",
            "right_ankle_roll_joint",
        ]

        for joint_name in self.leg_joint_names:
            if joint_name in urdf.joint_map:
                urdf.joint_map[joint_name].type = "fixed"

        self.robot = pk.Robot.from_urdf(urdf)
        self.robot_coll = pk.collision.RobotCollision.from_urdf(urdf)

        # End effector and torso link indices
        # We use hand base links as end effectors (L_ee, R_ee frames)
        self.L_ee = "L_hand_base_link"
        self.R_ee = "R_hand_base_link"
        self.L_ee_link_idx = self.robot.links.names.index(self.L_ee)
        self.R_ee_link_idx = self.robot.links.names.index(self.R_ee)
        self.torso_link_idx = self.robot.links.names.index("torso_link")

    @property
    @override
    def orientation(self) -> jaxlie.SO3:
        return jaxlie.SO3.identity()

    @override
    def get_vis_config(self) -> RobotVisConfig | None:
        if not self.urdf_path:
            return None
        return RobotVisConfig(
            urdf_path=self.urdf_path,
            mesh_path=self.mesh_path,
            model_scale=0.5,
            initial_rotation_euler=[
                float(x) for x in self.orientation.as_rpy_radians()
            ],
        )

    @property
    def joint_var_cls(self) -> Any:
        """
        The jaxls.Var class used for joint configurations.
        """
        return self.robot.joint_var_cls

    @property
    def actuated_joint_names(self) -> list[str]:
        return list(self.robot.joints.actuated_names)

    @property
    def default_speed_ratio(self) -> float:
        # Unitree H1 often benefits from slightly amplified motion mapping
        return 1.2

    def forward_kinematics(self, config: jax.Array) -> dict[str, jaxlie.SE3]:
        """
        Compute the forward kinematics for the given configuration.
        """
        fk = self.robot.forward_kinematics(config)
        return {
            "left": jaxlie.SE3(fk[self.L_ee_link_idx]),
            "right": jaxlie.SE3(fk[self.R_ee_link_idx]),
            "head": jaxlie.SE3(fk[self.torso_link_idx]),
        }

    def get_default_config(self) -> jax.Array:
        return jnp.zeros_like(self.robot.joints.lower_limits)

    def build_costs(
        self,
        target_L: jaxlie.SE3 | None,
        target_R: jaxlie.SE3 | None,
        target_Head: jaxlie.SE3 | None,
        q_current: jnp.ndarray | None = None,
    ) -> list[Cost]:
        """
        Build a list of Pyroki cost objects.
        """
        costs = []
        JointVar = self.robot.joint_var_cls

        if q_current is not None:
            costs.append(
                pk.costs.rest_cost(
                    JointVar(0),
                    rest_pose=q_current,
                    weight=5.0,
                )
            )

        costs.append(
            pk.costs.manipulability_cost(
                self.robot,
                JointVar(0),
                jnp.array([self.L_ee_link_idx, self.R_ee_link_idx], dtype=jnp.int32),
                weight=0.01,
            )
        )

        # 1. Bimanual costs (L/R EE frames: L_ee, R_ee)
        # Using analytic jacobian for efficiency
        if target_L is not None:
            costs.append(
                pk.costs.pose_cost_analytic_jac(
                    self.robot,
                    JointVar(0),
                    target_L,
                    jnp.array(self.L_ee_link_idx, dtype=jnp.int32),
                    pos_weight=50.0,
                    ori_weight=10.0,
                )
            )

        if target_R is not None:
            costs.append(
                pk.costs.pose_cost_analytic_jac(
                    self.robot,
                    JointVar(0),
                    target_R,
                    jnp.array(self.R_ee_link_idx, dtype=jnp.int32),
                    pos_weight=50.0,
                    ori_weight=10.0,
                )
            )

        costs.append(
            pk.costs.limit_cost(  # pyright: ignore[reportCallIssue]
                self.robot, JointVar(0), weight=100.0
            )
        )

        if target_Head is not None:
            costs.append(
                pk.costs.pose_cost(  # pyright: ignore[reportCallIssue]
                    robot=self.robot,
                    joint_var=JointVar(0),
                    target_pose=target_Head,
                    target_link_index=jnp.array(self.torso_link_idx, dtype=jnp.int32),
                    pos_weight=0.0,
                    ori_weight=jnp.array([0.0, 0.0, 20.0]),
                )
            )

        return costs

joint_var_cls property

The jaxls.Var class used for joint configurations.

build_costs(target_L, target_R, target_Head, q_current=None)

Build a list of Pyroki cost objects.

Source code in teleop_xr/ik/robots/h1_2.py
def build_costs(
    self,
    target_L: jaxlie.SE3 | None,
    target_R: jaxlie.SE3 | None,
    target_Head: jaxlie.SE3 | None,
    q_current: jnp.ndarray | None = None,
) -> list[Cost]:
    """
    Build a list of Pyroki cost objects.
    """
    costs = []
    JointVar = self.robot.joint_var_cls

    if q_current is not None:
        costs.append(
            pk.costs.rest_cost(
                JointVar(0),
                rest_pose=q_current,
                weight=5.0,
            )
        )

    costs.append(
        pk.costs.manipulability_cost(
            self.robot,
            JointVar(0),
            jnp.array([self.L_ee_link_idx, self.R_ee_link_idx], dtype=jnp.int32),
            weight=0.01,
        )
    )

    # 1. Bimanual costs (L/R EE frames: L_ee, R_ee)
    # Using analytic jacobian for efficiency
    if target_L is not None:
        costs.append(
            pk.costs.pose_cost_analytic_jac(
                self.robot,
                JointVar(0),
                target_L,
                jnp.array(self.L_ee_link_idx, dtype=jnp.int32),
                pos_weight=50.0,
                ori_weight=10.0,
            )
        )

    if target_R is not None:
        costs.append(
            pk.costs.pose_cost_analytic_jac(
                self.robot,
                JointVar(0),
                target_R,
                jnp.array(self.R_ee_link_idx, dtype=jnp.int32),
                pos_weight=50.0,
                ori_weight=10.0,
            )
        )

    costs.append(
        pk.costs.limit_cost(  # pyright: ignore[reportCallIssue]
            self.robot, JointVar(0), weight=100.0
        )
    )

    if target_Head is not None:
        costs.append(
            pk.costs.pose_cost(  # pyright: ignore[reportCallIssue]
                robot=self.robot,
                joint_var=JointVar(0),
                target_pose=target_Head,
                target_link_index=jnp.array(self.torso_link_idx, dtype=jnp.int32),
                pos_weight=0.0,
                ori_weight=jnp.array([0.0, 0.0, 20.0]),
            )
        )

    return costs

forward_kinematics(config)

Compute the forward kinematics for the given configuration.

Source code in teleop_xr/ik/robots/h1_2.py
def forward_kinematics(self, config: jax.Array) -> dict[str, jaxlie.SE3]:
    """
    Compute the forward kinematics for the given configuration.
    """
    fk = self.robot.forward_kinematics(config)
    return {
        "left": jaxlie.SE3(fk[self.L_ee_link_idx]),
        "right": jaxlie.SE3(fk[self.R_ee_link_idx]),
        "head": jaxlie.SE3(fk[self.torso_link_idx]),
    }

Events & Logic

teleop_xr.events

teleop_xr.events

XR event system types and utilities.

This module provides enums and dataclasses for XR button events, including button identification, controller identification, and event data.

ButtonDetector

Detects complex button events (double-press, long-press) from raw button states.

Source code in teleop_xr/events.py
class ButtonDetector:
    """Detects complex button events (double-press, long-press) from raw button states."""

    _settings: EventSettings
    _states: dict[tuple[XRButton, XRController], ButtonState]

    def __init__(self, settings: EventSettings):
        """Initialize the ButtonDetector.

        Args:
            settings: Configuration settings for event detection.
        """
        self._settings = settings
        self._states = {}

    def update(
        self,
        button: XRButton,
        controller: XRController,
        is_pressed: bool,
        timestamp_ms: float,
    ) -> list[ButtonEvent]:
        """Update the state of a button and return any detected events.

        Args:
            button: The button being updated.
            controller: The controller the button belongs to.
            is_pressed: The current raw pressed state of the button.
            timestamp_ms: The timestamp of the update in milliseconds.

        Returns:
            A list of ButtonEvent objects detected during this update.
        """
        key = (button, controller)
        if key not in self._states:
            self._states[key] = ButtonState()

        state = self._states[key]
        events: list[ButtonEvent] = []

        if is_pressed and not state.is_pressed:
            state.is_pressed = True
            state.press_time_ms = timestamp_ms
            state.long_press_fired = False
            state.double_press_fired = False

            events.append(
                ButtonEvent(
                    type=ButtonEventType.BUTTON_DOWN,
                    button=button,
                    controller=controller,
                    timestamp_ms=timestamp_ms,
                )
            )

            time_since_last_release = timestamp_ms - state.last_release_time_ms
            if (
                state.last_release_time_ms > 0
                and time_since_last_release < self._settings.double_press_threshold_ms
            ):
                state.double_press_fired = True
                events.append(
                    ButtonEvent(
                        type=ButtonEventType.DOUBLE_PRESS,
                        button=button,
                        controller=controller,
                        timestamp_ms=timestamp_ms,
                    )
                )

        elif not is_pressed and state.is_pressed:
            state.is_pressed = False
            hold_duration = timestamp_ms - state.press_time_ms

            if state.double_press_fired:
                state.last_release_time_ms = 0.0
            else:
                state.last_release_time_ms = timestamp_ms

            events.append(
                ButtonEvent(
                    type=ButtonEventType.BUTTON_UP,
                    button=button,
                    controller=controller,
                    timestamp_ms=timestamp_ms,
                    hold_duration_ms=hold_duration,
                )
            )

        elif is_pressed and state.is_pressed and not state.long_press_fired:
            hold_duration = timestamp_ms - state.press_time_ms
            if hold_duration >= self._settings.long_press_threshold_ms:
                state.long_press_fired = True
                events.append(
                    ButtonEvent(
                        type=ButtonEventType.LONG_PRESS,
                        button=button,
                        controller=controller,
                        timestamp_ms=timestamp_ms,
                    )
                )

        return events

__init__(settings)

Initialize the ButtonDetector.

Parameters:

Name Type Description Default
settings EventSettings

Configuration settings for event detection.

required
Source code in teleop_xr/events.py
def __init__(self, settings: EventSettings):
    """Initialize the ButtonDetector.

    Args:
        settings: Configuration settings for event detection.
    """
    self._settings = settings
    self._states = {}

update(button, controller, is_pressed, timestamp_ms)

Update the state of a button and return any detected events.

Parameters:

Name Type Description Default
button XRButton

The button being updated.

required
controller XRController

The controller the button belongs to.

required
is_pressed bool

The current raw pressed state of the button.

required
timestamp_ms float

The timestamp of the update in milliseconds.

required

Returns:

Type Description
list[ButtonEvent]

A list of ButtonEvent objects detected during this update.

Source code in teleop_xr/events.py
def update(
    self,
    button: XRButton,
    controller: XRController,
    is_pressed: bool,
    timestamp_ms: float,
) -> list[ButtonEvent]:
    """Update the state of a button and return any detected events.

    Args:
        button: The button being updated.
        controller: The controller the button belongs to.
        is_pressed: The current raw pressed state of the button.
        timestamp_ms: The timestamp of the update in milliseconds.

    Returns:
        A list of ButtonEvent objects detected during this update.
    """
    key = (button, controller)
    if key not in self._states:
        self._states[key] = ButtonState()

    state = self._states[key]
    events: list[ButtonEvent] = []

    if is_pressed and not state.is_pressed:
        state.is_pressed = True
        state.press_time_ms = timestamp_ms
        state.long_press_fired = False
        state.double_press_fired = False

        events.append(
            ButtonEvent(
                type=ButtonEventType.BUTTON_DOWN,
                button=button,
                controller=controller,
                timestamp_ms=timestamp_ms,
            )
        )

        time_since_last_release = timestamp_ms - state.last_release_time_ms
        if (
            state.last_release_time_ms > 0
            and time_since_last_release < self._settings.double_press_threshold_ms
        ):
            state.double_press_fired = True
            events.append(
                ButtonEvent(
                    type=ButtonEventType.DOUBLE_PRESS,
                    button=button,
                    controller=controller,
                    timestamp_ms=timestamp_ms,
                )
            )

    elif not is_pressed and state.is_pressed:
        state.is_pressed = False
        hold_duration = timestamp_ms - state.press_time_ms

        if state.double_press_fired:
            state.last_release_time_ms = 0.0
        else:
            state.last_release_time_ms = timestamp_ms

        events.append(
            ButtonEvent(
                type=ButtonEventType.BUTTON_UP,
                button=button,
                controller=controller,
                timestamp_ms=timestamp_ms,
                hold_duration_ms=hold_duration,
            )
        )

    elif is_pressed and state.is_pressed and not state.long_press_fired:
        hold_duration = timestamp_ms - state.press_time_ms
        if hold_duration >= self._settings.long_press_threshold_ms:
            state.long_press_fired = True
            events.append(
                ButtonEvent(
                    type=ButtonEventType.LONG_PRESS,
                    button=button,
                    controller=controller,
                    timestamp_ms=timestamp_ms,
                )
            )

    return events

ButtonEvent dataclass

Represents a button event with timing information.

Attributes:

Name Type Description
type ButtonEventType

The type of button event.

button XRButton

The button that triggered the event.

controller XRController

The controller (left/right) that triggered the event.

timestamp_ms float

Unix timestamp in milliseconds when the event occurred.

hold_duration_ms float | None

Duration the button was held (for release events), or None.

Source code in teleop_xr/events.py
@dataclass
class ButtonEvent:
    """Represents a button event with timing information.

    Attributes:
        type: The type of button event.
        button: The button that triggered the event.
        controller: The controller (left/right) that triggered the event.
        timestamp_ms: Unix timestamp in milliseconds when the event occurred.
        hold_duration_ms: Duration the button was held (for release events), or None.
    """

    type: ButtonEventType
    button: XRButton
    controller: XRController
    timestamp_ms: float
    hold_duration_ms: float | None = None

ButtonEventType

Bases: str, Enum

Types of button events that can be detected.

Source code in teleop_xr/events.py
class ButtonEventType(str, Enum):
    """Types of button events that can be detected."""

    BUTTON_DOWN = "button_down"
    BUTTON_UP = "button_up"
    DOUBLE_PRESS = "double_press"
    LONG_PRESS = "long_press"

ButtonState dataclass

Internal state for tracking a single button.

Attributes:

Name Type Description
is_pressed bool

Whether the button is currently pressed.

press_time_ms float

Timestamp of the last press event.

last_release_time_ms float

Timestamp of the last release event.

long_press_fired bool

Whether a long-press event has already been fired for the current press.

double_press_fired bool

Whether a double-press event was just fired (for debounce).

Source code in teleop_xr/events.py
@dataclass
class ButtonState:
    """Internal state for tracking a single button.

    Attributes:
        is_pressed: Whether the button is currently pressed.
        press_time_ms: Timestamp of the last press event.
        last_release_time_ms: Timestamp of the last release event.
        long_press_fired: Whether a long-press event has already been fired for the current press.
        double_press_fired: Whether a double-press event was just fired (for debounce).
    """

    is_pressed: bool = False
    press_time_ms: float = 0.0
    last_release_time_ms: float = 0.0
    long_press_fired: bool = False
    double_press_fired: bool = False

EventProcessor

Orchestrates event detection and manages subscriptions.

This class parses raw XR state dictionaries, feeds them to a ButtonDetector, and invokes registered callbacks when events are detected.

Source code in teleop_xr/events.py
class EventProcessor:
    """Orchestrates event detection and manages subscriptions.

    This class parses raw XR state dictionaries, feeds them to a ButtonDetector,
    and invokes registered callbacks when events are detected.
    """

    _detector: ButtonDetector
    _callbacks: dict[
        ButtonEventType, list[tuple[Callable, XRButton | None, XRController | None]]
    ]

    def __init__(self, settings: EventSettings):
        """Initialize the EventProcessor.

        Args:
            settings: Configuration settings for event detection.
        """
        self._detector = ButtonDetector(settings)
        self._callbacks = {t: [] for t in ButtonEventType}

    def on_button_down(
        self,
        button: XRButton | None = None,
        controller: XRController | None = None,
        callback: Callable[[ButtonEvent], None] | None = None,
    ) -> None:
        """Register a callback for button down events.

        Args:
            button: Optional button filter. If set, only events for this button trigger the callback.
            controller: Optional controller filter. If set, only events for this controller trigger the callback.
            callback: The function to call when the event occurs.
        """
        if callback:
            self._callbacks[ButtonEventType.BUTTON_DOWN].append(
                (callback, button, controller)
            )

    def on_button_up(
        self,
        button: XRButton | None = None,
        controller: XRController | None = None,
        callback: Callable[[ButtonEvent], None] | None = None,
    ) -> None:
        """Register a callback for button up events.

        Args:
            button: Optional button filter.
            controller: Optional controller filter.
            callback: The function to call when the event occurs.
        """
        if callback:
            self._callbacks[ButtonEventType.BUTTON_UP].append(
                (callback, button, controller)
            )

    def on_double_press(
        self,
        button: XRButton | None = None,
        controller: XRController | None = None,
        callback: Callable[[ButtonEvent], None] | None = None,
    ) -> None:
        """Register a callback for double press events.

        Args:
            button: Optional button filter.
            controller: Optional controller filter.
            callback: The function to call when the event occurs.
        """
        if callback:
            self._callbacks[ButtonEventType.DOUBLE_PRESS].append(
                (callback, button, controller)
            )

    def on_long_press(
        self,
        button: XRButton | None = None,
        controller: XRController | None = None,
        callback: Callable[[ButtonEvent], None] | None = None,
    ) -> None:
        """Register a callback for long press events.

        Args:
            button: Optional button filter.
            controller: Optional controller filter.
            callback: The function to call when the event occurs.
        """
        if callback:
            self._callbacks[ButtonEventType.LONG_PRESS].append(
                (callback, button, controller)
            )

    def process(self, pose: np.ndarray, xr_state_dict: dict) -> None:
        """Process XR state and trigger events.

        Args:
            pose: The 4x4 transformation matrix (unused, for Teleop.subscribe compatibility).
            xr_state_dict: The raw XR state dictionary.
        """
        timestamp_ms = xr_state_dict.get("timestamp_unix_ms", 0.0)
        devices = xr_state_dict.get("devices", [])

        for device in devices:
            if device.get("role") != "controller":
                continue

            handedness = device.get("handedness")
            try:
                controller = XRController(handedness)
            except ValueError:
                continue

            gamepad = device.get("gamepad")
            if not gamepad:
                continue

            buttons = gamepad.get("buttons", [])
            for i, button_state in enumerate(buttons):
                button_enum = button_index_to_enum(i)
                if not button_enum:
                    continue

                is_pressed = button_state.get("pressed", False)
                detected_events = self._detector.update(
                    button_enum, controller, is_pressed, timestamp_ms
                )

                for event in detected_events:
                    self._fire_callbacks(event)

    def _fire_callbacks(self, event: ButtonEvent) -> None:
        """Fire all callbacks matching the event filters."""
        for callback, b_filter, c_filter in self._callbacks.get(event.type, []):
            if (b_filter is None or b_filter == event.button) and (
                c_filter is None or c_filter == event.controller
            ):
                callback(event)

__init__(settings)

Initialize the EventProcessor.

Parameters:

Name Type Description Default
settings EventSettings

Configuration settings for event detection.

required
Source code in teleop_xr/events.py
def __init__(self, settings: EventSettings):
    """Initialize the EventProcessor.

    Args:
        settings: Configuration settings for event detection.
    """
    self._detector = ButtonDetector(settings)
    self._callbacks = {t: [] for t in ButtonEventType}

on_button_down(button=None, controller=None, callback=None)

Register a callback for button down events.

Parameters:

Name Type Description Default
button XRButton | None

Optional button filter. If set, only events for this button trigger the callback.

None
controller XRController | None

Optional controller filter. If set, only events for this controller trigger the callback.

None
callback Callable[[ButtonEvent], None] | None

The function to call when the event occurs.

None
Source code in teleop_xr/events.py
def on_button_down(
    self,
    button: XRButton | None = None,
    controller: XRController | None = None,
    callback: Callable[[ButtonEvent], None] | None = None,
) -> None:
    """Register a callback for button down events.

    Args:
        button: Optional button filter. If set, only events for this button trigger the callback.
        controller: Optional controller filter. If set, only events for this controller trigger the callback.
        callback: The function to call when the event occurs.
    """
    if callback:
        self._callbacks[ButtonEventType.BUTTON_DOWN].append(
            (callback, button, controller)
        )

on_button_up(button=None, controller=None, callback=None)

Register a callback for button up events.

Parameters:

Name Type Description Default
button XRButton | None

Optional button filter.

None
controller XRController | None

Optional controller filter.

None
callback Callable[[ButtonEvent], None] | None

The function to call when the event occurs.

None
Source code in teleop_xr/events.py
def on_button_up(
    self,
    button: XRButton | None = None,
    controller: XRController | None = None,
    callback: Callable[[ButtonEvent], None] | None = None,
) -> None:
    """Register a callback for button up events.

    Args:
        button: Optional button filter.
        controller: Optional controller filter.
        callback: The function to call when the event occurs.
    """
    if callback:
        self._callbacks[ButtonEventType.BUTTON_UP].append(
            (callback, button, controller)
        )

on_double_press(button=None, controller=None, callback=None)

Register a callback for double press events.

Parameters:

Name Type Description Default
button XRButton | None

Optional button filter.

None
controller XRController | None

Optional controller filter.

None
callback Callable[[ButtonEvent], None] | None

The function to call when the event occurs.

None
Source code in teleop_xr/events.py
def on_double_press(
    self,
    button: XRButton | None = None,
    controller: XRController | None = None,
    callback: Callable[[ButtonEvent], None] | None = None,
) -> None:
    """Register a callback for double press events.

    Args:
        button: Optional button filter.
        controller: Optional controller filter.
        callback: The function to call when the event occurs.
    """
    if callback:
        self._callbacks[ButtonEventType.DOUBLE_PRESS].append(
            (callback, button, controller)
        )

on_long_press(button=None, controller=None, callback=None)

Register a callback for long press events.

Parameters:

Name Type Description Default
button XRButton | None

Optional button filter.

None
controller XRController | None

Optional controller filter.

None
callback Callable[[ButtonEvent], None] | None

The function to call when the event occurs.

None
Source code in teleop_xr/events.py
def on_long_press(
    self,
    button: XRButton | None = None,
    controller: XRController | None = None,
    callback: Callable[[ButtonEvent], None] | None = None,
) -> None:
    """Register a callback for long press events.

    Args:
        button: Optional button filter.
        controller: Optional controller filter.
        callback: The function to call when the event occurs.
    """
    if callback:
        self._callbacks[ButtonEventType.LONG_PRESS].append(
            (callback, button, controller)
        )

process(pose, xr_state_dict)

Process XR state and trigger events.

Parameters:

Name Type Description Default
pose ndarray

The 4x4 transformation matrix (unused, for Teleop.subscribe compatibility).

required
xr_state_dict dict

The raw XR state dictionary.

required
Source code in teleop_xr/events.py
def process(self, pose: np.ndarray, xr_state_dict: dict) -> None:
    """Process XR state and trigger events.

    Args:
        pose: The 4x4 transformation matrix (unused, for Teleop.subscribe compatibility).
        xr_state_dict: The raw XR state dictionary.
    """
    timestamp_ms = xr_state_dict.get("timestamp_unix_ms", 0.0)
    devices = xr_state_dict.get("devices", [])

    for device in devices:
        if device.get("role") != "controller":
            continue

        handedness = device.get("handedness")
        try:
            controller = XRController(handedness)
        except ValueError:
            continue

        gamepad = device.get("gamepad")
        if not gamepad:
            continue

        buttons = gamepad.get("buttons", [])
        for i, button_state in enumerate(buttons):
            button_enum = button_index_to_enum(i)
            if not button_enum:
                continue

            is_pressed = button_state.get("pressed", False)
            detected_events = self._detector.update(
                button_enum, controller, is_pressed, timestamp_ms
            )

            for event in detected_events:
                self._fire_callbacks(event)

EventSettings dataclass

Configuration settings for event detection.

Attributes:

Name Type Description
double_press_threshold_ms float

Maximum time between presses to count as double-press.

long_press_threshold_ms float

Minimum hold time to count as long-press.

Source code in teleop_xr/events.py
@dataclass
class EventSettings:
    """Configuration settings for event detection.

    Attributes:
        double_press_threshold_ms: Maximum time between presses to count as double-press.
        long_press_threshold_ms: Minimum hold time to count as long-press.
    """

    double_press_threshold_ms: float = 300
    long_press_threshold_ms: float = 500

XRButton

Bases: str, Enum

XR controller button identifiers following xr-standard gamepad mapping.

Source code in teleop_xr/events.py
class XRButton(str, Enum):
    """XR controller button identifiers following xr-standard gamepad mapping."""

    TRIGGER = "trigger"
    SQUEEZE = "squeeze"
    TOUCHPAD = "touchpad"
    THUMBSTICK = "thumbstick"
    BUTTON_PRIMARY = "button_primary"
    BUTTON_SECONDARY = "button_secondary"
    MENU = "menu"

XRController

Bases: str, Enum

XR controller hand identifiers.

Source code in teleop_xr/events.py
class XRController(str, Enum):
    """XR controller hand identifiers."""

    LEFT = "left"
    RIGHT = "right"

button_index_to_enum(index)

Convert xr-standard gamepad button index to XRButton enum.

Parameters:

Name Type Description Default
index int

Button index from xr-standard gamepad (0-6).

required

Returns:

Type Description
XRButton | None

Corresponding XRButton enum value, or None if index is invalid.

Source code in teleop_xr/events.py
def button_index_to_enum(index: int) -> XRButton | None:
    """Convert xr-standard gamepad button index to XRButton enum.

    Args:
        index: Button index from xr-standard gamepad (0-6).

    Returns:
        Corresponding XRButton enum value, or None if index is invalid.
    """
    return _BUTTON_INDEX_MAP.get(index)

Video & Cameras

teleop_xr.video_stream

teleop_xr.video_stream

teleop_xr.camera_views

teleop_xr.camera_views

build_camera_views_config(head=None, wrist_left=None, wrist_right=None, extra_streams=None)

Inputs are optional device specs (int/str/None) Returns dict with keys for provided values only: 'head', 'wrist_left', 'wrist_right', and any extras Each entry is { "device": } If the same device is mapped to multiple views, allow it but emit a logging.warning

Source code in teleop_xr/camera_views.py
def build_camera_views_config(
    head=None, wrist_left=None, wrist_right=None, extra_streams: dict | None = None
) -> dict:
    """
    Inputs are optional device specs (int/str/None)
    Returns dict with keys for provided values only: 'head', 'wrist_left', 'wrist_right', and any extras
    Each entry is { "device": <normalized> }
    If the same device is mapped to multiple views, allow it but emit a logging.warning
    """
    inputs = {"head": head, "wrist_left": wrist_left, "wrist_right": wrist_right}
    if extra_streams:
        inputs.update(extra_streams)

    config = {}
    device_to_views = {}

    for view_key, raw_value in inputs.items():
        if raw_value is not None:
            normalized = parse_device_spec(raw_value)
            config[view_key] = {"device": normalized}

            if normalized not in device_to_views:
                device_to_views[normalized] = []
            device_to_views[normalized].append(view_key)

    for device, views in device_to_views.items():
        if len(views) > 1:
            logging.warning(
                f"Device {device} is mapped to multiple views: {', '.join(views)}"
            )

    return config

build_video_streams(camera_views)

Returns { "streams": [{"id": , "device": }, ...] } Preserve stable order: head, wrist_left, wrist_right if present, then others alphabetically

Source code in teleop_xr/camera_views.py
def build_video_streams(camera_views: dict) -> dict:
    """
    Returns { "streams": [{"id": <view_key>, "device": <device>}, ...] }
    Preserve stable order: head, wrist_left, wrist_right if present, then others alphabetically
    """
    order = ["head", "wrist_left", "wrist_right"]
    streams = []

    # First, add ordered known keys
    for view_key in order:
        if view_key in camera_views:
            val = camera_views[view_key]
            # Handle both Pydantic model and dict
            if hasattr(val, "device"):
                device = val.device
            else:
                device = val["device"]

            streams.append({"id": view_key, "device": device})

    # Then add any other keys alphabetically
    extra_keys = sorted([k for k in camera_views.keys() if k not in order])
    for view_key in extra_keys:
        val = camera_views[view_key]
        if hasattr(val, "device"):
            device = val.device
        else:
            device = val["device"]

        streams.append({"id": view_key, "device": device})

    return {"streams": streams}

Visualization

teleop_xr.robot_vis

teleop_xr.robot_vis

RobotVisModule

Module for serving robot visualization assets (URDF, meshes) and broadcasting state.

Source code in teleop_xr/robot_vis.py
class RobotVisModule:
    """
    Module for serving robot visualization assets (URDF, meshes) and broadcasting state.
    """

    def __init__(self, app: FastAPI, config: RobotVisConfig):
        self.app = app
        self.config = config
        self.logger = logging.getLogger("teleop.robot_vis")
        self._setup_routes()

    def _setup_routes(self):
        @self.app.get("/robot_assets/{file_path:path}")
        async def get_asset(file_path: str):
            self.logger.info(f"Asset request: {file_path}")
            full_path = ""

            if file_path == "robot.urdf":
                full_path = self.config.urdf_path
                # If we have a mesh path (repo root), try to rewrite absolute paths in URDF
                # to be relative, so the frontend can request them via /robot_assets/
                if self.config.mesh_path:
                    try:
                        with open(full_path, "r") as f:
                            content = f.read()

                        mesh_path_abs = os.path.abspath(self.config.mesh_path)
                        if not mesh_path_abs.endswith(os.sep):
                            mesh_path_abs += os.sep

                        # Simple string replacement of absolute path prefix
                        if mesh_path_abs in content:
                            new_content = content.replace(mesh_path_abs, "")
                            return Response(
                                content=new_content, media_type="application/xml"
                            )
                    except Exception as e:
                        self.logger.warning(f"Failed to rewrite URDF paths: {e}")
                        # Fallback to standard file serving

            elif "package://" in file_path:
                clean_path = file_path.split("package://")[-1]
                if self.config.mesh_path:
                    full_path = os.path.join(self.config.mesh_path, clean_path)
                else:
                    self.logger.warning(
                        f"Request for package resource '{file_path}' but 'mesh_path' is not configured."
                    )
                    full_path = clean_path
            else:
                # Try relative to URDF directory first
                urdf_dir = os.path.dirname(os.path.abspath(self.config.urdf_path))
                potential_paths = [os.path.join(urdf_dir, file_path)]

                # If mesh_path is configured, try resolving against it
                if self.config.mesh_path:
                    potential_paths.append(
                        os.path.join(self.config.mesh_path, file_path)
                    )

                full_path = potential_paths[0]
                for p in potential_paths:
                    if os.path.exists(p):
                        full_path = p
                        break

            if not os.path.exists(full_path):
                self.logger.warning(f"Asset not found: {full_path}")
                raise HTTPException(
                    status_code=404, detail=f"Asset not found: {file_path}"
                )

            media_type = None
            ext = os.path.splitext(full_path)[1].lower()
            if ext == ".stl":
                media_type = "application/octet-stream"
            elif ext == ".dae":
                media_type = "model/vnd.collada+xml"
            elif ext == ".obj":
                media_type = "text/plain"
            elif ext == ".urdf":
                media_type = "application/xml"
            elif ext == ".glb":
                media_type = "model/gltf-binary"
            elif ext == ".gltf":
                media_type = "model/gltf+json"

            return FileResponse(full_path, media_type=media_type)

    def get_frontend_config(self) -> Dict[str, Any]:
        return {
            "urdf_url": "/robot_assets/robot.urdf",
            "model_scale": self.config.model_scale,
            "initial_rotation_euler": self.config.initial_rotation_euler,
        }

    async def broadcast_state(self, connection_manager: Any, joints: Dict[str, float]):
        """
        Broadcasts the current joint state to all connected clients.

        Args:
            connection_manager: The ConnectionManager instance from Teleop class.
            joints: Dictionary mapping joint names to values (radians/meters).
        """
        message = {"type": "robot_state", "data": {"joints": joints}}
        await connection_manager.broadcast(json.dumps(message))

broadcast_state(connection_manager, joints) async

Broadcasts the current joint state to all connected clients.

Parameters:

Name Type Description Default
connection_manager Any

The ConnectionManager instance from Teleop class.

required
joints Dict[str, float]

Dictionary mapping joint names to values (radians/meters).

required
Source code in teleop_xr/robot_vis.py
async def broadcast_state(self, connection_manager: Any, joints: Dict[str, float]):
    """
    Broadcasts the current joint state to all connected clients.

    Args:
        connection_manager: The ConnectionManager instance from Teleop class.
        joints: Dictionary mapping joint names to values (radians/meters).
    """
    message = {"type": "robot_state", "data": {"joints": joints}}
    await connection_manager.broadcast(json.dumps(message))

Utilities

teleop_xr.utils.filter

teleop_xr.utils.filter

teleop_xr.utils.transform_limiter

teleop_xr.utils.transform_limiter

apply_twist(T, v, w, dt=1.0)

Apply twist (v, w) to transformation T over time dt.

Source code in teleop_xr/utils/transform_limiter.py
def apply_twist(T, v, w, dt=1.0):
    """Apply twist (v, w) to transformation T over time dt."""
    # Translation update
    t_new = T[:3, 3] + v * dt

    # Rotation update
    angle = np.linalg.norm(w)
    if angle < 1e-8:
        R_update = np.eye(3)
    else:
        axis = w / angle
        q_update = axangle2quat(axis, angle * dt)
        R_update = quat2mat(q_update)

    R_new = R_update @ T[:3, :3]

    T_new = np.eye(4)
    T_new[:3, :3] = R_new
    T_new[:3, 3] = t_new
    return T_new

se3_to_twist(T1, T2)

Compute linear and angular velocity from T1 to T2.

Source code in teleop_xr/utils/transform_limiter.py
def se3_to_twist(T1, T2):
    """Compute linear and angular velocity from T1 to T2."""
    dT = np.linalg.inv(T1) @ T2
    # Linear velocity
    v = dT[:3, 3]

    # Angular velocity
    R = dT[:3, :3]
    q = mat2quat(R)
    angle, axis = quat2axangle(q)
    w = angle * np.array(axis)

    return v, w

teleop_xr.common_cli

teleop_xr.common_cli