从AR到SARIMA时间序列预测代码

本文最后更新于 2025年1月15日 晚上

一,AR预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from math import log
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.graphics.tsaplots import plot_pacf,plot_acf
import statsmodels.api as sm
import warnings
warnings.filterwarnings("ignore")
np.set_printoptions(threshold=np.inf) # threshold 指定超过多少使用省略号,np.inf代表无限大
np.set_printoptions(suppress=True) #不以科学计数法输出
plt.rcParams['axes.unicode_minus'] = False #显示负号
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei'] # 散点图标签可以显示中文

data = pd.read_csv('时间序列预测数据集.csv',parse_dates=['Date'])
data.set_index('Date',inplace=True) #将时间设置为行索引
#data = data["1981-01-01":"1982-01-01"]
#################################### 绘制时序图 ############################################
def picture(data):
plt.figure(figsize=(16,16))
for i in range(1,5):
plt.subplot(4,1,i)
df = data[f"198{1+2*(i-1)}-01-01":f"198{3+2*(i-1)}-01-01"]
plt.plot(df.index,df['Temp'])
plt.xticks(rotation=45,size=6)
if i == 1:
plt.title("时序图")
plt.show()
plt.close()
###################################### 时间序列检验 #######################################
def inspect(data):
# 单位根检验-ADF检验
ADF = sm.tsa.stattools.adfuller(data['Temp'])
print('ADF值:', format(ADF[0], '.4f'))
print('拒绝程度值:', ADF[4]) #ADF值需要小于三个拒绝程度值

# 白噪声检验
white_noise = acorr_ljungbox(data['Temp'], lags = [6, 12, 24],boxpierce=True) #lag是需要检验的阶数
print('白噪声检验:\n',white_noise) #LB和BP统计量的P值都小于显著水平(α = 0.05),所以拒绝序列为纯随机序列的原假设,认为该序列为非白噪声序列

fig = plt.figure(figsize=(16,6))

ax1 = fig.add_subplot(1,2,1)
plot_acf(data['Temp'],ax=ax1)
plt.title("自相关图") #需要拖尾,确定q

ax2 = fig.add_subplot(1,2,2)
plot_pacf(data['Temp'],ax=ax2)
plt.title("偏自相关图") #需要截尾,确定p

plt.show()
plt.close()
##################################### 信息准则 ###########################################
def information(df,df0,p):
n = len(df)
mse = mean_squared_error(df,df0)
#aic = n * log(mse) + 2 * p
aicc = log(mse) + (n+p)/(n-p-2)
bic = n * log(mse) + p * log(n)
return aicc,bic

