《机器学习》 西瓜书的代码实现. 《机器学习》 西瓜书实例 第 7 章

我的博客: https://yunist.cn

《机器学习》西瓜书 第 7 章 编程实例

In [342]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import scipy.stats as st
In [343]:
data = pd.read_csv("work/西瓜数据集3.0.txt")
data.head()
Out[343]:
Color Pedicle Stroke Texture Umbilicus Touch Density Sugar content Good melon
0 青绿 蜷缩 浊响 清晰 凹陷 硬滑 0.697 0.460
1 乌黑 蜷缩 沉闷 清晰 凹陷 硬滑 0.774 0.376
2 乌黑 蜷缩 浊响 清晰 凹陷 硬滑 0.634 0.264
3 青绿 蜷缩 沉闷 清晰 凹陷 硬滑 0.608 0.318
4 浅白 蜷缩 浊响 清晰 凹陷 硬滑 0.556 0.215

朴素贝叶斯

根据贝叶斯定理 $$P(c \mid \boldsymbol{x}) = \frac{P(c)P(\boldsymbol{x}\mid c)}{P(\boldsymbol{x})}$$ 而朴素贝叶斯因为假设条件之间相互独立, 因此判定准则为 $$h_{nb}(\boldsymbol{x}) =\mathop{\arg \max}\limits_{c \in \mathcal{Y}}\;P(c)\prod_{i = 1}^{d}P(x_i\mid c)$$

将属性全部转化为递增数字

In [344]:
class Bayes:
    def train(self, X, y, con_col):
        self.con_col = con_col  # 连续值项
        self.X = X
        self.y = y
        self.dis_X = np.delete(X, con_col, axis=1)  # 离散值项
        self.con_X = X[:, con_col]  # 连续值项
        self.unique_y = np.unique(y)  # y的可能取值
        self.dis_P_all = self.get_dis_P(self.dis_X, y)
        self.P_c = P_c = (self.unique_y == y).sum(axis=0) / data.shape[0]
    def get_dis_P(self, X, y):
        P_all = []
        for i, c in enumerate(self.unique_y):  # 枚举y的可能取值
            temp = X[(y == c).ravel()]
            an_temp_list = []
            for x in range(temp.shape[1]):  # 枚举离散数据项
                temp_list = {}
                for x_i in np.unique(X[:, x]):  # 枚举每项数据的可能取值
                    temp_list[x_i] = self.P(temp, x, x_i)
                an_temp_list.insert(x, temp_list)
            P_all.insert(i, an_temp_list)
        return P_all
    def get_con_P(self, X, x, x_i):  # 获取离散值的分布概率
        return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std())  # 我们默认数据服从正态分布
    def P(self, X, x, x_i):  # y所有取值的概率
        return (X[:, x] == x_i).sum() / X.shape[0]

将 data 进行分割

In [345]:
X = np.array(data.iloc[:, :8])
y = np.array(data.iloc[:, 8])[None].T
In [346]:
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
Bayes_machine.dis_P_all
Out[346]:
[[{'乌黑': 0.2222222222222222,
   '浅白': 0.4444444444444444,
   '青绿': 0.3333333333333333},
  {'硬挺': 0.2222222222222222,
   '稍蜷': 0.4444444444444444,
   '蜷缩': 0.3333333333333333},
  {'沉闷': 0.3333333333333333,
   '浊响': 0.4444444444444444,
   '清脆': 0.2222222222222222},
  {'模糊': 0.3333333333333333,
   '清晰': 0.2222222222222222,
   '稍糊': 0.4444444444444444},
  {'凹陷': 0.2222222222222222,
   '平坦': 0.4444444444444444,
   '稍凹': 0.3333333333333333},
  {'硬滑': 0.6666666666666666, '软粘': 0.3333333333333333}],
 [{'乌黑': 0.5, '浅白': 0.125, '青绿': 0.375},
  {'硬挺': 0.0, '稍蜷': 0.375, '蜷缩': 0.625},
  {'沉闷': 0.25, '浊响': 0.75, '清脆': 0.0},
  {'模糊': 0.0, '清晰': 0.875, '稍糊': 0.125},
  {'凹陷': 0.625, '平坦': 0.0, '稍凹': 0.375},
  {'硬滑': 0.75, '软粘': 0.25}]]

拉普拉斯修正

出现了 0 , 为避免这个现象, 我们采用拉普拉斯修正, 重写 Byes 对象.

根据拉普拉斯修正, 我们有 $$\hat{P}(c) = \frac{|D_c| + 1}{|D| + N}\tag{7.19}$$ $$\hat{P}(x_i \mid c) = \frac{|D_{c, x_i}| + 1}{|D_c| + N_i}\tag{7.20}$$

