__author__ = 'quentin'

from collections import deque
from math import log10, sqrt, pi

import cv2

# Major version of the OpenCV bindings; the findContours() return signature
# differs between versions (see AdaptiveBGModel._track).
try:
    CV_VERSION = int(cv2.__version__.split(".")[0])
except Exception:
    # Could not parse an exotic version string; fall back to the legacy API.
    # (A bare `except:` here would also swallow SystemExit/KeyboardInterrupt.)
    CV_VERSION = 2

import numpy as np
from scipy import ndimage

from ethoscope.core.variables import XPosVariable, YPosVariable, XYDistance, WidthVariable, HeightVariable, PhiVariable, Label
from ethoscope.core.data_point import DataPoint
from ethoscope.trackers.trackers import BaseTracker, NoPositionError

import logging
class ObjectModel(object):
    """
    A class to model, update and predict foreground object (i.e. tracked animal).

    It keeps a ring buffer of feature vectors (area, height, mean grey level)
    for the last ``history_length`` detections and scores new candidate objects
    by their likelihood under an independent-Gaussian model of those features.
    """
    _sqrt_2_pi = sqrt(2.0 * pi)

    def __init__(self, history_length=1000):
        # fixme this should be time, not number of points!
        self._features_header = [
            "fg_model_area",
            "fg_model_height",
            # "fg_model_aspect_ratio",
            "fg_model_mean_grey"
        ]

        self._history_length = history_length
        # Ring buffers of the last feature vectors; Fortran order because
        # columns (features) are reduced independently.
        self._ring_buff = np.zeros((self._history_length, len(self._features_header)), dtype=np.float32, order="F")
        self._std_buff = np.zeros((self._history_length, len(self._features_header)), dtype=np.float32, order="F")
        self._ring_buff_idx = 0
        # becomes True once the ring buffer has been filled at least once
        self._is_ready = False
        # lazily (re)allocated image buffers used by compute_features()
        self._roi_img_buff = None
        self._mask_img_buff = None
        self._img_buff_shape = np.array([0, 0])
        self._last_updated_time = 0
        # If the model is not updated for this duration, it is reset. Patches #39
        self._max_unupdated_duration = 1 * 60 * 1000.0  # ms

    @property
    def is_ready(self):
        return self._is_ready

    @property
    def features_header(self):
        return self._features_header

    def update(self, img, contour, time):
        """
        Add the features of a newly tracked object to the ring buffer.

        :param img: the whole (colour) frame the contour was found in
        :param contour: the object contour (OpenCV point array)
        :param time: time stamp in ms
        :return: the feature row that now sits at the write index
        """
        self._last_updated_time = time
        self._ring_buff[self._ring_buff_idx] = self.compute_features(img, contour)

        self._ring_buff_idx += 1

        if self._ring_buff_idx == self._history_length:
            self._is_ready = True
            self._ring_buff_idx = 0

        return self._ring_buff[self._ring_buff_idx]

    def distance(self, features, time):
        """
        Score ``features`` against the model as a negative mean log10-likelihood
        (lower = more similar to previously tracked objects).

        Returns 0 (i.e. "no information") when the model is degenerate (zero
        spread or zero likelihood) or when it has just been reset because it
        was not updated for too long.
        """
        if time - self._last_updated_time > self._max_unupdated_duration:
            logging.warning("FG model not updated for too long. Resetting.")
            self.__init__(self._history_length)
            return 0

        if not self._is_ready:
            # NOTE(review): only rows [0, _ring_buff_idx) have been written, so
            # this looks like an off-by-one (includes one all-zero row) — kept
            # as-is to preserve historical behaviour; confirm before changing.
            last_row = self._ring_buff_idx + 1
        else:
            last_row = self._history_length

        means = np.mean(self._ring_buff[:last_row], 0)

        # mean absolute deviation per feature, computed in-place into _std_buff
        np.subtract(self._ring_buff[:last_row], means, self._std_buff[:last_row])
        np.abs(self._std_buff[:last_row], self._std_buff[:last_row])

        stds = np.mean(self._std_buff[:last_row], 0)
        if (stds == 0).any():
            return 0

        # independent Gaussian likelihood for each feature
        a = 1 / (stds * self._sqrt_2_pi)
        b = np.exp(- (features - means) ** 2 / (2 * stds ** 2))
        likelihoods = a * b

        if np.any(likelihoods == 0):
            return 0

        logls = np.sum(np.log10(likelihoods)) / len(likelihoods)
        return -1.0 * logls

    def compute_features(self, img, contour):
        """
        Compute the feature vector (log-area, height, mean grey level) of one
        candidate object.

        :param img: the whole BGR frame
        :param contour: the candidate contour, in frame coordinates
        :return: a 1d float array matching ``features_header``
        """
        x, y, w, h = cv2.boundingRect(contour)

        if self._roi_img_buff is None or np.any(self._roi_img_buff.shape < img.shape[0:2]):
            # dynamically reallocate buffer if needed
            self._img_buff_shape[1] = max(self._img_buff_shape[1], w)
            self._img_buff_shape[0] = max(self._img_buff_shape[0], h)
            self._roi_img_buff = np.zeros(self._img_buff_shape, np.uint8)
            self._mask_img_buff = np.zeros_like(self._roi_img_buff)

        sub_mask = self._mask_img_buff[0: h, 0: w]
        sub_grey = self._roi_img_buff[0: h, 0: w]

        cv2.cvtColor(img[y: y + h, x: x + w, :], cv2.COLOR_BGR2GRAY, sub_grey)
        sub_mask.fill(0)

        cv2.drawContours(sub_mask, [contour], -1, 255, -1, offset=(-x, -y))
        # mean grey level restricted to the pixels inside the contour
        mean_col = cv2.mean(sub_grey, sub_mask)[0]

        (_, _), (width, height), angle = cv2.minAreaRect(contour)
        width, height = max(width, height), min(width, height)
        ar = ((height + 1) / (width + 1))

        # todo: an instantaneous-speed feature (using time) was prototyped here
        # but is currently disabled.
        features = np.array([log10(cv2.contourArea(contour) + 1.0),
                             height + 1,
                             # sqrt(ar),
                             mean_col + 1
                             ])

        return features