#################################### 模型预测 ##########################################
def AR_prediction(x, p, predict_x_n = 0):
df = np.array(x).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i][-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
print(sigma)
for i in range(h):
df0[p+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1

######################################### 阶数确定 ##############################################
def p_finally(n):
jieguo = []
for i in range(1,n):
df0,df1 = AR_prediction(data,i)
aicc,bic = information(data,df0,i)
jieguo.append([i,aicc,bic])
jieguo_aicc = sorted(jieguo,reverse=False, key=lambda x: x[1]) #以aicc排序
jieguo_bic = sorted(jieguo,reverse=False, key=lambda x: x[2]) #以bic排序

return jieguo_aicc[0][0],jieguo_bic[0][0]

################################# 得到最终结果 ###################################
#images(data)
#inspect(data)

#p_aicc,p_bic = p_finally(30)
#print(p_aicc)
df0,df1 = AR_prediction(data,4,0)

plt.figure()
plt.plot(range(50),data[-50:],c = 'black')
plt.plot(range(50),df0[-50:],c='red')
plt.show()

二,MA预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
np.set_printoptions(threshold=np.inf) # threshold 指定超过多少使用省略号,np.inf代表无限大
np.set_printoptions(suppress=True) #不以科学计数法输出
import warnings
warnings.filterwarnings("ignore")
plt.rcParams['axes.unicode_minus'] = False #显示负号
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei'] # 散点图标签可以显示中文

data = pd.read_csv('时间序列预测数据集.csv',parse_dates=['Date'])
data.set_index('Date',inplace=True) #将时间设置为行索引

def sma(x):
return np.mean(x) #简单移动平均

def wma(x):
x = np.array(x).ravel()
w = np.arange(1,len(x)+1) #生成等差数列
return np.sum(x*w)/(len(x)*(len(x)+1)/2) #加权移动平均(等距)

def ema(x,i):
x = np.array(x).ravel()
if i < 1 and i > 0:
l = len(x)
w = np.logspace(l, 0, num=l, base=(1-i)) #生成等比数列
else:
print('平滑因子范围错误')
return np.sum(x*w)/np.sum(w) #指数移动平均

def ma_prediction(data, q, predict_x_n=0):
########################## 移动平均预测 ################################
df = np.array(data).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,存储真实值以及预测值
for i in range(len(data)-q+1):
df[q+i-1] = ema(data[i:i+q],0.5) #获得简单移动平均获得的预测数列

df_wu = df0 - df #获得误差项

df1 = np.zeros(predict_x_n) #存储预测值
for i in range(predict_x_n):
df1[i] = ema(df0[-q:],0.5) #得到所有预测值
df0 = np.append(df0,df1[i])
############################## 误差项预测 ###############################
def AR_prediction(x, p, predict_x_n = 0): #误差项预测实际就为AR预测,这也是为什么MA模型阶数为0时就转化成AR模型的原因
df = np.array(x[p:]).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(np.multiply(sigma , X[i])) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1
wucha,wucha_predict = AR_prediction(df_wu,q,predict_x_n)
result = df[-len(wucha):] - wucha
predict = df1 - wucha_predict
return result,predict

result,predict = ma_prediction(data, 6, 0)

plt.figure()
plt.plot(range(60),data[-60:],c = 'black')
plt.plot(range(60),result[-60:],c='red')
plt.show()

三,ARMA预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from math import log
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
np.set_printoptions(threshold=np.inf) # threshold 指定超过多少使用省略号,np.inf代表无限大
np.set_printoptions(suppress=True) #不以科学计数法输出
import warnings
warnings.filterwarnings("ignore")
plt.rcParams['axes.unicode_minus'] = False #显示负号
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei'] # 散点图标签可以显示中文

data = pd.read_csv('时间序列预测数据集.csv',parse_dates=['Date'])
data.set_index('Date',inplace=True) #将时间设置为行索引

##################################### 信息准则 ###########################################
def information(df,df0,p):
n = len(df)
mse = mean_squared_error(df,df0)
#aic = n * log(mse) + 2 * p
aicc = log(mse) + (n+p)/(n-p-2)
bic = n * log(mse) + p * log(n)
return aicc,bic
def AR_prediction(x, p, predict_x_n = 0):
df = np.array(x).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1

######################################### 阶数确定 ##############################################
def p_finally(n):
jieguo = []
for i in range(1,n):
df0,df1 = AR_prediction(data,i)
aicc,bic = information(data,df0,i)
jieguo.append([i,aicc,bic])
jieguo_aicc = sorted(jieguo,reverse=False, key=lambda x: x[1]) #以aicc排序
jieguo_bic = sorted(jieguo,reverse=False, key=lambda x: x[2]) #以bic排序
return jieguo_aicc[0][0],jieguo_bic[0][0]

def sma(x):
return np.mean(x) #简单移动平均

def wma(x):
x = np.array(x).ravel()
w = np.arange(1,len(x)+1) #生成等差数列
return np.sum(x*w)/(len(x)*(len(x)+1)/2) #加权移动平均(等距)

def ema(x,i):
x = np.array(x).ravel()
if i < 1 and i > 0:
l = len(x)
w = np.logspace(l, 0, num=l, base=(1-i)) #生成等比数列
else:
print('平滑因子范围错误')
return np.sum(x*w)/np.sum(w) #指数移动平均

def MA_prediction(data, q, predict_x_n=0):
########################## 移动平均预测 ################################
df = np.array(data).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,存储真实值以及预测值
for i in range(len(data)-q+1):
df[q+i-1] = ema(data[i:i+q],0.5) #获得简单移动平均获得的预测数列

df_wu = df0 - df #获得误差项

df1 = np.zeros(predict_x_n) #存储预测值
for i in range(predict_x_n):
df1[i] = ema(df0[-q:],0.5) #得到所有预测值
df0 = np.append(df0,df1[i])
############################## 误差项预测 ###############################
def MA_AR_prediction(x, p, predict_x_n = 0): #误差项预测实际就为AR预测,这也是为什么MA模型阶数为0时就转化成AR模型的原因
df = np.array(x[p:]).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1
wucha,wucha_predict = MA_AR_prediction(df_wu,q,predict_x_n)
return wucha,wucha_predict

def ARMA(p,q):
wucha,wucha_predict = MA_prediction(data, q, 0)
#p_aicc,p_bic = p_finally(10)
df0,df1 = AR_prediction(data,p,0)
result = df0[-len(wucha):] - wucha
return result

result = ARMA(1,6)
plt.figure()
plt.plot(range(60),data[-60:],c = 'black')
plt.plot(range(60),result[-60:],c='red')
plt.show()

print('mse:',mean_squared_error(result,data[-len(result):]))

四,ARIMA预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import copy
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import statsmodels.api as sm
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.graphics.tsaplots import plot_pacf,plot_acf
import warnings
from math import log
from sklearn.metrics import mean_squared_error
warnings.filterwarnings("ignore")
plt.rcParams['axes.unicode_minus'] = False #显示负号
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei'] # 散点图标签可以显示中文

data = pd.read_csv('时间序列预测数据集.csv',parse_dates=['Date'])
data.set_index('Date',inplace=True) #将时间设置为行索引

#################################### 绘制时序图 ############################################
def picture(data):
plt.figure(figsize=(16,16))
for i in range(1,5):
plt.subplot(4,1,i)
df = data[f"198{1+2*(i-1)}-01-01":f"198{3+2*(i-1)}-01-01"]
plt.plot(df.index,df['Temp'].values)
plt.xticks(rotation=45,size=6)
if i == 1:
plt.title("时序图")
plt.show()
plt.close()
###################################### 时间序列检验 #######################################
def inspect(data):
# 单位根检验-ADF检验
ADF = sm.tsa.stattools.adfuller(data['Temp'])
print('ADF值:', format(ADF[0], '.4f'))
print('拒绝程度值:', ADF[4]) #ADF值需要小于三个拒绝程度值

# 白噪声检验
white_noise = acorr_ljungbox(data['Temp'], lags = [6, 12, 24],boxpierce=True) #lag是需要检验的阶数
print('白噪声检验:\n',white_noise) #LB和BP统计量的P值都小于显著水平(α = 0.05),所以拒绝序列为纯随机序列的原假设,认为该序列为非白噪声序列

fig = plt.figure(figsize=(16,6))

ax1 = fig.add_subplot(1,2,1)
plot_acf(data['Temp'],ax=ax1)
plt.title("自相关图") #需要拖尾

ax2 = fig.add_subplot(1,2,2)
plot_pacf(data['Temp'],ax=ax2)
plt.title("偏自相关图") #需要截尾,确定p

plt.show()
plt.close()
##################################### 信息准则 ###########################################
def information(df,df0,p):
n = len(df)
mse = mean_squared_error(df,df0)
#aic = n * log(mse) + 2 * p
aicc = log(mse) + (n+p)/(n-p-2)
bic = n * log(mse) + p * log(n)
return aicc,bic
def AR_prediction(x, p, predict_x_n = 0):
df = np.array(x).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(sigma[1:] * df00[-p:][::-1])+sigma[0] #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1

######################################### 阶数确定 ##############################################
def p_finally(n):
jieguo = []
for i in range(1,n):
df0,df1 = AR_prediction(data,i)
aicc,bic = information(data,df0,i)
jieguo.append([i,aicc,bic])
jieguo_aicc = sorted(jieguo,reverse=False, key=lambda x: x[1]) #以aicc排序
jieguo_bic = sorted(jieguo,reverse=False, key=lambda x: x[2]) #以bic排序
return jieguo_aicc[0][0],jieguo_bic[0][0]

def sma(x):
return np.mean(x) #简单移动平均

def wma(x):
x = np.array(x).ravel()
w = np.arange(1,len(x)+1) #生成等差数列
return np.sum(x*w)/(len(x)*(len(x)+1)/2) #加权移动平均(等距)
def ema(x,i):
x = np.array(x).ravel()
if i < 1 and i > 0:
l = len(x)
w = np.logspace(l, 0, num=l, base=(1-i)) #生成等比数列
else:
print('平滑因子范围错误')
return np.sum(x*w)/np.sum(w) #指数移动平均
def MA_prediction(data, q, predict_x_n=0):
########################## 移动平均预测 ################################
df = np.array(data).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,存储真实值以及预测值
for i in range(len(data)-q+1):
df[q+i-1] = ema(data[i:i+q],0.5) #获得简单移动平均获得的预测数列

df_wu = df0 - df #获得误差项

df1 = np.zeros(predict_x_n) #存储预测值
for i in range(predict_x_n):
df1[i] = ema(df0[-q:],0.5) #得到所有预测值
df0 = np.append(df0,df1[i])
############################## 误差项预测 ###############################
def MA_AR_prediction(x, p, predict_x_n = 0): #误差项预测实际就为AR预测,这也是为什么MA模型阶数为0时就转化成AR模型的原因
df = np.array(x[p:]).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p+1))
for i in range(h): #得到X矩阵
X[i][0] = 1
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(sigma[1:] * df00[-p:][::-1])+sigma[0] #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1
wucha,wucha_predict = MA_AR_prediction(df_wu,q,predict_x_n)
return wucha,wucha_predict
def sum_s(data,df,q,result_predict):
data = np.array(data).ravel()
df = np.array(df).ravel()
result_predict = np.array(result_predict).ravel()
if len(data)-len(df)==1+q:
data0 = np.zeros(len(data))
data0[0] = data[0]
for i in range(len(data)-1):
data0[i+1] = data[i]
df0 = np.zeros(len(df)+1)
for i in range(len(df)):
df0[i+1] = df[i]
asd = data0[-len(df0):]+df0
for i in range(len(result_predict)):
if i == 0:
result_predict[0]=result_predict[0]+asd[-1]
else:
result_predict[i]=result_predict[i]+result_predict[i-1]
print(result_predict)
return asd,result_predict
elif len(data)-len(df)==2+q:
cha1 = np.diff(data)
cha1_1,result_predict = sum_s(cha1,df,q,result_predict)
return sum_s(data,cha1_1,q,result_predict)
elif len(data)-len(df)==3+q:
cha1 = np.diff(data)
cha11 = np.diff(cha1)
cha1_1,result_predict = sum_s(cha11,df,q,result_predict)
cha1_1_1,result_predict = sum_s(cha1,cha1_1,q,result_predict)
return sum_s(data,cha1_1_1,q,result_predict)
def ARIMA(p,q,d,data,predict_n=0):
df_0 = np.array(data).ravel()
df = np.diff(df_0,d)
wucha,wucha_predict = MA_prediction(df, q, predict_n)
#p_aicc,p_bic = p_finally(10)
df0,df1 = AR_prediction(df,p,predict_n)
result = df0[-len(wucha):] - wucha
result_predict = df1-wucha_predict
if d != 0:
result,result_predict = sum_s(df_0,result,q,result_predict)
return result,result_predict