In [347]:
class Bayes:
    def train(self, X, y, con_col):
        self.con_col = con_col
        self.X = X
        self.y = y
        self.dis_X = np.delete(X, con_col, axis=1)
        self.con_X = X[:, con_col]
        self.unique_y = np.unique(y)
        self.dis_P_all = self.get_dis_P(self.dis_X, y)
        self.P_c = ((self.unique_y == y).sum(axis=0) + 1) / (data.shape[0] + self.unique_y.shape[0])
    def get_dis_P(self, X, y):
        P_all = []
        for i, c in enumerate(self.unique_y):
            temp = X[(y == c).ravel()]
            an_temp_list = []
            for x in range(temp.shape[1]):
                temp_list = {}
                temp_uni = np.unique(X[:, x])
                for x_i in temp_uni:
                    temp_list[x_i] = self.P(temp, x, x_i, temp_uni.shape[0])
                an_temp_list.insert(x, temp_list)
            P_all.insert(i, an_temp_list)
        return P_all
    def get_con_P(self, X, x, x_i):
        return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std())  # 我们默认数据服从正态分布
    def P(self, X, x, x_i, N):
        return ((X[:, x] == x_i).sum() + 1) / (X.shape[0] + N)
In [348]:
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
Bayes_machine.dis_P_all
Out[348]:
[[{'乌黑': 0.25, '浅白': 0.4166666666666667, '青绿': 0.3333333333333333},
  {'硬挺': 0.25, '稍蜷': 0.4166666666666667, '蜷缩': 0.3333333333333333},
  {'沉闷': 0.3333333333333333, '浊响': 0.4166666666666667, '清脆': 0.25},
  {'模糊': 0.3333333333333333, '清晰': 0.25, '稍糊': 0.4166666666666667},
  {'凹陷': 0.25, '平坦': 0.4166666666666667, '稍凹': 0.3333333333333333},
  {'硬滑': 0.6363636363636364, '软粘': 0.36363636363636365}],
 [{'乌黑': 0.45454545454545453,
   '浅白': 0.18181818181818182,
   '青绿': 0.36363636363636365},
  {'硬挺': 0.09090909090909091,
   '稍蜷': 0.36363636363636365,
   '蜷缩': 0.5454545454545454},
  {'沉闷': 0.2727272727272727,
   '浊响': 0.6363636363636364,
   '清脆': 0.09090909090909091},
  {'模糊': 0.09090909090909091,
   '清晰': 0.7272727272727273,
   '稍糊': 0.18181818181818182},
  {'凹陷': 0.5454545454545454,
   '平坦': 0.09090909090909091,
   '稍凹': 0.36363636363636365},
  {'硬滑': 0.7, '软粘': 0.3}]]
In [349]:
Bayes_machine.P_c
Out[349]:
array([0.52631579, 0.47368421])

现在我们来进行预测.

In [350]:
class Bayes:
    def train(self, X, y, con_col):
        self.con_col = con_col
        self.X = X
        self.y = y
        self.dis_X = np.delete(X, con_col, axis=1)
        self.con_X = X[:, con_col]
        self.unique_y = np.unique(y)
        self.dis_P_all = self.get_dis_P(self.dis_X, y)
        self.P_c = ((self.unique_y == y).sum(axis=0) + 1) / (data.shape[0] + self.unique_y.shape[0])
    def get_dis_P(self, X, y):
        P_all = []
        for i, c in enumerate(self.unique_y):
            temp = X[(y == c).ravel()]
            an_temp_list = []
            for x in range(temp.shape[1]):
                temp_list = {}
                temp_uni = np.unique(X[:, x])
                for x_i in temp_uni:
                    temp_list[x_i] = self.P(temp, x, x_i, temp_uni.shape[0])
                an_temp_list.insert(x, temp_list)
            P_all.insert(i, an_temp_list)
        return P_all
    def get_con_P(self, X, x, x_i):
        return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std())  # 我们默认数据服从正态分布
    def P(self, X, x, x_i, N):
        return ((X[:, x] == x_i).sum() + 1) / (X.shape[0] + N)
    def predict(self, X_pre):
        y_pre = np.ones(X_pre.shape[0], dtype='object')
        for j, X in enumerate(X_pre):
            max = 0
            res_c = 0
            for i, c in enumerate(self.unique_y):  # 枚举y的取值计算概率
                res = 1
                temp_X = self.X[(self.y == c).ravel()]  # 这是y等于c时的集合
                a = 0
                for x in range(X.shape[0]):  # 连续值
                    if x in self.con_col:
                        res *= self.get_con_P(temp_X, x, X[x])
                    else:
                        res *= (self.dis_P_all[i][a][X[x]])  # 这里进行了小小的修改, 因为之前的代码默认连续值是在最后
                        a += 1
                res *= self.P_c[i]
                if res > max:  # 看看哪种情况可能性最大
                    max = res
                    res_c = c
            y_pre[j] = res_c
        return y_pre[None].T
