Skip to content

io.py

This contains helper functions to handle inputs and outputs arguments in the benchmark scripts. It also provides some automation routine to handle data.

PartitioningEngine

This class handles partitioning data files into chunks with various strategies.

Source code in src/common/io.py
class PartitioningEngine():
    """This class handles partitioning data files into chunks with various strategies. """
    # supported values for the `mode` constructor argument
    PARTITION_MODES = [
        'chunk',
        'roundrobin',
        'append'
    ]

    def __init__(self, mode, number, header=False, logger=None):
        """Constructs and setup of the engine

        Args:
            mode (str): which partition mode (in PartitioningEngine.PARTITION_MODES list)
            number (int): parameter, behavior depends on mode
                (lines per partition for 'chunk', partition count for 'roundrobin'/'append')
            header (bool): are there header lines in the input files?
            logger (logging.Logger): a custom logger, if needed, for this engine to log
        """
        self.mode = mode
        self.number = number
        self.header = header
        self.logger = logger or logging.getLogger(__name__)

    def split_by_append(self, input_files, output_path, file_count_target):
        """Distributes the input files round-robin into file_count_target groups,
        then concatenates the content of each group into one partition file.

        Args:
            input_files (List[str]): list of file paths
            output_path (str): directory path, where to write the partitions
            file_count_target (int): how many partitions we want

        Raises:
            ValueError: if there are fewer input files than requested partitions
        """
        if len(input_files) < file_count_target:
            # ValueError is an Exception subclass, so existing callers still catch it
            raise ValueError(f"To use mode=append, the number of input files ({len(input_files)}) needs to be higher than requested number of output files ({file_count_target})")

        # each partition starts as an empty list of input files
        partitions = [[] for _ in range(file_count_target)]

        # round-robin each input file into one partition
        for index, input_file in enumerate(input_files):
            partitions[index % file_count_target].append(input_file)

        self.logger.info(f"Shuffled {len(input_files)} files into {file_count_target} partitions.")

        # then write each partition by appending content
        for current_partition_index, partition in enumerate(partitions):
            self.logger.info(f"Writing partition {current_partition_index}...")
            with open(os.path.join(output_path, "part_{:06d}".format(current_partition_index)), 'a', encoding="utf-8") as output_handler:
                for input_file in partition:
                    self.logger.info(f"Reading input file {input_file}...")
                    # explicit utf-8 for consistency with the other split methods
                    with open(input_file, 'r', encoding="utf-8") as input_handler:
                        output_handler.write(input_handler.read())

        # use file_count_target: the loop variable would be unbound for 0 partitions
        self.logger.info(f"Created {file_count_target} partitions")


    def split_by_size(self, input_files, output_path, partition_size):
        """Splits input files into a variable number of partitions
        by chunking a fixed number of lines from inputs into each
        output file. When header=True, the (unique) header line is written
        once at the top of every partition and is never counted as data.

        Args:
            input_files (List[str]): list of file paths
            output_path (str): directory path, where to write the partitions
            partition_size (int): how many lines per partition (<= 0 means one single partition)
        """
        current_partition_size = 0  # data lines written to the current partition
        current_partition_index = 0
        self.logger.info(f"Creating partition {current_partition_index}")

        header_line = None # there can be only one header line

        for input_file in input_files:
            self.logger.info(f"Opening input file {input_file}")
            with open(input_file, "r", encoding="utf-8") as input_handler:
                for line_index, line in enumerate(input_handler):
                    if self.header and line_index == 0:
                        # first line of every input file is a header:
                        # keep the first one (replicated at the top of every partition)
                        # and never emit any header as a data line
                        if header_line is None:
                            header_line = line
                        continue

                    if partition_size > 0 and current_partition_size >= partition_size:
                        current_partition_index += 1
                        current_partition_size = 0
                        self.logger.info(f"Creating partition {current_partition_index}")

                    with open(os.path.join(output_path, "part_{:06d}".format(current_partition_index)), 'a', encoding="utf-8") as output_handler:
                        if self.header and current_partition_size == 0:
                            # put header before anything else
                            output_handler.write(header_line)

                        output_handler.write(line)
                        current_partition_size += 1
        self.logger.info(f"Created {current_partition_index+1} partitions")

    def split_by_count(self, input_files, output_path, partition_count):
        """Splits input files into a fixed number of partitions by round-robin
        shuffling of the lines of input files.

        Args:
            input_files (List[str]): list of file paths
            output_path (str): directory path, where to write the partitions
            partition_count (int): how many partitions to create
        """
        self.logger.info(f"Creating {partition_count} partitions using round robin.")

        partition_files = [open(os.path.join(output_path, "part_{:06d}".format(i)), "w", encoding="utf-8") for i in range(partition_count)]

        try:
            current_index = 0
            header_line = None # there can be only one header line

            for input_file in input_files:
                self.logger.info(f"Opening input file {input_file}")
                with open(input_file, "r", encoding="utf-8") as input_handler:
                    for line_index, line in enumerate(input_handler):
                        if self.header and header_line is None:
                            # if first line of first input file
                            # write that line in every partition
                            header_line = line
                            for partition_file in partition_files:
                                partition_file.write(header_line)
                            continue
                        elif self.header and line_index == 0:
                            # if first line of 2nd... input file, just pass
                            continue

                        partition_files[current_index % partition_count].write(line)
                        current_index += 1
        finally:
            # close output handles even if reading/writing failed midway
            for handler in partition_files:
                handler.close()
        self.logger.info(f"Created {partition_count} partitions")

    def run(self, input_path, output_path):
        """Runs the partition based on provided arguments.

        Args:
            input_path (str): path to input file(s)
            output_path (str): path to store output partitions

        Raises:
            NotImplementedError: if self.mode is not a known partition mode
        """
        # Retrieve all input files
        if os.path.isfile(input_path):
            self.logger.info("Input is one unique file")
            input_files = [input_path]
        else:
            self.logger.info("Input is a directory, listing all of them for processing")
            input_files = [os.path.join(input_path, file_name) for file_name in os.listdir(input_path)]
            self.logger.info("Found {} files in {}".format(len(input_files), input_path))

        if self.mode == "chunk":
            self.split_by_size(input_files, output_path, self.number)
        elif self.mode == "roundrobin":
            self.split_by_count(input_files, output_path, self.number)
        elif self.mode == "append":
            self.split_by_append(input_files, output_path, self.number)
        else:
            raise NotImplementedError(f"Mode {self.mode} not implemented.")

