Source code for mqtt_codec.io

"""
========================
`mqtt_codec.io` Package
========================

A collection of helper functions and classes used to encode and decode
the fields of MQTT control packets.
"""

import codecs
from struct import Struct


def encode_bytes(src_buf, dst_file):
    """Encode a buffer length followed by the bytes of the buffer
    itself.

    The length is written with ``FIELD_U16`` (a 2-byte unsigned
    integer) and is immediately followed by the unmodified contents of
    `src_buf`.

    Parameters
    ----------
    src_buf: bytes
        Source bytes to be encoded.  Function asserts that
        0 <= len(src_buf) <= 2**16-1.
    dst_file: file
        File-like object with write method.

    Raises
    ------
    TypeError
        If `src_buf` is not a ``bytes`` object.

    Returns
    -------
    int
        Number of bytes written to `dst_file`.
    """
    if not isinstance(src_buf, bytes):
        raise TypeError('src_buf must be bytes.')

    len_src_buf = len(src_buf)
    # The length prefix is only 16 bits wide, so a longer buffer cannot
    # be represented.
    assert 0 <= len_src_buf <= 2**16-1

    dst_file.write(FIELD_U16.pack(len_src_buf))
    dst_file.write(src_buf)

    # Length prefix plus payload.
    return FIELD_U16.size + len_src_buf
def decode_bytes(f):
    """Read a length-prefixed byte string from `f`.

    The prefix is a 2-byte unsigned integer (``FIELD_U16``) giving the
    number of payload bytes that follow it.

    Parameters
    ----------
    f: file
        File-like object with read method.

    Raises
    ------
    UnderflowDecodeError
        When the end of stream is encountered before the end of the
        encoded bytes.

    Returns
    -------
    int
        Number of bytes read from `f`.
    bytes
        Value bytes decoded from `f`.
    """
    header = f.read(FIELD_U16.size)
    if len(header) < FIELD_U16.size:
        # Stream ended inside the length prefix.
        raise UnderflowDecodeError()

    payload_len = FIELD_U16.unpack_from(header)[0]
    payload = f.read(payload_len)
    if len(payload) < payload_len:
        # Stream ended inside the payload.
        raise UnderflowDecodeError()

    return FIELD_U16.size + payload_len, payload
def encode_utf8(s, f):
    """UTF-8 encodes string `s` to file-like object `f` according to
    the MQTT Version 3.1.1 specification in section 1.5.3.

    The encoded form is a 2-byte unsigned length (``FIELD_U16``)
    followed by the UTF-8 bytes of `s`.  The maximum length for the
    encoded string is 2**16-1 (65535) bytes.  An assertion error will
    result if the encoded string is longer.

    Parameters
    ----------
    s: str
        String to be encoded.
    f: file
        File-like object with write method.

    Returns
    -------
    int
        Number of bytes written to f.
    """
    encode = codecs.getencoder('utf8')
    # The encoder also reports how many characters it consumed; only
    # the encoded bytes are needed here.
    encoded_str_bytes, _ = encode(s)

    num_encoded_str_bytes = len(encoded_str_bytes)
    assert 0 <= num_encoded_str_bytes <= 2**16-1

    # Same length-prefix field that `encode_bytes`/`decode_utf8` use;
    # produces the same two bytes as packing the high and low octets
    # separately.
    f.write(FIELD_U16.pack(num_encoded_str_bytes))
    f.write(encoded_str_bytes)

    return FIELD_U16.size + num_encoded_str_bytes
