首页 > 代码库 > 逻辑回归(logistic regression)

逻辑回归(logistic regression)

1、简介

回归是一种极易理解的模型,就相当于y=f(x),表明自变量x与因变量y的关系。最简单的回归是线性回归,然而线性回归的鲁棒性很差,使回归模型在训练集上表现都很差。这主要是由于线性回归在整个实数域内敏感度一致,而分类范围,需要在[0,1]。逻辑回归就是一种减小预测范围,将预测值限定为[0,1]间的一种回归模型。

2、模型

逻辑回归其实仅为在线性回归的基础上,套用了一个逻辑函数,首先介绍逻辑斯谛分布,该分布的定义是:设X是连续随机变量,X服从逻辑斯谛分布是指X服从如下分布函数和密度函数:

技术分享

其中,技术分享为位置参数,技术分享> 0 为形状参数。

可以通过其图像观察:

技术分享

右边的逻辑斯蒂分布函数以点技术分享中心对称,即满足:

技术分享

形状参数技术分享越小,曲线在中心的增长速度越快。

二项逻辑斯蒂回归模型

这是一种由条件概率表示的模型,其条件概率模型如下:

技术分享

其中,exp为以e为底的指数函数,x∈Rn是输入,y∈{0,1}输出,w,b是模型参数——w是权值向量,b称作偏置,w·x是向量内积。

有了后验概率,逻辑斯蒂回归模型选择二分类中较大的那一个完成分类。

另外,逻辑斯特回归模型还有一个方便的形式,如果将权值向量w和输入向量x拓充为w=(w(1),w(2),…w(n),b)T,x=(x(1),…x(n),1)T,此时逻辑斯谛模型可以表示为:

技术分享

为什么要重新提一个形式出来呢?这是因为,这个形式跟几率的表达式很像。

定义事件的几率:发生概率与不发生概率的比值——技术分享

定义对数几率:

技术分享

 

将逻辑斯蒂模型的便捷形式做一个变换恰好可以得到:

技术分享

 

这也就是说,在逻辑斯蒂回归模型中,输出Y=1的对数几率是输入x的线性函数。或者说输出Y=1的对数几率是由输入x的线性函数表示的模型,即逻辑斯蒂回归模型。反过来讲,如果知道权值向量,给定输入x,就能求出Y=1的概率:

技术分享

 

线性函数w·x的值越接近正无穷,概率值越接近1;反之,越接近负无穷,概率值越接近0——这就是逻辑斯谛回归模型。

模型参数估计

在模型学习的时候,对于给定训练集T = {(x1,y1)…(xN,yN)},x∈Rn,y∈{0,1}

技术分享

定义似然函数

技术分享

则有对数似然函数

技术分享

这个好说,把后面括号里的负π提到前面去就行了。

对L(w)求极大值就可以得出权值向量w的估计值。

解决以L(w)为目标函数的最优化问题的一般方法是梯度下降法及拟牛顿法。

梯度下降算法

求函数最小值的时候用的是梯度下降算法,而此处求的是对数似然函数的最大值,所以应该称为梯度上升算法。函数的梯度由其偏导数构成:

技术分享

技术分享

梯度是函数增长最快的方向,记移动补偿为α,则梯度算法的迭代公式为:

技术分享

假定权值向量w有了,怎么计算模型输出呢?

特征向量乘以权值向量得出一个实数z

技术分享

希望通过该实数输出一个0或1的类别,这时候就需要利用Sigmoid函数了:

技术分享

其图像如下:

技术分享

将该实数代入Sigmoid函数后,得到一个0~1之间的数,大于0.5归入1,小于0.5归入0即可。

利用Sigmoid函数,梯度上升算法的代码如下:

from numpy import *
 
def sigmoid(inX):
    return 1.0/(1+exp(-inX))
 
def gradAscent(dataMatIn, classLabels):
    """
    逻辑斯谛回归梯度上升优化算法
    :param dataMatIn:输入X矩阵(100*3的矩阵,每一行代表一个实例,每列分别是X0 X1 X2)
    :param classLabels: 输出Y矩阵(类别标签组成的向量)
    :return:权值向量
    """
    dataMatrix = mat(dataMatIn)             #转换为 NumPy 矩阵数据类型
    labelMat = mat(classLabels).transpose() #转换为 NumPy 矩阵数据类型
    m,n = shape(dataMatrix)                 #矩阵大小
    alpha = 0.001                           #步长
    maxCycles = 500
    weights = ones((n,1))
    for k in range(maxCycles):              #最大迭代次数
        h = sigmoid(dataMatrix*weights)     #矩阵内积
        error = (labelMat - h)              #向量减法
        weights += alpha * dataMatrix.transpose() * error  #矩阵内积
    return weights

 

