Module dcm2bids.utils
View Source
# -*- coding: utf-8 -*-
import csv
import json
import logging
import os
from pathlib import Path
import re
from collections import OrderedDict
import shlex
import shutil
from subprocess import check_output
class DEFAULT(object):
    """Package-wide default values, grouped by the module that uses them."""

    # cli dcm2bids
    cliLogLevel = "INFO"
    EPILOG = "Documentation at https://github.com/unfmontreal/Dcm2Bids"

    # dcm2bids.py
    outputDir = Path.cwd()
    session = ""  # also Participant object
    clobber = False
    forceDcm2niix = False
    defaceTpl = None
    logLevel = "WARNING"

    # dcm2niix.py
    dcm2niixOptions = "-b y -ba y -z y -f '%3s_%f_%p_%t'"
    dcm2niixVersion = "v1.0.20181125"

    # sidecar.py
    compKeys = ["SeriesNumber", "AcquisitionTime", "SidecarFilename"]
    searchMethod = "fnmatch"
    searchMethodChoices = ["fnmatch", "re"]
    runTpl = "_run-{:02d}"
    caseSensitive = True

    # Entity table:
    # https://bids-specification.readthedocs.io/en/v1.7.0/99-appendices/04-entity-table.html
    entityTableKeys = [
        "sub",
        "ses",
        "task",
        "acq",
        "ce",
        "rec",
        "dir",
        "run",
        "mod",
        "echo",
        "flip",
        "inv",
        "mt",
        "part",
        "recording",
    ]

    # misc
    tmpDirName = "tmp_dcm2bids"
    helperDir = "helper"
def load_json(filename):
    """Read a JSON file and return its content.

    Args:
        filename (str): Path of a JSON file

    Return:
        Dictionary of the JSON file (an OrderedDict, preserving key order)
    """
    with open(filename, "r") as handle:
        return json.load(handle, object_pairs_hook=OrderedDict)
def save_json(filename, data):
    """Serialize *data* to *filename* as indented JSON.

    Args:
        filename (str or Path): Destination file path.
        data: JSON-serializable object to write.
    """
    # Wrap in Path() so plain string paths work too, for consistency
    # with load_json() which accepts str.
    with Path(filename).open("w") as f:
        json.dump(data, f, indent=4)
def write_txt(filename, lines):
    """Append each item of *lines* to *filename*, one item per line."""
    with open(filename, "a") as handle:
        handle.writelines("%s\n" % line for line in lines)
def write_participants(filename, participants):
    """Write participants to a TSV file (e.g. BIDS participants.tsv).

    Args:
        filename (str or Path): Destination TSV file.
        participants (list of dict): One dict per participant; the keys
            of the first dict define the column order.
    """
    # Guard: participants[0] below would raise IndexError on an empty list.
    if not participants:
        return
    # newline="" is required by the csv module so it controls line
    # endings itself (avoids doubled blank lines on Windows).
    with open(filename, "w", newline="") as f:
        writer = csv.DictWriter(f, delimiter="\t", fieldnames=participants[0].keys())
        writer.writeheader()
        writer.writerows(participants)
def read_participants(filename):
    """Read a TSV participants file into a list of row dicts.

    Returns an empty list when the file does not exist.
    """
    if not os.path.exists(filename):
        return []
    with open(filename, "r") as handle:
        return list(csv.DictReader(handle, delimiter="\t"))
def splitext_(path, extensions=None):
    """Split the extension from a pathname.

    Handles compound extensions containing '.' (e.g. '.nii.gz').

    Args:
        path (str): A path to split
        extensions (list): List of special extensions

    Returns:
        (root, ext): ext may be empty
    """
    special = [".nii.gz"] if extensions is None else extensions
    matched = next((ext for ext in special if path.endswith(ext)), None)
    if matched is not None:
        cut = len(path) - len(matched)
        return path[:cut], path[cut:]
    return os.path.splitext(path)
