import numpy as np from statsmodels.base import model import statsmodels.base.model as base from import cache_readonly from scipy.optimize import brent """ Implementation of proportional hazards regression models for duration data that may be censored ("Cox models"). References ---------- T Therneau (1996). Extending the Cox model. Technical report. G Rodriguez (2005). Non-parametric estimation in survival models. B Gillespie (2006). Checking the assumptions in the Cox proportional hazards model. """ class PHSurvivalTime(object): def __init__(self, time, status, exog, strata=None, entry=None, offset=None): """ Represent a collection of survival times with possible stratification and left truncation. Parameters ---------- time : array_like The times at which either the event (failure) occurs or the observation is censored. status : array_like Indicates whether the event (failure) occurs at `time` (`status` is 1), or if `time` is a censoring time (`status` is 0). exog : array_like The exogeneous (covariate) data matrix, cases are rows and variables are columns. strata : array_like Grouping variable defining the strata. If None, all observations are in a single stratum. entry : array_like Entry (left truncation) times. The observation is not part of the risk set for times before the entry time. If None, the entry time is treated as being zero, which gives no left truncation. The entry time must be less than or equal to `time`. offset : array-like An optional array of offsets """ # Default strata if strata is None: strata = np.zeros(len(time), dtype=np.int32) # Default entry times if entry is None: entry = np.zeros(len(time)) # Parameter validity checks. n1, n2, n3, n4 = len(time), len(status), len(strata),\ len(entry) nv = [n1, n2, n3, n4] if max(nv) != min(nv): raise ValueError("endog, status, strata, and " + "entry must all have the same length") if min(time) < 0: raise ValueError("endog must be non-negative") if min(entry) < 0: raise ValueError("entry time must be non-negative") # In Stata, this is entry >= time, in R it is >. if np.any(entry > time): raise ValueError("entry times may not occur " + "after event or censoring times") # Get the row indices for the cases in each stratum if strata is not None: stu = np.unique(strata) #sth = {x: [] for x in stu} # needs >=2.7 sth = dict([(x, []) for x in stu]) for i,k in enumerate(strata): sth[k].append(i) stratum_rows = [np.asarray(sth[k], dtype=np.int32) for k in stu] stratum_names = stu else: stratum_rows = [np.arange(len(time)),] stratum_names = [0,] # Remove strata with no events ix = [i for i,ix in enumerate(stratum_rows) if status[ix].sum() > 0] stratum_rows = [stratum_rows[i] for i in ix] stratum_names = [stratum_names[i] for i in ix] # The number of strata nstrat = len(stratum_rows) self.nstrat = nstrat # Remove subjects whose entry time occurs after the last event # in their stratum. for stx,ix in enumerate(stratum_rows): last_failure = max(time[ix][status[ix] == 1]) # Stata uses < here, R uses <= ii = [i for i,t in enumerate(entry[ix]) if t <= last_failure] stratum_rows[stx] = stratum_rows[stx][ii] # Remove subjects who are censored before the first event in # their stratum. for stx,ix in enumerate(stratum_rows): first_failure = min(time[ix][status[ix] == 1]) ii = [i for i,t in enumerate(time[ix]) if t >= first_failure] stratum_rows[stx] = stratum_rows[stx][ii] # Order by time within each stratum for stx,ix in enumerate(stratum_rows): ii = np.argsort(time[ix]) stratum_rows[stx] = stratum_rows[stx][ii] if offset is not None: self.offset_s = [] for stx in range(nstrat): self.offset_s.append(offset[stratum_rows[stx]]) else: self.offset_s = None # Number of informative subjects self.n_obs = sum([len(ix) for ix in stratum_rows]) # Split everything by stratum self.time_s = [] self.exog_s = [] self.status_s = [] self.entry_s = [] for ix in stratum_rows: self.time_s.append(time[ix]) self.exog_s.append(exog[ix,:]) self.status_s.append(status[ix]) self.entry_s.append(entry[ix]) self.stratum_rows = stratum_rows self.stratum_names = stratum_names # Precalculate some indices needed to fit Cox models. # Distinct failure times within a stratum are always taken to # be sorted in ascending order. # # ufailt_ix[stx][k] is a list of indices for subjects who fail # at the k^th sorted unique failure time in stratum stx # # risk_enter[stx][k] is a list of indices for subjects who # enter the risk set at the k^th sorted unique failure time in # stratum stx # # risk_exit[stx][k] is a list of indices for subjects who exit # the risk set at the k^th sorted unique failure time in # stratum stx self.ufailt_ix, self.risk_enter, self.risk_exit, self.ufailt =\ [], [], [], [] for stx in range(self.nstrat): # All failure times ift = np.flatnonzero(self.status_s[stx] == 1) ft = self.time_s[stx][ift] # Unique failure times uft = np.unique(ft) nuft = len(uft) # Indices of cases that fail at each unique failure time #uft_map = {x:i for i,x in enumerate(uft)} # requires >=2.7 uft_map = dict([(x, i) for i,x in enumerate(uft)]) # 2.6 uft_ix = [[] for k in range(nuft)] for ix,ti in zip(ift,ft): uft_ix[uft_map[ti]].append(ix) # Indices of cases (failed or censored) that enter the # risk set at each unique failure time. risk_enter1 = [[] for k in range(nuft)] for i,t in enumerate(self.time_s[stx]): ix = np.searchsorted(uft, t, "right") - 1 if ix >= 0: risk_enter1[ix].append(i) # Indices of cases (failed or censored) that exit the # risk set at each unique failure time. risk_exit1 = [[] for k in range(nuft)] for i,t in enumerate(self.entry_s[stx]): ix = np.searchsorted(uft, t) risk_exit1[ix].append(i) self.ufailt.append(uft) self.ufailt_ix.append([np.asarray(x, dtype=np.int32) for x in uft_ix]) self.risk_enter.append([np.asarray(x, dtype=np.int32) for x in risk_enter1]) self.risk_exit.append([np.asarray(x, dtype=np.int32) for x in risk_exit1]) class PHReg(model.LikelihoodModel): """ Fit the Cox proportional hazards regression model for right censored data. Parameters ---------- endog : array-like The observed times (event or censoring) exog : 2D array-like The covariates or exogeneous variables status : array-like The censoring status values; status=1 indicates that an event occured (e.g. failure or death), status=0 indicates that the observation was right censored. If None, defaults to status=1 for all cases. entry : array-like The entry times, if left truncation occurs strata : array-like Stratum labels. If None, all observations are taken to be in a single stratum. ties : string The method used to handle tied times, must be either 'breslow' or 'efron'. offset : array-like Array of offset values missing : string The method used to handle missing data Notes ----- Proportional hazards regression models should not include an explicit or implicit intercept. The effect of an intercept is not identified using the partial likelihood approach. `endog`, `event`, `strata`, `entry`, and the first dimension of `exog` all must have the same length """ def __init__(self, endog, exog, status=None, entry=None, strata=None, offset=None, ties='breslow', missing='drop', **kwargs): # Default is no censoring if status is None: status = np.ones(len(endog)) super(PHReg, self).__init__(endog, exog, status=status, entry=entry, strata=strata, offset=offset, missing=missing, **kwargs) # endog and exog are automatically converted, but these are # not if self.status is not None: self.status = np.asarray(self.status) if self.entry is not None: self.entry = np.asarray(self.entry) if self.strata is not None: self.strata = np.asarray(self.strata) if self.offset is not None: self.offset = np.asarray(self.offset) self.surv = PHSurvivalTime(self.endog, self.status, self.exog, self.strata, self.entry, self.offset) # TODO: not used? self.missing = missing ties = ties.lower() if ties not in ("efron", "breslow"): raise ValueError("`ties` must be either `efron` or " + "`breslow`") self.ties = ties @classmethod def from_formula(cls, formula, data, status=None, entry=None, strata=None, offset=None, subset=None, ties='breslow', missing='drop', *args, **kwargs): """ Create a proportional hazards regression model from a formula and dataframe. Parameters ---------- formula : str or generic Formula object The formula specifying the model data : array-like The data for the model. See Notes. status : array-like The censoring status values; status=1 indicates that an event occured (e.g. failure or death), status=0 indicates that the observation was right censored. If None, defaults to status=1 for all cases. entry : array-like The entry times, if left truncation occurs strata : array-like Stratum labels. If None, all observations are taken to be in a single stratum. offset : array-like Array of offset values subset : array-like An array-like object of booleans, integers, or index values that indicate the subset of df to use in the model. Assumes df is a `pandas.DataFrame` ties : string The method used to handle tied times, must be either 'breslow' or 'efron'. missing : string The method used to handle missing data args : extra arguments These are passed to the model kwargs : extra keyword arguments These are passed to the model with one exception. The ``eval_env`` keyword is passed to patsy. It can be either a :class:`patsy:patsy.EvalEnvironment` object or an integer indicating the depth of the namespace to use. For example, the default ``eval_env=0`` uses the calling namespace. If you wish to use a "clean" environment set ``eval_env=-1``. Returns ------- model : PHReg model instance """ # Allow array arguments to be passed by column name. if type(status) is str: status = data[status] if type(entry) is str: entry = data[entry] if type(strata) is str: strata = data[strata] if type(offset) is str: offset = data[offset] mod = super(PHReg, cls).from_formula(formula, data, status=status, entry=entry, strata=strata, offset=offset, subset=subset, ties=ties, missing=missing, *args, **kwargs) return mod def fit(self, groups=None, **args): """ Fit a proportional hazards regression model. Parameters ---------- groups : array-like Labels indicating groups of observations that may be dependent. If present, the standard errors account for this dependence. Does not affect fitted values. Returns a PHregResults instance. """ # TODO process for missing values if groups is not None: self.groups = np.asarray(groups) else: self.groups = None if 'disp' not in args: args['disp'] = False fit_rslts = super(PHReg, self).fit(**args) if self.groups is None: cov_params = fit_rslts.cov_params() else: cov_params = self.robust_covariance(fit_rslts.params) results = PHRegResults(self, fit_rslts.params, cov_params) return results def fit_regularized(self, method="coord_descent", maxiter=100, alpha=0., L1_wt=1., start_params=None, cnvrg_tol=1e-7, zero_tol=1e-8, **kwargs): """ Return a regularized fit to a linear regression model. Parameters ---------- method : Only the coordinate descent algorithm is implemented. maxiter : integer The maximum number of iteration cycles (an iteration cycle involves running coordinate descent on all variables). alpha : scalar or array-like The penalty weight. If a scalar, the same penalty weight applies to all variables in the model. If a vector, it must have the same length as `params`, and contains a penalty weight for each coefficient. L1_wt : scalar The fraction of the penalty given to the L1 penalty term. Must be between 0 and 1 (inclusive). If 0, the fit is a ridge fit, if 1 it is a lasso fit. start_params : array-like Starting values for `params`. cnvrg_tol : scalar If `params` changes by less than this amount (in sup-norm) in once iteration cycle, the algorithm terminates with convergence. zero_tol : scalar Any estimated coefficient smaller than this value is replaced with zero. Returns ------- A PHregResults object, of the same type returned by `fit`. Notes ----- The penalty is the"elastic net" penalty, which is a convex combination of L1 and L2 penalties. The function that is minimized is: ..math:: -loglike/n + alpha*((1-L1_wt)*|params|_2^2/2 + L1_wt*|params|_1) where :math:`|*|_1` and :math:`|*|_2` are the L1 and L2 norms. Post-estimation results are based on the same data used to select variables, hence may be subject to overfitting biases. """ k_exog = self.exog.shape[1] n_exog = self.exog.shape[0] if np.isscalar(alpha): alpha = alpha * np.ones(k_exog, dtype=np.float64) # regularization cannot be used with groups self.groups = None # Define starting params if start_params is None: params = np.zeros(k_exog, dtype=np.float64) else: params = start_params.copy() # Maybe could be a shallow copy, but just in case... import copy surv = copy.deepcopy(self.surv) # This is the base offset, onto which the effects of # constrained variables are added. if self.offset is None: offset_s_base = [np.zeros(len(x)) for x in surv.stratum_rows] surv.offset_s = [x.copy() for x in offset_s_base] else: offset_s_base = [x.copy() for x in surv.offset_s] # Create a model instance for optimizing a single variable model_1var = copy.deepcopy(self) model_1var.surv = surv model_1var.ties = self.ties # All the negative penalized loglikeihood functions. def gen_npfuncs(k): def nploglike(params): pen = alpha[k]*((1 - L1_wt)*params**2/2 + L1_wt*np.abs(params)) return -model_1var.loglike(np.r_[params]) / n_exog + pen def npscore(params): pen_grad = alpha[k]*(1 - L1_wt)*params return -model_1var.score(np.r_[params])[0] / n_exog + pen_grad def nphess(params): pen_hess = alpha[k]*(1 - L1_wt) return -model_1var.hessian(np.r_[params])[0,0] / n_exog + pen_hess return nploglike, npscore, nphess nploglike_funcs = [gen_npfuncs(k) for k in range(len(params))] # 1-dimensional exog's exog_s = [] for k in range(k_exog): ex = [x[:, k][:, None] for x in surv.exog_s] exog_s.append(ex) converged = False btol = 1e-8 params_zero = np.zeros(len(params), dtype=bool) for itr in range(maxiter): # Sweep through the parameters params_save = params.copy() for k in range(k_exog): # Under the active set method, if a parameter becomes # zero we don't try to change it again. if params_zero[k]: continue # Set exog to include only the variable whose effect # is being estimated. surv.exog_s = exog_s[k] # Set the offset to account for the variables that are # being held fixed. params0 = params.copy() params0[k] = 0 for stx in range(self.surv.nstrat): v =[stx], params0) surv.offset_s[stx] = offset_s_base[stx] + v params[k] = _opt_1d(nploglike_funcs[k], params[k], alpha[k]*L1_wt, tol=btol) # Update the active set if itr > 0 and np.abs(params[k]) < zero_tol: params_zero[k] = True params[k] = 0. # Check for convergence pchange = np.max(np.abs(params - params_save)) if pchange < cnvrg_tol: converged = True break # Set approximate zero coefficients to be exactly zero params *= np.abs(params) >= zero_tol # Fit the reduced model to get standard errors and other # post-estimation results. ii = np.flatnonzero(params) cov = np.zeros((k_exog, k_exog), dtype=np.float64) if len(ii) > 0: model = self.__class__(self.endog, self.exog[:, ii], status=self.status, entry=self.entry, strata=self.strata, offset=self.offset, ties=self.ties, missing=self.missing) rslt = cov[np.ix_(ii, ii)] = rslt.normalized_cov_params rfit = PHRegResults(self, params, cov_params=cov) rfit.converged = converged rfit.regularized = True return rfit def loglike(self, params): """ Returns the log partial likelihood function evaluated at `params`. """ if self.ties == "breslow": return self.breslow_loglike(params) elif self.ties == "efron": return self.efron_loglike(params) def score(self, params): """ Returns the score function evaluated at `params`. """ if self.ties == "breslow": return self.breslow_gradient(params) elif self.ties == "efron": return self.efron_gradient(params) def hessian(self, params): """ Returns the Hessian matrix of the log partial likelihood function evaluated at `params`. """ if self.ties == "breslow": return self.breslow_hessian(params) else: return self.efron_hessian(params) def breslow_loglike(self, params): """ Returns the value of the log partial likelihood function evaluated at `params`, using the Breslow method to handle tied times. """ surv = self.surv like = 0. # Loop over strata for stx in range(surv.nstrat): uft_ix = surv.ufailt_ix[stx] exog_s = surv.exog_s[stx] nuft = len(uft_ix) linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0 = 0. # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] xp0 += e_linpred[ix].sum() # Account for all cases that fail at this point. ix = uft_ix[i] like += (linpred[ix] - np.log(xp0)).sum() # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] xp0 -= e_linpred[ix].sum() return like def efron_loglike(self, params): """ Returns the value of the log partial likelihood function evaluated at `params`, using the Efron method to handle tied times. """ surv = self.surv like = 0. # Loop over strata for stx in range(surv.nstrat): # exog and linear predictor for this stratum exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0 = 0. # Iterate backward through the unique failure times. uft_ix = surv.ufailt_ix[stx] nuft = len(uft_ix) for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] xp0 += e_linpred[ix].sum() xp0f = e_linpred[uft_ix[i]].sum() # Account for all cases that fail at this point. ix = uft_ix[i] like += linpred[ix].sum() m = len(ix) J = np.arange(m, dtype=np.float64) / m like -= np.log(xp0 - J*xp0f).sum() # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] xp0 -= e_linpred[ix].sum() return like def breslow_gradient(self, params): """ Returns the gradient of the log partial likelihood, using the Breslow method to handle tied times. """ surv = self.surv grad = 0. # Loop over strata for stx in range(surv.nstrat): # Indices of subjects in the stratum strat_ix = surv.stratum_rows[stx] # Unique failure times in the stratum uft_ix = surv.ufailt_ix[stx] nuft = len(uft_ix) # exog and linear predictor for the stratum exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0, xp1 = 0., 0. # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] if len(ix) > 0: v = exog_s[ix,:] xp0 += e_linpred[ix].sum() xp1 += (e_linpred[ix][:,None] * v).sum(0) # Account for all cases that fail at this point. ix = uft_ix[i] grad += (exog_s[ix,:] - xp1 / xp0).sum(0) # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] if len(ix) > 0: v = exog_s[ix,:] xp0 -= e_linpred[ix].sum() xp1 -= (e_linpred[ix][:,None] * v).sum(0) return grad def efron_gradient(self, params): """ Returns the gradient of the log partial likelihood evaluated at `params`, using the Efron method to handle tied times. """ surv = self.surv grad = 0. # Loop over strata for stx in range(surv.nstrat): # Indices of cases in the stratum strat_ix = surv.stratum_rows[stx] # exog and linear predictor of the stratum exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0, xp1 = 0., 0. # Iterate backward through the unique failure times. uft_ix = surv.ufailt_ix[stx] nuft = len(uft_ix) for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] if len(ix) > 0: v = exog_s[ix,:] xp0 += e_linpred[ix].sum() xp1 += (e_linpred[ix][:,None] * v).sum(0) ixf = uft_ix[i] if len(ixf) > 0: v = exog_s[ixf,:] xp0f = e_linpred[ixf].sum() xp1f = (e_linpred[ixf][:,None] * v).sum(0) # Consider all cases that fail at this point. grad += v.sum(0) m = len(ixf) J = np.arange(m, dtype=np.float64) / m numer = xp1 - np.outer(J, xp1f) denom = xp0 - np.outer(J, xp0f) ratio = numer / denom rsum = ratio.sum(0) grad -= rsum # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] if len(ix) > 0: v = exog_s[ix,:] xp0 -= e_linpred[ix].sum() xp1 -= (e_linpred[ix][:,None] * v).sum(0) return grad def breslow_hessian(self, params): """ Returns the Hessian of the log partial likelihood evaluated at `params`, using the Breslow method to handle tied times. """ surv = self.surv hess = 0. # Loop over strata for stx in range(surv.nstrat): uft_ix = surv.ufailt_ix[stx] nuft = len(uft_ix) exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0, xp1, xp2 = 0., 0., 0. # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] if len(ix) > 0: xp0 += e_linpred[ix].sum() v = exog_s[ix,:] xp1 += (e_linpred[ix][:,None] * v).sum(0) mat = v[None,:,:] elx = e_linpred[ix] xp2 += (mat.T * mat * elx[None,:,None]).sum(1) # Account for all cases that fail at this point. m = len(uft_ix[i]) hess += m*(xp2 / xp0 - np.outer(xp1, xp1) / xp0**2) # Update for new cases entering the risk set. ix = surv.risk_exit[stx][i] if len(ix) > 0: xp0 -= e_linpred[ix].sum() v = exog_s[ix,:] xp1 -= (e_linpred[ix][:,None] * v).sum(0) mat = v[None,:,:] elx = e_linpred[ix] xp2 -= (mat.T * mat * elx[None,:,None]).sum(1) return -hess def efron_hessian(self, params): """ Returns the Hessian matrix of the partial log-likelihood evaluated at `params`, using the Efron method to handle tied times. """ surv = self.surv hess = 0. # Loop over strata for stx in range(surv.nstrat): exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) xp0, xp1, xp2 = 0., 0., 0. # Iterate backward through the unique failure times. uft_ix = surv.ufailt_ix[stx] nuft = len(uft_ix) for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] if len(ix) > 0: xp0 += e_linpred[ix].sum() v = exog_s[ix,:] xp1 += (e_linpred[ix][:,None] * v).sum(0) mat = v[None,:,:] elx = e_linpred[ix] xp2 += (mat.T * mat * elx[None,:,None]).sum(1) ixf = uft_ix[i] if len(ixf) > 0: v = exog_s[ixf,:] xp0f = e_linpred[ixf].sum() xp1f = (e_linpred[ixf][:,None] * v).sum(0) mat = v[None,:,:] elx = e_linpred[ixf] xp2f = (mat.T * mat * elx[None,:,None]).sum(1) # Account for all cases that fail at this point. m = len(uft_ix[i]) J = np.arange(m, dtype=np.float64) / m c0 = xp0 - J*xp0f mat = (xp2[None,:,:] - J[:,None,None]*xp2f) / c0[:,None,None] hess += mat.sum(0) mat = (xp1[None, :] - np.outer(J, xp1f)) / c0[:, None] mat = mat[:, :, None] * mat[:, None, :] hess -= mat.sum(0) # Update for new cases entering the risk set. ix = surv.risk_exit[stx][i] if len(ix) > 0: xp0 -= e_linpred[ix].sum() v = exog_s[ix,:] xp1 -= (e_linpred[ix][:,None] * v).sum(0) mat = v[None,:,:] elx = e_linpred[ix] xp2 -= (mat.T * mat * elx[None,:,None]).sum(1) return -hess def robust_covariance(self, params): """ Returns a covariance matrix for the proportional hazards model regresion coefficient estimates that is robust to certain forms of model misspecification. Parameters ---------- params : ndarray The parameter vector at which the covariance matrix is calculated. Returns ------- The robust covariance matrix as a square ndarray. Notes ----- This function uses the `groups` argument to determine groups within which observations may be dependent. The covariance matrix is calculated using the Huber-White "sandwich" approach. """ if self.groups is None: raise ValueError("`groups` must be specified to calculate the robust covariance matrix") hess = self.hessian(params) score_obs = self.score_residuals(params) # Collapse grads = {} for i,g in enumerate(self.groups): if g not in grads: grads[g] = 0. grads[g] += score_obs[i, :] grads = np.asarray(list(grads.values())) mat = grads[None, :, :] mat = mat.T * mat mat = mat.sum(1) hess_inv = np.linalg.inv(hess) cmat =,, hess_inv)) return cmat def score_residuals(self, params): """ Returns the score residuals calculated at a given vector of parameters. Parameters ---------- params : ndarray The parameter vector at which the score residuals are calculated. Returns ------- The score residuals, returned as a ndarray having the same shape as `exog`. Notes ----- Observations in a stratum with no observed events have undefined score residuals, and contain NaN in the returned matrix. """ surv = self.surv score_resid = np.zeros(self.exog.shape, dtype=np.float64) # Use to set undefined values to NaN. mask = np.zeros(self.exog.shape[0], dtype=np.int32) w_avg = self.weighted_covariate_averages(params) # Loop over strata for stx in range(surv.nstrat): uft_ix = surv.ufailt_ix[stx] exog_s = surv.exog_s[stx] nuft = len(uft_ix) strat_ix = surv.stratum_rows[stx] xp0 = 0. linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) at_risk_ix = set([]) # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] at_risk_ix |= set(ix) xp0 += e_linpred[ix].sum() atr_ix = list(at_risk_ix) leverage = exog_s[atr_ix, :] - w_avg[stx][i, :] # Event indicators d = np.zeros(exog_s.shape[0]) d[uft_ix[i]] = 1 # The increment in the cumulative hazard dchaz = len(uft_ix[i]) / xp0 # Piece of the martingale residual mrp = d[atr_ix] - e_linpred[atr_ix] * dchaz # Update the score residuals ii = strat_ix[atr_ix] score_resid[ii,:] += leverage * mrp[:, None] mask[ii] = 1 # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] at_risk_ix -= set(ix) xp0 -= e_linpred[ix].sum() jj = np.flatnonzero(mask == 0) if len(jj) > 0: score_resid[jj, :] = np.nan return score_resid def weighted_covariate_averages(self, params): """ Returns the hazard-weighted average of covariate values for subjects who are at-risk at a particular time. Parameters ---------- params : ndarray Parameter vector Returns ------- averages : list of ndarrays averages[stx][i,:] is a row vector containing the weighted average values (for all the covariates) of at-risk subjects a the i^th largest observed failure time in stratum `stx`, using the hazard multipliers as weights. Notes ----- Used to calculate leverages and score residuals. """ surv = self.surv averages = [] xp0, xp1 = 0., 0. # Loop over strata for stx in range(surv.nstrat): uft_ix = surv.ufailt_ix[stx] exog_s = surv.exog_s[stx] nuft = len(uft_ix) average_s = np.zeros((len(uft_ix), exog_s.shape[1]), dtype=np.float64) linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] linpred -= linpred.max() e_linpred = np.exp(linpred) # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] xp0 += e_linpred[ix].sum() xp1 +=[ix], exog_s[ix, :]) average_s[i, :] = xp1 / xp0 # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] xp0 -= e_linpred[ix].sum() xp1 -=[ix], exog_s[ix, :]) averages.append(average_s) return averages def baseline_cumulative_hazard(self, params): """ Estimate the baseline cumulative hazard and survival functions. Parameters ---------- params : ndarray The model parameters. Returns ------- A list of triples (time, hazard, survival) containing the time values and corresponding cumulative hazard and survival function values for each stratum. Notes ----- Uses the Nelson-Aalen estimator. """ # TODO: some disagreements with R, not the same algorithm but # hard to deduce what R is doing. Our results are reasonable. surv = self.surv rslt = [] # Loop over strata for stx in range(surv.nstrat): uft = surv.ufailt[stx] uft_ix = surv.ufailt_ix[stx] exog_s = surv.exog_s[stx] nuft = len(uft_ix) linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] e_linpred = np.exp(linpred) xp0 = 0. h0 = np.zeros(nuft, dtype=np.float64) # Iterate backward through the unique failure times. for i in range(nuft)[::-1]: # Update for new cases entering the risk set. ix = surv.risk_enter[stx][i] xp0 += e_linpred[ix].sum() # Account for all cases that fail at this point. ix = uft_ix[i] h0[i] = len(ix) / xp0 # Update for cases leaving the risk set. ix = surv.risk_exit[stx][i] xp0 -= e_linpred[ix].sum() cumhaz = np.cumsum(h0) - h0 surv = np.exp(-cumhaz) rslt.append([uft, cumhaz, surv]) return rslt def baseline_cumulative_hazard_function(self, params): """ Returns a function that calculates the baseline cumulative hazard function for each stratum. Parameters ---------- params : ndarray The model parameters. Returns ------- A dict mapping stratum names to the estimated baseline cumulative hazard function. """ from scipy.interpolate import interp1d surv = self.surv base = self.baseline_cumulative_hazard(params) cumhaz_f = {} for stx in range(surv.nstrat): time_h = base[stx][0] cumhaz = base[stx][1] time_h = np.r_[-np.inf, time_h, np.inf] cumhaz = np.r_[cumhaz[0], cumhaz, cumhaz[-1]] func = interp1d(time_h, cumhaz, kind='zero') cumhaz_f[self.surv.stratum_names[stx]] = func return cumhaz_f def predict(self, params, cov_params=None, endog=None, exog=None, strata=None, offset=None, pred_type="lhr"): """ Returns predicted values from the proportional hazards regression model. Parameters ---------- params : array-like The proportional hazards model parameters. cov_params : array-like The covariance matrix of the estimated `params` vector, used to obtain prediction errors if pred_type='lhr', otherwise optional. endog : array-like Duration (time) values at which the predictions are made. Only used if pred_type is either 'cumhaz' or 'surv'. If using model `exog`, defaults to model `endog` (time), but may be provided explicitly to make predictions at alternative times. exog : array-like Data to use as `exog` in forming predictions. If not provided, the `exog` values from the model used to fit the data are used. strata : array-like A vector of stratum values used to form the predictions. Not used (may be 'None') if pred_type is 'lhr' or 'hr'. If `exog` is None, the model stratum values are used. If `exog` is not None and pred_type is 'surv' or 'cumhaz', stratum values must be provided (unless there is only one stratum). offset : array-like Offset values used to create the predicted values. pred_type : string If 'lhr', returns log hazard ratios, if 'hr' returns hazard ratios, if 'surv' returns the survival function, if 'cumhaz' returns the cumulative hazard function. Returns ------- A bunch containing two fields: `predicted_values` and `standard_errors`. Notes ----- Standard errors are only returned when predicting the log hazard ratio (pred_type is 'lhr'). Types `surv` and `cumhaz` require estimation of the cumulative hazard function. """ pred_type = pred_type.lower() if pred_type not in ["lhr", "hr", "surv", "cumhaz"]: msg = "Type %s not allowed for prediction" % pred_type raise ValueError(msg) class bunch: predicted_values = None standard_errors = None ret_val = bunch() # Don't do anything with offset here because we want to allow # different offsets to be specified even if exog is the model # exog. exog_provided = True if exog is None: exog = self.exog exog_provided = False lhr =, params) if offset is not None: lhr += offset # Never use self.offset unless we are also using self.exog elif self.offset is not None and not exog_provided: lhr += self.offset # Handle lhr and hr prediction first, since they don't make # use of the hazard function. if pred_type == "lhr": ret_val.predicted_values = lhr mat =, cov_params) va = (mat * exog).sum(1) ret_val.standard_errors = np.sqrt(va) return ret_val hr = np.exp(lhr) if pred_type == "hr": ret_val.predicted_values = hr return ret_val # Makes sure endog is defined if endog is None and exog_provided: msg = "If `exog` is provided `endog` must be provided." raise ValueError(msg) # Use model endog if using model exog elif endog is None and not exog_provided: endog = self.endog # Make sure strata is defined if strata is None: if exog_provided and self.surv.nstrat > 1: raise ValueError("`strata` must be provided") if self.strata is None: strata = [self.surv.stratum_names[0],] * len(endog) else: strata = self.strata cumhaz = np.nan * np.ones(len(endog), dtype=np.float64) stv = np.unique(strata) bhaz = self.baseline_cumulative_hazard_function(params) for stx in stv: ix = np.flatnonzero(strata == stx) func = bhaz[stx] cumhaz[ix] = func(endog[ix]) * hr[ix] if pred_type == "cumhaz": ret_val.predicted_values = cumhaz elif pred_type == "surv": ret_val.predicted_values = np.exp(-cumhaz) return ret_val def get_distribution(self, params): """ Returns a scipy distribution object corresponding to the distribution of uncensored endog (duration) values for each case. Parameters ---------- params : arrayh-like The model proportional hazards model parameters. Returns ------- A list of objects of type scipy.stats.distributions.rv_discrete Notes ----- The distributions are obtained from a simple discrete estimate of the survivor function that puts all mass on the observed failure times wihtin a stratum. """ # TODO: this returns a Python list of rv_discrete objects, so # nothing can be vectorized. It appears that rv_discrete does # not allow vectorization. from scipy.stats.distributions import rv_discrete surv = self.surv bhaz = self.baseline_cumulative_hazard(params) # The arguments to rv_discrete_float, first obtained by # stratum pk, xk = [], [] for stx in range(self.surv.nstrat): exog_s = surv.exog_s[stx] linpred =, params) if surv.offset_s is not None: linpred += surv.offset_s[stx] e_linpred = np.exp(linpred) # The unique failure times for this stratum (the support # of the distribution). pts = bhaz[stx][0] # The individual cumulative hazards for everyone in this # stratum. ichaz = np.outer(e_linpred, bhaz[stx][1]) # The individual survival functions. usurv = np.exp(-ichaz) usurv = np.concatenate((usurv, np.zeros((usurv.shape[0], 1))), axis=1) # The individual survival probability masses. probs = -np.diff(usurv, 1) pk.append(probs) xk.append(np.outer(np.ones(probs.shape[0]), pts)) # Pad to make all strata have the same shape mxc = max([x.shape[1] for x in xk]) for k in range(self.surv.nstrat): if xk[k].shape[1] < mxc: xk1 = np.zeros((xk.shape[0], mxc)) pk1 = np.zeros((pk.shape[0], mxc)) xk1[:, -mxc:] = xk pk1[:, -mxc:] = pk xk[k], pk[k] = xk1, pk1 xka = np.nan * np.zeros((len(self.endog), mxc), dtype=np.float64) pka = np.ones((len(self.endog), mxc), dtype=np.float64) / mxc for stx in range(self.surv.nstrat): ix = self.surv.stratum_rows[stx] xka[ix, :] = xk[stx] pka[ix, :] = pk[stx] dist = rv_discrete_float(xka, pka) return dist class PHRegResults(base.LikelihoodModelResults): ''' Class to contain results of fitting a Cox proportional hazards survival model. PHregResults inherits from statsmodels.LikelihoodModelResults Parameters ---------- See statsmodels.LikelihoodModelResults Returns ------- **Attributes** model : class instance PHreg model instance that called fit. normalized_cov_params : array The sampling covariance matrix of the estimates params : array The coefficients of the fitted model. Each coefficient is the log hazard ratio corresponding to a 1 unit difference in a single covariate while holding the other covariates fixed. bse : array The standard errors of the fitted parameters. See Also -------- statsmodels.LikelihoodModelResults ''' def __init__(self, model, params, cov_params, covariance_type="naive"): self.covariance_type = covariance_type super(PHRegResults, self).__init__(model, params, normalized_cov_params=cov_params) @cache_readonly def standard_errors(self): """ Returns the standard errors of the parameter estimates. """ return np.sqrt(np.diag(self.cov_params())) @cache_readonly def bse(self): """ Returns the standard errors of the parameter estimates. """ return self.standard_errors def get_distribution(self): """ Returns a scipy distribution object corresponding to the distribution of uncensored endog (duration) values for each case. Returns ------- A list of objects of type scipy.stats.distributions.rv_discrete Notes ----- The distributions are obtained from a simple discrete estimate of the survivor function that puts all mass on the observed failure times wihtin a stratum. """ return self.model.get_distribution(self.params) def predict(self, endog=None, exog=None, strata=None, offset=None, pred_type="lhr"): """ Returns predicted values from the fitted proportional hazards regression model. Parameters ---------- params : array-;like The proportional hazards model parameters. endog : array-like Duration (time) values at which the predictions are made. Only used if pred_type is either 'cumhaz' or 'surv'. If using model `exog`, defaults to model `endog` (time), but may be provided explicitly to make predictions at alternative times. exog : array-like Data to use as `exog` in forming predictions. If not provided, the `exog` values from the model used to fit the data are used. strata : array-like A vector of stratum values used to form the predictions. Not used (may be 'None') if pred_type is 'lhr' or 'hr'. If `exog` is None, the model stratum values are used. If `exog` is not None and pred_type is 'surv' or 'cumhaz', stratum values must be provided (unless there is only one stratum). offset : array-like Offset values used to create the predicted values. pred_type : string If 'lhr', returns log hazard ratios, if 'hr' returns hazard ratios, if 'surv' returns the survival function, if 'cumhaz' returns the cumulative hazard function. Returns ------- A bunch containing two fields: `predicted_values` and `standard_errors`. Notes ----- Standard errors are only returned when predicting the log hazard ratio (pred_type is 'lhr'). Types `surv` and `cumhaz` require estimation of the cumulative hazard function. """ return self.model.predict(self.params, self.cov_params(), endog, exog, strata, offset, pred_type) def _group_stats(self, groups): """ Descriptive statistics of the groups. """ gsize = {} for x in groups: if x not in gsize: gsize[x] = 0 gsize[x] += 1 gsize = np.asarray(gsize.values()) return gsize.min(), gsize.max(), gsize.mean() @cache_readonly def weighted_covariate_averages(self): """ The average covariate values within the at-risk set at each event time point, weighted by hazard. """ return self.model.weighted_covariate_averages(self.params) @cache_readonly def score_residuals(self): """ A matrix containing the score residuals. """ return self.model.score_residuals(self.params) @cache_readonly def baseline_cumulative_hazard(self): """ A list (corresponding to the strata) containing the baseline cumulative hazard function evaluated at the event points. """ return self.model.baseline_cumulative_hazard(self.params) @cache_readonly def baseline_cumulative_hazard_function(self): """ A list (corresponding to the strata) containing function objects that calculate the cumulative hazard function. """ return self.model.baseline_cumulative_hazard_function(self.params) @cache_readonly def schoenfeld_residuals(self): """ A matrix containing the Schoenfeld residuals. Notes ----- Schoenfeld residuals for censored observations are set to zero. """ surv = self.model.surv w_avg = self.weighted_covariate_averages # Initialize at NaN since rows that belong to strata with no # events have undefined residuals. sch_resid = np.nan*np.ones(self.model.exog.shape, dtype=np.float64) # Loop over strata for stx in range(surv.nstrat): uft = surv.ufailt[stx] exog_s = surv.exog_s[stx] time_s = surv.time_s[stx] strat_ix = surv.stratum_rows[stx] ii = np.searchsorted(uft, time_s) # These subjects are censored after the last event in # their stratum, so have empty risk sets and undefined # residuals. jj = np.flatnonzero(ii < len(uft)) sch_resid[strat_ix[jj], :] = exog_s[jj, :] - w_avg[stx][ii[jj], :] jj = np.flatnonzero(self.model.status == 0) sch_resid[jj, :] = np.nan return sch_resid @cache_readonly def martingale_residuals(self): """ The martingale residuals. """ surv = self.model.surv # Initialize at NaN since rows that belong to strata with no # events have undefined residuals. mart_resid = np.nan*np.ones(len(self.model.endog), dtype=np.float64) cumhaz_f_list = self.baseline_cumulative_hazard_function # Loop over strata for stx in range(surv.nstrat): cumhaz_f = cumhaz_f_list[stx] exog_s = surv.exog_s[stx] time_s = surv.time_s[stx] linpred =, self.params) if surv.offset_s is not None: linpred += surv.offset_s[stx] e_linpred = np.exp(linpred) ii = surv.stratum_rows[stx] chaz = cumhaz_f(time_s) mart_resid[ii] = self.model.status[ii] - e_linpred * chaz return mart_resid def summary(self, yname=None, xname=None, title=None, alpha=.05): """ Summarize the proportional hazards regression results. Parameters ----------- yname : string, optional Default is `y` xname : list of strings, optional Default is `x#` for ## in p the number of regressors title : string, optional Title for the top table. If not None, then this replaces the default title alpha : float significance level for the confidence intervals Returns ------- smry : Summary instance this holds the summary tables and text, which can be printed or converted to various output formats. See Also -------- statsmodels.iolib.summary.Summary : class to hold summary results """ from statsmodels.iolib import summary2 from statsmodels.compat.collections import OrderedDict smry = summary2.Summary() float_format = "%8.3f" info = OrderedDict() info["Model:"] = "PH Reg" if yname is None: yname = self.model.endog_names info["Dependent variable:"] = yname info["Ties:"] = self.model.ties.capitalize() info["Sample size:"] = str(self.model.surv.n_obs) info["Num. events:"] = str(int(sum(self.model.status))) if self.model.groups is not None: mn, mx, avg = self._group_stats(self.model.groups) info["Max. group size:"] = str(mx) info["Min. group size:"] = str(mn) info["Avg. group size:"] = str(avg) smry.add_dict(info, align='l', float_format=float_format) param = summary2.summary_params(self, alpha=alpha) param = param.rename(columns={"Coef.": "log HR", "Std.Err.": "log HR SE"}) param.insert(2, "HR", np.exp(param["log HR"])) a = "[%.3f" % (alpha / 2) param.loc[:, a] = np.exp(param.loc[:, a]) a = "%.3f]" % (1 - alpha / 2) param.loc[:, a] = np.exp(param.loc[:, a]) if xname != None: param.index = xname smry.add_df(param, float_format=float_format) smry.add_title(title=title, results=self) smry.add_text("Confidence intervals are for the hazard ratios") if self.model.groups is not None: smry.add_text("Standard errors account for dependence within groups") if hasattr(self, "regularized"): smry.add_text("Standard errors do not account for the regularization") return smry class rv_discrete_float(object): """ A class representing a collection of discrete distributions. Parameters ---------- xk : 2d array-like The support points, should be non-decreasing within each row. pk : 2d array-like The probabilities, should sum to one within each row. Notes ----- Each row of `xk`, and the corresponding row of `pk` describe a discrete distribution. `xk` and `pk` should both be two-dimensional ndarrays. Each row of `pk` should sum to 1. This class is used as a substitute for scipy.distributions. rv_discrete, since that class does not allow non-integer support points, or vectorized operations. Only a limited number of methods are implemented here compared to the other scipy distribution classes. """ def __init__(self, xk, pk): self.xk = xk = pk self.cpk = np.cumsum(, axis=1) def rvs(self): """ Returns a random sample from the discrete distribution. A vector is returned containing a single draw from each row of `xk`, using the probabilities of the corresponding row of `pk` """ n = self.xk.shape[0] u = np.random.uniform(size=n) ix = (self.cpk < u[:, None]).sum(1) ii = np.arange(n, dtype=np.int32) return self.xk[(ii,ix)] def mean(self): """ Returns a vector containing the mean values of the discrete distributions. A vector is returned containing the mean value of each row of `xk`, using the probabilities in the corresponding row of `pk`. """ return (self.xk * def var(self): """ Returns a vector containing the variances of the discrete distributions. A vector is returned containing the variance for each row of `xk`, using the probabilities in the corresponding row of `pk`. """ mn = self.mean() xkc = self.xk - mn[:, None] return ( * (self.xk - xkc)**2).sum(1) def std(self): """ Returns a vector containing the standard deviations of the discrete distributions. A vector is returned containing the standard deviation for each row of `xk`, using the probabilities in the corresponding row of `pk`. """ return np.sqrt(self.var()) def _opt_1d(funcs, start, L1_wt, tol): """ Optimize a L1-penalized smooth one-dimensional function of a single variable. Parameters ---------- funcs : tuple of functions funcs[0] is the objective function to be minimized. funcs[1] and funcs[2] are, respectively, the first and second derivatives of the smooth part of funcs[0] (i.e. excluding the L1 penalty). start : real A starting value for the function argument L1_wt : non-negative real The weight for the L1 penalty function. tol : non-negative real A convergence threshold. Returns ------- The argmin of the objective function. """ # TODO: can we detect failures without calling funcs[0] twice? x = start f = funcs[0](x) b = funcs[1](x) c = funcs[2](x) d = b - c*x if L1_wt > np.abs(d): return 0. elif d >= 0: x += (L1_wt - b) / c elif d < 0: x -= (L1_wt + b) / c f1 = funcs[0](x) # This is an expensive fall-back if the quadratic # approximation is poor and sends us far off-course. if f1 > f + 1e-10: return brent(funcs[0], brack=(x-0.2, x+0.2), tol=tol) return x