Helpers

Collection of simple helper functions that abstract some specifics of the raw API.

Connecting

from elasticsearch import Elasticsearch

client = Elasticsearch("https://.../", api_key="YOUR_API_KEY")

Bulk helpers

There are several helpers for the bulk API since its requirement for specific formatting and other considerations can make it cumbersome if used directly.

All bulk helpers accept an instance of the Elasticsearch class and an iterable of actions (any iterable; a generator is ideal in most cases since it allows you to index large datasets without loading them into memory).

The items in the actions iterable are the documents we wish to index, and they can be given in several formats. The most common one is the same as returned by search(), for example:

{
    '_index': 'index-name',
    '_id': 42,
    '_routing': 5,
    'pipeline': 'my-ingest-pipeline',
    '_source': {
        "title": "Hello World!",
        "body": "..."
    }
}

Alternatively, if _source is not present, the helper will pop all metadata fields from the doc and use the rest as the document data:

{
    "_id": 42,
    "_routing": 5,
    "title": "Hello World!",
    "body": "..."
}

The bulk() api accepts index, create, delete, and update actions. Use the _op_type field to specify an action (_op_type defaults to index):

{
    '_op_type': 'delete',
    '_index': 'index-name',
    '_id': 42,
}
{
    '_op_type': 'update',
    '_index': 'index-name',
    '_id': 42,
    'doc': {'question': 'The life, universe and everything.'}
}
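
For example, a generator mixing update and delete actions can be passed straight to bulk(). The snippet below is a minimal sketch that reuses the client from the Connecting section; the index name and document ids are placeholders:

from elasticsearch.helpers import bulk

def gen_actions():
    # update an existing document by id
    yield {
        "_op_type": "update",
        "_index": "index-name",
        "_id": 42,
        "doc": {"question": "The life, universe and everything."},
    }
    # delete another document by id
    yield {
        "_op_type": "delete",
        "_index": "index-name",
        "_id": 43,
    }

bulk(client, gen_actions())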

Example:

Let's say we have an iterable of data, for example a list of words called mywords, and we want to index each word as an individual document with the structure {"word": "<myword>"}:

from elasticsearch.helpers import bulk

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

bulk(client, gendata())

For a more complete and complex example please take a look at https://github.com/elastic/elasticsearch-py/blob/main/examples/bulk-ingest

The parallel_bulk() api is a wrapper around the bulk() api to provide threading. parallel_bulk() returns a generator which must be consumed to produce results.

To see the results use:

for success, info in parallel_bulk(...):
    if not success:
        print('A document failed:', info)

If you don’t care about the results, you can use deque from collections:

from collections import deque
deque(parallel_bulk(...), maxlen=0)

Note

When reading raw JSON strings from a file, you can also pass them in directly (without decoding them to dicts first). In that case, however, you lose the ability to specify anything (index, op_type and even id) on a per-record basis; all documents will just be sent to Elasticsearch to be indexed as-is.
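
For instance, a file with one raw JSON document per line can be streamed straight into bulk(). This is a minimal sketch; documents.ndjson and the mywords index are placeholders, and the default index is supplied via the index keyword argument, which the helper forwards to the underlying bulk call:

from elasticsearch.helpers import bulk

def read_lines(path):
    # yield raw JSON strings, one document per line (NDJSON)
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:
                yield line

# every line is indexed as-is into the default index given here
bulk(client, read_lines("documents.ndjson"), index="mywords")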

elasticsearch.helpers.streaming_bulk(client, actions, chunk_size=500, max_chunk_bytes=104857600, raise_on_error=True, expand_action_callback=<function expand_action>, raise_on_exception=True, max_retries=0, initial_backoff=2, max_backoff=600, yield_ok=True, ignore_status=(), retry_on_status=(429, ), span_name='helpers.streaming_bulk', *args, **kwargs)

Streaming bulk consumes actions from the iterable passed in and yields results per action. For non-streaming use cases use bulk(), which is a wrapper around streaming bulk that returns summary information about the bulk operation once the entire input is consumed and sent.

If you specify max_retries it will also retry any documents that were rejected with a 429 status code; use retry_on_status to configure which status codes will be retried. To do this it will wait (by calling time.sleep, which blocks) for initial_backoff seconds and then, for every subsequent rejection of the same chunk, double the wait time up to max_backoff seconds.

Parameters:
  • client (Elasticsearch) – instance of Elasticsearch to use

  • actions (Iterable[bytes | str | Dict[str, Any]]) – iterable containing the actions to be executed

  • chunk_size (int) – number of docs in one chunk sent to es (default: 500)

  • max_chunk_bytes (int) – the maximum size of the request in bytes (default: 100MB)

  • raise_on_error (bool) – raise BulkIndexError containing errors (as .errors) from the execution of the last chunk when some occur. By default we raise.

  • raise_on_exception (bool) – if False then don’t propagate exceptions from call to bulk and just report the items that failed as failed.

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – callback executed on each action passed in, should return a tuple containing the action line and the data line (None if data line should be omitted).

  • retry_on_status (int | Collection[int]) – HTTP status code(s) that will trigger a retry (by default only status 429 is retried).

  • max_retries (int) – maximum number of times a document will be retried when retry_on_status (defaulting to 429) is received, set to 0 (default) for no retries

  • initial_backoff (float) – number of seconds we should wait before the first retry. Any subsequent retries will be powers of initial_backoff * 2**retry_number

  • max_backoff (float) – maximum number of seconds a retry will wait

  • yield_ok (bool) – if set to False will skip successful documents in the output

  • ignore_status (int | Collection[int]) – list of HTTP status code that you want to ignore

Return type:

Iterable[Tuple[bool, Dict[str, Any]]]
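
For example, to retry chunks rejected with HTTP 429 while iterating over per-document results - a minimal sketch that reuses the client and the gendata() generator from the bulk example above:

from elasticsearch.helpers import streaming_bulk

# retry rejected (429) chunks up to 3 times, backing off 1s, 2s, 4s, ...
for ok, item in streaming_bulk(client, gendata(), max_retries=3, initial_backoff=1):
    if not ok:
        print("A document failed:", item)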

elasticsearch.helpers.parallel_bulk(client, actions, thread_count=4, chunk_size=500, max_chunk_bytes=104857600, queue_size=4, expand_action_callback=<function expand_action>, ignore_status=(), *args, **kwargs)

Parallel version of the bulk helper run in multiple threads at once.

Parameters:
  • client (Elasticsearch) – instance of Elasticsearch to use

  • actions (Iterable[bytes | str | Dict[str, Any]]) – iterator containing the actions

  • thread_count (int) – size of the threadpool to use for the bulk requests

  • chunk_size (int) – number of docs in one chunk sent to es (default: 500)

  • max_chunk_bytes (int) – the maximum size of the request in bytes (default: 100MB)

  • raise_on_error – raise BulkIndexError containing errors (as .errors) from the execution of the last chunk when some occur. By default we raise.

  • raise_on_exception – if False then don’t propagate exceptions from call to bulk and just report the items that failed as failed.

  • expand_action_callback (Callable[[bytes | str | Dict[str, Any]], Tuple[Dict[str, Any], None | bytes | Dict[str, Any]]]) – callback executed on each action passed in, should return a tuple containing the action line and the data line (None if data line should be omitted).

  • queue_size (int) – size of the task queue between the main thread (producing chunks to send) and the processing threads.

  • ignore_status (int | Collection[int]) – list of HTTP status code that you want to ignore

Return type:

Iterable[Tuple[bool, Any]]
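
For example, to push larger chunks from more worker threads and discard the per-document results - a minimal sketch reusing the client and the gendata() generator from the bulk example; the thread_count and chunk_size values are illustrative only:

from collections import deque
from elasticsearch.helpers import parallel_bulk

# 8 worker threads, 1000 docs per chunk; deque(..., maxlen=0) consumes the
# generator without keeping any results in memory
deque(
    parallel_bulk(client, gendata(), thread_count=8, chunk_size=1000),
    maxlen=0,
)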

elasticsearch.helpers.bulk(client, actions, stats_only=False, ignore_status=(), *args, **kwargs)

Helper for the bulk() api that provides a more human-friendly interface - it consumes an iterator of actions and sends them to elasticsearch in chunks. It returns a tuple with summary information - the number of successfully executed actions and either a list of errors or the number of errors if stats_only is set to True. Note that by default we raise a BulkIndexError when we encounter an error, so options like stats_only only apply when raise_on_error is set to False.

When errors are being collected, the original document data is included in the error dictionary, which can lead to very high memory usage. If you need to process a lot of data and want to ignore/collect errors, please consider using the streaming_bulk() helper, which will just return the errors and not store them in memory.

Parameters:
  • client (Elasticsearch) – instance of Elasticsearch to use

  • actions (Iterable[bytes | str | Dict[str, Any]]) – iterator containing the actions

  • stats_only (bool) – if True, only report the number of successful/failed operations instead of the number of successful operations and a list of error responses

  • ignore_status (int | Collection[int]) – list of HTTP status code that you want to ignore

Return type:

Tuple[int, int | List[Dict[str, Any]]]

Any additional keyword arguments will be passed to streaming_bulk(), which is used to execute the operation; see streaming_bulk() for more accepted parameters.
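
For example, to get only success/failure counts instead of raising on the first error - a minimal sketch reusing the client and the gendata() generator from the example above:

from elasticsearch.helpers import bulk

# raise_on_error=False so failures are counted instead of raising BulkIndexError
success_count, error_count = bulk(
    client, gendata(), stats_only=True, raise_on_error=False
)
print(success_count, "documents indexed,", error_count, "failed")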

Scan

elasticsearch.helpers.scan(client, query=None, scroll='5m', raise_on_error=True, preserve_order=False, size=1000, request_timeout=None, clear_scroll=True, scroll_kwargs=None, **kwargs)

Simple abstraction on top of the scroll() api - an iterator that yields all hits as returned by the underlying scroll requests.

By default scan does not return results in any pre-determined order. To have a standard order in the returned documents (either by score or explicit sort definition) when scrolling, use preserve_order=True. This may be an expensive operation and will negate the performance benefits of using scan.

Parameters:
  • client (Elasticsearch) – instance of Elasticsearch to use

  • query (Any | None) – body for the search() api

  • scroll (str) – Specify how long a consistent view of the index should be maintained for scrolled search

  • raise_on_error (bool) – raises an exception (ScanError) if an error is encountered (some shards fail to execute). By default we raise.

  • preserve_order (bool) – don’t set the search_type to scan - this will cause the scroll to paginate with preserving the order. Note that this can be an extremely expensive operation and can easily lead to unpredictable results, use with caution.

  • size (int) – size (per shard) of the batch sent at each iteration.

  • request_timeout (float | None) – explicit timeout for each call to scan

  • clear_scroll (bool) – explicitly calls delete on the scroll id via the clear scroll API at the end of the method on completion or error, defaults to true.

  • scroll_kwargs (MutableMapping[str, Any] | None) – additional kwargs to be passed to scroll()

Return type:

Iterable[Dict[str, Any]]

Any additional keyword arguments will be passed to the initial search() call:

scan(client,
    query={"query": {"match": {"title": "python"}}},
    index="orders-*"
)
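
Each yielded item is a regular search hit, so the usual _id/_source fields are available. A minimal sketch, assuming the orders-* indices from the example above contain a title field:

from elasticsearch.helpers import scan

for hit in scan(
    client,
    query={"query": {"match": {"title": "python"}}},
    index="orders-*",
):
    # every hit has the same shape as a hit returned by search()
    print(hit["_id"], hit["_source"]["title"])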

Reindex

elasticsearch.helpers.reindex(client, source_index, target_index, query=None, target_client=None, chunk_size=500, scroll='5m', op_type=None, scan_kwargs={}, bulk_kwargs={})

Reindex all documents from one index that satisfy a given query to another, potentially (if target_client is specified) on a different cluster. If you don’t specify the query you will reindex all the documents.

Since 2.3 a reindex() api is available as part of elasticsearch itself. It is recommended to use the api instead of this helper wherever possible. The helper is here mostly for backwards compatibility and for situations where more flexibility is needed.

Note

This helper doesn’t transfer mappings, just the data.

Parameters:
  • client (Elasticsearch) – instance of Elasticsearch to use (for read if target_client is specified as well)

  • source_index (str | Collection[str]) – index (or list of indices) to read documents from

  • target_index (str) – name of the index in the target cluster to populate

  • query (Any | None) – body for the search() api

  • target_client (Elasticsearch | None) – optional, if specified will be used for writing (thus enabling reindex between clusters)

  • chunk_size (int) – number of docs in one chunk sent to es (default: 500)

  • scroll (str) – Specify how long a consistent view of the index should be maintained for scrolled search

  • op_type (str | None) – Explicit operation type. Defaults to ‘_index’. Data streams must be set to ‘create’. If not specified, will auto-detect if target_index is a data stream.

  • scan_kwargs (MutableMapping[str, Any]) – additional kwargs to be passed to scan()

  • bulk_kwargs (MutableMapping[str, Any]) – additional kwargs to be passed to bulk()

Return type:

Tuple[int, int | List[Dict[str, Any]]]
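
For example, to copy matching documents to an index on another cluster - a minimal sketch; the cluster URL, API key, index names and query are placeholders:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex

# hypothetical second cluster used as the write target
target = Elasticsearch("https://other-cluster.example:9200", api_key="TARGET_API_KEY")

successes, errors = reindex(
    client,
    source_index="old-index",
    target_index="new-index",
    query={"query": {"match": {"category": "books"}}},
    target_client=target,  # omit to reindex within the same cluster
    chunk_size=1000,
)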