Logistic回归

《机器学习实战》

回归

概念:假设有一些数据点,我们用一条直线对这些点进行拟合,这个拟合过程成为回归。这条线称之为 最佳拟合直线。

主要思想:根据现有数据对分类边界线建立回归公式,以此进行分类。

“回归”一词源于最佳拟合,表示要找到最佳拟合参数集。

Sigmoid函数

f(x) = 1/(1 + e(-x))

当 x = 0时, f(x) = 0; 随着 x 的增大,f(x) 逼近于1;随着 x 的减小,f(x) 逼近于0。

2.Logistic回归分类器原理:

我们可以在每个特征上都乘以一个回归系数,然后把所有的结果值相加,将这个总和代入Sigmoid函数中,进而得到一个0~1之间的数值。 任何大于0.5的数据被分为1类,小于0.5则被归为0类。 所以,Logistic回归也可以被看成是一种概率估计。

3.基于最优化方法的最佳回归系数确定

Sigmoid函数 输入记为Z,

Z = wOx0 + w1x1 + w2x2 + ... + wNxN 

可以写成 Z = wTx, 表示将这两个数值向量对应元素相乘然后全部加起来。 其中,x是分类器的输入参数,w就是最佳回归系数。

1
2
def Sigmoid(inX):
return 1.0/(1+math.exp(-inX))

梯度上升算法

我们需要寻找合适的参数 w 使得 wTx = 0 成为很好的分类判定边界。

思想:

要找到某函数的最大值,最好的方法是沿着该函数的梯度方向探寻。

对于Logistic回归而言,损失函数为非凸函数。重新定义损失函数后得到梯度上升算法迭代公式。

梯度上升算法伪代码

每个回归系数初始化为1
重复r次:
计算整个数据集的梯度
使用 alpha * gradient 更新回归系数
返回回归系数

1
2
3
4
5
6
weights = np.ones((n,1))
for k in range(maxCycles):
h = sigmoid(np.dot(dataMatrix,weights))
error = labelMat - h
weights = weights + alpha*dataMatrix.transpose()*error
return weights

Logistic回归 梯度上升优化算法

1.读取数据并提取对应的类别标签

1
2
3
4
5
6
7
def loadDataSet():
dataMat = []; labelMat = []
for line in open('data/testSet.txt').readlines():
lineArr = line.strip().split()
dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
labelMat.append(int(lineArr[2]))
return dataMat,labelMat
  1. sigmoid函数和梯度上升的程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def sigmoid(inX):
return 1.0/(1+math.exp(-inX))
def gradAscent(dataMatIn, classLabels):
dataMatrix = np.mat(dataMatIn)
labelMat = np.mat(classLabels).transpose()
m,n = np.shape(dataMatIn)
alpha = 0.001
maxCycles = 500
weights = np.ones((n,1))
for k in range(maxCycles):
h = sigmoid(np.dot(dataMatrix,weights))
error = labelMat - h
weights = weights + alpha*dataMatrix.transpose()*error
return weights

得到的结果为:

1
2
3
4
➜ logistic git:(master) ✗ python logistic.py
[[ 4.12414349]
[ 0.48007329]
[-0.6168482 ]]

3.分析数据:画出决策边界

对于hθ(x)=g(θTx)而言,θTx > 0,y = 1;θTx < 0, y = 0.所以我们认为θTx = 0是一个决策边界。

hθ(x)=g(θ0+θ1X1+θ2X2)

-> θ0+θ1X1+θ2X2 = 0

-> x2 = (-θ0-θ1X1)/θ2

-> y = (-θ0-θ1X)/θ2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def plotBestFit(weights,dataMat,labelMat):
dataArr = np.mat(dataMat)
weights = weights.getA()
n = np.shape(dataArr)[0]
xcord1 = []; ycord1 = []
xcord2 = []; ycord2 = []
for i in range(n):
if int(labelMat[i]) == 1:
xcord1.append(dataArr[i,1])
ycord1.append(dataArr[i,2])
else:
xcord2.append(dataArr[i,1])
ycord2.append(dataArr[i,2])
fig = plt.figure()
ax = fig.add_subplot(111)
ax.scatter(xcord1, ycord1, s=30, c='red', marker='s')
ax.scatter(xcord2, ycord2, s=30, c='green')
x = np.arange(-3.0, 3.0, 0.1)
y = (-weights[0] - weights[1]*x)/weights[2]
ax.plot(x, y)
plt.xlabel('X1'); plt.ylabel('X2')
plt.show()

