在python中写个自定义数据包协议的打包和解包测试

 更新时间:2023年09月06日 09:55:03   作者:qq_278667286  
这篇文章主要介绍了在python中写个自定义数据包协议的打包和解包测试,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

写个自定义数据包协议的打包和解包测试

打包代码

'''
帧数据结构创建
'''
import struct
class FrameData():
    def __init__(self,pixStyle=0x14,width=1,height=1):
        self.pixStyle = pixStyle
        self.frameWidth = width
        self.frameHeight = height
        self.pkgLen =self.frameWidth *self.frameHeight *(pixStyle & 0x0f)+6
        self.buf = [0 for i in range(self.pkgLen)]
        #数据头尾定义
        self.buf[0]=0xff
        self.buf[1] = self.pixStyle
        self.buf[2] = self.frameWidth
        #补丁
        if(width>255):
            self.buf[1]=0x40+(pixStyle & 0x0f)
            self.buf[2] = 128
        self.buf[3] = self.frameHeight
        self.buf[self.pkgLen - 2] = (self.pixStyle+self.frameWidth+self.frameHeight)&0xff
        self.buf[self.pkgLen - 1]=0xfe
    def setCrc(self):
        r=0
        for i in range(1, self.pkgLen-2):
            r+=self.buf[i]
        self.buf[self.pkgLen - 2] = r& 0xff
    def setDataToArray(self,arr):
        alen= len(arr)
        dlen= self.pkgLen - 6
        rlen=alen if(alen<dlen) else dlen
        for i in range(4, rlen+4):
            #print(i,arr[i-4])
            self.buf[i]=arr[i-4]
        self.setCrc()
    def setDataToOff(self):
        for i in range(4, self.pkgLen - 2):
            self.buf[i]=0
        self.buf[self.pkgLen - 2]=(self.pixStyle + self.frameWidth + self.frameHeight) & 0xff
        #self.setCrc()
    def setDataToOn(self):
        for i in range(4, self.pkgLen - 2):
            self.buf[i]=255
        self.setCrc()
    def setDataToRGBW(self,r=0,g=0,b=0,w=0):
        sty=self.pixStyle & 0x0f
        if (sty == 1):
            for i in range(0, int((self.pkgLen - 6) / sty)):
                self.buf[i * sty + 4] = r
        if (sty == 2):
            for i in range(0, int((self.pkgLen - 6) / sty)):
                self.buf[i * sty + 4] = r
                self.buf[i * sty + 5] = g
        if(sty==3):
            for i in range(0, int((self.pkgLen - 6)/sty)):
                self.buf[i * sty + 4] = r
                self.buf[i * sty + 5] = g
                self.buf[i * sty + 6] = b
        if (sty == 4):
            for i in range(0, int((self.pkgLen - 6) / sty)):
                self.buf[i * sty + 4] = r
                self.buf[i * sty + 5] = g
                self.buf[i * sty + 6] = b
                self.buf[i * sty + 7] = w
        self.setCrc()
    def packBytes(self):
        packstyle = str( self.pkgLen) + 'B'  # B 0-255     b 0-127
        req = struct.pack(packstyle, *self.buf)
        return req
if __name__ == '__main__':
    d=FrameData(0x14,10,2)
    print("init",d.buf)
    d.setDataToOn()
    print("on",d.buf)
    d.setDataToRGBW(255)
    print("r",d.buf)
    d.setDataToRGBW(0,255)
    print("g",d.buf)
    d.setDataToRGBW(0,0,255)
    print("b",d.buf)
    d.setDataToRGBW(0,0,0,255)
    print("w",d.buf)
    d.setDataToRGBW(255,255, 255, 255)
    print("rgbw", d.buf)
    d.setDataToArray([0,0,0])
    print("000", d.buf)
    import time
    while True:
        time.sleep(1)

输出

init [255, 20, 10, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 254]
on [255, 20, 10, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 208, 254]
r [255, 20, 10, 2, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 12, 254]
g [255, 20, 10, 2, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 12, 254]
b [255, 20, 10, 2, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 12, 254]
w [255, 20, 10, 2, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 0, 0, 0, 255, 12, 254]
rgbw [255, 20, 10, 2, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 208, 254]
000 [255, 20, 10, 2, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 211, 254]

解包代码