def decode_utf8(f):
    """Read a length-prefixed utf-8 string from `f`.

    The format is the one described in MQTT Version 3.1.1 section
    1.5.3 line 177: a 16-bit unsigned length followed by that many
    bytes of utf-8 encoded text.

    Parameters
    ----------
    f: file
        File-like object with read method.

    Raises
    ------
    UnderflowDecodeError
        Raised when a read failed to extract enough bytes from the
        underlying stream to decode the string.
    Utf8DecodeError
        When any code point in the utf-8 string is invalid.

    Returns
    -------
    int
        Number of bytes consumed.
    str
        A string utf-8 decoded from ``f``.
    """
    utf8_decode = codecs.getdecoder('utf8')

    prefix = f.read(FIELD_U16.size)
    if len(prefix) < FIELD_U16.size:
        raise UnderflowDecodeError()

    num_utf8_bytes = FIELD_U16.unpack_from(prefix)[0]
    raw = f.read(num_utf8_bytes)
    if len(raw) < num_utf8_bytes:
        raise UnderflowDecodeError()

    try:
        # The decoder returns (text, bytes consumed); only the text is
        # needed.
        text = utf8_decode(raw, 'strict')[0]
    except UnicodeError as e:
        raise Utf8DecodeError(e)

    return FIELD_U16.size + num_utf8_bytes, text
def encode_varint(v, f):
    """Write integer `v` to file `f` as a variable-length integer.

    The value is emitted seven bits per byte, least-significant group
    first.  The top bit of each byte is set when more bytes follow.

    Parameters
    ----------
    v: int
        Integer v >= 0.
    f: file
        Object containing a write method.

    Returns
    -------
    int
        Number of bytes written.
    """
    assert v >= 0

    written = 0
    more = True
    while more:
        v, low7 = divmod(v, 0x80)
        more = v > 0
        if more:
            # Continuation flag: another byte follows this one.
            low7 |= 0x80
        f.write(FIELD_U8.pack(low7))
        written += 1

    return written
def decode_varint(f, max_bytes=4):
    """Decode variable integer using algorithm similar to that
    described in MQTT Version 3.1.1 line 297.

    Bytes are read one at a time.  The low seven bits of each byte are
    accumulated least-significant group first, and a clear top bit
    marks the final byte.

    Parameters
    ----------
    f: file
        Object with a read method.
    max_bytes: int or None
        If a varint cannot be constructed using `max_bytes` or fewer
        from f then raises a `DecodeError`.  If None then there is no
        maximum number of bytes.

    Raises
    ------
    DecodeError
        When length is greater than max_bytes.
    UnderflowDecodeError
        When file ends before enough bytes can be read to construct the
        varint.

    Returns
    -------
    int
        Number of bytes consumed.
    int
        Value extracted from `f`.
    """
    value = 0
    consumed = 0
    shift = 0

    while True:
        octet = f.read(1)
        if len(octet) == 0:
            raise UnderflowDecodeError()

        (u8,) = FIELD_U8.unpack(octet)
        value += (u8 & 0x7f) << shift
        shift += 7
        consumed += 1

        if not (u8 & 0x80):
            # Continuation bit clear: this was the last byte.
            break
        if max_bytes is not None and consumed >= max_bytes:
            raise DecodeError('Variable integer contained more than maximum bytes ({}).'.format(max_bytes))

    return consumed, value
class DecodeError(Exception):
    """Base class for the decoding errors defined in this module."""


class UnderflowDecodeError(DecodeError):
    """Raised when a stream ends before a complete value could be
    read from it."""


class Utf8DecodeError(DecodeError):
    """Raised when a byte string cannot be decoded as utf-8.

    Parameters
    ----------
    e: UnicodeError
        The underlying error raised by the utf-8 decoder.

    Attributes
    ----------
    error: UnicodeError
        The exception passed to the constructor.
    """
    def __init__(self, e):
        assert isinstance(e, UnicodeError)
        # Hand the cause to the base class so that `args` and
        # `str(self)` carry the decoder's message instead of being
        # empty.
        super(Utf8DecodeError, self).__init__(e)
        self.error = e
class EncodeError(Exception):
    """Base class for the encoding errors defined in this module."""


class OverflowEncodeError(EncodeError):
    """An `EncodeError` subclass signalling an overflow while
    encoding."""


