import numpy as np import itertools import functools """ Set our parameters """ # Should we seed the results? setSeed = False seedNum = 27 """ Simulation parameters """ # Should we simulate more than once? setSim = True simNum = 5000 """ Define our data """ # The Statespace states = np.array(['L', 'w', 'W']) # Possible sequences of events transitionName = np.array([['LL', 'Lw', 'LW'], ['wL', 'ww', 'wW'], ['WL', 'Ww', 'WW']]) # Probabilities Matrix (transition matrix) transitionMatrix = np.array([[0.6, 0.1, 0.3], [0.1, 0.7, 0.2], [0.2, 0.2, 0.6]]) # Starting state # startingState = 'L' initial_dist = np.array([1, 0, 0]) # initial_dist = None # Steps to run stepTime = 5 # End state you want to find probabilites of endState = 'L' """ Find the transition matrix for the given number of steps This finds the probabilities of going from step i to step j in N number of steps, where N is your step size E.g 10 steps would give P_10 which is prob of going from step i to step j in exactly 10 steps """ transition_matrix_step = True """ Get Stationary distribution of the Markov Chain This tells you what % of time you spend in each state if you simulated the Markov Chain for a high number of steps THIS NEEDS THE INTIIAL DISTRIBUTION SETTING """ stat_dist = True # A class that implements the Markov chain to forecast the state/mood: class markov(object): """simulates a markov chain given its states, current state and transition matrix. Parameters: states: 1-d array containing all the possible states transitionName: 2-d array containing a list of the all possible state directions transitionMatrix: 2-d array containing all the probabilites of moving to each state currentState: a string indicating the starting state steps: an integer determining how many steps (or times) to simulate""" def __init__(self, states: np.array, transitionName: np.array, transitionMatrix: np.array, currentState: str, steps: int): super(markov, self).__init__() self.states = states self.list = list self.transitionName = transitionName self.transitionMatrix = transitionMatrix self.currentState = currentState self.steps = steps @staticmethod def setSeed(num: int): return np.random.seed(num) @staticmethod def transition_matrix_step(transitionMatrix, steps): for _ in itertools.repeat(None, steps): step_mat = np.matmul(transitionMatrix, transitionMatrix) return step_mat @staticmethod def stationary_dist(transitionMatrix, initial_dist, steps): w, v = np.linalg.eig(transitionMatrix.T) j_stationary = np.argmin(abs(w - 1.0)) p_stationary = v[:, j_stationary].real p_stationary /= p_stationary.sum() return p_stationary @functools.lru_cache(maxsize=128) def forecast(self): print(f'Start state: {self.currentState}\n') # Shall store the sequence of states taken self.stateList = [self.currentState] i = 0 # To calculate the probability of the stateList self.prob = 1 while i != self.steps: if self.currentState == self.states[0]: self.change = np.random.choice(self.transitionName[0], replace=True, p=transitionMatrix[0]) if self.change == self.transitionName[0][0]: self.prob = self.prob * self.transitionMatrix[0][0] self.stateList.append(self.states[0]) pass elif self.change == self.transitionName[0][1]: self.prob = self.prob * self.transitionMatrix[0][1] self.currentState = self.states[1] self.stateList.append(self.states[1]) else: self.prob == self.transitionMatrix[0][2] self.currentState = self.states[2] self.stateList.append(self.states[2]) elif self.currentState == "w": self.change = np.random.choice(self.transitionName[1], replace=True, p=transitionMatrix[1]) if self.change == "ww": self.prob = self.prob * 0.15 self.stateList.append("w") pass elif self.change == "wL": self.prob = self.prob * 0.8 self.currentState = "L" self.stateList.append("L") else: self.prob = self.prob * 0.05 self.currentState = "W" self.stateList.append("W") elif self.currentState == "W": self.change = np.random.choice(self.transitionName[2], replace=True, p=transitionMatrix[2]) if self.change == "WW": self.prob = self.prob * 0.05 self.stateList.append("W") pass elif self.change == "WL": self.prob = self.prob * 0.8 self.currentState = "L" self.stateList.append("L") else: self.prob = self.prob * 0.15 self.currentState = "w" self.stateList.append("w") i += 1 print(f'Path Markov Chain took in this iteration: {self.stateList}') print(f'End state after {self.steps} steps: {self.currentState}') print(f'Probability of this specific path:' f' {self.prob:.4f} or {self.prob:.2%}\n') return self.stateList def checker(item, name): try: item is not None except Exception: raise Exception(f'{name} is not set - set it and try again.') def main(*args, **kwargs): global startingState try: simNum = kwargs['simNum'] except KeyError: pass sumTotal = 0 # Check validity of transitionMatrix for i in range(len(transitionMatrix)): sumTotal += sum(transitionMatrix[i]) if i != len(states) and i == len(transitionMatrix): raise ValueError('Probabilities should add to 1') # Set the seed so we can repeat with the same results if setSeed: markov.setSeed(seedNum) # Save our simulations: list_state = [] count = 0 # Simulate Multiple Times if setSim: if initial_dist is not None: startingState = np.random.choice(states, p=initial_dist) for _ in itertools.repeat(None, simNum): markovChain = markov(states, transitionName, transitionMatrix, startingState, stepTime) list_state.append(markovChain.forecast()) startingState = np.random.choice(states, p=initial_dist) else: for _ in itertools.repeat(None, simNum): markovChain = markov(states, transitionName, transitionMatrix, startingState, stepTime) list_state.append(markovChain.forecast()) else: for _ in range(1, 2): list_state.append(markov(states, transitionName, transitionMatrix, startingState, stepTime).forecast()) for list in list_state: if(list[-1] == f'{endState!s}'): print(f'SUCCESS - path ended in the requested state {endState!s}' f':', list) count += 1 else: print(f'FAILURE - path did not end in the requested state' f' {endState!s}:', list) if setSim is False: simNum = 1 print(f'\nThe probability of starting in {startingState} and finishing' f' in {endState} after {stepTime} steps is {(count / simNum):.2%}\n') if transition_matrix_step: print(f'P_{stepTime} is: \n' f'{markov.transition_matrix_step(transitionMatrix, stepTime)}\n') if stat_dist: checker(initial_dist, 'initial distribution') print(f'Stat dist is {markov.stationary_dist(transitionMatrix,initial_dist, stepTime)}') if __name__ == '__main__': main(simNum=simNum)