
FileNameFlow: A Filename-Driven Pipeline Framework

FileNameFlow is a lightweight framework for data processing pipelines driven by filenames, with wildcard support.

At its core, FileNameFlow relies on self-descriptive filenames: filename patterns drive task execution, so the names themselves describe which data each step consumes and produces.

FileNameFlow seamlessly integrates with distributed frameworks such as Dask. The same pipeline can run with local concurrency or on PBS, SLURM, and other cluster schedulers.

Concept 1: Filename Selection Patterns

FileNamePath

We use the FileNamePath class to represent a filename selection pattern.

FileNamePath selects the files that match its pattern and ignores the file extension (everything after the matched name). The wildcard is {}. It never matches text that contains the separator (.), so a file is excluded when its wildcard part would contain one.

For example, given the following files:

sample.00.read.1.fq.gz
sample.00.read.2.fq.gz
sample.00.bwa.bam
sample.00.bwa.sort.bam
sample.00.bwa.sort.bqsr.bam
sample.01.read.1.fq.gz
sample.01.read.2.fq.gz
sample.01.bwa.bam
sample.01.04.read.2.fq.gz
sample1.bwa.csv
sample1.bowtie.csv
sample1.bowtie.filter.csv

FileNamePath pattern | Listed FileNamePaths
sample.{} | sample.00, sample.01
sample.{}.bwa | sample.00.bwa, sample.01.bwa
sample.{}.bwa.sort | sample.00.bwa.sort
sample.{}.read | sample.00.read, sample.01.read
sample.{}.read.{} | sample.00.read.1, sample.00.read.2, sample.01.read.1, sample.01.read.2
sample.00.read.{} | sample.00.read.1, sample.00.read.2
sample{} | (none)
sample.00.read | sample.00.read
sample1.{} | sample1.bwa, sample1.bowtie
sample1.{}.csv | sample1.bwa.csv, sample1.bowtie.csv
sample1.{method}.csv | sample1.bwa.csv, sample1.bowtie.csv
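The matching rule can be approximated with a regular expression. The following is a simplified sketch for illustration only, not FileNameFlow's implementation. The names list_names and FILES are hypothetical.

In the sketch, each {} or {name} matches one or more characters other than the separator. Anything after the matched name must start with the separator and is ignored as the suffix, and duplicate names are dropped. The sketch reproduces most rows of the table but not all of them. For sample{} it returns sample1, whereas the table lists no match, so the real matcher applies at least one rule this sketch does not model.

```python
import re
from typing import Iterable, List

FILES = [
    "sample.00.read.1.fq.gz", "sample.00.read.2.fq.gz",
    "sample.00.bwa.bam", "sample.00.bwa.sort.bam", "sample.00.bwa.sort.bqsr.bam",
    "sample.01.read.1.fq.gz", "sample.01.read.2.fq.gz", "sample.01.bwa.bam",
    "sample.01.04.read.2.fq.gz",
    "sample1.bwa.csv", "sample1.bowtie.csv", "sample1.bowtie.filter.csv",
]


def list_names(pattern: str, filenames: Iterable[str]) -> List[str]:
    """Return the distinct names (suffix stripped) that match `pattern`."""
    # Literal text between placeholders; both "{}" and "{name}" act as wildcards.
    literals = re.split(r"\{[^{}]*\}", pattern)
    # A wildcard matches one or more characters other than the separator ".".
    body = r"[^.]+".join(re.escape(lit) for lit in literals)
    # Anything after the matched name must start with "." and is ignored.
    regex = re.compile("^(" + body + r")(?:\..*)?$")
    names = set()
    for filename in filenames:
        match = regex.match(filename)
        if match:
            names.add(match.group(1))
    return sorted(names)


print(list_names("sample.{}.bwa", FILES))  # ['sample.00.bwa', 'sample.01.bwa']
```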

Tip: if your files do not follow this naming scheme, you can create symbolic links (ln -s) whose names match the pattern instead of renaming the original files.
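Here is a minimal Python version of that tip, using os.symlink from the standard library. The original file names (reads_R1.fastq.gz, reads_R2.fastq.gz) and the helper link_to_pattern are hypothetical. The links only give existing files names that a pattern such as sample.{}.read.{} can match, and the originals stay where they are.

```python
import os
import tempfile
from pathlib import Path

# Hypothetical mapping from original file names to pattern-friendly names.
RENAMES = {
    "reads_R1.fastq.gz": "sample.00.read.1.fq.gz",
    "reads_R2.fastq.gz": "sample.00.read.2.fq.gz",
}


def link_to_pattern(directory: Path, renames: dict) -> list:
    """Create symlinks named after the pattern; the original files are untouched."""
    created = []
    for src, dst in renames.items():
        link = directory / dst
        if not link.exists():
            # Relative target: resolved against the link's own directory.
            os.symlink(src, link)
        created.append(link)
    return created


with tempfile.TemporaryDirectory() as tmp:
    folder = Path(tmp)
    for name in RENAMES:
        (folder / name).write_text("@read\n")  # stand-in for real sequencing data
    links = link_to_pattern(folder, RENAMES)
    link_names = sorted(p.name for p in links)
    # Reading through a link returns the original file's content.
    contents = [p.read_text() for p in links]

print(link_names)  # ['sample.00.read.1.fq.gz', 'sample.00.read.2.fq.gz']
```

Relative link targets keep working if the whole directory is moved. On Windows, creating symlinks may require extra privileges.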

FileNamePath.list(fix)

To select a group of files instead of running a task on each file one by one, use the fix argument. It keeps the chosen {} wildcards fixed and unexpanded. Entries in fix are wildcard positions, and negative positions count from the last wildcard. This is especially useful for tasks that take a list of files as input.

FileNamePath | fix | Listed FileNamePaths
sample.{} | [-1] | sample.{}
sample.{}.bwa | [-1] | sample.{}.bwa
sample.{}.read.{} | [-1] | sample.00.read.{}, sample.01.read.{}
sample.{}.read.{} | [-2] | sample.{}.read.1, sample.{}.read.2
sample.{}.read.{} | [-1, -2] | sample.{}.read.{}
sample.{}.read.{} | [] | sample.00.read.1, sample.00.read.2, sample.01.read.1, sample.01.read.2
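The effect of fix can be sketched in the same simplified, regex-based way. This is an illustration, not the library's code. list_fixed is a hypothetical helper that only accepts integer positions, while the real fix also accepts keyword names.

Wildcards whose position is in fix are written back as placeholders, and the remaining ones take the matched values. As a result, files that differ only in a fixed wildcard collapse into one group name.

```python
import re
from typing import Iterable, List

FILES = [
    "sample.00.read.1.fq.gz", "sample.00.read.2.fq.gz",
    "sample.01.read.1.fq.gz", "sample.01.read.2.fq.gz",
    "sample.01.04.read.2.fq.gz",  # excluded: "01.04" would contain the separator
]


def list_fixed(pattern: str, filenames: Iterable[str], fix: Iterable[int] = ()) -> List[str]:
    """List names matching `pattern`, keeping the wildcards at `fix` unexpanded."""
    # re.split with a capture group keeps the placeholders at the odd positions.
    tokens = re.split(r"(\{[^{}]*\})", pattern)
    literals, holes = tokens[0::2], tokens[1::2]
    n = len(holes)
    # Negative positions count from the last wildcard, as in fix=[-1].
    fixed = {i % n for i in fix} if n else set()
    body = "([^.]+)".join(re.escape(lit) for lit in literals)
    regex = re.compile("^" + body + r"(?:\..*)?$")
    names = set()
    for filename in filenames:
        match = regex.match(filename)
        if match is None:
            continue
        # Fixed wildcards stay as placeholders; the others take the matched value.
        values = [holes[i] if i in fixed else match.group(i + 1) for i in range(n)]
        parts = [literals[0]]
        for value, literal in zip(values, literals[1:]):
            parts += [value, literal]
        names.add("".join(parts))
    return sorted(names)


print(list_fixed("sample.{}.read.{}", FILES, fix=[-1]))  # ['sample.00.read.{}', 'sample.01.read.{}']
```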

Concept 2: Managing Workflow Steps in Filenames

In FileNameFlow, we adopt a straightforward approach to keep track of workflow steps. We save all the steps within the filename suffix, making it easy to understand what treatments have been applied to the data based on the filename.

Step | Input | Output | Files on disk (not tracked by FileNameFlow)
download | . | sample.00.read | sample.00.read.1.fq.gz, sample.00.read.2.fq.gz
bowtie2 | sample.00.read | sample.00.read.bowtie_hg19 | sample.00.read.bowtie_hg19.sam
sortBam | sample.00.read.bowtie_hg19 | sample.00.read.bowtie_hg19.sort | sample.00.read.bowtie_hg19.sort.bam
GatkBqsr | sample.00.read.bowtie_hg19.sort | sample.00.read.bowtie_hg19.sort.bqsr | sample.00.read.bowtie_hg19.sort.bqsr.bam
GatkHC | sample.00.read.bowtie_hg19.sort.bqsr | sample.00.read.bowtie_hg19.sort.bqsr.hc | sample.00.read.bowtie_hg19.sort.bqsr.hc.vcf.gz

Furthermore, we incorporate parameters into the filenames to ensure that files generated with different parameters are kept separate. We use abbreviations when necessary to maintain readability.

Function call | Input | Output
bowtie2(index="hs37d5") | sample.00.read | sample.00.read.bowtie_hg19
bowtie2(index="hs38DH") | sample.00.read | sample.00.read.bowtie_hg38
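Here is a small sketch of how a step might derive its output name from a parameter. The abbreviation table and bowtie2_output_name are hypothetical and are based only on the two rows above. FileNameFlow leaves this naming entirely to your function.

```python
# Hypothetical abbreviation table; FileNameFlow does not provide one.
INDEX_ABBREVIATIONS = {"hs37d5": "hg19", "hs38DH": "hg38"}


def bowtie2_output_name(input_name: str, index: str) -> str:
    """Append the step name and an abbreviated parameter to the input name."""
    # Unknown indexes fall back to a filename-safe version of the raw value:
    # "/" and "." are replaced so they cannot add path parts or separators.
    short = INDEX_ABBREVIATIONS.get(index, index.replace("/", "_").replace(".", "_"))
    return f"{input_name}.bowtie_{short}"


print(bowtie2_output_name("sample.00.read", "hs37d5"))  # sample.00.read.bowtie_hg19
print(bowtie2_output_name("sample.00.read", "hs38DH"))  # sample.00.read.bowtie_hg38
```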

Our pipeline seamlessly handles suffix concatenation (+) and wildcard replacement (apply). Setting up pipelines is a breeze using FileNamePath: you define functions that handle each selected filename.

Here's an example:

import os
from functools import partial
from filenameflow import FileNamePath, FileNameTask

def bowtie2(input_name, index):
    # With fix=[-1] (see the last line), the function is called twice,
    # once per group, where input_name =
    # 1. sample.00.read.{}
    # 2. sample.01.read.{}
    print(input_name)
    output_name = input_name + ".bowtie" + index.replace("/", "_")  # concatenate the suffix you want

    fqs = sorted(input_name.list())  # FileNamePath.list() expands the wildcard, e.g. sample.00.read.1, sample.00.read.2
    os.system(f"echo bowtie {index} {fqs[0]}.fq.gz {fqs[1]}.fq.gz -o {output_name}.sam")  # FileNamePath works like str
    return output_name  # return the output name for further task chaining

# Using FileNamePath to kick off the pipeline
# (this variant has no fix=[-1], so the reads are not grouped per sample):
# FileNamePath("sample.{}.read.{}") >> partial(bowtie2, index="index/hg19")
# or using FileNameTask to start
"sample.{}.read.{}" >> FileNameTask(partial(bowtie2, index="index/hg19"), fix=[-1])

FileNameFlow simplifies complex data processing workflows by emphasizing functions for handling selected filenames, significantly reducing the need for extensive loops in your code.
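Python evaluates names >> task by calling task.__rrshift__, because str and list do not implement >> themselves. The toy Step class below is a hypothetical sketch, not FileNameFlow's FileNameTask. It shows how that hook lets each step apply a function to every incoming name and pass the outputs on, with no loop in the user's code.

```python
from typing import Callable, List, Union

Names = Union[str, List[str]]


class Step:
    """Toy pipeline step: applies `func` to every incoming name."""

    def __init__(self, func: Callable[[str], str]):
        self.func = func

    def __rrshift__(self, names: Names) -> List[str]:
        # Called for `names >> step`, because str and list do not implement >>.
        if isinstance(names, str):
            names = [names]
        return [self.func(name) for name in names]


def bowtie2(name: str) -> str:
    # Hypothetical step: only computes the output name, runs nothing.
    return name + ".bowtie_hg19"


def sort_bam(name: str) -> str:
    return name + ".sort"


outputs = ["sample.00.read", "sample.01.read"] >> Step(bowtie2) >> Step(sort_bam)
print(outputs)  # ['sample.00.read.bowtie_hg19.sort', 'sample.01.read.bowtie_hg19.sort']
```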

Concept 3: Combining Functions Like a Pipeline

In this concept, we combine the previously discussed concepts into our pipeline. For a complete code example, please refer to the example.py file in the GitHub repository.

import os
from functools import partial
from pathlib import Path

import pandas as pd
from filenameflow import FileNamePath, FileNameTask, compose

# sortBam and getLowReadDepthPos are defined in example.py (omitted here).

def download(input_name):
    # 0 -> many: the input "." is only a placeholder
    output_name = "data/xxx.{}.read"
    if len(FileNamePath(output_name).list()):  # skip the step if the files are already downloaded
        return output_name
    # wget ...
    return output_name

def bowtie2(input_name, index):
    # 1 -> 1, called once per sample
    # input_name = "data/xxx.00.read"
    # output_name = "data/xxx.00.read.index_hs37d5"
    output_name = input_name + "." + index.replace("/", "_")
    if Path(output_name + ".sam").exists():  # skip the step if the output exists
        return output_name
    os.system(f"bowtie2 -x {index} -1 {input_name}.1.fq -2 {input_name}.2.fq -S {output_name}.sam")
    return output_name

def mergeCSV(input_name):
    # many -> 1
    # input_name = "data/xxx.{}.read.index_hs37d5.depth"
    # output_name = "data/xxx_merge.read.index_hs37d5.depth"
    output_name = input_name.replace_wildcard("_merge")
    if Path(output_name + ".csv").exists():
        return output_name
    files = input_name.list()
    df = pd.concat(pd.read_csv(i + ".csv") for i in files)
    df.to_csv(output_name + ".csv", index=False)
    return output_name

def summaryCSV(input_name):
    # 1 -> 1
    # doesn't change the suffix
    df = pd.read_csv(input_name + ".csv").groupby("chrom").describe()
    print(df)
    return input_name

# using >> to chain the tasks
(
    FileNamePath(".") >> download >> partial(bowtie2, index="index/hs37d5") >> sortBam
    >> getLowReadDepthPos >> FileNameTask(mergeCSV, fix=[-1]) >> summaryCSV
)
# Or using compose
compose([
    ".",
    download,                                # 0 to many
    partial(bowtie2, index="index/hs37d5"),  # 1 to 1
    sortBam,                                 # 1 to 1
    getLowReadDepthPos,                      # 1 to 1
    FileNameTask(mergeCSV, fix=[-1]),        # many to 1
    summaryCSV,                              # 1 to 1
])

The pipeline reads as a simple linear flow. The set of files each step works on is already encoded in the filename pattern, so no explicit loops are needed.
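The mergeCSV step combines a many-to-1 grouping with a skip-if-exists check. Below is a standard-library-only sketch of the same idea on plain strings; merge_csv is hypothetical. In the sketch:

  • str.replace(".{}", "_merge") stands in for replace_wildcard.
  • Path.glob stands in for list(). Its * is looser than {} because it also matches the separator.
  • The csv module replaces pandas.

```python
import csv
import tempfile
from pathlib import Path


def merge_csv(input_name: str) -> str:
    """Many -> 1: merge all <match>.csv files for a wildcard name into one file."""
    # Stand-in for FileNamePath.replace_wildcard("_merge").
    output_name = input_name.replace(".{}", "_merge")
    output_file = Path(output_name + ".csv")
    if output_file.exists():  # skip the step if it has already run
        return output_name
    # Stand-in for FileNamePath.list(); glob's "*" is looser than "{}".
    pattern = Path(input_name + ".csv")
    inputs = sorted(pattern.parent.glob(pattern.name.replace("{}", "*")))
    if not inputs:
        raise FileNotFoundError(f"no files match {pattern}")
    header, rows = None, []
    for path in inputs:
        with path.open(newline="") as handle:
            reader = csv.reader(handle)
            header = next(reader)  # assumes every input shares the same header
            rows.extend(reader)
    with output_file.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(header)
        writer.writerows(rows)
    return output_name


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    for sample, depth in [("00", "3"), ("01", "5")]:
        (base / f"xxx.{sample}.read.depth.csv").write_text(f"chrom,depth\nchr1,{depth}\n")
    merged = merge_csv(str(base / "xxx.{}.read.depth"))
    again = merge_csv(str(base / "xxx.{}.read.depth"))  # second call is skipped
    merged_name = Path(merged).name
    merged_rows = Path(merged + ".csv").read_text().splitlines()

print(merged_name, merged_rows)  # xxx_merge.read.depth ['chrom,depth', 'chr1,3', 'chr1,5']
```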

Concept 4: Shipping Your Pipeline to Other Resources

We provide two basic executors for your convenience:

  • FileNameBaseExecutor (Default): Executes tasks one by one.
  • DaskExecutor: Executes tasks using Dask, so you can use any computational resource Dask supports. Refer to the Dask documentation for the available cluster types.

from functools import partial

from dask.distributed import LocalCluster
from filenameflow import FileNameTask
from filenameflow.executor import DaskExecutor

# download and bowtie2 are the functions defined in Concept 3.

# Set up a DaskExecutor with a LocalCluster
exe = DaskExecutor(LocalCluster())

# Set the executor for a specific task
"." >> download >> FileNameTask(partial(bowtie2, index="index/hs37d5"), executor=exe)
# Or set it globally
FileNameTask.set_default_executor(exe)
"." >> download >> partial(bowtie2, index="index/hs37d5")

With FileNameFlow, you can effortlessly adapt the filename pipeline to different computation environments for efficient data processing.
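To show roughly what an executor's run_task does, here is a hypothetical stand-in that uses concurrent.futures from the standard library instead of Dask. ThreadExecutor is a made-up name and does not subclass FileNameBaseExecutor. It submits one call per name, much like DaskExecutor uses client.submit, and returns the results in input order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional


class ThreadExecutor:
    """Hypothetical executor: runs one task per name on a thread pool."""

    def __init__(self, max_workers: Optional[int] = None):
        self.max_workers = max_workers

    def run_task(self, func: Callable[[str], str], names: Iterable[str]) -> List[str]:
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            # submit() returns a future per call; result() waits for each one in order.
            futures = [pool.submit(func, name) for name in names]
            return [future.result() for future in futures]


def sort_bam(name: str) -> str:
    # Placeholder step; a real step would call an external tool via subprocess.
    return name + ".sort"


executor = ThreadExecutor(max_workers=2)
print(executor.run_task(sort_bam, ["sample.00.bwa", "sample.01.bwa"]))  # ['sample.00.bwa.sort', 'sample.01.bwa.sort']
```

Threads suit steps that mostly wait on external programs. For CPU-heavy Python code, a ProcessPoolExecutor or a Dask cluster is a better fit.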

Conclusion

  • Streamlined Data Science: Simplify file management and processing, perfect for bioinformatics tasks involving multiple file types.
  • Simplicity: FileNameFlow streamlines pattern matching and grouping with minimal syntax, resembling string operations while offering wildcard support.
  • Self-Descriptive Filenames: Each filename serves as a self-descriptive record of data processing steps, aiding in tracking and comprehension. It's like having automatic versioning as filenames adjust with pipeline changes.
  • Flexible Filename Control: Beyond automatic wildcard listing and task execution, users can implement various rules. This includes customizing filename edits (like adding suffixes), renaming, system calls, and task skipping.
  • Dask Resource Support: Use FileNameFlow's DaskExecutor to run pipelines on the computational resources of a given Dask cluster (e.g. local, PBS, SLURM, ...).
  • Python Integration: Pipeline steps are plain Python functions, so you can use any Python package you want.

In summary, FileNameFlow empowers data scientists to efficiently manage, process, and collaborate on data, while simplifying intricate tasks. Its versatility, simplicity, and integration make it an invaluable tool in the data science toolkit.

Installation

pip install git+https://github.com/linnil1/FileNameFlow

Run

Run example

python example.py

Documentation

https://linnil1.github.io/FileNameFlow

Error Class

FileNameFlowAssert

Bases: FileNameFlowError

Raised mostly for internal errors.

FileNameFlowDataError

Bases: FileNameFlowError

Raised when an error occurs while executing the pipeline.

FileNameFlowError

Bases: Exception

Base exception for the FileNameFlow module.

FileNamePath, the core path module of FileNameFlow.

Handles the path pattern-matching rules.

FileNamePath

Bases: str

FileNamePath is a specialized string class designed for managing filenames.

Attributes:

  • self (str): A string representing the current filename.
  • template (str): The filename pattern used as the basis for formatting filenames.
  • args (tuple[str, ...]): The positional arguments used to format self from the template.
  • kwargs (Mapping[str, str]): A dictionary of keyword arguments used to format self from the template.

This class extends the functionality of Python's built-in str type.

Wildcards: FileNamePath uses {} as the wildcard in templates. In args and kwargs, the value * marks a wildcard that is still unfilled.

Example
  • With args:

    • self: test.sample.regression.lasso
    • template: test.{}.regression.{}
    • args: ["sample", "lasso"]
  • With args containing a wildcard:

    • self: test.{}.regression.lasso
    • template: test.{}.regression.{}
    • args: ["*", "lasso"]
  • With kwargs:

    • self: test.{}.regression.{type}
    • template: test.{}.regression.{type}
    • args: ["*"]
    • kwargs: {"type": "*"}
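To make the relationship between self, template, args, and kwargs concrete, here is a hypothetical render function (not part of FileNameFlow's API). It rebuilds self from the other three as the examples above describe: {} placeholders take args in order, named placeholders look up kwargs, and the value * keeps the placeholder in place.

```python
import re
from typing import Mapping, Sequence

PLACEHOLDER = re.compile(r"\{([^{}]*)\}")


def render(template: str, args: Sequence[str], kwargs: Mapping[str, str]) -> str:
    """Rebuild the current filename from template, args and kwargs."""
    positional = iter(args)

    def fill(match: "re.Match[str]") -> str:
        key = match.group(1)
        # "{}" consumes the next positional arg; "{name}" looks up kwargs.
        value = kwargs[key] if key else next(positional)
        # "*" means "still a wildcard": keep the original placeholder text.
        return match.group(0) if value == "*" else value

    return PLACEHOLDER.sub(fill, template)


print(render("test.{}.regression.{}", ["*", "lasso"], {}))  # test.{}.regression.lasso
```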

Features

FileNamePath's primary feature is the ability to list files based on patterns.

FileNamePath("test.{}.regression.lasso").list() will list all files starting with test.*.regression.lasso while ignoring the file extension.

Matches are deduplicated once the suffix is stripped. A file is excluded when the wildcard would have to match text containing the separator. For instance:

  • test.sample1.regression.lasso (included in the listing)
  • test.sample1.regression.lasso.txt (not listed separately: without the suffix it is the same name as the previous entry)
  • test.sample1.regression.lasso.alpha10.txt (not listed separately: without the suffix it is the same name as the previous entry)
  • test.sample2.regression.lasso (included in the listing)
  • test.sample_bad.sample1.regression.lasso (excluded: the wildcard would have to match sample_bad.sample1, which contains the separator)

After listing, the number of variables in args and kwargs is preserved, but their values change. For example, test.sample1.regression.lasso has args set to ["sample1"] instead of ["*"].

The list method also accepts fix, which keeps the specified wildcards unexpanded instead of listing them. Example: FileNamePath("test.{}.regression.{}").list(fix=(0, )) would yield:

  • test.{}.regression.lasso (with args=["*", "lasso"])
  • test.{}.regression.elastic
  • test.{}.regression.rigid

Operations:

  • replace_wildcard(): Replaces the "{}" wildcard with a specified string (e.g., "_merge"). Example: "test.{}.regression" becomes "test_merge.regression".

  • +: Concatenates FileNamePath objects. Example: FileNamePath("data.{}.regression") + ".lasso" results in FileNamePath("data.{}.regression.lasso"). Note that args and kwargs are preserved.

  • apply(): Applies an argument to the FileNamePath (similar to .format()). Example: FileNamePath("test.{}.regression.lasso").apply(0, "sample1") results in FileNamePath("test.sample1.regression.lasso").

args: tuple[str, ...] = tuple(path.args) instance-attribute

kwargs: Mapping[str, str] = dict(**path.kwargs) instance-attribute

separator = '.' class-attribute instance-attribute

suffix_key = 'suffix_of_filename_for_filenamepath' class-attribute instance-attribute

template: str = str(self) instance-attribute

__add__(other: Any) -> FileNamePath

Concat the path.

The function carefully handles the values in template, args, and kwargs after concatenation.

Example
FileNamePath("a.{}.c") + ".d"
# result
FileNamePath("a.{}.c.d")

__init__(path: str)

__radd__(other: Any) -> FileNamePath

Concat the path

__rrshift__(others: Any) -> Any

see compose()

__rshift__(others: Any) -> Any

see compose()

apply(key: str | int, value: str) -> FileNamePath

Similar to .format(), but returns a new FileNamePath that keeps the template, args, and kwargs.

Example
self: test.{}.regression.lasso
template: test.{}.regression.{}
args: ["*", "lasso"]

# After .apply(0, "sample2")
self: test.sample2.regression.lasso
template: test.{}.regression.{}
args: ["sample2", "lasso"]

applys(kv_pairs: Iterable[tuple[str | int, str]] = ()) -> FileNamePath

Like .apply(), but applies several (key, value) pairs at once.

commit() -> FileNamePath

Substitutes the wildcards in the template with the values from args and kwargs, except for values that are the wildcard character '*'.

Example
path: test.{}.regression.lasso
template: test.{}.regression.{type}
args: ["*"]
kwargs: {type: "lasso"}

# After .commit()
path: test.{}.regression.lasso
template: test.{}.regression.lasso
args: ["*"]
kwargs: {}
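The commit step can be sketched with the same placeholder handling. commit below is a hypothetical function on plain values, not the method itself. Every value other than * is written into the template as literal text, and only the * entries remain in args and kwargs.

```python
import re
from typing import Dict, List, Mapping, Sequence, Tuple

PLACEHOLDER = re.compile(r"\{([^{}]*)\}")


def commit(template: str, args: Sequence[str],
           kwargs: Mapping[str, str]) -> Tuple[str, List[str], Dict[str, str]]:
    """Bake every non-"*" value into the template; keep "*" values as wildcards."""
    positional = iter(args)
    kept_args: List[str] = []
    kept_kwargs: Dict[str, str] = {}

    def fill(match):
        key = match.group(1)
        value = kwargs[key] if key else next(positional)
        if value != "*":
            return value  # committed: becomes literal text in the new template
        if key:
            kept_kwargs[key] = value
        else:
            kept_args.append(value)
        return match.group(0)  # still a wildcard: keep the placeholder

    return PLACEHOLDER.sub(fill, template), kept_args, kept_kwargs


print(commit("test.{}.regression.{type}", ["*"], {"type": "lasso"}))
# ('test.{}.regression.lasso', ['*'], {})
```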

construct(template: str, args: Iterable[str], kwargs: Mapping[str, str]) -> FileNamePath classmethod

Construct a new FileNamePath from template, args, and kwargs. The filename is generated automatically.

get_args(key: int | str) -> str

Get the value of a positional or keyword argument by key (int for args, str for kwargs).

is_file() -> bool

Check if the FileNamePath represents a file (no wildcards in the path)

list(fix: Iterable[str | int] = ()) -> Iterable[FileNamePath]

List the files that match this FileNamePath (self).

Example
Path: "cohort.{}.regression.{}"
fix: [1]
# then Return:
* `cohort.sample1.regression.{}`
* `cohort.sample2.regression.{}`
* `cohort.sample3.regression.{}`

Parameters:

  • fix (Iterable[str | int], default ()): Positions (int) or names (str) of the wildcards to keep unexpanded. The position counts the wildcards in the current path, not in the template.

list_args() -> Mapping[str | int, str]

Return the args and kwargs of this FileNamePath as a dict-like object.

overwrite(key: int | str, value: str) -> FileNamePath

Force-change a single value in args or kwargs. See .overwrites().

overwrites(kv_pairs: Iterable[tuple[str | int, str]] = ()) -> FileNamePath

Force-change values in args or kwargs.

Example
Path: cohort.{}.regression.lasso
template: cohort.{}.regression.{}
args: ["*", "lasso"]
# after .overwrites([(1, "elastic")])
Path: cohort.{}.regression.elastic
template: cohort.{}.regression.{}
args: ["*", "elastic"]

parse(filename: str) -> tuple[str | None, tuple[str, ...], dict[str, str]]

Parse the filename into arguments and keyword arguments according to self.template. The suffix is always removed. Returns template, args, kwargs.

If the filename does not match the template, ("", [], {}) is returned.

Example
path = "test.{}.regression.{type}"
# After .parse("test.sample1.regression.lasso.txt")
template = "test.{}.regression.{type}"
args = ["sample1"]
kwargs = {"type": "lasso"}
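Here is a rough sketch of the parsing described above. It assumes, as the earlier listing sketch does, that each wildcard matches one non-empty run without the separator. parse here is a standalone, hypothetical function. On a mismatch it returns ("", (), {}), using an empty tuple for args to agree with the annotated return type.

```python
import re
from typing import Dict, Tuple


def parse(template: str, filename: str) -> Tuple[str, Tuple[str, ...], Dict[str, str]]:
    """Split filename into (template, args, kwargs); the suffix is ignored."""
    # Even items are literal text, odd items are placeholders such as "{}" or "{type}".
    pieces = re.split(r"(\{[^{}]*\})", template)
    placeholders = pieces[1::2]
    body = "([^.]+)".join(re.escape(lit) for lit in pieces[0::2])
    match = re.match("^" + body + r"(?:\..*)?$", filename)
    if match is None:
        return "", (), {}  # filename does not fit the template
    values = match.groups()
    args = tuple(v for p, v in zip(placeholders, values) if p == "{}")
    kwargs = {p[1:-1]: v for p, v in zip(placeholders, values) if p != "{}"}
    return template, args, kwargs


print(parse("test.{}.regression.{type}", "test.sample1.regression.lasso.txt"))
# ('test.{}.regression.{type}', ('sample1',), {'type': 'lasso'})
```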

replace_wildcard(text: str = '_merge') -> FileNamePath

Replace the wildcard, similar to .replace(".{}", text) with the default text "_merge". After the replacement, the corresponding args/kwargs are removed.

with_filename(filename: str) -> FileNamePath | None

Create a new FileNamePath from filename, extracting args and kwargs according to self.template.

Returns:

  • FileNamePath | None: A new FileNamePath object if successful, else None.

unique(arr: Iterable[_T], key: Callable[[_T], Hashable]) -> Iterable[_T]

Return the elements of arr with duplicates (as determined by key) removed.

Parameters:

  • arr (Iterable[_T], required): The sequence to deduplicate.
  • key (Callable[[_T], Hashable], required): A function that returns the key of an element.

Returns:

  • Iterable[_T]: The deduplicated sequence.
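One possible order-preserving implementation keeps the first element seen for each key. Whether FileNameFlow's unique keeps the first or the last occurrence is not stated here, so treat that as an assumption.

```python
from typing import Callable, Hashable, Iterable, Iterator, TypeVar

_T = TypeVar("_T")


def unique(arr: Iterable[_T], key: Callable[[_T], Hashable]) -> Iterator[_T]:
    """Yield each element whose key has not been seen yet (first one wins)."""
    seen = set()
    for item in arr:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item


names = ["sample.00.read.1.fq.gz", "sample.00.read.1.txt", "sample.00.read.2.fq.gz"]
# Deduplicate by the first four dot-separated components (the suffix is ignored).
print(list(unique(names, key=lambda n: ".".join(n.split(".")[:4]))))
# ['sample.00.read.1.fq.gz', 'sample.00.read.2.fq.gz']
```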

FileNameTaskFunc = Callable[..., FileNameTaskOutput] module-attribute

FileNameTaskOutput = None | str | FileNamePath module-attribute

FileNameTask

FileNameTask is the basic class that wraps your function to become a step in a pipeline. The execution can be triggered by the ">>" operator or using the compose function.

Parameters:

  • func (FileNameTaskFunc, default lambda i: i): A function that performs the task on a given filename (path).
  • fix (Iterable[str | int], default ()): Integers (for positional arguments) or strings (for keyword arguments) specifying which wildcards to keep fixed, so that the matching files are passed to func as one group.
  • executor (FileNameBaseExecutor | None, default None): An optional custom executor for this task.

Examples:

  • Usage 1: Wrap the function into a FileNameTask object

@FileNameTask.wrapper(fix=[-1])
def func_manyto1(input_name, other_arg="1"):
    return input_name.replace_wildcard()
"data.{}.arg1" >> func_manyto1
"data.{}.arg1" >> func_manyto1(other_arg="2")

  • Usage 2: Execute the task by ">>"
    def func_manyto1(input_name):
        return input_name.replace_wildcard()
    
    "data.{}.arg1" >> FileNameTask(func_manyto1, fix=[-1])
    
  • Usage 3: Execute the task by compose

    compose([
        "data.{}.arg1",
        func_manyto1,
        FileNameTask(func_manyto1)(other_arg="1"),  # or this
        partial(func_manyto1, other_arg="2"),  # or this
    ])
    

  • Usage 4: The output path is available as task.output

    task = compose([
        "data.{}.arg1",
        func_manyto1,
    ])
    compose([
        "data.{}.arg2",
        partial(func_manyto1, other_arg=task.output),
    ])
    

executor: FileNameBaseExecutor = func.executor instance-attribute

fix: Iterable[str | int] = fix instance-attribute

func: FileNameTaskFunc = func.func instance-attribute

input_path: FileNamePath | None = None instance-attribute

output: FileNamePath property

Get the output path of the task

output_path: FileNamePath | None = None instance-attribute

__call__(*args: Any, **kwargs: Any) -> FileNameTask

Fill in or replace the function's arguments.

__deepcopy__(memo: Any) -> FileNameTask

Deep-copy this object.

__init__(func: FileNameTaskFunc = lambda i: i, fix: Iterable[str | int] = (), executor: FileNameBaseExecutor | None = None)

__repr__() -> str

__rrshift__(others: Any) -> Any

see compose()

__rshift__(others: Any) -> Any

see compose()

run(path: FileNamePath) -> FileNameTask

Execute the wrapped function on the given path.

set_default_executor(executor: FileNameBaseExecutor) -> None classmethod

Change the default executor used by FileNameTask.

wrapper(func: FileNameTaskFunc | None = None, /, **kwargs: Any) -> FileNameTask | Callable[[FileNameTaskFunc], FileNameTask] classmethod

A decorator to create a FileNameTask() instance

Example:

from filenameflow import FileNameTask

@FileNameTask.wrapper
def doSomething(input_name):
    return input_name + ".test"

@FileNameTask.wrapper(fix=[-1])
def doSomething1(input_name):
    return input_name.replace_wildcard()

compose(func_list: Iterable[FileNameTask | FileNamePath | FileNameTaskFunc | str]) -> FileNameTask | FileNamePath

Compose and execute a sequence of tasks in a pipeline.

Parameters:

  • func_list (Iterable[FileNameTask | FileNamePath | FileNameTaskFunc | str], required): A list of pipeline steps: FileNameTask objects, FileNamePath objects, functions, or filenames (str).

Returns:

  • FileNameTask | FileNamePath: The last task or path in the composed pipeline.

Example
from filenameflow import compose

# Execute the tasks based on previous output filenames.
# Equivalent to task1 = "." >> doSomething >> doSomething2
task1 = compose([
    ".", doSomething, doSomething2
])

# Assert that the output of task1 equals the given filename.
# Equivalent to task2 = task1 >> "expected_result_path"
task2 = compose([
    task1, "expected_result_path"
])
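Since compose([a, b, c]) is described as equivalent to a >> b >> c, it behaves like a left fold with the >> operator. The sketch below shows that equivalence with functools.reduce and operator.rshift. Step is a hypothetical toy class, and the real compose may do more (for example, the filename assertion described above).

```python
import operator
from functools import reduce
from typing import Any, Callable, Iterable


class Step:
    """Toy step: `name >> Step(f)` returns f(name)."""

    def __init__(self, func: Callable[[str], str]):
        self.func = func

    def __rrshift__(self, name: str) -> str:
        return self.func(name)


def compose(steps: Iterable[Any]) -> Any:
    # compose([a, b, c]) evaluates (a >> b) >> c, i.e. a left fold with >>.
    return reduce(operator.rshift, steps)


result = compose([".", Step(lambda n: "data.00.read"), Step(lambda n: n + ".test")])
print(result)  # data.00.read.test
```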

DaskExecutor

Bases: FileNameBaseExecutor

Run the filenameflow under Dask.

Dask is a convenient resource manager for distributed computing.

Use this executor with:

FileNameTask.set_default_executor(DaskExecutor())

client = Client(cluster) instance-attribute

__init__(cluster: Any = None)

run_task(func: Callable[..., FileNameTaskOutput], names: Iterable[FileNamePath]) -> Iterable[FileNamePath | str]

Runs all tasks by submitting them with client.submit.

FileNameBaseExecutor

FileNameBaseExecutor is the base class for implementing custom task executors within the FileNameFlow framework.

post_task(path: FileNamePath) -> FileNamePath

This method is called after executing a task. It receives the output FileNamePath and returns the modified FileNamePath.

pre_task(path: FileNamePath) -> FileNamePath

This method is called before executing a task. It receives the input FileNamePath and returns the modified FileNamePath.

run_task(func: FileNameTaskFunc, paths: Iterable[FileNamePath]) -> Iterable[FileNameTaskOutput]

This method runs the task for each FileNamePath and yields the results of the task execution.
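Here is a hypothetical executor that only records when its hooks run, mirroring the three method names above. Where the framework actually calls pre_task and post_task is not specified here. This sketch assumes they wrap each call inside run_task, which runs the names sequentially like the default executor.

```python
from typing import Callable, Iterable, Iterator, List


class LoggingExecutor:
    """Hypothetical executor mirroring pre_task / run_task / post_task."""

    def __init__(self):
        self.log: List[str] = []

    def pre_task(self, path: str) -> str:
        self.log.append(f"pre {path}")
        return path

    def post_task(self, path: str) -> str:
        self.log.append(f"post {path}")
        return path

    def run_task(self, func: Callable[[str], str], paths: Iterable[str]) -> Iterator[str]:
        # Sequential, one path at a time; the hooks wrap each call.
        for path in paths:
            yield self.post_task(func(self.pre_task(path)))


executor = LoggingExecutor()
outputs = list(executor.run_task(lambda p: p + ".sort", ["a.bwa", "b.bwa"]))
print(outputs)       # ['a.bwa.sort', 'b.bwa.sort']
print(executor.log)  # ['pre a.bwa', 'post a.bwa.sort', 'pre b.bwa', 'post b.bwa.sort']
```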