Files
flatcam-wsl/appPlugins/ToolPDF.py

447 lines
18 KiB
Python

# ##########################################################
# FlatCAM: 2D Post-processing for Manufacturing #
# File Author: Marius Adrian Stanciu (c) #
# Date: 4/23/2019 #
# MIT Licence #
# ##########################################################
from PyQt6 import QtWidgets, QtCore
from appTool import AppTool
from appParsers.ParsePDF import PdfParser, grace
from shapely.geometry import Point, MultiPolygon
from shapely.ops import unary_union
from copy import deepcopy
# from io import BytesIO
#
# import zlib
import re
import time
import logging
import traceback
import os
from pikepdf import Pdf, parse_content_stream
import gettext
import appTranslation as fcTranslate
import builtins
fcTranslate.apply_language('strings')
if '_' not in builtins.__dict__:
_ = gettext.gettext
log = logging.getLogger('base')
class ToolPDF(AppTool):
"""
Parse a PDF file.
Reference here: https://www.adobe.com/content/dam/acom/en/devnet/pdf/pdfs/pdf_reference_archives/PDFReference.pdf
Return a list of geometries
"""
pluginName = _("PDF Import Tool")
def __init__(self, app):
AppTool.__init__(self, app)
self.app = app
self.decimals = self.app.decimals
self.stream_re = re.compile(b'.*?FlateDecode.*?stream(.*?)endstream', re.S)
self.pdf_decompressed = {}
# key = file name and extension
# value is a dict to store the parsed content of the PDF
self.pdf_parsed = {}
# QTimer for periodic check
self.check_thread = QtCore.QTimer()
# Every time a parser is started we add a promise; every time a parser finished we remove a promise
# when empty we start the layer rendering
self.parsing_promises = []
self.parser = PdfParser(units=self.app.app_units,
resolution=self.app.defaults["gerber_circle_steps"],
abort=self.app.abort_flag)
def run(self, toggle=True):
self.app.defaults.report_usage("ToolPDF()")
self.set_tool_ui()
self.on_open_pdf_click()
def install(self, icon=None, separator=None, **kwargs):
AppTool.install(self, icon, separator, shortcut='Ctrl+Q', **kwargs)
def set_tool_ui(self):
pass
def on_open_pdf_click(self):
"""
File menu callback for opening an PDF file.
:return: None
"""
self.app.defaults.report_usage("ToolPDF.on_open_pdf_click()")
self.app.log.debug("ToolPDF.on_open_pdf_click()")
_filter_ = "Adobe PDF Files (*.pdf);;" \
"All Files (*.*)"
try:
filenames, _f = QtWidgets.QFileDialog.getOpenFileNames(caption=_("Open PDF"),
directory=self.app.get_last_folder(),
filter=_filter_)
except TypeError:
filenames, _f = QtWidgets.QFileDialog.getOpenFileNames(caption=_("Open PDF"), filter=_filter_)
if len(filenames) == 0:
self.app.inform.emit('[WARNING_NOTCL] %s.' % _("Open PDF cancelled"))
else:
# start the parsing timer with a period of 1 second
self.periodic_check(1000)
for filename in filenames:
if filename != '':
self.app.worker_task.emit({'fcn': self.open_pdf, 'params': [filename]})
def open_pdf(self, filename):
if not os.path.exists(filename):
self.inform.emit('[ERROR_NOTCL] %s' % _("File no longer available."))
return
short_name = filename.split('/')[-1].split('\\')[-1]
self.parsing_promises.append(short_name)
self.pdf_parsed[short_name] = {
'pdf': {},
'filename': filename
}
self.pdf_decompressed[short_name] = ''
if self.app.abort_flag:
# graceful abort requested by the user
raise grace
with self.app.proc_container.new('%s...' % _("Parsing")):
with open(filename, "rb") as f:
# pdf = f.read()
pdf = Pdf.open(f)
page = pdf.pages[0]
decomp_file = ''
for operands, command in parse_content_stream(page):
line = ''
for op in operands:
try:
line += str(op) + ' '
except Exception as e:
# print(str(e), operands, command)
pass
line += str(command)
decomp_file += line + '\n'
self.pdf_decompressed[short_name] = decomp_file
# stream_nr = 0
# for s in re.findall(self.stream_re, pdf):
# if self.app.abort_flag:
# # graceful abort requested by the user
# raise grace
#
# stream_nr += 1
# log.debug("PDF STREAM: %d\n" % stream_nr)
# s = s.strip(b'\r\n')
#
# # https://stackoverflow.com/questions/1089662/python-inflate-and-deflate-implementations
# # def decompress(data):
# # decompressed = zlib.decompressobj(
# # -zlib.MAX_WBITS # see above
# # )
# # inflated = decompressed.decompress(data)
# # inflated += decompressed.flush()
# # return inflated
#
# Convert 2 Bytes If Python 3
# def C2BIP3(string):
# if type(string) == bytes:
# return string
# else:
# return bytes([ord(x) for x in string])
#
# def inflate(data):
# try:
# return zlib.decompress(C2BIP3(data))
# except Exception:
# if len(data) <= 10:
# raise
# oDecompress = zlib.decompressobj(-zlib.MAX_WBITS)
# oStringIO = BytesIO()
# count = 0
# for byte in C2BIP3(data):
# try:
# oStringIO.write(oDecompress.decompress(byte))
# count += 1
# except Exception:
# break
# if len(data) - count <= 2:
# return oStringIO.getvalue()
# else:
# raise
#
# try:
# decomp = inflate(s)
# except Exception as e:
# decomp = None
# log.debug("ToolPDF.open_pdf() -> inflate (decompress) -> %s" % str(e))
#
# try:
# self.pdf_decompressed[short_name] += (decomp.decode('UTF-8') + '\r\n')
# except Exception:
# try:
# self.pdf_decompressed[short_name] += (decomp.decode('latin1') + '\r\n')
# except Exception as e:
# log.error("ToolPDF.open_pdf() -> decoding error -> %s" % str(e))
# self.pdf_decompressed[short_name] = decomp_file
if self.pdf_decompressed[short_name] == '':
self.app.inform.emit('[ERROR_NOTCL] %s: %s' % (_("Failed to open"), str(filename)))
self.app.log.debug("ToolPDF.open_pdf().obj_init() --> Empty file or error on decompression")
self.parsing_promises.remove(short_name)
return
self.pdf_parsed[short_name]['pdf'] = self.parser.parse_pdf(pdf_content=self.pdf_decompressed[short_name])
# we used it, now we delete it
if self.pdf_decompressed[short_name]:
self.pdf_decompressed[short_name] = None
# removal from list is done in a multithreaded way therefore not always the removal can be done
# try to remove until it's done
try:
while True:
self.parsing_promises.remove(short_name)
time.sleep(0.1)
except Exception as e:
self.app.log.error("ToolPDF.open_pdf() --> %s" % str(e))
self.app.inform.emit('[success] %s: %s' % (_("Opened"), str(filename)))
def layer_rendering_as_excellon(self, filename, ap_dict, layer_nr):
outname = filename.split('/')[-1].split('\\')[-1] + "_%s" % str(layer_nr)
# store the points here until reconstitution:
# keys are diameters and values are list of (x,y) coords
points = {}
def obj_init(new_obj, app_obj):
clear_geo = [geo_el['clear'] for geo_el in ap_dict[0]['geometry']]
new_obj.tools = {}
for geo in clear_geo:
xmin, ymin, xmax, ymax = geo.bounds
center = (((xmax - xmin) / 2) + xmin, ((ymax - ymin) / 2) + ymin)
# for drill bits, even in INCH, it's enough 3 decimals
correction_factor = 0.974
dia = (xmax - xmin) * correction_factor
dia = round(dia, 3)
if dia in points:
points[dia].append(center)
else:
points[dia] = [center]
sorted_dia = sorted(points.keys())
name_tool = 0
for dia in sorted_dia:
name_tool += 1
tool = str(name_tool)
new_obj.tools[tool] = {
'tooldia': dia,
'drills': [],
'solid_geometry': []
}
# update the drill list
for dia_points in points:
if dia == dia_points:
for pt in points[dia_points]:
new_obj.tools[tool]['drills'].append(Point(pt))
break
ret = new_obj.create_geometry()
if ret == 'fail':
self.app.log.debug("Could not create geometry for Excellon object.")
return "fail"
new_obj.source_file = app_obj.f_handlers.export_excellon(obj_name=outname, local_use=new_obj,
filename=None, use_thread=False)
for tool in new_obj.tools:
if new_obj.tools[tool]['solid_geometry']:
return
app_obj.inform.emit('[ERROR_NOTCL] %s: %s' % (_("No geometry found in file"), outname))
return "fail"
with self.app.proc_container.new(_("Rendering PDF layer #%d ...") % int(layer_nr)):
ret_val = self.app.app_obj.new_object("excellon", outname, obj_init, autoselected=False)
if ret_val == 'fail':
self.app.inform.emit('[ERROR_NOTCL] %s' % _('Open PDF file failed.'))
return
# Register recent file
self.app.file_opened.emit("pdf", filename)
# GUI feedback
self.app.inform.emit('[success] %s: %s' % (_("Rendered"), outname))
def layer_rendering_as_gerber(self, filename, ap_dict, layer_nr):
outname = filename.split('/')[-1].split('\\')[-1] + "_%s" % str(layer_nr)
def obj_init(grb_obj, app_obj):
grb_obj.tools = ap_dict
poly_buff = []
follow_buf = []
for ap in grb_obj.tools:
for k in grb_obj.tools[ap]:
if k == 'geometry':
for geo_el in ap_dict[ap][k]:
if 'solid' in geo_el:
poly_buff.append(geo_el['solid'])
if 'follow' in geo_el:
follow_buf.append(geo_el['follow'])
poly_buff = unary_union(poly_buff)
if 0 in grb_obj.tools:
global_clear_geo = []
if 'geometry' in grb_obj.tools[0]:
for geo_el in ap_dict[0]['geometry']:
if 'clear' in geo_el:
global_clear_geo.append(geo_el['clear'])
if global_clear_geo:
solid = []
for apid in grb_obj.tools:
if 'geometry' in grb_obj.tools[apid]:
for elem in grb_obj.tools[apid]['geometry']:
if 'solid' in elem:
solid_geo = deepcopy(elem['solid'])
for clear_geo in global_clear_geo:
# Make sure that the clear_geo is within the solid_geo otherwise we loose
# the solid_geometry. We want for clear_geometry just to cut into solid_geometry
# not to delete it
if clear_geo.within(solid_geo):
solid_geo = solid_geo.difference(clear_geo)
if solid_geo.is_empty:
solid_geo = elem['solid']
try:
for poly in solid_geo:
solid.append(poly)
except TypeError:
solid.append(solid_geo)
poly_buff = deepcopy(MultiPolygon(solid))
follow_buf = unary_union(follow_buf)
try:
poly_buff = poly_buff.buffer(0.0000001)
except ValueError:
pass
try:
poly_buff = poly_buff.buffer(-0.0000001)
except ValueError:
pass
grb_obj.solid_geometry = deepcopy(poly_buff)
grb_obj.follow_geometry = deepcopy(follow_buf)
with self.app.proc_container.new(_("Rendering PDF layer #%d ...") % int(layer_nr)):
ret = self.app.app_obj.new_object('gerber', outname, obj_init, autoselected=False)
if ret == 'fail':
self.app.inform.emit('[ERROR_NOTCL] %s' % _('Open PDF file failed.'))
return
# Register recent file
self.app.file_opened.emit('pdf', filename)
# GUI feedback
self.app.inform.emit('[success] %s: %s' % (_("Rendered"), outname))
def periodic_check(self, check_period):
"""
This function starts an QTimer and it will periodically check if parsing was done
:param check_period: time at which to check periodically if all plots finished to be plotted
:return:
"""
# self.plot_thread = threading.Thread(target=lambda: self.check_plot_finished(check_period))
# self.plot_thread.start()
self.app.log.debug("ToolPDF --> Periodic Check started.")
try:
self.check_thread.stop()
except TypeError:
pass
self.check_thread.setInterval(check_period)
try:
self.check_thread.timeout.disconnect(self.periodic_check_handler)
except (TypeError, AttributeError):
pass
self.check_thread.timeout.connect(self.periodic_check_handler)
self.check_thread.start(QtCore.QThread.Priority.HighPriority)
def periodic_check_handler(self):
"""
If the parsing worker finished then start multithreaded rendering
:return:
"""
# log.debug("checking parsing --> %s" % str(self.parsing_promises))
try:
if not self.parsing_promises:
self.check_thread.stop()
self.app.log.debug("PDF --> start rendering")
# parsing finished start the layer rendering
if self.pdf_parsed:
obj_to_delete = []
for object_name in self.pdf_parsed:
if self.app.abort_flag:
# graceful abort requested by the user
raise grace
filename = deepcopy(self.pdf_parsed[object_name]['filename'])
pdf_content = deepcopy(self.pdf_parsed[object_name]['pdf'])
obj_to_delete.append(object_name)
for k in pdf_content:
if self.app.abort_flag:
# graceful abort requested by the user
raise grace
ap_dict = pdf_content[k]
if ap_dict:
layer_nr = k
if k == 0:
self.app.worker_task.emit({'fcn': self.layer_rendering_as_excellon,
'params': [filename, ap_dict, layer_nr]})
else:
self.app.worker_task.emit({'fcn': self.layer_rendering_as_gerber,
'params': [filename, ap_dict, layer_nr]})
# delete the object already processed so it will not be processed again for other objects
# that were opened at the same time; like in drag & drop on appGUI
for obj_name in obj_to_delete:
if obj_name in self.pdf_parsed:
self.pdf_parsed.pop(obj_name)
self.app.log.debug("ToolPDF --> Periodic check finished.")
except Exception:
traceback.print_exc()