随机梯度上升算法

梯度下降算法在每次更新权值向量的时候都需要遍历整个数据集,该方法对小数据集尚可。但如果有数十亿样本和成千上万的特征时,它的计算复杂度就太高了。一种改进的方法是一次仅用一个样本点的回归误差来更新权值向量,这个方法叫随机梯度下降算法。由于可以在遇到新样本的时候再对分类器进行增量式更新,所以随机梯度上升算法是一个在线学习算法;与此对应,一次处理完所有数据的算法(如梯度上升算法)被称作“批处理”。

随机梯度上升算法的代码:

def stocGradAscent0(dataMatrix, classLabels, history_weight):
    """
    随机梯度上升算法
    :param dataMatIn:输入X矩阵(100*3的矩阵,每一行代表一个实例,每列分别是X0 X1 X2)
    :param classLabels: 输出Y矩阵(类别标签组成的向量)
    :return:权值向量
    """
    dataMatrix = array(dataMatrix)
    m,n = shape(dataMatrix)
    alpha = 0.01
    weights = ones(n)                               #初始化为单位矩阵
    for i in range(m):
        h = sigmoid(sum(dataMatrix[i]*weights))     #挑选(伪随机)第i个实例来更新权值向量
        error = classLabels[i] - h
        weights = weights + dataMatrix[i] * alpha * error
        history_weight.append(copy(weights))
    return weights

 

改进的随机梯度上升算法

既然随机梯度上升算法最终给出的参数不好,那是否仅仅是因为参数没有足够收敛,而算法本质是优秀的呢?对此,可以逐步减小步长,避免参数周期性的抖动。代码:

def stocGradAscent1(dataMatrix, classLabels, numIter=150):
    """
    改进的随机梯度上升算法
    :param dataMatIn:输入X矩阵(100*3的矩阵,每一行代表一个实例,每列分别是X0 X1 X2)
    :param classLabels: 输出Y矩阵(类别标签组成的向量)
    :param numIter: 迭代次数
    :return:
    """
    dataMatrix = array(dataMatrix)
    m,n = shape(dataMatrix)
    weights = ones(n)                                           #初始化为单位矩阵
    for j in range(numIter):
        dataIndex = range(m)
        for i in range(m):
            alpha = 4/(1.0+j+i)+0.0001                          #步长递减,但是由于常数存在,所以不会变成0
            randIndex = int(random.uniform(0,len(dataIndex)))   #总算是随机了
            h = sigmoid(sum(dataMatrix[randIndex]*weights))
            error = classLabels[randIndex] - h
            weights = weights + alpha * error * dataMatrix[randIndex]
            del(dataIndex[randIndex])                           #删除这个样本,以后就不会选到了
    return weights

以上三种算法的效果:

技术分享

此外,还有多元逻辑回归,假设离散型随机变量Y的取值集合是{1,2,…K},那么多项逻辑斯蒂回归模型是: 
技术分享 
对于参数估计方法仍然可以采用最大似然估计。

3、总结

个人感觉,只要特征找的准,数据量足够大,逻辑斯蒂回归将会非常好用。另外,还要注意避免过拟合。

特征选择的话,由于逻辑斯蒂回归的优点,开始的时候不用考虑各个特征之间是否有相关性,直接把能用的特征全部线性加权起来就好。经过初步训练,观察各个特征的权值,如果权值接近为0,那么就可以将这个特征看做是不相关的可以去除的特征。总结起来就是:先做加法再做减法。

解决过拟合的方法不过两种,一种是减少特征的个数;另一种是模型选择的正则化方法。正则化的话,可以参考岭回归方法。逻辑回归的优缺点总结如下:优点:计算代价不高,易于理解和实现,且若采用随机梯度上升法可以在线学习; 缺点:可能容易欠拟合,分类精度不高,这个可能是因为我们无法找到足够的特征。

4、实现

依据以上分析,得到如下代码:

‘‘‘
Created on Oct 27, 2010
Logistic Regression Working Module
@author: Peter
‘‘‘
from numpy import *

