Python SDK
The rosfit Python package provides a synchronous and async client for the RosFit platform, purpose-built for backend services, data pipelines, automation scripts, and direct integration with ROS 2 via rclpy.
Installation
pip install rosfit
For async support (recommended for high-throughput use cases):
pip install rosfit[async]
The SDK requires Python 3.10+ and works on Linux, macOS, and Windows.
Authentication
from rosfit import RosFitClient
client = RosFitClient(
base_url="http://localhost:8000/api/v1",
api_key="rft_live_a1b2c3d4e5f6",
)
# Or authenticate with credentials
client = RosFitClient(base_url="http://localhost:8000/api/v1")
client.auth.login(email="[email protected]", password="securepassword")
For async usage:
from rosfit import AsyncRosFitClient
client = AsyncRosFitClient(
base_url="http://localhost:8000/api/v1",
api_key="rft_live_a1b2c3d4e5f6",
)
Device management
List and filter devices
# List all online robots
devices = client.devices.list(status="online", type="ros2_robot")
for device in devices:
print(f"{device.name} [{device.status}] - last seen {device.last_seen}")
# Get a single device
bot = client.devices.get("dev_k8m2n4")
print(bot.name, bot.group, bot.tags)
Register a new device
new_device = client.devices.create(
name="arm-controller-01",
device_type="ros2_robot",
group="assembly-line",
tags=["arm", "welding"],
metadata={
"ros_distro": "jazzy",
"hardware": {"model": "ur5e", "serial": "UR5E-2026-042"},
},
)
print(f"Device ID: {new_device.device_id}")
print(f"MQTT Token: {new_device.access_token}")
Update and decommission
client.devices.update("dev_k8m2n4", group="warehouse-west", tags=["amr", "relocated"])
client.devices.delete("dev_old_unit")
Telemetry
Query latest snapshot
telemetry = client.telemetry.latest("dev_k8m2n4")
print(f"Battery: {telemetry.data['battery_percent']}%")
print(f"Position: {telemetry.data['odom']['position']}")
Query historical data
from datetime import datetime, timedelta
history = client.telemetry.history(
device_id="dev_k8m2n4",
metric="battery_percent",
from_time=datetime.now() - timedelta(hours=24),
to_time=datetime.now(),
interval="15m",
)
for point in history.data:
print(f"{point.time}: avg={point.avg:.1f}%, min={point.min:.1f}%, max={point.max:.1f}%")
Live telemetry stream
import asyncio
from rosfit import AsyncRosFitClient
async def stream_telemetry():
client = AsyncRosFitClient(
base_url="http://localhost:8000/api/v1",
api_key="rft_live_a1b2c3d4e5f6",
)
async for point in client.telemetry.stream("dev_k8m2n4", topics=["odom", "battery_state"]):
print(f"[{point.topic}] {point.timestamp}: {point.data}")
asyncio.run(stream_telemetry())
Commands
Send a command
cmd = client.commands.send(
device_id="dev_k8m2n4",
command="navigate_to",
data={
"goal": {
"position": {"x": 5.0, "y": 3.0, "z": 0.0},
"orientation": {"x": 0.0, "y": 0.0, "z": 0.0, "w": 1.0},
},
},
timeout_sec=120,
)
print(f"Command {cmd.command_id}: {cmd.status}")
Track and cancel
status = client.commands.get(cmd.command_id)
print(f"Status: {status.status}, Result: {status.result}")
client.commands.cancel(cmd.command_id)
Broadcast emergency stop
result = client.commands.broadcast(
group="warehouse-east",
command="emergency_stop",
data={},
priority="critical",
)
print(f"Sent to {result.device_count} devices")
rclpy integration
The SDK integrates directly with rclpy for hybrid workflows where you need both cloud API access and local ROS 2 communication.
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from rosfit import RosFitClient
class CloudControlledNode(Node):
def __init__(self):
super().__init__("cloud_controller")
self.publisher = self.create_publisher(Twist, "/cmd_vel", 10)
self.client = RosFitClient(
base_url="http://localhost:8000/api/v1",
api_key="rft_live_a1b2c3d4e5f6",
)
self.timer = self.create_timer(1.0, self.check_commands)
def check_commands(self):
pending = self.client.commands.list(
device_id="dev_k8m2n4",
status="pending",
)
for cmd in pending:
if cmd.command == "cmd_vel":
twist = Twist()
twist.linear.x = cmd.data["linear"]["x"]
twist.angular.z = cmd.data["angular"]["z"]
self.publisher.publish(twist)
self.client.commands.ack(cmd.command_id)
def main():
rclpy.init()
node = CloudControlledNode()
rclpy.spin(node)
rclpy.shutdown()
Jupyter notebooks
The SDK works in Jupyter environments, making it straightforward to explore telemetry data interactively.
import pandas as pd
import matplotlib.pyplot as plt
from rosfit import RosFitClient
from datetime import datetime, timedelta
client = RosFitClient(
base_url="http://localhost:8000/api/v1",
api_key="rft_live_a1b2c3d4e5f6",
)
# Fetch 24 hours of battery data
history = client.telemetry.history(
device_id="dev_k8m2n4",
metric="battery_percent",
from_time=datetime.now() - timedelta(hours=24),
to_time=datetime.now(),
interval="5m",
)
# Convert to DataFrame
df = pd.DataFrame([
{"time": p.time, "battery": p.avg}
for p in history.data
])
df["time"] = pd.to_datetime(df["time"])
# Plot battery over time
plt.figure(figsize=(12, 4))
plt.plot(df["time"], df["battery"], color="teal")
plt.xlabel("Time")
plt.ylabel("Battery %")
plt.title("Battery Level - turtlebot-01 (24h)")
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
You can also combine fleet data with pandas for cross-device analysis:
fleet = client.devices.list(status="online")
rows = []
for device in fleet:
tel = client.telemetry.latest(device.device_id)
rows.append({
"device": device.name,
"group": device.group,
"battery": tel.data.get("battery_percent"),
"cpu_temp": tel.data.get("cpu_temp"),
})
df = pd.DataFrame(rows)
print(df.sort_values("battery").to_string(index=False))