diff --git a/brother_ql/reader.py b/brother_ql/reader.py index 72154cb..5b66820 100755 --- a/brother_ql/reader.py +++ b/brother_ql/reader.py @@ -11,7 +11,7 @@ from matplotlib import pyplot as plt logger = logging.getLogger(__name__) -cmds = { +OPCODES = { # signature name following bytes description b'\x00': ("preamble", -1, "Preamble, 200-300x 0x00 to clear comamnd buffer"), b'\x4D': ("compression", 1, ""), @@ -27,12 +27,83 @@ cmds = { b'\x1b\x69\x64': ("margins", 2, ""), b'\x1b\x69\x55\x77\x01': ('amedia', 127, "Additional media information command"), b'\x1b\x69\x55\x4A': ('jobid', 14, "Job ID setting command"), + b'\x1b\x69\x53': ('status request', 0, "A status information request sent to the printer"), + b'\x80\x20\x42': ('status response',29, "A status response received from the printer"), } dot_widths = { 62: 90*8, } +RESP_ERROR_INFORMATION_1_DEF = { + 0: 'No media when printing', + 1: 'End of media (die-cut size only)', + 2: 'Tape cutter jam', + 3: 'Not used', + 4: 'Main unit in use (QL-560/650TD/1050)', + 5: 'Printer turned off', + 6: 'High-voltage adapter (not used)', + 7: 'Fan doesn’t work (QL-1050/1060N)', +} + +RESP_ERROR_INFORMATION_2_DEF = { + 0: 'Replace media error', + 1: 'Expansion buffer full error', + 2: 'Transmission / Communication error', + 3: 'Communication buffer full error (not used)', + 4: 'Cover opened while printing (Except QL-500)', + 5: 'Cancel key (not used)', + 6: 'Media cannot be fed (also when the media end is detected)', + 7: 'System error', +} + +RESP_MEDIA_TYPES = { + 0x00: 'No media', + 0x0A: 'Continuous length tape', + 0x0B: 'Die-cut labels', +} + +RESP_STATUS_TYPES = { + 0x00: 'Reply to status request', + 0x01: 'Printing completed', + 0x02: 'Error occurred', + 0x05: 'Notification', + 0x06: 'Phase change', +} + +RESP_PHASE_TYPES = { + 0x00: 'Waiting to receive', + 0x01: 'Printing state', +} + +RESP_BYTE_NAMES = [ + 'Print head mark', + 'Size', + 'Fixed (B=0x42)', + 'Device dependent', + 'Device dependent', + 'Fixed (0=0x30)', + 'Fixed (0x00 or 0=0x30)', + 'Fixed (0x00)', + 'Error information 1', + 'Error information 2', + 'Media width', + 'Media type', + 'Fixed (0x00)', + 'Fixed (0x00)', + 'Reserved', + 'Mode', + 'Fixed (0x00)', + 'Media length', + 'Status type', + 'Phase type', + 'Phase number (high)', + 'Phase number (low)', + 'Notification number', + 'Reserved', + 'Reserved', +] + def hex_format(data): try: # Py3 return ' '.join('{:02X}'.format(byte) for byte in data) @@ -41,39 +112,114 @@ def hex_format(data): def chunker(data, raise_exception=False): """ - Breaks data stream (bytes) into a list of bytes containing single instructions each. + Breaks data stream (bytes) into a list of bytes objects containing single instructions each. - Logs warnings for unknown commands. - If raise_exception is set to True it raise exceptions instead. + Logs warnings for unknown opcodes or raises an exception instead, if raise_exception is set to True. - returns: list of bytes + returns: list of bytes objects """ instructions = [] while True: if len(data) == 0: break - cmd_found = False - for command in cmds.keys(): - if data.startswith(command): - cmd = cmds[command] - num_bytes = len(command) - if cmd[1] > 0: num_bytes += cmd[1] - if cmd[0] == 'raster': - num_bytes += data[2] + 2 - #payload = data[len(command):num_bytes] - instructions.append(data[:num_bytes]) - data = data[num_bytes:] - cmd_found = True - if cmd_found: - continue - else: - msg = 'unknown instruction starting with {}...)'.format(hex_format(data[0:4])) + try: + opcode = match_opcode(data) + except: + msg = 'unknown opcode starting with {}...)'.format(hex_format(data[0:4])) if raise_exception: raise ValueError(msg) else: logger.warning(msg) data = data[1:] + opcode_def = OPCODES[opcode] + num_bytes = len(opcode) + if opcode_def[1] > 0: num_bytes += opcode_def[1] + if opcode_def[0] == 'raster': + num_bytes += data[2] + 2 + #payload = data[len(opcode):num_bytes] + instructions.append(data[:num_bytes]) + data = data[num_bytes:] return instructions +def match_opcode(data): + matching_opcodes = [opcode for opcode in OPCODES.keys() if data.startswith(opcode)] + assert len(matching_opcodes) == 1 + return matching_opcodes[0] + +def interpret_response(data): + assert len(data) >= 32 + assert data.startswith(b'\x80\x20\x42') + for i, byte_name in enumerate(RESP_BYTE_NAMES): + logger.debug('Byte %2d %24s %02X', i, byte_name+':', data[i]) + errors = [] + error_info_1 = data[8] + error_info_2 = data[9] + for error_bit in RESP_ERROR_INFORMATION_1_DEF: + if error_info_1 & (1 << error_bit): + logging.error('Error: ' + RESP_ERROR_INFORMATION_1_DEF[error_bit]) + errors.append(RESP_ERROR_INFORMATION_1_DEF[error_bit]) + for error_bit in RESP_ERROR_INFORMATION_2_DEF: + if error_info_2 & (1 << error_bit): + logging.error('Error: ' + RESP_ERROR_INFORMATION_2_DEF[error_bit]) + errors.append(RESP_ERROR_INFORMATION_2_DEF[error_bit]) + + media_width = data[10] + media_length = data[17] + + media_type = data[11] + if media_type in RESP_MEDIA_TYPES: + media_type = RESP_MEDIA_TYPES[media_type] + logger.debug("Media type: %s", media_type) + else: + logger.error("Unknown media type %02X", media_type) + + status_type = data[18] + if status_type in RESP_STATUS_TYPES: + status_type = RESP_STATUS_TYPES[status_type] + logger.debug("Status type: %s", status_type) + else: + logger.error("Unknown status type %02X", status_type) + + phase_type = data[19] + if phase_type in RESP_PHASE_TYPES: + phase_type = RESP_PHASE_TYPES[phase_type] + logger.debug("Phase type: %s", phase_type) + else: + logger.error("Unknown phase type %02X", phase_type) + + response = { + 'status_type': status_type, + 'phase_type': phase_type, + 'media_type': media_type, + 'media_width': media_width, + 'media_length': media_length, + 'errors': errors, + } + return response + + +def merge_specific_instructions(chunks, join_preamble=True, join_raster=True): + """ + Process a list of instructions by merging subsequent instuctions with + identical opcodes into "large instructions". + """ + new_instructions = [] + last_opcode = None + instruction_buffer = b'' + for instruction in chunks: + opcode = match_opcode(instruction) + if join_preamble and OPCODES[opcode][0] == 'preamble' and last_opcode == 'preamble': + instruction_buffer += instruction + elif join_raster and OPCODES[opcode][0] == 'raster' and last_opcode == 'raster': + instruction_buffer += instruction + else: + if instruction_buffer: + new_instructions.append(instruction_buffer) + instruction_buffer = instruction + last_opcode = OPCODES[opcode][0] + if instruction_buffer: + new_instructions.append(instruction_buffer) + return new_instructions + class BrotherQLReader(object): def __init__(self, brother_file): @@ -89,18 +235,18 @@ class BrotherQLReader(object): def analyse(self): instructions = self.brother_file.read() for instruction in chunker(instructions): - for command in cmds.keys(): - if instruction.startswith(command): - cmd = cmds[command] - if cmd[0] == 'init': + for opcode in OPCODES.keys(): + if instruction.startswith(opcode): + opcode_def = OPCODES[opcode] + if opcode_def[0] == 'init': self.mwidth, self.mheight = None, None self.raster_no = None self.rows = [] - payload = instruction[len(command):] - logger.info(" {} ({}) --> found! (payload: {})".format(cmd[0], hex_format(command), hex_format(payload))) - if cmd[0] == 'compression': + payload = instruction[len(opcode):] + logger.info(" {} ({}) --> found! (payload: {})".format(opcode_def[0], hex_format(opcode), hex_format(payload))) + if opcode_def[0] == 'compression': self.compression = payload[0] == 0x02 - if cmd[0] == 'raster': + if opcode_def[0] == 'raster': rpl = payload[2:] # raster payload index = 0 row = [] @@ -121,13 +267,13 @@ class BrotherQLReader(object): else: row.append(list(rpl)) self.rows.append(row) - if cmd[0] == 'media/quality': + if opcode_def[0] == 'media/quality': self.raster_no = struct.unpack('