New brother_ql.backends package

The brother_ql.backends package provides backends for
* network (via raw TCP/IP sockets)
* linux_kernel (via device handles like /dev/usb/lp0 etc.)
* pyusb (via PyUSB)

The new backends are in use by the additional scripts
* brother_ql_debug
* brother_ql_print
This commit is contained in:
Philipp Klaus
2016-07-21 17:51:11 +02:00
parent 6c7f68e2b0
commit 6621134df9
8 changed files with 604 additions and 0 deletions

View File

@@ -0,0 +1,40 @@
from .generic import BrotherQLBackendGeneric
available_backends = [
'pyusb',
'network',
'linux_kernel',
]
def guess_backend(descr):
""" guess the backend from a given string descriptor for the device """
if descr.startswith('usb://') or descr.startswith('0x'):
return 'pyusb'
elif descr.startswith('file://') or descr.startswith('/dev/usb/') or descr.startswith('lp'):
return 'linux_kernel'
elif descr.startswith('tcp://'):
return 'network'
else:
raise ValueError('Cannot guess backend for given string descriptor: %s' % descr)
def backend_factory(backend_name):
if backend_name == 'pyusb':
from . import pyusb as pyusb_backend
list_available_devices = pyusb_backend.list_available_devices
dev_class = pyusb_backend.BrotherQLBackendPyUSB
elif backend_name == 'linux_kernel':
from . import linux_kernel as linux_kernel_backend
list_available_devices = linux_kernel_backend.list_available_devices
dev_class = linux_kernel_backend.BrotherQLBackendLinuxKernel
elif backend_name == 'network':
from . import network as network_backend
list_available_devices = network_backend.list_available_devices
dev_class = network_backend.BrotherQLBackendNetwork
else:
raise NotImplementedError('Backend %s not implemented.' % backend_name)
return {'list_available_devices': list_available_devices, 'dev_class': dev_class}

View File

@@ -0,0 +1,41 @@
import logging
logger = logging.getLogger(__name__)
def list_available_devices():
""" List all available devices for the respective backend """
# returns a list of dictionaries with the keys 'string_descr' and 'instance':
# [ {'string_descr': '/dev/usb/lp0', 'instance': os.open('/dev/usb/lp0', os.O_RDWR)}, ]
raise NotImplementedError()
class BrotherQLBackendGeneric(object):
def __init__(self, device_specifier):
"""
device_specifier can be either a string or an instance
of the required class type.
"""
self.write_dev = None
self.read_dev = None
raise NotImplementedError()
def _write(self, data):
self.write_dev.write(data)
def _read(self, length=32):
return bytes(self.read_dev.read(length))
def write(self, data):
logger.debug('Writing %d bytes.', len(data))
self._write(data)
def read(self, length=32):
try:
ret_bytes = self._read(length)
if ret_bytes: logger.debug('Read %d bytes.', len(ret_bytes))
return ret_bytes
except Exception as e:
logger.debug('Error reading... %s', e)
raise

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python
"""
Backend to support Brother QL-series printers via the linux kernel USB printer interface.
Works on Linux.
"""
import glob, os, time, select
from .generic import BrotherQLBackendGeneric
def list_available_devices():
"""
List all available devices for the linux kernel backend
returns: devices: a list of dictionaries with the keys 'string_descr' and 'instance': \
[ {'string_descr': 'file:///dev/usb/lp0', 'instance': None}, ] \
Instance is set to None because we don't want to open (and thus potentially block) the device here.
"""
paths = glob.glob('/dev/usb/lp*')
return [{'string_descr': 'file://' + path, 'instance': None} for path in paths]
class BrotherQLBackendLinuxKernel(BrotherQLBackendGeneric):
"""
BrotherQL backend using the Linux Kernel USB Printer Device Handles
"""
def __init__(self, device_specifier):
"""
device_specifier: string or os.open(): string descriptor in the \
format file:///dev/usb/lp0 or os.open() raw device handle.
"""
self.read_timeout = 0.01
# strategy : try_twice or select
self.strategy = 'select'
if isinstance(device_specifier, str):
if device_specifier.startswith('file://'):
device_specifier = device_specifier[7:]
self.dev = os.open(device_specifier, os.O_RDWR)
elif isinstance(device_specifier, int):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')
self.write_dev = self.dev
self.read_dev = self.dev
def _write(self, data):
os.write(self.write_dev, data)
def _read(self, length=32):
if self.strategy == 'try_twice':
data = os.read(self.read_dev, length)
if data:
return data
else:
time.sleep(self.read_timeout)
return os.read(self.read_dev, length)
elif self.strategy == 'select':
data = b''
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.read_dev], [], [], 0)
if self.read_dev in result:
data += os.read(self.read_dev, length)
if data: break
time.sleep(0.001)
if not data:
# one last try if still no data:
return os.read(self.read_dev, length)
else:
return data
else:
raise NotImplementedError('Unknown strategy')

