From 69b1b137a21ac5879719e043c193c8f2c8313890 Mon Sep 17 00:00:00 2001
From: Pavel Lutskov
Date: Thu, 31 May 2018 20:23:49 +0200
Subject: [PATCH] Major refactoring for better usability

1. Created interface classes for reading video streams (in
   imagereaders.py)
2. Created a class for ball detection (for reusability)
3. Reworked colorpicker: the mode of operation can now be chosen on the
   command line (live from Nao, from a video file, or from a webcam).
   It is also possible to capture only the first image of a stream and
   work on that still. The colorpicker can now save its settings on
   exit and load settings on startup.
---
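Example usage of the reworked colorpicker (the flag names are the ones
defined in the diff below; file names such as ball.json are purely
illustrative):

    # tune HSV thresholds on a looping video file, save settings on exit
    python colorpicker.py --video-file test3.avi -o ball.json

    # start from saved settings, live from the Nao's upper camera
    python colorpicker.py --nao-ip 192.168.0.11 --nao-cam upper -i ball.json

    # freeze the first webcam frame and tune on the still image
    python colorpicker.py --still -o ball.json
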
 scripts/ball_tracking.py    | 116 -----------------
 scripts/colorpicker.py      | 244 ++++++++++++------------
 scripts/imagereaders.py     |  57 +++++++++
 scripts/live_recognition.py | 184 ++++++++++++---------------
 4 files changed, 260 insertions(+), 341 deletions(-)
 delete mode 100644 scripts/ball_tracking.py
 create mode 100644 scripts/imagereaders.py

diff --git a/scripts/ball_tracking.py b/scripts/ball_tracking.py
deleted file mode 100644
index ec76fcf..0000000
--- a/scripts/ball_tracking.py
+++ /dev/null
@@ -1,116 +0,0 @@
-# This script recognizes the ball in a given video file
-# python ball_tracking.py --video test3.avi
-
-# import the necessary packages
-from collections import deque
-import numpy as np
-import argparse
-import imutils
-import cv2
-from time import sleep
-
-# construct the argument parse and parse the arguments
-ap = argparse.ArgumentParser()
-ap.add_argument("-v", "--video",
-    help="path to the (optional) video file")
-ap.add_argument("-b", "--buffer", type=int, default=64,
-    help="max buffer size")
-args = vars(ap.parse_args())
-
-# define the lower and upper boundaries of the "green"
-# ball in the HSV color space, then initialize the
-# list of tracked points
-#greenLower = (0, 17, 225)
-#greenUpper = (42, 255, 255)
-greenLower=(0,184,170)
-greenUpper=(2,255,255)
-
-#greenLower = (29, 86, 6)
-#greenUpper = (64, 255, 255)
-pts = deque(maxlen=args["buffer"])
-
-# if a video path was not supplied, grab the reference
-# to the webcam
-if not args.get("video", False):
-    camera = cv2.VideoCapture(0)
-
-# otherwise, grab a reference to the video file
-else:
-    camera = cv2.VideoCapture(args["video"])
-
-# keep looping
-while True:
-    # grab the current frame
-    (grabbed, frame) = camera.read()
-
-    # if we are viewing a video and we did not grab a frame,
-    # then we have reached the end of the video
-    if args.get("video") and not grabbed:
-        break
-
-    # resize the frame, blur it, and convert it to the HSV
-    # color space
-    frame = imutils.resize(frame, width=600)
-    # blurred = cv2.GaussianBlur(frame, (11, 11), 0)
-    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
-
-    # construct a mask for the color "green", then perform
-    # a series of dilations and erosions to remove any small
-    # blobs left in the mask
-    mask = cv2.inRange(hsv, greenLower, greenUpper)
-    mask = cv2.erode(mask, None, iterations=2)
-    mask = cv2.dilate(mask, None, iterations=2)
-
-    # find contours in the mask and initialize the current
-    # (x, y) center of the ball
-    cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
-        cv2.CHAIN_APPROX_SIMPLE)[-2]
-    center = None
-
-    # only proceed if at least one contour was found
-    if len(cnts) > 0:
-        # find the largest contour in the mask, then use
-        # it to compute the minimum enclosing circle and
-        # centroid
-        c = max(cnts, key=cv2.contourArea)
-        ((x, y), radius) = cv2.minEnclosingCircle(c)
-        M = cv2.moments(c)
-        center = (int(M["m10"] / M["m00"]),
-                  int(M["m01"] / M["m00"]))
-
-        # only proceed if the radius meets a minimum size
-        if radius > 10:
-            # draw the circle and centroid on the frame,
-            # then update the list of tracked points
-            cv2.circle(frame, (int(x), int(y)), int(radius),
-                #(0, 255, 255), 2)
-                (0,255,255),2)
-            cv2.circle(frame, center, 5, (0, 0, 255), -1)
-
-    # update the points queue
-    pts.appendleft(center)
-
-    # loop over the set of tracked points
-    for i in xrange(1, len(pts)):
-        # if either of the tracked points are None, ignore
-        # them
-        if pts[i - 1] is None or pts[i] is None:
-            continue
-
-        # otherwise, compute the thickness of the line and
-        # draw the connecting lines
-        thickness = int(np.sqrt(args["buffer"] / float(i + 1)) * 1.25)
-        cv2.line(frame, pts[i - 1], pts[i], (0, 255, 0), thickness)
-
-    # show the frame to our screen
-    cv2.imshow("Frame", frame)
-    key = cv2.waitKey(1) & 0xFF
-
-    sleep(0.05)
-    # if the 'q' key is pressed, stop the loop
-    if key == ord("q"):
-        break
-
-# cleanup the camera and close any open windows
-camera.release()
-cv2.destroyAllWindows()
-
diff --git a/scripts/colorpicker.py b/scripts/colorpicker.py
index e81de62..68e380f 100644
--- a/scripts/colorpicker.py
+++ b/scripts/colorpicker.py
@@ -1,131 +1,133 @@
-# syntax python colorpicker.py
-# or
-# python colorpicker.py 640 480 -> set resolution
-
 from __future__ import print_function
-from live_recognition import get_frame_nao
-import cv2 as cv
-import imutils
-from naoqi import ALProxy
-import sys
-max_value = 255
-max_value_H = 360 // 2
-#low_H = 0
-#low_S = 0
-#low_V = 0
-#high_H = max_value_H
-#high_S = max_value
-#high_V = max_value
-low_H=0
-low_S=185
-low_V=170
-high_H=2
-high_S=255
-high_V=255
-window_capture_name = 'Video Capture'
-window_detection_name = 'Object Detection'
-low_H_name = 'Low H'
-low_S_name = 'Low S'
-low_V_name = 'Low V'
-high_H_name = 'High H'
-high_S_name = 'High S'
-high_V_name = 'High V'
+import json
+import argparse
+import cv2
+from imagereaders import VideoReader, NaoImageReader
+# import imutils

-def do_print():
-    print('(%s %s %s): (%s %s %s)' %
-          (low_H, low_S, low_V, high_H, high_S, high_V))
+class Colorpicker(object):

-def on_low_H_thresh_trackbar(val):
-    global low_H
-    low_H = min(high_H-1, val)
-    cv.setTrackbarPos(low_H_name, window_detection_name, low_H)
-    do_print()
+    WINDOW_CAPTURE_NAME = 'Video Capture'
+    WINDOW_DETECTION_NAME = 'Object Detection'

-def on_high_H_thresh_trackbar(val):
-    global high_H
-    high_H = max(val, low_H+1)
-    cv.setTrackbarPos(high_H_name, window_detection_name, high_H)
-    do_print()
+    def __init__(self):
+        parameters = ['low_h', 'low_s', 'low_v', 'high_h', 'high_s', 'high_v']
+        maxes = [180, 255, 255, 180, 255, 255]
+        checkers = [
+            lambda x: min(x, self.settings['high_h'] - 1),  # LOW H
+            lambda x: min(x, self.settings['high_s'] - 1),  # LOW S
+            lambda x: min(x, self.settings['high_v'] - 1),  # LOW V
+            lambda x: max(x, self.settings['low_h'] + 1),   # HIGH H
+            lambda x: max(x, self.settings['low_s'] + 1),   # HIGH S
+            lambda x: max(x, self.settings['low_v'] + 1),   # HIGH V
+        ]
+        self.settings = {
+            'low_h': 0,
+            'low_s': 0,
+            'low_v': 0,
+            'high_h': 180,
+            'high_s': 255,
+            'high_v': 255
+        }
+        cv2.namedWindow(self.WINDOW_CAPTURE_NAME)
+        cv2.namedWindow(self.WINDOW_DETECTION_NAME)
+        self.trackers = [
+            cv2.createTrackbar(
+                name, self.WINDOW_DETECTION_NAME, self.settings[name], max_v,
+                lambda val, name=name, checker=checker: self._on_trackbar(
+                    val, name, checker
+                )
+            )
+            for name, max_v, checker in zip(parameters, maxes, checkers)
+        ]

-def on_low_S_thresh_trackbar(val):
-    global low_S
-    low_S = min(high_S-1, val)
-    cv.setTrackbarPos(low_S_name, window_detection_name, low_S)
-    do_print()
+    def do_print(self):
+        print(self.settings)

-def on_high_S_thresh_trackbar(val):
-    global high_S
-    high_S = max(val, low_S+1)
-    cv.setTrackbarPos(high_S_name, window_detection_name, high_S)
-    do_print()
+    def _on_trackbar(self, val, name, checker):
+        self.settings[name] = checker(val)
+        cv2.setTrackbarPos(name, self.WINDOW_DETECTION_NAME,
+                           self.settings[name])

-def on_low_V_thresh_trackbar(val):
-    global low_V
-    low_V = min(high_V-1, val)
-    cv.setTrackbarPos(low_V_name, window_detection_name, low_V)
-    do_print()
-
-def on_high_V_thresh_trackbar(val):
-    global high_V
-    high_V = max(val, low_V+1)
-    cv.setTrackbarPos(high_V_name, window_detection_name, high_V)
-    do_print()
-
-
-cap = cv.VideoCapture(0)
-
-cv.namedWindow(window_capture_name)
-cv.namedWindow(window_detection_name)
-
-cv.createTrackbar(
-    low_H_name, window_detection_name, low_H,
-    max_value_H, on_low_H_thresh_trackbar
-)
-cv.createTrackbar(
-    high_H_name, window_detection_name , high_H, max_value_H,
-    on_high_H_thresh_trackbar
-)
-cv.createTrackbar(
-    low_S_name, window_detection_name , low_S, max_value,
-    on_low_S_thresh_trackbar
-)
-cv.createTrackbar(
-    high_S_name, window_detection_name , high_S, max_value,
-    on_high_S_thresh_trackbar
-)
-cv.createTrackbar(
-    low_V_name, window_detection_name , low_V, max_value,
-    on_low_V_thresh_trackbar
-)
-cv.createTrackbar(
-    high_V_name, window_detection_name , high_V, max_value,
-    on_high_V_thresh_trackbar
-)
-
-vd_proxy = ALProxy('ALVideoDevice', '192.168.0.11', 9559)
-cam_subscriber = vd_proxy.subscribeCamera(
-    "ball_finder", 0, 3, 13, 1
-)
-
-frame = get_frame_nao(vd_proxy, cam_subscriber, 1280, 960)
-
-try:
-    while True:
-
-        frame_HSV = cv.cvtColor(frame, cv.COLOR_BGR2HSV)
-        frame_threshold = cv.inRange(
-            frame_HSV, (low_H, low_S, low_V), (high_H, high_S, high_V)
+    def show_frame(self, frame):
+        frame_HSV = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+        frame_threshold = cv2.inRange(
+            frame_HSV,
+            tuple(map(self.settings.get, ('low_h', 'low_s', 'low_v'))),
+            tuple(map(self.settings.get, ('high_h', 'high_s', 'high_v')))
         )
-        if len(sys.argv) > 1:
-            frame_threshold = cv.resize(frame_threshold, (int(sys.argv[1]), int(sys.argv[2])))
-
-        cv.imshow(window_capture_name, frame)
-        cv.imshow(window_detection_name, frame_threshold)
-
+        cv2.imshow(self.WINDOW_CAPTURE_NAME, frame)
+        cv2.imshow(self.WINDOW_DETECTION_NAME, frame_threshold)
+        return cv2.waitKey(1)

-        key = cv.waitKey(1)
-        if key == ord('q') or key == 27:
-            break
-finally:
-    vd_proxy.unsubscribe(cam_subscriber)
+    def save(self, filename):
+        with open(filename, 'w') as f:
+            json.dump(self.settings, f, indent=4)
+
+    def load(self, filename):
+        with open(filename) as f:
+            self.settings = json.load(f)
+        for name in self.settings:
+            cv2.setTrackbarPos(name, self.WINDOW_DETECTION_NAME,
+                               self.settings[name])
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        epilog='When called without arguments specifying the video source, '
+               'the webcam will be used')
+    parser.add_argument(
+        '-o', '--output-config',
+        help='file to which the settings will be saved on exit (if given)'
+    )
+    parser.add_argument(
+        '-i', '--input-config',
+        help='file from which to read the initial threshold values'
+    )
+    parser.add_argument(
+        '--video-file',
+        help='video file to use'
+    )
+    parser.add_argument(
+        '--still',
+        help='only take the first image of the video stream',
+        action='store_true'
+    )
+    parser.add_argument(
+        '--nao-ip',
+        help='IP address of the Nao robot from which to capture'
+    )
+    parser.add_argument(
+        '--nao-cam',
+        choices=['upper', 'lower'],
+        help='which Nao camera to use (default: upper)'
+    )
+    args = parser.parse_args()
+
+    cp = Colorpicker()
+    if args.input_config:
+        cp.load(args.input_config)
+    if args.video_file:
+        rdr = VideoReader(args.video_file, loop=True)
+    elif args.nao_ip:
+        rdr = NaoImageReader(
+            args.nao_ip,
+            cam_id=1 if args.nao_cam == 'lower' else 0  # upper -> 0, lower -> 1
+        )
+    else:
+        rdr = VideoReader(0)
+    try:
+        if args.still:
+            frame = rdr.get_frame()
+        while True:
+            if not args.still:
+                frame = rdr.get_frame()
+            key = cp.show_frame(frame)
+            if key == ord('q') or key == 27:
+                break
+    finally:
+        cp.do_print()
+        if args.output_config:
+            cp.save(args.output_config)
+        rdr.close()
diff --git a/scripts/imagereaders.py b/scripts/imagereaders.py
new file mode 100644
index 0000000..b8a1463
--- /dev/null
+++ b/scripts/imagereaders.py
@@ -0,0 +1,57 @@
+import numpy as np
+import cv2
+try:
+    from naoqi import ALProxy
+except ImportError:
+    ALProxy = None  # naoqi SDK not installed; NaoImageReader unavailable
+
+
+class NaoImageReader(object):
+
+    RESOLUTIONS = {
+        1: (240, 320),
+        2: (480, 640),
+        3: (960, 1280)
+    }
+
+    def __init__(self, ip, port=9559, res=1, fps=30, cam_id=0):
+        self.res = self.RESOLUTIONS[res]
+        self.vd = ALProxy('ALVideoDevice', ip, port)
+        self.sub = self.vd.subscribeCamera(
+            "video_streamer", cam_id, res, 13, fps
+        )
+
+    def get_frame(self):
+        result = self.vd.getImageRemote(self.sub)
+        self.vd.releaseImage(self.sub)
+        if result is None:
+            raise RuntimeError('cannot capture')
+        elif result[6] is None:
+            raise ValueError('no image data string')
+        else:
+            height, width = self.res
+            return np.frombuffer(result[6], dtype=np.uint8).reshape(
+                height, width, 3
+            )
+
+    def close(self):
+        self.vd.unsubscribe(self.sub)
+
+
+class VideoReader(object):
+
+    def __init__(self, filename=0, loop=False):
+        self.cap = cv2.VideoCapture(filename)
+        self.loop = loop if filename else False
+        self.ctr = 0
+
+    def get_frame(self):
+        succ, frame = self.cap.read()
+        if not succ:
+            raise ValueError('Error while reading video')
+        self.ctr += 1
+        if self.ctr == self.cap.get(cv2.CAP_PROP_FRAME_COUNT) and self.loop:
+            self.ctr = 0
+            self.cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
+        return frame
+
+    def close(self):
+        self.cap.release()
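Both readers expose the same get_frame()/close() pair, so callers can
swap video sources freely. A minimal usage sketch (the IP address is
the robot address used elsewhere in this patch; NaoImageReader
additionally assumes the naoqi SDK and a reachable robot):

    from imagereaders import VideoReader

    reader = VideoReader(0)  # webcam; or NaoImageReader('192.168.0.11')
    try:
        frame = reader.get_frame()  # numpy BGR array of shape (h, w, 3)
        print(frame.shape)
    finally:
        reader.close()  # release the capture / unsubscribe from the robot
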
diff --git a/scripts/live_recognition.py b/scripts/live_recognition.py
index a4e77b7..b1a1297 100644
--- a/scripts/live_recognition.py
+++ b/scripts/live_recognition.py
@@ -1,137 +1,113 @@
 from __future__ import print_function
 from __future__ import division
+import json
 import cv2
 import numpy as np
 import imutils
-from naoqi import ALProxy
+from imagereaders import NaoImageReader, VideoReader
 from collections import deque

-# Nao configuration
-nao_ip = '192.168.0.11'
-nao_port = 9559
-res = (1, (240, 320)) # NAOQi code and acutal resolution
-fps = 30
-cam_id = 1 # 0 := top, 1 := bottom
-
-# Recognition stuff
 red_lower = (0, 185, 170) # HSV coded red interval
 red_upper = (2, 255, 255)
-min_radius = 5
-resized_width = None # Maybe we need it maybe don't (None if don't)
-
-def get_frame_nao(cam_proxy, subscriber, width, height):
-    result = cam_proxy.getImageRemote(subscriber)
-    cam_proxy.releaseImage(subscriber)
-    if result == None:
-        raise RuntimeError('cannot capture')
-    elif result[6] == None:
-        raise ValueError('no image data string')
-    else:
-        return np.frombuffer(result[6], dtype=np.uint8).reshape(
-            height, width, 3
-        )
-    # i = 0
-    # for y in range(res[1][0]):
-    #     for x in range(res[1][1]): # columnwise
-    #         image.itemset((y, x, 0), values[i + 0])
-    #         image.itemset((y, x, 1), values[i + 1])
-    #         image.itemset((y, x, 2), values[i + 2])
-    #         i += 3
-    # return image

-def find_colored_ball(frame, hsv_lower, hsv_upper, min_radius):
-    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+class BallFinder(object):

-    # construct a mask for the color "green", then perform a series of
-    # dilations and erosions to remove any small blobs left in the mask
-    mask = cv2.inRange(hsv, hsv_lower, hsv_upper)
-    mask = cv2.erode(mask, None, iterations=2)
-    mask = cv2.dilate(mask, None, iterations=2)
-    cv2.imshow('ball_mask', mask)
-    cv2.waitKey(1)
+    def __init__(self, hsv_lower, hsv_upper, min_radius, width):

-    # find contours in the mask and initialize the current
-    # (x, y) center of the ball
-    cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
-                            cv2.CHAIN_APPROX_SIMPLE)[-2]
+        self.hsv_lower = hsv_lower
+        self.hsv_upper = hsv_upper
+        self.min_radius = min_radius
+        self.width = width
+        self.history = deque(maxlen=64)
+        self.last_center = None
+        self.last_radius = None

-    # only proceed if at least one contour was found
-    if len(cnts) == 0:
-        return None
+        cv2.namedWindow('ball_mask')
+        cv2.namedWindow('Frame')

-    # find the largest contour in the mask, then use it to compute
-    # the minimum enclosing circle and centroid
-    c = max(cnts, key=cv2.contourArea)
-    ((x, y), radius) = cv2.minEnclosingCircle(c)
+    def find_colored_ball(self, frame):
+        hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

-    if radius < min_radius:
-        return None
+        # construct a mask for the color, then perform a series of
+        # dilations and erosions to remove any small blobs left in the mask
+        mask = cv2.inRange(hsv, self.hsv_lower, self.hsv_upper)
+        mask = cv2.erode(mask, None, iterations=2)
+        mask = cv2.dilate(mask, None, iterations=2)
+        cv2.imshow('ball_mask', mask)

-    M = cv2.moments(c)
-    center = (int(M["m10"] / M["m00"]),int(M["m01"] // M["m00"]))
-    return center, int(radius)
+        # find contours in the mask and initialize the current
+        # (x, y) center of the ball
+        cnts = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
+                                cv2.CHAIN_APPROX_SIMPLE)[-2]

+        # only proceed if at least one contour was found
+        if len(cnts) == 0:
+            return None

-def draw_ball_markers(frame, center, radius, history):
-    # draw the enclosing circle and ball's centroid on the frame,
-    if center is not None and radius is not None:
-        cv2.circle(frame, center, radius, (255, 255, 0), 1)
-        cv2.circle(frame, center, 5, (0, 255, 0), -1)
+        # find the largest contour in the mask, then use it to compute
+        # the minimum enclosing circle and centroid
+        c = max(cnts, key=cv2.contourArea)
+        ((x, y), radius) = cv2.minEnclosingCircle(c)

-    # loop over the set of tracked points
-    for i in range(1, len(history)):
-        # if either of the tracked points are None, ignore them
-        if history[i - 1] is None or history[i] is None:
-            continue
-        # otherwise, compute the thickness of the line and
-        # draw the connecting lines
-        thickness = int(np.sqrt(64 / float(i + 1)) * 2.5)
-        cv2.line(frame, history[i - 1], history[i], (0, 255, 0), thickness)
+        if radius < self.min_radius:
+            return None

-    return frame
+        M = cv2.moments(c)
+        center = (int(M["m10"] / M["m00"]), int(M["m01"] / M["m00"]))
+        return center, int(radius)

+    def next_frame(self, frame):
+        # maybe resize the frame, maybe blur it
+        if self.width is not None:
+            frame = imutils.resize(frame, width=self.width)

+        try:
+            self.last_center, self.last_radius = self.find_colored_ball(frame)
+        except TypeError:  # No red ball found and function returned None
+            self.last_center, self.last_radius = None, None

-def nao_demo():
-    cv2.namedWindow('ball_mask')
-    cv2.namedWindow('Frame')
+        self.history.appendleft(self.last_center)
+        self.draw_ball_markers(frame)

-    vd_proxy = ALProxy('ALVideoDevice', nao_ip, nao_port)
-    cam_subscriber = vd_proxy.subscribeCamera(
-        "ball_finder", cam_id, res[0], 13, fps
-    )
-    history = deque(maxlen=64)
+        # show the frame to screen
+        cv2.imshow("Frame", frame)
+        return cv2.waitKey(2)

-    try:
-        while True:
-            frame = get_frame_nao(vd_proxy, cam_subscriber, res[1][1],
-                                  res[1][0])
+    def draw_ball_markers(self, frame):
+        # draw the enclosing circle and ball's centroid on the frame,
+        if self.last_center is not None and self.last_radius is not None:
+            cv2.circle(frame, self.last_center, self.last_radius,
+                       (255, 255, 0), 1)
+            cv2.circle(frame, self.last_center, 5, (0, 255, 0), -1)

-            # maybe resize the frame, maybe blur it
-            if resized_width is not None:
-                frame = imutils.resize(frame, width=resized_width)
-            # blurred = cv2.GaussianBlur(frame, (11, 11), 0)
+        # loop over the set of tracked points
+        for i in range(1, len(self.history)):
+            # if either of the tracked points are None, ignore them
+            if self.history[i - 1] is None or self.history[i] is None:
+                continue
+            # otherwise, compute the thickness of the line and
+            # draw the connecting lines
+            thickness = int(np.sqrt(64 / float(i + 1)) * 2.5)
+            cv2.line(frame, self.history[i - 1], self.history[i],
+                     (0, 255, 0), thickness)

-            try:
-                center, radius = find_colored_ball(
-                    frame, red_lower, red_upper, min_radius
-                )
-                history.appendleft(center)
-                draw_ball_markers(frame, center, radius, history)
-            except TypeError: # No red ball found and function returned None
-                history.appendleft(None)
-                draw_ball_markers(frame, None, None, history)
+        return frame

-            # show the frame to screen
-            cv2.imshow("Frame", frame)
-            cv2.waitKey(1)
-
-    finally:
-        vd_proxy.unsubscribe(cam_subscriber)
-        cv2.destroyAllWindows()
+    def load_hsv_config(self, filename):
+        with open(filename) as f:
+            hsv = json.load(f)
+        self.hsv_lower = tuple(map(hsv.get, ('low_h', 'low_s', 'low_v')))
+        self.hsv_upper = tuple(map(hsv.get, ('high_h', 'high_s', 'high_v')))


 if __name__ == '__main__':
-    nao_demo()
+    # video = NaoImageReader('192.168.0.11')
+    video = VideoReader(0, loop=True)
+    finder = BallFinder(red_lower, red_upper, 5, None)
+    try:
+        while True:
+            finder.next_frame(video.get_frame())
+    finally:
+        video.close()
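
How the pieces fit together: a sketch of driving BallFinder with
thresholds saved by the colorpicker (ball.json is a hypothetical file
written via --output-config; test3.avi is the sample clip mentioned
above):

    from imagereaders import VideoReader
    from live_recognition import BallFinder, red_lower, red_upper

    video = VideoReader('test3.avi', loop=True)
    finder = BallFinder(red_lower, red_upper, min_radius=5, width=None)
    finder.load_hsv_config('ball.json')  # replaces the default red interval
    try:
        while True:
            # next_frame() returns the cv2.waitKey() code; q or Esc quits
            if finder.next_frame(video.get_frame()) in (ord('q'), 27):
                break
    finally:
        video.close()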