Design Pattern in Python (3): State Pattern

This post is part of my “Design Pattern” collection.

Introduction

In the State pattern, an object's behavior changes based on its state. It belongs to the behavioral design patterns. We create objects that represent the various states, plus a context object whose behavior varies as its state object changes.
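
As a minimal sketch of this idea (the names `Light`, `On`, and `Off` are hypothetical and do not appear in the code later in this post), a context can simply delegate a call to whichever state object it currently holds:

```python
class Off:
    """State object: the light is off."""
    def press(self, light):
        light.state = On()      # pressing the switch moves the context to On
        return "turning on"


class On:
    """State object: the light is on."""
    def press(self, light):
        light.state = Off()     # pressing the switch moves the context to Off
        return "turning off"


class Light:
    """Context: its behavior depends on the state object it holds."""
    def __init__(self):
        self.state = Off()

    def press(self):
        # Delegate to the current state; the state decides what happens next.
        return self.state.press(self)


light = Light()
first = light.press()
second = light.press()
print(first, "/", second)
```

The same call, `light.press()`, gives a different result each time because the context's state object has changed in between.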

Today I am doing a coding exercise on the State pattern in Python.

State examples

The concept of state is everywhere in this world. For example, I am in a “working state” at this moment, and one hour ago I was in a “playing state” (watching Netflix). As another example, water stays in the “Liquid State” at room temperature, but it transitions to the “Gas State” when the temperature rises above 100 °C, or to the “Solid State” when it falls below 0 °C.

In industry, the concept of state is also widely adopted in many technologies. I used to work on medical device development, so I give two examples from that domain.

Task states in VxWorks RTOS:

CANOpen devices conforming to the CANOpen protocol (CiA DS301):

My implementation

I would like to simulate the state behavior of a CANOpen communication node (CiA DS301). As seen in the figure above, it has 4 states: Initialization, PreOperational, Operational and Stopped. My code below simulates ONLY some of the transitions between these states. A real CANOpen device is much more complicated than my simulation (see the protocol).

Now it is time to code.

Package & singleton preparation

First, import some useful modules and define a singleton decorator.

from abc import ABCMeta, abstractmethod
import threading, time

### Singleton Decorator method
def singleton(cls, *args, **kwargs):
    __instance = {}

    def __singleton(*args, **kwargs):

        if cls not in __instance:
            __instance[cls] = cls(*args, **kwargs)
        else:
            pass

        return __instance[cls]
    return __singleton

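
A short sketch of how such a decorator behaves in use may help. The `Demo` class is hypothetical, and the decorator is restated in a compact form so the snippet runs on its own:

```python
def singleton(cls):
    """Compact restatement of the singleton decorator."""
    instances = {}

    def get_instance(*args, **kwargs):
        # Build the object on the first call only; reuse it afterwards.
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Demo:                      # hypothetical class, for illustration only
    def __init__(self, name):
        self.name = name


a = Demo("first")
b = Demo("second")               # arguments are ignored: the cached object is returned
print(a is b, b.name)
```

Two consequences are worth keeping in mind. Arguments passed after the first call are silently dropped, and the decorated name (`Demo` here) now refers to a function rather than a class, so it can no longer be used as the second argument of `isinstance()`.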

Context class

The Context class holds a collection of its states (a dictionary keyed by state name). It should have exactly one current state at each moment in its life cycle. This class should implement a function for state transitions. One can also add state-dependent behaviors. In my case, I defined entryBehavior() and exitBehavior() to call a state’s behaviors upon its entry and exit.

class Context():
    """Context base class"""
    def __init__(self, ContextName):

        self.__states = {}
        self.__currentState = None    # current state object
        self.__name = ContextName

    def addState(self,state):
        self.__states[state.getName()] = state
        print("States: ",  self.__states.keys())

    def setState(self, stateName):
        if stateName in self.__states :
            self.__currentState = self.__states[stateName]
        else:
            print("Error: unknown state: {}".format(stateName))

    def getState(self):
        return self.__currentState

    def getContextName(self):
        return self.__name   

    ## message driven transition
    def doTransition(self, msg):
        current = self.__currentState.getName()
        if msg["from"] == current:
            print("Transition from {} to {}".format(msg["from"], msg["to"]))
            self.exitBehavior( self.__states[msg["from"]])
            self.setState(msg["to"])
            self.entryBehavior(  self.__states[msg["to"]])    
        else:
            print("Error: Current State is {}, received transition from {} to {}".format(current, msg["from"], msg["to"]))

    ## Behavior upon entry of a new state
    def entryBehavior(self, toState):
        if (isinstance(toState,State)):
            toState.onEntryBehavior(self)

    ## Behavior upon exit of present state
    def exitBehavior(self, fromState):
        if (isinstance(fromState,State)):
            fromState.onExitBehavior(self)

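
One detail of the Context class deserves a note: attributes with two leading underscores are name-mangled by Python, so a subclass cannot reach the parent's `__states` directly and has to go through the public methods. The sketch below uses hypothetical `Base` and `Derived` classes to show the effect:

```python
class Base:
    def __init__(self):
        self.__states = {}           # actually stored as _Base__states

    def getStates(self):
        return self.__states         # works: same class, same mangled name


class Derived(Base):
    def peek(self):
        # Inside Derived this is read as self._Derived__states,
        # which was never assigned.
        return self.__states


d = Derived()
try:
    d.peek()
    direct_access_ok = True
except AttributeError:
    direct_access_ok = False

print(direct_access_ok, d.getStates())
```

This is why a derived context is expected to use addState(), setState(), getState() and getContextName() rather than touching the private members.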

State class

The State class is quite simple. It has a private name member. I defined two abstract methods, onEntryBehavior() and onExitBehavior(), which must be implemented in derived state classes.

class State(metaclass=ABCMeta):
    """State base class"""

    def __init__(self, name):
        self.__name = name

    def getName(self):
        return self.__name

    @abstractmethod
    def onEntryBehavior(self, CANOpen_Node):
        pass 

    @abstractmethod
    def onExitBehavior(self, CANOpen_Node):
        pass 

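
To illustrate what the abstract methods enforce, here is a small sketch. The State class is restated so the snippet is self-contained, and `HalfState` and `FullState` are hypothetical subclasses:

```python
from abc import ABCMeta, abstractmethod


class State(metaclass=ABCMeta):
    """Restated State base class."""
    def __init__(self, name):
        self.__name = name

    def getName(self):
        return self.__name

    @abstractmethod
    def onEntryBehavior(self, context):
        pass

    @abstractmethod
    def onExitBehavior(self, context):
        pass


class HalfState(State):              # hypothetical: forgets onExitBehavior()
    def onEntryBehavior(self, context):
        pass


class FullState(State):              # hypothetical: implements both methods
    def onEntryBehavior(self, context):
        pass

    def onExitBehavior(self, context):
        pass


try:
    HalfState("half")
    created = True
except TypeError:
    created = False                  # incomplete subclasses cannot be instantiated

full = FullState("full")
print(created, full.getName())
```

A subclass that leaves out one of the two methods fails at instantiation time with a TypeError, which catches the mistake before any transition is attempted.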

Concrete state class

As an example of a concrete state class, this part shows the code of the InitializationState class, which derives from the State class. Upon entry of this state, its onEntryBehavior() is called to “start dispatching heartbeat msgs”. Upon exit, its onExitBehavior() is called to “stop dispatching heartbeat msgs”.

@singleton
class InitializationState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))

    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))


Simulation

To realize a full simulation, I introduced a CANOpen_Node class, which is a concrete Context class. Its instance, named “Node_Lidar_2020”, is created in the main section. It holds instances of all 4 concrete State classes. I also define a series of transition msgs and their arrival times. Below is my simulation code.

Code of CANOpen_Node class:


class CANOpen_Node(Context):
    """ Simulated CANOpen 301 Node class """

    def __init__(self, ContextName):
        super().__init__(ContextName)

        self.addState(InitializationState("State_Initialization"))        
        self.addState(PreOperationalState("State_PreOperational"))        
        self.addState(OperationalState("State_Operational"))
        self.addState(StoppedState("State_Stopped"))
        self.__active = False
        self.__thread = threading.Thread(target=self.communication)
        self.__timer = 0


    def PowerOn(self):

        if self.__thread.is_alive():    # isAlive() was removed in Python 3.9
            pass
        else:
            self.__active = True
            print("Power is ON!")
            self.setState("State_Initialization")
            print("Automatically entering State_Initialization!")
            self.__thread.start()


    def PowerOff(self):
        self.__active = False
        print("Calling Power Off!")

    def communication(self):
        while self.__active:
            print("[HeartBeat] Node {} is in {}".format(self.getContextName(), self.getState().getName()))
            time.sleep(1)
            self.__timer+=1
        print("Power is Off!")

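
One thing to watch in PowerOn(): a `threading.Thread` object can be started only once, so powering the node on a second time after a power-off would fail with the thread created in `__init__`. The sketch below shows the error and one possible way around it. `PowerCycler` is a hypothetical helper, not part of the simulation, and its `_communication()` merely stands in for the heartbeat loop:

```python
import threading

# A finished thread cannot be started again.
t = threading.Thread(target=lambda: None)
t.start()
t.join()
try:
    t.start()
    restarted = True
except RuntimeError:
    restarted = False


class PowerCycler:
    """Hypothetical helper: builds a fresh thread on every power-on."""
    def __init__(self):
        self._thread = None
        self.runs = 0

    def _communication(self):
        self.runs += 1               # stands in for the heartbeat loop

    def power_on(self):
        if self._thread is not None and self._thread.is_alive():
            return                   # already running, nothing to do
        self._thread = threading.Thread(target=self._communication)
        self._thread.start()

    def wait(self):
        if self._thread is not None:
            self._thread.join()


node = PowerCycler()
node.power_on()
node.wait()
node.power_on()                      # second power cycle works with a new thread
node.wait()
print(restarted, node.runs)
```

The simulation in this post powers the node on only once, so it does not hit this limitation.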

The other 3 concrete State classes:


@singleton           
class PreOperationalState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))

@singleton          
class OperationalState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))

@singleton          
class StoppedState(State):

    def __init__(self, name):
        super().__init__(name)

    def onEntryBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Start dispatching heartbeat msg of state - {}".format(nn, sn))
    def onExitBehavior(self, context):
        nn = context.getContextName()
        sn = self.getName()
        print("[{}] Stop dispatching heartbeat msg of state - {}".format(nn, sn))

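
Since the four concrete states print the same messages, the shared part could be moved into an intermediate base class. The following is only a sketch of that design option: `HeartbeatState`, its `_line()` helper, and `FakeContext` are hypothetical names, the State class is restated in a simplified form, and the singleton decorator is left out to keep the snippet short.

```python
from abc import ABCMeta, abstractmethod


class State(metaclass=ABCMeta):
    """Simplified restatement of the State base class."""
    def __init__(self, name):
        self._name = name

    def getName(self):
        return self._name

    @abstractmethod
    def onEntryBehavior(self, context):
        pass

    @abstractmethod
    def onExitBehavior(self, context):
        pass


class HeartbeatState(State):
    """Hypothetical shared base for states that only start/stop heartbeats."""
    def _line(self, verb, context):
        return "[{}] {} dispatching heartbeat msg of state - {}".format(
            context.getContextName(), verb, self.getName())

    def onEntryBehavior(self, context):
        print(self._line("Start", context))

    def onExitBehavior(self, context):
        print(self._line("Stop", context))


class PreOperationalState(HeartbeatState):
    pass


class OperationalState(HeartbeatState):
    pass


class FakeContext:
    """Hypothetical stand-in for a Context, used only for this demo."""
    def getContextName(self):
        return "Node_Lidar_2020"


state = PreOperationalState("State_PreOperational")
state.onEntryBehavior(FakeContext())
state.onExitBehavior(FakeContext())
```

A state that needs different behavior can still override either method, so nothing is lost compared with writing each class out in full.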

Now launch the simulation:

## State transition msgs
msg12 = {"from": "State_Initialization", "to": "State_PreOperational"}
msg21 = {"from": "State_PreOperational", "to": "State_Initialization"}
msg23 = {"from": "State_PreOperational", "to": "State_Operational"}
msg32 = {"from": "State_Operational", "to": "State_PreOperational"}
msg34 = {"from": "State_Operational", "to": "State_Stopped"}
msg43 = {"from": "State_Stopped", "to": "State_Operational"}
msg42 = {"from": "State_Stopped", "to": "State_PreOperational"}
msg24 = {"from": "State_PreOperational", "to": "State_Stopped"}

## simulate a ring buffer for msgs
msgQ = [(msg12, 6), (msg23, 7), (msg32, 22), (msg34, 25), (msg24, 28)]

if __name__ == "__main__":

    Simulated_Node = CANOpen_Node("Node_Lidar_2020")
    step = 0
    buffer_head = 0
    while (True):

        if step == 3:
            Simulated_Node.PowerOn()
        if step == 30:
            Simulated_Node.PowerOff()
            break

        if step == msgQ[buffer_head][1]:
            Simulated_Node.doTransition(msgQ[buffer_head][0])
            buffer_head+=1
            if buffer_head ==  len(msgQ):
                buffer_head = 0


        time.sleep(1)
        step+=1
        print("========= step {} =========".format(step))


Execution output:

As one can see:

  • at step #3, the node is powered on and automatically enters the “Initialization” state. It starts sending the HeartBeat signal of its current state.
  • at step #6, it makes the transition from State_Initialization to State_PreOperational
  • at step #7, it makes the transition from State_PreOperational to State_Operational
  • at step #22, it makes the transition from State_Operational to State_PreOperational
  • at step #25, it receives a msg for the transition from State_Operational to State_Stopped. Since its current state (State_PreOperational) does not match the “from” state of this msg, it stays in its current state
  • at step #28, it makes the transition from State_PreOperational to State_Stopped
  • at step #30, the node is powered off
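
The accept/reject sequence listed above can also be checked without threads or sleeps. The sketch below replays the same message queue against the same rule as doTransition() (a msg is accepted only if its "from" matches the current state). The `replay()` function is a hypothetical helper written for this check, not part of the simulation:

```python
msg12 = {"from": "State_Initialization", "to": "State_PreOperational"}
msg23 = {"from": "State_PreOperational", "to": "State_Operational"}
msg32 = {"from": "State_Operational", "to": "State_PreOperational"}
msg34 = {"from": "State_Operational", "to": "State_Stopped"}
msg24 = {"from": "State_PreOperational", "to": "State_Stopped"}

msgQ = [(msg12, 6), (msg23, 7), (msg32, 22), (msg34, 25), (msg24, 28)]


def replay(queue, start="State_Initialization"):
    """Return (step, accepted, state_after) for every queued msg."""
    current = start
    log = []
    for msg, step in queue:
        accepted = msg["from"] == current    # same check as doTransition()
        if accepted:
            current = msg["to"]
        log.append((step, accepted, current))
    return log


log = replay(msgQ)
for entry in log:
    print(entry)
```

Only the msg at step 25 is rejected, because the node is in State_PreOperational at that point while the msg expects State_Operational.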

