# Source code for genalog.ocr.rest_client

# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# ---------------------------------------------------------

"""Uses the REST api to perform operations on the search service.
see: https://docs.microsoft.com/en-us/rest/api/searchservice/
"""
import json
import os
import pkgutil
import sys
import time
from itertools import cycle

import requests

from .common import DEFAULT_PROJECTIONS_CONTAINER_NAME

# Query-string suffix appended to every Search-service REST URL built in this
# module (via GrokRestClient.API_VERSION); pins requests to this preview API version.
API_VERSION = "?api-version=2019-05-06-Preview"

# Indexer schedule interval, written as an ISO 8601 duration ("PT15M" = every
# 15 minutes); placed into the indexer definition's "schedule" by create_indexer.
SCHEDULE_INTERVAL = "PT15M"


class GrokRestClient:
    """This is a REST client.

    It is a wrapper around the REST API for the Azure Search Service,
    see: https://docs.microsoft.com/en-us/rest/api/searchservice/

    This class can be used to create an indexing pipeline and can be used to
    run and monitor ongoing indexers. The indexing pipeline can allow you to
    run batch OCR enrichment of documents.
    """

    # Printed in place of credentials whenever a request payload is echoed.
    _REDACTED = "<redacted>"

    def __init__(
        self,
        cognitive_service_key,
        search_service_key,
        search_service_name,
        skillset_name,
        index_name,
        indexer_name,
        datasource_name,
        datasource_container_name,
        blob_account_name,
        blob_key,
        projections_container_name=DEFAULT_PROJECTIONS_CONTAINER_NAME,
    ):
        """Creates the REST client

        Args:
            cognitive_service_key (str): key to cognitive services account
            search_service_key (str): key to the search service account
            search_service_name (str): name of the search service account
            skillset_name (str): name of the skillset
            index_name (str): name of the index
            indexer_name (str): the name of indexer
            datasource_name (str): the name to give the attached blob storage source
            datasource_container_name (str): the container in the blob storage
                that hosts the files
            blob_account_name (str): blob storage account name that will host
                the documents to push through the pipeline
            blob_key (str): key to blob storage account
            projections_container_name (str, optional): blob container written
                into the knowledge-store projection of the skillset.
                Defaults to DEFAULT_PROJECTIONS_CONTAINER_NAME.

        Raises:
            ValueError: if any argument is empty/falsy.
        """
        # check arguments (order kept so the same error surfaces first)
        self._checkArg("COGNITIVE_SERVICE_KEY", cognitive_service_key)
        self._checkArg("SEARCH_SERVICE_KEY", search_service_key)
        self._checkArg("SEARCH_SERVICE_NAME", search_service_name)
        self._checkArg("SKILLSET_NAME", skillset_name)
        self._checkArg("INDEX_NAME", index_name)
        self._checkArg("INDEXER_NAME", indexer_name)
        self._checkArg("DATASOURCE_NAME", datasource_name)
        self._checkArg("DATASOURCE_CONTAINER_NAME", datasource_container_name)
        self._checkArg("PROJECTIONS_CONTAINER_NAME", projections_container_name)
        self._checkArg("BLOB_NAME", blob_account_name)
        self._checkArg("BLOB_KEY", blob_key)

        self.COGNITIVE_SERVICE_KEY = cognitive_service_key
        self.SEARCH_SERVICE_KEY = search_service_key
        self.SEARCH_SERVICE_NAME = search_service_name
        self.SKILLSET_NAME = skillset_name
        self.INDEX_NAME = index_name
        self.INDEXER_NAME = indexer_name
        self.DATASOURCE_NAME = datasource_name
        self.DATASOURCE_CONTAINER_NAME = datasource_container_name
        self.PROJECTIONS_CONTAINER_NAME = projections_container_name
        self.BLOB_NAME = blob_account_name
        self.BLOB_KEY = blob_key
        self.API_VERSION = API_VERSION
        # Contains the storage account key: treat as a secret, never print it.
        self.BLOB_CONNECTION_STRING = (
            f"DefaultEndpointsProtocol=https;AccountName={self.BLOB_NAME};"
            f"AccountKey={self.BLOB_KEY};EndpointSuffix=core.windows.net"
        )

    @classmethod
    def create_from_env_var(cls):
        """Builds a client from environment variables.

        Required: COGNITIVE_SERVICE_KEY, SEARCH_SERVICE_KEY, SEARCH_SERVICE_NAME,
        SKILLSET_NAME, INDEX_NAME, INDEXER_NAME, DATASOURCE_NAME,
        DATASOURCE_CONTAINER_NAME, BLOB_NAME, BLOB_KEY.
        Optional: PROJECTIONS_CONTAINER_NAME (defaults to
        DEFAULT_PROJECTIONS_CONTAINER_NAME).

        Returns:
            GrokRestClient: a client (or subclass instance when called on a subclass).

        Raises:
            KeyError: if a required environment variable is not set.
            ValueError: if a variable is set but empty.
        """
        env = os.environ
        # Positional arguments are evaluated left to right, so the first missing
        # variable is reported in the same order as before.
        return cls(
            env["COGNITIVE_SERVICE_KEY"],
            env["SEARCH_SERVICE_KEY"],
            env["SEARCH_SERVICE_NAME"],
            env["SKILLSET_NAME"],
            env["INDEX_NAME"],
            env["INDEXER_NAME"],
            env["DATASOURCE_NAME"],
            env["DATASOURCE_CONTAINER_NAME"],
            env["BLOB_NAME"],
            env["BLOB_KEY"],
            projections_container_name=env.get(
                "PROJECTIONS_CONTAINER_NAME", DEFAULT_PROJECTIONS_CONTAINER_NAME
            ),
        )

    def _headers(self):
        """Returns the JSON content-type and api-key headers sent with every request."""
        return {
            "Content-Type": "application/json",
            "api-key": self.SEARCH_SERVICE_KEY,
        }

    def _endpoint(self, resource_path):
        """Returns the full URL (API version included) for a path such as "indexers/<name>"."""
        return (
            f"https://{self.SEARCH_SERVICE_NAME}.search.windows.net/"
            f"{resource_path}{self.API_VERSION}"
        )

    @staticmethod
    def _load_template(file_name):
        """Loads and parses a JSON template from this package's templates folder.

        Raises:
            FileNotFoundError: if the package loader cannot provide the resource.
        """
        raw = pkgutil.get_data(__name__, f"templates/{file_name}")
        # pkgutil.get_data returns None when the loader lacks resource support.
        if raw is None:
            raise FileNotFoundError(f"template templates/{file_name} not found")
        return json.loads(raw)

    def _put_resource(self, resource_path, payload, label, done_message, name):
        """PUTs a resource definition, logging the response and checking its status.

        Args:
            resource_path (str): path below the service root, e.g. "indexes/<name>".
            payload (dict): JSON-serializable resource definition.
            label (str): prefix of the "<label> response" log line.
            done_message (str): message printed after a successful request.
            name (str): resource name printed alongside done_message.

        Raises:
            requests.HTTPError: if the service returns an error status.
        """
        r = requests.put(
            self._endpoint(resource_path), json.dumps(payload), headers=self._headers()
        )
        print(f"{label} response", r.text)
        r.raise_for_status()
        print(done_message, name, r)

    def create_skillset(self):
        """Adds a skillset that performs OCR on images.

        The skillset and knowledge-store templates are filled in with this
        client's names and credentials; the printed copy has credentials redacted.

        Raises:
            requests.HTTPError: if the service rejects the request.
        """
        skillset_json = self._load_template("skillset.json")
        skillset_json["name"] = self.SKILLSET_NAME
        skillset_json["cognitiveServices"]["key"] = self.COGNITIVE_SERVICE_KEY

        knowledge_store_json = self._load_template("knowledge_store.json")
        knowledge_store_json["storageConnectionString"] = self.BLOB_CONNECTION_STRING
        knowledge_store_json["projections"][0]["objects"][0][
            "storageContainer"
        ] = self.PROJECTIONS_CONTAINER_NAME
        skillset_json["knowledgeStore"] = knowledge_store_json

        # Previously the raw payload was printed, leaking the Cognitive Services
        # key and the storage account key; print a redacted deep copy instead.
        printable = json.loads(json.dumps(skillset_json))
        printable["cognitiveServices"]["key"] = self._REDACTED
        printable["knowledgeStore"]["storageConnectionString"] = self._REDACTED
        print(printable)

        self._put_resource(
            f"skillsets/{self.SKILLSET_NAME}",
            skillset_json,
            "skillset",
            "added skillset",
            self.SKILLSET_NAME,
        )

    def create_datasource(self):
        """Attaches the blob data store to the search service as a source for image documents.

        Raises:
            requests.HTTPError: if the service rejects the request.
        """
        datasource_json = self._load_template("datasource.json")
        datasource_json["name"] = self.DATASOURCE_NAME
        datasource_json["credentials"]["connectionString"] = self.BLOB_CONNECTION_STRING
        datasource_json["type"] = "azureblob"
        datasource_json["container"]["name"] = self.DATASOURCE_CONTAINER_NAME
        self._put_resource(
            f"datasources/{self.DATASOURCE_NAME}",
            datasource_json,
            "datasource",
            "added datasource",
            self.DATASOURCE_NAME,
        )

    def create_index(self):
        """Create an index with the layoutText column to store OCR output from the enrichment.

        Raises:
            requests.HTTPError: if the service rejects the request.
        """
        index_json = self._load_template("index.json")
        index_json["name"] = self.INDEX_NAME
        self._put_resource(
            f"indexes/{self.INDEX_NAME}",
            index_json,
            "index",
            "created index",
            self.INDEX_NAME,
        )

    def create_indexer(self, extension_to_exclude=".txt, .json"):
        """Creates an indexer that runs the enrichment skillset on documents from
        the datasource. The enriched results are pushed to the index.

        Args:
            extension_to_exclude (str): value stored as the indexer's
                "excludedFileNameExtensions" configuration (presumably a
                comma-separated list of extensions -- confirm with the service docs).

        Raises:
            requests.HTTPError: if the service rejects the request.
        """
        indexer_json = self._load_template("indexer.json")
        indexer_json["name"] = self.INDEXER_NAME
        indexer_json["skillsetName"] = self.SKILLSET_NAME
        indexer_json["targetIndexName"] = self.INDEX_NAME
        indexer_json["dataSourceName"] = self.DATASOURCE_NAME
        indexer_json["schedule"] = {"interval": SCHEDULE_INTERVAL}
        indexer_json["parameters"]["configuration"][
            "excludedFileNameExtensions"
        ] = extension_to_exclude
        self._put_resource(
            f"indexers/{self.INDEXER_NAME}",
            indexer_json,
            "indexer",
            "created indexer",
            self.INDEXER_NAME,
        )

    def create_indexing_pipeline(self):
        """Using REST calls, creates the skillset, index, datasource and indexer
        on the search service. The JSON templates are in the templates folder.
        """
        self.create_skillset()
        self.create_index()
        self.create_datasource()
        self.create_indexer()

    def delete_indexer_pipeline(self):
        """Deletes the indexer, index, datasource and skillset configured on this
        client (in that order).

        Raises:
            requests.HTTPError: on the first failed delete; later resources are
                then left in place.
        """
        resource_paths = [
            f"indexers/{self.INDEXER_NAME}",
            f"indexes/{self.INDEX_NAME}",
            f"datasources/{self.DATASOURCE_NAME}",
            f"skillsets/{self.SKILLSET_NAME}",
        ]
        headers = self._headers()
        for resource_path in resource_paths:
            r = requests.delete(self._endpoint(resource_path), headers=headers)
            print("delete response", r.text)
            r.raise_for_status()

    def run_indexer(self):
        """Triggers an on-demand run of the configured indexer.

        Raises:
            requests.HTTPError: if the service rejects the request.
        """
        r = requests.post(
            self._endpoint(f"indexers/{self.INDEXER_NAME}/run"), headers=self._headers()
        )
        print("run indexer response", r.text)
        r.raise_for_status()
        print("running indexer", self.INDEXER_NAME, r)

    def poll_indexer_till_complete(self):
        """Blocks until the indexer's last run is no longer in progress.

        A console spinner is drawn while waiting; the status endpoint is queried
        once every 100 spinner steps (about 5 s, given the 0.05 s sleep per step).

        Returns:
            dict: the indexer status JSON once "lastResult" is present and its
                "status" is not "inProgress".

        Raises:
            RuntimeError: if the indexer's overall status is "error".
            requests.HTTPError: if a status request fails.
        """
        progress = cycle("|\b/\b-\b\\\b")
        step = 0
        while True:
            if step == 0:
                status_json = self.get_indexer_status()
                if status_json["status"] == "error":
                    raise RuntimeError("Indexer failed")
                # lastResult may be missing/null before the first run completes.
                last_result = status_json.get("lastResult")
                if last_result and last_result["status"] != "inProgress":
                    print(last_result["status"], self.INDEXER_NAME)
                    return status_json
            sys.stdout.write(next(progress))
            sys.stdout.flush()
            time.sleep(0.05)
            # Python ints never overflow; the modulo only sets the polling cadence.
            step = (step + 1) % 100

    def get_indexer_status(self):
        """Fetches the current status of the configured indexer.

        Returns:
            dict: the parsed JSON body of the indexer status endpoint.

        Raises:
            requests.HTTPError: if the service returns an error status.
        """
        response = requests.get(
            self._endpoint(f"indexers/{self.INDEXER_NAME}/status"),
            headers=self._headers(),
        )
        response.raise_for_status()
        return response.json()

    def _checkArg(self, name, value):
        """Raises ValueError naming *name* when *value* is falsy (None, "", ...)."""
        if not value:
            raise ValueError(f"argument {name} is not set")