Source code for austin.dashboard

#!/usr/bin/env python3
import logging
import sys
import time
import asyncio
from functools import partial
from threading import Lock
import re
import enum

from caproto import ChannelType, SkipWrite
from caproto.server import (
    PVGroup,
    pvproperty,
    PvpropertyDouble,
    PvpropertyShort,
    PvpropertyShortRO,
    PvpropertyChar,
    SubGroup,
    scan_wrapper,
)

from .driver import RobotDriver, RobotDisconnected

POLL_SLOWDOWN = 60  # Should be 1 for normal operation, high for debugging
POLL_TIME = 0.5 * POLL_SLOWDOWN


[docs]def reset_trigger(value): return False
[docs]class RobotCommandFailed(RuntimeError): ...
[docs]class SafetyStatus(enum.IntEnum): UNKNOWN = 0 NORMAL = 1 REDUCED = 2 PROTECTIVE_STOP = 3 RECOVERY = 4 SAFEGUARD_STOP = 5 SYSTEM_EMERGENCY_STOP = 6 ROBOT_EMERGENCY_STOP = 7 VIOLATION = 8 FAULT = 9 AUTOMATIC_MODE_SAFEGUARD_STOP = 10 SYSTEM_THREE_POSITION_ENABLING_STOP = 11
[docs]class RobotMode(enum.IntEnum): UNKNOWN = 0 NO_CONTROLLER = 1 DISCONNECTED = 2 CONFIRM_SAFETY = 3 BOOTING = 4 POWER_OFF = 5 POWER_ON = 6 IDLE = 7 BACKDRIVE = 8 RUNNING = 9
[docs]class ProgramState(enum.IntEnum): UNKNOWN = 0 STOPPED = 1 PLAYING = 2 PAUSED = 3
[docs]class OperationalMode(enum.IntEnum): MANUAL = 0 AUTOMATIC = 1 NONE = 2
[docs]class DashboardGroup(PVGroup): lock: asyncio.Lock = None
[docs] async def send_command(self, message: str, fmt: str = None): """Ask the robot for information via a serial command. Parameters ========== message Message to send to the robot to ask for information. fmt Regular expression format string to parse the response. Returns ======= new_value The answer replied by the robot. """ # Prepare the socket lock if necessary if self.lock is None: self.lock = self.parent.async_lib.library.Lock() # Send the message in a separate thread loop = self.parent.async_lib.library.get_running_loop() async with self.lock: try: reply = await loop.run_in_executor( None, self.parent.driver.send_and_receive, message ) except RobotDisconnected: return None # Parse the response if fmt is not None: value = re.match(fmt, reply) if value is None: await self.message.write(reply) raise RobotCommandFailed(reply) elif len(value.groups()) > 0: return value.group(1)
[docs] async def put_command(self, instance, value, message, fmt, convert=lambda x: None): """Send a command to perform some change on the robot's state.""" message = message.format(value=value) print(f"Sending command: ``{message}``") updated_val = await self.send_command(message, fmt=fmt) print(f"Received response: {updated_val}") updated_val = convert(updated_val) return updated_val
[docs] async def scan_command(self, instance, async_lib, message, fmt, convert=None): """Send a request for status update, and return the parsed value.""" new_val = await self.send_command(message, fmt=fmt) # Convert if convert is not None: new_val = convert(new_val) if new_val is not None and new_val != instance.value: await instance.write(new_val)
# Define PVs below remote_control = pvproperty( name=":remote_control", value=False, doc="Whether the robot is in remote control mode.", read_only=True, ) remote_control = remote_control.scan(POLL_TIME)( partial( scan_command, message="is in remote control", fmt="(.+)", convert=lambda x: x == "true", ) ) current_program = pvproperty( name=":program_rbv", value="", dtype=str, read_only=True, report_as_string=True, max_length=255, doc="Name of the currently loaded program.", ) current_program = current_program.scan(POLL_TIME)( partial(scan_command, message="get loaded program", fmt="Loaded program: (.+)") ) program = pvproperty( name=":program", value="", dtype=PvpropertyChar, doc="Change to a new loaded program.", report_as_string=True, max_length=255, ) program = program.putter( partial(put_command, message="load {value}", fmt="Loading program: (.+)") ) installation = pvproperty( name=":installation", value="", dtype=PvpropertyChar, doc="Change to a new loaded installation file.", report_as_string=True, max_length=255, ) installation = installation.putter( partial( put_command, message="load installation {value}", fmt="Loading installation: (.+)", ) ) play = pvproperty( name=":play", value=False, dtype=bool, doc="Instruct the robot to play the current program.", put=partial( put_command, message="play", fmt="Starting program", convert=reset_trigger ), ) stop = pvproperty( name=":stop", value=False, dtype=bool, doc="Instruct the robot to stop the current program.", put=partial(put_command, message="stop", fmt="Stopped", convert=reset_trigger), ) pause = pvproperty( name=":pause", value=False, dtype=bool, doc="Instruct the robot to pause the current program.", put=partial( put_command, message="pause", fmt="Pausing program", convert=reset_trigger ), ) quit_ = pvproperty( name=":quit", value=False, dtype=bool, doc="Instruct the robot to quit.", put=partial( put_command, message="quit", fmt="Disconnected", convert=reset_trigger ), ) shutdown = pvproperty( name=":shutdown", value=False, dtype=bool, doc="Instruct the robot to shut down.", put=partial( put_command, message="shutdown", fmt="Shutting down", convert=reset_trigger ), ) release_brake = pvproperty( name=":release_brake", value=False, dtype=bool, doc="Instruct the robot to release the brake.", put=partial( put_command, message="brake release", fmt="Brake releasing", convert=reset_trigger, ), ) close_safety_popup = pvproperty( name=":close_safety_popup", value=False, dtype=bool, doc="Closes the displayed safety popup.", put=partial( put_command, message="close safety popup", fmt="closing safety popup", convert=reset_trigger, ), ) unlock_protective_stop = pvproperty( name=":unlock_protective_stop", value=False, dtype=bool, doc="Closes the current popup and unlocks protective stop.", put=partial( put_command, message="unlock protective stop", fmt="Protective stop releasing", convert=reset_trigger, ), ) restart_safety = pvproperty( name=":restart_safety", value=False, dtype=bool, doc="Used when robot gets a safety fault or violation to restart the safety.", put=partial( put_command, message="restart safety", fmt="Restarting safety", convert=reset_trigger, ), ) # Read-only status PVs serial_number = pvproperty( name=":serial_number", value="", dtype=PvpropertyChar, doc="Serial number of the robot.", read_only=True, report_as_string=True, max_length=255, startup=partial( scan_command, message="get serial number", fmt="(.+)", ), ) model_number = pvproperty( name=":model_number", value="unknown", dtype=PvpropertyChar, doc="Model number of the robot.", read_only=True, report_as_string=True, max_length=255, startup=partial( scan_command, message="get robot model", fmt="(.+)", ), ) software_version = pvproperty( name=":software_version", value="unknown", dtype=PvpropertyChar, doc="Version number of the software installed on the robot.", read_only=True, report_as_string=True, max_length=255, startup=partial( scan_command, message="PolyscopeVersion", fmt="(.+)", ), ) program_running = pvproperty( name=":program_running", value=False, dtype=bool, doc="Whether a program is currently running on the robot.", read_only=True, ) program_running = program_running.scan(POLL_TIME)( partial( scan_command, message="running", fmt="Program running: (true|false)", convert=lambda x: x == "true", ) ) program_saved = pvproperty( name=":program_saved", value=False, dtype=bool, doc="Whether a current program is saved.", read_only=True, ) program_saved = program_saved.scan(POLL_TIME)( partial( scan_command, message="isProgramSaved", fmt="(true|false) .+", convert=lambda x: x == "true", ) ) # This :safety_status PV should use the SafetyStatus enum, but # some of the strings are longer than the 26 character limit, and # so this will need to be factored. safety_status = pvproperty( name=":safety_status", value="", dtype=str, read_only=True, report_as_string=True, max_length=255, doc="Reports the current robot safety stats.", ) safety_status = safety_status.scan(POLL_TIME)( partial( scan_command, message="safetystatus", fmt="Safetystatus: (.+)", ), ) robot_mode = pvproperty( name=":robot_mode", value=RobotMode.UNKNOWN, read_only=True, doc="Reports the current robot operating mode.", ) @robot_mode.scan(POLL_TIME) async def robot_mode(self, instance, async_lib): new_val = await self.send_command(message="robotmode", fmt="Robotmode: (.+)") if new_val is not None and new_val != instance.value: await instance.write(new_val) # Set the power on/off state based on the new mode power_state = getattr(RobotMode, new_val) >= RobotMode.POWER_ON await self.power_rbv.write(power_state) program_state = pvproperty( name=":program_state", value=ProgramState.UNKNOWN, read_only=True, doc="Reports the state of the active program.", ) program_state = program_state.scan(POLL_TIME)( partial( scan_command, message="programState", fmt="([A-Z_]+) .+", ), ) # PVs for change/reading the power state power = pvproperty( name=":power", dtype=bool, value=False, doc="Powers the robot arm on and off." ) @power.putter async def power(self, instance, value): """Turn the robot arm power on or off.""" if value == "On": cmd = "power on" elif value == "Off": cmd = "power off" else: raise ValueError(f"Power setting must be 'On' or 'Off'. Got: {value}") return await self.put_command( instance, value, message=cmd, fmt="Powering (.+)", convert=lambda x: x == "on", ) # Read-back value for power state is set during ``robot_mode`` scan power_rbv = pvproperty( name=":power_rbv", value=False, dtype=bool, read_only=True, doc="The current power on/off state.", ) # PVs for change/reading the Robot's operational mode operational_mode = pvproperty( name=":operational_mode", value=OperationalMode.MANUAL, doc="The requested operational mode: MANUAL or AUTOMATIC (the password has not been set).", put=partial( put_command, message="set operational mode {value}", fmt="Operational mode '(.+)' is set", ), ) operational_mode_rbv = pvproperty( name=":operational_mode_rbv", value=OperationalMode.MANUAL, read_only=True, doc="The robot's current operational mode: MANUAL, AUTOMATIC, or NONE (the password has not been set)..", ) operational_mode_rbv = operational_mode_rbv.scan(POLL_TIME)( partial( scan_command, message="get operational mode", fmt="(.+)", ), ) # A generic PV that can be used by other PVs to report an error message message = pvproperty( name=":message", read_only=True, dtype=PvpropertyChar, value="", report_as_string=True, max_length=255, doc="Most recent dashboard message reported by the robot.", )