Source code for ethoscope.web_utils.record

from os import path
# from threading import Thread
import traceback
import logging
import time
from ethoscope.web_utils.control_thread import ControlThread, ExperimentalInformations
from ethoscope.utils.description import DescribedObject
import os
import tempfile
import shutil
import multiprocessing
import glob
import os
import datetime

class PiCameraProcess(multiprocessing.Process):
    """
    Child process that records H264 video chunks with the Pi camera.

    While recording, a still image is also written to ``img_path`` on every
    loop iteration, and the video is split into a new file every
    ``_VIDEO_CHUNCK_DURATION`` seconds. Recording ends as soon as anything is
    put on ``stop_queue``.

    :param stop_queue: joinable queue polled for a stop request; the item is
        consumed and acknowledged with ``task_done()`` before stopping.
    :param video_prefix: path prefix shared by all video chunk files.
    :param video_root_dir: root directory for videos (stored; not used by the
        methods of this class).
    :param img_path: destination of the periodically captured still image.
    :param width: frame width, in pixels.
    :param height: frame height, in pixels.
    :param fps: camera frame rate.
    :param bitrate: bitrate passed to ``start_recording``.
    """

    # Duration of one video chunk; compared against ``time.time()`` deltas,
    # i.e. expressed in seconds.
    _VIDEO_CHUNCK_DURATION = 30 * 10

    def __init__(self, stop_queue, video_prefix, video_root_dir, img_path, width, height, fps, bitrate):
        self._stop_queue = stop_queue
        self._img_path = img_path
        self._resolution = (width, height)
        self._fps = fps
        self._bitrate = bitrate
        self._video_prefix = video_prefix
        self._video_root_dir = video_root_dir
        super(PiCameraProcess, self).__init__()

    def _make_video_name(self, i):
        """
        Build the file name of the ``i``-th video chunk.

        :param i: zero-based chunk index (zero-padded to five digits).
        :return: ``<video_prefix>_<width>x<height>@<fps>_<index>.h264``
        """
        w, h = self._resolution
        video_info = "%ix%i@%i" % (w, h, self._fps)
        return '%s_%s_%05d.h264' % (self._video_prefix, video_info, i)

    def run(self):
        """
        Record until a stop request arrives on the stop queue.

        Any exception (including a failure to import ``picamera``) is logged
        with its traceback instead of being propagated.
        """
        chunk_idx = 0
        try:
            # Imported here so the module can be loaded on machines without
            # the camera library; inside the ``try`` so that a missing or
            # broken install is logged like any other start-up failure.
            import picamera

            with picamera.PiCamera() as camera:
                camera.resolution = self._resolution
                camera.framerate = self._fps
                camera.start_recording(self._make_video_name(chunk_idx), bitrate=self._bitrate)
                start_time = time.time()
                chunk_idx += 1

                while True:
                    camera.wait_recording(2)
                    # Refresh the still image from the video port while recording.
                    camera.capture(self._img_path, use_video_port=True, quality=50)

                    if time.time() - start_time >= self._VIDEO_CHUNCK_DURATION:
                        # Continue the recording in a fresh chunk file.
                        camera.split_recording(self._make_video_name(chunk_idx))
                        start_time = time.time()
                        chunk_idx += 1

                    if not self._stop_queue.empty():
                        # Consume and acknowledge the stop request.
                        self._stop_queue.get()
                        self._stop_queue.task_done()
                        break

                camera.wait_recording(1)
                camera.stop_recording()

        except Exception:
            # ``format_exc`` takes no exception argument (its first parameter
            # is ``limit``); it formats the exception currently being handled.
            logging.error("Error or starting video record:" + traceback.format_exc())