result,result_predict = ARIMA(4,4,0,data,4)

asd = np.append(result,result_predict)
plt.plot(range(len(data[-60:])),data[-60:],c='b')
plt.plot(range(len(result[-63:])),asd[-63:],c='r')
plt.show()

五,SARIMA预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import copy
from math import log
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
np.set_printoptions(threshold=np.inf) # threshold 指定超过多少使用省略号,np.inf代表无限大
np.set_printoptions(suppress=True) #不以科学计数法输出
import warnings
warnings.filterwarnings("ignore")
plt.rcParams['axes.unicode_minus'] = False #显示负号
plt.rcParams['font.family'] = ['sans-serif']
plt.rcParams['font.sans-serif'] = ['SimHei'] # 散点图标签可以显示中文


##################################### 信息准则 ###########################################
def information(df,df0,p):
n = len(df)
mse = mean_squared_error(df,df0)
#aic = n * log(mse) + 2 * p
aicc = log(mse) + (n+p)/(n-p-2)
bic = n * log(mse) + p * log(n)
return aicc,bic

def AR_prediction(x, p, predict_x_n = 0):
df = np.array(x).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p))
for i in range(h): #得到X矩阵
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(np.multiply(sigma , X[i])) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1,sigma
def SAR(x,p,s,predict_x_n=0):
x = np.array(x).ravel()
df0 = x.copy()
Y = x[s*p:].copy()
h = len(x) - p*s
X = np.zeros((h,p))
for t in range(h):
for i in range(1,p+1):
X[t][-i] = x[p*s+t-i*s]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p*s+i] = sum(sigma * X[i]) #得到所有预测的估计值
df00 = x.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1,sigma

