#!/usr/bin/env python3
################################################################################
# #
# This file is part of the Buildbotics firmware. #
# #
# Copyright (c) 2015 - 2018, Buildbotics LLC #
# All rights reserved. #
# #
# This file ("the software") is free software: you can redistribute it #
# and/or modify it under the terms of the GNU General Public License, #
# version 2 as published by the Free Software Foundation. You should #
# have received a copy of the GNU General Public License, version 2 #
# along with the software. If not, see . #
# #
# The software is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU #
# Lesser General Public License for more details. #
# #
# You should have received a copy of the GNU Lesser General Public #
# License along with the software. If not, see #
# . #
# #
# For information regarding this software email: #
# "Joseph Coffland" #
# #
################################################################################
import sys
import argparse
import json
import time
import math
import os
import re
import gzip
import struct
import math
import camotics.gplan as gplan # pylint: disable=no-name-in-module,import-error
reLogLine = re.compile(
r'^(?P[A-Z])[0-9 ]:'
r'((?P[^:]+):)?'
r'((?P\d+):)?'
r'((?P\d+):)?'
r'(?P.*)$')
def compute_unit(a, b):
unit = dict()
length = 0
for axis in 'xyz':
if axis in a and axis in b:
unit[axis] = b[axis] - a[axis]
length += unit[axis] * unit[axis]
length = math.sqrt(length)
if length:
for axis in 'xyz':
if axis in unit: unit[axis] /= length
return unit
def compute_move(start, unit, dist):
move = dict()
for axis in 'xyz':
if axis in unit and axis in start:
move[axis] = start[axis] + unit[axis] * dist
return move
class Plan(object):
def __init__(self, path, state, config):
self.path = path
self.state = state
self.config = config
self.lines = sum(1 for line in open(path, 'rb'))
self.planner = gplan.Planner()
self.planner.set_resolver(self.get_var_cb)
self.planner.set_logger(self._log_cb, 1, 'LinePlanner:3')
self.planner.load(self.path, config)
self.messages = []
self.levels = dict(I = 'info', D = 'debug', W = 'warning', E = 'error',
C = 'critical')
# Initialized axis states and bounds
self.bounds = dict(min = {}, max = {})
for axis in 'xyz':
self.bounds['min'][axis] = math.inf
self.bounds['max'][axis] = -math.inf
self.maxSpeed = 0
self.currentSpeed = None
self.lastProgress = None
self.lastProgressTime = 0
self.time = 0
def add_to_bounds(self, axis, value):
if value < self.bounds['min'][axis]: self.bounds['min'][axis] = value
if self.bounds['max'][axis] < value: self.bounds['max'][axis] = value
def get_bounds(self):
# Remove infinity from bounds
for axis in 'xyz':
if self.bounds['min'][axis] == math.inf:
del self.bounds['min'][axis]
if self.bounds['max'][axis] == -math.inf:
del self.bounds['max'][axis]
return self.bounds
def update_speed(self, s):
if self.currentSpeed == s: return False
self.currentSpeed = s
if self.maxSpeed < s: self.maxSpeed = s
return True
def get_var_cb(self, name, units):
value = 0
if len(name) and name[0] == '_':
value = self.state.get(name[1:], 0)
if units == 'IMPERIAL': value /= 25.4
return value
def log_cb(self, level, msg, filename, line, column):
if level in self.levels: level = self.levels[level]
# Ignore missing tool warning
if (level == 'warning' and
msg.startswith('Auto-creating missing tool')):
return
self.messages.append(
dict(level = level, msg = msg, filename = filename, line = line,
column = column))
def _log_cb(self, line):
line = line.strip()
m = reLogLine.match(line)
if not m: return
level = m.group('level')
msg = m.group('msg')
filename = m.group('file')
line = m.group('line')
column = m.group('column')
where = ':'.join(filter(None.__ne__, [filename, line, column]))
if line is not None: line = int(line)
if column is not None: column = int(column)
self.log_cb(level, msg, filename, line, column)
def progress(self, x):
if time.time() - self.lastProgressTime < 1 and x != 1: return
self.lastProgressTime = time.time()
p = '%.4f\n' % x
if self.lastProgress == p: return
self.lastProgress = p
sys.stdout.write(p)
sys.stdout.flush()
def _run(self):
start = time.time()
line = 0
maxLine = 0
maxLineTime = time.time()
position = {axis: 0 for axis in 'xyz'}
rapid = False
# Execute plan
try:
while self.planner.has_more():
cmd = self.planner.next()
self.planner.set_active(cmd['id']) # Release plan
# Cannot synchronize with actual machine so fake it
if self.planner.is_synchronizing(): self.planner.synchronize(0)
if cmd['type'] == 'line':
if not (cmd.get('first', False) or
cmd.get('seeking', False)):
self.time += sum(cmd['times']) / 1000
target = cmd['target']
move = {}
startPos = dict()
for axis in 'xyz':
if axis in target:
startPos[axis] = position[axis]
position[axis] = target[axis]
move[axis] = target[axis]
self.add_to_bounds(axis, move[axis])
if 'rapid' in cmd: move['rapid'] = cmd['rapid']
if 'speeds' in cmd:
unit = compute_unit(startPos, target)
for d, s in cmd['speeds']:
cur = self.currentSpeed
if self.update_speed(s):
m = compute_move(startPos, unit, d)
if cur is not None:
m['s'] = cur
yield m
move['s'] = s
yield move
elif cmd['type'] == 'set':
if cmd['name'] == 'line':
line = cmd['value']
if maxLine < line:
maxLine = line
maxLineTime = time.time()
elif cmd['name'] == 'speed':
s = cmd['value']
if self.update_speed(s): yield {'s': s}
elif cmd['type'] == 'dwell': self.time += cmd['seconds']
if args.max_time < time.time() - start:
raise Exception('Max planning time (%d sec) exceeded.' %
args.max_time)
if args.max_loop < time.time() - maxLineTime:
raise Exception('Max loop time (%d sec) exceeded.' %
args.max_loop)
if self.lines: self.progress(maxLine / self.lines)
except Exception as e:
self.log_cb('error', str(e), os.path.basename(self.path), line, 0)
def run(self):
lastS = 0
speed = 0
first = True
x, y, z = 0, 0, 0
with gzip.open('positions.gz', 'wb') as f1:
with gzip.open('speeds.gz', 'wb') as f2:
for move in self._run():
x = move.get('x', x)
y = move.get('y', y)
z = move.get('z', z)
rapid = move.get('rapid', False)
speed = move.get('s', speed)
s = struct.pack('