0


机器学习-基于朴素贝叶斯的垃圾邮件分类

概率论是许多机器学习算法的基础,此篇博客会给出一些使用概率论进行分类的方法。

首先从一个最简单的概率分类器开始,然后给出一些假设来学习朴素贝叶斯分类器。我们称之为“朴素”,是因为整个形式化过程只做最原始、最简单的假设。

我们还将构建另一个分类器,观察其在真实的垃圾邮件数据集中的过滤效果。

一、基于贝叶斯决策理论的分类方法

朴素贝叶斯是贝叶斯决策理论的一部分,所以讲述朴素贝叶斯之前有必要快速了解一下贝叶斯决策理论。

我们用之前划分的数据来简单理解一下贝叶斯决策理论的核心思想。

如上图,我们现在用p1(x,y,z)表示数据点(x,y,z)属于类别1(图中红色圆点表示的类别)的概率,用p2(x,y,z)表示数据点(x,y,z)属于类别2(图中蓝色三角形表示的类别)的概率,那么对于一个新数据点(x0,y0,z0),可以用下面的规则来判断它的类别:

  • 如果p1(x0,y0,z0)>p2(x0,y0,z0),那么数据点(x0,y0,z0)的类别为1
  • 如果p1(x0,y0,z0)<p2(x0,y0,z0),那么数据点(x0,y0,z0)的类别为2

也就是说,我们会选择高概率对应的类别。这就是贝叶斯决策理论的核心思想,即选择具有最高概率的决策。

那么接下来,就是学习如何计算p1和p2概率。

条件概率

相信大家对条件概率都不陌生,就是指:设A,B是两个事件,且P(B)>0,则在事件B发生的条件下,事件A发生的条件概率为:

P(A|B)=\frac{P(AB)}{P(B)}

P(AB)=P(A|B)\cdot P(B)=P(B|A)\cdot P(A)

我们对上面的公式做推导,可得到贝叶斯公式:

P(A|B)= P(A)\cdot\frac{P(B|A)}{P(B)}

  • P(A) 称为”先验概率”,即在B事件发生之前,我们对A事件概率的一个判断。如:正常收到一封邮件,该邮件为垃圾邮件的概率就是“先验概率”
  • P(A|B)称为”后验概率”, 即在B事件发生之后,我们对A事件概率的重新评估。如:邮件中含有“中奖”这个词,该邮件为垃圾邮件的概率就是“后验概率”

全概率公式

除了条件概率以外,在计算p1和p2的时候,还要用到全概率公式,因此,这里继续推导全概率公式。假定样本空间S,是两个事件A与A’的和。根据前面推到的条件概率公式可得:

P(B)=P(B|A)\cdot P(A)+P(B|A')\cdot P(A')

如果A和A’构成样本空间的一个划分,那么事件B的概率,就等于A和A’的概率分别乘以B对这两个事件的条件概率之和。

贝叶斯公式就变成:

P(A|B)=\frac{P(AB)}{P(B)}=\frac{P(B|A)\cdot P(A)}{P(B|A)\cdot P(A)+P(B|A')\cdot P(A')}

贝叶斯公式

由条件概率的定义以及全概率公式即得贝叶斯公式:

P(B_{i}|A)=\frac{P(B_{i}A)}{P(A)}=\frac{P(A|B_{i})P(B_{i})}{\sum_{j=1}^{n}P(A|B_{j})P(B_{j})},i=1,2,...,n

贝叶斯推断

前面我们提到,将