class BackgroundModel(object):
    """
    A class to model background. It uses a dynamic running average and support arbitrary
    and heterogeneous frame rates.

    The learning rate is expressed as a pixel "half life" so that the update is
    consistent regardless of the time elapsed between two frames.
    """

    def __init__(self, max_half_life=100. * 1000, min_half_life=1. * 1000, increment=1.2):
        # the maximal half life of a pixel from background, in ms
        self._max_half_life = float(max_half_life)
        # the minimal one
        self._min_half_life = float(min_half_life)

        # starts with the fastest learning rate
        self._current_half_life = self._min_half_life

        # fixme theoretically this should depend on time, not frame index
        self._increment = increment
        # the mean background image (float32), allocated on first update
        self._bg_mean = None
        # preallocated per-pixel learning-rate matrices
        self._buff_alpha_matrix = None
        self._buff_invert_alpha_mat = None
        # the time stamp of the frame last used to update
        self.last_t = 0

    @property
    def bg_img(self):
        return self._bg_mean

    def increase_learning_rate(self):
        # shorter half life => background adapts faster
        self._current_half_life /= self._increment

    def decrease_learning_rate(self):
        # longer half life => background adapts slower
        self._current_half_life *= self._increment

    def update(self, img_t, t, fg_mask=None):
        """
        Blend the current frame into the running-average background.

        :param img_t: the current (grey) frame
        :param t: its time stamp in ms
        :param fg_mask: optional foreground mask; masked pixels are NOT updated
        :raises NoPositionError: if ``t`` is earlier than the last update
        """
        dt = float(t - self.last_t)
        if dt < 0:
            raise NoPositionError("Negative time interval between two consecutive frames")

        # clip the half life to possible value:
        self._current_half_life = np.clip(self._current_half_life, self._min_half_life, self._max_half_life)

        # ensure preallocated buffers exist. otherwise, initialise them
        if self._bg_mean is None:
            self._bg_mean = img_t.astype(np.float32)

        if self._buff_alpha_matrix is None:
            self._buff_alpha_matrix = np.ones_like(img_t, dtype=np.float32)

        # the learning rate, alpha, is an exponential function of half life
        # it corresponds to how much the present frame should account for the background
        lam = np.log(2) / self._current_half_life
        # how much the current frame should be accounted for
        alpha = 1 - np.exp(-lam * dt)

        # set up a matrix of learning rates. it is 0 where foreground map is true
        self._buff_alpha_matrix.fill(alpha)
        if fg_mask is not None:
            cv2.dilate(fg_mask, None, fg_mask)
            cv2.subtract(self._buff_alpha_matrix, self._buff_alpha_matrix, self._buff_alpha_matrix, mask=fg_mask)

        if self._buff_invert_alpha_mat is None:
            self._buff_invert_alpha_mat = 1 - self._buff_alpha_matrix
        else:
            np.subtract(1, self._buff_alpha_matrix, self._buff_invert_alpha_mat)

        # bg <- alpha * frame + (1 - alpha) * bg, computed in-place
        np.multiply(self._buff_alpha_matrix, img_t, self._buff_alpha_matrix)
        np.multiply(self._buff_invert_alpha_mat, self._bg_mean, self._buff_invert_alpha_mat)
        np.add(self._buff_alpha_matrix, self._buff_invert_alpha_mat, self._bg_mean)

        self.last_t = t
