- fixed some outstanding issues from the PyQt6 port - PEP8 fixes - minor fixes - updated the saving of Preferences to update the self.options too: the `Apply` action will update the self.options but the `Save` action will save the updated preferences to the file on disk
447 lines
18 KiB
Python
447 lines
18 KiB
Python
# ##########################################################
|
|
# FlatCAM: 2D Post-processing for Manufacturing #
|
|
# File Author: Marius Adrian Stanciu (c) #
|
|
# Date: 4/23/2019 #
|
|
# MIT Licence #
|
|
# ##########################################################
|
|
from PyQt6 import QtWidgets, QtCore
|
|
|
|
from appTool import AppTool
|
|
|
|
from appParsers.ParsePDF import PdfParser, grace
|
|
from shapely.geometry import Point, MultiPolygon
|
|
from shapely.ops import unary_union
|
|
|
|
from copy import deepcopy
|
|
# from io import BytesIO
|
|
#
|
|
# import zlib
|
|
import re
|
|
import time
|
|
import logging
|
|
import traceback
|
|
import os
|
|
|
|
from pikepdf import Pdf, parse_content_stream
|
|
|
|
import gettext
|
|
import appTranslation as fcTranslate
|
|
import builtins
|
|
|
|
fcTranslate.apply_language('strings')
|
|
if '_' not in builtins.__dict__:
|
|
_ = gettext.gettext
|
|
|
|
log = logging.getLogger('base')
|
|
|
|
|
|
class ToolPDF(AppTool):
|
|
"""
|
|
Parse a PDF file.
|
|
Reference here: https://www.adobe.com/content/dam/acom/en/devnet/pdf/pdfs/pdf_reference_archives/PDFReference.pdf
|
|
Return a list of geometries
|
|
"""
|
|
pluginName = _("PDF Import Tool")
|
|
|
|
def __init__(self, app):
|
|
AppTool.__init__(self, app)
|
|
self.app = app
|
|
self.decimals = self.app.decimals
|
|
|
|
self.stream_re = re.compile(b'.*?FlateDecode.*?stream(.*?)endstream', re.S)
|
|
|
|
self.pdf_decompressed = {}
|
|
|
|
# key = file name and extension
|
|
# value is a dict to store the parsed content of the PDF
|
|
self.pdf_parsed = {}
|
|
|
|
# QTimer for periodic check
|
|
self.check_thread = QtCore.QTimer()
|
|
|
|
# Every time a parser is started we add a promise; every time a parser finished we remove a promise
|
|
# when empty we start the layer rendering
|
|
self.parsing_promises = []
|
|
|
|
self.parser = PdfParser(units=self.app.app_units,
|
|
resolution=self.app.options["gerber_circle_steps"],
|
|
abort=self.app.abort_flag)
|
|
|
|
def run(self, toggle=True):
|
|
self.app.defaults.report_usage("ToolPDF()")
|
|
|
|
self.set_tool_ui()
|
|
self.on_open_pdf_click()
|
|
|
|
def install(self, icon=None, separator=None, **kwargs):
|
|
AppTool.install(self, icon, separator, shortcut='Ctrl+Q', **kwargs)
|
|
|
|
def set_tool_ui(self):
|
|
pass
|
|
|
|
def on_open_pdf_click(self):
|
|
"""
|
|
File menu callback for opening an PDF file.
|
|
|
|
:return: None
|
|
"""
|
|
|
|
self.app.defaults.report_usage("ToolPDF.on_open_pdf_click()")
|
|
self.app.log.debug("ToolPDF.on_open_pdf_click()")
|
|
|
|
_filter_ = "Adobe PDF Files (*.pdf);;" \
|
|
"All Files (*.*)"
|
|
|
|
try:
|
|
filenames, _f = QtWidgets.QFileDialog.getOpenFileNames(caption=_("Open PDF"),
|
|
directory=self.app.get_last_folder(),
|
|
filter=_filter_)
|
|
except TypeError:
|
|
filenames, _f = QtWidgets.QFileDialog.getOpenFileNames(caption=_("Open PDF"), filter=_filter_)
|
|
|
|
if len(filenames) == 0:
|
|
self.app.inform.emit('[WARNING_NOTCL] %s.' % _("Open PDF cancelled"))
|
|
else:
|
|
# start the parsing timer with a period of 1 second
|
|
self.periodic_check(1000)
|
|
|
|
for filename in filenames:
|
|
if filename != '':
|
|
self.app.worker_task.emit({'fcn': self.open_pdf, 'params': [filename]})
|
|
|
|
def open_pdf(self, filename):
|
|
if not os.path.exists(filename):
|
|
self.inform.emit('[ERROR_NOTCL] %s' % _("File no longer available."))
|
|
return
|
|
|
|
short_name = filename.split('/')[-1].split('\\')[-1]
|
|
self.parsing_promises.append(short_name)
|
|
|
|
self.pdf_parsed[short_name] = {
|
|
'pdf': {},
|
|
'filename': filename
|
|
}
|
|
|
|
self.pdf_decompressed[short_name] = ''
|
|
|
|
if self.app.abort_flag:
|
|
# graceful abort requested by the user
|
|
raise grace
|
|
|
|
with self.app.proc_container.new('%s...' % _("Parsing")):
|
|
with open(filename, "rb") as f:
|
|
# pdf = f.read()
|
|
pdf = Pdf.open(f)
|
|
|
|
page = pdf.pages[0]
|
|
decomp_file = ''
|
|
for operands, command in parse_content_stream(page):
|
|
line = ''
|
|
for op in operands:
|
|
try:
|
|
line += str(op) + ' '
|
|
except Exception as e:
|
|
# print(str(e), operands, command)
|
|
pass
|
|
line += str(command)
|
|
decomp_file += line + '\n'
|
|
self.pdf_decompressed[short_name] = decomp_file
|
|
|
|
# stream_nr = 0
|
|
# for s in re.findall(self.stream_re, pdf):
|
|
# if self.app.abort_flag:
|
|
# # graceful abort requested by the user
|
|
# raise grace
|
|
#
|
|
# stream_nr += 1
|
|
# log.debug("PDF STREAM: %d\n" % stream_nr)
|
|
# s = s.strip(b'\r\n')
|
|
#
|
|
# # https://stackoverflow.com/questions/1089662/python-inflate-and-deflate-implementations
|
|
# # def decompress(data):
|
|
# # decompressed = zlib.decompressobj(
|
|
# # -zlib.MAX_WBITS # see above
|
|
# # )
|
|
# # inflated = decompressed.decompress(data)
|
|
# # inflated += decompressed.flush()
|
|
# # return inflated
|
|
#
|
|
# Convert 2 Bytes If Python 3
|
|
# def C2BIP3(string):
|
|
# if type(string) == bytes:
|
|
# return string
|
|
# else:
|
|
# return bytes([ord(x) for x in string])
|
|
#
|
|
# def inflate(data):
|
|
# try:
|
|
# return zlib.decompress(C2BIP3(data))
|
|
# except Exception:
|
|
# if len(data) <= 10:
|
|
# raise
|
|
# oDecompress = zlib.decompressobj(-zlib.MAX_WBITS)
|
|
# oStringIO = BytesIO()
|
|
# count = 0
|
|
# for byte in C2BIP3(data):
|
|
# try:
|
|
# oStringIO.write(oDecompress.decompress(byte))
|
|
# count += 1
|
|
# except Exception:
|
|
# break
|
|
# if len(data) - count <= 2:
|
|
# return oStringIO.getvalue()
|
|
# else:
|
|
# raise
|
|
#
|
|
# try:
|
|
# decomp = inflate(s)
|
|
# except Exception as e:
|
|
# decomp = None
|
|
# log.debug("ToolPDF.open_pdf() -> inflate (decompress) -> %s" % str(e))
|
|
#
|
|
# try:
|
|
# self.pdf_decompressed[short_name] += (decomp.decode('UTF-8') + '\r\n')
|
|
# except Exception:
|
|
# try:
|
|
# self.pdf_decompressed[short_name] += (decomp.decode('latin1') + '\r\n')
|
|
# except Exception as e:
|
|
# log.error("ToolPDF.open_pdf() -> decoding error -> %s" % str(e))
|
|
# self.pdf_decompressed[short_name] = decomp_file
|
|
|
|
if self.pdf_decompressed[short_name] == '':
|
|
self.app.inform.emit('[ERROR_NOTCL] %s: %s' % (_("Failed to open"), str(filename)))
|
|
self.app.log.debug("ToolPDF.open_pdf().obj_init() --> Empty file or error on decompression")
|
|
self.parsing_promises.remove(short_name)
|
|
return
|
|
|
|
self.pdf_parsed[short_name]['pdf'] = self.parser.parse_pdf(pdf_content=self.pdf_decompressed[short_name])
|
|
# we used it, now we delete it
|
|
if self.pdf_decompressed[short_name]:
|
|
self.pdf_decompressed[short_name] = None
|
|
|
|
# removal from list is done in a multithreaded way therefore not always the removal can be done
|
|
# try to remove until it's done
|
|
try:
|
|
while True:
|
|
self.parsing_promises.remove(short_name)
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
self.app.log.error("ToolPDF.open_pdf() --> %s" % str(e))
|
|
self.app.inform.emit('[success] %s: %s' % (_("Opened"), str(filename)))
|
|
|
|
def layer_rendering_as_excellon(self, filename, ap_dict, layer_nr):
|
|
outname = filename.split('/')[-1].split('\\')[-1] + "_%s" % str(layer_nr)
|
|
|
|
# store the points here until reconstitution:
|
|
# keys are diameters and values are list of (x,y) coords
|
|
points = {}
|
|
|
|
def obj_init(new_obj, app_obj):
|
|
clear_geo = [geo_el['clear'] for geo_el in ap_dict[0]['geometry']]
|
|
|
|
new_obj.tools = {}
|
|
|
|
for geo in clear_geo:
|
|
xmin, ymin, xmax, ymax = geo.bounds
|
|
center = (((xmax - xmin) / 2) + xmin, ((ymax - ymin) / 2) + ymin)
|
|
|
|
# for drill bits, even in INCH, it's enough 3 decimals
|
|
correction_factor = 0.974
|
|
dia = (xmax - xmin) * correction_factor
|
|
dia = round(dia, 3)
|
|
if dia in points:
|
|
points[dia].append(center)
|
|
else:
|
|
points[dia] = [center]
|
|
|
|
sorted_dia = sorted(points.keys())
|
|
|
|
name_tool = 0
|
|
for dia in sorted_dia:
|
|
name_tool += 1
|
|
tool = str(name_tool)
|
|
|
|
new_obj.tools[tool] = {
|
|
'tooldia': dia,
|
|
'drills': [],
|
|
'solid_geometry': []
|
|
}
|
|
|
|
# update the drill list
|
|
for dia_points in points:
|
|
if dia == dia_points:
|
|
for pt in points[dia_points]:
|
|
new_obj.tools[tool]['drills'].append(Point(pt))
|
|
break
|
|
|
|
ret = new_obj.create_geometry()
|
|
if ret == 'fail':
|
|
self.app.log.debug("Could not create geometry for Excellon object.")
|
|
return "fail"
|
|
|
|
new_obj.source_file = app_obj.f_handlers.export_excellon(obj_name=outname, local_use=new_obj,
|
|
filename=None, use_thread=False)
|
|
|
|
for tool in new_obj.tools:
|
|
if new_obj.tools[tool]['solid_geometry']:
|
|
return
|
|
app_obj.inform.emit('[ERROR_NOTCL] %s: %s' % (_("No geometry found in file"), outname))
|
|
return "fail"
|
|
|
|
with self.app.proc_container.new(_("Rendering PDF layer #%d ...") % int(layer_nr)):
|
|
|
|
ret_val = self.app.app_obj.new_object("excellon", outname, obj_init, autoselected=False)
|
|
if ret_val == 'fail':
|
|
self.app.inform.emit('[ERROR_NOTCL] %s' % _('Open PDF file failed.'))
|
|
return
|
|
# Register recent file
|
|
self.app.file_opened.emit("pdf", filename)
|
|
# GUI feedback
|
|
self.app.inform.emit('[success] %s: %s' % (_("Rendered"), outname))
|
|
|
|
def layer_rendering_as_gerber(self, filename, ap_dict, layer_nr):
|
|
outname = filename.split('/')[-1].split('\\')[-1] + "_%s" % str(layer_nr)
|
|
|
|
def obj_init(grb_obj, app_obj):
|
|
|
|
grb_obj.tools = ap_dict
|
|
|
|
poly_buff = []
|
|
follow_buf = []
|
|
for ap in grb_obj.tools:
|
|
for k in grb_obj.tools[ap]:
|
|
if k == 'geometry':
|
|
for geo_el in ap_dict[ap][k]:
|
|
if 'solid' in geo_el:
|
|
poly_buff.append(geo_el['solid'])
|
|
if 'follow' in geo_el:
|
|
follow_buf.append(geo_el['follow'])
|
|
poly_buff = unary_union(poly_buff)
|
|
|
|
if 0 in grb_obj.tools:
|
|
global_clear_geo = []
|
|
if 'geometry' in grb_obj.tools[0]:
|
|
for geo_el in ap_dict[0]['geometry']:
|
|
if 'clear' in geo_el:
|
|
global_clear_geo.append(geo_el['clear'])
|
|
|
|
if global_clear_geo:
|
|
solid = []
|
|
for apid in grb_obj.tools:
|
|
if 'geometry' in grb_obj.tools[apid]:
|
|
for elem in grb_obj.tools[apid]['geometry']:
|
|
if 'solid' in elem:
|
|
solid_geo = deepcopy(elem['solid'])
|
|
for clear_geo in global_clear_geo:
|
|
# Make sure that the clear_geo is within the solid_geo otherwise we loose
|
|
# the solid_geometry. We want for clear_geometry just to cut into solid_geometry
|
|
# not to delete it
|
|
if clear_geo.within(solid_geo):
|
|
solid_geo = solid_geo.difference(clear_geo)
|
|
if solid_geo.is_empty:
|
|
solid_geo = elem['solid']
|
|
try:
|
|
for poly in solid_geo:
|
|
solid.append(poly)
|
|
except TypeError:
|
|
solid.append(solid_geo)
|
|
poly_buff = deepcopy(MultiPolygon(solid))
|
|
|
|
follow_buf = unary_union(follow_buf)
|
|
|
|
try:
|
|
poly_buff = poly_buff.buffer(0.0000001)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
poly_buff = poly_buff.buffer(-0.0000001)
|
|
except ValueError:
|
|
pass
|
|
|
|
grb_obj.solid_geometry = deepcopy(poly_buff)
|
|
grb_obj.follow_geometry = deepcopy(follow_buf)
|
|
|
|
with self.app.proc_container.new(_("Rendering PDF layer #%d ...") % int(layer_nr)):
|
|
|
|
ret = self.app.app_obj.new_object('gerber', outname, obj_init, autoselected=False)
|
|
if ret == 'fail':
|
|
self.app.inform.emit('[ERROR_NOTCL] %s' % _('Open PDF file failed.'))
|
|
return
|
|
# Register recent file
|
|
self.app.file_opened.emit('pdf', filename)
|
|
# GUI feedback
|
|
self.app.inform.emit('[success] %s: %s' % (_("Rendered"), outname))
|
|
|
|
def periodic_check(self, check_period):
|
|
"""
|
|
This function starts an QTimer and it will periodically check if parsing was done
|
|
|
|
:param check_period: time at which to check periodically if all plots finished to be plotted
|
|
:return:
|
|
"""
|
|
|
|
# self.plot_thread = threading.Thread(target=lambda: self.check_plot_finished(check_period))
|
|
# self.plot_thread.start()
|
|
self.app.log.debug("ToolPDF --> Periodic Check started.")
|
|
|
|
try:
|
|
self.check_thread.stop()
|
|
except TypeError:
|
|
pass
|
|
|
|
self.check_thread.setInterval(check_period)
|
|
try:
|
|
self.check_thread.timeout.disconnect(self.periodic_check_handler)
|
|
except (TypeError, AttributeError):
|
|
pass
|
|
|
|
self.check_thread.timeout.connect(self.periodic_check_handler)
|
|
self.check_thread.start(QtCore.QThread.Priority.HighPriority)
|
|
|
|
def periodic_check_handler(self):
|
|
"""
|
|
If the parsing worker finished then start multithreaded rendering
|
|
:return:
|
|
"""
|
|
# log.debug("checking parsing --> %s" % str(self.parsing_promises))
|
|
|
|
try:
|
|
if not self.parsing_promises:
|
|
self.check_thread.stop()
|
|
self.app.log.debug("PDF --> start rendering")
|
|
# parsing finished start the layer rendering
|
|
if self.pdf_parsed:
|
|
obj_to_delete = []
|
|
for object_name in self.pdf_parsed:
|
|
if self.app.abort_flag:
|
|
# graceful abort requested by the user
|
|
raise grace
|
|
|
|
filename = deepcopy(self.pdf_parsed[object_name]['filename'])
|
|
pdf_content = deepcopy(self.pdf_parsed[object_name]['pdf'])
|
|
obj_to_delete.append(object_name)
|
|
for k in pdf_content:
|
|
if self.app.abort_flag:
|
|
# graceful abort requested by the user
|
|
raise grace
|
|
|
|
ap_dict = pdf_content[k]
|
|
|
|
if ap_dict:
|
|
layer_nr = k
|
|
if k == 0:
|
|
self.app.worker_task.emit({'fcn': self.layer_rendering_as_excellon,
|
|
'params': [filename, ap_dict, layer_nr]})
|
|
else:
|
|
self.app.worker_task.emit({'fcn': self.layer_rendering_as_gerber,
|
|
'params': [filename, ap_dict, layer_nr]})
|
|
# delete the object already processed so it will not be processed again for other objects
|
|
# that were opened at the same time; like in drag & drop on appGUI
|
|
for obj_name in obj_to_delete:
|
|
if obj_name in self.pdf_parsed:
|
|
self.pdf_parsed.pop(obj_name)
|
|
|
|
self.app.log.debug("ToolPDF --> Periodic check finished.")
|
|
except Exception:
|
|
traceback.print_exc()
|