Adding Custom Processes
In Voyager version 1.9.3, you can add custom processing tasks using Python scripts. There are two files you need to create so that the custom task will appear in the Processing Task Manager. The first file is a JSON file; the second file contains the Python script.
Creating the JSON file
The JSON file stores the parameters that the Python script will load. This file should be in the {Install Dir}\app\py\processing\info folder. The filename must end with .info.json, for example my_function_py.info.json.
Here is a sample .info.json file (copy_files.info.json), included with the Voyager installation:
{
"name": "copy_files",
"runner": "python",
"categories": ["download"],
"params": [
{
"type": "VoyagerResults",
"name": "input_items",
"required": true
},
{
"type": "FolderLocation",
"name": "target_folder",
"required": true
},
{
"type": "CheckBox",
"name": "flatten_results"
}
],
"display":
{
"en":
{
"display": "Copy Files",
"description": "Copies files to a target folder",
"helpURL": "https://github.com/voyagersearch/tasks/tree/master/docs#copy_files",
"params":
{
"target_folder": {
"display": "Target Folder",
"description": "The folder where all files are copied. If it does not exist, it will be created."
},
"flatten_results": {
"display": "Flatten Results",
"description": "Place all items in the root folder"
}
}
}
}
}
Creating the Python Script
The second file you need to create is the Python script itself, which should be saved in
{Install Dir}\app\py\processing\info
and must have the same name as the info.json file you created earlier. in this example the info filename is copy_files.info.json, so the python filename is copy_files.py.Â
Here are the contents of copy_files.py (note that the python file must have an execute statement or it will fail to run).  When you have created both the info.json file and the accompanying Python script, your task appears in the Process > Task Manager window (if it doesn't show up at first, click the refresh icon at the far right).
# -*- coding: utf-8 -*-
# (C) Copyright 2014 Voyager Search
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#from __future__ import unicode_literals
import os
import shutil
from voyager_tasks.utils import status
from voyager_tasks.utils import task_utils
def execute(request):
"""Copies files to a target folder.
:param request: json as a dict.
"""
parameters = request['params']
input_items = task_utils.get_input_items(parameters)
target_folder = task_utils.get_parameter_value(parameters, 'target_folder', 'value')
flatten_results = task_utils.get_parameter_value(parameters, 'flatten_results', 'value')
if not flatten_results:
target_dirs = os.path.splitdrive(target_folder)[1]
flatten_results = 'false'
if not os.path.exists(request['folder']):
os.makedirs(request['folder'])
i = 1.
copied = 0
skipped = 0
errors = 0
file_count = len(input_items)
shp_files = ('shp', 'shx', 'sbn', 'dbf', 'prj', 'cpg', 'shp.xml', 'dbf.xml')
sdc_files = ('sdc', 'sdi', 'sdc.xml', 'sdc.prj')
status_writer = status.Writer()
status_writer.send_percent(0.0, _('Starting to process...'), 'copy_files')
for src_file in input_items:
try:
if os.path.isfile(src_file) or src_file.endswith('.gdb'):
if flatten_results == 'false':
# Maintain source file's folder structure.
copy_dirs = os.path.splitdrive(os.path.dirname(src_file))[1]
if not copy_dirs == target_dirs:
dst = target_folder + copy_dirs
if not os.path.exists(dst):
os.makedirs(dst)
else:
if not os.path.exists(target_folder):
dst = target_folder
os.makedirs(dst)
else:
dst = target_folder
if os.path.isfile(src_file):
if src_file.endswith('.shp'):
all_files = task_utils.list_files(src_file, shp_files)
elif src_file.endswith('.sdc'):
all_files = task_utils.list_files(src_file, sdc_files)
else:
all_files = [src_file]
for f in all_files:
shutil.copy2(f, dst)
else:
shutil.copytree(src_file, os.path.join(dst, os.path.basename(src_file)))
status_writer.send_percent(i/file_count, _('Copied: {0}').format(src_file), 'copy_files')
copied += 1
else:
status_writer.send_percent(
i/file_count,
_('{0} is not a file or does no exist').format(src_file),
'copy_files'
)
skipped += 1
except IOError as io_err:
status_writer.send_percent(
i/file_count, _('Skipped: {0}').format(src_file), 'copy_files')
status_writer.send_status(_('FAIL: {0}').format(repr(io_err)))
errors += 1
pass
try:
shutil.copy2(os.path.join(os.path.dirname(__file__), 'supportfiles', '_thumb.png'), request['folder'])
except IOError:
pass
# Update state if necessary.
if errors > 0 or skipped > 0:
status_writer.send_state(status.STAT_WARNING, _('{0} results could not be processed').format(skipped + errors))
task_utils.report(os.path.join(request['folder'], '_report.json'), copied, skipped, errors)
Â