...

  • Inside the pipeline directory, create a new Python script file named sample_pipeline.py.

  • Edit the Python script file and add the following code:

Code Block
languagepy
from voyager import PipelineStep


class SampleStep(PipelineStep):

    def __init__(self):
        super(SampleStep, self).__init__()

    def info(self):
        """
        Provides information about the pipeline step including name, title, description and parameters. Parameters are optional.
        :return: A JSON object/dictionary
        """

        return {
            "name": "sample",
            "title": "Sample Step",
            "description": "Sample pipeline step",
            "params": [{
                "type": "string",
                "name": "add",
                "title": "Add",
                "description": "Field to add"
            }, {
                "type": "string",
                "name": "remove",
                "title": "Remove",
                "description": "Field to remove"
            }]
        }

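    def run(self, entry, config):
        """
        Runs the pipeline step.
        This method works by modifying the entry parameter in place, for example by adding or removing fields.

        :param entry: The entry being indexed.
        :param config: The pipeline step configuration.
        """

        # Both parameters are optional, so read them with config.get()
        # to avoid a KeyError when one of them is not configured.
        print("INFO adding")
        add = config.get("add")
        if add:
            entry["fields"][add] = "foo"

        print("INFO removing")
        remove = config.get("remove")
        if remove:
            # The default of None avoids a KeyError when the entry lacks the field.
            entry["fields"].pop(remove, None)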


if __name__ == "__main__":
    PipelineStep.main(SampleStep())
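The dictionary returned by info() is plain Python data, so the parameters a step declares can be inspected directly. The sketch below is a hypothetical helper, not part of the voyager API: it copies the info() dictionary from the sample step (so it runs without voyager installed), lists the declared parameter names, and reports config keys the step does not declare. It assumes that each parameter's "name" is the key run() reads from config, which is how the sample step uses "add" and "remove".

```python
# Copy of the dictionary returned by SampleStep.info(), so this sketch
# runs without the voyager package installed.
SAMPLE_INFO = {
    "name": "sample",
    "title": "Sample Step",
    "description": "Sample pipeline step",
    "params": [
        {"type": "string", "name": "add", "title": "Add", "description": "Field to add"},
        {"type": "string", "name": "remove", "title": "Remove", "description": "Field to remove"},
    ],
}


def declared_params(info):
    """Hypothetical helper: return the parameter names declared in an info() dictionary."""
    # Parameters are optional, so fall back to an empty list.
    return [param["name"] for param in info.get("params", [])]


def undeclared_keys(info, config):
    """Hypothetical helper: return config keys the step does not declare, sorted."""
    declared = set(declared_params(info))
    return sorted(key for key in config if key not in declared)


if __name__ == "__main__":
    print(declared_params(SAMPLE_INFO))                                      # ['add', 'remove']
    print(undeclared_keys(SAMPLE_INFO, {"add": "notes", "colour": "red"}))  # ['colour']
```

A check like this only catches misspelled or unexpected keys; it does not reflect how Voyager itself validates configuration.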

...

Take some time to examine the Python code and read its docstrings. The entry passed to the run method is a Python dictionary whose indexed fields are stored in a nested dictionary under the 'fields' key. An entry looks like this:

Code Block
languagepy
{
    'fields': {
        'meta_table_name': 'world_countries.csv',
        'name': 'Vanuatu',
        'repository': 'r16524da57d1',
        'format': 'text/csv-record',
        'format_category': 'Office',
        'fs_SQMI': '3265.07',
        'fs_FIPS_CNTRY': 'NH',
        'fs_STATUS': 'UNMemberState',
        'fs_POP2005': '205754',
        'format_type': 'Record'
    }
}
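To see what the sample step does to an entry like this one, the sketch below applies the add/remove logic of SampleStep.run to a copy of the entry. It reimplements the method body as a plain function (sample_run, a hypothetical name) so it runs without the voyager package. The config values "notes" and "fs_STATUS" are illustrative choices, not values Voyager supplies.

```python
import copy

# The example entry shown above.
ENTRY = {
    "fields": {
        "meta_table_name": "world_countries.csv",
        "name": "Vanuatu",
        "repository": "r16524da57d1",
        "format": "text/csv-record",
        "format_category": "Office",
        "fs_SQMI": "3265.07",
        "fs_FIPS_CNTRY": "NH",
        "fs_STATUS": "UNMemberState",
        "fs_POP2005": "205754",
        "format_type": "Record",
    }
}


def sample_run(entry, config):
    """Hypothetical stand-in for SampleStep.run: mutates entry["fields"] in place."""
    add = config.get("add")
    if add:
        entry["fields"][add] = "foo"
    remove = config.get("remove")
    if remove:
        # pop() with a default ignores fields the entry does not have.
        entry["fields"].pop(remove, None)


if __name__ == "__main__":
    entry = copy.deepcopy(ENTRY)  # keep the original example unchanged
    sample_run(entry, {"add": "notes", "remove": "fs_STATUS"})
    print(entry["fields"]["notes"])        # foo
    print("fs_STATUS" in entry["fields"])  # False
```

Because run() changes the entry in place rather than returning a new one, the sketch works on a deep copy so the original example stays intact for comparison.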

...