Major refactoring for better usability

1. Created interface classes for reading video streams
   (in imagereaders.py)
2. Created a class for ball detection (for reusability)
3. Reworked colorpicker: the mode of operation can now be chosen from
   the command line (live from the Nao, from a video file, or from a
   webcam). It is also possible to capture only the first image of a
   stream and work on that still. Colorpicker can now save its settings
   on exit and load them on startup (example invocations below).
2018-05-31 20:23:49 +02:00
parent 57cf0b2206
commit 69b1b137a2
4 changed files with 260 additions and 341 deletions
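Example invocations of the reworked colorpicker, per the argparse options
in the new colorpicker.py (the hsv.json config name is a placeholder;
test3.avi and the Nao IP are taken from elsewhere in this commit):

    # tune thresholds on a looping video file, save them on exit
    python colorpicker.py --video-file test3.avi -o hsv.json
    # tune live on the Nao's lower camera, starting from saved values
    python colorpicker.py --nao-ip 192.168.0.11 --nao-cam lower -i hsv.json
    # freeze the first webcam frame and work on that still image
    python colorpicker.py --still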

ball_tracking.py

@@ -1,116 +0,0 @@
# This script recognizes the ball in a given video file
# python ball_tracking.py --video test3.avi

# import the necessary packages
from collections import deque
import numpy as np
import argparse
import imutils
import cv2
from time import sleep

# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-v", "--video",
                help="path to the (optional) video file")
ap.add_argument("-b", "--buffer", type=int, default=64,
                help="max buffer size")
args = vars(ap.parse_args())

# define the lower and upper boundaries of the ball color
# in the HSV color space, then initialize the list of
# tracked points
#greenLower = (0, 17, 225)
#greenUpper = (42, 255, 255)
greenLower = (0, 184, 170)
greenUpper = (2, 255, 255)
#greenLower = (29, 86, 6)
#greenUpper = (64, 255, 255)

pts = deque(maxlen=args["buffer"])

# if a video path was not supplied, grab the reference
# to the webcam
if not args.get("video", False):
    camera = cv2.VideoCapture(0)
# otherwise, grab a reference to the video file
else:
    camera = cv2.VideoCapture(args["video"])

# keep looping
while True:
    # grab the current frame
    (grabbed, frame) = camera.read()

    # if we are viewing a video and we did not grab a frame,
    # then we have reached the end of the video
    if args.get("video") and not grabbed:
        break

    # resize the frame, blur it, and convert it to the HSV
    # color space
    frame = imutils.resize(frame, width=600)
    # blurred = cv2.GaussianBlur(frame, (11, 11), 0)
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # construct a mask for the ball color, then perform
    # a series of dilations and erosions to remove any small
    # blobs left in the mask
    mask = cv2.inRange(hsv, greenLower, greenUpper)
    mask = cv2.erode(mask, None, iterations=2)
    mask = cv2.dilate(mask, None, iterations=2)

    # find contours in the mask and initialize the current
    # (x, y) center of the ball
    cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
                            cv2.CHAIN_APPROX_SIMPLE)[-2]
    center = None

    # only proceed if at least one contour was found
    if len(cnts) > 0:
        # find the largest contour in the mask, then use
        # it to compute the minimum enclosing circle and
        # centroid
        c = max(cnts, key=cv2.contourArea)
        ((x, y), radius) = cv2.minEnclosingCircle(c)
        M = cv2.moments(c)
        center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))

        # only proceed if the radius meets a minimum size
        if radius > 10:
            # draw the circle and centroid on the frame,
            # then update the list of tracked points
            cv2.circle(frame, (int(x), int(y)), int(radius),
                       (0, 255, 255), 2)
            cv2.circle(frame, center, 5, (0, 0, 255), -1)

    # update the points queue
    pts.appendleft(center)

    # loop over the set of tracked points
    for i in xrange(1, len(pts)):
        # if either of the tracked points are None, ignore
        # them
        if pts[i - 1] is None or pts[i] is None:
            continue

        # otherwise, compute the thickness of the line and
        # draw the connecting lines
        thickness = int(np.sqrt(args["buffer"] / float(i + 1)) * 1.25)
        cv2.line(frame, pts[i - 1], pts[i], (0, 255, 0), thickness)

    # show the frame to our screen
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    sleep(0.05)

    # if the 'q' key is pressed, stop the loop
    if key == ord("q"):
        break

# cleanup the camera and close any open windows
camera.release()
cv2.destroyAllWindows()

colorpicker.py

@@ -1,131 +1,133 @@
from __future__ import print_function

import json
import argparse
import cv2
from imagereaders import VideoReader, NaoImageReader
# import imutils


class Colorpicker(object):

    WINDOW_CAPTURE_NAME = 'Video Capture'
    WINDOW_DETECTION_NAME = 'Object Detection'

    def __init__(self):
        parameters = ['low_h', 'low_s', 'low_v', 'high_h', 'high_s', 'high_v']
        maxes = [180, 255, 255, 180, 255, 255]
        # keep low/high pairs consistent: a low slider can never reach
        # its high counterpart, and vice versa
        checkers = [
            lambda x: min(x, self.settings['high_h'] - 1),  # LOW H
            lambda x: min(x, self.settings['high_s'] - 1),  # LOW S
            lambda x: min(x, self.settings['high_v'] - 1),  # LOW V
            lambda x: max(x, self.settings['low_h'] + 1),   # HIGH H
            lambda x: max(x, self.settings['low_s'] + 1),   # HIGH S
            lambda x: max(x, self.settings['low_v'] + 1),   # HIGH V
        ]
        self.settings = {
            'low_h': 0,
            'low_s': 0,
            'low_v': 0,
            'high_h': 180,
            'high_s': 255,
            'high_v': 255
        }
        cv2.namedWindow(self.WINDOW_CAPTURE_NAME)
        cv2.namedWindow(self.WINDOW_DETECTION_NAME)
        # bind name and checker as default arguments so every trackbar
        # callback keeps its own values instead of the loop's last ones
        self.trackers = [
            cv2.createTrackbar(
                name, self.WINDOW_DETECTION_NAME, self.settings[name], max_v,
                lambda val, name=name, checker=checker: self._on_trackbar(
                    val, name, checker
                )
            )
            for name, max_v, checker in zip(parameters, maxes, checkers)
        ]

    def do_print(self):
        print(self.settings)

    def _on_trackbar(self, val, name, checker):
        self.settings[name] = checker(val)
        cv2.setTrackbarPos(name, self.WINDOW_DETECTION_NAME,
                           self.settings[name])

    def show_frame(self, frame):
        frame_HSV = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
        frame_threshold = cv2.inRange(
            frame_HSV,
            tuple(map(self.settings.get, ('low_h', 'low_s', 'low_v'))),
            tuple(map(self.settings.get, ('high_h', 'high_s', 'high_v')))
        )
        cv2.imshow(self.WINDOW_CAPTURE_NAME, frame)
        cv2.imshow(self.WINDOW_DETECTION_NAME, frame_threshold)
        return cv2.waitKey(1)

    def save(self, filename):
        with open(filename, 'w') as f:
            json.dump(self.settings, f, indent=4)

    def load(self, filename):
        with open(filename) as f:
            self.settings = json.load(f)
        for name in self.settings:
            cv2.setTrackbarPos(name, self.WINDOW_DETECTION_NAME,
                               self.settings[name])


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        epilog='When called without arguments specifying the video source, ' +
               'will try to use the webcam')
    parser.add_argument(
        '-o', '--output-config',
        help='file to which the settings will be saved on exit (if given)'
    )
    parser.add_argument(
        '-i', '--input-config',
        help='file from which to read the initial threshold values'
    )
    parser.add_argument(
        '--video-file',
        help='video file to use'
    )
    parser.add_argument(
        '--still',
        help='only take one image from the video stream',
        action='store_true'
    )
    parser.add_argument(
        '--nao-ip',
        help='IP address of the Nao robot from which to capture'
    )
    parser.add_argument(
        '--nao-cam',
        choices=['upper', 'lower'],
        help='which Nao camera to use'
    )
    args = parser.parse_args()

    cp = Colorpicker()
    if args.input_config:
        cp.load(args.input_config)

    if args.video_file:
        rdr = VideoReader(args.video_file, loop=True)
    elif args.nao_ip:
        rdr = NaoImageReader(
            args.nao_ip,
            # map camera names to NAOqi ids (0 = top/upper, 1 = bottom/lower)
            cam_id={'upper': 0, 'lower': 1}[args.nao_cam] if args.nao_cam else 0
        )
    else:
        rdr = VideoReader(0)

    try:
        if args.still:
            frame = rdr.get_frame()
        while True:
            if not args.still:
                frame = rdr.get_frame()
            key = cp.show_frame(frame)
            if key == ord('q') or key == 27:
                break
    finally:
        cp.do_print()
        if args.output_config:
            cp.save(args.output_config)
        rdr.close()
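For reference, Colorpicker.save() simply dumps the settings dict, so a
saved config is plain JSON; the illustrative values below are the red
thresholds used elsewhere in this commit:

    {
        "low_h": 0,
        "low_s": 185,
        "low_v": 170,
        "high_h": 2,
        "high_s": 255,
        "high_v": 255
    }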

scripts/imagereaders.py Normal file

@@ -0,0 +1,57 @@
import numpy as np
import cv2
# from naoqi import ALProxy
ALProxy = None


class NaoImageReader(object):

    # NAOqi resolution codes -> (height, width)
    RESOLUTIONS = {
        1: (240, 320),
        2: (480, 640),
        3: (960, 1280)
    }

    def __init__(self, ip, port=9559, res=1, fps=30, cam_id=0):
        self.res = self.RESOLUTIONS[res]
        self.vd = ALProxy('ALVideoDevice', ip, port)
        # 13 is the NAOqi BGR color space code
        self.sub = self.vd.subscribeCamera(
            "video_streamer", cam_id, res, 13, fps
        )

    def get_frame(self):
        result = self.vd.getImageRemote(self.sub)
        self.vd.releaseImage(self.sub)
        if result is None:
            raise RuntimeError('cannot capture')
        elif result[6] is None:
            raise ValueError('no image data string')
        else:
            height, width = self.res
            return np.frombuffer(result[6], dtype=np.uint8).reshape(
                height, width, 3
            )

    def close(self):
        self.vd.unsubscribe(self.sub)


class VideoReader(object):

    def __init__(self, filename=0, loop=False):
        self.cap = cv2.VideoCapture(filename)
        # looping makes no sense for a live webcam (filename == 0)
        self.loop = loop if filename else False
        self.ctr = 0

    def get_frame(self):
        succ, frame = self.cap.read()
        if not succ:
            raise ValueError('Error while reading video')
        self.ctr += 1
        # rewind to the first frame when a looping file runs out
        if self.ctr == self.cap.get(cv2.CAP_PROP_FRAME_COUNT) and self.loop:
            self.ctr = 0
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        return frame

    def close(self):
        self.cap.release()
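A minimal usage sketch of the reader interface described in point 1 of
the commit message (both classes expose the same get_frame()/close()
pair, so callers can swap sources freely); this assumes a local webcam:

    import cv2
    from imagereaders import VideoReader

    rdr = VideoReader(0)  # or VideoReader('test3.avi', loop=True)
    try:
        while True:
            frame = rdr.get_frame()  # raises if no frame could be read
            cv2.imshow('preview', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        rdr.close()
        cv2.destroyAllWindows()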

live_recognition.py

@@ -1,137 +1,113 @@
from __future__ import print_function
from __future__ import division
import json
import cv2
import numpy as np
import imutils
from imagereaders import NaoImageReader, VideoReader
from collections import deque

red_lower = (0, 185, 170)  # HSV coded red interval
red_upper = (2, 255, 255)


class BallFinder(object):

    def __init__(self, hsv_lower, hsv_upper, min_radius, width):
        self.hsv_lower = hsv_lower
        self.hsv_upper = hsv_upper
        self.min_radius = min_radius
        self.width = width
        self.history = deque(maxlen=64)
        self.last_center = None
        self.last_radius = None
        cv2.namedWindow('ball_mask')
        cv2.namedWindow('Frame')

    def find_colored_ball(self, frame):
        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

        # construct a mask for the color, then perform a series of
        # dilations and erosions to remove any small blobs left in the mask
        mask = cv2.inRange(hsv, self.hsv_lower, self.hsv_upper)
        mask = cv2.erode(mask, None, iterations=2)
        mask = cv2.dilate(mask, None, iterations=2)
        cv2.imshow('ball_mask', mask)

        # find contours in the mask and initialize the current
        # (x, y) center of the ball
        cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
                                cv2.CHAIN_APPROX_SIMPLE)[-2]

        # only proceed if at least one contour was found
        if len(cnts) == 0:
            return None

        # find the largest contour in the mask, then use it to compute
        # the minimum enclosing circle and centroid
        c = max(cnts, key=cv2.contourArea)
        ((x, y), radius) = cv2.minEnclosingCircle(c)

        if radius < self.min_radius:
            return None

        M = cv2.moments(c)
        center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
        return center, int(radius)

    def next_frame(self, frame):
        # maybe resize the frame, maybe blur it
        if self.width is not None:
            frame = imutils.resize(frame, width=self.width)
        try:
            self.last_center, self.last_radius = self.find_colored_ball(frame)
        except TypeError:  # No red ball found and function returned None
            self.last_center, self.last_radius = None, None
        self.history.appendleft(self.last_center)
        self.draw_ball_markers(frame)

        # show the frame to screen
        cv2.imshow("Frame", frame)
        return cv2.waitKey(2)

    def draw_ball_markers(self, frame):
        # draw the enclosing circle and ball's centroid on the frame
        if self.last_center is not None and self.last_radius is not None:
            cv2.circle(frame, self.last_center, self.last_radius,
                       (255, 255, 0), 1)
            cv2.circle(frame, self.last_center, 5, (0, 255, 0), -1)

        # loop over the set of tracked points
        for i in range(1, len(self.history)):
            # if either of the tracked points are None, ignore them
            if self.history[i - 1] is None or self.history[i] is None:
                continue
            # otherwise, compute the thickness of the line and
            # draw the connecting lines
            thickness = int(np.sqrt(64 / float(i + 1)) * 2.5)
            cv2.line(frame, self.history[i - 1], self.history[i],
                     (0, 255, 0), thickness)
        return frame

    def load_hsv_config(self, filename):
        with open(filename) as f:
            hsv = json.load(f)
        self.hsv_lower = tuple(map(hsv.get, ('low_h', 'low_s', 'low_v')))
        self.hsv_upper = tuple(map(hsv.get, ('high_h', 'high_s', 'high_v')))


if __name__ == '__main__':
    # video = NaoImageReader('192.168.0.11')
    video = VideoReader(0, loop=True)
    finder = BallFinder(red_lower, red_upper, 5, None)
    try:
        while True:
            finder.next_frame(video.get_frame())
    finally:
        video.close()
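A sketch of the reuse that point 2 of the commit message aims at:
driving BallFinder from the Nao camera with thresholds saved by
colorpicker. It assumes the naoqi import in imagereaders.py is
re-enabled and that hsv.json (a placeholder name) was produced by
colorpicker:

    from imagereaders import NaoImageReader
    from live_recognition import BallFinder, red_lower, red_upper

    video = NaoImageReader('192.168.0.11', res=1, cam_id=1)  # 1 = bottom cam
    finder = BallFinder(red_lower, red_upper, min_radius=5, width=None)
    finder.load_hsv_config('hsv.json')  # overrides the default red interval
    try:
        while True:
            if finder.next_frame(video.get_frame()) == ord('q'):
                break
    finally:
        video.close()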