def loadDataSet():
    dataMat = []; labelMat = []
    fr = open(testSet.txt)
    for line in fr.readlines():
        lineArr = line.strip().split()
        dataMat.append([1.0, float(lineArr[0]), float(lineArr[1])])
        labelMat.append(int(lineArr[2]))
    return dataMat,labelMat

def sigmoid(inX):
    return 1.0/(1+exp(-inX))

def gradAscent(dataMatIn, classLabels):
    dataMatrix = mat(dataMatIn)             #convert to NumPy matrix
    labelMat = mat(classLabels).transpose() #convert to NumPy matrix
    m,n = shape(dataMatrix)
    alpha = 0.001
    maxCycles = 500
    weights = ones((n,1))
    for k in range(maxCycles):              #heavy on matrix operations
        h = sigmoid(dataMatrix*weights)     #matrix mult
        error = (labelMat - h)              #vector subtraction
        weights = weights + alpha * dataMatrix.transpose()* error #matrix mult
    return weights

def plotBestFit(weights):
    import matplotlib.pyplot as plt
    dataMat,labelMat=loadDataSet()
    dataArr = array(dataMat)
    n = shape(dataArr)[0] 
    xcord1 = []; ycord1 = []
    xcord2 = []; ycord2 = []
    for i in range(n):
        if int(labelMat[i])== 1:
            xcord1.append(dataArr[i,1]); ycord1.append(dataArr[i,2])
        else:
            xcord2.append(dataArr[i,1]); ycord2.append(dataArr[i,2])
    fig = plt.figure()
    ax = fig.add_subplot(111)
    ax.scatter(xcord1, ycord1, s=30, c=red, marker=s)
    ax.scatter(xcord2, ycord2, s=30, c=green)
    x = arange(-3.0, 3.0, 0.1)
    y = (-weights[0]-weights[1]*x)/weights[2]
    ax.plot(x, y)
    plt.xlabel(X1); plt.ylabel(X2);
    plt.show()

def stocGradAscent0(dataMatrix, classLabels):
    m,n = shape(dataMatrix)
    alpha = 0.01
    weights = ones(n)   #initialize to all ones
    for i in range(m):
        h = sigmoid(sum(dataMatrix[i]*weights))
        error = classLabels[i] - h
        weights = weights + alpha * error * dataMatrix[i]
    return weights

def stocGradAscent1(dataMatrix, classLabels, numIter=150):
    m,n = shape(dataMatrix)
    weights = ones(n)   #initialize to all ones
    for j in range(numIter):
        dataIndex = range(m)
        for i in range(m):
            alpha = 4/(1.0+j+i)+0.0001    #apha decreases with iteration, does not 
            randIndex = int(random.uniform(0,len(dataIndex)))#go to 0 because of the constant
            h = sigmoid(sum(dataMatrix[randIndex]*weights))
            error = classLabels[randIndex] - h
            weights = weights + alpha * error * dataMatrix[randIndex]
            del(dataIndex[randIndex])
    return weights

def classifyVector(inX, weights):
    prob = sigmoid(sum(inX*weights))
    if prob > 0.5: return 1.0
    else: return 0.0

def colicTest():
    frTrain = open(horseColicTraining.txt); frTest = open(horseColicTest.txt)
    trainingSet = []; trainingLabels = []
    for line in frTrain.readlines():
        currLine = line.strip().split(\t)
        lineArr =[]
        for i in range(21):
            lineArr.append(float(currLine[i]))
        trainingSet.append(lineArr)
        trainingLabels.append(float(currLine[21]))
    trainWeights = stocGradAscent1(array(trainingSet), trainingLabels, 1000)
    errorCount = 0; numTestVec = 0.0
    for line in frTest.readlines():
        numTestVec += 1.0
        currLine = line.strip().split(\t)
        lineArr =[]
        for i in range(21):
            lineArr.append(float(currLine[i]))
        if int(classifyVector(array(lineArr), trainWeights))!= int(currLine[21]):
            errorCount += 1
    errorRate = (float(errorCount)/numTestVec)
    print "the error rate of this test is: %f" % errorRate
    return errorRate

def multiTest():
    numTests = 10; errorSum=0.0
    for k in range(numTests):
        errorSum += colicTest()
    print "after %d iterations the average error rate is: %f" % (numTests, errorSum/float(numTests))
        

相关博客:

http://www.hankcs.com/ml/the-logistic-regression-and-the-maximum-entropy-model.html

https://www.douban.com/note/323644915/

http://www.cnblogs.com/zhizhan/p/5007540.html

逻辑回归(logistic regression)