import numpy as np import itertools import functools """ Set our parameters """ # Should we seed the results? setSeed = False seedNum = 27 """ Simulation parameters """ # Should we simulate more than once? setSim = True simNum = 15000 """ Define our data """ # The Statespace states = np.array(['L', 'w', 'W']) # Possible sequences of events transitionName = np.array( [ ['LL', 'Lw', 'LW'], ['wL', 'ww', 'wW'], ['WL', 'Ww', 'WW'], ] ) # Probabilities Matrix (transition matrix) transitionMatrix = np.array( [ [0.6, 0.1, 0.3], [0.1, 0.7, 0.2], [0.2, 0.2, 0.6], ] ) # Starting state - Given as a list of probabilities of starting # in each state. To always start at the same state set one of them # to 1 and the rest to 0. These must add to 1! initial_dist = np.array([1, 0, 0]) # Steps to run - How many steps or jumps we want the markov chain to make stepTime = 2 # End state you want to find probabilites of - we can estimate the stationary # distribution by running this many times for many steps. # E.g to find the stat distribution for L run the simulation 10,000 times # for 1000 jumps. This should be very close to the actual value which # we can find below. endState = 'L' """ Find the transition matrix for the given number of steps This finds the probabilities of going from step i to step j in N number of steps, where N is your step size E.g 10 steps would give P_10 which is prob of going from step i to step j in exactly 10 steps """ transition_matrix_step = True """ Get Stationary distribution of the Markov Chain This tells you what % of time you spend in each state if you simulated the Markov Chain for a high number of steps THIS NEEDS THE INTIIAL DISTRIBUTION SETTING """ stat_dist = True # A class that implements the Markov chain to forecast the state/mood: class markov(object): """Simulates a markov chain given its states, current state and transition matrix. Attributes ---------- change : str Random choice from transitionName currentState : str Current state of the Markov Chain prob : float Step probability stateList : list A list that tracks how `currentState` evolves over time states : np.array An array containing the possible states of the chain steps : int See below transitionMatrix : np.array Transition matrix transitionName : np.array Step representation of the probailities of the transisition matrix Parameters ---------- states : np.array See above transitionName : np.array See above transitionMatrix : np.array See above currentState : str See above steps : int How many steps to take """ def __init__( self, states: np.array, transitionName: np.array, transitionMatrix: np.array, currentState: str, steps: int, ): super(markov, self).__init__() self.states = states self.transitionName = transitionName self.transitionMatrix = transitionMatrix self.currentState = currentState self.steps = steps @staticmethod def setSeed(num: int): return np.random.seed(num) @staticmethod def transition_matrix_step(transitionMatrix, steps): try: for _ in itertools.repeat(None, steps - 1): step_mat = np.matmul(transitionMatrix, transitionMatrix) return step_mat except UnboundLocalError: return transitionMatrix @staticmethod def stationary_dist(transitionMatrix, initial_dist, steps): w, v = np.linalg.eig(transitionMatrix.T) j_stationary = np.argmin(abs(w - 1.0)) p_stationary = v[:, j_stationary].real p_stationary /= p_stationary.sum() return p_stationary @functools.lru_cache(maxsize=128) def forecast(self): """Simulate the Markov chain Returns ------- np.array Description """ print(f'Start state: {self.currentState}\n') # Shall store the sequence of states taken self.stateList = [self.currentState] i = 0 # To calculate the probability of the stateList self.prob = 1 while i != self.steps: if self.currentState == self.states[0]: self.change = np.random.choice( self.transitionName[0], replace=True, p=transitionMatrix[0] ) if self.change == self.transitionName[0][0]: self.prob = self.prob * self.transitionMatrix[0][0] self.stateList.append(self.states[0]) pass elif self.change == self.transitionName[0][1]: self.prob = self.prob * self.transitionMatrix[0][1] self.currentState = self.states[1] self.stateList.append(self.states[1]) else: self.prob = self.transitionMatrix[0][2] self.currentState = self.states[2] self.stateList.append(self.states[2]) elif self.currentState == self.states[1]: self.change = np.random.choice( self.transitionName[1], replace=True, p=transitionMatrix[1] ) if self.change == self.transitionName[1][0]: self.prob = self.prob * self.transitionMatrix[1][0] self.currentState = self.states[0] self.stateList.append(self.states[0]) elif self.change == self.transitionName[1][1]: self.prob = self.prob * self.transitionMatrix[1][1] self.stateList.append(self.states[1]) pass else: self.prob = self.prob * self.transitionMatrix[1][2] self.currentState = self.states[2] self.stateList.append(self.states[2]) elif self.currentState == self.states[2]: self.change = np.random.choice( self.transitionName[2], replace=True, p=transitionMatrix[2] ) if self.change == self.transitionName[2][0]: self.prob = self.prob * self.transitionMatrix[2][0] self.currentState = self.states[0] self.stateList.append(self.states[0]) elif self.change == self.transitionName[2][1]: self.prob = self.prob * self.transitionMatrix[2][1] self.currentState = self.states[1] self.stateList.append(self.states[1]) else: self.prob = self.prob * self.transitionMatrix[2][2] self.stateList.append(self.states[2]) pass i += 1 print(f'Path Markov Chain took in this iteration: {self.stateList}') print(f'End state after {self.steps} steps: {self.currentState}') print( f'Probability of taking these exact steps in this order is:' f' {self.prob:.2f} or {self.prob:.2%}\n' ) return self.stateList def checker(item, name): try: item is not None except Exception: raise Exception(f'{name} is not set - set it and try again.') def main(*args, **kwargs): global startingState try: simNum = kwargs['simNum'] except KeyError: pass sumTotal = 0 # Check validity of transitionMatrix for i in range(len(transitionMatrix)): sumTotal += sum(transitionMatrix[i]) if i != len(states) and i == len(transitionMatrix): raise ValueError('Probabilities should add to 1') # Set the seed so we can repeat with the same results if setSeed: markov.setSeed(seedNum) # Save our simulations: list_state = [] count = 0 # Simulate Multiple Times if setSim: if initial_dist is not None: startingState = np.random.choice(states, p=initial_dist) for _ in itertools.repeat(None, simNum): markovChain = markov( states, transitionName, transitionMatrix, startingState, stepTime, ) list_state.append(markovChain.forecast()) startingState = np.random.choice(states, p=initial_dist) else: for _ in itertools.repeat(None, simNum): markovChain = markov( states, transitionName, transitionMatrix, startingState, stepTime, ) list_state.append(markovChain.forecast()) else: for _ in range(1, 2): list_state.append( markov( states, transitionName, transitionMatrix, startingState, stepTime, ).forecast() ) for list in list_state: if list[-1] == f'{endState!s}': print( f'SUCCESS - path ended in the requested state {endState!s}' f':', list, ) count += 1 else: print( f'FAILURE - path did not end in the requested state' f' {endState!s}:', list, ) if setSim is False: simNum = 1 print( f'\nTherefore the estimated probability of starting in' f' {startingState} and finishing in {endState} after ' f'{stepTime} steps is ' f'{(count / simNum):.2%}.\n' f'This is calculated by number of success/total steps\n' ) if transition_matrix_step: print( f'P_{stepTime} is: \n' f'{markov.transition_matrix_step(transitionMatrix, stepTime)}\n' ) if stat_dist: checker(initial_dist, 'initial distribution') print( f"""Stat dist is { markov.stationary_dist(transitionMatrix,initial_dist, stepTime) }""" ) if __name__ == '__main__': main(simNum=simNum)