92
brother_ql/backends/network.py Executable file
View File

@@ -0,0 +1,92 @@
#!/usr/bin/env python
"""
Backend to support Brother QL-series printers via network.
Works cross-platform.
"""
import socket, os, time, select
from .generic import BrotherQLBackendGeneric
def list_available_devices():
"""
List all available devices for the network backend
returns: devices: a list of dictionaries with the keys 'string_descr' and 'instance': \
[ {'string_descr': 'tcp://hostname[:port]', 'instance': None}, ] \
Instance is set to None because we don't want to connect to the device here yet.
"""
# We need some snmp request sent to 255.255.255.255 here
raise NotImplementedError()
return [{'string_descr': 'tcp://' + path, 'instance': None} for path in paths]
class BrotherQLBackendNetwork(BrotherQLBackendGeneric):
"""
BrotherQL backend using the Linux Kernel USB Printer Device Handles
"""
def __init__(self, device_specifier):
"""
device_specifier: string or os.open(): string descriptor in the \
format file:///dev/usb/lp0 or os.open() raw device handle.
"""
self.read_timeout = 0.01
# strategy : try_twice, select or socket_timeout
self.strategy = 'socket_timeout'
if isinstance(device_specifier, str):
if device_specifier.startswith('tcp://'):
device_specifier = device_specifier[6:]
host, _, port = device_specifier.partition(':')
if port:
port = int(port)
else:
port = 9100
#try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self.s.connect((host, port))
#except OSError as e:
# raise ValueError('Could not connect to the device.')
if self.strategy == 'socket_timeout':
self.s.settimeout(self.read_timeout)
elif self.strategy == 'try_twice':
self.s.settimeout(self.read_timeout)
else:
self.s.settimeout(0)
elif isinstance(device_specifier, int):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via an os.open() handle.')
def _write(self, data):
self.s.send(data)
def _read(self, length=32):
if self.strategy in ('socket_timeout', 'try_twice'):
if self.strategy == 'socket_timeout':
tries = 1
if self.strategy == 'try_twice':
tries = 2
for i in range(tries):
try:
data = self.s.recv(length)
return data
except socket.timeout:
pass
return b''
elif self.strategy == 'select':
data = b''
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.s], [], [], 0)
if self.s in result:
data += self.s.recv(length)
if data: break
time.sleep(0.001)
return data
else:
raise NotImplementedError('Unknown strategy')

