Source code for austin.gripper

#!/usr/bin/env python3
import logging
import sys
import time
import asyncio
from functools import partial
from threading import Lock
from typing import Literal, Union, OrderedDict
import enum

from caproto import ChannelType
from caproto.server import (
    PVGroup,
    ioc_arg_parser,
    pvproperty,
    run,
    PvpropertyDouble,
    PvpropertyShort,
    PvpropertyShortRO,
    PvpropertyChar,
    SubGroup,
    pvfunction,
)
from caproto.server.autosave import autosaved

from .driver import RobotDriver

log = logging.getLogger(__name__)


POLL_TIME = 2  # Seconds


[docs]class GripperGroup(PVGroup): """PVs for controller the robot's gripper hand.""" act = pvproperty( name=".ACT", value="Off", dtype=bool, doc="Request that the gripper activate." ) @act.startup async def act(self, instance, async_lib): self.async_lib = async_lib @act.putter async def act(self, instance, value): if value == "On": self.parent.driver.activate_gripper() print("Activating the gripper") return "Off" act_rbv = pvproperty( name=".ACR", value="Off", dtype=bool, doc="Whether the gripper is activated", read_only=True, ) @act_rbv.scan(POLL_TIME) async def act_rbv(self, instance, async_lib): new_value = self.parent.driver.gripper_act_status() if new_value != instance.value: await instance.write(new_value) cls = pvproperty( name=".CLS", dtype=int, value=0, doc="Calibrated 'closed' position", read_only=True, ) @cls.scan(POLL_TIME) async def cls(self, instance, async_lib): new_value = self.parent.driver.gripper_cls_position() if new_value != instance.value: await instance.write(new_value) opn = pvproperty( name=".OPN", dtype=int, value=120, doc="Calibrated 'open' position", read_only=True, ) @opn.scan(POLL_TIME) async def opn(self, instance, async_lib): new_value = self.parent.driver.gripper_opn_position() if new_value != instance.value: await instance.write(new_value) cal = pvproperty( name=".CAL", dtype=bool, value="Off", doc="Calibrate the robot's open/closed range", ) @cal.putter async def cal(self, instance, value): if value == "On": self.parent.driver.gripper_cal() print("Starting gripper calibration") return "Off" rbv = pvproperty( name=".RBV", dtype=int, value=0, doc="Current gripper position readback value", read_only=True, ) @rbv.scan(POLL_TIME) async def rbv(self, instance, async_lib): new_value = self.parent.driver.gripper_cur_position() if new_value != instance.value: await instance.write(new_value) val = pvproperty(name=".VAL", dtype=int, value=0, doc="Desired position set point") vel = autosaved(pvproperty( name=".VEL", dtype=int, value=120, doc="How fast the gripper should move" )) frc = autosaved(pvproperty( name=".FRC", dtype=int, value=50, doc="How much force the gripper may apply" )) @val.putter async def val(self, instance, value): velv = self.vel.value frcv = self.frc.value loop = self.async_lib.library.get_running_loop() await loop.run_in_executor( None, self.parent.driver.gripper_move, value, velv, frcv )