Negative-binomial-regression
Bring in the covariates and all kinds of cool stuff for the counting process
#!pip install numdifftools
import numpy as np
import pandas as pd
from scipy.optimize import minimize
import scipy.special as sc
import os
from IPython.display import display, Image
import plotly.graph_objs as go
import pprint
import math
import statsmodels.discrete.count_model as reg_models
from patsy import dmatrices
import statsmodels.api as sm
import matplotlib.pyplot as plt
def compute_probabilities(a, r, t, values, pi = 0):
'''Compute the probability of a person landing in their bucket'''
if pi == 0:
return math.log(math.gamma(r+ values)/(math.gamma(r) *math.factorial(values)) * (a/(a+t))**(r)*(t/(a+t))**(values))
else:
if values == 0:
prob1 = pi
prob2 = (math.gamma(r+ values)/(math.gamma(r) *math.factorial(values)) * (a/(a+t))**(r)*(t/(a+t))**(values)) * (1-pi)
return math.log(prob1 + prob2)
else:
prob2 = (math.gamma(r+ values)/(math.gamma(r) *math.factorial(values)) * (a/(a+t))**(r)*(t/(a+t))**(values)) * (1-pi)
return math.log(prob2)
def compute_all_probabilities(param,values,t):
'''compute the probabilities of each person conditioned on how long we have observed them'''
a,r = param
x = values
probs = []
for i in range(len(values)):
probs.append(compute_probabilities(a,r,t[i],x[i]))
return np.sum(probs) * -1
def maximize(values, t):
func = lambda x: compute_all_probabilities(x, values, t)
bnds = ((0.00001, None), (0.00001, None))
x0 = [1., 1.]
res = minimize(func, x0, bounds=bnds)
return res.x, res.fun
def log_likelihood_extraction(alpha, expr, data):
Y,X = dmatrices(expr, data, return_type='dataframe')
model = sm.GLM(Y, X, family=sm.families.NegativeBinomial(alpha = alpha)).fit()
return(-1*model.llf)
def reg_maximize(expr, data):
func = lambda x: log_likelihood_extraction(x, expr, data)
Y,X = dmatrices(expr, data, return_type='dataframe')
bnds = ((0, None),)
x0 = [0.1]
res = minimize(func, x0, bounds=bnds)
model = sm.GLM(Y, X, family=sm.families.NegativeBinomial(alpha = res.x)).fit()
r = 1/res.x
a = r/math.exp(model.params[0])
return (['a','r'] + list(model.params.index[0:]),
[a[0],r[0]] + [math.exp(model.params[0])] + list(model.params[1:]),
res.fun)
def zip_maximize(expr, data):
Y,X = dmatrices(expr, data, return_type='dataframe')
out = reg_models.ZeroInflatedNegativeBinomialP(Y,X,inflation='logit')
out_res = out.fit(maxiter = 1000)
r = 1/out_res.params[-1]
pi = math.exp(out_res.params[0])/(1+math.exp(out_res.params[0]))
a = r/math.exp(out_res.params[1])
return (['a','r','pi'] + list(out_res.params.index[1:-1]),
[a,r,pi] + [math.exp(out_res.params[1])] + list(out_res.params[2:-1]),
-1*out_res.llf)
def forecast_histogram(expr, data, right_censor, zip_ = False):
if zip_ == False:
Y,X = dmatrices(expr, data, return_type='dataframe')
maxed = reg_maximize(expr,data)[1]
alpha = maxed[0]
r = maxed[1]
coef = maxed[3:]
over_all = []
coef_exp = np.exp(np.sum(X.iloc[:,1:]* np.array(coef).T,axis = 1))
for i in coef_exp:
person = []
if len(coef) == 0:
func = lambda x:math.exp(compute_probabilities(alpha,r, 1,x))
else:
func = lambda x:math.exp(compute_probabilities(alpha,r, i,x))
person = list(map(func,range(right_censor)))
person.append(1-np.sum(person))
over_all.append(person)
return(over_all)
else:
Y,X = dmatrices(expr, data, return_type='dataframe')
maxed = zip_maximize(expr,data)[1]
alpha = maxed[0]
r = maxed[1]
pi = maxed[2]
coef = maxed[4:]
over_all = []
coef_exp = np.exp(np.sum(X.iloc[:,1:]* np.array(coef).T,axis = 1))
for i in coef_exp:
person = []
if len(coef) == 0:
func = lambda x:math.exp(compute_probabilities(alpha,r, 1,x, pi = pi))
else:
func = lambda x:math.exp(compute_probabilities(alpha,r, i,x, pi = pi))
person = list(map(func,range(right_censor)))
person.append(1-np.sum(person))
over_all.append(person)
return(over_all)
def conditional_expectation(expr,data, num_of_periods, zip_ = False):
if zip_ == False:
Y,X = dmatrices(expr, data, return_type='dataframe')
maxed = reg_maximize(expr,data)[1]
alpha = maxed[0]
r = maxed[1]
coef = maxed[3:]
coef_exp = np.exp(np.sum(X.iloc[:,1:]* np.array(coef).T,axis = 1))
if len(coef) == 0:
return((r + Y.iloc[:,0].to_list())/(alpha+1)) * num_of_periods
else:
return(((r + Y.iloc[:,0].to_list())/(alpha + coef_exp))*coef_exp) * num_of_periods
else:
Y,X = dmatrices(expr, data, return_type='dataframe')
maxed = zip_maximize(expr,data)[1]
alpha = maxed[0]
r = maxed[1]
pi = maxed[2]
coef = maxed[4:]
coef_exp = np.exp(np.sum(X.iloc[:,1:]* np.array(coef).T,axis = 1))
pi_series = abs((np.array(Y.iloc[:,0].to_list()) == 0).astype(int) * (pi) - 1)
if len(coef) == 0:
return ((r + Y.iloc[:,0].to_list())/(alpha+1)) * num_of_periods * pi_series
else:
return ((r + Y.iloc[:,0].to_list())/(alpha + coef_exp))*coef_exp * num_of_periods * pi_series
data = pd.read_csv("data/khakichinos models.csv")
data.head()
maximize(data.Visits,[1]*len(data.Visits))
expr = 'Visits ~ 1'
print(reg_maximize(expr,data))
matrix = np.array(forecast_histogram(expr,data,9))
pred = matrix.mean(axis = 0) * X.shape[0]
cond = conditional_expectation(expr,data,1)
expr = 'Visits ~ Income + Sex + Age + Size'
print(reg_maximize(expr,data))
matrix2 = np.array(forecast_histogram(expr,data,9))
pred2 = matrix2.mean(axis = 0) * X.shape[0]
cond2 = conditional_expectation(expr,data,1)
expr = 'Visits ~ Income + Sex + Age + Size'
print(zip_maximize(expr,data))
matrix3 = np.array(forecast_histogram(expr,data,9,True))
pred3 = matrix3.mean(axis = 0) * X.shape[0]
cond3 = conditional_expectation(expr,data,1, True)
# fig = go.Figure(data=[
# go.Bar(name='Actual', x=np.arange(10), y=np.unique(data.Visits,return_counts=True)[1][0:9]),
# go.Bar(name='Expected', x=np.arange(10), y=pred)
# ])
# fig.update_layout(title='Raw NBD',
# xaxis_title='x',
# yaxis_title='count',
# annotations=[
# ],
# xaxis = dict(
# tickmode = 'linear',
# tick0 = 0,
# dtick = 1
# )
# )
# # Change the bar mode
# fig.update_layout(barmode='group')
# fig.show()
# fig = go.Figure(data=[
# go.Bar(name='Actual', x=np.arange(10), y=np.unique(data.Visits,return_counts=True)[1][0:9]),
# go.Bar(name='Expected', x=np.arange(10), y=pred2)
# ])
# fig.update_layout(title='NBD Regression',
# xaxis_title='x',
# yaxis_title='count',
# annotations=[
# ],
# xaxis = dict(
# tickmode = 'linear',
# tick0 = 0,
# dtick = 1
# )
# )
# # Change the bar mode
# fig.update_layout(barmode='group')
# fig.show()
# fig = go.Figure(data=[
# go.Bar(name='Actual', x=np.arange(10), y=np.unique(data.Visits,return_counts=True)[1][0:9]),
# go.Bar(name='Expected', x=np.arange(10), y=pred3)
# ])
# fig.update_layout(title='ZIP NBD Regression',
# xaxis_title='x',
# yaxis_title='count',
# annotations=[
# ],
# xaxis = dict(
# tickmode = 'linear',
# tick0 = 0,
# dtick = 1
# )
# )
# # Change the bar mode
# fig.update_layout(barmode='group')
# fig.show()
Work In Progress
expr = 'Visits ~ Income + Sex + Age + Size'
Y,X = dmatrices(expr, data, return_type='dataframe')
X = X.iloc[:,1:]
residual = Y.iloc[:,0].to_list() - cond
Y = residual.reshape(-1, 1)
from sklearn.linear_model import Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import plot_partial_dependence
from sklearn.inspection import partial_dependence
Lasso_mod = Lasso(alpha = 0.012,tol=0.0001,max_iter=1000000)
Lasso_mod.fit(X,Y)
print(X.columns)
print(Lasso_mod.coef_)
rf = RandomForestRegressor(n_estimators=100)
rf.fit(X, Y.reshape(len(Y), ))
print(X.columns)
print(rf.feature_importances_)
import matplotlib.pyplot as plt
print(partial_dependence(estimator = rf,
X = X,
features=[0]))