'''
帧数据结构解析
'''
from event import *
class FramePkg(EventDispatcher):
    def __init__(self,pixStyle=0x14,width=1,height=1):
        EventDispatcher.__init__(self)
        self.pixStyle = pixStyle
        self.frameWidth = width
        self.frameHeight = height
        self.pkgLen =self.frameWidth *self.frameHeight *(pixStyle & 0x0f)+6
        self.bufLen = self.pkgLen * 3
        self.buf = [0 for i in range(self.bufLen)]
        self.dataLen = self.pkgLen - 6
        self.linedata = [0 for i in range(self.dataLen)]
        self.startIdx = 0
        self.endIdx = 0
    #获取数据长度 int
    def getDatLen(self):
        if(self.endIdx>=self.startIdx):
            return self.endIdx-self.startIdx
        return self.bufLen+self.endIdx-self.startIdx
    #判断数据长度已经到达一个包长bool
    def pkgIsOk(self):
        return self.getDatLen()>=self.pkgLen
    #检测包头为0xff并且有一个包长的数据bool
    def chkStart(self):
        while(self.buf[self.startIdx]!=0xff and self.pkgIsOk()):
            self.nextByte()
        return self.pkgIsOk()
    #检测索引超出buf末尾,从头开始
    def chkMaxIdx(self,idx):
        if (idx > self.bufLen - 1):
            idx %= self.bufLen  # ??
        return idx
    #获取包尾索引int
    def getPkgEndIdx(self):
        return self.chkMaxIdx(self.startIdx+self.pkgLen-1)
    #检查包尾为0xfe bool
    def chkEnd(self):
        return  self.buf[self.getPkgEndIdx()]==0xfe
    #相对起始索引包数据的每个字节索引的映射int
    def getIdxFromStart(self,i):
        return self.chkMaxIdx(self.startIdx+i)
    #检测校验和bool
    def chkCrc(self):
        crcidx=self.getPkgEndIdx()-1#包尾之前的一个字节
        if(crcidx<0):
            crcidx= self.bufLen+crcidx#包尾索引在缓冲区前面,包头在后面
        d= self.buf[crcidx]
        #print(d)
        r=0
        #计算和
        for i in range(1, self.pkgLen-2):
            #print(self.getIdxFromStart(i),":",self.buf[self.getIdxFromStart(i)])
            r+=self.buf[self.getIdxFromStart(i)]
        r=r&0xff
        #print(r)
        return d==r
    #写数据到显示区
    def writeData(self):
        r=0
        for i in range(4,self.pkgLen-2):
            r=self.buf[self.getIdxFromStart(i)]
            #print(r)
            self.linedata[i-4]=r
    def nextPkg(self):
        self.startIdx = self.chkMaxIdx(self.getPkgEndIdx()+1)
    def nextByte(self):
        self.startIdx=self.chkMaxIdx(self.startIdx+1)
    #解析包
    def parsePKG(self):
        while(self.pkgIsOk()):
            if(self.chkStart() and self.chkEnd() and self.chkCrc()):
                self.writeData()
                self.dispatch_event(PKGEvent(PKGEvent.PKG_DATA_OK,self.linedata ))
                self.nextPkg()
            else:
                self.nextByte()
    def writeToBuf(self,byt):
        self.buf[self.endIdx]=byt
        self.endIdx=self.chkMaxIdx(self.endIdx+1)
    def writeArrayToBuf(self,arr):
        for i in range(len(arr)):
            self.writeToBuf(arr[i])
        self.parsePKG()
    #'''
if __name__ == '__main__':
    import time
    buf=[0 for i in range(30)]
    buf[0]=0xff
    buf[1]=0x14
    buf[2]=0x01
    buf[3]=0x01
    buf[4]=0xff
    buf[5]=0xff
    buf[6]=0xff
    buf[7]=0xff
    buf[8]=0x11
    buf[9]=0xfe
    buf[10]=0xff
    buf[11]=0x14
    buf[12]=0x01
    buf[13]=0x01
    buf[14]=0xff
    buf[15]=0xff
    buf[16]=0xff
    buf[17]=0xff
    buf[18]=0x12
    buf[19]=0xfe
    buf[22] = 0xff
    buf[23] = 0x14
    buf[24] = 0x01
    buf[25] = 0x01
    buf[26] = 0xff
    buf[27] = 0xff
    buf[28] = 0xff
    buf[29] = 0xff
    buf[0] = 0x12
    buf[1] = 0xfe
    #startIdx=5
    #endIdx=2
    #dataLen=pkgLen-6
    #linedata=[0 for i in range(dataLen)]
    def on_data_ok(e):
        print("event ",e.data)
    pkg= FramePkg()
    pkg.add_event_listener(PKGEvent.PKG_DATA_OK,on_data_ok)
    c=0
    while True:
        #print(1)
        #print(pkg.buf)
        #print(pkg.linedata )
        '''print("getDatLen",pkg.getDatLen())
        print("pkgIsOk",pkg.pkgIsOk())
        print("chkStart",pkg.chkStart())
        print("chkEnd",pkg.chkEnd())
        print("chkCrc",pkg.chkCrc())
        print("getIdxFromStart",pkg.getIdxFromStart(30))
        pkg.writeData ()
        print("getPkgEndIdx-------------",pkg.getPkgEndIdx())
        pkg.nextPkg() '''
        #print("startIdx",pkg.startIdx)
        pkg.parsePKG()
        if (c < 30):
            pkg.writeToBuf(buf[c])
        else:
            pkg.endIdx=pkg.chkMaxIdx(pkg.endIdx+1)
        c += 1
        if(c%100==0):
            print(c)
        time.sleep(0.01)

