FileNameFlow: A Filename-Driven Pipeline Framework
FileNameFlow is a lightweight framework designed for data processing pipelines that rely on filenames with wildcard support.
At its core, FileNameFlow relies on self-descriptive filenames, and task execution is driven by filename patterns.
FileNameFlow integrates with distributed frameworks such as Dask, extending resource management to local concurrency, PBS, SLURM, and other distributed computing systems.
Concept 1: Filename Selection Patterns
FileNamePath
We use the FileNamePath class to represent a filename selection pattern.
A FileNamePath selects files by its pattern while ignoring the file extension.
The wildcard character is {}, and it never matches a value that contains the separator (.).
For example, given these files:
sample.00.read.1.fq.gz
sample.00.read.2.fq.gz
sample.00.bwa.bam
sample.00.bwa.sort.bam
sample.00.bwa.sort.bqsr.bam
sample.01.read.1.fq.gz
sample.01.read.2.fq.gz
sample.01.bwa.bam
sample.01.04.read.2.fq.gz
sample1.bwa.csv
sample1.bowtie.csv
sample1.bowtie.filter.csv
FileNamePath | Listed FileNamePath |
---|---|
sample.{} | sample.00 sample.01 |
sample.{}.bwa | sample.00.bwa sample.01.bwa |
sample.{}.bwa.sort | sample.00.bwa.sort |
sample.{}.read | sample.00.read sample.01.read |
sample.{}.read.{} | sample.00.read.1 sample.00.read.2 sample.01.read.1 sample.01.read.2 |
sample.00.read.{} | sample.00.read.1 sample.00.read.2 |
sample{} | (none) |
sample.00.read | sample.00.read |
sample1.{} | sample1.bwa sample1.bowtie |
sample1.{}.csv | sample1.bwa.csv sample1.bowtie.csv |
sample1.{method}.csv | sample1.bwa.csv sample1.bowtie.csv |
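The matching rule above can be approximated with a regular expression: each {} (or named {method}) becomes a group matching one or more non-separator characters, and anything after the matched stem (further suffixes, extensions) is ignored. This is a minimal sketch under that assumption, run against an in-memory list of names instead of the filesystem; `pattern_to_regex` and `list_stems` are hypothetical helpers, not FileNameFlow's API. Treat it as an approximation of the rule rather than the library's exact behaviour: for example, it would return sample1 for sample{}, whereas the table above lists nothing.

```python
from __future__ import annotations

import re


def pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Compile a pattern such as "sample.{}.read.{}" into a regex (simplified sketch).

    Each {} or {name} matches one or more characters other than the separator ".".
    Group 1 captures the matched stem; any trailing ".suffix" parts are ignored.
    """
    literals = re.split(r"\{\w*\}", pattern)
    stem = "([^.]+)".join(re.escape(part) for part in literals)
    return re.compile(rf"^({stem})(?:\..*)?$")


def list_stems(pattern: str, filenames: list[str]) -> list[str]:
    """Return the distinct stems of the filenames that match pattern, sorted."""
    regex = pattern_to_regex(pattern)
    return sorted({m.group(1) for name in filenames if (m := regex.match(name))})


files = [
    "sample.00.read.1.fq.gz", "sample.00.read.2.fq.gz", "sample.00.bwa.bam",
    "sample.00.bwa.sort.bam", "sample.00.bwa.sort.bqsr.bam",
    "sample.01.read.1.fq.gz", "sample.01.read.2.fq.gz", "sample.01.bwa.bam",
    "sample.01.04.read.2.fq.gz",
    "sample1.bwa.csv", "sample1.bowtie.csv", "sample1.bowtie.filter.csv",
]
print(list_stems("sample.{}.read.{}", files))
# ['sample.00.read.1', 'sample.00.read.2', 'sample.01.read.1', 'sample.01.read.2']
```

Note how sample.01.04.read.2.fq.gz is skipped: the wildcard cannot absorb "01.04" because that value contains the separator.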
Hint: if existing files do not follow the pattern, you can use ln -s to create symbolic links whose names match it.
FileNamePath.list(fix)
To select a group of files instead of running a task on each file one by one, use the fix argument to mark which {} wildcards should stay fixed (unexpanded). Indices count wildcards from the left starting at 0, and negative indices count from the right, as in Python. This is especially useful for tasks that take a list of files as input.
FileNamePath | fix | Listed FileNamePath |
---|---|---|
sample.{} | [-1] | sample.{} |
sample.{}.bwa | [-1] | sample.{}.bwa |
sample.{}.read.{} | [-1] | sample.00.read.{} sample.01.read.{} |
sample.{}.read.{} | [-2] | sample.{}.read.1 sample.{}.read.2 |
sample.{}.read.{} | [-1, -2] | sample.{}.read.{} |
sample.{}.read.{} | [] | sample.00.read.1 sample.00.read.2 sample.01.read.1 sample.01.read.2 |
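One way to read the fix argument: expand every wildcard as usual, then put the fixed wildcards back as {} and merge the names that become identical. The sketch below assumes the wildcard values have already been extracted for each file; `group_by_fix` is a hypothetical helper that reproduces the sample.{}.read.{} rows of the table, not FileNameFlow's implementation.

```python
from __future__ import annotations


def group_by_fix(template: str, values: list[tuple[str, ...]], fix: list[int]) -> list[str]:
    """Collapse expanded names into groups by restoring the fixed wildcards as {}.

    Negative indices count from the last wildcard, as in Python indexing.
    """
    n = template.count("{}")
    fixed = {i % n for i in fix} if n else set()
    names = set()
    for row in values:
        # Fixed positions are re-inserted literally as "{}"; the rest keep their value.
        shown = ["{}" if i in fixed else v for i, v in enumerate(row)]
        names.add(template.format(*shown))
    return sorted(names)


# Wildcard values found for sample.{}.read.{} in the example file list
values = [("00", "1"), ("00", "2"), ("01", "1"), ("01", "2")]
print(group_by_fix("sample.{}.read.{}", values, [-1]))
# ['sample.00.read.{}', 'sample.01.read.{}']
```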
Concept 2: Managing Workflow Steps in Filenames
FileNameFlow keeps track of workflow steps in a straightforward way: each step is recorded as a suffix of the filename, so the filename alone shows which treatments have been applied to the data.
Step | Input | Output | Files on disk (informational) |
---|---|---|---|
download | . | sample.00.read | sample.00.read.1.fq.gz, sample.00.read.2.fq.gz |
bowtie2 | sample.00.read | sample.00.read.bowtie_hg19 | sample.00.read.bowtie_hg19.sam |
sortBam | sample.00.read.bowtie_hg19 | sample.00.read.bowtie_hg19.sort | sample.00.read.bowtie_hg19.sort.bam |
GatkBqsr | sample.00.read.bowtie_hg19.sort | sample.00.read.bowtie_hg19.sort.bqsr | sample.00.read.bowtie_hg19.sort.bqsr.bam |
GatkHC | sample.00.read.bowtie_hg19.sort.bqsr | sample.00.read.bowtie_hg19.sort.bqsr.hc | sample.00.read.bowtie_hg19.sort.bqsr.hc.vcf.gz |
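Because every step only appends its own suffix, the final name of a chain is the input name followed by all step suffixes in order. The sketch below uses hypothetical stand-in step functions (naming only, no real tools are run) to reproduce the Output column above.

```python
from itertools import accumulate


# Hypothetical stand-ins: each step derives its output name from its input name.
def bowtie2(name: str) -> str:
    return name + ".bowtie_hg19"


def sort_bam(name: str) -> str:
    return name + ".sort"


def gatk_bqsr(name: str) -> str:
    return name + ".bqsr"


def gatk_hc(name: str) -> str:
    return name + ".hc"


steps = [bowtie2, sort_bam, gatk_bqsr, gatk_hc]
# accumulate yields the starting name followed by every intermediate output name.
names = list(accumulate(steps, lambda name, step: step(name), initial="sample.00.read"))
for name in names:
    print(name)
```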
Furthermore, we incorporate parameters into the filenames to ensure that files generated with different parameters are kept separate. We use abbreviations when necessary to maintain readability.
Function | Input | Output |
---|---|---|
bowtie2(index="hs37d5") | sample.00.read | sample.00.read.bowtie_hg19 |
bowtie2(index="hs38DH") | sample.00.read | sample.00.read.bowtie_hg38 |
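A small lookup table is one way to turn a parameter into a short, readable suffix. The `INDEX_ABBREV` mapping and `bowtie2_output_name` below are hypothetical; they only reproduce the naming in the table above and do not run bowtie2.

```python
# Hypothetical abbreviation table: reference index -> short label used in filenames.
INDEX_ABBREV = {"hs37d5": "hg19", "hs38DH": "hg38"}


def bowtie2_output_name(input_name: str, index: str) -> str:
    """Build the output name for bowtie2 run against `index` (naming only)."""
    label = INDEX_ABBREV.get(index)
    if label is None:
        # Unknown index: fall back to its name, replacing "/" and the "." separator
        # so the parameter stays a single filename component.
        label = index.replace("/", "_").replace(".", "_")
    return f"{input_name}.bowtie_{label}"


print(bowtie2_output_name("sample.00.read", "hs37d5"))  # sample.00.read.bowtie_hg19
```

Replacing the separator in the fallback matters: a raw "." inside the parameter would split it into two filename components and break pattern matching downstream.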
FileNamePath handles suffix concatenation (+) and wildcard substitution (apply()).
Setting up a pipeline with FileNamePath is simple: you define a function that handles one selected filename, and FileNameFlow calls it for each matching name (or for each group, when fix is used).
Here's an example:
```python
import os
from functools import partial

from filenameflow import FileNamePath, FileNameTask


def bowtie2(input_name, index):
    # With fix=[-1] the function is called twice, with input_name =
    # 1. sample.00.read.{}
    # 2. sample.01.read.{}
    print(input_name)
    # Drop the fixed wildcard and append the step suffix,
    # e.g. sample.00.read.{} -> sample.00.read.bowtie_hg19
    output_name = input_name.replace_wildcard("") + ".bowtie_" + index.split("/")[-1]
    # Use the built-in list() to expand the current pattern,
    # e.g. sample.00.read.1, sample.00.read.2
    fqs = sorted(input_name.list())
    # FileNamePath behaves like str, so it works in f-strings
    os.system(f"echo bowtie {index} {fqs[0]}.fq {fqs[1]}.fq -o {output_name}.sam")
    return output_name  # return the output name for further task chaining


# Start the chain from a FileNamePath:
# FileNamePath("sample.{}.read.{}") >> FileNameTask(partial(bowtie2, index="index/hg19"), fix=[-1])
# or from a plain string; the FileNameTask on the right handles ">>":
"sample.{}.read.{}" >> FileNameTask(partial(bowtie2, index="index/hg19"), fix=[-1])
```
Because each function only handles one selected filename (or one group of files), FileNameFlow removes most of the explicit loops from complex data-processing code.
Concept 3: Combining Functions Like a Pipeline
This section combines the previous concepts into a complete pipeline.
For the complete code, see example.py in the GitHub repository.
```python
import os
from functools import partial
from pathlib import Path

import pandas as pd

from filenameflow import FileNamePath, FileNameTask, compose

# sortBam and getLowReadDepthPos are defined in example.py (not shown here).


def download(input_name):
    # 1 (in fact 0) -> many
    output_name = "data/xxx.{}.read"
    if len(FileNamePath(output_name).list()):  # skip the step if the files are already downloaded
        return output_name
    # wget ...
    return output_name


def bowtie2(input_name, index):
    # 1 -> 1
    # input_name = "data/xxx.{}.read"
    # output_name = "data/xxx.{}.read.index_hs37d5"
    output_name = input_name + "." + index.replace("/", "_")
    if Path(output_name + ".sam").exists():  # skip the step if the output exists
        return output_name
    os.system(f"bowtie2 -x {index} -1 {input_name}.1.fq -2 {input_name}.2.fq -S {output_name}.sam")
    return output_name


def mergeCSV(input_name):
    # many -> 1
    # input_name = "data/xxx.{}.read.index_hs37d5.depth"
    # output_name = "data/xxx_merge.read.index_hs37d5.depth"
    output_name = input_name.replace_wildcard("_merge")
    if Path(output_name + ".csv").exists():
        return output_name
    files = input_name.list()
    df = pd.concat(pd.read_csv(i + ".csv") for i in files)
    df.to_csv(output_name + ".csv", index=False)
    return output_name


def summaryCSV(input_name):
    # 1 -> 1
    # doesn't change the suffix
    df = pd.read_csv(input_name + ".csv").groupby("chrom").describe()
    print(df)
    return input_name


# Use >> to chain the tasks
(
    FileNamePath(".")
    >> download
    >> partial(bowtie2, index="index/hs37d5")
    >> sortBam
    >> getLowReadDepthPos
    >> FileNameTask(mergeCSV, fix=[-1])
    >> summaryCSV
)

# Or use compose
compose([
    ".",
    download,  # 0 to many
    partial(bowtie2, index="index/hs37d5"),  # 1 to 1
    sortBam,  # 1 to 1
    getLowReadDepthPos,  # 1 to 1
    FileNameTask(mergeCSV, fix=[-1]),  # many to 1
    summaryCSV,  # 1 to 1
])
```
The pipeline reads as a simple linear flow: the set of files to process is already encoded in the filename patterns, so no explicit loops are needed.
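The >> chaining relies on Python's reflected operators: str defines no >> operator, so for "name" >> task Python calls task.__rrshift__("name"). For the same reason, "name" >> some_plain_function raises TypeError, so a chain that starts from a plain string needs a FileNameTask as its first step (or must start from a FileNamePath). The sketch below shows only that mechanism with a hypothetical `Step` class; the real FileNameTask additionally lists files, applies fix, and dispatches work to an executor.

```python
from __future__ import annotations

from typing import Callable


class Step:
    """Hypothetical pipeline step: maps one name to the next name."""

    def __init__(self, func: Callable[[str], str]) -> None:
        self.func = func
        self.output: str | None = None

    def __rrshift__(self, upstream: object) -> Step:
        # Called for "name" >> step, and by __rshift__ below for step >> step.
        name = upstream.output if isinstance(upstream, Step) else str(upstream)
        done = Step(self.func)  # fresh copy, so one Step can be reused in many chains
        done.output = self.func(name)
        return done

    def __rshift__(self, downstream: Step | Callable[[str], str]) -> Step:
        # step >> function: wrap plain functions so they can be chained too.
        if not isinstance(downstream, Step):
            downstream = Step(downstream)
        return downstream.__rrshift__(self)


task = "sample.00.read" >> Step(lambda n: n + ".bowtie_hg19") >> (lambda n: n + ".sort")
print(task.output)  # sample.00.read.bowtie_hg19.sort
```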
Concept 4: Shipping Your Pipeline to Other Resources
We provide two basic executors for your convenience:
- FileNameBaseExecutor (Default): Executes tasks one by one.
- DaskExecutor: Executes tasks using Dask, allowing you to use various computational resources. See the Dask documentation for the available cluster types.
```python
from functools import partial

from dask.distributed import LocalCluster
from filenameflow import FileNamePath, FileNameTask
from filenameflow.executor import DaskExecutor

# download and bowtie2 as defined in Concept 3

# Set up a DaskExecutor with a LocalCluster
exe = DaskExecutor(LocalCluster())

# Set the executor for a specific task
FileNamePath(".") >> download >> FileNameTask(partial(bowtie2, index="index/hs37d5"), executor=exe)

# Or set it globally
FileNameTask.set_default_executor(exe)
FileNamePath(".") >> download >> partial(bowtie2, index="index/hs37d5")
```
With FileNameFlow, the same filename pipeline can move to a different computing environment by changing only the executor.
Conclusion
- Streamlined Data Science: Simplify file management and processing, perfect for bioinformatics tasks involving multiple file types.
- Simplicity: FileNameFlow streamlines pattern matching and grouping with minimal syntax, resembling string operations while offering wildcard support.
- Self-Descriptive Filenames: Each filename serves as a self-descriptive record of data processing steps, aiding in tracking and comprehension. It's like having automatic versioning as filenames adjust with pipeline changes.
- Flexible Filename Control: Beyond automatic wildcard listing and task execution, users can implement various rules. This includes customizing filename edits (like adding suffixes), renaming, system calls, and task skipping.
- Dask Resource Support: Use FileNameFlow's DaskExecutor to execute pipelines on the computational resources of a given cluster (e.g. local, PBS, SLURM, ...).
- Python Integration: FileNameFlow seamlessly integrates with Python. You can use any Python packages you want.
In summary, FileNameFlow helps data scientists manage, process, and collaborate on data by encoding file selection and processing history in filenames, which keeps intricate pipelines simple.
Installation
pip install git+https://github.com/linnil1/FileNameFlow
Run
Run example
python example.py
Documentation
https://linnil1.github.io/FileNameFlow
Error Classes
FileNameFlowAssert
Bases: FileNameFlowError
Mostly used for internal errors.
FileNameFlowDataError
Bases: FileNameFlowError
Raised when an error occurs while executing the pipeline.
FileNameFlowError
Bases: Exception
Base exception for the FileNameFlow module.
FileNamePath, the core path module of FileNameFlow.
It handles the path pattern-matching rules.
FileNamePath
Bases: str
FileNamePath is a specialized string class designed for managing filenames.
Attributes:
Name | Type | Description |
---|---|---|
self | str | A string representing the current filename. |
template | str | The filename pattern serving as the basis for formatting filenames. |
args | tuple[str, ...] | The positional arguments used for formatting. |
kwargs | Mapping[str, str] | The keyword arguments used for formatting. |
This class extends the functionality of Python's built-in str type.
Wildcards:
The FileNamePath class uses {} as the wildcard in filenames and templates, and * as the value of an unresolved wildcard in both args and kwargs.
Example
- With args:
  - self: test.sample.regression.lasso
  - template: test.{}.regression.{}
  - args: ["sample", "lasso"]
- With args containing a wildcard:
  - self: test.{}.regression.lasso
  - template: test.{}.regression.{}
  - args: ["*", "lasso"]
- With kwargs:
  - self: test.{}.regression.{type}
  - template: test.{}.regression.{type}
  - args: ["*"]
  - kwargs: {"type": "*"}
Features
FileNamePath's primary feature is listing files based on patterns.
FileNamePath("test.{}.regression.lasso").list() lists all files starting with test.*.regression.lasso while ignoring the file extension.
It also excludes files where the wildcard would have to match a separator, and reports each name only once. For instance:
- test.sample1.regression.lasso (included in the listing)
- test.sample1.regression.lasso.txt (not listed again: without its suffix, the filename is the same as the previous one)
- test.sample1.regression.lasso.alpha10.txt (not listed again: without its suffix, the filename is the same as the previous one)
- test.sample2.regression.lasso (included in the listing)
- test.sample_bad.sample1.regression.lasso (excluded from the listing: the wildcard would have to match sample_bad.sample1, which contains a separator)
After listing, the number of variables in args and kwargs is preserved, but their values change.
For example, test.sample1.regression.lasso has args set to ["sample1"] instead of ["*"].
Another argument of the list method is fix, which keeps the specified wildcards unexpanded during listing.
Example: FileNamePath("test.{}.regression.{}").list(fix=(0, )) would yield:
- test.{}.regression.lasso (with args=["*", "lasso"])
- test.{}.regression.elastic
- test.{}.regression.rigid
Operations:
- replace_wildcard(): Replaces the {} wildcard with a specified string (e.g., "_merge"). Example: "test.{}.regression" becomes "test_merge.regression".
- +: Concatenates FileNamePath objects. Example: FileNamePath("data.{}.regression") + ".lasso" results in FileNamePath("data.{}.regression.lasso"). Note that args and kwargs are preserved.
- apply(): Applies arguments to the FileNamePath (similar to .format()). Example: FileNamePath("test.{}.regression.lasso").apply(0, "sample1") results in FileNamePath("test.sample1.regression.lasso").
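To illustrate how these operations keep the template and args in sync, here is a minimal sketch of a str subclass that supports only positional {} wildcards. `MiniPath` is hypothetical and far simpler than FileNamePath (no kwargs, no listing); as described above, unresolved wildcards are stored as "*" in args.

```python
from __future__ import annotations

WILDCARD = "*"


class MiniPath(str):
    """Hypothetical simplified FileNamePath: a str that remembers template and args."""

    template: str
    args: tuple[str, ...]

    def __new__(cls, template: str, args: tuple[str, ...] | None = None) -> MiniPath:
        if args is None:
            args = (WILDCARD,) * template.count("{}")
        # Render the name: resolved values are substituted, "*" stays as "{}".
        text = template.format(*("{}" if a == WILDCARD else a for a in args))
        obj = super().__new__(cls, text)
        obj.template = template
        obj.args = tuple(args)
        return obj

    def __add__(self, other: str) -> MiniPath:
        # Concatenate; wildcards inside `other` become new unresolved args.
        return MiniPath(self.template + other, self.args + (WILDCARD,) * other.count("{}"))

    def apply(self, index: int, value: str) -> MiniPath:
        # Fill one wildcard while keeping the template unchanged.
        args = list(self.args)
        args[index] = value
        return MiniPath(self.template, tuple(args))

    def replace_wildcard(self, text: str = "_merge") -> MiniPath:
        # Like str.replace(".{}", text); the replaced wildcards leave args entirely.
        return MiniPath(str(self).replace(".{}", text))


path = MiniPath("test.{}.regression.{}").apply(1, "lasso")
print(path, path.args)  # test.{}.regression.lasso ('*', 'lasso')
```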
args: tuple[str, ...] = tuple(path.args) (instance attribute)
kwargs: Mapping[str, str] = dict(**path.kwargs) (instance attribute)
separator = '.' (class attribute, instance attribute)
suffix_key = 'suffix_of_filename_for_filenamepath' (class attribute, instance attribute)
template: str = str(self) (instance attribute)
__add__(other: Any) -> FileNamePath
Concatenates the path.
The template, args, and kwargs are updated consistently after concatenation.
Example
```python
FileNamePath("a.{}.c") + ".d"
# result
FileNamePath("a.{}.c.d")
```
__init__(path: str)
__radd__(other: Any) -> FileNamePath
Concatenates the path when the FileNamePath is the right operand (e.g. "a." + path).
__rrshift__(others: Any) -> Any
see compose()
__rshift__(others: Any) -> Any
see compose()
apply(key: str | int, value: str) -> FileNamePath
Similar to .format(), but returns a new FileNamePath that keeps the template, args, and kwargs.
Example
self: test.{}.regression.lasso
template: test.{}.regression.{}
args: ["*", "lasso"]
# After .apply(0, "sample2")
self: test.sample2.regression.lasso
template: test.{}.regression.{}
args: ["sample2", "lasso"]
applys(kv_pairs: Iterable[tuple[str | int, str]] = ()) -> FileNamePath
Similar to .apply(), but applies multiple (key, value) pairs at once.
commit() -> FileNamePath
Substitutes the values from args and kwargs into the template, except values that are the wildcard character '*'.
Example
path: test.{}.regression.lasso
template: test.{}.regression.{type}
args: ["*"]
kwargs: {type: "lasso"}
# After .commit()
path: test.{}.regression.lasso
template: test.{}.regression.lasso
args: ["*"]
kwargs: {}
construct(template: str, args: Iterable[str], kwargs: Mapping[str, str]) -> FileNamePath
classmethod
Constructs a new FileNamePath from template, args, and kwargs. The filename is generated automatically.
get_args(key: int | str) -> str
Gets a value from args or kwargs by key (int for a positional arg, str for a keyword arg).
is_file() -> bool
Check if the FileNamePath represents a file (no wildcards in the path)
list(fix: Iterable[str | int] = ()) -> Iterable[FileNamePath]
Lists files that match this FileNamePath's pattern.
Example
Path: "cohort.{}.regression.{}"
fix: [1]
# then Return:
* `cohort.sample1.regression.{}`
* `cohort.sample2.regression.{}`
* `cohort.sample3.regression.{}`
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fix | Iterable[str \| int] | The indices (or keyword names) of the args/kwargs to keep unlisted. Note that the index is the wildcard's index in the path, not the index in the template. | () |
list_args() -> Mapping[str | int, str]
Lists the args and kwargs of this FileNamePath and returns them as a dict-like object.
overwrite(key: int | str, value: str) -> FileNamePath
Forces a change to a value in args or kwargs. See .overwrites().
overwrites(kv_pairs: Iterable[tuple[str | int, str]] = ()) -> FileNamePath
Forces changes to values in args or kwargs.
Example
Path: cohort.{}.regression.lasso
template: cohort.{}.regression.{}
args: ["*", "lasso"]
# after .overwrites([(1, "elastic")])
Path: cohort.{}.regression.elastic
template: cohort.{}.regression.{}
args: ["*", "elastic"]
parse(filename: str) -> tuple[str | None, tuple[str, ...], dict[str, str]]
Parses the filename into positional and keyword arguments according to self.template. The suffix is always removed. Returns template, args, and kwargs.
If the filename does not match the template, ("", [], {}) is returned.
Example
path = "test.{}.regression.{type}"
# After .parse("test.sample1.regression.lasso.txt")
template = "test.{}.regression.{type}"
args = ["sample1"]
kwargs = {"type": "lasso"}
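The parse rule can be sketched with a regular expression built from the template: {} becomes a positional group, {name} a keyword group, neither may match the separator, and any trailing suffix is dropped. `parse_name` is a hypothetical helper; unlike the documented method, it returns None on a mismatch and leaves the template out of its result.

```python
from __future__ import annotations

import re


def parse_name(template: str, filename: str) -> tuple[tuple[str, ...], dict[str, str]] | None:
    """Extract positional and keyword wildcard values from filename (sketch)."""
    # re.split with a capture group alternates: literal, wildcard name, literal, ...
    parts = re.split(r"\{(\w*)\}", template)
    literals, names = parts[0::2], parts[1::2]
    body = "([^.]+)".join(re.escape(lit) for lit in literals)
    match = re.match(rf"^{body}(?:\..*)?$", filename)
    if match is None:
        return None
    pairs = list(zip(names, match.groups()))
    args = tuple(value for name, value in pairs if name == "")  # unnamed {} wildcards
    kwargs = {name: value for name, value in pairs if name}  # named {key} wildcards
    return args, kwargs


print(parse_name("test.{}.regression.{type}", "test.sample1.regression.lasso.txt"))
# (('sample1',), {'type': 'lasso'})
```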
replace_wildcard(text: str = '_merge') -> FileNamePath
Replaces the wildcard, similar to .replace(".{}", "_merge").
After replacement, the corresponding args/kwargs are removed.
with_filename(filename: str) -> FileNamePath | None
Creates a new FileNamePath from filename, extracting args and kwargs according to self.template.
Returns:
Type | Description |
---|---|
FileNamePath \| None | A new FileNamePath object if successful, else None. |
unique(arr: Iterable[_T], key: Callable[[_T], Hashable]) -> Iterable[_T]
Returns a sequence with duplicates removed.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr | Iterable[_T] | A sequence to deduplicate. | required |
key | Callable[[_T], Hashable] | A function that returns the key of an element. | required |
Returns:
Type | Description |
---|---|
Iterable[_T] | A sequence without duplicate keys. |
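A minimal sketch of order-preserving deduplication by key, the kind of step a listing needs when several files share one stem. Keeping the first occurrence is an assumption of this sketch; `unique` here is a stand-alone re-implementation, not an import from FileNameFlow.

```python
from __future__ import annotations

from collections.abc import Callable, Hashable, Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def unique(arr: Iterable[T], key: Callable[[T], Hashable]) -> Iterator[T]:
    """Yield each element whose key has not been seen before (first occurrence wins)."""
    seen: set[Hashable] = set()
    for item in arr:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item


files = [
    "test.sample1.regression.lasso",
    "test.sample1.regression.lasso.txt",
    "test.sample1.regression.lasso.alpha10.txt",
    "test.sample2.regression.lasso",
]
# Key: the first four dot-separated parts, i.e. the name without its suffix.
stems = list(unique(files, key=lambda f: ".".join(f.split(".")[:4])))
print(stems)  # ['test.sample1.regression.lasso', 'test.sample2.regression.lasso']
```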
FileNameTaskFunc = Callable[..., FileNameTaskOutput] (module attribute)
FileNameTaskOutput = None | str | FileNamePath (module attribute)
FileNameTask
FileNameTask is the basic class that wraps your function into a step of a pipeline. Execution is triggered by the ">>" operator or by the compose function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func | FileNameTaskFunc | A function that defines the task to be performed on the given filename (path). | lambda i: i |
fix | Iterable[str \| int] | A list of integers (for positional arguments) or strings (for keyword arguments) specifying which wildcards to keep fixed (unexpanded), so that matching files are grouped. | () |
executor | FileNameBaseExecutor \| None | An optional custom executor to use for the task. | None |
Example:
- Usage 1: Wrap the function into a FileNameTask object
```python
from functools import partial

from filenameflow import FileNameTask, compose

@FileNameTask.wrapper(fix=[-1])
def func_manyto1(input_name, other_arg="1"):
    return input_name.replace_wildcard()

"data.{}.arg1" >> func_manyto1
"data.{}.arg1" >> func_manyto1(other_arg="2")
```
- Usage 2: Execute the task by ">>"
```python
def func_manyto1(input_name):
    return input_name.replace_wildcard()

"data.{}.arg1" >> FileNameTask(func_manyto1, fix=[-1])
```
- Usage 3: Execute the task by compose
```python
compose([
    "data.{}.arg1",
    func_manyto1,
    FileNameTask(func_manyto1)(other_arg="1"),  # or this
    partial(func_manyto1, other_arg="2"),  # or this
])
```
- Usage 4: Get the output via task.output
```python
task = compose([
    "data.{}.arg1",
    func_manyto1,
])
compose([
    "data.{}.arg2",
    partial(func_manyto1, other_arg=task.output),
])
```
executor: FileNameBaseExecutor = func.executor (instance attribute)
fix: Iterable[str | int] = fix (instance attribute)
func: FileNameTaskFunc = func.func (instance attribute)
input_path: FileNamePath | None = None (instance attribute)
output: FileNamePath (property)
Returns the output path of the task.
output_path: FileNamePath | None = None (instance attribute)
__call__(*args: Any, **kwargs: Any) -> FileNameTask
Fills in or replaces the function's arguments.
__deepcopy__(memo: Any) -> FileNameTask
Deep-copies this object.
__init__(func: FileNameTaskFunc = lambda i: i, fix: Iterable[str | int] = (), executor: FileNameBaseExecutor | None = None)
__repr__() -> str
__rrshift__(others: Any) -> Any
see compose()
__rshift__(others: Any) -> Any
see compose()
run(path: FileNamePath) -> FileNameTask
Main entry point that executes the task on the given path.
set_default_executor(executor: FileNameBaseExecutor) -> None
classmethod
Changes the default executor used by tasks that do not set their own.
wrapper(func: FileNameTaskFunc | None = None, /, **kwargs: Any) -> FileNameTask | Callable[[FileNameTaskFunc], FileNameTask]
classmethod
A decorator to create a FileNameTask instance.
Example:
```python
from filenameflow import FileNameTask

@FileNameTask.wrapper
def doSomething(input_name):
    return input_name + ".test"

@FileNameTask.wrapper(fix=[-1])
def doSomething1(input_name):
    return input_name.replace_wildcard()
```
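The signature wrapper(func=None, /, **kwargs) matches a common Python idiom for a decorator that works both bare (@FileNameTask.wrapper) and with options (@FileNameTask.wrapper(fix=[-1])). The sketch shows that idiom with a hypothetical `Task` class; it is not FileNameTask's actual source, which also manages executors and argument binding.

```python
from __future__ import annotations

import functools
from typing import Any, Callable


class Task:
    """Hypothetical minimal task object holding a function and its options."""

    def __init__(self, func: Callable[..., Any], fix: Any = ()) -> None:
        self.func = func
        self.fix = tuple(fix)

    @classmethod
    def wrapper(cls, func: Callable[..., Any] | None = None, /, **kwargs: Any) -> Any:
        if func is not None:
            # Used bare: @Task.wrapper receives the function directly.
            return cls(func, **kwargs)
        # Used with arguments: @Task.wrapper(fix=[-1]) returns the real decorator.
        return functools.partial(cls, **kwargs)


@Task.wrapper
def do_something(input_name: str) -> str:
    return input_name + ".test"


@Task.wrapper(fix=[-1])
def do_something1(input_name: str) -> str:
    return input_name
```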
compose(func_list: Iterable[FileNameTask | FileNamePath | FileNameTaskFunc | str]) -> FileNameTask | FileNamePath
Compose and execute a sequence of tasks in a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func_list | Iterable[FileNameTask \| FileNamePath \| FileNameTaskFunc \| str] | A list containing a series of tasks such as FileNameTask, FileNamePath, functions, or filenames (str). | required |
Returns:
Type | Description |
---|---|
FileNameTask \| FileNamePath | The last task or path in the composed pipeline. |
Example
```python
from filenameflow import compose

# Execute the tasks based on previous output filenames.
# Equivalent to task1 = "." >> doSomething >> doSomething2
task1 = compose([
    ".", doSomething, doSomething2
])

# Assert that the output of task1 equals the given filename.
# Equivalent to task2 = task1 >> "expected_result_path"
task2 = compose([
    task1, "expected_result_path"
])
```
DaskExecutor
Bases: FileNameBaseExecutor
Runs FileNameFlow tasks under Dask.
Dask is a convenient resource manager for distributed computing.
To use this executor:
```python
FileNameTask.set_default_executor(DaskExecutor())
```
client = Client(cluster) (instance attribute)
__init__(cluster: Any = None)
run_task(func: Callable[..., FileNameTaskOutput], names: Iterable[FileNamePath]) -> Iterable[FileNamePath | str]
Runs all tasks by submitting them with client.submit.
FileNameBaseExecutor
FileNameBaseExecutor is the base class for implementing custom task executors within the FileNameFlow framework.
post_task(path: FileNamePath) -> FileNamePath
This method is called after executing a task. It receives the output FileNamePath and returns the (possibly modified) FileNamePath.
pre_task(path: FileNamePath) -> FileNamePath
This method is called before executing a task. It receives the input FileNamePath and returns the (possibly modified) FileNamePath.
run_task(func: FileNameTaskFunc, paths: Iterable[FileNamePath]) -> Iterable[FileNameTaskOutput]
This method runs the task for each FileNamePath in paths and yields the results of the task execution.
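A custom executor overrides these hooks. The sketch below mimics the shape of FileNameBaseExecutor with a hypothetical `BaseExecutor` (sequential) and `ThreadExecutor`, which submits every path to a concurrent.futures thread pool, similar in spirit to how DaskExecutor uses client.submit. Calling pre_task and post_task around each function call is an assumption of this sketch; in FileNameFlow the hooks may be invoked elsewhere.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable
from concurrent.futures import ThreadPoolExecutor


class BaseExecutor:
    """Hypothetical stand-in mirroring the three FileNameBaseExecutor hooks."""

    def pre_task(self, path: str) -> str:
        return path  # e.g. create output folders or log the input

    def post_task(self, path: str) -> str:
        return path  # e.g. check that the output exists

    def run_task(self, func: Callable[[str], str], paths: Iterable[str]) -> Iterable[str]:
        # Default behaviour: run sequentially, one path after another.
        for path in paths:
            yield self.post_task(func(self.pre_task(path)))


class ThreadExecutor(BaseExecutor):
    """Runs every path concurrently in a thread pool."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers

    def run_task(self, func: Callable[[str], str], paths: Iterable[str]) -> Iterable[str]:
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [pool.submit(func, self.pre_task(p)) for p in paths]
            # Collect in submission order so outputs line up with inputs.
            return [self.post_task(f.result()) for f in futures]


print(list(ThreadExecutor(2).run_task(lambda p: p + ".done", ["s.00", "s.01"])))
# ['s.00.done', 's.01.done']
```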