Adding Custom Processes

In Voyager version 1.9.3, you can add custom processing tasks using Python scripts. There are two files you need to create so that the custom task will appear in the Processing Task Manager. The first file is a JSON file; the second file contains the Python script.

Creating the JSON file

The JSON file stores the parameters that the Python script will load. This file should be in the {Install Dir}\app\py\processing\info folder. The filename must end with .info.json, for example my_function_py.info.json.

Here is a sample .info.json file (copy_files.info.json), included with the Voyager installation:

{ "name": "copy_files", "runner": "python", "categories": ["download"], "params": [ { "type": "VoyagerResults", "name": "input_items", "required": true }, { "type": "FolderLocation", "name": "target_folder", "required": true }, { "type": "CheckBox", "name": "flatten_results" } ], "display": { "en": { "display": "Copy Files", "description": "Copies files to a target folder", "helpURL": "https://github.com/voyagersearch/tasks/tree/master/docs#copy_files", "params": { "target_folder": { "display": "Target Folder", "description": "The folder where all files are copied. If it does not exist, it will be created." }, "flatten_results": { "display": "Flatten Results", "description": "Place all items in the root folder" } } } } }

Creating the Python Script

The second file you need to create is the Python script itself, which should be saved in

{Install Dir}\app\py\processing\info

and must have the same base name as the .info.json file you created earlier. In this example the info filename is copy_files.info.json, so the Python filename is copy_files.py.

Here are the contents of copy_files.py (note that the Python file must define an execute(request) function or the task will fail to run). When you have created both the info.json file and the accompanying Python script, your task appears in the Process > Task Manager window (if it doesn't show up at first, click the refresh icon at the far right).

# -*- coding: utf-8 -*-
# (C) Copyright 2014 Voyager Search
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#from __future__ import unicode_literals
import os
import shutil
from voyager_tasks.utils import status
from voyager_tasks.utils import task_utils


def execute(request):
    """Copies files to a target folder.

    Reads ``input_items``, ``target_folder`` and ``flatten_results`` from
    ``request['params']``, copies every input item that is a file (or whose
    path ends with ``.gdb``) and writes ``_report.json`` with the copied,
    skipped and error counts into ``request['folder']``.

    When ``flatten_results`` is not set (or is the string ``'false'``), the
    source directory path (minus the drive) is re-created underneath the
    target folder; otherwise every item is placed directly in the target
    folder.

    :param request: json as a dict. Must provide the ``params`` and
        ``folder`` keys.
    """
    # NOTE(review): `_` is not defined in this file; presumably the task
    # runner installs it as a builtin translation function -- confirm.
    parameters = request['params']
    input_items = task_utils.get_input_items(parameters)
    target_folder = task_utils.get_parameter_value(parameters, 'target_folder', 'value')
    flatten_results = task_utils.get_parameter_value(parameters, 'flatten_results', 'value')

    # An unset check box and the literal string 'false' both mean "keep the
    # source folder structure".
    keep_structure = not flatten_results or flatten_results == 'false'
    # Always computed, so the comparison below can never hit an unbound name.
    target_dirs = os.path.splitdrive(target_folder)[1]

    if not os.path.exists(request['folder']):
        os.makedirs(request['folder'])

    copied = 0
    skipped = 0
    errors = 0
    file_count = len(input_items)
    shp_files = ('shp', 'shx', 'sbn', 'dbf', 'prj', 'cpg', 'shp.xml', 'dbf.xml')
    sdc_files = ('sdc', 'sdi', 'sdc.xml', 'sdc.prj')

    status_writer = status.Writer()
    status_writer.send_percent(0.0, _('Starting to process...'), 'copy_files')

    # Count from 1 so the last item reports 100%.
    for position, src_file in enumerate(input_items, 1):
        # float() keeps this a true division on Python 2 as well.
        progress = float(position) / file_count
        try:
            if os.path.isfile(src_file) or src_file.endswith('.gdb'):
                dst = target_folder
                if keep_structure:
                    # Maintain source file's folder structure.
                    copy_dirs = os.path.splitdrive(os.path.dirname(src_file))[1]
                    if copy_dirs != target_dirs:
                        # Plain concatenation on purpose: copy_dirs is rooted
                        # (starts with a separator), so os.path.join would
                        # discard target_folder.
                        dst = target_folder + copy_dirs
                if not os.path.exists(dst):
                    os.makedirs(dst)

                if os.path.isfile(src_file):
                    # Shapefiles and SDC datasets are multi-file formats;
                    # presumably list_files returns the companion files with
                    # the given extensions -- confirm against task_utils.
                    if src_file.endswith('.shp'):
                        all_files = task_utils.list_files(src_file, shp_files)
                    elif src_file.endswith('.sdc'):
                        all_files = task_utils.list_files(src_file, sdc_files)
                    else:
                        all_files = [src_file]
                    for f in all_files:
                        shutil.copy2(f, dst)
                else:
                    # A '.gdb' path that is not a regular file is copied as a
                    # directory tree.
                    shutil.copytree(src_file, os.path.join(dst, os.path.basename(src_file)))

                status_writer.send_percent(progress, _('Copied: {0}').format(src_file), 'copy_files')
                copied += 1
            else:
                status_writer.send_percent(
                    progress,
                    _('{0} is not a file or does no exist').format(src_file),
                    'copy_files'
                )
                skipped += 1
        except (IOError, OSError, shutil.Error) as io_err:
            # One failing item must not abort the whole task. On Python 2,
            # os.makedirs raises OSError and shutil raises shutil.Error,
            # neither of which is an IOError there.
            status_writer.send_percent(progress, _('Skipped: {0}').format(src_file), 'copy_files')
            status_writer.send_status(_('FAIL: {0}').format(repr(io_err)))
            errors += 1

    try:
        shutil.copy2(os.path.join(os.path.dirname(__file__), 'supportfiles', '_thumb.png'), request['folder'])
    except IOError:
        # Best effort: a missing thumbnail is deliberately not treated as a
        # task failure.
        pass

    # Update state if necessary.
    if errors > 0 or skipped > 0:
        status_writer.send_state(status.STAT_WARNING, _('{0} results could not be processed').format(skipped + errors))
    task_utils.report(os.path.join(request['folder'], '_report.json'), copied, skipped, errors)