Python SDK

The rosfit Python package provides a synchronous and async client for the RosFit platform, purpose-built for backend services, data pipelines, automation scripts, and direct integration with ROS 2 via rclpy.

Installation

pip install rosfit

For async support (recommended for high-throughput use cases):

pip install rosfit[async]

The SDK requires Python 3.10+ and works on Linux, macOS, and Windows.

Authentication

from rosfit import RosFitClient

client = RosFitClient(
    base_url="http://localhost:8000/api/v1",
    api_key="rft_live_a1b2c3d4e5f6",
)

# Or authenticate with credentials
client = RosFitClient(base_url="http://localhost:8000/api/v1")
client.auth.login(email="[email protected]", password="securepassword")

For async usage:

from rosfit import AsyncRosFitClient

client = AsyncRosFitClient(
    base_url="http://localhost:8000/api/v1",
    api_key="rft_live_a1b2c3d4e5f6",
)

Device management

List and filter devices

# List all online robots
devices = client.devices.list(status="online", type="ros2_robot")
for device in devices:
    print(f"{device.name} [{device.status}] - last seen {device.last_seen}")

# Get a single device
bot = client.devices.get("dev_k8m2n4")
print(bot.name, bot.group, bot.tags)

Register a new device

new_device = client.devices.create(
    name="arm-controller-01",
    device_type="ros2_robot",
    group="assembly-line",
    tags=["arm", "welding"],
    metadata={
        "ros_distro": "jazzy",
        "hardware": {"model": "ur5e", "serial": "UR5E-2026-042"},
    },
)

print(f"Device ID: {new_device.device_id}")
print(f"MQTT Token: {new_device.access_token}")

Update and decommission

client.devices.update("dev_k8m2n4", group="warehouse-west", tags=["amr", "relocated"])

client.devices.delete("dev_old_unit")

Telemetry

Query latest snapshot

telemetry = client.telemetry.latest("dev_k8m2n4")
print(f"Battery: {telemetry.data['battery_percent']}%")
print(f"Position: {telemetry.data['odom']['position']}")

Query historical data

from datetime import datetime, timedelta

history = client.telemetry.history(
    device_id="dev_k8m2n4",
    metric="battery_percent",
    from_time=datetime.now() - timedelta(hours=24),
    to_time=datetime.now(),
    interval="15m",
)

for point in history.data:
    print(f"{point.time}: avg={point.avg:.1f}%, min={point.min:.1f}%, max={point.max:.1f}%")

Live telemetry stream

import asyncio
from rosfit import AsyncRosFitClient

async def stream_telemetry():
    client = AsyncRosFitClient(
        base_url="http://localhost:8000/api/v1",
        api_key="rft_live_a1b2c3d4e5f6",
    )

    async for point in client.telemetry.stream("dev_k8m2n4", topics=["odom", "battery_state"]):
        print(f"[{point.topic}] {point.timestamp}: {point.data}")

asyncio.run(stream_telemetry())

Commands

Send a command

cmd = client.commands.send(
    device_id="dev_k8m2n4",
    command="navigate_to",
    data={
        "goal": {
            "position": {"x": 5.0, "y": 3.0, "z": 0.0},
            "orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
        },
    },
    timeout_sec=120,
)

print(f"Command {cmd.command_id}: {cmd.status}")

Track and cancel

status = client.commands.get(cmd.command_id)
print(f"Status: {status.status}, Result: {status.result}")

client.commands.cancel(cmd.command_id)

Broadcast emergency stop

result = client.commands.broadcast(
    group="warehouse-east",
    command="emergency_stop",
    data={},
    priority="critical",
)

print(f"Sent to {result.device_count} devices")

rclpy integration

The SDK integrates directly with rclpy for hybrid workflows where you need both cloud API access and local ROS 2 communication.

import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from rosfit import RosFitClient

class CloudControlledNode(Node):
    def __init__(self):
        super().__init__("cloud_controller")
        self.publisher = self.create_publisher(Twist, "/cmd_vel", 10)
        self.client = RosFitClient(
            base_url="http://localhost:8000/api/v1",
            api_key="rft_live_a1b2c3d4e5f6",
        )
        self.timer = self.create_timer(1.0, self.check_commands)

    def check_commands(self):
        pending = self.client.commands.list(
            device_id="dev_k8m2n4",
            status="pending",
        )
        for cmd in pending:
            if cmd.command == "cmd_vel":
                twist = Twist()
                twist.linear.x = cmd.data["linear"]["x"]
                twist.angular.z = cmd.data["angular"]["z"]
                self.publisher.publish(twist)
                self.client.commands.ack(cmd.command_id)

def main():
    rclpy.init()
    node = CloudControlledNode()
    rclpy.spin(node)
    rclpy.shutdown()

Jupyter notebooks

The SDK works in Jupyter environments, making it straightforward to explore telemetry data interactively.

import pandas as pd
import matplotlib.pyplot as plt
from rosfit import RosFitClient
from datetime import datetime, timedelta

client = RosFitClient(
    base_url="http://localhost:8000/api/v1",
    api_key="rft_live_a1b2c3d4e5f6",
)

# Fetch 24 hours of battery data
history = client.telemetry.history(
    device_id="dev_k8m2n4",
    metric="battery_percent",
    from_time=datetime.now() - timedelta(hours=24),
    to_time=datetime.now(),
    interval="5m",
)

# Convert to DataFrame
df = pd.DataFrame([
    {"time": p.time, "battery": p.avg}
    for p in history.data
])
df["time"] = pd.to_datetime(df["time"])

# Plot battery over time
plt.figure(figsize=(12, 4))
plt.plot(df["time"], df["battery"], color="teal")
plt.xlabel("Time")
plt.ylabel("Battery %")
plt.title("Battery Level - turtlebot-01 (24h)")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

You can also combine fleet data with pandas for cross-device analysis:

fleet = client.devices.list(status="online")

rows = []
for device in fleet:
    tel = client.telemetry.latest(device.device_id)
    rows.append({
        "device": device.name,
        "group": device.group,
        "battery": tel.data.get("battery_percent"),
        "cpu_temp": tel.data.get("cpu_temp"),
    })

df = pd.DataFrame(rows)
print(df.sort_values("battery").to_string(index=False))