pytorch学习10-流量预测实战


理论结合实际,目前做了一个神经网络在流量预测和告警分析中的应用。利用预测流量与实际流量对比,可以分析出是否发生故障以及预知流量发展趋势,从而提前扩容等等等等。

以我们某个机房业务为例:

模型建立

考虑影响互联网入口流量的影响因素,基本确定以下6点:

  1. 在线设备数量
  2. 对象存储利用率
  3. 上上一刻流量值
  4. 上一刻流量值
  5. 当前时间(当天的时间戳)
  6. 当前weekday(周几)

后来发现weekday影响不大就去掉了。

采用3层全连接神经网络,输入维度为5,隐含层维度为10,输出为1,其中引入ReLU激活函数,用于增强网络非线性能力。

更新1: 增加一个BN层做标准化(虽然对不深的神经网络没啥用)

class Model(torch.nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.linear1 = torch.nn.Linear(5, 10)
        self.bn1 = torch.nn.BatchNorm1d(10)
        self.linear2 = torch.nn.Linear(10, 1)
        # self.linear3 = torch.nn.Linear(2, 1)
        self.relu = torch.nn.ReLU()
        

    def forward(self, x):
        x = self.relu(self.bn1(self.linear1(x)))
        x = self.relu(self.linear2(x))
        # x = self.sigmoid(self.linear3(x))
        return x

数据采集与处理

其实我感觉深度学习数据集这一块才是重点。。。模型和训练套路都差不多

数据采集用一个定时任务实现,这部分不细说了。

数据处理使用mini-batch,继承Dataset实现。

  1. 从数据库中读取数据
  2. 按设计模型组装数据
  3. 归一化处理
def load_data(thpdataset_lines, train=True):
    # 数据载入预处理
    assert isinstance(thpdataset_lines, QuerySet), '原始数据需传入QuerySet'
    # 组装原始数据,列表
    src_data = []
    for thpdataset_line in thpdataset_lines:
        src_data.append(thpdataset_line.device_num)
        src_data.append(thpdataset_line.dxcc_usage)
        src_data.append(thpdataset_line.thp_2)
        src_data.append(thpdataset_line.thp_1)
        src_data.append(thpdataset_line.created_at.hour*3600 +
                        thpdataset_line.created_at.minute*60 + thpdataset_line.created_at.second)  # 当天的时间戳
        src_data.append(thpdataset_line.thp)

    # 列表转换为数组
    np_data = np.array(src_data, dtype=np.float32)
    feature_names = ['device_num', 'dxcc_usage',
                     'thp_2', 'thp_1', 'hour', 'thp']

    # 一维数组数组转换为矩阵, N行6列
    np_data = np_data.reshape(-1, len(feature_names))

    if train:
        # 划分训练测试集
        offset = int(len(np_data)*RATIO)
        train_data, test_data = np_data[:offset], np_data[offset:]
            
        # 计算训练数据集的最大最小值
        max, min = train_data.max(axis=0), train_data.min(axis=0)
        # 记录训练数据的归一化参数,在预测时对数据做反归一化
        np.savez('deeplearn/st_train_param.npz', max=max, min=min)

        # 对数据进行归一化处理
        np_data = (np_data - min) / (max - min)
        train_data, test_data = np_data[:offset], np_data[offset:]

        return train_data, test_data, max, min
    else:
        # 加载训练数据的最大最小值
        train_param = np.load('deeplearn/st_train_param.npz')
        max, min = train_param['max'], train_param['min']
        # 对数据进行归一化处理
        np_data = (np_data - min) / (max - min)

        return np_data, max, min


class TrainDataset(Dataset):
    def __init__(self, np_data):
        self.x_data = torch.from_numpy(np_data[:, :-1])     # -1列不取
        self.y_data = torch.from_numpy(np_data[:, [-1]])    # 取-1列(矩阵)

        # 获得数据集行数
        self.len = np_data.shape[0]

    # 获得索引方法
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # 获得数据集长度
    def __len__(self):
        return self.len

训练

4.按训练数据比例分配数据

5.开始训练

6.最后保存模型参数,通过matplotlib画图

整体代码

import os
import datetime
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from django.db.models.query import QuerySet
from .models import ThpDataset


# plt.style.use('seaborn-whitegrid')
# 显示中文
plt.rcParams['font.sans-serif'] = 'SimHei'
plt.rcParams['axes.unicode_minus'] = False

# 训练参数
EPOCH = 1000
LR = 0.001
RATIO = 0.9

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


def load_data(thpdataset_lines, train=True):
    # 数据载入预处理
    assert isinstance(thpdataset_lines, QuerySet), '原始数据需传入QuerySet'
    # 组装原始数据,列表
    src_data = []
    for thpdataset_line in thpdataset_lines:
        src_data.append(thpdataset_line.device_num)
        src_data.append(thpdataset_line.dxcc_usage)
        src_data.append(thpdataset_line.thp_2)
        src_data.append(thpdataset_line.thp_1)
        src_data.append(thpdataset_line.created_at.hour*3600 +
                        thpdataset_line.created_at.minute*60 + thpdataset_line.created_at.second)  # 当天的时间戳
        src_data.append(thpdataset_line.thp)

    # 列表转换为数组
    np_data = np.array(src_data, dtype=np.float32)
    feature_names = ['device_num', 'dxcc_usage',
                     'thp_2', 'thp_1', 'hour', 'thp']

    # 一维数组数组转换为矩阵, N行6列
    np_data = np_data.reshape(-1, len(feature_names))

    if train:
        # 划分训练测试集
        offset = int(len(np_data)*RATIO)
        train_data, test_data = np_data[:offset], np_data[offset:]
            
        # 计算训练数据集的最大最小值
        max, min = train_data.max(axis=0), train_data.min(axis=0)
        # 记录训练数据的归一化参数,在预测时对数据做反归一化
        np.savez('deeplearn/st_train_param.npz', max=max, min=min)

        # 对数据进行归一化处理
        np_data = (np_data - min) / (max - min)
        train_data, test_data = np_data[:offset], np_data[offset:]

        return train_data, test_data, max, min
    else:
        # 加载训练数据的最大最小值
        train_param = np.load('deeplearn/st_train_param.npz')
        max, min = train_param['max'], train_param['min']
        # 对数据进行归一化处理
        np_data = (np_data - min) / (max - min)

        return np_data, max, min


class TrainDataset(Dataset):
    def __init__(self, np_data):
        self.x_data = torch.from_numpy(np_data[:, :-1])     # -1列不取
        self.y_data = torch.from_numpy(np_data[:, [-1]])    # 取-1列(矩阵)

        # 获得数据集行数
        self.len = np_data.shape[0]

    # 获得索引方法
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # 获得数据集长度
    def __len__(self):
        return self.len


class Model(torch.nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.linear1 = torch.nn.Linear(5, 10)
        self.bn1 = torch.nn.BatchNorm1d(10)
        self.linear2 = torch.nn.Linear(10, 1)
        # self.linear3 = torch.nn.Linear(2, 1)   #隐藏层数和大小也是超参数
        self.relu = torch.nn.ReLU()
        

    def forward(self, x):
        x = self.bn1(self.linear1(x))
        x = self.relu(x)
        x = self.relu(self.linear2(x))
        # x = self.sigmoid(self.linear3(x))
        return x


# 模型
model = Model()
# 损失函数
criterion = torch.nn.MSELoss()
# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# 学习率调整器
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer=optimizer, verbose=True)


def train():
    # 读取训练数据并划分训练集和测试集
    now = datetime.datetime.now()
    start = now - datetime.timedelta(days=30)
    thpdataset_lines = ThpDataset.objects.filter(
        created_at__gte=start, created_at__lte=now, perf_name='水土')

    train_data, test_data, max, min = load_data(thpdataset_lines, train=True)

    train_dataset = TrainDataset(train_data)
    test_dataset = TrainDataset(test_data)

    train_loader = DataLoader(
        dataset=train_dataset, batch_size=128, shuffle=True, num_workers=3, pin_memory=True)
    test_loader = DataLoader(dataset=test_dataset, batch_size=128,
                             shuffle=False, num_workers=3, pin_memory=True)

    print(f'[{now}]训练开始, 训练集长度为:{len(train_dataset)}, 测试集长度为:{len(test_dataset)}')
    # 加载训练模型
    # if os.path.exists('deeplearn/st_model_param.pkl'):
    #     model.load_state_dict(torch.load('deeplearn/st_model_param.pkl'))
    train_loss_list, test_loss_list = [], []
    model.to(device)
    for epoch in range(EPOCH):
        model.train()
        train_loss = 0
        for i, data in enumerate(train_loader, 0):
            # 准备数据dataloader会将按batch_size返回的数据整合成矩阵加载
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            # 前馈
            y_pred = model(inputs)
            loss = criterion(y_pred, labels)
            train_loss += loss.item()
            # if epoch % 100 == 0:
            #     print(epoch, i, loss.item())
            # 反向传播求梯度
            optimizer.zero_grad()
            loss.backward()
            # 更新
            optimizer.step()

        train_loss /= len(train_loader)
        train_loss_list.append(train_loss)
        # 调整学习率
        # scheduler.step(loss.item())

        # 测试
        model.eval()
        with torch.no_grad():
            test_loss = 0
            for i, data in enumerate(test_loader, 0):
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                y_pred = model(inputs)
                test_loss += criterion(y_pred, labels).item()

            test_loss /= len(test_loader)
            test_loss_list.append(test_loss)
            # print(epoch, i, train_loss,test_loss)

        if (epoch+1) % 100 == 0:
            print(f'epoch:{epoch+1}/{EPOCH}, train_loss:{train_loss}, test_loss:{test_loss}')

    # 保存模型参数
    model.to(torch.device('cpu'))
    torch.save(model.state_dict(), 'deeplearn/st_model_param.pkl')
    print(f'[{datetime.datetime.now()}]模型保存完毕!')

    x, y = test_dataset[:]
    with torch.no_grad():
        y_pred = model(x)
    # 对测试结果做反归一化处理
    y_pred = y_pred.cpu().numpy() * (max[-1] - min[-1]) + min[-1]
    y = y.cpu().numpy() * (max[-1] - min[-1]) + min[-1]

    y_pred = y_pred*8/1000/1000/1000
    y = y*8/1000/1000/1000
    # print("预测 = ", y_pred)
    # print("实际 = ", y)

    plt.subplot(2, 1, 1)
    plt.grid(True)
    plt.title('训练数据')
    plt.plot(y[:, 0], label='实际流量')
    plt.plot(y_pred[:, 0], label='预测流量')
    plt.ylabel('流量,Gb/s')
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.grid(True)
    plt.plot(train_loss_list, label='train_loss')
    plt.plot(test_loss_list, label='test_loss')
    plt.ylabel('误差值')
    plt.legend()
    plt.savefig('deeplearn/st_test.png')
    plt.close()

微信截图_20220930145322

最后,模型比较简单,损失值最低只能到这个数量级就不收敛了。还有就是待预测数据会作为下次的训练数据,可能存在过拟合的问题。