Source code for genalog.ocr.metrics

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# ---------------------------------------------------------

"""
Utility functions to support getting OCR metrics

OCR Metrics
1. word/character accuracy like in this paper https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=6065412.
Accuracy = Correct Words/Total Words (in target strings)

2. Count of edit distance ops:
insert, delete, substitutions; like in the paper "Deep Statistical Analysis of OCR Errors for Effective Post-OCR Processing".
This is based on Levenshtein edit distance.

3. By looking at the gaps in alignment we also generate substitution dicts:
e.g: if we have text "a worn coat" and ocr is "a wom coat" , "rn" -> "m" will be captured as a substitution
since the rest of the segments align.The assumption here is that we do not expect to have very long gaps in alignment,
hence collecting and counting these substitutions will be managable.

"""
import argparse
import json
import multiprocessing
import os
import re
from multiprocessing import Pool

import pandas as pd
from tqdm import tqdm

from genalog.text.alignment import GAP_CHAR
from genalog.text.anchor import align_w_anchor
from genalog.text.ner_label import _find_gap_char_candidates

LOG_LEVEL = 0
WORKERS_PER_CPU = 2


def _log(*args, **kwargs):
    if LOG_LEVEL:
        print(args)


def _trim_whitespace(src_string):
    return re.sub(r"\s+", " ", src_string.strip())


def _update_align_stats(src, target, align_stats, substitution_dict, gap_char):
    """Given two string that differ and have no alignment at all,
     update the alignment dict and fill in substitution if replacements are found.
     update alignment stats with counts of the edit operation to transform the source
     string to the targes

    Args:
        src (str): source string
        target (str): target string at the
        align_stats (dict): key-value dictionary that stores the counts of inserts, deletes,
            spacing and replacements
        substitution_dict (dict): store the counts of mapping from one substring to another of
            the replacement edit operation. e.g if 'rm' in source needs to map to 'm' in the target 2
            times this will be { ('rm','m'): 2}
        gap_char (str): gap character used in alignment
    """
    _log("getting gap stats for", src, target)
    spacing_count = 0
    for char1, char2 in zip(target, src):
        if (char1 == gap_char and char2 == " ") or (char1 == " " and char2 == gap_char):
            spacing_count += 1
    source_substr = src.strip(f"{gap_char} ")
    target_substr = target.strip(f"{gap_char} ")
    if source_substr != "" or target_substr != "":
        if source_substr == "":
            _log("inserting", target_substr)
            align_stats["insert"] += 1
        elif target_substr == "":
            _log("deleting", source_substr)
            align_stats["delete"] += 1
        else:
            align_stats["replace"] += 1
            _log("replacing", source_substr, target_substr)
            substitution_dict[source_substr, target_substr] = (
                substitution_dict.get((source_substr, target_substr), 0) + 1
            )
    _log("spacing count", spacing_count)
    align_stats["spacing"] += spacing_count