137
brother_ql/backends/pyusb.py Executable file
View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python
"""
Backend to support Brother QL-series printers via PyUSB.
Works on Mac OS X and Linux.
Requires PyUSB: https://github.com/walac/pyusb/
Install via `pip install pyusb`
"""
import time
import usb.core
import usb.util
from .generic import BrotherQLBackendGeneric
def list_available_devices():
"""
List all available devices for the respective backend
returns: devices: a list of dictionaries with the keys 'string_descr' and 'instance': \
[ {'string_descr': 'usb://0x04f9:0x2015/C5Z315686', 'instance': pyusb.core.Device()}, ]
The 'string_descr' is of the format idVendor:idProduct_iSerialNumber.
"""
class find_class(object):
def __init__(self, class_):
self._class = class_
def __call__(self, device):
# first, let's check the device
if device.bDeviceClass == self._class:
return True
# ok, transverse all devices to find an interface that matches our class
for cfg in device:
# find_descriptor: what's it?
intf = usb.util.find_descriptor(cfg, bInterfaceClass=self._class)
if intf is not None:
return True
return False
# only Brother printers
printers = usb.core.find(find_all=1, custom_match=find_class(7), idVendor=0x04f9)
def string_descr(dev):
try:
serial = usb.util.get_string(dev, 256, dev.iSerialNumber)
return 'usb://0x{:04x}:0x{:04x}_{}'.format(dev.idVendor, dev.idProduct, serial)
except:
return 'usb://0x{:04x}:0x{:04x}'.format(dev.idVendor, dev.idProduct)
return [{'string_descr': string_descr(printer), 'instance': printer} for printer in printers]
class BrotherQLBackendPyUSB(BrotherQLBackendGeneric):
"""
BrotherQL backend using PyUSB
"""
def __init__(self, device_specifier):
"""
device_specifier: string or pyusb.core.Device: string descriptor of the \
format usb://brother_ql/idVendor/idProduct/iSerialNumber or pyusb.core.Device instance.
"""
self.dev = None
self.read_timeout = 0.01
# strategy : try_twice or select
self.strategy = 'try_twice'
if isinstance(device_specifier, str):
if device_specifier.startswith('usb://'):
device_specifier = device_specifier[6:]
vendor_product, _, serial = device_specifier.partition('/')
vendor, _, product = vendor_product.partition(':')
vendor, product = int(vendor, 16), int(product, 16)
for result in list_available_devices():
printer = result['instance']
if printer.idVendor == vendor and printer.idProduct == product or (serial and printer.iSerialNumber == serial):
self.dev = printer
break
if self.dev is None:
raise ValueError('Device not found')
elif isinstance(device_specifier, usb.core.Device):
self.dev = device_specifier
else:
raise NotImplementedError('Currently the printer can be specified either via an appropriate string or via a usb.core.Device instance.')
# Now we are sure to have self.dev around, start using it:
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
# set the active configuration. With no arguments, the first configuration will be the active one
self.dev.set_configuration()
cfg = self.dev.get_active_configuration()
intf = cfg[(0,0)]
ep_match_in = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN
ep_match_out = lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT
ep_in = usb.util.find_descriptor(intf, custom_match=ep_match_in)
ep_out = usb.util.find_descriptor(intf, custom_match=ep_match_out)
assert ep_in is not None
assert ep_out is not None
self.write_dev = ep_out
self.read_dev = ep_in
def _raw_read(self, length):
# pyusb Device.read() operations return array() type - let's convert it to bytes()
return bytes(self.read_dev.read(length))
def _read(self, length=32):
if self.strategy == 'try_twice':
data = self._raw_read(length)
if data:
return bytes(data)
else:
time.sleep(self.read_timeout)
return self._raw_read(length)
elif self.strategy == 'select':
data = b''
start = time.time()
while (not data) and (time.time() - start < self.read_timeout):
result, _, _ = select.select([self.read_dev], [], [], 0)
if self.read_dev in result:
data += self._raw_read(length)
if data: break
time.sleep(0.001)
if not data:
# one last try if still no data:
return self._raw_read(length)
else:
return data
else:
raise NotImplementedError('Unknown strategy')