In [351]:
Bayes_machine = Bayes()
Bayes_machine.train(X, y, [6, 7])
In [352]:
test_X = np.array(['青绿', '蜷缩', '浊响', '清晰', '凹陷', '硬滑', 0.697, 0.460], dtype='object')
In [353]:
Bayes_machine.predict(X) == y
Out[353]:
array([[ True],
       [ True],
       [ True],
       [ True],
       [ True],
       [ True],
       [False],
       [ True],
       [ True],
       [ True],
       [ True],
       [ True],
       [False],
       [ True],
       [False],
       [ True],
       [ True]])
In [354]:
Bayes_machine.predict(test_X[None])
Out[354]:
array([['是']], dtype=object)

因此分类为好瓜.

$\mathrm{AODE}$ 分类器

$\mathrm{AODE}$ 分类器基于集成学习机制, 尝试将每个属性作为超父来构建 $\mathrm{SPODE}$.

由于 $\mathrm{AODE}$ 会将所有属性作为超父, 如果用打表的方式 (也就是先计算出来, 用的时候直接使用), 占用空间会非常大, 所以我们采用即时计算的方式 (即拿到预测数据再计算) ,

In [355]:
class AODE:
    def train(self, X, y, con_col, m=15):
        """
        m即为阀值, 默认为15
        """
        self.m = m
        self.con_col = con_col
        self.X = X
        self.y = y
        self.dis_X = np.delete(X, con_col, axis=1)
        self.con_X = X[:, con_col]
        self.unique_y = np.unique(y)
        self.unique_X = [1] * self.dis_X.shape[1]
        for x in range(self.dis_X.shape[1]):  # 枚举每个离散属性计算取值数
            self.unique_X[x] = np.unique(self.dis_X[:, x])
    def get_con_P(self, X, x, x_i):
        """
        X是类别为c, 依赖项取值为x_j的集合, x是主项, x_i是主项的取值
        """
        return st.norm.pdf(x_i, loc=X[:, x].mean(), scale=X[:, x].std())  # 我们默认数据服从正态分布
    def P_xj_c_xi(self, D_c_ij, D_c_x, x):
        N_j = self.unique_X[x].shape[0]
        return (D_c_ij.shape[0] + 1) / (D_c_x.shape[0] + N_j)
    def P_c_xi(self, D_c_x, depend):  
        """
        X是训练集, y是X对应的类别, c是具体的类别, x_i是依赖项的取值, depend是依赖项, N是y的取值数, N_i是依赖项的取值数
        """
        N = self.unique_y.shape[0]
        N_i = self.unique_X[depend].shape[0]
        return (D_c_x.shape[0] + 1) / (X.shape[0] + N * N_i)
    def predict_one_part(self, X, c, depend):
        """
        X是预测向量, c是类别
        """
        D_x = self.X[self.X[:, depend] == X[depend]]
        if D_x.shape[0] < self.m:
            return 0
        res = 1
        D_c_x = self.X[(self.y == c).T[0] | (self.X[:, depend] == X[depend])]
        res *= self.P_c_xi(D_c_x, depend)
        for x in range(X.shape[0] - len(self.con_col)):
            D_c_ij = self.X[(self.y == c).T[0] | (self.X[:, depend] == X[depend]) | (self.X[:, x] == X[x])]
            res *= self.P_xj_c_xi(D_c_ij, D_c_x, x)
        for x in self.con_col:
            res *= self.get_con_P(D_c_x, x, X[x])
        return res
    def predict(self, X_pre):
        y_pre = np.ones(X_pre.shape[0], dtype='object')
        for i, X in enumerate(X_pre):
            max = 0
            res_y = 0
            for c in self.unique_y:
                res = 0
                for depend in range(X.shape[0] - len(self.con_col)):
                    res += self.predict_one_part(X, c, depend)
                if res > max:
                    max = res
                    res_y = c
            y_pre[i] = res_y
        return y_pre
In [356]:
AODE_machine = AODE()
AODE_machine.train(X, y, [6, 7], 2)  # 由于我们的训练集比较小, 因此m值也设置得比较小
In [357]:
AODE_machine.predict(X)
Out[357]:
array(['是', '是', '是', '是', '是', '否', '否', '否', '否', '否', '否', '否', '是',
       '是', '是', '否', '否'], dtype=object)