def _update_word_stats(
    aligned_src,
    aligned_target,
    gap_char,
    start,
    end,
    matching_chars_count,
    matching_words_count,
    matching_alnum_words_count,
):
    """Given two string segments that align. update the counts of matching words and characters

    Args:
        aligned_src (str): full source string
        aligned_target (str): full target string
        gap_char (str): gap character used in alignment
        start (int): start position of alignment
        end (int): end position of alignment
        matching_chars_count (int): current count of matching characters
        matching_words_count (int): current count of matching words
        matching_alnum_words_count (int): current count of alphanumeric matching words

    Returns:
        tuple(int,int,int): the updated matching_chars_count, matching_words_count, matching_alnum_words_count
    """
    aligned_part = aligned_src[start:end]
    matching_chars_count += end - start
    # aligned_part = seq.strip()
    _log("aligned", aligned_part, start, end)
    if len(aligned_src) != len(aligned_target):
        raise ValueError("alignment strings are of different length")
    if aligned_part.strip() != "":
        words = re.split(r"\s+", aligned_part.strip())
        matching_words_count += len(words)
        matching_alnum_words_count += len(words)

        for i, word in enumerate(words):
            # remove words that dont have an alphanumeric char from the alphanumeric word count
            if not re.search(r"\w", word):
                matching_alnum_words_count -= 1

            # handle the edge case for the first and last words as these are at the boundary and need
            # to be compared with the full string to see if they have space before or after

            if i == 0:
                if start != 0 and (
                    aligned_target[start] != " " or aligned_src[start] != " "
                ):
                    # if this was the start of the string in the target or source
                    if not (
                        aligned_src[:start].replace(gap_char, "").replace(" ", "") == ""
                        and aligned_target[start - 1] == " "
                    ) and not (
                        aligned_target[:start].replace(gap_char, "").replace(" ", "")
                        == ""
                        and aligned_src[start - 1] == " "
                    ):
                        # beginning word not matching completely
                        _log("removing first match word from count", word, aligned_part)
                        matching_words_count -= 1
                        if re.search(r"\w", word):
                            matching_alnum_words_count -= 1
                        continue

            if i == len(words) - 1:
                if end != len(aligned_target) and (
                    aligned_target[end] != " " or aligned_src[end] != " "
                ):
                    # this was not the end of the string in the src and not end of string in target
                    if not (
                        aligned_src[end:].replace(gap_char, "").replace(" ", "") == ""
                        and aligned_target[end] == " "
                    ) and not (
                        aligned_target[end:].replace(gap_char, "").replace(" ", "")
                        == ""
                        and aligned_src[end] == " "
                    ):
                        # last word not matching completely
                        _log("removing last match word from count", word, aligned_part)
                        matching_words_count -= 1
                        if re.search(r"\w", word):
                            matching_alnum_words_count -= 1

    _log("matched count", matching_words_count)
    _log("matched alnum count", matching_alnum_words_count)
    return matching_chars_count, matching_words_count, matching_alnum_words_count


def _get_align_stats(alignment, src_string, target, gap_char):
    """Given an alignment, this function get the align stats and substitution mapping to
    transform the source string to the target string

    Args:
        alignment (tuple(str, str)): the result of calling align on the two strings
        src_source (str): the source string
        target (str) : the target string
        gap_char (str) : the gap character used in alignment

    Raises:
        ValueError: if any of the aligned string are empty

    Returns:
        tuple(dict, dict): align stats dict, substitution mappings dict
    """

    aligned_src, aligned_target = alignment

    if src_string.strip() == "" or target.strip() == "":
        raise ValueError("one of the input strings is empty")
    _log("src, target", src_string, target)
    substitution_dict = {}

    # words are defined as here as string sepated by whitespace
    words = re.split(r"\s+", target.strip())
    word_count = len(words)

    # alphanumeric words are defined here as words with at least one alphanumeric character
    alnum_words_count = len(list(filter(lambda x: re.search(r"\w", x), words)))

    char_count = max(len(target), len(src_string))
    matching_chars_count = 0
    matching_words_count = 0
    matching_alnum_words_count = 0

    align_stats = {
        "insert": 0,
        "delete": 0,
        "replace": 0,
        "spacing": 0,
        "total_chars": char_count,
        "total_words": word_count,
        "total_alnum_words": alnum_words_count,
    }
    start = 0

    _log("######### Alignment ############")
    _log(aligned_src)
    _log(aligned_target)
    _log("################################")

    gap_start = None
    for i, (char_1, char_2) in enumerate(zip(aligned_src, aligned_target)):
        if char_1 != char_2:
            # since characters don't match here start:i is a substring of the string that align
            # since this substring aligns, simple count the number of matching words and chars in and update
            # the word stats
            end = i
            _log(
                "sequences",
                aligned_src[start:end],
                aligned_target[start:end],
                start,
                end,
            )
            assert aligned_src[start:end] == aligned_target[start:end]
            (
                matching_chars_count,
                matching_words_count,
                matching_alnum_words_count,
            ) = _update_word_stats(
                aligned_src,
                aligned_target,
                gap_char,
                start,
                end,
                matching_chars_count,
                matching_words_count,
                matching_alnum_words_count,
            )
            start = end + 1
            if gap_start is None:
                gap_start = end
        else:
            gap_end = i
            if gap_start is not None:
                # since characters now match  gap_start:i contains a substring of the characters that didnt align before
                # handle this gap alignment by calling _update_align_stats
                _log(
                    "gap",
                    aligned_src[gap_start:gap_end],
                    aligned_target[gap_start:gap_end],
                    gap_start,
                    gap_end,
                )
                _update_align_stats(
                    aligned_src[gap_start:gap_end],
                    aligned_target[gap_start:gap_end],
                    align_stats,
                    substitution_dict,
                    gap_char,
                )
            gap_start = None

    # Now compare any left overs string segments from the for loop
    if gap_start is not None:
        # handle last alignment gap
        _log("last gap", aligned_src[gap_start:], aligned_target[gap_start:])
        _update_align_stats(
            aligned_src[gap_start:],
            aligned_target[gap_start:],
            align_stats,
            substitution_dict,
            gap_char,
        )
    else:
        # handle last aligned substring
        seq = aligned_src[start:]
        aligned_part = seq.strip()
        end = len(aligned_src)
        _log("last aligned", aligned_part)
        (
            matching_chars_count,
            matching_words_count,
            matching_alnum_words_count,
        ) = _update_word_stats(
            aligned_src,
            aligned_target,
            gap_char,
            start,
            end,
            matching_chars_count,
            matching_words_count,
            matching_alnum_words_count,
        )

    align_stats["matching_chars"] = matching_chars_count
    align_stats["matching_alnum_words"] = matching_alnum_words_count
    align_stats["matching_words"] = matching_words_count
    align_stats["alnum_word_accuracy"] = matching_alnum_words_count / alnum_words_count
    align_stats["word_accuracy"] = matching_words_count / word_count
    align_stats["char_accuracy"] = matching_chars_count / char_count
    return align_stats, substitution_dict


