pytorch学习13-使用RNN优化流量预测


前面《pytorch学习10-流量预测实战》对流量的预测,停留在使用稠密网络(DNN),输入是当时影响流量的特征值,实质上是计算出来一种非线性空间变换,以此来推理出当时的流量。

这其实有一个很大的矛盾,我都能获取当时的特征了,自然也能获取当时的流量。所以推理出的流量只能用于和实际流量对比,并不能实现”预测”的能力。

学习循环神经网络后,通过把历史数据做成序列化数据喂给RNN模型,输出的数据受前一个序列影响,自然想到可以利用RNN来实现真正的”预测”能力。

更新1:看了下李沐老师的教程,发现之前的预测方式有点问题。RNN分为单步预测(one-step-ahead prediction)、和”K步预测”。

单步预测往往比较精确,而K步预测经过几个预测步骤之后,预测的结果很快就会衰减到一个常数,这是因为错误的积累(可以参考1.01^365)。例如,未来24小时的天气预报往往相当准确, 但超过这个时间,精度就会迅速下降。

针对这一点,我暂时想出的方法是将输入维度由1改为144(即6 * 24),输出维度也相应改为144,即一天的数据量。这样仅仅是增大训练的数据量就行了。同时为了减少预测数据滞后于真实数据,利用流量数据的周期性,计算出当前时间步过去一天的流量均值加入特征。

RNN模型建立

RNN天然适用于序列数据建模,这里我选用GRU,num_layers=3,bidirectional=False。在最后时刻输出的隐藏状态hn的基础上,使用一个全连接得到预测输出。模型如下:

class RNNModel(torch.nn.Module):
    def __init__(self, input_size=INPUT_SIZE, hidden_size=HIDDEN_SIZE):
        super(RNNModel, self).__init__()
        self.gru = torch.nn.GRU(input_size=input_size,
                                hidden_size=hidden_size, batch_first=True, num_layers=NUM_LAYERS)
        self.ln = torch.nn.LayerNorm(HIDDEN_SIZE)
        self.relu = torch.nn.ReLU()
        self.linear = torch.nn.Linear(hidden_size, OUTPUT_SIZE)


    def forward(self, x, hidden):                      # x.shape(batch_size, seq_len, input_size)
        # hidden = torch.zeros(NUM_LAYERS, x.shape[0], HIDDEN_SIZE)  # out.shape(b,s,h)  hidden.shape(num_layers, batch_size, hidden_size)
        out, hidden = self.gru(x, hidden)
        out = self.ln(out)
        output = out[:,-1,:]  # =hidden[-1]
        output = self.linear(self.relu(output))
        return output, hidden

数据采集与处理

数据处理方式差不多,只是需要把输入x的维度调整为(batch_size, seq_len, input_size),这里的batch_size注意和DataLoader里的batch_size不是一回事,RNN的batch_size指样本数量。假设某天的数据受前7天的影响,所以序列长度seq_len取7。

class RNNTrainDataset(Dataset):
    def __init__(self, np_data):
        # 生成序列数据
        x, y = self.extract_data(np_data)

        self.x_data = torch.from_numpy(x)  # shape(n, seq_len, INPUT_SIZE)
        self.y_data = torch.from_numpy(y).reshape(-1, OUTPUT_SIZE)    #shape(n, OUTPUT_SIZE)

        # 获得reshape后数据集行数n, 即batch_size
        self.len = self.x_data.shape[0]
    
    @staticmethod
    def extract_data(np_data, seq_len=SEQ_LEN):  # 序列为7天的数据,取x:7 , y:1
        x, y = [], []
        for i in range(len(np_data) - seq_len):
            x.append(np_data[i:i+seq_len])
            y.append(np_data[i+seq_len])
        return np.array(x), np.array(y)

    # 获得索引方法
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # 获得数据集长度
    def __len__(self):
        return self.len

训练

训练还是那3板斧,只是优化器换成Adam

整体代码

import os
import datetime
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from django.db.models.query import QuerySet
from .models import ThpDataset

# plt.style.use('seaborn-whitegrid')
# 显示中文
plt.rcParams['font.sans-serif'] = 'SimHei'
plt.rcParams['axes.unicode_minus'] = False

#RNN模型参数
INPUT_SIZE = 144
HIDDEN_SIZE = 288
OUTPUT_SIZE = 144
SEQ_LEN = 7
NUM_LAYERS = 3

#RNN训练参数
EPOCH = 1000
LR = 0.001
RATIO = 0.87

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