In [358]:
AODE_machine.predict(test_X[None])
Out[358]:
array(['是'], dtype=object)

因此仍然分类为 '是'.

基于 $\mathrm{BIC}$ 准则的贝叶斯网.

$$\mathrm{BIC}(B\mid D) = \frac{\log m}{2}|B| - LL(B\mid D)$$
In [359]:
def P(X, x, i, depend, con_col):  # 计算单个数据的单个属性概率
    raw = False
    if depend != []:
        for j in depend:
            raw = raw | (X[:, j] == x[j])
        X = X[raw]
    if i in con_col:
        return st.norm.pdf(x[i], loc=X[:, i].mean(), scale=X[:, i].std())  # 高斯分布的概率函数
    return (X[:, i] == x[i]).sum() / X.shape[0]
In [360]:
def P_one(X, x, B, con_col):  # 计算单个数据的概率
    res = 1
    for i in range(x.shape[0]):
        res *= P(X, x, i, B[i], con_col)
    return res
In [361]:
def LL(X, B, con_col):
    sum = 0
    for i in range(X.shape[0]):
        sum += np.log(P_one(X, X[i], B, con_col))
    return sum
In [362]:
 def BIC(X, B, m, con_col):
     return np.log(m) / 2 * len(B) - LL(X, B, con_col)

我们构建一个参数集来测试一下, 这里的参数集只表明某个属性的依赖项和依赖项的值, 然后进行在线计算, 而不是打表.

In [363]:
B = [[1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2], [1, 2]]
In [364]:
BIC(X, B, 2, [6, 7])  # 这里m = 2
Out[364]:
45.97577407749653

由于搜索所有可能的网络结构空间来找出最优贝叶斯网是 $\mathrm{NP}$ 难的, 所以我们采用一种更为简单的方法搜索, 即对于某个特征, 依次增加其他特征来作为依赖项, 如果该操作导致 $\mathrm{BIC}$ 评分下降, 那么就对另一个特征做该操作, 直到对所有特征都做过同样的操作.

In [365]:
B = [[]] * X.shape[1]
In [366]:
min = BIC(X, B, 2, [6, 7])
for i in range(X.shape[1]):
    for j in range(i + 1, X.shape[1]):
        B[i].append(j)
        res = BIC(X, B, 2, [6, 7])
        if res < min:
            min = res
        else:
            B[i].pop()
B
Out[366]:
[[1], [1], [1], [1], [1], [1], [1], [1]]

(我应该没写错吧..答案怎么这么凑巧)

$\mathrm{EM}$ 算法

$\mathrm{EM}$ 算法就是, 我 迭 代 我 自 己. 简单来说, 如果一个情形出现的可能性是最大的, 那为什么不让它出现呢,

我们以第一项颜色为例.

In [367]:
unique = np.unique(X[:, 0])
In [368]:
X_test = np.array(data)[:, 1:]
y_test = np.array(data.iloc[:, 0])[None].T

我们用朴素贝叶斯.

In [369]:
Bayes_machine = Bayes()
Bayes_machine.train(X_test, y_test, [5, 6])
Bayes_machine.predict(X_test)
Out[369]:
array([['乌黑'],
       ['青绿'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['青绿'],
       ['浅白'],
       ['浅白'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['浅白'],
       ['浅白']], dtype=object)

先随机初始化.

In [370]:
y_test = unique[np.random.randint(unique.shape[0], size=y.shape[0])][None].T
In [375]:
while True:
    Bayes_machine.train(X_test, y_test, [5, 6])
    new = Bayes_machine.predict(X_test)
    if (new == y_test).all():
        break
    y_test = new
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/scipy/stats/_distn_infrastructure.py:1720: RuntimeWarning: divide by zero encountered in double_scalars
  x = np.asarray((x - loc)/scale, dtype=dtyp)
/opt/conda/envs/python35-paddle120-env/lib/python3.7/site-packages/scipy/stats/_distn_infrastructure.py:1720: RuntimeWarning: invalid value encountered in double_scalars
  x = np.asarray((x - loc)/scale, dtype=dtyp)
In [376]:
y_test
Out[376]:
array([['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['青绿'],
       ['乌黑'],
       ['乌黑'],
       ['乌黑'],
       ['青绿'],
       ['青绿'],
       ['青绿'],
       ['乌黑'],
       ['乌黑'],
       ['青绿'],
       ['青绿'],
       ['青绿']], dtype=object)

训练完成.

哎呀妈呀终于写完了.