######################################### 阶数确定 ##############################################
def p_finally(n):
jieguo = []
for i in range(1,n):
df0,df1 = AR_prediction(data,i)
aicc,bic = information(data,df0,i)
jieguo.append([i,aicc,bic])
jieguo_aicc = sorted(jieguo,reverse=False, key=lambda x: x[1]) #以aicc排序
jieguo_bic = sorted(jieguo,reverse=False, key=lambda x: x[2]) #以bic排序
return jieguo_aicc[0][0],jieguo_bic[0][0]

def MA_prediction(data, q, predict_x_n=0):
def sma(x):
return np.mean(x) #简单移动平均
def wma(x):
x = np.array(x).ravel()
w = np.arange(1,len(x)+1) #生成等差数列
return np.sum(x*w)/(len(x)*(len(x)+1)/2) #加权移动平均(等距)
def ema(x,i):
x = np.array(x).ravel()
if i < 1 and i > 0:
l = len(x)
w = np.logspace(l, 0, num=l, base=(1-i)) #生成等比数列
else:
print('平滑因子范围错误')
return np.sum(x*w)/np.sum(w) #指数移动平均
########################## 移动平均预测 ################################
df = np.array(data).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,存储真实值以及预测值
for i in range(len(df0)-q+1):
df[q+i-1] = ema(df0[i:i+q],0.5) #获得简单移动平均获得的预测数列