随机梯度上升

梯度上升算法在每次更新回归系数时都需要遍历整个数据集,计算复杂度较高。

改进方法是 一次仅用一个样本点来更新回归系数。该方法称为随机梯度上升算法。

伪代码:

所有回归系数初始化为1
对数据集的每个样本:
计算该样本的梯度
使用 alpha * gradient 更新回归系数
返回回归系数

1
2
3
4
5
6
7
8
9
def socGradAscent0(dataMatrix, classLabels):
m,n = np.shape(dataMatrix)
alpha = 0.01
weights = np.ones(n)
for i in range(m):
h = sigmoid(sum(dataMatrix[i]*weights))
error = classLabels[i] - h
weights = weights + alpha * dataMatrix[i] * error
return weights

判断一个优化算法是否可靠的方式是看它是否收敛,也就是说参数是否达到了稳定值。

通过改变该算法的迭代次数可以看出:某些特征的回归系数会有周期性波动。

这是因为数据集中存在一些不能正确分类的样本点,在每次迭代时会引发系数的剧烈波动。

改进的随机梯度上升算法

为了解决随机梯度上升的问题,对算法进行了一些改进。

  1. alpha 在每次迭代时进行调整(随着迭代次数不断减小)
  2. 随机选取样本来更新回归系数
  3. 增加迭代次数
1
2
3
4
5
6
7
8
9
10
11
12
13
def socGradAscent1(dataMatrix, classLabels, numIter = 2):
m,n = np.shape(dataMatrix)
weights = np.ones(n)
for j in range(numIter):
dataIndex = list(range(m))
for i in range(m):
alpha = 0.04/(1.0+j+i) + 0.001
randIndex = int(np.random.uniform(0, len(dataIndex)))
h = sigmoid(sum(dataMatrix[randIndex]*weights))
error = classLabels[randIndex] - h
weights = weights + alpha * error * dataMatrix[randIndex]
del(dataIndex[randIndex])
return weights

示例: 从疝气病症预测病马的死亡率

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# -*- coding: utf-8 -*-
# !/usr/bin/env python
import logistic
import numpy as np
TRAIN_TIMES = 500
def classifyVector(inX, weights):
prob = logistic.socSigmoid(sum(inX * weights))
if prob >= 0.5:
return 1.0
else:
return 0.0
def colicTest():
trainingSet = []; trainingLabels = []
for line in open('data/horseColicTraining.txt').readlines():
currLine = line.strip().split('\t')
lineArr = []
for i in range(len(currLine)-1):
lineArr.append(float(currLine[i]))
trainingSet.append(lineArr)
trainingLabels.append(float(currLine[len(currLine)-1]))
weights = logistic.socGradAscent1(np.array(trainingSet),trainingLabels,TRAIN_TIMES)
# print(weights)
errorCount = 0; numTestVec = 0.0
for line in open('data/horseColicTest.txt').readlines():
numTestVec += 1.0
currLine = line.strip().split('\t')
lineArr = []
for i in range(len(currLine)-1):
lineArr.append(float(currLine[i]))
if int(classifyVector(lineArr,weights) != int(currLine[len(currLine)-1])):
errorCount += 1
# print(errorCount,numTestVec)
errorRate = float(errorCount)/numTestVec
print('the error rate of this test is :%f' % errorRate)
return errorRate
def multiTest():
numTests = 10; errorSum = 0.0
for k in range(numTests):
errorSum += colicTest()
print('after %d iterations the average error rate is :%f' % (numTests,errorSum/float(numTests)))
if __name__ == '__main__':
multiTest()