Source code for M2Crypto.PGP.packet

from __future__ import absolute_import

"""M2Crypto PGP2.

This module implements PGP packets per RFC1991 and various source
distributions.

Each Packet type is represented by a class; Packet classes derive from
the abstract 'Packet' class.

The 'message digest' Packet type, mentioned but not documented in RFC1991,
is not implemented.

Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""

# XXX Work-in-progress. UNFINISHED, type hinting is probably wrong!!!

# Be liberal in what you accept.
# Be conservative in what you send.
# Be lazy in what you eval.

import struct

from io import BytesIO

from M2Crypto import util  # noqa
from M2Crypto.util import octx_to_num
from M2Crypto.PGP import constants  # noqa
if util.py27plus:
    from typing import AnyStr, IO, Optional, Tuple  # noqa

_OK_VERSION = ('\002', '\003')
_OK_VALIDITY = ('\000',)
_OK_PKC = ('\001',)


[docs]class XXXError(Exception): pass
[docs]class Packet: def __init__(self, ctb, body=None): # type: (int, Optional[str]) -> None import warnings warnings.warn( 'Deprecated. No maintainer for PGP. If you use this, ' + 'please inform M2Crypto maintainer.', DeprecationWarning) self.ctb = ctb if body is not None: self.body = BytesIO(body) # type: Optional[IO[str]] else: self.body = None
[docs] def validate(self): # type: () -> int return 1
[docs] def pack(self): # type: () -> None raise NotImplementedError('%s.pack(): abstract method' % (self.__class__,))
[docs] def version(self): # type: () -> Optional[int] if hasattr(self, '_version'): return ord(self._version) else: return None
[docs] def timestamp(self): # type: () -> Optional[int] if hasattr(self, '_timestamp'): return struct.unpack('>L', self._timestamp)[0] else: return None
[docs] def validity(self): # type: () -> Optional[int] if hasattr(self, '_validity'): return struct.unpack('>H', self._validity)[0] else: return None
[docs] def pkc(self): # type: () -> Optional[bytes] if hasattr(self, '_pkc'): return self._pkc else: return None
def _llf(self, lenf): # type: (int) -> Tuple[int, bytes] if lenf < 256: return 0, chr(lenf) elif lenf < 65536: return 1, struct.pack('>H', lenf) else: assert lenf < 2**32 return 2, struct.pack('>L', lenf) def _ctb(self, llf): # type: (int) -> int ctbv = _FACTORY[self.__class__] return chr((1 << 7) | (ctbv << 2) | llf)
[docs]class PublicKeyPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, Optional[IO[str]) -> None Packet.__init__(self, ctb, body) if self.body is not None: self._version = self.body.read(1) self._timestamp = self.body.read(4) self._validity = self.body.read(2) self._pkc = self.body.read(1) self._nlen = self.body.read(2) nlen = (struct.unpack('>H', self._nlen)[0] + 7) / 8 self._n = self.body.read(nlen) self._elen = self.body.read(2) elen = (struct.unpack('>H', self._elen)[0] + 7) / 8 self._e = self.body.read(elen)
[docs] def pack(self): # type: () -> str if self.body is None: self.body = BytesIO() self.body.write(self._version) self.body.write(self._timestamp) self.body.write(self._validity) self.body.write(self._pkc) self.body.write(self._nlen) self.body.write(self._n) self.body.write(self._elen) self.body.write(self._e) self.body = self.body.getvalue() llf, lenf = self._llf(len(self.body)) ctb = self._ctb(llf) return '%s%s%s' % (ctb, lenf, self.body)
[docs] def pubkey(self): # type: () -> bytes return self._pubkey.pub()
[docs]class TrustPacket(Packet): # noqa # This implementation neither interprets nor emits trust packets. def __init__(self, ctb, body=None): # type: (int, Optional[AnyStr]) -> None Packet.__init__(self, ctb, body) if body is not None: self.trust = self.body.read(1)
[docs]class UserIDPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, Optional[str]) -> None Packet.__init__(self, ctb, body) if body is not None: self._userid = body
[docs] def pack(self): # type: () -> int if self.body is None: self.body = '' self.body += chr(len(self._userid)) self.body += self._userid return self.ctb + self.body
[docs] def userid(self): # type: () -> int return self._userid
[docs]class CommentPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, Optional[int]) -> None Packet.__init__(self, ctb, body) if body is not None: self.comment = self.body.getvalue()
[docs] def pack(self): # type: () -> int if self.body is None: self.body = chr(len(self.comment)) self.body += self.comment return self.ctb + self.body
[docs]class SignaturePacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, Optional[IO[bytes]]) -> None Packet.__init__(self, ctb, body) if body is not None: self._version = self.body.read(1) self._len_md_stuff = self.body.read(1) self._classification = self.body.read(1) self._timestamp = self.body.read(4) self._keyid = self.body.read(8) self._pkc = self.body.read(1) self._md_algo = self.body.read(1) self._md_chksum = self.body.read(2) self._sig = self.body.read()
[docs] def pack(self): # type: () -> str if self.body is None: self.body = self._version self.body += self._len_md_stuff self.body += self._classification self.body += self._timestamp self.body += self._keyid self.body += self._pkc self.body += self._md_algo self.body += self._md_chksum self.body += self._sig llf, lenf = self._llf(len(self.body)) self.ctb = self.ctb | llf return '%s%s%s' % (self.ctb, lenf, self.body)
[docs] def validate(self): # type: () -> None # FIXME this looks broken ... returning None always? if self._version not in _OK_VERSION: return None if self._len_md_stuff != '\005': return None
[docs]class PrivateKeyPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, IO[bytes]) -> None Packet.__init__(self, ctb, body) if body is not None: self._version = self.body.read(1) self._timestamp = self.body.read(4) self._validity = self.body.read(2) self._pkc = self.body.read(1) self._nlen = self.body.read(2) nlen = (struct.unpack('>H', self._nlen)[0] + 7) / 8 self._n = self.body.read(nlen) self._elen = self.body.read(2) elen = (struct.unpack('>H', self._elen)[0] + 7) / 8 self._e = self.body.read(elen) self._cipher = self.body.read(1) if self._cipher == '\001': self._iv = self.body.read(8) else: self._iv = None for param in ['d', 'p', 'q', 'u']: _plen = self.body.read(2) setattr(self, '_' + param + 'len', _plen) plen = (struct.unpack('>H', _plen)[0] + 7) / 8 setattr(self, '_' + param, self.body.read(plen)) self._cksum = self.body.read(2)
[docs] def is_encrypted(self): # type: () -> int return ord(self._cipher)
[docs]class CKEPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, IO[bytes]) -> None Packet.__init__(self, ctb, body) if body is not None: self._iv = self.body.read(8) self._cksum = self.body.read(2) self._ctxt = self.body.read()
[docs]class PKEPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, IO[bytes]) -> None Packet.__init__(self, ctb, body) if body is not None: self._version = self.body.read(1) self._keyid = self.body.read(8) self._pkc = ord(self.body.read(1)) deklen = (struct.unpack('>H', self.body.read(2))[0] + 7) / 8 self._dek = octx_to_num(self.body.read(deklen))
[docs]class LiteralPacket(Packet): # noqa def __init__(self, ctb, body=None): # type: (int, IO[bytes]) -> None Packet.__init__(self, ctb, body) if body is not None: self.fmode = self.body.read(1) fnlen = self.body.read(1) self.fname = self.body.read(fnlen) self.ftime = self.body.read(4) # self.data = self.body.read()
[docs]class CompressedPacket(Packet): # noqa def __init__(self, ctb, stream): # type: (int, IO[bytes]) -> None Packet.__init__(self, ctb, '') if self.body is not None: self.algo = stream.read(1) # This reads the entire stream into memory. self.data = stream.read()
[docs] def validate(self): # type: () -> bool return self.algo == '\001'
[docs] def uncompress(self): # type: () -> IO[bytes] import zlib decomp = zlib.decompressobj(-13) # RFC 2440, pg 61. # This doubles the memory usage. stream = BytesIO(decomp.decompress(self.data)) return stream
_FACTORY = { 1: PKEPacket, 2: SignaturePacket, # 3 : message_digest_packet, # XXX not implemented 5: PrivateKeyPacket, 6: PublicKeyPacket, # 8 : CompressedPacket, # special case 9: CKEPacket, 11: LiteralPacket, 12: TrustPacket, 13: UserIDPacket, 14: CommentPacket, PKEPacket: 1, SignaturePacket: 2, # 3 : message_digest_packet, PrivateKeyPacket: 5, PublicKeyPacket: 6, # 8 : CompressedPacket, CKEPacket: 9, LiteralPacket: 11, TrustPacket: 12, UserIDPacket: 13, CommentPacket: 14 }
[docs]class PacketStream: # noqa def __init__(self, input): # type: (IO[bytes]) -> None self.stream = input self.under_current = None self._count = 0
[docs] def close(self): # type: () -> None self.stream.close() if self.under_current is not None: self.under_current.close()
[docs] def read(self, keep_trying=0): # type: (int) -> Packet while 1: ctb0 = self.stream.read(1) if not ctb0: return None ctb = ord(ctb0) if is_ctb(ctb): break elif keep_trying: continue else: raise XXXError ctbt = (ctb & 0x3c) >> 2 if ctbt == constants.CTB_COMPRESSED_DATA: self.under_current = self.stream cp = CompressedPacket(ctb0, self.stream) self.stream = cp.uncompress() return self.read() # Decode the length of following data. See RFC for details. llf = ctb & 3 if llf == 0: lenf = ord(self.stream.read(1)) elif llf == 1: lenf = struct.unpack('>H', self.stream.read(2))[0] elif llf == 2: lenf = struct.unpack('>L', self.stream.read(4))[0] else: # llf == 3 raise XXXError('impossible case') body = self.stream.read(lenf) if not body or (len(body) != lenf): raise XXXError('corrupted Packet') self._count = self.stream.tell() try: return _FACTORY[ctbt](ctb0, body) except KeyError: return Packet(ctb0, body)
[docs] def count(self): # type: () -> int return self._count
[docs]def is_ctb(ctb): # type: (int) -> bool return ctb & 0xc0
[docs]def make_ctb(value, llf): # type: (int, int) -> str return chr((1 << 7) | (value << 2) | llf)