...
Edit the connector.py script and add the following code:
<script src="https://gist.github.com/voyagerbot/427fbdda32e1b05601577472e42ab7bd.js"></script>
Code Block:
import os
import sys
import csv
import glob
import json
import glob
import time
import datetime
from voyager import Connector
class CSVConnector(Connector):
    """Voyager connector that indexes every row of each CSV file in a folder.

    Inherits attributes and methods (``report``, ``index``, ``main``) from the
    base ``Connector`` class.
    """

    def __init__(self):
        super(CSVConnector, self).__init__()

    def describe(self):
        """Provide parameter information about the connector.

        :return: dictionary describing the connector and its parameters
        """
        return {
            'title': 'CSV Files',
            'description': 'Indexes each row in each CSV file.',
            'category': ['file'],
            'params': [
                {
                    'type': 'path',
                    'name': 'path',
                    'title': 'CSV File Folder',
                    'description': 'The folder containing CSV files.'
                },
                {
                    'type': 'datasets',
                    'name': 'datasets',
                    'title': 'File names',
                    'description': 'The file names'
                }
            ]
        }

    def list(self, repo):
        """List the datasets (CSV file base names) in the repository folder.

        :param repo: dictionary representing the repository configuration
        :return: dictionary with a 'datasets' list of {'name': <base name>}
        """
        csv_path = str(repo['config'].get('path'))
        files = glob.glob(os.path.join(csv_path, '*.csv'))
        datasets = [{'name': os.path.splitext(os.path.basename(f))[0]} for f in files]
        return {'datasets': datasets}

    def info(self, repo, dataset):
        """List detailed info, such as fields, about one dataset.

        All fields are reported as 'String' since CSV carries no type info.

        :param repo: dictionary representing the repository configuration
        :param dataset: the dataset (file base) name
        :return: dictionary with 'name' and a 'fields' list
        """
        csv_path = os.path.join(str(repo['config'].get('path')), dataset + '.csv')
        fields = []
        # Text mode with newline='' is what the csv module requires on
        # Python 3 (the original opened 'rb' and used reader.next(), which
        # only works on Python 2).
        with open(csv_path, 'r', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            # Only the column names are needed; one row exposes the keys.
            row = next(reader)
            for name in row:
                fields.append({'name': name, 'type': 'String'})
        return {'name': os.path.splitext(dataset)[0], 'fields': fields}

    def scan(self, repo, *datasets):
        """Run the scan of the repository.

        Indexes each CSV row as its own entry via ``self.index()``, then
        indexes one entry for the CSV file itself that links ('contains')
        to every row entry.

        :param repo: dictionary representing the repository configuration
        :param datasets: dataset dictionaries (name, fields, etc.)
        """
        location_id = str(repo.get('id'))
        for ds in datasets:
            csv_path = os.path.join(str(repo['config'].get('path')), ds['name'] + '.csv')
            # Entry for the CSV file itself (creation/modified dates, etc.).
            csv_properties = self.get_csv_properties(csv_path)
            csv_properties['id'] = location_id + os.path.splitext(os.path.basename(csv_path))[0]
            csv_properties['repository'] = location_id
            csv_entry = {'entry': {'fields': csv_properties}}
            # Links that tie each indexed row back to the CSV file entry.
            csv_links = []
            if 'fields' in ds and ds['fields']:
                # Map source field name -> destination field name for the
                # fields marked include. Mapping by name fixes the original's
                # dict(zip(dest_field_names, row.values())), which paired two
                # independently ordered sequences and could mislabel values.
                dest_names = {f['name']: f['dest'] for f in ds['fields'] if f['include']}
            else:
                # BUG FIX: the original reported and then fell through to use
                # undefined field lists (NameError); skip the dataset instead.
                self.report('Not indexing any fields.')
                continue
            with open(csv_path, 'r', newline='') as csvfile:
                reader = csv.DictReader(csvfile)
                for i, row in enumerate(reader, 1):
                    # Keep only included fields, renamed to their destination
                    # names (building a new dict also avoids the original's
                    # pop-while-iterating, a RuntimeError on Python 3).
                    mapped_fields = {dest_names[k]: v for k, v in row.items() if k in dest_names}
                    # Destination date fields (fd_*) need a formatted date string.
                    for k in list(mapped_fields):
                        if k.startswith('fd_'):
                            # NOTE(review): field_maping is not defined in this
                            # file -- presumably a voyager helper module whose
                            # import (and spelling) must be confirmed.
                            mapped_fields[k] = field_maping.format_date(mapped_fields[k])
                    mapped_fields['meta_table_name'] = os.path.basename(csv_path)
                    mapped_fields['format_category'] = 'Office'
                    mapped_fields['format_type'] = 'Record'
                    mapped_fields['format'] = 'text/csv-record'
                    mapped_fields['repository'] = location_id
                    # Build a fresh entry per row; the original reused a single
                    # dict across all rows, risking aliasing in index().
                    row_entry = {
                        'id': '{0}_{1}_{2}'.format(
                            location_id,
                            os.path.splitext(os.path.basename(csv_path))[0],
                            i
                        ),
                        'entry': {'fields': mapped_fields}
                    }
                    self.index(row_entry)
                    csv_links.append({'relation': 'contains', 'id': row_entry['id'], 'index': False})
            # Index the CSV file entry with links to all of its rows.
            csv_entry['entry']['links'] = csv_links
            self.index(csv_entry)

    def get_csv_properties(self, file_path):
        """Get file-system properties of a CSV file for its index entry.

        :param file_path: full path to the CSV file
        :return: dictionary of property name -> value
        """
        def _stamp(ts):
            # NOTE: naive local time; %Z renders empty for naive datetimes.
            return datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%dT%H:%M:%S.%f%Z')

        return {
            'name': os.path.basename(file_path),
            'path': file_path,
            'format': 'csv',
            'file_accessed': _stamp(os.path.getatime(file_path)),
            'file_modified': _stamp(os.path.getmtime(file_path)),
            'file_created': _stamp(os.path.getctime(file_path)),
        }
# Entry point: hand this connector to the Voyager framework.
# BUG FIX: removed a stray trailing '|' (table-markup residue) that made
# the original line a SyntaxError.
if __name__ == '__main__':
    Connector.main(CSVConnector())
Step 5
Test the connector by adding a new Repository:
...