Creating a Custom Python Connector in HQ
This tutorial demonstrates how to create a new Connector for HQ using Python. Connectors are required to connect to content repositories to extract fields, metadata and other pieces of information. This example will connect to a file folder and extract information from CSV files. Each row in a CSV file will be an entry in the index. In addition, this example will show how to create one-to-many relationships where the CSV file entry will be linked to each row entry.
Before getting started, it is important to know how to add and manage repositories in HQ. To learn more about repositories, see Managing Repositories.
Tutorial
Step 1
Ensure the following are installed:
Python 2.7 or higher
A Python IDE (recommended)
Step 2
Go to your HQ home directory:
On Windows, the home directory defaults to the AppData directory, typically at C:\Users\<user>\AppData\Roaming\Voyager\hq.
On Linux systems, the home directory defaults to the user home directory at ~/.voyager/hq.
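The default locations above can be resolved in a script. The following is a minimal sketch, assuming the defaults have not been changed; the function name default_hq_home is hypothetical and is not part of HQ.

```python
import os
import sys


def default_hq_home():
    """Return the default HQ home directory described in the text above."""
    if sys.platform.startswith('win'):
        # APPDATA normally points at the user's AppData\Roaming directory.
        appdata = os.environ.get('APPDATA', os.path.expanduser('~'))
        return os.path.join(appdata, 'Voyager', 'hq')
    # Linux default: ~/.voyager/hq
    return os.path.join(os.path.expanduser('~'), '.voyager', 'hq')


print(default_hq_home())
```

If HQ was installed with a non-default home directory, use that path instead.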
Step 3
Create the new Python script:
In the home directory, open the py directory (on Windows, AppData\Roaming\Voyager\hq\py), which contains the voyager Python module, including a base class for Voyager connectors.
In the voyager directory, open the connectors directory and create a new folder named CSV.
Create a new Python script named csv_connector.py in the CSV directory.
Step 4
Edit the csv_connector.py script and add the following code:
import os
import sys
import csv
import glob
import json
import time
import datetime
from voyager import Connector


class CSVConnector(Connector):
    '''This class inherits attributes and methods from the base Connector class.'''

    def __init__(self):
        super(CSVConnector, self).__init__()

    def describe(self):
        '''
        Provides parameter information about the connector.
        :return: A JSON object/dictionary
        '''
        return {
            'title': 'CSV Files',
            'description': 'Indexes each row in each CSV file.',
            'category': ['file'],
            'params': [
                {
                    'type': 'path',
                    'name': 'path',
                    'title': 'CSV File Folder',
                    'description': 'The folder containing CSV files.'
                },
                {
                    'type': 'datasets',
                    'name': 'datasets',
                    'title': 'File names',
                    'description': 'The file names'
                }
            ]
        }

    def list(self, repo):
        '''
        Lists the datasets in a structured repository such as a folder or database.
        :param repo: Dictionary representing the repository configuration
        :return: A JSON object/dictionary
        '''
        csv_path = str(repo['config'].get('path'))
        datasets = list()
        files = glob.glob(os.path.join(csv_path, "*.csv"))
        for csv_file in files:
            datasets.append({'name': os.path.splitext(os.path.basename(csv_file))[0]})
        return {'datasets': datasets}

    def info(self, repo, dataset):
        '''
        Lists detailed info such as fields about a dataset in the repository.
        :param repo: Dictionary representing the repository configuration
        :param dataset: The dataset name
        :return: A JSON object/dictionary
        '''
        csv_path = os.path.join(str(repo['config'].get('path')), dataset + '.csv')
        fields = list()
        # 'rb' is the Python 2 convention for the csv module; on Python 3 use open(csv_path, newline='').
        with open(csv_path, 'rb') as csvfile:
            reader = csv.DictReader(csvfile)
            # next(reader) works on Python 2.6+ and Python 3; reader.next() is Python 2 only.
            row = next(reader)
            for k in row.keys():
                fields.append({'name': k, 'type': 'String'})
        info = dict()
        info['name'] = os.path.splitext(dataset)[0]
        info['fields'] = fields
        return info

    def scan(self, repo, *datasets):
        '''
        Runs the scan of the repository.
        In general this function should connect to whatever content is being scanned and
        repeatedly call the function index(), passing in new index jobs.
        :param repo: Dictionary representing the repository configuration
        :param datasets: List of datasets. Each dataset is a dictionary of name, fields, etc.
        '''
        for ds in datasets:
            csv_path = os.path.join(str(repo['config'].get('path')), ds['name'] + '.csv')
            location_id = str(repo.get('id'))

            # Get the CSV file properties such as creation date, modified date, etc.
            csv_properties = self.get_csv_properties(csv_path)
            csv_properties['id'] = location_id + os.path.splitext(os.path.basename(csv_path))[0]
            csv_properties['repository'] = location_id

            # Create an entry for the CSV file.
            csv_entry = dict()
            csv_entry['entry'] = {'fields': csv_properties}

            # Placeholder for the list of rows in the CSV. This links the rows to the CSV file entry.
            csv_links = list()

            # Dictionary to store row information: fields, id, format, etc.
            row_entry = dict()

            # Default to empty lists so the loop below still works when no fields are configured.
            exclusion_fields = []
            dest_field_names = []

            # Create a new list of destination fields if they are to be included.
            if 'fields' in ds and ds['fields']:
                exclusion_fields = [f['name'] for f in ds['fields'] if not f['include']]
                dest_field_names = [f['dest'] for f in ds['fields'] if f['include']]
            else:
                self.report('Not indexing any fields.')

            # Open the CSV file and index each row as an entry.
            with open(csv_path, 'rb') as csvfile:
                reader = csv.DictReader(csvfile)
                for i, row in enumerate(reader, 1):
                    # Iterate over a copy of the keys so the row can be modified safely.
                    for name in list(row.keys()):
                        if name in exclusion_fields:
                            row.pop(name)
                    mapped_fields = dict(zip(dest_field_names, row.values()))

                    # Get correct date string from a datetime object.
                    # NOTE: field_maping is not imported in this listing; import the helper
                    # that provides format_date() before indexing date (fd_) fields.
                    for k in list(mapped_fields.keys()):
                        if k.startswith('fd_'):
                            date_str = field_maping.format_date(mapped_fields[k])
                            mapped_fields[k] = date_str

                    mapped_fields['meta_table_name'] = os.path.basename(csv_path)
                    mapped_fields['format_category'] = 'Office'
                    mapped_fields['format_type'] = 'Record'
                    mapped_fields['format'] = 'text/csv-record'
                    row_entry['id'] = '{0}_{1}_{2}'.format(location_id, os.path.splitext(os.path.basename(csv_path))[0], i)
                    row_entry['entry'] = {'fields': mapped_fields}
                    row_entry['entry']['fields']['repository'] = location_id
                    self.index(row_entry)

                    # Append the row entry to the list of links for the CSV file entry.
                    csv_links.append({'relation': 'contains', 'id': row_entry['id'], 'index': False})

            # Index the CSV file entry.
            csv_entry['entry']['links'] = csv_links
            self.index(csv_entry)

    def get_csv_properties(self, file_path):
        '''Gets the properties of a CSV file and creates an entry.'''
        properties = dict()
        properties['name'] = os.path.basename(file_path)
        properties['path'] = file_path
        properties['format'] = 'csv'
        properties['file_accessed'] = datetime.datetime.fromtimestamp(os.path.getatime(file_path)).strftime('%Y-%m-%dT%H:%M:%S.%f%Z')
        properties['file_modified'] = datetime.datetime.fromtimestamp(os.path.getmtime(file_path)).strftime('%Y-%m-%dT%H:%M:%S.%f%Z')
        properties['file_created'] = datetime.datetime.fromtimestamp(os.path.getctime(file_path)).strftime('%Y-%m-%dT%H:%M:%S.%f%Z')
        return properties


if __name__ == '__main__':
    Connector.main(CSVConnector())
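The scan() method above pairs destination field names with row values by position, using dict(zip(...)), which relies on both sequences being in the same order. As an alternative, the pairing can be done by column name. This is a minimal sketch, not the connector's own method: map_row is a hypothetical helper, and the sample field definitions are invented for illustration, assuming only that each entry carries the name, dest and include keys that scan() reads.

```python
def map_row(field_defs, row):
    """Map one CSV row to destination field names, pairing by column name."""
    mapped = {}
    for field in field_defs:
        # Skip excluded fields and columns missing from this row.
        if field.get('include') and field['name'] in row:
            mapped[field['dest']] = row[field['name']]
    return mapped


# Hypothetical field definitions, shaped like ds['fields'] in scan().
field_defs = [
    {'name': 'CNTRY_NAME', 'dest': 'name', 'include': True},
    {'name': 'SQMI', 'dest': 'fs_SQMI', 'include': True},
    {'name': 'OBJECTID', 'dest': 'fl_OBJECTID', 'include': False},
]
row = {'OBJECTID': '248', 'CNTRY_NAME': 'Vanuatu', 'SQMI': '3265.07'}
print(map_row(field_defs, row))
```

Pairing by name keeps each value attached to its own column even if the dictionary order of the row differs from the order of the field definitions.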
Step 5
Test the connector by adding a new Repository:
In HQ, go to the Repositories page and click Add New.
Choose File for the Repository type and click Next. You should now see CSV Files as a file type in the drop-down menu.
Enter a name.
Browse for a directory containing CSV files.
Click File Names to list the CSV files. This list is populated by calling the list() method in your connector class. You may select all files or choose individual files to index.
In addition, you can choose to map fields. When mapping fields, you will be prompted with a list of fields to include or exclude as well as an option to map field names to new names. In this example, CNTRY_name is mapped to name.
The field mapping is populated by calling the info() method in your connector class.
When you have entered all of the relevant information, click Next, then follow the rest of the workflow for creating a Repository.
When the Repository has been added, click Index Now. This will begin indexing the information by calling the scan() method. If you have one CSV file with 50 records, you should get 51 items in the index for that repository (one for the CSV file and one for each record).
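The expected item count can be checked outside HQ before comparing it with the index. The sketch below counts one item per CSV file plus one per data row; expected_item_count is a hypothetical helper, the sample file is generated only for the demonstration, and the code is written for Python 3 (under Python 2.7, open the file in 'rb' mode instead of passing newline='').

```python
import csv
import glob
import os
import tempfile


def expected_item_count(folder):
    """Count one item per CSV file plus one item per data row."""
    total = 0
    for csv_path in glob.glob(os.path.join(folder, '*.csv')):
        # newline='' is the Python 3 convention for the csv module.
        with open(csv_path, newline='') as csvfile:
            rows = sum(1 for _ in csv.DictReader(csvfile))
        total += 1 + rows
    return total


# Demonstration: one generated CSV file with a header and 50 records.
demo_folder = tempfile.mkdtemp()
with open(os.path.join(demo_folder, 'sample.csv'), 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['id', 'name'])
    for n in range(50):
        writer.writerow([n, 'record {0}'.format(n)])

count = expected_item_count(demo_folder)
print(count)
```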
Step 6
Learn more about the Python code:
Take some time to examine the Python code and read the documentation strings and comments. An entry is a Python dictionary with the required keys of entry and id. This entry will also include fields. The entry would look like this:
{
    'entry': {
        'fields': {
            'meta_table_name': 'world_countries.csv',
            'name': 'Vanuatu',
            'repository': 'r16524da57d1',
            'format': 'text/csv-record',
            'format_category': 'Office',
            'fs_SQMI': '3265.07',
            'fs_FIPS_CNTRY': 'NH',
            'fs_STATUS': 'UNMemberState',
            'fs_POP2005': '205754',
            'format_type': 'Record'
        }
    },
    'id': 'r16524da57d1_world_countries_248'
}
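The way scan() assembles a row entry like the one above, and the link that ties it to the CSV file entry, can be sketched in isolation. The helper names build_row_entry and build_link are hypothetical; the id pattern and the link keys are taken from the connector code in Step 4.

```python
import os


def build_row_entry(location_id, csv_path, row_number, fields):
    """Build a row entry with the id pattern <repository>_<file name>_<row number>."""
    table = os.path.splitext(os.path.basename(csv_path))[0]
    entry_fields = dict(fields)  # Copy so the caller's dictionary is not modified.
    entry_fields['repository'] = location_id
    return {
        'id': '{0}_{1}_{2}'.format(location_id, table, row_number),
        'entry': {'fields': entry_fields},
    }


def build_link(row_entry):
    """Build the link stored on the CSV file entry (the 'one' in one-to-many)."""
    return {'relation': 'contains', 'id': row_entry['id'], 'index': False}


row_entry = build_row_entry('r16524da57d1', 'world_countries.csv', 248, {'name': 'Vanuatu'})
link = build_link(row_entry)
print(row_entry['id'])
print(link)
```

Each row contributes one link, and the CSV file entry carries the full list under entry['links'], which is how the one-to-many relationship is expressed.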
Additional Information
After creating and saving a repository, you can view the configuration by opening the JSON file located in the config directory in your HQ home location.
A log file is generated each time you index your repository. This log file is created in the HQ home directory under logs/py/connector/<date>/…
To add status messages to this log file, you can use self.report(message) within your connector script. This is also helpful for debugging.
For a more advanced connector, please examine the code in the Geodatabase connector located in the gdb directory.
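To find the most recent connector log mentioned above, the log directory can be searched for the newest file. This is a rough sketch: newest_log_file is a hypothetical helper, it assumes only the logs/py/connector location given above, and the dated folder and file name in the demonstration are made up.

```python
import os
import tempfile


def newest_log_file(hq_home):
    """Return the most recently modified file under logs/py/connector, or None."""
    log_root = os.path.join(hq_home, 'logs', 'py', 'connector')
    newest_path, newest_mtime = None, -1.0
    for dirpath, _dirnames, filenames in os.walk(log_root):
        for filename in filenames:
            path = os.path.join(dirpath, filename)
            mtime = os.path.getmtime(path)
            if mtime > newest_mtime:
                newest_path, newest_mtime = path, mtime
    return newest_path


# Demonstration with a temporary stand-in for the HQ home directory.
demo_home = tempfile.mkdtemp()
demo_dir = os.path.join(demo_home, 'logs', 'py', 'connector', '2020-01-01')  # made-up date folder
os.makedirs(demo_dir)
with open(os.path.join(demo_dir, 'scan.log'), 'w') as f:  # made-up file name
    f.write('Not indexing any fields.\n')

found = newest_log_file(demo_home)
print(found)
```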