Files
kick-it/pykick/imagereaders.py

180 lines
5.0 KiB
Python

from __future__ import division
import numpy as np
import cv2
try:
from naoqi import ALProxy
except:
ALProxy = None
class NaoImageReader(object):
"""Class for reading images from NAO.
Depends on ALProxy from NAOqi SDK.
"""
RESOLUTIONS = {
0: (120, 160), # 160x120
1: (240, 320), # and so on
2: (480, 640),
3: (960, 1280)
}
def __init__(self, ip, port=9559, res=1, fps=30, cam_id=0,
video_file=None):
"""
Parameters
----------
ip : string
IP-Address of the NAO,
port : int
Port on NAO where ALProxy listens.
res : int
Resolution, see RESOLUTIONS above
fps : int
Frames per second, not sure if it does anything.
cam_id : 0 or 1
0 for top camera, 1 for lower camera.
video_file : string or None
Use None, otherwise is likely to crash.
"""
ip = bytes(ip) # Python2 thing
self.res_id = res
self.recording = []
self.res = self.RESOLUTIONS[res]
self.video_file = video_file
self.cam_id=cam_id
self.fps = fps
self.vd = ALProxy('ALVideoDevice', ip, port)
streamer_name = '_'.join(['lower' if cam_id else 'upper', str(res)])
self.sub = self.vd.subscribeCamera(
streamer_name, cam_id, res, 13, fps
)
def to_angles(self, x, y):
"""Transform relative x, y coordinates into camera angles.
Parameters
----------
x, y : float
Relative X and Y coordinate (0.0 to 1.0) as returned by
`to_relative`.
Returns
-------
tuple
(x_ang, y_ang) angular coordinates in camera frame in radians.
"""
return self.vd.getAngularPositionFromImagePosition(
self.cam_id, [x, y]
)
def to_relative(self, x, y):
"""Transform pixel coordinates into relative coordinates.
Parameters
----------
x, y : float
Pixel coordinates to transform.
Returns
-------
tuple
(x_rel, y_rel) Relative (0.0 to 1.0) coordinates.
"""
print('Camera resolution:', self.res)
return x / self.res[1], y / self.res[0]
def get_frame(self):
"""Get the next frame as a BGR OpenCV image."""
result = self.vd.getImageRemote(self.sub)
self.vd.releaseImage(self.sub)
if result is None or result[6] is None:
self.restart()
raise RuntimeError(self.sub + " couldn't capture")
else:
height, width = self.res
frame = np.frombuffer(result[6], dtype=np.uint8).reshape(
height, width, 3
)
if self.video_file is not None:
self.recording.append(frame)
return frame
def close(self):
"""Stop and clean up."""
self.vd.unsubscribe(self.sub)
print(self.sub + 'captured %s frames' % len(self.recording))
print('Writing to', self.video_file)
if self.video_file is not None:
vf = cv2.VideoWriter(self.video_file,
cv2.cv.FOURCC('X', 'V', 'I', 'D'),
self.fps,
(self.res[1], self.res[0]))
for frame in self.recording:
vf.write(frame)
vf.release()
def restart(self):
"""Restart the proxy if something went wrong."""
self.vd.unsubscribe(self.sub)
self.sub = self.vd.subscribeCamera(
self.sub, self.cam_id, self.res_id, 13, self.fps
)
self.recording = []
class VideoReader(object):
"""Class with the same interface as `NaoImageReader` for videofiles."""
def __init__(self, filename=0, loop=False):
"""
Parameters
----------
filename : string or 0
Will try to read from file if name given or from webcam if 0
specified.
loop : bool
If video is over, start again or stop?
"""
self.cap = cv2.VideoCapture(filename)
self.loop = loop if filename else False
self.ctr = 0
def get_frame(self):
"""Get the next frame as a BGR OpenCV image."""
succ, frame = self.cap.read()
if not succ:
raise ValueError('Error while reading video.\n' +
'Or video is over.')
self.ctr += 1
if (self.ctr == self.cap.get(cv2.cv.CV_CAP_PROP_FRAME_COUNT) and
self.loop):
self.ctr = 0
self.cap.set(cv2.cv.CV_CAP_PROP_POS_FRAMES, 0)
return frame
def close(self):
"""Cleanup and stop."""
self.cap.release()
class PictureReader(object):
"Dummy class for reading image files for maybe convenience."
def __init__(self, filename):
self.frame = cv2.imread(filename)
def get_frame(self):
return self.frame.copy()
def close(self):
self.frame = None