__init__(mode, number, header=False, logger=None)

Constructs and setup of the engine

Parameters:

Name Type Description Default
mode str

which partition mode (in PartitioningEngine.PARTITION_MODE list)

required
number int

parameter, behavior depends on mode

required
header bool

are there header lines in the input files?

False
logger logging.logger

a custom logger, if needed, for this engine to log

None
Source code in src/common/io.py
def __init__(self, mode, number, header=False, logger=None):
    """Set up the partitioning engine.

    Args:
        mode (str): which partition mode (in PartitioningEngine.PARTITION_MODES list)
        number (int): parameter, behavior depends on mode
        header (bool): are there header lines in the input files?
        logger (logging.Logger): a custom logger, if needed, for this engine to log
    """
    self.mode = mode
    self.number = number
    self.header = header
    # fall back on a module-level logger when none is provided
    self.logger = logger if logger else logging.getLogger(__name__)

split_by_append(input_files, output_path, file_count_target)

Appends N or more input files into N partition groups (round-robin assignment, then concatenation).

Parameters:

Name Type Description Default
input_files List[str]

list of file paths

required
output_path str

directory path, where to write the partitions

required
file_count_target int

how many partitions we want

required
Source code in src/common/io.py
def split_by_append(self, input_files, output_path, file_count_target):
    """Distributes the input files round-robin into file_count_target groups,
    then concatenates the content of each group into one partition file.

    Args:
        input_files (List[str]): list of file paths
        output_path (str): directory path, where to write the partitions
        file_count_target (int): how many partitions we want

    Raises:
        ValueError: if there are fewer input files than requested partitions
    """
    if len(input_files) < file_count_target:
        # ValueError is an Exception subclass, so existing callers still catch it
        raise ValueError(f"To use mode=append, the number of input files ({len(input_files)}) needs to be higher than requested number of output files ({file_count_target})")

    # each partition starts as an empty list of input files
    partitions = [[] for _ in range(file_count_target)]

    # round-robin each input file into one partition
    for index, input_file in enumerate(input_files):
        partitions[index % file_count_target].append(input_file)

    self.logger.info(f"Shuffled {len(input_files)} files into {file_count_target} partitions.")

    # then write each partition by appending content
    for current_partition_index, partition in enumerate(partitions):
        self.logger.info(f"Writing partition {current_partition_index}...")
        with open(os.path.join(output_path, "part_{:06d}".format(current_partition_index)), 'a', encoding="utf-8") as output_handler:
            for input_file in partition:
                self.logger.info(f"Reading input file {input_file}...")
                # explicit utf-8 for consistency with the other split methods
                with open(input_file, 'r', encoding="utf-8") as input_handler:
                    output_handler.write(input_handler.read())

    # use file_count_target: the loop variable would be unbound for 0 partitions
    self.logger.info(f"Created {file_count_target} partitions")