def run_shell_command(commandLine):
    """ Wrapper of subprocess.check_output

    Args:
        commandLine (str or list): Command to execute. A string is
            tokenized with shlex.split so it runs without shell=True.

    Returns:
        Run command with arguments and return its output (bytes)
    """
    logger = logging.getLogger(__name__)
    logger.info("Running %s", commandLine)
    # check_output without shell=True needs an argument list; a plain
    # command string would otherwise be treated as a single executable
    # name and fail. (shlex is imported at module level for this.)
    if isinstance(commandLine, str):
        commandLine = shlex.split(commandLine)
    return check_output(commandLine)
def valid_path(in_path, type="folder"):
    """Check that a path exists and return it as a Path object.

    Parameters
    ----------
    in_path: str or Path
        Path to be checked.
    type: str
        Kind of path expected: "folder" or "file".
    """
    path_obj = Path(in_path) if isinstance(in_path, str) else in_path

    if type == 'folder':
        # A not-yet-created output folder is accepted as long as its
        # parent directory exists.
        if path_obj.is_dir() or path_obj.parent.is_dir():
            return path_obj
        raise NotADirectoryError(path_obj)

    if type == "file":
        if path_obj.is_file():
            return path_obj
        raise FileNotFoundError(path_obj)

    raise TypeError(type)
def assert_dirs_empty(parser, args, required):
    """
    Assert that all given directories exist and are empty.

    If a directory exists and is not empty, and --force is used,
    its content is deleted.

    Parameters
    ----------
    parser: argparse.ArgumentParser object
        Parser.
    args: argparse namespace
        Argument list; must expose an `overwrite` boolean.
    required: string, Path or list of paths
        Required paths to be checked.
    """
    def check(path):
        # Missing or already-empty directory: nothing to do.
        if not path.is_dir():
            return
        if not any(path.iterdir()):
            return
        if not args.overwrite:
            parser.error(
                f"Output directory {path} isn't empty, so some files "
                "could be overwritten or deleted.\nRerun the command with "
                "--force option to overwrite existing output files.")
        else:
            for the_file in path.iterdir():
                # Bug fix: iterdir() already yields full paths; joining
                # them with `path` again (path / the_file) duplicated the
                # directory prefix for relative paths, so nothing was
                # ever deleted in that case.
                try:
                    if the_file.is_file():
                        the_file.unlink()
                    elif the_file.is_dir():
                        shutil.rmtree(the_file)
                except Exception as e:
                    # Best effort: report the failure and keep deleting
                    # the remaining entries.
                    print(e)

    if isinstance(required, (str, Path)):
        required = [Path(required)]

    for cur_dir in required:
        check(cur_dir)