class AdaptiveBGModel(BaseTracker):
    _description = {"overview": "The default tracker for fruit flies. One animal per ROI.",
                    "arguments": []}

    # NOTE(review): class attribute, so the foreground model is shared between
    # all instances — presumably intentional; confirm before changing.
    fg_model = ObjectModel()

    def __init__(self, roi, data=None):
        """
        An adaptive background subtraction model to find position of one animal in one roi.

        :param roi: the region of interest to track in
        :param data: optional extra data, forwarded to :class:`BaseTracker`
        :return:
        """
        self._previous_shape = None
        # proportion of the roi main axis
        self._object_expected_size = 0.05
        # maximal allowed foreground proportion of the frame
        self._max_area = (5 * self._object_expected_size) ** 2

        # sliding window used to smooth the grey-level mode over time
        self._smooth_mode = deque()
        self._smooth_mode_tstamp = deque()
        self._smooth_mode_window_dt = 30 * 1000  # milliseconds

        self._bg_model = BackgroundModel()
        # reject candidate objects whose -log-likelihood exceeds this
        self._max_m_log_lik = 6.
        # preallocated image buffers, created lazily on the first frame
        self._buff_grey = None
        self._buff_object = None
        self._buff_object_old = None
        self._buff_grey_blurred = None
        self._buff_fg = None
        self._buff_convolved_mask = None
        self._buff_fg_backup = None
        self._buff_fg_diff = None
        self._old_sum_fg = 0

        super(AdaptiveBGModel, self).__init__(roi, data)

    def _pre_process_input_minimal(self, img, mask, t, darker_fg=True):
        """Grey-convert, blur, optionally invert and contrast-normalise a frame."""
        # blur radius scales with the expected object size; must be odd
        blur_rad = int(self._object_expected_size * np.max(img.shape) / 2.0)
        if blur_rad % 2 == 0:
            blur_rad += 1

        if self._buff_grey is None:
            self._buff_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            if mask is None:
                mask = np.ones_like(self._buff_grey) * 255

        cv2.cvtColor(img, cv2.COLOR_BGR2GRAY, self._buff_grey)
        cv2.GaussianBlur(self._buff_grey, (blur_rad, blur_rad), 1.2, self._buff_grey)
        if darker_fg:
            # invert so that dark animals become bright foreground
            cv2.subtract(255, self._buff_grey, self._buff_grey)

        # normalise so the mean grey level within the mask is 128
        mean = cv2.mean(self._buff_grey, mask)
        scale = 128. / mean[0]
        cv2.multiply(self._buff_grey, scale, dst=self._buff_grey)

        if mask is not None:
            cv2.bitwise_and(self._buff_grey, mask, self._buff_grey)
        return self._buff_grey

    def _pre_process_input(self, img, mask, t):
        """Alternative pre-processing: high-pass filter against a local blur,
        with the grey-level mode smoothed over a time window. Currently unused
        (see _find_position)."""
        blur_rad = int(self._object_expected_size * np.max(img.shape) * 2.0)
        if blur_rad % 2 == 0:
            blur_rad += 1

        if self._buff_grey is None:
            self._buff_grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            self._buff_grey_blurred = np.empty_like(self._buff_grey)
            if mask is None:
                mask = np.ones_like(self._buff_grey) * 255
            mask_conv = cv2.blur(mask, (blur_rad, blur_rad))
            self._buff_convolved_mask = (1 / 255.0 * mask_conv.astype(np.float32))

        cv2.cvtColor(img, cv2.COLOR_BGR2GRAY, self._buff_grey)

        # grey-level mode of the frame, from a lightly smoothed histogram
        hist = cv2.calcHist([self._buff_grey], [0], None, [256], [0, 255]).ravel()
        hist = np.convolve(hist, [1] * 3)
        mode = np.argmax(hist)

        self._smooth_mode.append(mode)
        self._smooth_mode_tstamp.append(t)

        # drop old samples once the window exceeds _smooth_mode_window_dt
        if len(self._smooth_mode_tstamp) > 2 and self._smooth_mode_tstamp[-1] - self._smooth_mode_tstamp[0] > self._smooth_mode_window_dt:
            self._smooth_mode.popleft()
            self._smooth_mode_tstamp.popleft()

        mode = np.mean(list(self._smooth_mode))
        scale = 128. / mode

        cv2.multiply(self._buff_grey, scale, dst=self._buff_grey)
        cv2.bitwise_and(self._buff_grey, mask, self._buff_grey)

        cv2.blur(self._buff_grey, (blur_rad, blur_rad), self._buff_grey_blurred)
        # fixme could be optimised
        self._buff_grey_blurred = (self._buff_grey_blurred / self._buff_convolved_mask).astype(np.uint8)

        cv2.absdiff(self._buff_grey, self._buff_grey_blurred, self._buff_grey)

        if mask is not None:
            cv2.bitwise_and(self._buff_grey, mask, self._buff_grey)
        return self._buff_grey

    def _find_position(self, img, mask, t):
        """Pre-process the frame and delegate to _track; on failure, still feed
        the background model so it keeps adapting."""
        grey = self._pre_process_input_minimal(img, mask, t)
        try:
            return self._track(img, grey, mask, t)
        except NoPositionError:
            self._bg_model.update(grey, t)
            raise NoPositionError

    def _track(self, img, grey, mask, t):
        """
        Detect the animal in a pre-processed frame.

        :param img: the original BGR frame (used for feature extraction)
        :param grey: the pre-processed grey frame
        :param mask: optional ROI mask
        :param t: time stamp in ms
        :return: a one-element list with the DataPoint describing the animal
        :raises NoPositionError: when no unambiguous animal can be found
        """
        if self._bg_model.bg_img is None:
            # first frame: just allocate the buffers and bail out
            self._buff_fg = np.empty_like(grey)
            self._buff_object = np.empty_like(grey)
            self._buff_fg_backup = np.empty_like(grey)
            self._old_pos = 0.0 + 0.0j
            raise NoPositionError

        bg = self._bg_model.bg_img.astype(np.uint8)

        # foreground = frame minus background, thresholded
        cv2.subtract(grey, bg, self._buff_fg)
        cv2.threshold(self._buff_fg, 20, 255, cv2.THRESH_TOZERO, dst=self._buff_fg)
        self._buff_fg_backup = np.copy(self._buff_fg)

        n_fg_pix = np.count_nonzero(self._buff_fg)
        prop_fg_pix = n_fg_pix / (1.0 * grey.shape[0] * grey.shape[1])
        is_ambiguous = False

        # too much or no foreground: adapt the background faster and give up
        if prop_fg_pix > self._max_area:
            self._bg_model.increase_learning_rate()
            raise NoPositionError
        if prop_fg_pix == 0:
            self._bg_model.increase_learning_rate()
            raise NoPositionError

        # OpenCV 3 returns (image, contours, hierarchy); 2 and 4 return two values
        if CV_VERSION == 3:
            _, contours, hierarchy = cv2.findContours(self._buff_fg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        else:
            contours, hierarchy = cv2.findContours(self._buff_fg, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        contours = [cv2.approxPolyDP(c, 1.2, True) for c in contours]

        if len(contours) == 0:
            self._bg_model.increase_learning_rate()
            raise NoPositionError

        elif len(contours) > 1:
            # several candidates: pick the one most similar to the fg model
            if not self.fg_model.is_ready:
                raise NoPositionError
            hulls = [h for h in contours if h.shape[0] >= 3]

            if len(hulls) < 1:
                raise NoPositionError
            elif len(hulls) > 1:
                is_ambiguous = True

            cluster_features = [self.fg_model.compute_features(img, h) for h in hulls]
            all_distances = [self.fg_model.distance(cf, t) for cf in cluster_features]
            good_clust = np.argmin(all_distances)

            hull = hulls[good_clust]
            distance = all_distances[good_clust]
        else:
            hull = contours[0]
            if hull.shape[0] < 3:
                self._bg_model.increase_learning_rate()
                raise NoPositionError

            features = self.fg_model.compute_features(img, hull)
            distance = self.fg_model.distance(features, t)

        if distance > self._max_m_log_lik:
            self._bg_model.increase_learning_rate()
            raise NoPositionError

        (x, y), (w, h), angle = cv2.minAreaRect(hull)

        # normalise the ellipse so w >= h and angle is in [0, 180)
        if w < h:
            angle -= 90
            w, h = h, w
        angle = angle % 180

        h_im = min(grey.shape)
        w_im = max(grey.shape)
        max_h = 2 * h_im
        if w > max_h or h > max_h:
            raise NoPositionError

        # paint a dilated ellipse over the object and restrict the raw fg to it
        cv2.ellipse(self._buff_fg, ((x, y), (int(w * 1.5), int(h * 1.5)), angle), 255, -1)

        # todo center mass just on the ellipse area
        cv2.bitwise_and(self._buff_fg_backup, self._buff_fg, self._buff_fg_backup)

        # scipy >= 1.15 removed ndimage.measurements; ndimage.center_of_mass is
        # the long-standing equivalent entry point
        y, x = ndimage.center_of_mass(self._buff_fg_backup)
        pos = x + 1.0j * y
        pos /= w_im

        # log-distance moved since the last frame, in roi-width units
        xy_dist = round(log10(1. / float(w_im) + abs(pos - self._old_pos)) * 1000)
        self._old_pos = pos

        if mask is not None:
            cv2.bitwise_and(self._buff_fg, mask, self._buff_fg)

        if is_ambiguous:
            # ambiguous detection: adapt faster and do NOT mask the fg out
            self._bg_model.increase_learning_rate()
            self._bg_model.update(grey, t)
        else:
            self._bg_model.decrease_learning_rate()
            self._bg_model.update(grey, t, self._buff_fg)

        self.fg_model.update(img, hull, t)

        x_var = XPosVariable(int(round(x)))
        y_var = YPosVariable(int(round(y)))
        # distinct name: `distance` above holds the fg-model log-likelihood
        xy_dist_var = XYDistance(int(xy_dist))
        w_var = WidthVariable(int(round(w)))
        h_var = HeightVariable(int(round(h)))
        phi_var = PhiVariable(int(round(angle)))

        out = DataPoint([x_var, y_var, w_var, h_var,
                         phi_var,
                         xy_dist_var
                         ])

        self._previous_shape = np.copy(hull)
        return [out]