def load_data(thpdataset_lines):
    # 数据载入预处理
    assert isinstance(thpdataset_lines, QuerySet), '原始数据需传入QuerySet'
    # 组装原始数据,列表
    src_data = [thpdataset_line.thp for thpdataset_line in thpdataset_lines]

    # 列表转换为数组
    np_data = np.array(src_data, dtype=np.float32)
    np_data = np_data[-(np_data.shape[0]//INPUT_SIZE)*INPUT_SIZE:]  #取整
    
    # 一维数组数组转换为矩阵, (n,144)
    np_data = np_data.reshape(-1, INPUT_SIZE)

    return np_data


def normalize_data(np_data):
    # 计算训练数据集的最大最小值
    max, min = np_data.max(axis=0), np_data.min(axis=0)
    # 记录训练数据的归一化参数,在预测时对数据做反归一化
    np.savez('deeplearn/rnn_train_param.npz', max=max, min=min)
    # 对数据进行归一化处理
    norm_data = (np_data - min) / (max - min)

    return norm_data, max, min


class RNNTrainDataset(Dataset):
    def __init__(self, np_data):
        # 生成序列数据
        x, y = self.extract_data(np_data)

        self.x_data = torch.from_numpy(x)  # shape(n, seq_len, INPUT_SIZE)
        self.y_data = torch.from_numpy(y).reshape(-1, OUTPUT_SIZE)    #shape(n, OUTPUT_SIZE)

        # 获得reshape后数据集行数n, 即batch_size
        self.len = self.x_data.shape[0]
    
    @staticmethod
    def extract_data(np_data, seq_len=SEQ_LEN):  # 序列为7天的数据,取x:7 , y:1
        x, y = [], []
        for i in range(len(np_data) - seq_len):
            x.append(np_data[i:i+seq_len])
            y.append(np_data[i+seq_len])
        return np.array(x), np.array(y)

    # 获得索引方法
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]

    # 获得数据集长度
    def __len__(self):
        return self.len


class RNNModel(torch.nn.Module):
    def __init__(self, input_size=INPUT_SIZE, hidden_size=HIDDEN_SIZE):
        super(RNNModel, self).__init__()
        self.gru = torch.nn.RNN(input_size=input_size,
                                hidden_size=hidden_size, batch_first=True, num_layers=NUM_LAYERS)
        self.ln = torch.nn.LayerNorm(HIDDEN_SIZE)
        self.relu = torch.nn.ReLU()
        self.linear = torch.nn.Linear(hidden_size, OUTPUT_SIZE)


    def forward(self, x, hidden):                      # x.shape(batch_size, seq_len, input_size)
        # hidden = torch.zeros(NUM_LAYERS, x.shape[0], HIDDEN_SIZE)  # out.shape(b,s,h)  hidden.shape(num_layers, batch_size, hidden_size)
        out, hidden = self.gru(x, hidden)
        out = self.ln(out)
        output = out[:,-1,:]  # =hidden[-1]
        output = self.linear(self.relu(output))
        return output, hidden


# 模型
model = RNNModel()
# 损失函数
criterion = torch.nn.MSELoss()
# 优化器
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
# 学习率调整器 adam不需手动调整学习率
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
#     optimizer=optimizer, verbose=True)


def train():
    # 读取训练数据并划分训练集和测试集
    now = datetime.datetime.now()
    start = now - datetime.timedelta(days=90)
    thpdataset_lines = ThpDataset.objects.filter(
        created_at__gte=start, created_at__lte=now)
    
    np_data = load_data(thpdataset_lines)
    norm_data, max, min = normalize_data(np_data)

    offset = int(len(norm_data)*RATIO)
    train_data, test_data = norm_data[:offset], norm_data[offset:]
    train_dataset = RNNTrainDataset(norm_data)
    test_dataset = RNNTrainDataset(test_data) 

    train_loader = DataLoader(
        dataset=train_dataset, batch_size=32, shuffle=True, num_workers=3, pin_memory=True)
    test_loader = DataLoader(dataset=test_dataset,
                             batch_size=32, shuffle=False, num_workers=3, pin_memory=True)
    x, y = test_dataset[:]
    print(f'[{now}]RNN训练开始,seq训练集长度为:{len(train_dataset)}')
    # 加载训练模型
    # if os.path.exists('deeplearn/rnn_model_param3.pkl'):
    #     model.load_state_dict(torch.load('deeplearn/rnn_model_param3.pkl'))
    train_loss, test_loss = [], []
    model.to(device)
    for epoch in range(EPOCH):
        model.train()
        for i, data in enumerate(train_loader, 0):
            # 准备数据dataloader会将按batch_size返回的数据整合成矩阵加载
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            hidden = torch.zeros(NUM_LAYERS, inputs.shape[0], HIDDEN_SIZE, device=device)
            # 前馈
            y_pred, hidden = model(inputs, hidden)
            loss = criterion(y_pred, labels)
            # if epoch % 100 == 0:
            #     print(epoch, i, loss.item())
            # 反向传播求梯度
            optimizer.zero_grad()
            loss.backward()
            # 更新
            optimizer.step()
        train_loss.append(loss.item())
        # 调整学习率
        # scheduler.step(loss.item())

        # 测试
        model.eval()
        hidden = torch.zeros(NUM_LAYERS, x.shape[0], HIDDEN_SIZE, device=device)
        with torch.no_grad():
            x, y = x.to(device), y.to(device)
            y_pred, _ = model(x, hidden)  
            loss = criterion(y_pred, y)
            test_loss.append(loss.item())

    # 保存模型参数
    model.to(torch.device('cpu'))
    torch.save(model.state_dict(), 'deeplearn/rnn_model_param3.pkl')
    print(f'[{datetime.datetime.now()}]RNN模型保存完毕')
    
    # 对测试结果做反归一化处理
    print(f'RNN测试开始,seq测试集长度为:{len(test_dataset)}')
    y_pred = y_pred.cpu().numpy() * (max - min) + min
    y = y.cpu().numpy() * (max - min) + min

    y_pred = y_pred*8/1000/1000/1000
    y = y*8/1000/1000/1000

    plt.subplot(2, 1, 1)
    plt.grid(True)
    plt.title('RNN训练数据')
    plt.plot(y.flatten(), label='实际流量')
    plt.plot(y_pred.flatten(), label='预测流量')

    plt.ylabel('流量,Gb/s')
    plt.legend()

    plt.subplot(2, 1, 2)
    plt.grid(True)
    plt.plot(train_loss, label='train_loss')
    plt.plot(test_loss, label='test_loss')
    plt.ylabel('误差值')
    plt.legend()
    plt.savefig('deeplearn/rnn_test.png')
    plt.close()

微信截图_20221018101816