Skip to content

Commit

Permalink
i2c_packet: new decoder for forming packets from i2c data
Browse files Browse the repository at this point in the history
i2c_packet protocol decoder stacks with i2c PD and allows to combine several data bytes from "START" i2c condition until "STOP" or "START REPEAT" and puts this data in a dedicated annotation.

Possible data formats for data output are hex, ascii, dec, bin and oct.

This decoder also allows logging data packets with the print_sec option enabled.
  • Loading branch information
sespivak committed Aug 7, 2022
1 parent e556e11 commit a4234c3
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
28 changes: 28 additions & 0 deletions decoders/i2c_packet/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2022 Sergey Spivak <[email protected]>
##
## Permission is hereby granted, free of charge, to any person obtaining a copy
## of this software and associated documentation files (the "Software"), to deal
## in the Software without restriction, including without limitation the rights
## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
## copies of the Software, and to permit persons to whom the Software is
## furnished to do so, subject to the following conditions:
##
## The above copyright notice and this permission notice shall be included in all
## copies or substantial portions of the Software.
##
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
## SOFTWARE.

'''
Make data packets from I²C decoder data.
'''

from .pd import Decoder
195 changes: 195 additions & 0 deletions decoders/i2c_packet/pd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2022 Sergey Spivak <[email protected]>
##
## Permission is hereby granted, free of charge, to any person obtaining a copy
## of this software and associated documentation files (the "Software"), to deal
## in the Software without restriction, including without limitation the rights
## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
## copies of the Software, and to permit persons to whom the Software is
## furnished to do so, subject to the following conditions:
##
## The above copyright notice and this permission notice shall be included in all
## copies or substantial portions of the Software.
##
## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
## SOFTWARE.

from collections import deque
import sigrokdecode as srd

'''
OUTPUT_PYTHON format:
Packet:
[<ptype>, <pdata>]
<ptype>:
- 'PACKET READ' (ADDRESS READ followed by one or several
DATA bytes until STOP or START REPEAT)
- 'PACKET WRITE' (ADDRESS WRITE followed by one or several
DATA bytes until STOP or START REPEAT)
<pdata> is the tuple with slave address byte and tuple with data bytes.
Slave addresses do not include bit 0 (the READ/WRITE indication bit).
'''

class Ann:
PACKET_READ, \
PACKET_WRITE, \
= range(2)

RX = 0
TX = 1

class Decoder(srd.Decoder):
api_version = 3
id = 'i2c_packet'
name = 'I²C packet'
longname = 'I²C packet builder'
desc = 'Combine I²C data to packets'
license = 'mit'
inputs = ['i2c']
outputs = []
tags = ['Embedded/industrial']
options = (
{'id': 'format', 'desc': 'Data format', 'default': 'hex',
'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},
{'id': 'print_sec', 'desc': 'Print start time (sec)', 'default': 'no', 'values': ('yes', 'no')},
)
annotations = (
('packet-read', 'READ'),
('packet-write', 'WRITE'),
)
annotation_rows = (
('packet-read', 'RD', (Ann.PACKET_READ, )),
('packet-write', 'WR', (Ann.PACKET_WRITE, )),
)

def __init__(self):
self.out_py = None
self.out_ann = None
self.packet_data = deque()
self.packet_ss = 0
self.packet_es = 0
self.read_sign = False
self.address = 0
self.print_sec = False
self.fmt = None
self.samplerate = 0
self.sampletime = 0
self.reset()

def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
if self.samplerate:
self.sampletime = 1.0 / self.samplerate

def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.out_py = self.register(srd.OUTPUT_PYTHON)

format_name = self.options['format']
if format_name == 'hex':
self.fmt = '{:02X}'
elif format_name == 'dec':
self.fmt = '{:d}'
elif format_name == 'bin':
self.fmt = '{:08b}'
elif format_name == 'oct':
self.fmt = '{:03o}'
else:
self.fmt = None

self.print_sec = self.options['print_sec'] == 'yes'
self.packet_data = deque()

def reset(self):
self.packet_data.clear()
self.packet_ss = 0
self.packet_es = 0
self.address = 0

def putg(self, ss, es, data):
"""Put a graphical annotation."""
self.put(ss, es, self.out_ann, data)

def putp(self, ss, es, data):
"""Put a python annotation."""
self.put(ss, es, self.out_py, data)

def format_data_value(self, v):
# Assume "is printable" for values from 32 to including 126,
# below 32 is "control" and thus not printable, above 127 is
# "not ASCII" in its strict sense, 127 (DEL) is not printable,
# fall back to hex representation for non-printables.
if self.fmt is None:
if 32 <= v <= 126:
return chr(v)
return "[{:02X}]".format(v)
else:
return self.fmt.format(v)

def data_array_to_str(self, data_array):
if self.fmt:
str_array = [self.fmt.format(value) for value in data_array]
return ' '.join(str_array)
else:
str_array = [self.format_data_value(value) for value in data_array]
return ''.join(str_array)

def handle_packet(self):
if len(self.packet_data):
packet_str = self.data_array_to_str(self.packet_data)
if self.read_sign:
ann = Ann.PACKET_READ
else:
ann = Ann.PACKET_WRITE

packet_str = "ADDR 0x{:02X} {:}: ".format(
self.address,
'RD' if self.read_sign else 'WR',
) + packet_str

# TODO: This option should be in the sigrok-cli to avoid boilerplate code in each PD
if self.print_sec and self.sampletime:
packet_str = "{:8.3f} ".format(self.packet_ss * self.sampletime,) + packet_str

self.putg(self.packet_ss, self.packet_es, [ann,
[
# Full version of packet annotation
packet_str,
# Short version without timestamp and "ADDR 0x" prefix
packet_str[7+9:] if self.print_sec else packet_str[7:],
]])
ptype = 'PACKET READ' if self.read_sign else 'PACKET WRITE'
self.putp(self.packet_ss, self.packet_es, (ptype, (self.address, tuple(self.packet_data))))
self.reset()

def decode(self, ss, es, data):
ptype = data[0]
if 'DATA' in ptype:
self.packet_data.append(data[1])
self.packet_es = es
elif ptype.startswith('START'):
if 'REPEAT' in ptype:
self.packet_es = es
# If something goes wrong, there is still data in the receive buffer self.packet_data
self.handle_packet()
self.packet_ss = ss
elif 'ADDRESS' in ptype:
self.address = data[1]
self.read_sign = 'READ' in ptype
self.packet_es = es
elif ptype.endswith('ACK'):
self.packet_es = es
elif ptype == 'STOP':
self.packet_es = es
self.handle_packet()

0 comments on commit a4234c3

Please sign in to comment.