学习目标:

  • 介绍HMM的定义与符号
  • 讨论HMM的三个基本问题
    • 概率计算问题:前后向算法
    • 学习问题:Baum-Welch模型,EM算法计算参数
    • 预测问题:Viterbi算法
  • 每种算法用代码实现
  • 参考李航的《统计学习方法》(在这里吐槽一下HMM那章下标 i 乱用,有些算法不是很ok)

基本概念

HMM是一种时序数据模型。
设序列长度为 T ,具有观测序列 X={x1,,xT}隐变量序列 Z={z1,,zT}
这里认为每一个观测都由对应的隐变量生成。隐变量序列是Markov链,zt只依赖于zt1

变量都在有限的状态集里变化,观测的状态集S={s1,,sM}隐变量的状态集H={h1,,hN}
因此 xtS,ztH,t=1,,T
有时需要反向找到某状态是状态集里的第几个,定义 findindex(zt)=i ,表示 zt=hi
同理也有 findindex(xt)=i ,表示 xt=si

隐状态间的转移矩阵A=[aij]N×Naij 是从状态 hi 转移到 hj 的概率。
从隐状态到观测的发射矩阵 B=[bij]N×Mbij 是从状态 hi 转移到观测 sj 的概率。
初始状态概率向量为 Π=[π1,,πN] 。鉴于初始时没有其他时刻转移到 t=0 ,设 z0πi 的概率属于 hi

λ=(A,B,Π) ,为HMM中的参数的集合。

生成观测序列

输入:T,S,H,λ=(A,B,Π)
输出:X

例如:有4个盒子,每个盒子里有若干红球和白球。每次从某盒子抽某色的球,求该序列的颜色。

这个例子中加上约束:盒子之间转移的概率(转移矩阵),盒子里球的概率分布(发射矩阵)。

由于需要按照特定概率分布产生随机数,定义下面这个函数,输入分布,输出该分布下的随机数。

1
2
3
4
5
6
7
8
9
10
11
12
import math
import random