Functions
assert_dirs_empty
def assert_dirs_empty(
parser,
args,
required
)
Assert that all given directories exist and are empty.
If directories exist and are not empty, and --force is used, their contents are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parser | argparse.ArgumentParser object | Parser. | None |
args | argparse namespace | Argument list. | None |
required | string or list of paths to files | Required paths to be checked. | None |
create_dir | bool | If true, create the directory if it does not exist. | None |
View Source
def assert_dirs_empty(parser, args, required):
"""
Assert that all directories exist are empty.
If dirs exist and not empty, and --force is used, delete dirs.
Parameters
----------
parser: argparse.ArgumentParser object
Parser.
args: argparse namespace
Argument list.
required: string or list of paths to files
Required paths to be checked.
create_dir: bool
If true, create the directory if it does not exist.
"""
def check(path):
if not path.is_dir():
return
if not any(path.iterdir()):
return
if not args.overwrite:
parser.error(
f"Output directory {path} isn't empty, so some files "
"could be overwritten or deleted.\nRerun the command with "
"--force option to overwrite existing output files.")
else:
for the_file in path.iterdir():
file_path = path / the_file
try:
if file_path.is_file():
file_path.unlink()
elif file_path.is_dir():
shutil.rmtree(file_path)
except Exception as e:
print(e)
if isinstance(required, str) or isinstance(required, Path):
required = [Path(required)]
for cur_dir in required:
check(cur_dir)
load_json
def load_json(
filename
)
Load a JSON file
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filename | str | Path of a JSON file | None |
View Source
def load_json(filename):
""" Load a JSON file
Args:
filename (str): Path of a JSON file
Return:
        Dictionary of the JSON file
"""
with open(filename, "r") as f:
data = json.load(f, object_pairs_hook=OrderedDict)
return data
read_participants
def read_participants(
filename
)
View Source
def read_participants(filename):
if not os.path.exists(filename):
return []
with open(filename, "r") as f:
reader = csv.DictReader(f, delimiter="\t")
return [row for row in reader]
run_shell_command
def run_shell_command(
commandLine
)
Wrapper of subprocess.check_output
Returns:
Type | Description |
---|---|
None | Run command with arguments and return its output |
View Source
def run_shell_command(commandLine):
""" Wrapper of subprocess.check_output
Returns:
Run command with arguments and return its output
"""
logger = logging.getLogger(__name__)
logger.info("Running %s", commandLine)
return check_output(commandLine)
save_json
def save_json(
filename,
data
)
View Source
def save_json(filename, data):
with filename.open("w") as f:
json.dump(data, f, indent=4)
splitext_
def splitext_(
path,
extensions=None
)
Split the extension from a pathname
Handle case with extensions with '.' in it
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | A path to split | None |
extensions | list | List of special extensions | None |
Returns:
Type | Description |
---|---|
None | (root, ext): ext may be empty |
View Source
def splitext_(path, extensions=None):
""" Split the extension from a pathname
Handle case with extensions with '.' in it
Args:
path (str): A path to split
extensions (list): List of special extensions
Returns:
(root, ext): ext may be empty
"""
if extensions is None:
extensions = [".nii.gz"]
for ext in extensions:
if path.endswith(ext):
return path[: -len(ext)], path[-len(ext) :]
return os.path.splitext(path)
valid_path
def valid_path(
in_path,
type='folder'
)
Assert that file exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
required_file | Path | Path to be checked. | None |
View Source
def valid_path(in_path, type="folder"):
"""Assert that file exists.
Parameters
----------
required_file: Path
Path to be checked.
"""
if isinstance(in_path, str):
in_path = Path(in_path)
if type == 'folder':
if in_path.is_dir() or in_path.parent.is_dir():
return in_path
else:
raise NotADirectoryError(in_path)
elif type == "file":
if in_path.is_file():
return in_path
else:
raise FileNotFoundError(in_path)
raise TypeError(type)
write_participants
def write_participants(
filename,
participants
)
View Source
def write_participants(filename, participants):
with open(filename, "w") as f:
writer = csv.DictWriter(f, delimiter="\t", fieldnames=participants[0].keys())
writer.writeheader()
writer.writerows(participants)
write_txt
def write_txt(
filename,
lines
)
View Source
def write_txt(filename, lines):
with open(filename, "a") as f:
for row in lines:
f.write("%s\n" % row)
Classes
DEFAULT
class DEFAULT(
/,
*args,
**kwargs
)
Default values of the package
View Source
class DEFAULT(object):
""" Default values of the package"""
# cli dcm2bids
cliLogLevel = "INFO"
EPILOG="Documentation at https://github.com/unfmontreal/Dcm2Bids"
# dcm2bids.py
outputDir = Path.cwd()
session = "" # also Participant object
clobber = False
forceDcm2niix = False
defaceTpl = None
logLevel = "WARNING"
# dcm2niix.py
dcm2niixOptions = "-b y -ba y -z y -f '%3s_%f_%p_%t'"
dcm2niixVersion = "v1.0.20181125"
# sidecar.py
compKeys = ["SeriesNumber", "AcquisitionTime", "SidecarFilename"]
searchMethod = "fnmatch"
searchMethodChoices = ["fnmatch", "re"]
runTpl = "_run-{:02d}"
caseSensitive = True
# Entity table:
# https://bids-specification.readthedocs.io/en/v1.7.0/99-appendices/04-entity-table.html
entityTableKeys = ["sub", "ses", "task", "acq", "ce", "rec", "dir",
"run", "mod", "echo", "flip", "inv", "mt", "part",
"recording"]
# misc
tmpDirName = "tmp_dcm2bids"
helperDir = "helper"
Class variables
EPILOG
caseSensitive
cliLogLevel
clobber
compKeys
dcm2niixOptions
dcm2niixVersion
defaceTpl
entityTableKeys
forceDcm2niix
helperDir
logLevel
outputDir
runTpl
searchMethod
searchMethodChoices
session
tmpDirName
Last update:
2023-07-13
Created: 2023-07-13
Created: 2023-07-13