参考前辈的event类

 
class Event(object):
    '''
      事件初始化的一个方式
    '''
    def __init__(self,event_type,data=None):
        self._type = event_type
        self._data = data
    @property
    def type(self):
        return self._type
    @property
    def data(self):
        return self._data
class EventDispatcher(object):
     """
    event分发类 监听和分发event事件
    """
     def __init__(self):
         #初始化事件
         self._events = dict()
     def __del__(self):
         self._events = None
     def has_listener(self,event_type,listener):
        if event_type in self._events.keys():
            return listener in self._events[event_type]
        else:
            return False
     def dispatch_event(self,event):
          """
        Dispatch an instance of Event class
        """
        # 分发event到所有关联的listener
          if event.type in self._events.keys():
              listeners = self._events[event.type]
              for listener in listeners:
                  listener(event)
     def add_event_listener(self,event_type,listener):
         #给某种事件类型添加listner
         if not self.has_listener(event_type,listener):
             listeners = self._events.get(event_type,[])
             listeners.append(listener)
             self._events[event_type] = listeners
     def remove_event_listener(self,event_type,listener):
         if self.has_listener(event_type,listener):
             listeners = self._events[event_type]
             if len(listeners) == 1:
                 del self._events[event_type]
             else:
                 listeners.remove(listener)
                 self._events[event_type] = listeners
class PKGEvent(Event):
    PKG_DATA_OK = "PKG_DATA_OK"
if __name__ == '__main__':
    class Test(EventDispatcher):
        def __init__(self, a=1, b=1):
            EventDispatcher.__init__(self)
            self.a = a
            self.b = b
        def testDispEvent(self):
            self.dispatch_event(PKGEvent(PKGEvent.PKG_DATA_OK, 124))
    t=Test()
    def on_ok(e):
        print(e.data)
    t.add_event_listener(PKGEvent.PKG_DATA_OK,on_ok)
    import time
    while True:
        print(1)
        t.testDispEvent()
        time.sleep(1)

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Python实现交通数据可视化的示例代码

    Python实现交通数据可视化的示例代码

    本文主要分享了Python交通数据分析与可视化的实战!其中主要是使用TransBigData库快速高效地处理、分析、挖掘出租车GPS数据,感兴趣的可以了解一下
    2023-04-04
  • PyCharm Ctrl+Shift+F 失灵的简单有效解决操作

    PyCharm Ctrl+Shift+F 失灵的简单有效解决操作

    这篇文章主要介绍了PyCharm Ctrl+Shift+F 失灵的简单有效解决操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-01-01
  • Python图像处理之Hough变换检测直线

    Python图像处理之Hough变换检测直线

    霍夫变换是一种特征检测(feature extraction),被广泛应用在图像分析,本文将利用Hough变换实现直线检测,感兴趣的小伙伴可以了解一下
    2023-07-07
  • Python数学建模学习模拟退火算法旅行商问题示例解析

    Python数学建模学习模拟退火算法旅行商问题示例解析

    模拟退火算法不仅可以解决连续函数优化问题,KIRKPATRICK在1983年成功将其应用于求解组合优化问题,现已成为求解旅行商问题的常用方法,通常采用反序、移位和交换等操作算子产生新解
    2021-10-10
  • DataFrame.groupby()所见的各种用法详解

    DataFrame.groupby()所见的各种用法详解

    这篇文章主要介绍了DataFrame.groupby()所见的各种用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • Python标准库之typing的用法(类型标注)

    Python标准库之typing的用法(类型标注)

    这篇文章主要介绍了Python标准库之typing的用法(类型标注),具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • Python实现的多线程http压力测试代码

    Python实现的多线程http压力测试代码

    这篇文章主要介绍了Python实现的多线程http压力测试代码,结合实例形式分析了Python多线程操作的相关实现技巧,需要的朋友可以参考下
    2017-02-02
  • TensorFlow平台下Python实现神经网络

    TensorFlow平台下Python实现神经网络

    这篇文章主要为大家详细介绍了TensorFlow平台下Python实现神经网络,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-03-03
  • Python中Django的ORM高级用法

    Python中Django的ORM高级用法

    这篇文章主要介绍了Python中Django的ORM高级用法,ORM是一种思想,ORM对象-关系映射,关系数据库是企业级应用环境中永久存放数据的主流数据存储系统,需要的朋友可以参考下
    2023-07-07
  • python 标准库原理与用法详解之os.path篇

    python 标准库原理与用法详解之os.path篇

    os.path模块主要用于文件的属性获取,在编程中经常用到,本文将带你熟悉这个模块并掌握它的用法,感兴趣的朋友跟小编来看看吧
    2021-10-10

最新评论