split_by_size(input_files, output_path, partition_size)

Splits input files into a variable number of partitions by chunking a fixed number of lines from inputs into each output file.

Parameters:

Name Type Description Default
input_files List[str]

list of file paths

required
output_path str

directory path, where to write the partitions

required
partition_size int

how many lines per partition

required
Source code in src/common/io.py
def split_by_size(self, input_files, output_path, partition_size):
    """Splits input files into a variable number of partitions
    by chunking a fixed number of lines from inputs into each
    output file. When header=True, the (unique) header line is written
    once at the top of every partition and is never counted as data.

    Args:
        input_files (List[str]): list of file paths
        output_path (str): directory path, where to write the partitions
        partition_size (int): how many lines per partition (<= 0 means one single partition)
    """
    current_partition_size = 0  # data lines written to the current partition
    current_partition_index = 0
    self.logger.info(f"Creating partition {current_partition_index}")

    header_line = None # there can be only one header line

    for input_file in input_files:
        self.logger.info(f"Opening input file {input_file}")
        with open(input_file, "r", encoding="utf-8") as input_handler:
            for line_index, line in enumerate(input_handler):
                if self.header and line_index == 0:
                    # first line of every input file is a header:
                    # keep the first one (replicated at the top of every partition)
                    # and never emit any header as a data line
                    if header_line is None:
                        header_line = line
                    continue

                if partition_size > 0 and current_partition_size >= partition_size:
                    current_partition_index += 1
                    current_partition_size = 0
                    self.logger.info(f"Creating partition {current_partition_index}")

                with open(os.path.join(output_path, "part_{:06d}".format(current_partition_index)), 'a', encoding="utf-8") as output_handler:
                    if self.header and current_partition_size == 0:
                        # put header before anything else
                        output_handler.write(header_line)

                    output_handler.write(line)
                    current_partition_size += 1
    self.logger.info(f"Created {current_partition_index+1} partitions")

split_by_count(input_files, output_path, partition_count)

Splits input files into a fixed number of partitions by round-robin shuffling of the lines of input files.

Parameters:

Name Type Description Default
input_files List[str]

list of file paths

required
output_path str

directory path, where to write the partitions

required
partition_count int

how many partitions to create

required
Source code in src/common/io.py
def split_by_count(self, input_files, output_path, partition_count):
    """Splits input files into a fixed number of partitions by round-robin
    shuffling of the lines of input files.

    Args:
        input_files (List[str]): list of file paths
        output_path (str): directory path, where to write the partitions
        partition_count (int): how many partitions to create
    """
    self.logger.info(f"Creating {partition_count} partitions using round robin.")

    partition_files = [open(os.path.join(output_path, "part_{:06d}".format(i)), "w", encoding="utf-8") for i in range(partition_count)]

    try:
        current_index = 0
        header_line = None # there can be only one header line

        for input_file in input_files:
            self.logger.info(f"Opening input file {input_file}")
            with open(input_file, "r", encoding="utf-8") as input_handler:
                for line_index, line in enumerate(input_handler):
                    if self.header and header_line is None:
                        # if first line of first input file
                        # write that line in every partition
                        header_line = line
                        for partition_file in partition_files:
                            partition_file.write(header_line)
                        continue
                    elif self.header and line_index == 0:
                        # if first line of 2nd... input file, just pass
                        continue

                    partition_files[current_index % partition_count].write(line)
                    current_index += 1
    finally:
        # close output handles even if reading/writing failed midway
        for handler in partition_files:
            handler.close()
    self.logger.info(f"Created {partition_count} partitions")