[docs]def get_editops_stats(alignment, gap_char): """Get stats for character level edit operations that need to be done to transform the source string to the target string. Inputs must not be empty and must be the result of calling the runing the align function. Args: alignment (tuple(str, str)): the results from the string alignment biopy function gap_char (str): gap character used in alignment Raises: ValueError: If any of the string in the alignment are empty Returns: [type]: [description] """ aligned_src, aligned_target = alignment if aligned_src == "" or aligned_target == "": raise ValueError("one of the input strings is empty") stats = { "edit_insert": 0, "edit_delete": 0, "edit_replace": 0, "edit_insert_spacing": 0, "edit_delete_spacing": 0, } actions = {} for i, (char_1, char_2) in enumerate(zip(aligned_src, aligned_target)): if LOG_LEVEL > 1: _log(char_1, char_2) if char_1 == gap_char: # insert if char_2 == " ": stats["edit_insert_spacing"] += 1 else: stats["edit_insert"] += 1 actions[i] = ("I", char_2) elif char_2 == gap_char: # delete if char_1 == " ": stats["edit_delete_spacing"] += 1 else: stats["edit_delete"] += 1 actions[i] = "D" elif char_2 != char_1: stats["edit_replace"] += 1 actions[i] = ("R", char_2) return stats, actions
[docs]def get_align_stats(alignment, src_string, target, gap_char): """Get alignment stats Args: alignment (tuple(str,str)): the result of calling the align function src_string (str): the original source string target (str): the original target string gap_char (str): the gap character used in alignment Raises: ValueError: if any of the strings are empty Returns: tuple(dict, dict): dict of the align starts and dict of the substitution mappings """ if src_string.strip() == "" or target.strip() == "": raise ValueError("one of the input strings is empty") _log("alignment results") _log(alignment) align_stats, substitution_dict = _get_align_stats( alignment, src_string, target, gap_char ) return align_stats, substitution_dict
[docs]def get_stats(target, src_string): """Get align stats, edit stats, and substitution mappings for transforming the source string to the target string. Edit stats refers to character level edit operation required to transform the source to target. Align stats referers to substring level operation required to transform the source to target. Align stats have keys insert,replace,delete and the special key spacing which counts spacing differences between the two strings. Edit stats have the keys edit_insert, edit_replace, edit_delete which count the character level edits. Args: src_string (str): the source string target (str): the target string Returns: tuple(str, str): One dict containing the edit and align stats, another dict containing the substitutions """ gap_char_candidates, input_char_set = _find_gap_char_candidates( [src_string], [target] ) gap_char = ( GAP_CHAR if GAP_CHAR in gap_char_candidates else gap_char_candidates.pop() ) alignment = align_w_anchor(src_string, target, gap_char=gap_char) align_stats, substitution_dict = get_align_stats( alignment, src_string, target, gap_char ) edit_stats, actions = get_editops_stats(alignment, gap_char) _log("alignment", align_stats) return {**edit_stats, **align_stats}, substitution_dict, actions
[docs]def get_metrics( src_text_path, ocr_json_path, folder_hash=None, use_multiprocessing=True ): """Given a path to the folder containing the source text and a folder containing the output OCR json, this generates the metrics for all files in the source folder. This assumes that the files json folder are of the same name the text files except they are prefixed by the parameter folder_hash followed by underscore and suffixed by .png.json. Args: src_text_path (str): path to source txt files ocr_json_path (str): path to OCR json files folder_hash (str): prefix for OCR json files use_multiprocessing (bool): use multiprocessing Returns: tuple(pandas.DataFrame, dict): A pandas dataframe of the metrics with each file in a row, a dict containing the substitions mappings for each file. the key to the dict is the filename and the values are dicts of the substition mappings for that file. """ rows = [] substitutions = {} actions_map = {} # Spin up workers as alignment on many files can take a while cpu_count = multiprocessing.cpu_count() n_workers = WORKERS_PER_CPU * cpu_count job_args = list( map( lambda f: (f, src_text_path, ocr_json_path, folder_hash), os.listdir(src_text_path), ) ) if use_multiprocessing: with Pool(n_workers) as pool: for f, stats, actions, subs in tqdm( pool.imap_unordered(_worker, job_args), total=len(job_args) ): substitutions[f] = subs actions_map[f] = actions rows.append(stats) else: for f, stats, actions, subs in tqdm( map(_worker, job_args), total=len(job_args) ): substitutions[f] = subs actions_map[f] = actions rows.append(stats) df = pd.DataFrame(rows) return df, substitutions, actions_map
def get_file_metrics(f, src_text_path, ocr_json_path, folder_hash): src_filename = os.path.join(src_text_path, f) if folder_hash: ocr_filename = os.path.join( ocr_json_path, f"{folder_hash}_{f.split('txt')[0] + 'json'}" ) else: ocr_filename = os.path.join(ocr_json_path, f"{f.split('txt')[0] + 'json'}") try: src_string = open(src_filename, "r", errors="ignore", encoding="utf8").read() except FileNotFoundError: print(f"File not found: {src_filename}, skipping this file.") return f, {}, {}, {} try: ocr_string = _get_sorted_text(json.load(open(ocr_filename, "rb"))) except FileNotFoundError: print(f"File not found: {ocr_filename}, skipping this file.") return f, {}, {}, {} # TODO ocr bug? text lines are sometimes not sorted correctly ocr_string = _trim_whitespace(ocr_string) src_string = _trim_whitespace(src_string) try: stats, subs, actions = get_stats(ocr_string, src_string) except ValueError as e: print("Error:", src_filename, ocr_filename, e) return f, {}, {}, {} stats["txt_path"] = src_filename stats["ocr_json_path"] = ocr_filename stats["filename"] = f return f, stats, actions, subs def _worker(args): (f, src_text_path, ocr_json_path, folder_hash) = args return get_file_metrics(f, src_text_path, ocr_json_path, folder_hash) def _get_sorted_text(ocr_json): if "lines" in ocr_json[0]: lines = ocr_json[0]["lines"] sorted_lines = sorted(lines, key=lambda line: line["boundingBox"][0]["y"]) return " ".join([line["text"] for line in sorted_lines]) else: return ocr_json[0]["text"]
[docs]def substitution_dict_to_json(substitution_dict): """Converts substitution dict to list of tuples of (source_substring, target_substring, count) Args: substitution_dict ([type]): [description] """ to_tuple = lambda x: [(k + (x[k],)) for k in x] # noqa: E731 out = {} for filename in substitution_dict: out[filename] = to_tuple(substitution_dict[filename]) return out
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("src", help="path to folder with text files.") parser.add_argument( "ocr", help="folder with ocr json. the filename must match the text filename prefixed by ocr_prefix.", ) parser.add_argument("--ocr_prefix", help="the prefix of the ocr files") parser.add_argument("--output", help="output names of metrics files") args = parser.parse_args() df, subs, actions = get_metrics(args.src, args.ocr, args.ocr_prefix) csv_file, json_file = f"{args.output}.csv", f"{args.output}.json" print("got metrics. dumping to files:", csv_file, json_file) df.to_csv(csv_file) json.dump(substitution_dict_to_json(subs), open(json_file, "w"))