P(A)称为"先验概率"(Prior probability

P(A|B)称为"后验概率"(Posterior probability)

P(B|A)/P(B)称为"可能性函数"(Likelyhood)

所以,条件概率可以理解为:后验概率 = 先验概率 x 调整因子

这就是贝叶斯推断的含义。我们先预估一个"先验概率",然后加入实验结果,看这个实验到底是增强还是削弱了"先验概率",由此得到更接近事实的"后验概率"。

二、python实现

数据集

本次实验中,所采用的数据集为Enron Email Dataset。该数据集已经对正常邮件和垃圾邮件进行了分类。由于该数据集是真实的电子邮件数据集,因此它包含真实的垃圾邮件,杀毒软件可能会对其中部分的邮件进行删除,因此进行本次实验时请暂时关闭杀毒软件。

导包

import os
import re
import string
import math

读取数据

DATA_DIR = 'enron'
target_names = ['ham', 'spam']
def get_data(DATA_DIR):
    subfolders = ['enron%d' % i for i in range(1,7)]
    data = []
    target = []
    for subfolder in subfolders:
        # spam
        spam_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'spam'))
        for spam_file in spam_files:
            with open(os.path.join(DATA_DIR, subfolder, 'spam', spam_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(1)
        # ham
        ham_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'ham'))
        for ham_file in ham_files:
            with open(os.path.join(DATA_DIR, subfolder, 'ham', ham_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(0)
    return data, target
 
X, y = get_data(DATA_DIR)#读取数据

数据预处理

class SpamDetector_1(object):
    #清除标点符号
    def clean(self, s):
        translator = str.maketrans("", "", string.punctuation)
        return s.translate(translator)
    #将字符串标记为单词
    def tokenize(self, text):
        text = self.clean(text).lower()
        return re.split("\W+", text)
    #计算某个单词出现的次数
    def get_word_counts(self, words):
        word_counts = {}
        for word in words:
            word_counts[word] = word_counts.get(word, 0.0) + 1.0
        return word_counts
class SpamDetector_2(SpamDetector_1):
    # X:data,Y:target标签(垃圾邮件或正常邮件)
    def fit(self, X, Y):
        self.num_messages = {}
        self.log_class_priors = {}
        self.word_counts = {}
        # 建立一个集合存储所有出现的单词
        self.vocab = set()
        # 统计spam和ham邮件的个数
        self.num_messages['spam'] = sum(1 for label in Y if label == 1)
        self.num_messages['ham'] = sum(1 for label in Y if label == 0)
 
        # 计算先验概率,即所有的邮件中,垃圾邮件和正常邮件所占的比例
        self.log_class_priors['spam'] = math.log(
            self.num_messages['spam'] / (self.num_messages['spam'] + self.num_messages['ham']))
        self.log_class_priors['ham'] = math.log(
            self.num_messages['ham'] / (self.num_messages['spam'] + self.num_messages['ham']))
 
        self.word_counts['spam'] = {}
        self.word_counts['ham'] = {}
 
        for x, y in zip(X, Y):
            c = 'spam' if y == 1 else 'ham'
            # 构建一个字典存储单封邮件中的单词以及其个数
            counts = self.get_word_counts(self.tokenize(x))
            for word, count in counts.items():
                if word not in self.vocab:
                    self.vocab.add(word)#确保self.vocab中含有所有邮件中的单词
                # 下面语句是为了计算垃圾邮件和非垃圾邮件的词频,即给定词在垃圾邮件和非垃圾邮件中出现的次数。
                # c是0或1,垃圾邮件的标签
                if word not in self.word_counts[c]:
                    self.word_counts[c][word] = 0.0
                self.word_counts[c][word] += count

# 可以利用下面的语句进行debug,判断是否运行正确,若正确,log_class_priors of spam应该为-0.6776,log_class_priors of ham应该为-0.7089。
# 我们选取了第100封之后的邮件作为训练集,前面一百封邮件作为测试集。
'''
MNB = SpamDetector_2()
# 选取了第100封之后的邮件作为训练集,前面一百封邮件作为测试集
MNB.fit(X[100:], y[100:])
 
# print("log_class_priors of spam", MNB.log_class_priors['spam']) #-0.6776
# print("log_class_priors of ham", MNB.log_class_priors['ham']) #-0.7089
'''

测试

class SpamDetector(SpamDetector_2):
    def predict(self, X):
        result = []
        flag_1 = 0
        # 遍历所有的测试集
        for x in X:
            counts = self.get_word_counts(self.tokenize(x))  # 生成可以记录单词以及该单词出现的次数的字典
            spam_score = 0
            ham_score = 0
            flag_2 = 0
            for word, _ in counts.items():
                if word not in self.vocab: continue
 
                #下面计算P(内容|垃圾邮件)和P(内容|正常邮件),所有的单词都要进行拉普拉斯平滑
                else:
                    # 该单词存在于正常邮件的训练集和垃圾邮件的训练集当中
                    if word in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log(
                            (self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log(
                            (self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
                    # 该单词存在于垃圾邮件的训练集当中,但不存在于正常邮件的训练集当中
                    if word in self.word_counts['spam'].keys() and word not in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log(
                            (self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log( 1 / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
                    # 该单词存在于正常邮件的训练集当中,但不存在于垃圾邮件的训练集当中
                    if word not in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log( 1 / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log(
                            (self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
 
                # 把计算到的P(内容|垃圾邮件)和P(内容|正常邮件)加起来
                spam_score += log_w_given_spam
                ham_score += log_w_given_ham
 
                flag_2 += 1
 
                # 最后,还要把先验加上去,即P(垃圾邮件)和P(正常邮件)
                spam_score += self.log_class_priors['spam']
                ham_score += self.log_class_priors['ham']
 
            # 最后进行预测,如果spam_score > ham_score则标志为1,即垃圾邮件
            if spam_score > ham_score:
                result.append(1)
            else:
                result.append(0)
                
            flag_1 += 1
 
        return result
MNB = SpamDetector()
MNB.fit(X[100:], y[100:])
pred = MNB.predict(X[:100])
true = y[:100]
 
accuracy = 0
for i in range(100):
    if pred[i] == true[i]:
        accuracy += 1
print(accuracy) # 0.98

整体代码

import os
import re
import string
import math
DATA_DIR = 'enron'
target_names = ['ham', 'spam']
def get_data(DATA_DIR):
    subfolders = ['enron%d' % i for i in range(1,7)]
    data = []
    target = []
    for subfolder in subfolders:
        # spam
        spam_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'spam'))
        for spam_file in spam_files:
            with open(os.path.join(DATA_DIR, subfolder, 'spam', spam_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(1)
        # ham
        ham_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'ham'))
        for ham_file in ham_files:
            with open(os.path.join(DATA_DIR, subfolder, 'ham', ham_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(0)
    return data, target
 
X, y = get_data(DATA_DIR)
 
class SpamDetector_1(object):
    """Implementation of Naive Bayes for binary classification"""
    #清除空格
    def clean(self, s):
        translator = str.maketrans("", "", string.punctuation)
        return s.translate(translator)
    #分开每个单词
    def tokenize(self, text):
        text = self.clean(text).lower()
        return re.split("\W+", text)
    #计算某个单词出现的次数
    def get_word_counts(self, words):
        word_counts = {}
        for word in words:
            word_counts[word] = word_counts.get(word, 0.0) + 1.0
        return word_counts
 
class SpamDetector_2(SpamDetector_1):
    # X:data,Y:target标签(垃圾邮件或正常邮件)
    def fit(self, X, Y):
        self.num_messages = {}
        self.log_class_priors = {}
        self.word_counts = {}
        # 建立一个集合存储所有出现的单词
        self.vocab = set()
        # 统计spam和ham邮件的个数
        self.num_messages['spam'] = sum(1 for label in Y if label == 1)
        self.num_messages['ham'] = sum(1 for label in Y if label == 0)
 
        # 计算先验概率,即所有的邮件中,垃圾邮件和正常邮件所占的比例
        self.log_class_priors['spam'] = math.log(
            self.num_messages['spam'] / (self.num_messages['spam'] + self.num_messages['ham']))
        self.log_class_priors['ham'] = math.log(
            self.num_messages['ham'] / (self.num_messages['spam'] + self.num_messages['ham']))
 
        self.word_counts['spam'] = {}
        self.word_counts['ham'] = {}
 
        for x, y in zip(X, Y):
            c = 'spam' if y == 1 else 'ham'
            # 构建一个字典存储单封邮件中的单词以及其个数
            counts = self.get_word_counts(self.tokenize(x))
            for word, count in counts.items():
                if word not in self.vocab:
                    self.vocab.add(word)#确保self.vocab中含有所有邮件中的单词
                # 下面语句是为了计算垃圾邮件和非垃圾邮件的词频,即给定词在垃圾邮件和非垃圾邮件中出现的次数。
                # c是0或1,垃圾邮件的标签
                if word not in self.word_counts[c]:
                    self.word_counts[c][word] = 0.0
                self.word_counts[c][word] += count
 
MNB = SpamDetector_2()
MNB.fit(X[100:], y[100:])
 
class SpamDetector(SpamDetector_2):
    def predict(self, X):
        result = []
        flag_1 = 0
        # 遍历所有的测试集
        for x in X:
            counts = self.get_word_counts(self.tokenize(x))  # 生成可以记录单词以及该单词出现的次数的字典
            spam_score = 0
            ham_score = 0
            flag_2 = 0
            for word, _ in counts.items():
                if word not in self.vocab: continue
 
                #下面计算P(内容|垃圾邮件)和P(内容|正常邮件),所有的单词都要进行拉普拉斯平滑
                else:
                    # 该单词存在于正常邮件的训练集和垃圾邮件的训练集当中
                    if word in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log(
                            (self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log(
                            (self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
                    # 该单词存在于垃圾邮件的训练集当中,但不存在于正常邮件的训练集当中
                    if word in self.word_counts['spam'].keys() and word not in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log(
                            (self.word_counts['spam'][word] + 1) / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log( 1 / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
                    # 该单词存在于正常邮件的训练集当中,但不存在于垃圾邮件的训练集当中
                    if word not in self.word_counts['spam'].keys() and word in self.word_counts['ham'].keys():
                        log_w_given_spam = math.log( 1 / (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                        log_w_given_ham = math.log(
                            (self.word_counts['ham'][word] + 1) / (sum(self.word_counts['ham'].values()) + len(
                                self.vocab)))
 
                # 把计算到的P(内容|垃圾邮件)和P(内容|正常邮件)加起来
                spam_score += log_w_given_spam
                ham_score += log_w_given_ham
 
                flag_2 += 1
 
                # 最后,还要把先验加上去,即P(垃圾邮件)和P(正常邮件)
                spam_score += self.log_class_priors['spam']
                ham_score += self.log_class_priors['ham']
 
            # 最后进行预测,如果spam_score > ham_score则标志为1,即垃圾邮件
            if spam_score > ham_score:
                result.append(1)
            else:
                result.append(0)
 
            flag_1 += 1
 
        return result
 
MNB = SpamDetector()
MNB.fit(X[100:], y[100:])
pred = MNB.predict(X[:100])
true = y[:100]
 
accuracy = 0
for i in range(100):
    if pred[i] == true[i]:
        accuracy += 1
print(accuracy) # 0.98

测试集分类的正确率达到98%!


本文转载自: https://blog.csdn.net/m0_52053228/article/details/128061773
版权归原作者 库里不会写代码 所有, 如有侵权,请联系我们删除。

“机器学习-基于朴素贝叶斯的垃圾邮件分类”的评论:

还没有评论