df_wu = df0 - df #获得误差项

df1 = np.zeros(predict_x_n) #存储预测值
for i in range(predict_x_n):
df1[i] = ema(df0[-q:],0.5) #得到所有预测值
df0 = np.append(df0,df1[i])
############################## 误差项预测 ###############################
def MA_AR_prediction(x, p, predict_x_n = 0): #误差项预测实际就为AR预测,这也是为什么MA模型阶数为0时就转化成AR模型的原因
df = np.array(x[p:]).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,为了存储预测的数据
Y = df[p:].copy()
h = len(df0)-p
X = np.zeros((h,p))
for i in range(h): #得到X矩阵
for v in range(1,p+1):
X[i,-v] = df[i+v-1]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p+i] = sum(np.multiply(sigma , X[i])) #得到所有预测的估计值
df00 = df.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1,sigma
wucha,wucha_predict,sigma = MA_AR_prediction(df_wu,q,predict_x_n)
return wucha,wucha_predict,sigma,df_wu

def SMA(data,q,s,predict_x_n=0):
def sma(x):
return np.mean(x) #简单移动平均
def wma(x):
x = np.array(x).ravel()
w = np.arange(1,len(x)+1) #生成等差数列
return np.sum(x*w)/(len(x)*(len(x)+1)/2) #加权移动平均(等距)
def ema(x,i):
x = np.array(x).ravel()
if i < 1 and i > 0:
l = len(x)
w = np.logspace(l, 0, num=l, base=(1-i)) #生成等比数列
else:
print('平滑因子范围错误')
return np.sum(x*w)/np.sum(w) #指数移动平均
########################## 移动平均预测 ################################
df = np.array(data).ravel() #将数据转化为numpy格式,并将其转化为一维列表
df0 = df.copy() #准备一个副本,存储真实值以及预测值
h = len(data) - q*s
X = np.zeros((h,q))
for t in range(h):
for i in range(1,q+1):
X[t][-i] = df[q*s+t-i*s]
for i in range(h):
df[q*s+i] = ema(X[i],0.5) #获得简单移动平均获得的预测数列

