Package topo :: Package misc :: Module robotics
[hide private]
[frames] | no frames]

Source Code for Module topo.misc.robotics

  1  """ 
  2  Classes for using robotic or other hardware using Topographica. 
  3   
  4  This module contains several classes for constructing robotics 
  5  interfaces to Topographica simulations.  It includes modules that read 
  6  input from or send output to robot devices, and a (quasi) real-time 
  7  simulation object that attempts to maintain a correspondence between 
  8  simulation time and real time. 
  9   
 10  This module requires the PlayerStage robot interface system (from 
 11  playerstage.sourceforge.net), and the playerrobot module for 
 12  high-level communications with Player robots. 
 13   
 14  $Id: robotics.py 9359 2008-10-02 21:53:54Z jbednar $ 
 15  """ 
 16  __version__ = '$Revision: 9359 $' 
 17   
 18   
 19  import time 
 20  import Image,ImageOps 
 21  from math import pi,cos,sin 
 22   
 23  from .. import param 
 24   
 25  from topo.base.simulation import Simulation,EventProcessor 
 26  from topo.pattern.image import GenericImage 
 27   
 28  from playerrobot import CameraDevice,PTZDevice 
 29   
 30   
31 -class CameraImage(GenericImage):
32 """ 33 An image pattern generator that gets its image from a Player 34 camera device. 35 """ 36 37 camera = param.ClassSelector(CameraDevice,default=None,doc=""" 38 An instance of playerrobot.CameraDevice to be used 39 to generate images.""") 40
41 - def __init__(self,**params):
42 super(CameraImage,self).__init__(**params) 43 self._image = None
44
45 - def _get_image(self,params):
46 self._decode_image(*self.camera.image) 47 return True
48
49 - def _decode_image(self,fmt,w,h,bpp,fdiv,data):
50 if fmt==1: 51 self._image = Image.new('L',(w,h)) 52 self._image.fromstring(data,'raw') 53 else: 54 # JPALERT: if not grayscale, then assume color. This 55 # should be expanded for other modes. 56 rgb_im = Image.new('RGB',(w,h)) 57 rgb_im.fromstring(data,'raw') 58 self._image = ImageOps.grayscale(rgb_im)
59 60 61
62 -class CameraImageQueued(CameraImage):
63 """ 64 A version of CameraImage that gets the image from the camera's image queue, 65 rather than directly from the camera object. Using queues is 66 necessary when running the playerrobot in a separate process 67 without shared memory. When getting an image, this pattern 68 generator will fetch every image in the image queue and use the 69 most recent as the current pattern. 70 """ 71
72 - def _get_image(self,params):
73 74 im_spec = None 75 76 if self._image is None: 77 # if we don't have an image then block until we get one 78 im_spec = self.camera.image_queue.get() 79 self.camera.image_queue.task_done() 80 81 # Make sure we clear the image queue and get the most recent image. 82 while not self.camera.image_queue.empty(): 83 im_spec = self.camera.image_queue.get_nowait() 84 self.camera.image_queue.task_done() 85 86 if im_spec: 87 # If we got a new image from the queue, then 88 # construct a PIL image from it. 89 self._decode_image(*im_spec) 90 return True 91 else: 92 return False
93 94
95 -class PTZ(EventProcessor):
96 """ 97 Pan/Tilt/Zoom control. 98 99 This event processor takes input events on its 'Saccade' input 100 port in the form of (amplitude,direction) saccade commands (as 101 produced by the topo.sheet.saccade.SaccadeController class) and 102 appropriately servoes the attached PTZ object. There is not 103 currently any dynamic zoom control, though the static zoom level 104 can be set as a parameter. 105 """ 106 107 ptz = param.ClassSelector(PTZDevice,default=None,doc=""" 108 An instance of playerrobot.PTZDevice to be controlled.""") 109 110 zoom = param.Number(default=120,bounds=(0,None),doc=""" 111 Desired FOV width in degrees.""") 112 113 speed = param.Number(default=200,bounds=(0,None),doc=""" 114 Desired max pan/tilt speed in deg/sec.""") 115 116 invert_amplitude = param.Boolean(default=False,doc=""" 117 Invert the sense of the amplitude signal, in order to get the 118 appropriate ipsi-/contralateral sense of saccades.""") 119 120 dest_ports = ["Saccade"] 121 src_ports = ["State"] 122
123 - def start(self):
124 pass
125 - def input_event(self,conn,data):
126 if conn.dest_port == "Saccade": 127 # the data should be (amplitude,direction) 128 amplitude,direction = data 129 self.shift(amplitude,direction)
130
131 - def shift(self,amplitude,direction):
132 133 self.debug("Executing shift, amplitude=%.2f, direction=%.2f"%(amplitude,direction)) 134 if self.invert_amplitude: 135 amplitude *= -1 136 137 # if the amplitude is negative, invert the direction, so up is still up. 138 if amplitude < 0: 139 direction *= -1 140 angle = direction * pi/180 141 142 pan,tilt,zoom = self.ptz.state_deg 143 pan += amplitude * cos(angle) 144 tilt += amplitude * sin(angle) 145 146 self.ptz.set_ws_deg(pan,tilt,self.zoom,self.speed,self.speed)
147 ## self.ptz.cmd_queue.put_nowait(('set_ws_deg', 148 ## (pan,tilt,self.zoom,self.speed,self.speed))) 149 150 151
152 -class RealTimeSimulation(Simulation):
153 """ 154 A (quasi) real-time simulation object. 155 156 This subclass of Simulation attempts to maintain a correspondence 157 between simulation time and real time, as defined by the timescale 158 parameter. Real time simulation instances still maintain a 159 nominal, discrete simulation time that determines the order of 160 event delivery. 161 162 At the beginning of each simulation time epoch, the simulation 163 marks the actual wall clock time. After event delivery for that 164 epoch has ended, the simulation calculates the amount of 165 computation time used for event processing, and executes a real 166 sleep for the remainder of the epoch. If the computation time for 167 the epoch exceeded the real time, a warning is issued and 168 processing proceeds immediately to the next simulation time epoch. 169 170 171 RUN HOOKS 172 173 The simulation includes as parameters two lists of functions/callables, 174 run_start_hooks and run_stop_hooks, that will be called 175 immediately before and after event processing during a call to 176 .run(). This allows, for example, starting and stopping of 177 real-time devices that might use resources while the simulation is 178 not running. 179 """ 180 181 timescale = param.Number(default=1.0,bounds=(0,None),doc=""" 182 The desired real length of one simulation time unit, in milliseconds.""") 183 184 run_start_hooks = param.HookList(default=[],doc=""" 185 A list of callable objects to be called on entry to .run(), 186 before any events are processed.""") 187 188 189 run_stop_hooks = param.HookList(default=[],doc=""" 190 A list of callable objects to be called on exit from .run() 191 after all events are processed.""") 192
193 - def __init__(self,**params):
194 super(RealTimeSimulation,self).__init__(**params) 195 self._real_timestamp = 0.0
196
197 - def run(self,*args,**kw):
198 for h in self.run_start_hooks: 199 h() 200 self._real_timestamp = self.real_time() 201 super(RealTimeSimulation,self).run(*args,**kw) 202 for h in self.run_stop_hooks: 203 h()
204
205 - def real_time(self):
206 return time.time() * 1000
207
208 - def sleep(self,delay):
209 """ 210 Sleep for the number of real milliseconds seconds corresponding to the 211 given delay, subtracting off the amount of time elapsed since the 212 last sleep. 213 """ 214 sleep_ms = delay*self.timescale-(self.real_time()-self._real_timestamp) 215 216 if sleep_ms < 0: 217 self.warning("Realtime fault. Sleep delay of %f requires realtime sleep of %.2f ms." 218 %(delay,sleep_ms)) 219 else: 220 self.debug("sleeping. delay =",delay,"real delay =",sleep_ms,"ms.") 221 time.sleep(sleep_ms/1000.0) 222 self._real_timestamp = self.real_time() 223 self._time += delay
224