Source code for ethoscope.hardware.input.cameras

__author__ = 'quentin'

import cv2
try:
    from cv2.cv import CV_CAP_PROP_FRAME_WIDTH as CAP_PROP_FRAME_WIDTH
    from cv2.cv import CV_CAP_PROP_FRAME_HEIGHT as CAP_PROP_FRAME_HEIGHT
    from cv2.cv import CV_CAP_PROP_FRAME_COUNT as CAP_PROP_FRAME_COUNT
    from cv2.cv import CV_CAP_PROP_POS_MSEC as CAP_PROP_POS_MSEC
    from cv2.cv import CV_CAP_PROP_FPS as CAP_PROP_FPS

except ImportError:
    from cv2 import CAP_PROP_FRAME_WIDTH, CAP_PROP_FRAME_HEIGHT, CAP_PROP_FRAME_COUNT, CAP_PROP_POS_MSEC, CAP_PROP_FPS

import time
import logging
import os
from ethoscope.utils.debug import EthoscopeException
import multiprocessing
import traceback

[docs]class BaseCamera(object): capture = None _resolution = None _frame_idx = 0 def __init__(self,drop_each=1, max_duration=None, *args, **kwargs): """ The template class to generate and use video streams. :param drop_each: keep only ``1/drop_each``'th frame :param max_duration: stop the video stream if ``t > max_duration`` (in seconds). :param args: additional arguments :param kwargs: additional keyword arguments """ self._drop_each = drop_each self._max_duration = max_duration def __exit__(self): logging.info("Closing camera") self._close() def _close(self): pass def __iter__(self): """ Iterate thought consecutive frames of this camera. :return: the time (in ms) and a frame (numpy array). :rtype: (int, :class:`~numpy.ndarray`) """ at_least_one_frame = False while True: if self.is_last_frame() or not self.is_opened(): if not at_least_one_frame: raise EthoscopeException("Camera could not read the first frame") break t,out = self._next_time_image() if out is None: break t_ms = int(1000*t) at_least_one_frame = True if (self._frame_idx % self._drop_each) == 0: yield t_ms,out if self._max_duration is not None and t > self._max_duration: break @property def resolution(self): """ :return: The resolution of the camera W x H. :rtype: (int, int) """ return self._resolution @property def width(self): """ :return: the width of the returned frames :rtype: int """ return self._resolution[0] @property def height(self): """ :return: the height of the returned frames :rtype: int """ return self._resolution[1] def _next_time_image(self): time = self._time_stamp() im = self._next_image() self._frame_idx += 1 return time, im
[docs] def is_last_frame(self): raise NotImplementedError
def _next_image(self): raise NotImplementedError def _time_stamp(self): raise NotImplementedError
[docs] def is_opened(self): raise NotImplementedError
[docs] def restart(self): """ Restarts a camera (also resets time). :return: """ raise NotImplementedError
[docs]class MovieVirtualCamera(BaseCamera): _description = {"overview": "Class to acquire frames from a video file.", "arguments": [ {"type": "filepath", "name": "path", "description": "The path to the video file to use as virtual camera","default":"/home/gg/Desktop/demo_monitor_x5.avi.mp4"}, ]} def __init__(self, path, use_wall_clock = False, *args, **kwargs ): """ Class to acquire frames from a video file. :param path: the path of the video file :type path: str :param use_wall_clock: whether to use the real time from the machine (True) or from the video file (False).\ The former can be useful for prototyping. :type use_wall_clock: bool :param args: additional arguments. :param kwargs: additional keyword arguments. """ #print "path", path self._frame_idx = 0 self._path = path self._use_wall_clock = use_wall_clock if not (isinstance(path, str) or isinstance(path, unicode)): raise EthoscopeException("path to video must be a string") if not os.path.exists(path): raise EthoscopeException("'%s' does not exist. No such file" % path) self.canbepickled = False #cv2.videocapture object cannot be serialized, hence cannot be picked self.capture = cv2.VideoCapture(path) w = self.capture.get(CAP_PROP_FRAME_WIDTH) h = self.capture.get(CAP_PROP_FRAME_HEIGHT) self._total_n_frames =self.capture.get(CAP_PROP_FRAME_COUNT) if self._total_n_frames == 0.: self._has_end_of_file = False else: self._has_end_of_file = True self._resolution = (int(w),int(h)) super(MovieVirtualCamera, self).__init__(*args, **kwargs) # emulates v4l2 (real time camera) from video file if self._use_wall_clock: self._start_time = time.time() else: self._start_time = 0 @property def start_time(self): return self._start_time @property def path(self): return self._path
[docs] def is_opened(self): return True
[docs] def restart(self): self.__init__(self._path, use_wall_clock=self._use_wall_clock, drop_each=self._drop_each, max_duration = self._max_duration)
def _next_image(self): _, frame = self.capture.read() return frame def _time_stamp(self): if self._use_wall_clock: now = time.time() return now - self._start_time time_s = self.capture.get(CAP_PROP_POS_MSEC) / 1e3 return time_s
[docs] def is_last_frame(self): if self._has_end_of_file and self._frame_idx >= self._total_n_frames: return True return False
def _close(self): self.capture.release()
[docs]class V4L2Camera(BaseCamera): _description = {"overview": "Class to acquire frames from the V4L2 default interface (e.g. a webcam).", "arguments": [ {"type": "number", "min": 0, "max": 4, "step": 1, "name": "device", "description": "The device to be open", "default":0}, ]} def __init__(self, device=0, target_fps=5, target_resolution=(960,720), *args, **kwargs): """ class to acquire stream from a video for linux compatible device (v4l2). :param device: The index of the device, or its path. :type device: int or str :param target_fps: the desired number of frames par second (FPS) :type target_fps: int :param target_fps: the desired resolution (W x H) :param target_resolution: (int,int) :param args: additional arguments :param kwargs: additional keyword arguments """ self.canbepickled = False self.capture = cv2.VideoCapture(device) self._warm_up() w, h = target_resolution if w <0 or h <0: self.capture.set(CAP_PROP_FRAME_WIDTH, 99999) self.capture.set(CAP_PROP_FRAME_HEIGHT, 99999) else: self.capture.set(CAP_PROP_FRAME_WIDTH, w) self.capture.set(CAP_PROP_FRAME_HEIGHT, h) if not isinstance(target_fps, int): raise EthoscopeException("FPS must be an integer number") if target_fps < 2: raise EthoscopeException("FPS must be at least 2") self.capture.set(CAP_PROP_FPS, target_fps) self._target_fps = float(target_fps) _, im = self.capture.read() # preallocate image buffer => faster if im is None: raise EthoscopeException("Error whist retrieving video frame. Got None instead. Camera not plugged?") self._frame = im assert(len(im.shape) >1) self._resolution = (im.shape[1], im.shape[0]) if self._resolution != target_resolution: if w > 0 and h > 0: logging.warning('Target resolution "%s" could NOT be achieved. Effective resolution is "%s"' % (target_resolution, self._resolution )) else: logging.info('Maximal effective resolution is "%s"' % str(self._resolution)) super(V4L2Camera, self).__init__(*args, **kwargs) self._start_time = time.time() def _warm_up(self): logging.info("%s is warming up" % (str(self))) time.sleep(2)
[docs] def restart(self): self._frame_idx = 0 self._start_time = time.time()
[docs] def is_opened(self): return self.capture.isOpened()
[docs] def is_last_frame(self): return False
def _time_stamp(self): now = time.time() # relative time stamp return now - self._start_time @property def start_time(self): return self._start_time def _close(self): self.capture.release() def _next_image(self): if self._frame_idx >0 : expected_time = self._start_time + self._frame_idx / self._target_fps now = time.time() to_sleep = expected_time - now # Warnings if the fps is so high that we cannot grab fast enough if to_sleep < 0: if self._frame_idx % 5000 == 0: logging.warning("The target FPS (%f) could not be reached. Effective FPS is about %f" % (self._target_fps, self._frame_idx/(now - self._start_time))) self.capture.grab() # we simply drop frames until we go above expected time while now < expected_time: self.capture.grab() now = time.time() else: self.capture.grab() self.capture.retrieve(self._frame) return self._frame
[docs]class PiFrameGrabber(multiprocessing.Process): def __init__(self, target_fps, target_resolution, queue,stop_queue, *args, **kwargs): """ Class to grab frames from pi camera. Designed to be used within :class:`~ethoscope.hardware.camreras.camreras.OurPiCameraAsync` This allows to get frames asynchronously as acquisition is a bottleneck. :param target_fps: desired fps :type target_fps: int :param target_resolution: the desired resolution (w, h) :type target_resolution: (int, int) :param queue: a queue that stores frame and makes them available to the parent process :type queue: :class:`~multiprocessing.JoinableQueue` :param stop_queue: a queue that can stop the async acquisition :type stop_queue: :class:`~multiprocessing.JoinableQueue` :param args: additional arguments :param kwargs: additional keyword arguments """ self._queue = queue self._stop_queue = stop_queue self._target_fps = target_fps self._target_resolution = target_resolution super(PiFrameGrabber, self).__init__()
[docs] def run(self): """ Initialise pi camera, get frames, convert them fo greyscale, and make them available in a queue. Run stops if the _stop_queue is not empty. """ # lazy import should only use those on devices # from picamera.array import PiRGBArray # from picamera import PiCamera try: # lazy import should only use those on devices from picamera.array import PiRGBArray from picamera import PiCamera with PiCamera() as capture: logging.warning(capture) capture.resolution = self._target_resolution capture.framerate = self._target_fps raw_capture = PiRGBArray(capture, size=self._target_resolution) for frame in capture.capture_continuous(raw_capture, format="bgr", use_video_port=True): if not self._stop_queue.empty(): logging.warning("The stop queue is not empty. Stop acquiring frames") self._stop_queue.get() self._stop_queue.task_done() logging.warning("Stop Task Done") break raw_capture.truncate(0) # out = np.copy(frame.array) out = cv2.cvtColor(frame.array,cv2.COLOR_BGR2GRAY) #fixme here we could actually pass a JPG compressed file object (http://docs.scipy.org/doc/scipy-0.16.0/reference/generated/scipy.misc.imsave.html) # This way, we would manage to get faster FPS self._queue.put(out) finally: logging.warning("Closing frame grabber process") self._stop_queue.close() self._queue.close() logging.warning("Camera Frame grabber stopped acquisition cleanly")
[docs]class OurPiCameraAsync(BaseCamera): _description = {"overview": "Default class to acquire frames from the raspberry pi camera asynchronously.", "arguments": []} _frame_grabber_class = PiFrameGrabber def __init__(self, target_fps=20, target_resolution=(1280, 960), *args, **kwargs): """ Class to acquire frames from the raspberry pi camera asynchronously. At the moment, frames are only greyscale images. :param target_fps: the desired number of frames par second (FPS) :type target_fps: int :param target_fps: the desired resolution (W x H) :param target_resolution: (int,int) :param args: additional arguments :param kwargs: additional keyword arguments """ logging.info("Initialising camera") self.canbepickled = True #cv2.videocapture object cannot be serialized, hence cannot be picked w,h = target_resolution if not isinstance(target_fps, int): raise EthoscopeException("FPS must be an integer number") self._args = args self._kwargs = kwargs self._queue = multiprocessing.Queue(maxsize=1) self._stop_queue = multiprocessing.JoinableQueue(maxsize=1) self._p = self._frame_grabber_class(target_fps,target_resolution,self._queue,self._stop_queue, *args, **kwargs) self._p.daemon = True self._p.start() try: im = self._queue.get(timeout=10) except Exception as e: logging.error("Could not get any frame from the camera") self._stop_queue.cancel_join_thread() self._queue.cancel_join_thread() logging.warning("Stopping stop queue") self._stop_queue.close() logging.warning("Stopping queue") self._queue.close() logging.warning("Joining process") # we kill the frame grabber if it does not reply within 10s self._p.join(10) logging.warning("Process joined") raise e self._frame = cv2.cvtColor(im,cv2.COLOR_GRAY2BGR) if len(im.shape) < 2: raise EthoscopeException("The camera image is corrupted (less that 2 dimensions)") self._resolution = (im.shape[1], im.shape[0]) if self._resolution != target_resolution: if w > 0 and h > 0: logging.warning('Target resolution "%s" could NOT be achieved. Effective resolution is "%s"' % (target_resolution, self._resolution )) else: logging.info('Maximal effective resolution is "%s"' % str(self._resolution)) super(OurPiCameraAsync, self).__init__(*args, **kwargs) self._start_time = time.time() logging.info("Camera initialised")
[docs] def restart(self): self._frame_idx = 0 self._start_time = time.time()
def __getstate__(self): return {"args": self._args, "kwargs": self._kwargs, "frame_idx": self._frame_idx, "start_time": self._start_time} def __setstate__(self, state): self.__init__(*state["args"], **state["kwargs"]) self._frame_idx = int(state["frame_idx"]) self._start_time = int(state["start_time"])
[docs] def is_opened(self): return True
# return self.capture.isOpened()
[docs] def is_last_frame(self): return False
def _time_stamp(self): now = time.time() # relative time stamp return now - self._start_time @property def start_time(self): return self._start_time def _close(self): logging.info("Requesting grabbing process to stop!") self._stop_queue.put(None) while not self._queue.empty(): self._queue.get() logging.info("Joining stop queue") self._stop_queue.cancel_join_thread() self._queue.cancel_join_thread() logging.info("Stopping stop queue") self._stop_queue.close() logging.info("Stopping queue") self._queue.close() logging.info("Joining process") self._p.join() logging.info("All joined ok") def _next_image(self): try: g = self._queue.get(timeout=30) cv2.cvtColor(g,cv2.COLOR_GRAY2BGR,self._frame) return self._frame except Exception as e: raise EthoscopeException("Could not get frame from camera\n%s", traceback.format_exc(e))
[docs]class DummyFrameGrabber(multiprocessing.Process): def __init__(self, target_fps, target_resolution, queue, stop_queue, path, *args, **kwargs): """ Class to mimic the behaviour of :class:`~ethoscope.hardware.input.cameras.PiFrameGrabber`. This is intended for testing purposes. This way, we can emulate the async functionality of the hardware camera by a video file. :param target_fps: the desired number of frames par second (FPS) :type target_fps: int :param target_fps: the desired resolution (W x H) :param target_resolution: (int,int) :param args: additional arguments :param kwargs: additional keyword arguments """ self._queue = queue self._stop_queue = stop_queue self._target_fps = target_fps self._target_resolution = target_resolution self._video_file = path super(DummyFrameGrabber, self).__init__()
[docs] def run(self): try: cap = cv2.VideoCapture(self._video_file) while True: if not self._stop_queue.empty(): logging.warning("The stop queue is not empty. Stop acquiring frames") self._stop_queue.get() self._stop_queue.task_done() logging.warning("Stop Task Done") break _, out = cap.read() #todo sleep here out = cv2.cvtColor(out, cv2.COLOR_BGR2GRAY) self._queue.put(out) finally: logging.warning("Closing frame grabber process") self._stop_queue.close() self._queue.close() logging.warning("Camera Frame grabber stopped acquisition cleanly")
[docs]class DummyPiCameraAsync(OurPiCameraAsync): """ Class to mimic the behaviour of :class:`~ethoscope.hardware.input.cameras.OurPiCameraAsync`. This is intended for testing purposes. This way, we can emulate the async functionality of the hardware camera by a video file. """ _frame_grabber_class = DummyFrameGrabber