mlp.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
from sys import stderr
import os


class MLP:
    """Multilayer perceptron trained with plain backpropagation."""

    def __init__(self, sizes, f=None, df=None):
        self.f = f or self._f          # activation function (default: sigmoid)
        self.df = df or self._df_dnet  # its derivative w.r.t. net
        self.layers = []
        if sizes:
            prev_size = sizes[0]
            for size in sizes[1:]:
                # Each row is one neuron: prev_size weights plus a bias
                # column, initialized uniformly in [-0.5, 0.5).
                layer = np.random.rand(size, prev_size + 1) - 0.5
                self.layers.append(layer)
                prev_size = size
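    # For example (an illustration, not part of the original file):
    # MLP([2, 3, 1]) builds two weight matrices, shaped (3, 3) and (1, 4);
    # the extra column in each holds the bias weight fed by the constant
    # input 1 that solve() appends below.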
    def solve(self, x_p):
        """Forward pass: return per-layer activations and derivatives."""
        f_nets = []
        df_dnets = []
        for layer_id, layer in enumerate(self.layers):
            if layer_id == 0:
                fnet1 = np.append(x_p, 1)  # append bias input 1 to match θ
            else:
                fnet1 = np.append(f_nets[-1], 1)  # previous layer's output
            f_net = np.empty(len(layer))
            df_dnet = np.empty(len(layer))
            for j, neuron in enumerate(layer):
                net = fnet1 @ neuron       # weighted sum for neuron j
                f_net[j] = self.f(net)
                df_dnet[j] = self.df(net)
            f_nets.append(f_net)
            df_dnets.append(df_dnet)
        return f_nets, df_dnets
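
    # A vectorized sketch of the same forward pass (an addition, not part
    # of the original file). It assumes f and df are NumPy-vectorized, as
    # the default sigmoid is, and should match solve()'s results while
    # replacing the per-neuron loop with one matrix-vector product.
    def solve_vectorized(self, x_p):
        f_nets, df_dnets = [], []
        act = np.asarray(x_p, dtype=float)
        for layer in self.layers:
            net = layer @ np.append(act, 1)  # (size, prev+1) @ (prev+1,)
            f_nets.append(self.f(net))
            df_dnets.append(self.df(net))
            act = f_nets[-1]
        return f_nets, df_dnets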
    def train(self, X, Y, eta=0.1, threshold=1e-2):
        """Online backpropagation until the mean squared error per sample
        drops below threshold, or training is interrupted."""
        squared_error = 2 * threshold  # initial value to enter the loop
        try:
            count = 0  # epoch counter
            min_squared_error = float('inf')
            while squared_error > threshold:
                squared_error = 0.0
                for x_p, y_p in zip(X, Y):
                    f_nets, df_dnets = self.solve(x_p)
                    o_p = f_nets[-1]     # output layer's answer to x_p
                    delta_p = y_p - o_p  # difference from the expected answer
                    squared_error += np.sum(delta_p ** 2)
                    layers = list(zip(
                        range(len(self.layers)),
                        self.layers, f_nets, df_dnets))
                    deltas = [None] * len(layers)
                    # Backward pass: compute each layer's delta, from the
                    # output layer toward the first hidden layer.
                    for idx, layer, f_net, df_dnet in reversed(layers):
                        if idx == len(self.layers) - 1:  # output layer
                            deltas[idx] = delta_p * df_dnet
                        else:
                            # Drop the bias column so dimensions match.
                            _lay = np.delete(self.layers[idx + 1], -1, 1)
                            deltas[idx] = df_dnet * (deltas[idx + 1] @ _lay)
                    # Update each layer's weights by gradient descent.
                    for idx, layer, f_net, df_dnet in reversed(layers):
                        _delta = deltas[idx][np.newaxis].T
                        if idx > 0:
                            _f_net = np.append(f_nets[idx - 1], 1)[np.newaxis]
                        else:  # first hidden layer reads the raw input
                            _f_net = np.append(x_p, 1)[np.newaxis]
                        self.layers[idx] += eta * (_delta @ _f_net)
                squared_error /= len(X)
                count += 1
                if squared_error < min_squared_error:
                    min_squared_error = squared_error
                    print(squared_error, 'new min', file=stderr)
                else:  # log progress every epoch
                    print(squared_error, file=stderr)
        except KeyboardInterrupt:
            pass  # Ctrl-C stops training early, keeping the current weights
    def save(self, dirname):
        """Save each weight matrix as <index>.layer.npy under dirname."""
        os.makedirs(dirname, exist_ok=True)
        for idx, layer in enumerate(self.layers, start=1):
            fname = os.path.join(dirname, '%d.layer.npy' % idx)
            np.save(fname, layer)

    @staticmethod
    def load(dirname):
        """Rebuild an MLP from the .layer.npy files written by save()."""
        fnames = [f for f in os.listdir(dirname) if f.endswith('.layer.npy')]
        # Sort numerically: a lexicographic sort would put '10.layer.npy'
        # before '2.layer.npy' once there are ten or more layers.
        fnames = sorted(fnames, key=lambda f: int(f.split('.')[0]))
        layers = [np.load(os.path.join(dirname, f)) for f in fnames]
        model = MLP([])
        model.layers = layers
        return model
    @staticmethod
    def _f(net):
        """Default activation: the logistic sigmoid."""
        return 1 / (1 + np.exp(-net))

    @staticmethod
    def _df_dnet(net):
        """Derivative of the sigmoid w.r.t. net: f(net) * (1 - f(net))."""
        fnet = 1 / (1 + np.exp(-net))
        return fnet * (1 - fnet)
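

# A minimal usage sketch (an addition, not part of the original file):
# train the network on XOR, then save and reload it. The layer sizes,
# learning rate, and output directory are illustrative choices.
if __name__ == '__main__':
    X = np.array([[0, 0], [0, 1], [1, 0], [1, 1]], dtype=float)
    Y = np.array([[0], [1], [1], [0]], dtype=float)

    mlp = MLP([2, 4, 1])  # 2 inputs, one hidden layer of 4 neurons, 1 output
    mlp.train(X, Y, eta=0.5, threshold=1e-2)

    for x_p in X:
        f_nets, _ = mlp.solve(x_p)
        print(x_p, '->', f_nets[-1])

    mlp.save('xor_model')  # writes 1.layer.npy, 2.layer.npy under xor_model/
    restored = MLP.load('xor_model')
    assert all(np.allclose(a, b)
               for a, b in zip(mlp.layers, restored.layers))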