...
Inside the pipeline directory, create a new Python script file named sample_pipeline.py.
Edit the Python script file and add the following code:
Code Block | ||
---|---|---|
from voyager import PipelineStep


class SampleStep(PipelineStep):

    def __init__(self):
        super(SampleStep, self).__init__()

    def info(self):
        """
        Provides information about the pipeline step including name, title,
        description and parameters. Parameters are optional.
        :return: A JSON object/dictionary
        """
        return {
            "name": "sample",
            "title": "Sample Step",
            "description": "Sample pipeline step",
            "params": [{
                "type": "string",
                "name": "add",
                "title": "Add",
                "description": "Field to add"
            }, {
                "type": "string",
                "name": "remove",
                "title": "Remove",
                "description": "Field to remove"
            }]
        }

    def run(self, entry, config):
        """
        Runs the pipeline step. This method works by modifying the entry
        parameter in place, mutating fields, etc.
        :param entry: The entry being indexed.
        :param config: The pipeline step configuration.
        """
        print("INFO adding")
        add = config.get("add")
        if add:
            entry["fields"][add] = "foo"

        print("INFO removing")
        remove = config.get("remove")
        if remove:
            # The None default avoids a KeyError when the field is absent.
            entry["fields"].pop(remove, None)


if __name__ == "__main__":
    PipelineStep.main(SampleStep())
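To experiment with the add/remove logic outside Voyager, the step can be exercised against a stand-in base class. The following is only a sketch: the `PipelineStep` class here is a hypothetical stub, not the real `voyager` class, and the entry and config values are made up for illustration. It also passes a default to `pop` so that removing a field that is not present is ignored.

```python
class PipelineStep(object):
    """Hypothetical stand-in for voyager.PipelineStep (local experiments only)."""

    def __init__(self):
        pass


class SampleStep(PipelineStep):
    """Same add/remove logic as the sample step, minus the INFO prints."""

    def info(self):
        return {
            "name": "sample",
            "params": [
                {"type": "string", "name": "add"},
                {"type": "string", "name": "remove"},
            ],
        }

    def run(self, entry, config):
        add = config.get("add")
        if add:
            entry["fields"][add] = "foo"
        remove = config.get("remove")
        if remove:
            # Tolerate a missing field instead of raising KeyError.
            entry["fields"].pop(remove, None)


step = SampleStep()

# Parameter names declared by info(); run() reads the same keys from config.
param_names = [p["name"] for p in step.info()["params"]]

# Made-up entry and config values, chosen only for illustration.
entry = {"fields": {"name": "Vanuatu", "fs_STATUS": "UNMemberState"}}
step.run(entry, {"add": "reviewed", "remove": "fs_STATUS"})

print(param_names)
print(entry["fields"])
```

Because `run` mutates the entry in place and returns nothing, the result is read back from the same `entry` dictionary that was passed in.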
...
Take some time to examine the Python code and read the documentation strings and comments. The entry passed to the run method is a Python dictionary that holds the record's data under a fields key. An entry looks like this:
Code Block | ||
---|---|---|
{
    'fields': {
        'meta_table_name': 'world_countries.csv',
        'name': 'Vanuatu',
        'repository': 'r16524da57d1',
        'format': 'text/csv-record',
        'format_category': 'Office',
        'fs_SQMI': '3265.07',
        'fs_FIPS_CNTRY': 'NH',
        'fs_STATUS': 'UNMemberState',
        'fs_POP2005': '205754',
        'format_type': 'Record'
    }
}
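Every value in the sample entry is a string, including the ones that look numeric, so a step that does arithmetic on a field would need to convert it first. The sketch below shows one way to do that on a trimmed copy of the entry. The helper `numeric_field` is hypothetical, and reading `fs_POP2005` as a population count and `fs_SQMI` as an area in square miles is an assumption based only on the field names.

```python
# Trimmed copy of the sample entry; values are strings, as in the original.
entry = {
    "fields": {
        "name": "Vanuatu",
        "fs_SQMI": "3265.07",
        "fs_POP2005": "205754",
    }
}


def numeric_field(entry, field_name):
    """Return the named field as a float, or None if absent or non-numeric."""
    raw = entry["fields"].get(field_name)
    try:
        return float(raw)
    except (TypeError, ValueError):
        # TypeError: the field is missing (raw is None).
        # ValueError: the field is present but is not a number.
        return None


population = numeric_field(entry, "fs_POP2005")
area = numeric_field(entry, "fs_SQMI")

# Skip the calculation if either value is missing or the area is zero.
if population is not None and area:
    density = population / area
    print("%s: %.1f people per square mile" % (entry["fields"]["name"], density))
```

Returning None rather than raising keeps a single malformed record from aborting the step; whether that is the right behavior depends on how strictly the pipeline should treat bad data.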
...