run(input_path, output_path)

Runs the partition based on provided arguments.

Parameters:

Name Type Description Default
input_path str

path to input file(s)

required
output_path str

path to store output partitions

required
Source code in src/common/io.py
def run(self, input_path, output_path):
    """Runs the partition based on provided arguments.

    Args:
        input_path (str): path to input file(s)
        output_path (str): path to store output partitions

    Raises:
        NotImplementedError: if self.mode is not a known partition mode
    """
    # Retrieve all input files
    if os.path.isfile(input_path):
        self.logger.info("Input is one unique file")
        input_files = [input_path]
    else:
        self.logger.info("Input is a directory, listing all of them for processing")
        input_files = [os.path.join(input_path, file_name) for file_name in os.listdir(input_path)]
        self.logger.info("Found {} files in {}".format(len(input_files), input_path))

    if self.mode == "chunk":
        self.split_by_size(input_files, output_path, self.number)
    elif self.mode == "roundrobin":
        self.split_by_count(input_files, output_path, self.number)
    elif self.mode == "append":
        self.split_by_append(input_files, output_path, self.number)
    else:
        raise NotImplementedError(f"Mode {self.mode} not implemented.")

input_file_path(path)

Argparse type to resolve input path as single file from directory. Given input path can be either a file, or a directory. If it's a directory, this returns the path to the unique file it contains.

Parameters:

Name Type Description Default
path str

either file or directory path

required

Returns:

Name Type Description
str

path to file, or to unique file in directory

Source code in src/common/io.py
def input_file_path(path):
    """ Argparse type to resolve input path as single file from directory.
    Given input path can be either a file, or a directory.
    If it's a directory, this returns the path to the unique file it contains.

    Args:
        path (str): either file or directory path

    Returns:
        str: path to file, or to unique file in directory
    """
    log = logging.getLogger(__name__)

    if os.path.isfile(path):
        log.info(f"Found INPUT file {path}")
        return path

    if os.path.isdir(path):
        entries = os.listdir(path)
        if not entries:
            raise Exception(f"Could not find any file in specified input directory {path}")
        if len(entries) > 1:
            raise Exception(f"Found multiple files in input file path {path}, use input_directory_path type instead.")
        # exactly one entry: resolve to that file
        unique_entry = entries[0]
        log.info(f"Found INPUT directory {path}, selecting unique file {unique_entry}")
        return os.path.join(path, unique_entry)

    # unknown path type: log loudly but return the path unchanged
    log.critical(f"Provided INPUT path {path} is neither a directory or a file???")
    return path

get_all_files(path, fail_on_unknown_type=False)

Scans some input path and returns a list of files.

Parameters:

Name Type Description Default
path str

either a file, or directory path

required
fail_on_unknown_type bool

fails if path is neither a file or a dir?

False

Returns:

Type Description

List[str]: list of paths contained in path

Source code in src/common/io.py
def get_all_files(path, fail_on_unknown_type=False):
    """ Scans some input path and returns a list of files.

    Args:
        path (str): either a file, or directory path
        fail_on_unknown_type (bool): fails if path is neither a file or a dir?

    Returns:
        List[str]: list of paths contained in path

    Raises:
        Exception: if path does not exist, or is an empty directory
        FileNotFoundError: if path is an unknown type and fail_on_unknown_type=True
    """
    # check the existence of the path
    if not exists(path):
        raise Exception(f"The specified path {path} does not exist.")

    # if input path is already a file, return as list
    if os.path.isfile(path):
        logging.getLogger(__name__).info(f"Found INPUT file {path}")
        return [path]

    # if input path is a directory, list all files and return
    if os.path.isdir(path):
        all_files = [ os.path.join(path, entry) for entry in os.listdir(path) ]
        if not all_files:
            raise Exception(f"Could not find any file in specified input directory {path}")
        return all_files

    if fail_on_unknown_type:
        raise FileNotFoundError(f"Provided INPUT path {path} is neither a directory or a file???")
    else:
        logging.getLogger(__name__).critical(f"Provided INPUT path {path} is neither a directory or a file???")

    # wrap in a list to honor the documented List[str] contract
    # (previously returned the bare string, whose iteration yields single characters)
    return [path]