Creating workflows

Pipelines or workflows can be created by writing a Python script:

from nlppln import WorkflowGenerator

with WorkflowGenerator() as wf:
    txt_dir = wf.add_input(txt_dir='Directory')

    frogout = wf.frog_dir(dir_in=txt_dir)
    saf = wf.frog_to_saf(in_files=frogout)
    ner_stats = wf.save_ner_data(in_files=saf)
    new_saf = wf.replace_ner(metadata=ner_stats, in_files=saf)
    txt = wf.saf_to_txt(in_files=new_saf)

    wf.add_outputs(ner_stats=ner_stats, txt=txt)

    wf.save('anonymize.cwl')

This workflow finds named entities in all Dutch text files in a directory. Named entities are replaced with their type (PER, LOC, ORG). The output consists of text files and a csv file that contains the named entities that have been replaced.
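
Assuming the CWL reference runner cwltool is installed, the saved workflow could then be run from the command line by supplying the txt_dir input (the input names of a CWL workflow become command line options):

cwltool anonymize.cwl --txt_dir /path/to/txt/files/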

The workflow creation functionality in nlppln is provided by a library called scriptcwl. For a more detailed explanation of how to create workflows, have a look at the scriptcwl documentation.

Setting workflow inputs

Workflow inputs can be added by calling add_input():

txt_dir = wf.add_input(txt_dir='Directory')

The add_input() method expects a name=type pair as keyword argument. The pair connects an input name (txt_dir in the example) to a CWL type ('Directory'). An overview of CWL types can be found in the CWL specification.

Check the scriptcwl documentation to find out how to add optional workflow inputs and default values.
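
As a minimal sketch, following scriptcwl's conventions (a '?' suffix marks a CWL type as optional; the default keyword is assumed here to set a default value, so check the scriptcwl documentation for the exact syntax):

# Optional input ('?' makes the CWL type optional)
mode = wf.add_input(mode='string?')

# Input with a default value (default keyword assumed)
chunk_size = wf.add_input(chunk_size='int', default=500)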

Adding processing steps

To add a processing step to the workflow, call its method on the WorkflowGenerator object. The method expects the step's inputs as name=value keyword arguments. (To find out which inputs a step needs, call wf.inputs(<step name>); this method prints all the inputs and their types.) The method returns a list of strings containing output names that can be used as input for later steps, or that can be connected to workflow outputs.
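
For instance, to print the inputs of the frog-dir step before adding it:

wf.inputs('frog-dir')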

For example, to add a step called frog-dir to the workflow, the following method must be called:

frogout = wf.frog_dir(dir_in=txt_dir)

In subsequent steps, frogout can be used as input:

saf = wf.frog_to_saf(in_files=frogout)
txt = wf.saf_to_txt(in_files=saf)

Etcetera.

Listing steps

To find out which steps are available in nlppln, and to get a copy/paste-able specification of what to type to add a step to the workflow, run:

print(wf.list_steps())

The result is:

Steps
apachetika............... out_files = wf.apachetika(in_files[, tika_server])
basic-text-statistics.... metadata_out = wf.basic_text_statistics(in_files, out_file)
chunk-list-of-files...... file_list = wf.chunk_list_of_files(chunk_size, in_files)
clear-xml-elements....... out_file = wf.clear_xml_elements(element, xml_file)
copy-and-rename.......... copy = wf.copy_and_rename(in_file[, rename])
docx2txt................. out_files = wf.docx2txt(in_files)
download................. out_files = wf.download(urls)
freqs.................... freqs = wf.freqs(in_files)
frog-dir................. frogout = wf.frog_dir(in_files[, skip])
frog-filter-nes.......... filtered_nerstats = wf.frog_filter_nes(nerstats[, name])
frog-single-text......... frogout = wf.frog_single_text(in_file)
frog-to-saf.............. saf = wf.frog_to_saf(in_files)
ixa-pipe-tok............. out_file = wf.ixa_pipe_tok(language, in_file)
language................. language_csv = wf.language(dir_in)
liwc-tokenized........... liwc = wf.liwc_tokenized(in_dir, liwc_dict[, encoding])
lowercase................ out_files = wf.lowercase(in_file)
ls....................... out_files = wf.ls(in_dir[, recursive])
merge-csv................ merged = wf.merge_csv(in_files[, name])
normalize-whitespace-punctuation metadata_out = wf.normalize_whitespace_punctuation(meta_in)
pattern-nl............... out_files = wf.pattern_nl(in_files)
rename-and-copy-files.... out_files = wf.rename_and_copy_files(in_files)
replace-ner.............. out_files = wf.replace_ner(metadata, in_files[, mode])
saf-to-freqs............. freqs = wf.saf_to_freqs(in_files[, mode])
saf-to-txt............... out_files = wf.saf_to_txt(in_files)
save-dir-to-subdir....... out = wf.save_dir_to_subdir(inner_dir, outer_dir)
save-files-to-dir........ out = wf.save_files_to_dir(dir_name, in_files)
save-ner-data............ ner_statistics = wf.save_ner_data(in_files)
textDNA-generate......... json = wf.textDNA_generate(dir_in, mode[, folder_sequences, name_prefix, output_dir])
xml-to-text.............. out_files = wf.xml_to_text(in_files[, tag])

Workflows
anonymize................ ner_stats, out_files = wf.anonymize(in_files[, mode])

Setting workflow outputs

When all steps of the workflow have been added, you can specify workflow outputs by calling wf.add_outputs():

wf.add_outputs(ner_stats=ner_stats, txt=txt)

In this case, the workflow has two outputs: one called ner_stats, which is a csv file, and one called txt, which is a list of text files.

Saving workflows

To save a workflow call the WorkflowGenerator.save() method:

wf.save('anonymize.cwl')

Other options when saving workflows are described in the scriptcwl documentation. By default, nlppln saves workflows with embedded steps (inline=True).
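
For example, to save the workflow with references to the step files instead of embedded steps, a sketch using the inline option mentioned above:

wf.save('anonymize.cwl', inline=False)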

Adding documentation

To add documentation to your workflow, use the set_documentation() method:

doc = """Workflow that replaces named entities in text files.

Input:
        txt_dir: directory containing text files

Output:
        ner_stats: csv-file containing statistics about named entities in the text files
        txt: text files with named entities replaced
"""
wf.set_documentation(doc)

Loading processing steps

nlppln comes with NLP functionality pre-loaded. If you need custom processing steps, you can create them using nlppln-gen. To be able to add these custom processing steps to your workflow, you have to load them into the WorkflowGenerator. To load a single CWL file, do:

wf.load(step_file='/path/to/step_or_workflow.cwl')

The step_file can also be a URL.
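
For example (the URL here is hypothetical):

wf.load(step_file='https://example.com/steps/my-step.cwl')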

To load all CWL files in a directory, do:

wf.load(steps_dir='/path/to/dir/with/cwl/steps/')

Using a working directory

Once you need more functionality than nlppln provides, and start creating your own processing steps, we recommend using a CWL working directory. A CWL working directory is a directory containing all available CWL specifications. To specify a working directory, do:

from nlppln import WorkflowGenerator

with WorkflowGenerator(working_dir='path/to/working_dir') as wf:
    wf.load(steps_dir='some/path/')
    wf.load(steps_dir='some/other/path/')

    # add inputs, steps and outputs

If you use a working directory when creating pipelines, nlppln copies all CWL files to the working directory.

To copy these files manually, you can also use the nlppln_copy_cwl command on the command line:

nlppln_copy_cwl /path/to/cwl/working/dir

To copy CWL files from a different directory than the one containing the nlppln CWL files, do:

nlppln_copy_cwl --from_dir /path/to/your/dir/with/cwl/files /path/to/cwl/working/dir

If you use a working directory, please save your workflow using the wd=True option:

wf.save('workflow.cwl', wd=True)

The workflow is saved in the working directory and then copied to the location you specified. Subsequently, the workflow should be run from the working directory.
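
For example, assuming cwltool and a workflow with a (hypothetical) txt_dir input:

cd path/to/working_dir
cwltool workflow.cwl --txt_dir /path/to/txt/files/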

Tips and tricks

Create workflows you can run for a single file

If you want to create a workflow that should be applied to each (text) file in a directory, create a workflow that performs all the steps on a single file. Then, use this workflow as a subworkflow that is scattered over a list of input files:

from nlppln import WorkflowGenerator

with WorkflowGenerator(working_dir='path/to/working_dir') as wf:
    wf.load(steps_dir='some/path/')

    in_dir = wf.add_input(in_dir='Directory')

    in_files = wf.ls(in_dir=in_dir)
    processed_files = wf.some_subworkflow(in_file=in_files, scatter='in_file', scatter_method='dotproduct' [, ...])

    wf.add_outputs(out_files=processed_files)

Having a workflow you can run for a single file makes it easier to test the workflow.
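
For example, assuming cwltool and that the single-file subworkflow (the hypothetical some_subworkflow above) was saved as a CWL file, it can be tested on one file with:

cwltool some_subworkflow.cwl --in_file example.txt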

Use create_chunked_list and ls_chunk to run a workflow for a subset of files

Sometimes running a workflow for all files in a directory takes too long, and you’d like to run it for subsets of the files. Using create_chunked_list, you can create a JSON file that divides the files in a directory into chunks. You can then create a workflow that, instead of using ls to list all files in a directory, uses ls_chunk to run the workflow for a single chunk of files.

To create a division of the input files, do:

python -m nlppln.commands.create_chunked_list [--size 500 --out_name output.json] /path/to/directory/with/input/files

The result is a JSON file named output.json that contains numbered chunks of 500 files each.

To run a workflow for a chunk of files, instead of all files in a directory, do:

from nlppln import WorkflowGenerator

with WorkflowGenerator(working_dir='path/to/working_dir') as wf:
    wf.load(steps_dir='some/path/')

    in_dir = wf.add_input(in_dir='Directory')
    chunks = wf.add_input(chunks='File')
    chunk_name = wf.add_input(name='string')

    in_files = wf.ls_chunk(in_dir=in_dir, chunks=chunks, name=chunk_name)
    processed_files = wf.some_subworkflow(in_file=in_files, scatter='in_file', scatter_method='dotproduct' [, ...])

    wf.add_outputs(out_files=processed_files)
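
This workflow could then be run for a single chunk by passing the chunk's identifier as the name input (assuming cwltool; the exact chunk names depend on the output of create_chunked_list):

cwltool workflow.cwl --in_dir /path/to/directory/with/input/files --chunks output.json --name 0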