class OversizePacketEncodeError(EncodeError):
    """Raised when the parameters used to create the MQTT packet would
    result in an impossibly large packet."""
class FileDecoder(object):
    """Creates an object that extracts values from the file-like object
    `f` while keeping a running count of the bytes consumed.

    Parameters
    ----------
    f: file
        Object with read method.
    """
    def __init__(self, f):
        self.__f = f
        self.__num_bytes_consumed = 0

    @property
    def num_bytes_consumed(self):
        """int: number of bytes consumed from underlying stream."""
        return self.__num_bytes_consumed

    def unpack(self, struct):
        """Read as many bytes as are required to extract struct then
        unpack and return a tuple of the values.

        Parameters
        ----------
        struct: struct.Struct

        Raises
        ------
        UnderflowDecodeError
            Raised when a read failed to extract enough bytes from the
            underlying stream to extract the bytes.

        Returns
        -------
        tuple
            Tuple of extracted values.
        """
        # Going through self.read keeps num_bytes_consumed up to date.
        return struct.unpack(self.read(struct.size))

    def unpack_utf8(self):
        """Decode a utf-8 string encoded as described in MQTT Version
        3.1.1 section 1.5.3 line 177.  This is a 16-bit unsigned length
        followed by a utf-8 encoded string.

        Raises
        ------
        UnderflowDecodeError
            Raised when a read failed to extract enough bytes from the
            underlying stream to decode the string.
        DecodeError
            When any code point in the utf-8 string is invalid.

        Returns
        -------
        int
            Number of bytes consumed.
        str
            A string utf-8 decoded from the underlying stream.
        """
        num_bytes_consumed, s = decode_utf8(self.__f)
        self.__num_bytes_consumed += num_bytes_consumed
        return num_bytes_consumed, s

    def unpack_bytes(self):
        """Unpack a byte string that is encoded as a 16-bit unsigned
        length followed by that many raw bytes.

        Raises
        ------
        UnderflowDecodeError
            Raised when the underlying stream ends before the length
            prefix or the announced number of bytes could be read.

        Returns
        -------
        int
            Number of bytes consumed
        bytes
            A bytes object extracted from the underlying stream.
        """
        num_bytes_consumed, b = decode_bytes(self.__f)
        self.__num_bytes_consumed += num_bytes_consumed
        return num_bytes_consumed, b

    def unpack_varint(self, max_bytes=4):
        """Decode variable integer using algorithm similar to that
        described in MQTT Version 3.1.1 line 297.

        Parameters
        ----------
        max_bytes: int or None
            If a varint cannot be constructed using `max_bytes` or
            fewer from f then raises a `DecodeError`.  If None then
            there is no maximum number of bytes.  Defaults to 4, the
            same default `decode_varint` uses.

        Raises
        ------
        DecodeError
            When length is greater than max_bytes.
        UnderflowDecodeError
            When file ends before enough bytes can be read to construct
            the varint.

        Returns
        -------
        int
            Number of bytes consumed.
        int
            Value extracted from `f`.
        """
        num_bytes_consumed, value = decode_varint(self.__f, max_bytes)
        self.__num_bytes_consumed += num_bytes_consumed
        return num_bytes_consumed, value

    def read(self, num_bytes):
        """Read `num_bytes` and return them.

        Parameters
        ----------
        num_bytes : int
            Number of bytes to extract from the underlying stream.

        Raises
        ------
        UnderflowDecodeError
            Raised when a read failed to extract enough bytes from the
            underlying stream to extract the bytes.

        Returns
        -------
        bytes
            A bytes object extracted from underlying stream.
        """
        buf = self.__f.read(num_bytes)
        assert len(buf) <= num_bytes
        if len(buf) < num_bytes:
            raise UnderflowDecodeError()

        # The counter only advances when the full read succeeded.
        self.__num_bytes_consumed += num_bytes
        return buf