class GeneralVideoRecorder(DescribedObject):
    """
    Video recorder with user-configurable frame size, frame rate and bitrate.

    The recording itself is delegated to a :class:`PiCameraProcess`; this
    object only starts that process, waits for it, and asks it to stop.

    :param video_prefix: path prefix of the video files to write.
    :param video_dir: root directory of the videos.
    :param img_path: where the recording process saves its still image.
    :param width: frame width, in pixels.
    :param height: frame height, in pixels.
    :param fps: target number of frames per second.
    :param bitrate: target bitrate.
    """

    _description = {
        "overview": "A video simple recorder",
        "arguments": [
            {
                "type": "number",
                "name": "width",
                "description": "The width of the frame",
                "default": 1280,
                "min": 480,
                "max": 1980,
                "step": 1,
            },
            {
                "type": "number",
                "name": "height",
                "description": "The height of the frame",
                "default": 960,
                "min": 360,
                "max": 1080,
                "step": 1,
            },
            {
                "type": "number",
                "name": "fps",
                "description": "The target number of frames per seconds",
                "default": 25,
                "min": 1,
                "max": 25,
                "step": 1,
            },
            {
                "type": "number",
                "name": "bitrate",
                "description": "The target bitrate",
                "default": 200000,
                "min": 0,
                "max": 10000000,
                "step": 1000,
            },
        ],
    }

    def __init__(self, video_prefix, video_dir, img_path, width=1280, height=960, fps=25, bitrate=200000):
        # Single-slot queue used only to carry the stop request to the child.
        stop_channel = multiprocessing.JoinableQueue(maxsize=1)
        self._stop_queue = stop_channel
        self._p = PiCameraProcess(
            stop_channel,
            video_prefix,
            video_dir,
            img_path,
            width,
            height,
            fps,
            bitrate,
        )

    def run(self):
        """Start the recording process and block until it is no longer alive."""
        self._is_recording = True
        self._p.start()
        while True:
            if not self._p.is_alive():
                break
            time.sleep(.25)

    def stop(self):
        """Send the stop request, then wait up to 10 seconds for the process."""
        self._is_recording = False
        stop_channel = self._stop_queue
        stop_channel.put(None)
        stop_channel.close()
        self._p.join(10)
class HDVideoRecorder(GeneralVideoRecorder):
    """
    Preset recorder: 1920 x 1080 frames at 25 fps with a bitrate of 1000000.

    Takes no user-tunable arguments; everything except the output locations
    is fixed by the preset.

    :param video_prefix: path prefix of the video files to write.
    :param video_dir: root directory of the videos.
    :param img_path: where the recording process saves its still image.
    """

    # The overview now states the bitrate that is actually passed below
    # (1000000, i.e. 1e6); it previously advertised 5e5.
    _description = {
        "overview": "A preset 1920 x 1080, 25fps, bitrate = 1e6 video recorder. "
                    "At this resolution, the field of view is only partial, "
                    "so we effectively zoom in the middle of arenas",
        "arguments": []}

    def __init__(self, video_prefix, video_dir, img_path):
        super(HDVideoRecorder, self).__init__(video_prefix, video_dir, img_path,
                                              width=1920, height=1080, fps=25, bitrate=1000000)
class StandardVideoRecorder(GeneralVideoRecorder):
    """
    Preset recorder: 1280 x 960 frames at 25 fps with a bitrate of 500000.

    Takes no user-tunable arguments; everything except the output locations
    is fixed by the preset.

    :param video_prefix: path prefix of the video files to write.
    :param video_dir: root directory of the videos.
    :param img_path: where the recording process saves its still image.
    """

    # The overview now states the bitrate that is actually passed below
    # (500000, i.e. 5e5); it previously advertised 2e5.
    _description = {
        "overview": "A preset 1280 x 960, 25fps, bitrate = 5e5 video recorder.",
        "arguments": []}

    def __init__(self, video_prefix, video_dir, img_path):
        super(StandardVideoRecorder, self).__init__(video_prefix, video_dir, img_path,
                                                    width=1280, height=960, fps=25, bitrate=500000)
