《机器学习》 西瓜书的代码实现.
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as st
data = pd.read_csv("work/西瓜数据集3.0.txt")
data.head()
根据贝叶斯定理 $$P(c \mid \boldsymbol{x}) = \frac{P(c)P(\boldsymbol{x}\mid c)}{P(\boldsymbol{x})}$$ 而朴素贝叶斯因为假设条件之间相互独立, 因此判定准则为 $$h_{nb}(\boldsymbol{x}) =\mathop{\arg \max}\limits_{c \in \mathcal{Y}}\;P(c)\prod_{i = 1}^{d}P(x_i\mid c)$$
将属性全部转化为递增数字
class Bayes:
def train(self, X, y, con_col):
self.con_col = con_col # 连续值项
self.X = X
self.y = y
self.dis_X = np.delete(X, con_col, axis=1) # 离散值项
self.con_X = X[:, con_col] # 连续值项
self.unique_y = np.unique(y) # y的可能取值
self.dis_P_all = self.get_dis_P(self.dis_X, y)
self.P_c = P_c = (self.unique_y == y).sum(axis=0) / data.shape[0]
def get_dis_P(self, X, y):
P_all = []
for i, c in enumerate(self.unique_y): # 枚举y的可能取值
temp = X[(y == c).ravel()]
an_temp_list = []
for x in range(temp.shape[1]): # 枚举离散数据项
temp_list = {}
for x_i in np.unique(X[:, x]): # 枚举每项数据的可能取值
temp_list[x_i] = self.P(temp, x, x_i)
an_temp_list.insert(x, temp_list)
P_all.insert(i, an_temp_list)
return P_all
def get_con_P(self, X, x, x_i): # 获取离散值的分布概率
return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std()) # 我们默认数据服从正态分布
def P(self, X, x, x_i): # y所有取值的概率
return (X[:, x] == x_i).sum() / X.shape[0]
将 data 进行分割
X = np.array(data.iloc[:, :8])
y = np.array(data.iloc[:, 8])[None].T
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
Bayes_machine.dis_P_all
出现了 0 , 为避免这个现象, 我们采用拉普拉斯修正, 重写 Byes 对象.
根据拉普拉斯修正, 我们有 $$\hat{P}(c) = \frac{|D_c| + 1}{|D| + N}\tag{7.19}$$ $$\hat{P}(x_i \mid c) = \frac{|D_{c, x_i}| + 1}{|D_c| + N_i}\tag{7.20}$$
class Bayes:
def train(self, X, y, con_col):
self.con_col = con_col
self.X = X
self.y = y
self.dis_X = np.delete(X, con_col, axis=1)
self.con_X = X[:, con_col]
self.unique_y = np.unique(y)
self.dis_P_all = self.get_dis_P(self.dis_X, y)
self.P_c = ((self.unique_y == y).sum(axis=0) + 1) / (data.shape[0] + self.unique_y.shape[0])
def get_dis_P(self, X, y):
P_all = []
for i, c in enumerate(self.unique_y):
temp = X[(y == c).ravel()]
an_temp_list = []
for x in range(temp.shape[1]):
temp_list = {}
temp_uni = np.unique(X[:, x])
for x_i in temp_uni:
temp_list[x_i] = self.P(temp, x, x_i, temp_uni.shape[0])
an_temp_list.insert(x, temp_list)
P_all.insert(i, an_temp_list)
return P_all
def get_con_P(self, X, x, x_i):
return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std()) # 我们默认数据服从正态分布
def P(self, X, x, x_i, N):
return ((X[:, x] == x_i).sum() + 1) / (X.shape[0] + N)
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
Bayes_machine.dis_P_all
Bayes_machine.P_c
现在我们来进行预测.
class Bayes:
def train(self, X, y, con_col):
self.con_col = con_col
self.X = X
self.y = y
self.dis_X = np.delete(X, con_col, axis=1)
self.con_X = X[:, con_col]
self.unique_y = np.unique(y)
self.dis_P_all = self.get_dis_P(self.dis_X, y)
self.P_c = ((self.unique_y == y).sum(axis=0) + 1) / (data.shape[0] + self.unique_y.shape[0])
def get_dis_P(self, X, y):
P_all = []
for i, c in enumerate(self.unique_y):
temp = X[(y == c).ravel()]
an_temp_list = []
for x in range(temp.shape[1]):
temp_list = {}
temp_uni = np.unique(X[:, x])
for x_i in temp_uni:
temp_list[x_i] = self.P(temp, x, x_i, temp_uni.shape[0])
an_temp_list.insert(x, temp_list)
P_all.insert(i, an_temp_list)
return P_all
def get_con_P(self, X, x, x_i):
return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std()) # 我们默认数据服从正态分布
def P(self, X, x, x_i, N):
return ((X[:, x] == x_i).sum() + 1) / (X.shape[0] + N)
def predict(self, X_pre):
y_pre = np.ones(X_pre.shape[0], dtype='object')
for j, X in enumerate(X_pre):
max = 0
res_c = 0
for i, c in enumerate(self.unique_y): # 枚举y的取值计算概率
res = 1
temp_X = self.X[(self.y == c).ravel()] # 这是y等于c时的集合
a = 0
for x in range(X.shape[0]): # 连续值
if x in self.con_col:
res *= self.get_con_P(temp_X, x, X[x])
else:
res *= (self.dis_P_all[i][a][X[x]]) # 这里进行了小小的修改, 因为之前的代码默认连续值是在最后
a += 1
res *= self.P_c[i]
if res > max: # 看看哪种情况可能性最大
max = res
res_c = c
y_pre[j] = res_c
return y_pre[None].T
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
test_X = np.array(['青绿', '蜷缩', '浊响', '清晰', '凹陷', '硬滑', 0.697, 0.460], dtype='object')
Bayes_machine.predict(X) == y
Bayes_machine.predict(test_X[None])
因此分类为好瓜.
$\mathrm{AODE}$ 分类器基于集成学习机制, 尝试将每个属性作为超父来构建 $\mathrm{SPODE}$.
由于 $\mathrm{AODE}$ 会将所有属性作为超父, 如果用打表的方式 (也就是先计算出来, 用的时候直接使用), 占用空间会非常大, 所以我们采用即时计算的方式 (即拿到预测数据再计算) ,
class AODE:
def train(self, X, y, con_col, m=15):
"""
m即为阀值, 默认为15
"""
self.m = m
self.con_col = con_col
self.X = X
self.y = y
self.dis_X = np.delete(X, con_col, axis=1)
self.con_X = X[:, con_col]
self.unique_y = np.unique(y)
self.unique_X = [1] * self.dis_X.shape[1]
for x in range(self.dis_X.shape[1]): # 枚举每个离散属性计算取值数
self.unique_X[x] = np.unique(self.dis_X[:, x])
def get_con_P(self, X, x, x_i):
"""
X是类别为c, 依赖项取值为x_j的集合, x是主项, x_i是主项的取值
"""
return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std()) # 我们默认数据服从正态分布
def P_xj_c_xi(self, D_c_ij, D_c_x, x):
N_j = self.unique_X[x].shape[0]
return (D_c_ij.shape[0] + 1) / (D_c_x.shape[0] + N_j)
def P_c_xi(self, D_c_x, depend):
"""
X是训练集, y是X对应的类别, c是具体的类别, x_i是依赖项的取值, depend是依赖项, N是y的取值数, N_i是依赖项的取值数
"""
N = self.unique_y.shape[0]
N_i = self.unique_X[depend].shape[0]
return (D_c_x.shape[0] + 1) / (X.shape[0] + N * N_i)
def predict_one_part(self, X, c, depend):
"""
X是预测向量, c是类别
"""
D_x = self.X[self.X[:, depend] == X[depend]]
if D_x.shape[0] < self.m:
return 0
res = 1
D_c_x = self.X[(self.y == c).T[0] | (self.X[:, depend] == X[depend])]
res *= self.P_c_xi(D_c_x, depend)
for x in range(X.shape[0] - len(self.con_col)):
D_c_ij = self.X[(self.y == c).T[0] | (self.X[:, depend] == X[depend]) | (self.X[:, x] == X[x])]
res *= self.P_xj_c_xi(D_c_ij, D_c_x, x)
for x in self.con_col:
res *= self.get_con_P(D_c_x, x, X[x])
return res
def predict(self, X_pre):
y_pre = np.ones(X_pre.shape[0], dtype='object')
for i, X in enumerate(X_pre):
max = 0
res_y = 0
for c in self.unique_y:
res = 0
for depend in range(X.shape[0] - len(self.con_col)):
res += self.predict_one_part(X, c, depend)
if res > max:
max = res
res_y = c
y_pre[i] = res_y
return y_pre
AODE_machine = AODE()
AODE_machine.train(X, y, [6, 7], 2) # 由于我们的训练集比较小, 因此m值也设置得比较小
AODE_machine.predict(X)
AODE_machine.predict(test_X[None])
因此仍然分类为 '是'.
def P(X, x, i, depend, con_col): # 计算单个数据的单个属性概率
raw = False
if depend != []:
for j in depend:
raw = raw | (X[:, j] == x[j])
X = X[raw]
if i in con_col:
return st.norm.pdf(x[i], loc=X[:, i].mean(), scale=X[:, i].std()) # 高斯分布的概率函数
return (X[:, i] == x[i]).sum() / X.shape[0]
def P_one(X, x, B, con_col): # 计算单个数据的概率
res = 1
for i in range(x.shape[0]):
res *= P(X, x, i, B[i], con_col)
return res
def LL(X, B, con_col):
sum = 0
for i in range(X.shape[0]):
sum += np.log(P_one(X, X[i], B, con_col))
return sum
def BIC(X, B, m, con_col):
return np.log(m) / 2 * len(B) - LL(X, B, con_col)
我们构建一个参数集来测试一下, 这里的参数集只表明某个属性的依赖项和依赖项的值, 然后进行在线计算, 而不是打表.
B = [[1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2]]
BIC(X, B, 2, [6, 7]) # 这里m = 2
由于搜索所有可能的网络结构空间来找出最优贝叶斯网是 $\mathrm{NP}$ 难的, 所以我们采用一种更为简单的方法搜索, 即对于某个特征, 依次增加其他特征来作为依赖项, 如果该操作导致 $\mathrm{BIC}$ 评分下降, 那么就对另一个特征做该操作, 直到对所有特征都做过同样的操作.
B = [[]] * X.shape[1]
min = BIC(X, B, 2, [6, 7])
for i in range(X.shape[1]):
for j in range(i + 1, X.shape[1]):
B[i].append(j)
res = BIC(X, B, 2, [6, 7])
if res < min:
min = res
else:
B[i].pop()
B
(我应该没写错吧..答案怎么这么凑巧)
$\mathrm{EM}$ 算法就是, 我 迭 代 我 自 己. 简单来说, 如果一个情形出现的可能性是最大的, 那为什么不让它出现呢,
我们以第一项颜色为例.
unique = np.unique(X[:, 0])
X_test = np.array(data)[:, 1:]
y_test = np.array(data.iloc[:, 0])[None].T
我们用朴素贝叶斯.
Bayes_machine = Bayes()
Bayes_machine.train(X_test, y_test, [5, 6])
Bayes_machine.predict(X_test)
先随机初始化.
y_test = unique[np.random.randint(unique.shape[0], size=y.shape[0])][None].T
while True:
Bayes_machine.train(X_test, y_test, [5, 6])
new = Bayes_machine.predict(X_test)
if (new == y_test).all():
break
y_test = new
y_test
训练完成.
哎呀妈呀终于写完了.