118
brother_ql/brother_ql_debug.py Executable file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python
import sys, argparse, logging, struct, io, logging, sys, os, time
from pprint import pprint, pformat
from brother_ql.reader import OPCODES, chunker, merge_specific_instructions, interpret_response, match_opcode, hex_format
from brother_ql.backends import backend_factory, guess_backend
logger = logging.getLogger(__name__)
class BrotherQL_USBdebug(object):
def __init__(self, dev, instructions_data, backend='linux_kernel'):
be_cls = backend_factory(backend)['dev_class']
self.be = be_cls(dev)
self.sleep_time = 0.0
self.sleep_before_read = 0.0
self.continue_reading_for = 3.0
self.start = time.time()
self.interactive = False
self.merge_specific_instructions = True
if type(instructions_data) in (str,):
with open(instructions_data, 'rb') as f:
self.instructions_data = f.read()
elif type(instructions_data) in (bytes,):
self.instructions_data = instructions_data
else:
raise NotImplementedError('Only filename or bytes supported for instructions_data argument')
response = self.be.read()
if response:
logger.warning('Received response before sending instructions: {}'.format(hex_format(response)))
def continue_reading(self, seconds=3.0):
start = time.time()
while time.time() - start < seconds:
data = self.be.read()
if data != b'':
global_time = time.time() - self.start
print('TIME %.2f' % global_time)
self.log_interp_response(data)
time.sleep(0.001)
def log_interp_response(self, data):
try:
interp_result = interpret_response(data)
logger.info("Interpretation of the response: '{status_type}' (phase: {phase_type}), '{media_type}' {media_width}x{media_length} mm^2, errors: {errors}".format(**interp_result))
except:
logger.error("Couln't interpret response: %s", hex_format(data))
def print_and_debug(self):
self.continue_reading(0.2)
instructions = chunker(self.instructions_data)
instructions = merge_specific_instructions(instructions, join_preamble=True, join_raster=self.merge_specific_instructions)
for instruction in instructions:
opcode = match_opcode(instruction)
opcode_def = OPCODES[opcode]
cmd_name = opcode_def[0]
hex_instruction = hex_format(instruction).split()
if len(hex_instruction) > 100:
hex_instruction = ' '.join(hex_instruction[0:70] + ['[...]'] + hex_instruction[-30:])
else:
hex_instruction = ' '.join(hex_instruction)
logger.info("CMD {} FOUND. Instruction: {} ".format(cmd_name, hex_instruction))
if self.interactive: input('Continue?')
# WRITE
self.be.write(instruction)
# SLEEP BEFORE READ
time.sleep(self.sleep_before_read)
# READ
response = self.be.read()
#response += self.be.read()
if response != b'':
logger.info("Response from the device: {}".format(hex_format(response)))
self.log_interp_response(response)
# SLEEP BETWEEN INSTRUCTIONS
time.sleep(self.sleep_time)
self.continue_reading(self.continue_reading_for)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('file', help='The file to analyze')
parser.add_argument('dev', help='The device to use. Can be usb://0x04f9:0x2015 or /dev/usb/lp0 for example')
parser.add_argument('--sleep-time', type=float, help='time in seconds to sleep between instructions')
parser.add_argument('--sleep-before-read', type=float, help='time in seconds to sleep before reading response')
parser.add_argument('--continue-reading-for', type=float, help='continue reading after sending the last commands (time in seconds)')
parser.add_argument('--interactive', action='store_true', help='interactive mode')
parser.add_argument('--split-raster', action='store_true', help='even split preamble and raster instructions into single write operations')
parser.add_argument('--debug', action='store_true', help='enable debug mode')
args = parser.parse_args()
# SETUP
loglevel = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=loglevel, format='%(levelname)s: %(message)s')
try:
backend = guess_backend(args.dev)
except ValueError as e:
parser.error(e.msg)
br = BrotherQL_USBdebug(args.dev, args.file, backend=backend)
if args.interactive: br.interactive = True
if args.sleep_time: br.sleep_time = args.sleep_time
if args.sleep_before_read: br.sleep_before_read = args.sleep_before_read
if args.split_raster: br.merge_specific_instructions = False
if args.continue_reading_for: br.continue_reading_for = args.continue_reading_for
# GO
br.print_and_debug()
if __name__ == '__main__':
main()

