import numpy as np
import itertools


""" Set our parameters """
|
|
# Should we seed the results?
|
|
setSeed = False
|
|
seedNum = 27
|
|
|
|
""" Simulation parameters """
|
|
# Should we simulate more than once?
|
|
setSim = True
|
|
simNum = 1
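# NOTE: main() estimates the chance of ending in endState as count / simNum,
# so with simNum = 1 the printed percentage can only be 0% or 100%; a larger
# value (e.g. simNum = 1000) would give a smoother estimate.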
""" Define our data """
|
|
# The Statespace
|
|
states = np.array(['L', 'w', 'W'])
|
|
|
|
# Possible sequences of events
|
|
transitionName = np.array([['LL', 'Lw', 'LW'],
|
|
['wL', 'ww', 'wW'],
|
|
['WL', 'Ww', 'WW']])
|
|
|
|
# Probabilities Matrix (transition matrix)
|
|
transitionMatrix = np.array([[0.6, 0.1, 0.3],
|
|
[0.1, 0.7, 0.2],
|
|
[0.2, 0.2, 0.6]])
|
|
|
|
# Starting state
|
|
startingState = 'w'
|
|
initial_dist = np.array([0, 1, 0])
|
|
# initial_dist = None
|
|
|
|
# Steps to run
|
|
stepTime = 100
|
|
# End state you want to find probabilites of
|
|
endState = 'W'
|
|
|
|
""" Find the transition matrix for the given number of steps
|
|
This finds the probabilities of going from step i to step j
|
|
in N number of steps, where N is your step size
|
|
E.g 10 steps would give P_10 which is prob of going from step i
|
|
to step j in exactly 10 steps """
|
|
transition_matrix_step = True
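# The N-step matrix is simply the one-step matrix raised to the N-th power,
# so the result printed by main() could also be cross-checked directly with
# numpy, e.g. np.linalg.matrix_power(transitionMatrix, stepTime), whose
# entry [i][j] is the probability of moving from states[i] to states[j]
# in exactly stepTime steps.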
""" Get Stationary distribution of the Markov Chain
|
|
This tells you what % of time you spend in each state if you
|
|
simulated the Markov Chain for a high number of steps
|
|
THIS NEEDS THE INTIIAL DISTRIBUTION SETTING """
|
|
stat_dist = True
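# The stationary distribution pi is the row vector satisfying
# pi = pi @ transitionMatrix with pi.sum() == 1, i.e. a left eigenvector of
# the transition matrix for eigenvalue 1 (which is what stationary_dist()
# below computes).  Because every entry of this matrix is positive, a rough
# sanity check would be initial_dist @ np.linalg.matrix_power(transitionMatrix, 1000),
# which should come out the same for any valid initial_dist.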


# A class that implements the Markov chain to forecast the state/mood:
class markov(object):
    """Simulates a Markov chain given its states, current state and
    transition matrix.

    Parameters:
    states: 1-d array containing all the possible states
    transitionName: 2-d array containing all the possible
        state transitions
    transitionMatrix: 2-d array containing all
        the probabilities of moving to each state
    currentState: a string indicating the starting state
    steps: an integer determining how many steps (or times) to simulate"""

    def __init__(self, states: np.ndarray, transitionName: np.ndarray,
                 transitionMatrix: np.ndarray, currentState: str,
                 steps: int):
        super(markov, self).__init__()
        self.states = states
        self.transitionName = transitionName
        self.transitionMatrix = transitionMatrix
        self.currentState = currentState
        self.steps = steps

    @staticmethod
    def setSeed(num: int):
        # Seed numpy's global RNG so repeated runs are reproducible
        np.random.seed(num)

    @staticmethod
    def transition_matrix_step(transitionMatrix, steps):
        # Raise the one-step matrix to the `steps`-th power: start from P
        # and multiply by P (steps - 1) more times, giving P_steps, whose
        # entry [i][j] is the probability of going from state i to state j
        # in exactly `steps` steps.
        step_mat = transitionMatrix
        for _ in itertools.repeat(None, steps - 1):
            step_mat = np.matmul(step_mat, transitionMatrix)
        return step_mat

    @staticmethod
    def stationary_dist(transitionMatrix, initial_dist, steps):
        # The stationary distribution is a left eigenvector of the
        # transition matrix with eigenvalue 1, normalised to sum to 1.
        # (initial_dist and steps are accepted for interface consistency
        # but are not needed by this eigenvector computation.)
        w, v = np.linalg.eig(transitionMatrix.T)
        j_stationary = np.argmin(abs(w - 1.0))
        p_stationary = v[:, j_stationary].real
        p_stationary /= p_stationary.sum()
        return p_stationary

    def forecast(self):
        print(f'Start state: {self.currentState}\n')
        # Shall store the sequence of states taken
        self.stateList = [self.currentState]
        # To calculate the probability of the stateList: a path's probability
        # is the product of the one-step transition probabilities along it
        self.prob = 1
        # Map each state label to its row/column index in the matrices
        state_index = {str(state): idx for idx, state in enumerate(self.states)}
        for _ in range(self.steps):
            row = state_index[self.currentState]
            # Draw the next transition using the current state's row
            # of the transition matrix
            self.change = np.random.choice(self.transitionName[row],
                                           replace=True,
                                           p=self.transitionMatrix[row])
            col = list(self.transitionName[row]).index(self.change)
            self.prob = self.prob * self.transitionMatrix[row][col]
            self.currentState = str(self.states[col])
            self.stateList.append(self.currentState)
        print(f'Path Markov Chain took in this iteration: {self.stateList}')
        print(f'End state after {self.steps} steps: {self.currentState}')
        print(f'Probability of this specific path:'
              f' {self.prob:.4f} or {self.prob:.2%}\n')
        return self.stateList
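
# A minimal usage sketch of the class above (it mirrors what main() does below):
#   chain = markov(states, transitionName, transitionMatrix, startingState, stepTime)
#   path = chain.forecast()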


def checker(item, name):
    # Raise if a required setting has not been provided
    if item is None:
        raise ValueError(f'{name} is not set - set it and try again.')


def main(*args, **kwargs):
    global startingState
    simNum = kwargs.get('simNum', 1)
    # Check validity of transitionMatrix: each row of outgoing
    # probabilities must sum to 1
    for i in range(len(transitionMatrix)):
        if not np.isclose(transitionMatrix[i].sum(), 1.0):
            raise ValueError('Probabilities in each row should add to 1')
    # Set the seed so we can repeat with the same results
    if setSeed:
        markov.setSeed(seedNum)
    # Save our simulations:
    list_state = []
    count = 0
    # Simulate Multiple Times
    if setSim:
        if initial_dist is not None:
            # Draw a fresh starting state from the initial distribution
            # before each simulated path
            startingState = np.random.choice(states, p=initial_dist)
            for _ in itertools.repeat(None, simNum):
                markovChain = markov(states, transitionName,
                                     transitionMatrix, startingState,
                                     stepTime)
                list_state.append(markovChain.forecast())
                startingState = np.random.choice(states, p=initial_dist)
        else:
            for _ in itertools.repeat(None, simNum):
                markovChain = markov(states, transitionName,
                                     transitionMatrix, startingState,
                                     stepTime)
                list_state.append(markovChain.forecast())
    else:
        # Single simulation
        list_state.append(markov(states, transitionName,
                                 transitionMatrix, startingState,
                                 stepTime).forecast())
    # Count how many simulated paths finished in the requested end state
    for path in list_state:
        if path[-1] == endState:
            print(f'SUCCESS - path ended in the requested state'
                  f' {endState}:', path)
            count += 1
        else:
            print(f'FAILURE - path did not end in the requested state'
                  f' {endState}:', path)
    if setSim is False:
        simNum = 1
    print(f'\nThe probability of starting in {startingState} and finishing'
          f' in {endState} after {stepTime} steps is {(count / simNum):.2%}\n')
    if transition_matrix_step:
        print(f'P_{stepTime} is: \n'
              f'{markov.transition_matrix_step(transitionMatrix, stepTime)}\n')
    if stat_dist:
        checker(initial_dist, 'initial distribution')
        print(f'Stat dist is '
              f'{markov.stationary_dist(transitionMatrix, initial_dist, stepTime)}')


if __name__ == '__main__':
    main(simNum=simNum)