df_wu = df0 - df #获得误差项

df1 = np.zeros(predict_x_n) #存储预测值
for i in range(predict_x_n):
df1[i] = ema(df0[-q:],0.5) #得到所有预测值
df0 = np.append(df0,df1[i])
############################## 误差项预测 ###############################
def SAR(x,p,s,predict_x_n=0):
x = np.array(x[p*s:]).ravel()
df0 = x.copy()
Y = x[s*p:].copy()
h = len(x) - p*s
X = np.zeros((h,p))
for t in range(h):
for i in range(1,p+1):
X[t][-i] = x[p*s+t-i*s]
sigma = np.linalg.inv(X.T.dot(X)).dot(X.T).dot(Y.T) #最小二乘法估计参数值
for i in range(h):
df0[p*s+i] = sum(np.multiply(sigma , X[i])) #得到所有预测的估计值
df00 = x.copy()
df1 = np.zeros(predict_x_n)
for i in range(predict_x_n):
df1[i] = sum(np.multiply(sigma , df00[-p:][::-1])) #得到所有预测值
df00 = np.append(df00,df1[i])
return df0,df1,sigma
wucha,wucha_predict,sigma = SAR(df_wu,q,s,predict_x_n)
return wucha,wucha_predict,sigma
def SARIMA(p,q,P,Q,data,m):
#p_aicc,p_bic = p_finally(10)
df0,df1,sigma = AR_prediction(data,p,0)
wucha,wucha_predict,sigma1,df_wu = MA_prediction(data, q, 0)
a,b,sigma2 = SAR(data,P,m,0)
c,d,sigma3 = SMA(data,Q,m,0)
sigma = sigma[::-1]
sigma1 = sigma1[::-1]
sigma2 = sigma2[::-1]
sigma3 = sigma3[::-1]
sigma22 = np.append(np.array([1]),sigma2)
sigma33 = np.append(np.array([1]),sigma3)
result = []
for t in range(30,len(data)+1):
sar = 0
for p in range(p):
for P in range(P+1):
sar += sigma[p] * data[t-p-1-P*m] * sigma22[P]
sar += sum(sigma2 * np.array([data[t-i*m] for i in range(1,P+1)]))
sma = 0
for q in range(q):
for Q in range(Q+1):
sma += sigma1[q] * df_wu[t-q-1-Q*m] * sigma33[Q]
sma += sum(sigma3 * np.array([df_wu[t-i*m] for i in range(1,Q+1)]))
result.append(sar + sma)
return result


data0 = pd.read_csv('时间序列预测数据集.csv',parse_dates=['Date'])
data0.set_index('Date',inplace=True) #将时间设置为行索引
data=np.array(copy.deepcopy(data0)).ravel()
#data = data.reshape((10,365)) #季节性差分
#data=np.diff(data,1)
#data = data.ravel()
result = SARIMA(6,7,2,2,data,365)
plt.figure(figsize=(15, 7.5))
plt.plot(range(500),data[-500:],c = 'black',label='actual')
plt.plot(range(500),result[-500:],c='red',label='model')
plt.show()

print('mse:',mean_squared_error(result,data[-len(result):]))

从AR到SARIMA时间序列预测代码
https://jimes.cn/2024/09/12/从AR到SARIMA时间序列预测/
作者
Jimes
发布于
2024年9月12日
许可协议