96
brother_ql/brother_ql_print.py Executable file
View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python
"""
Testing the packaged version of the Linux Kernel backend
"""
import argparse, logging, sys, time
from pprint import pprint
from brother_ql.backends import backend_factory, guess_backend, available_backends
from brother_ql.reader import interpret_response
logger = logging.getLogger(__name__)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--backend', choices=available_backends, default='linux_kernel', help='Forces the use of a specific backend')
parser.add_argument('--list-printers', action='store_true', help='List the devices available with the selected --backend')
parser.add_argument('--debug', action='store_true', help='Enable debugging output')
parser.add_argument('instruction_file', nargs='?', help='file containing the instructions to be sent to the printer')
parser.add_argument('device', metavar='DEVICE_STRING_DESCRIPTOR', nargs='?', help='String descriptor for specific device. If not specified, select first detected device')
args = parser.parse_args()
if args.list_printers and not args.backend:
parser.error('Please specify the backend in order to list available devices.')
if not args.list_printers and not args.instruction_file:
parser.error("the following arguments are required: instruction_file")
level = logging.DEBUG if args.debug else logging.WARNING
logging.basicConfig(level=level)
if args.backend == 'network':
logger.warning("Warning: The network backend has not 'readback' functionality. Only writing/sending to the printer works here.")
try:
selected_backend = guess_backend(args.device)
except:
logger.warning("Couln't guess the backend. Trying with linux_kernel backend.")
selected_backend = 'linux_kernel'
be = backend_factory(selected_backend)
list_available_devices = be['list_available_devices']
BrotherQLBackend = be['dev_class']
if args.list_printers:
for printer in list_available_devices():
print(printer['string_descr'])
sys.exit(0)
if not args.device:
"We need to search for available devices and select the first."
ad = list_available_devices()
selected = None
pprint(available_devices)
for device in ad:
string_descr = device['string_descr']
instance = device['instance']
selected = string_descr
break
if not selected:
sys.exit("No printer found")
else:
"A string descriptor for the device was given, let's use it."
selected = args.device
printer = BrotherQLBackend(selected)
start = time.time()
with open(args.instruction_file, 'rb') as f:
content = f.read()
logger.info('Sending instructions to the printer. Total: %d bytes.', len(content))
printer.write(content)
if args.backend in ('network',): return
printing_completed = False
waiting_to_receive = False
while time.time() - start < 10:
data = printer.read()
if not data:
time.sleep(0.005)
continue
try:
result = interpret_response(data)
except ValueError:
logger.error("TIME %.3f - Couln't understand response: %s", time.time()-start, data)
continue
logger.debug('TIME %.3f - result: %s', time.time()-start, result)
if result['errors']:
logger.error('Errors occured: %s', result['errors'])
if result['status_type'] == 'Printing completed': printing_completed = True
if result['status_type'] == 'Phase change' and result['phase_type'] == 'Waiting to receive': waiting_to_receive = True
if printing_completed and waiting_to_receive:
break
if not (printing_completed and waiting_to_receive):
logger.warning('Printing potentially not successful?')
if __name__ == "__main__": main()

View File

@@ -14,11 +14,14 @@ setup(name='brother_ql',
url = '',
license = 'GPL',
packages = ['brother_ql',
'brother_ql.backends',
'brother_ql.web'],
entry_points = {
'console_scripts': [
'brother_ql_analyse = brother_ql.brother_ql_analyse:main',
'brother_ql_create = brother_ql.brother_ql_create:main',
'brother_ql_print = brother_ql.brother_ql_print:main',
'brother_ql_debug = brother_ql.brother_ql_debug:main',
'brother_ql_web = brother_ql.web.__init__:main',
],
},