class ControlThreadVideoRecording(ControlThread):
    """
    Control thread that records video instead of tracking.

    It builds a recorder (one of ``_option_dict["recorder"]["possible_classes"]``),
    runs it, and keeps the ``_info`` dictionary (status, time, error, paths...)
    up to date.

    :param machine_id: identifier of the device; used in ``_info`` and in the
        output path.
    :param name: name of the device; used in ``_info`` and in the output path.
    :param version: version string stored in ``_info``.
    :param ethoscope_dir: root directory for the videos, log file and debug image.
    :param data: user options handed to ``_parse_user_options``.
    """

    _evanescent = False

    # Selectable classes for each user option. The loop below makes the first
    # entry of every list the default class, with no keyword arguments.
    _option_dict = {
        "recorder": {
            "possible_classes": [StandardVideoRecorder, HDVideoRecorder, GeneralVideoRecorder],
        },
        "experimental_info": {
            "possible_classes": [ExperimentalInformations],
        }
    }
    for k in _option_dict:
        _option_dict[k]["class"] = _option_dict[k]["possible_classes"][0]
        _option_dict[k]["kwargs"] = {}

    _tmp_last_img_file = "last_img.jpg"
    _dbg_img_file = "dbg_img.png"
    _log_file = "ethoscope.log"

    def __init__(self, machine_id, name, version, ethoscope_dir, data=None, *args, **kwargs):
        # for FPS computation
        self._last_info_t_stamp = 0
        self._last_info_frame_idx = 0

        self._recorder = None
        self._machine_id = machine_id
        self._device_name = name
        self._video_root_dir = ethoscope_dir
        # The last captured still image lives in a fresh temporary directory.
        self._tmp_dir = tempfile.mkdtemp(prefix="ethoscope_")

        # todo add 'data' -> how monitor was started to metadata
        self._info = {
            "status": "stopped",
            "time": time.time(),
            "error": None,
            "log_file": os.path.join(ethoscope_dir, self._log_file),
            "dbg_img": os.path.join(ethoscope_dir, self._dbg_img_file),
            "last_drawn_img": os.path.join(self._tmp_dir, self._tmp_last_img_file),
            "id": machine_id,
            "name": name,
            "version": version,
            "experimental_info": {}
        }

        self._parse_user_options(data)
        # NOTE(review): the lookup starts *after* ControlThread in the MRO, so
        # ControlThread.__init__ is skipped -- presumably on purpose, to reach
        # the underlying thread initialiser directly. Confirm against ControlThread.
        super(ControlThread, self).__init__()

    def _update_info(self):
        """Refresh the info time stamp; does nothing when no recorder exists."""
        if self._recorder is None:
            return
        self._last_info_t_stamp = time.time()

    def _parse_one_user_option(self, field, data):
        """
        Resolve the class and keyword arguments requested for one option.

        :param field: option name to look up in ``data``.
        :param data: mapping of option name to a dict with ``"name"`` and
            ``"arguments"`` entries.
        :return: ``(Class, kwargs)``, or ``(None, {})`` when ``field`` is absent.
        """
        try:
            subdata = data[field]
        except KeyError:
            logging.warning("No field %s, using default" % field)
            return None, {}

        # SECURITY(review): ``eval`` runs on a name that comes from user data.
        # Kept as-is to preserve behaviour; consider resolving the name against
        # ``_option_dict[field]["possible_classes"]`` instead.
        Class = eval(subdata["name"])
        kwargs = subdata["arguments"]

        return Class, kwargs

    def run(self):
        """
        Prepare the output location, then create and run the recorder.

        Any exception raised on the way is turned into a call to
        :meth:`stop` with the formatted traceback as the error.
        """
        try:
            self._info["status"] = "initialising"
            logging.info("Starting Monitor thread")
            self._info["error"] = None

            self._last_info_t_stamp = 0
            self._last_info_frame_idx = 0

            ExpInfoClass = self._option_dict["experimental_info"]["class"]
            exp_info_kwargs = self._option_dict["experimental_info"]["kwargs"]
            self._info["experimental_info"] = ExpInfoClass(**exp_info_kwargs).info_dic
            self._info["time"] = time.time()

            date_time = datetime.datetime.fromtimestamp(self._info["time"])
            formated_time = date_time.strftime('%Y-%m-%d_%H-%M-%S')
            try:
                code = self._info["experimental_info"]["code"]
            except KeyError:
                code = "NA"
                logging.warning("No code field in experimental info")

            file_prefix = "%s_%s_%s" % (formated_time, self._machine_id, code)

            # <root>/<machine_id>/<device_name>/<start time>/<file_prefix>
            self._output_video_full_prefix = os.path.join(self._video_root_dir,
                                                          self._machine_id,
                                                          self._device_name,
                                                          formated_time,
                                                          file_prefix
                                                          )

            try:
                os.makedirs(os.path.dirname(self._output_video_full_prefix))
            except OSError:
                # Best effort: the error is deliberately ignored here
                # (presumably to tolerate an already existing directory).
                pass

            logging.info("Start recording")

            RecorderClass = self._option_dict["recorder"]["class"]
            recorder_kwargs = self._option_dict["recorder"]["kwargs"]
            self._recorder = RecorderClass(video_prefix=self._output_video_full_prefix,
                                           video_dir=self._video_root_dir,
                                           img_path=self._info["last_drawn_img"],
                                           **recorder_kwargs)
            self._info["status"] = "recording"
            # Blocks until the recorder has finished.
            self._recorder.run()
            logging.warning("recording RUN finished")

        except Exception:
            # ``format_exc`` takes no exception argument (its first parameter
            # is ``limit``); it formats the exception currently being handled.
            self.stop(traceback.format_exc())

        # for testing purposes
        if self._evanescent:
            self.stop()
            os._exit(0)

    def stop(self, error=None):
        """
        Stop the recorder, if any, and mark the thread as stopped.

        :param error: optional error description; logged, and stored in
            ``_info["error"]`` (``None`` means a clean stop).
        """
        if error is not None:
            logging.error("Recorder closed with an error:")
            logging.error(error)
        else:
            logging.info("Recorder closed all right")

        self._info["status"] = "stopping"
        self._info["time"] = time.time()
        self._info["experimental_info"] = {}

        logging.info("Stopping monitor")
        if self._recorder is not None:
            logging.warning("Control thread asking recorder to stop")
            self._recorder.stop()
            self._recorder = None

        self._info["status"] = "stopped"
        self._info["time"] = time.time()
        self._info["error"] = error