# generate according to the distribution
def generate(rate):
r = random.random()
sum = 0
for i in range(len(rate)):
sum += rate[i];
if(r <= sum):
return i
return len(rate)-1
1
2
3
4
5
6
distribution = [0.4, 0.1, 0.5]
count = [0]*len(distribution)
for i in range(100000):
rd = generate(distribution)
count[rd] += 1
print(count)
[40043, 9990, 49967]
1
2
3
4
5
6
7
8
9
10
11
def observation(T, S, H, A, B, pi):
z = generate(pi)
x = S[generate(B[z])]
Z = [H[z]]
X = [x]
for t in range(1, T):
z = generate(A[z])
x = S[generate(B[z])]
Z.append(H[z])
X.append(x)
return Z, X
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
T = 10
S = ['red', 'white']
H = ['box1', 'box2', 'box3', 'box4']
A = [
[0, 1, 0, 0],
[0.3, 0, 0.7, 0],
[0, 0.4, 0, 0.6],
[0, 0, 0.6, 0.4]
]
B = [
[0.5, 0.5],
[0.3, 0.7],
[0.6, 0.4],
[0.4, 0.6]
]
pi = [0.4, 0.1, 0.25, 0.25]
Z, X = observation(T, S, H, A, B, pi)
print(Z)
print(X)
['box3', 'box4', 'box3', 'box2', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4']
['red', 'red', 'white', 'white', 'red', 'red', 'red', 'white', 'white', 'white']

从转移矩阵可以发现一件有趣的事。a12=1,这说明每次抽一号盒子之后,下一次一定抽二号盒子。

概率计算问题

输入:X,λ=(A,B,Π)

输出:P(X|λ)

暴力不可解,借用DP的思想,一层一层算,引入前后向算法

前向概率

从第 t 层算第 t+1 层,经典的DP的想法。

第一层是边界,特判。

αt(i)=P(x1,,xt,zt=hi|λ)=Nj=1P(x1,,xt,zt1=hj,zt=hi|λ)=Nj=1P(x1,,xt1,zt1=hj|λ)P(zt=hi|zt1=hj)P(xt|zt=hi)=Nj=1αt1(j)ajibik        k=findindex(xt)=(Nj=1αt1(j)aji)bik        k=findindex(xt)αt(i)={πibikt=1(Nj=1αt1(j)aji)bikt>1        k=findindex(xt)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def cal_alpha(T, S, H, A, B, pi):
N = len(H)
ap = []
for i in range(N):
ap.append(pi[i]*B[i][S.index(X[0])])
alpha = [ap]
for t in range(1, T):
ap = []
for i in range(N):
sum = 0
for j in range(N):
sum += alpha[t-1][j]*A[j][i]
ap.append(sum*B[i][S.index(X[t])])
alpha.append(ap)
return alpha
1
2
3
4
5
alpha = cal_alpha(T, S, H, A, B, pi)
for t in range(T):
for p in alpha[t]:
print("{:.15f}".format(p), end = " ")
print()
0.200000000000000 0.030000000000000 0.150000000000000 0.100000000000000 
0.004500000000000 0.078000000000000 0.048600000000000 0.052000000000000 
0.011700000000000 0.016758000000000 0.034320000000000 0.029976000000000 
0.002513700000000 0.017799600000000 0.011886480000000 0.019549440000000 
0.002669940000000 0.002180487600000 0.014513630400000 0.005980665600000 
0.000327073140000 0.002542617648000 0.003068844408000 0.004440177792000 
0.000381392647200 0.000466383270960 0.002666363417280 0.001446951104640 
0.000069957490644 0.001013556609878 0.000477855580982 0.001307159095334 
0.000152033491482 0.000182769806126 0.000597514033646 0.000485746192034 
0.000027415470919 0.000273727373458 0.000167754631803 0.000331684138201 

后向概率

从第 t 层算第 t1 层,可以认为是 xt 按照概率 aij 枚举了所有的可能。

最后一层是边界,特判。

βt(i)=P(xt+1,,xT|zt=hi,λ)=Nj=1P(xt+1,,xT,zt+1=hj|zt=hi,λ)=Nj=1P(zt+1=hj|zt=hi)P(xt+1|zt+1=hj)P(xt+2,,xT|zt+1=hj,λ)=Nj=1aijbjkβt+1(j)        k=findindex(xt+1)βt(i)={1t=TNj=1aijbjkβt+1(j)t<T        k=findindex(xt+1)
1
2
3
4
5
6
7
8
9
10
11
12
13
def cal_beta(T, S, H, A, B, pi):
N = len(H)
bt = [1] * N
beta = [bt]
for t in range(T-2, -1, -1):
bt = []
for i in range(N):
sum = 0
for j in range(N):
sum += A[i][j]*B[j][S.index(X[t+1])]*beta[0][j]
bt.append(sum)
beta.insert(0, bt)
return beta
1
2
3
4
5
beta = cal_beta(T, S, H, A, B, pi)
for t in range(T):
for p in beta[t]:
print("{:.15f} ".format(p), end = "")
print()
0.001523984431183 0.001898572783199 0.001631730619454 0.001940679517304 
0.002862946700712 0.005079948103944 0.003497930424029 0.004258903529088 
0.012940968952800 0.004089923858160 0.011210009860800 0.006535421510400 
0.007720551720000 0.018487098504000 0.010470861072000 0.016760061888000 
0.031171644000000 0.025735172400000 0.032884171200000 0.030761001600000 
0.038173800000000 0.103905480000000 0.047640720000000 0.085064640000000 
0.198940000000000 0.127246000000000 0.176344000000000 0.134880000000000 
0.301000000000000 0.284200000000000 0.293200000000000 0.268800000000000 
0.700000000000000 0.430000000000000 0.640000000000000 0.480000000000000 
1.000000000000000 1.000000000000000 1.000000000000000 1.000000000000000 

前后向算法

结合前向和后向概率,对于中间的 xt 前面用前向算法,后面用后向算法。

P(X|λ)=P(x1,,xt,xt+1,,xT|λ)=Ni=1P(x1,,xt,xt+1,,xT,zt=hi|λ)=Ni=1P(x1,,xt,zt=hi|λ)P(xt+1,,xT|zt=hi,λ)=Ni=1αt(i)βt(i)
1
2
3
4
5
6
7
8
def forword_backword(alpha, beta, t, T, S, H, A, B, pi):
if t < 0 or t >= T:
return 0;
sum = 0
N = len(H)
for i in range(N):
sum += alpha[t][i]*beta[t][i]
return sum
1
2
for t in range(T):
print("{:2d}".format(t), "{:.15f}".format(forword_backword(alpha, beta, t, T, S, H, A, B, pi)))
 0 0.000800581614381
 1 0.000800581614381
 2 0.000800581614381
 3 0.000800581614381
 4 0.000800581614381
 5 0.000800581614381
 6 0.000800581614381
 7 0.000800581614381
 8 0.000800581614381
 9 0.000800581614381

有什么用

不论 t 的取值是什么,最后算出来的观测概率都是一样的。为什么要大费周章算第 t 个观测的情况,这里埋个伏笔。

预测问题

输入:X,λ=(A,B,Π)
输出:Z

在上面DP的过程中,记录第 t 层的第 i 个状态是前一层哪一个转移过来的,可以得到最优路径。

Viterbi算法

一开始我以为 Viterbi 算法和前向算法是一个东西,第 t 层的每个节点都计算了从第 t1 层过来的所有概率之和。
实际上 Viterbi 算的不是和,而是从 t1 层过来的 N 个概率的最大值。
前向算法好比是算最大流,αt(i) 是第 t 个时刻经过节点 hi 的所有的可能。
Viterbi算法好比是求最短路,第 t 个时刻经过节点 hi 的路径有好多条,只需要选择其中概率最大的一条。

σt(i)={πibikt=1(max1jNσt1(j)aji)bikt>1        k=findindex(xt)

在计算最值的过程中,同时记录了转移到第 t 个时刻节点 hi 的上一层节点的标号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def viterbi(T, S, H, A, B, pi, X):
N = len(H)
sg = []
parent = [0]
for i in range(N):
sg.append(pi[i]*B[i][S.index(X[0])])
for t in range(1, T):
sigma = sg
sg = []
pt = []
for i in range(N):
maxindex, maxvalue = [-1, 0]
for j in range(N):
if sigma[j]*A[j][i] > maxvalue:
maxvalue = sigma[j]*A[j][i]
maxindex = j
sg.append(maxvalue*B[i][S.index(X[t])])
pt.append(maxindex)
parent.append(pt)
for i in range(N):
maxindex, maxvalue = [-1, 0]
if sigma[i] > maxvalue:
maxvalue = sigma[i]
maxindex = i
parent.append(maxindex)
return parent
1
2
3
4
5
6
7
8
def get_solution(parent, T):
ind = [parent[T]]
ret = [H[ind[0]]]
for t in range(T-1, 0, -1):
p = parent[t][ind[0]]
ind.insert(0, p)
ret.insert(0, H[p])
return ret
1
2
3
4
5
6
7
8
9
10
parent = viterbi(T, S, H, A, B, pi, X)
result = get_solution(parent, T)
print('X: ', X)
print('true Z: ', Z)
print('viterbi:', result)

y = 0
for i in range(len(Z)):
if Z[i] == result[i]: y += 1
print('YES: ', y, ' NO: ', len(Z)-y)
X:    ['red', 'red', 'white', 'white', 'red', 'red', 'red', 'white', 'white', 'white']
true Z:  ['box3', 'box4', 'box3', 'box2', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4']
viterbi: ['box1', 'box2', 'box1', 'box2', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4']
YES:  7    NO:  3

错误率很高,准不准看心情。看来生成同一个观测的隐序列有好多条,概率大的那条和真实的那条,并不能保证更加重合。

预测缺失

输入:x1,,xt1,xt+1,,xT,λ=(A,B,Π)
输出:xt

θ=argmaxkP(xt=sk|x1,,xt1,xt+1,,xT,λ)=argmaxkP(X|λ)P(x1,,xt1,xt+1,,xT|λ)        xt=sk=argmaxkP(X|λ)Ni=1Nj=1P(x1,,xt1,zt1=hi)P(zt=hj|zt1=hi)P(xt+1,,xT|zt=hj,λ)=argmaxkNi=1αt(i)βt(i)Ni=1Nj=1αt1(i)aijβt(j)        xt=sk=argmaxkNi=1αt(i)βt(i)        xt=skxt=sθ

先计算出所有的 α,β,复杂度为O(TN2),再根据 xt=sk 更新出 αt(i),复杂度为O(N2)βt(i) 不受 xt 的影响,故不用更新。

分母是对两个隐变量进行积分。隐变量多一个,复杂度就要乘 N,尽量让隐变量越少越好。

1
2
3
4
5
6
7
8
9
10
def normalization(distribution):
sum = 0
for x in distribution:
sum += x
if sum == 0:
return distribution
ret = []
for x in distribution:
ret.append(x/sum)
return ret
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def predict(T, S, H, A, B, pi, X, t):
alpha = cal_alpha(T, S, H, A, B, pi)
beta = cal_beta(T, S, H, A, B, pi)
N = len(H)
pd = []
for sk in S:
X[t] = sk
if t == 0:
for i in range(N):
alpha[0][i] = pi[i]*B[i][S.index(X[0])]
else:
for i in range(N):
alpha[t][i] = 0
for j in range(N):
alpha[t][i] += alpha[t-1][j]*A[j][i]
alpha[t][i] *= B[i][S.index(X[t])]
pd.append(forword_backword(alpha, beta, t, T, S, H, A, B, pi))
print(pd)
print('after normalization: ', normalization(pd))
theta = pd.index(max(pd))
return S[theta]
1
2
3
4
t = 0
xt = predict(T, S, H, A, B, pi, X, t)
print(X)
print('Truth: ', X[t], ' Result: ', xt)
[0.0008005816143812111, 0.0008919719706016694]
after normalization:  [0.47300222662628943, 0.5269977733737106]
['white', 'red', 'white', 'white', 'red', 'red', 'red', 'white', 'white', 'white']
Truth:  white        Result:  white
1
2
3
4
t = int(T/2)
xt = predict(T, S, H, A, B, pi, X, t)
print(X)
print('Truth: ', X[t], ' Result: ', xt)
[0.0008919719706016694, 0.0013705471229141035]
after normalization:  [0.3942384279354817, 0.6057615720645183]
['white', 'red', 'white', 'white', 'red', 'white', 'red', 'white', 'white', 'white']
Truth:  white        Result:  white

M 个结果归一化,若概率比较接近,则结果比较不准确。概率差的越多越准。

学习问题

输入:X
输出:λ=(A,B,Π)

常规用监督学习的样本来估计出参数,但标注费用比较高,因此用非监督的学习方法来做。

借助:P(X|λ)用最大似然估计参数,EM算法计算参数。

Baum-Welch模型

记给定观测和参数下的 zt=hi 的概率

γt(i)=P(zt=hi|X,λ)=P(zt=hi,X|λ)P(X|λ)=αt(i)βt(i)Ni=1αt(i)βt(i)

记给定观测和参数下的 zt=hi,zt+1=hj 的概率

ξt(i,j)=P(zt=hi,zt+1=hj|X,λ)=P(zt=hi,zt+1=hj,X|λ)P(X|λ)=αt(i)aijbjkβt+1(j)Ni=1αt(i)βt(i)k=findindex(xt+1)
1
2
3
4
5
6
7
8
9
10
def cal_gamma(T, S, H, A, B, pi, alpha, beta):
N = len(H)
gamma = []
for t in range(T):
d = forword_backword(alpha, beta, t, T, S, H, A, B, pi)
gm = []
for i in range(N):
gm.append(alpha[t][i]*beta[t][i]/d)
gamma.append(gm)
return gamma
1
2
3
4
5
6
7
8
9
10
11
12
13
def cal_xi(T, S, H, A, B, pi, alpha, beta):
N = len(H)
xi = []
for t in range(T-1):
d = forword_backword(alpha, beta, t, T, S, H, A, B, pi)
tx = []
for i in range(N):
ty = []
for j in range(N):
ty.append(alpha[t][i]*A[i][j]*B[j][S.index(X[t+1])]*beta[t+1][j]/d)
tx.append(ty)
xi.append(tx)
return xi

算法步骤:

  1. 初始化模型参数 λ=(A(0),B(0),Π(0))
  2. 递推a(n)ij=T1t=1ξt(i,j)T1t=1γt(i)b(n)jk=Tt=1γt(j)[xt==sk]Tt=1γt(j)π(n)i=γ1(i)
  3. 反复迭代直到结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def BaumWelch(T, S, H, A, B, pi, X):
alpha = cal_alpha(T, S, H, A, B, pi)
beta = cal_beta(T, S, H, A, B, pi)
gamma = cal_gamma(T, S, H, A, B, pi, alpha, beta)
xi = cal_xi(T, S, H, A, B, pi, alpha, beta)
N = len(H)
M = len(S)
for i in range(N):
pi[i] = gamma[0][i]
for j in range(N):
a = 0
b = 0
for t in range(T-1):
a += xi[t][i][j]
b += gamma[t][i]
A[i][j] = a / b
for j in range(N):
for k in range(M):
c = 0
d = 0
for t in range(T):
if X[t] == S[k]: c += gamma[t][j]
d += gamma[t][j]
B[j][k] = c / d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
T = 100
S = ['red', 'white']
H = ['box1', 'box2', 'box3', 'box4']
A = [
[0, 1, 0, 0],
[0.3, 0, 0.7, 0],
[0, 0.4, 0, 0.6],
[0, 0, 0.6, 0.4]
]
B = [
[0.5, 0.5],
[0.3, 0.7],
[0.6, 0.4],
[0.4, 0.6]
]
pi = [0.4, 0.1, 0.25, 0.25]
Z, X = observation(T, S, H, A, B, pi)
print(Z)
print(X)
['box2', 'box1', 'box2', 'box3', 'box4', 'box4', 'box4', 'box3', 'box2', 'box3', 'box4', 'box4', 'box4', 'box3', 'box4', 'box4', 'box4', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4', 'box4', 'box3', 'box4', 'box4', 'box3', 'box2', 'box3', 'box4', 'box3', 'box4', 'box4', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4', 'box4', 'box4', 'box4', 'box4', 'box4', 'box4', 'box3', 'box4', 'box4', 'box3', 'box2', 'box1', 'box2', 'box3', 'box2', 'box3', 'box4', 'box3', 'box4', 'box3', 'box2', 'box3', 'box2', 'box1', 'box2', 'box3', 'box2', 'box1', 'box2', 'box3', 'box4', 'box4', 'box4', 'box3', 'box4', 'box3', 'box2', 'box3', 'box4', 'box4', 'box3', 'box4', 'box4', 'box3', 'box2', 'box1', 'box2', 'box1', 'box2', 'box3', 'box4', 'box3', 'box4', 'box3', 'box4', 'box4', 'box3', 'box4', 'box3', 'box4']
['red', 'red', 'white', 'red', 'red', 'white', 'red', 'red', 'white', 'red', 'white', 'white', 'white', 'red', 'white', 'white', 'red', 'white', 'white', 'red', 'white', 'white', 'red', 'white', 'white', 'white', 'red', 'red', 'red', 'red', 'white', 'red', 'white', 'white', 'white', 'red', 'red', 'white', 'red', 'white', 'white', 'red', 'red', 'white', 'red', 'white', 'red', 'red', 'white', 'red', 'white', 'white', 'red', 'red', 'red', 'red', 'white', 'red', 'white', 'red', 'white', 'white', 'white', 'white', 'white', 'red', 'white', 'red', 'red', 'red', 'white', 'white', 'white', 'red', 'white', 'white', 'white', 'red', 'red', 'white', 'red', 'red', 'white', 'red', 'white', 'red', 'red', 'red', 'red', 'white', 'white', 'red', 'white', 'white', 'red', 'white', 'red', 'white', 'red', 'white']
1
2
3
4
5
6
7
8
9
N = len(H)
M = len(S)
for i in range(N):
pi[i] = 1/N
for j in range(N):
A[i][j] = 1.0/N
for i in range(N):
for j in range(M):
B[i][j] = 1.0/M
1
2
3
4
5
6
7
for n in range(100):
BaumWelch(T, S, H, A, B, pi, X)
print('A = ')
for a in A: print(a)
print('B = ')
for a in B: print(a)
print('pi = ', pi)
A = 
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
[0.25, 0.25, 0.25, 0.25]
B = 
[0.49, 0.51]
[0.49, 0.51]
[0.49, 0.51]
[0.49, 0.51]
pi =  [0.25, 0.25, 0.25, 0.25]
1
2
3
4
5
6
7
8
9
10
11
12
13
A = [
[0, 1, 0, 0],
[0.3, 0, 0.7, 0],
[0, 0.4, 0, 0.6],
[0, 0, 0.6, 0.4]
]
B = [
[0.5, 0.5],
[0.3, 0.7],
[0.6, 0.4],
[0.4, 0.6]
]
pi = [0.4, 0.1, 0.25, 0.25]
1
2
3
4
5
6
7
for n in range(100):
BaumWelch(T, S, H, A, B, pi, X)
print('A = ')
for a in A: print(a)
print('B = ')
for a in B: print(a)
print('pi = ', pi)
A = 
[0.0, 1.0, 0.0, 0.0]
[0.24846367821110096, 0.0, 0.7515363217888992, 0.0]
[0.0, 0.2933646349456608, 0.0, 0.7066353650543393]
[0.0, 0.0, 0.3315243870373437, 0.6684756129626561]
B = 
[0.009243586459035613, 0.990756413540964]
[0.002343559403560941, 0.9976564405964387]
[0.8930977369597577, 0.10690226304024238]
[0.4123984323282614, 0.5876015676717384]
pi =  [5.000125928515478e-158, 2.953264517100414e-70, 5.776557121141218e-18, 1.0]

展望

  • viterbi和学习参数效果挺差的,看点相关论文学习优化
  • 老板说要结合多视角啊
  • BW算法推导的地方有空再补上