class LimitReader(object):
    """Reads up to ``limit`` bytes from the underlying file.  If
    ``limit`` is ``None`` then reads to the end of the file.

    Parameters
    -----------
    f: file
        File-like object with read method.
    limit: int optional
        Maximum number of bytes to read from the underlying file or
        ``None`` if the reader should continue until the end of the
        file.
    """
    def __init__(self, f, limit=None):
        self.__f = f
        self.__num_bytes_consumed = 0
        self.__limit = limit

    @property
    def limit(self):
        """int or None: maximum number of bytes to read from
        underlying stream."""
        return self.__limit

    def read(self, max_bytes=1):
        """Read at most `max_bytes` from the underlying file without
        ever exceeding ``limit`` bytes in total.

        Parameters
        -----------
        max_bytes: int
            Maximum number of bytes to read.  When a limit is set, a
            negative value or ``None`` requests everything that remains
            before the limit.  Without a limit the value is passed to
            the underlying ``read`` unchanged.

        Returns
        --------
        bytes
            Bytes extracted from the underlying file.  Length may be
            less than ``max_bytes``.  On end-of file, or once the limit
            has been reached, returns a bytes object with zero-length.
        """
        if self.limit is None:
            b = self.__f.read(max_bytes)
        else:
            remaining = self.limit - self.__num_bytes_consumed
            # A negative or missing size would otherwise skip the limit
            # check and let the underlying file read past the limit.
            if max_bytes is None or max_bytes < 0 or max_bytes > remaining:
                max_bytes = remaining
            b = self.__f.read(max_bytes)
        self.__num_bytes_consumed += len(b)

        return b
class BytesReader(object):
    """Creates a file-like object that reads from a buffer.

    Parameters
    ----------
    buf: bytes or bytearray
        Object to read from.
    """
    def __init__(self, buf):
        assert isinstance(buf, (bytes, bytearray)), type(buf)
        self.__buf = buf
        # Read cursor into the buffer; set to None once closed.
        self.__num_bytes_consumed = 0

    def read(self, max_bytes=1):
        """Read at most `max_bytes` from internal buffer.

        Parameters
        -----------
        max_bytes: int
            Maximum number of bytes to read.  A negative value or
            ``None`` reads everything that remains in the buffer.

        Raises
        ------
        ValueError
            If read is called after close has been called.

        Returns
        --------
        bytes
            Bytes extracted from internal buffer.  Length may be less
            than `max_bytes`.  On end-of file returns a bytes object
            with zero-length.
        """
        if self.__num_bytes_consumed is None:
            raise ValueError('I/O operation on closed file.')

        start = self.__num_bytes_consumed
        remaining = len(self.__buf) - start
        # Clamp the request to what is left.  A negative size must not
        # be used in the slice arithmetic, otherwise the cursor would
        # move backwards and consumed bytes would be returned again.
        if max_bytes is None or max_bytes < 0 or max_bytes > remaining:
            max_bytes = remaining

        b = self.__buf[start:start + max_bytes]
        # Advance by what was actually returned.
        self.__num_bytes_consumed = start + len(b)

        if isinstance(b, bytearray):
            # Always return an immutable bytes object.
            b = bytes(b)

        assert isinstance(b, bytes)
        return b

    @property
    def closed(self):
        """bool: ``True`` if `self.close()` has been called; ``False``
        otherwise."""
        return self.__num_bytes_consumed is None

    def close(self):
        """Read operations conducted after this method is called will
        raise `ValueError`.

        This makes the object behave like other read objects even
        though no resources are freed."""
        self.__num_bytes_consumed = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False
# Pre-compiled struct formats shared by the encoders and decoders in
# this module.

# One unsigned byte.
FIELD_U8 = Struct('>B')

# Two-byte big-endian unsigned integer.
FIELD_U16 = Struct('>H')

# Alias of FIELD_U16 for packet id fields.
FIELD_PACKET_ID = FIELD_U16