When many entities need to be resolved, multithreading can be used
to speed things up. Below is an example using the autosuggest method;
the implementation would be similar with any of the other methods from
the knowledge_graph family.
The instance of Bigdata includes a built-in rate limiter to avoid making
too many requests in a short period of time. More information can be
found at threading.
Example
Here the goal is to use threads so that we do not have to wait for the
response of KnowledgeGraph.autosuggest before making the next call. We
create a function autosuggest_handling_errors that will be used by the
threads.
Step 1: Create a function that invokes the method and wraps any
possible error.
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

from bigdata_client import Bigdata
# One client instance, created once and handed to every worker task below.
bigdata = Bigdata()
def autosuggest_handling_errors(bigdata: Bigdata, value: str):
    """Call ``bigdata.knowledge_graph.autosuggest(value)`` and trap failures.

    Meant to be the task each worker thread runs: any ``Exception`` raised by
    the lookup is reported in-band instead of propagating to the caller.

    Args:
        bigdata: Client whose ``knowledge_graph.autosuggest`` is invoked.
        value: Text to look up.

    Returns:
        Whatever ``autosuggest`` returns on success; otherwise the string
        ``"Error: <exception message>"``.
    """
    try:
        suggestions = bigdata.knowledge_graph.autosuggest(value)
    except Exception as error:
        return f"Error: {error}"
    else:
        return suggestions
def concurrent_autosuggest(
    bigdata: Bigdata, values: list[str], max_concurrency: int
):
    """Run ``autosuggest_handling_errors`` for every value on a thread pool.

    Args:
        bigdata: Client instance passed unchanged to every task.
        values: Strings to look up; one task is submitted per element.
        max_concurrency: Upper bound on worker threads (``max_workers``).

    Returns:
        A dict mapping each input value to what its task returned. Entries
        are inserted as tasks finish, so key order may differ from ``values``.
    """
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        pending = {}
        for value in values:
            task = pool.submit(autosuggest_handling_errors, bigdata, value)
            pending[task] = value
        # as_completed yields each future as soon as it is done, so a slow
        # lookup does not hold up collecting the others.
        return {
            pending[done]: done.result()
            for done in concurrent.futures.as_completed(pending)
        }
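Step 2: Create a function, concurrent_autosuggest (shown above), that uses
a thread pool to run autosuggest_handling_errors for every value. The
response is returned as a dictionary.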
The instance of Bigdata reuses the connection, so if you plan on
creating many threads, increase the number of simultaneous connections
allowed by configuring
BIGDATA_MAX_PARALLEL_REQUESTS.
import pprint
if __name__ == "__main__":
    # Two sample names, with enough workers to look both up at the same time.
    values = ["tesla", "apple"]
    max_concurrency = 2
    results = concurrent_autosuggest(
        bigdata, values=values, max_concurrency=max_concurrency
    )
    pprint.pprint(results)
{'apple': [Company(id='D8442A', name='Apple Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Technology', industry_group='Computer Hardware', industry='Computer Hardware', ticker='AAPL'),
       Concept(id='4AD3C9', name='Apple', volume=None, description=None, entity_type='FRTS', entity_type_name='Fruits', concept_level_2=None, concept_level_3=None, concept_level_4=None, concept_level_5=None)],
'tesla': [Company(id='DD3BB1', name='Tesla Inc.', volume=None, description=None, entity_type='COMP', company_type='Public', country='United States', sector='Consumer Goods', industry_group='Automobiles', industry='Automobiles', ticker='TSLA'),
       Product(id='C7BCE3', name='Tesla Automobile', volume=None, description=None, entity_type='PROD', product_type='Electric Vehicle', product_owner='Tesla Inc.')]}
Full code
from bigdata_client import Bigdata
import concurrent.futures
from concurrent.futures.thread import ThreadPoolExecutor
import pprint
# One client instance, created once and handed to every worker task below.
bigdata = Bigdata()
def autosuggest_handling_errors(bigdata: Bigdata, value: str):
    """Call ``bigdata.knowledge_graph.autosuggest(value)`` and trap failures.

    Meant to be the task each worker thread runs: any ``Exception`` raised by
    the lookup is reported in-band instead of propagating to the caller.

    Args:
        bigdata: Client whose ``knowledge_graph.autosuggest`` is invoked.
        value: Text to look up.

    Returns:
        Whatever ``autosuggest`` returns on success; otherwise the string
        ``"Error: <exception message>"``.
    """
    try:
        suggestions = bigdata.knowledge_graph.autosuggest(value)
    except Exception as error:
        return f"Error: {error}"
    else:
        return suggestions
def concurrent_autosuggest(
    bigdata: Bigdata, values: list[str], max_concurrency: int
):
    """Run ``autosuggest_handling_errors`` for every value on a thread pool.

    Args:
        bigdata: Client instance passed unchanged to every task.
        values: Strings to look up; one task is submitted per element.
        max_concurrency: Upper bound on worker threads (``max_workers``).

    Returns:
        A dict mapping each input value to what its task returned. Entries
        are inserted as tasks finish, so key order may differ from ``values``.
    """
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        pending = {}
        for value in values:
            task = pool.submit(autosuggest_handling_errors, bigdata, value)
            pending[task] = value
        # as_completed yields each future as soon as it is done, so a slow
        # lookup does not hold up collecting the others.
        return {
            pending[done]: done.result()
            for done in concurrent.futures.as_completed(pending)
        }
if __name__ == "__main__":
    # Guarded so that importing this module does not run the example lookups;
    # they execute only when the file is run as a script.
    values = ["tesla", "apple"]
    max_concurrency = 2
    results = concurrent_autosuggest(bigdata, values, max_concurrency)
    pprint.pprint(results)