Using asyncio with Elasticsearch
================================

.. py:module:: elasticsearch
   :no-index:

The ``elasticsearch`` package supports async/await with asyncio and aiohttp.
You can either install ``aiohttp`` directly or use the ``[async]`` extra:

.. code-block:: bash

    $ python -m pip install elasticsearch aiohttp

    # - OR -

    $ python -m pip install elasticsearch[async]

Getting Started with Async
--------------------------

After installation, all async API endpoints are available via
:class:`~elasticsearch.AsyncElasticsearch` and are used in the same way as the
sync APIs, just with an extra ``await``:

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch

    client = AsyncElasticsearch()

    async def main():
        resp = await client.search(
            index="documents",
            body={"query": {"match_all": {}}},
            size=20,
        )
        print(resp)

    # asyncio.run() creates the event loop, runs main(), and closes the loop.
    asyncio.run(main())

All APIs that are available under the sync client are also available under the
async client.

ASGI Applications and Elastic APM
---------------------------------

ASGI (Asynchronous Server Gateway Interface) is a way to serve Python web
applications that uses async I/O to achieve better performance. Examples of
ASGI frameworks include FastAPI, Django 3.0+, and Starlette. If you use one of
these frameworks with Elasticsearch, use
:py:class:`~elasticsearch.AsyncElasticsearch` so that synchronous network calls
do not block the event loop.

Elastic APM supports tracing of async Elasticsearch queries in the same way as
synchronous queries. For an example of how to configure ``AsyncElasticsearch``
with the popular ASGI framework FastAPI and APM tracing, see the pre-built
example in the ``examples/fastapi-apm`` directory.

Frequently Asked Questions
--------------------------

ValueError when initializing ``AsyncElasticsearch``?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you receive ``ValueError: You must have 'aiohttp' installed to use AiohttpHttpNode``
when trying to use ``AsyncElasticsearch``, ensure that ``aiohttp`` is installed
in your environment (check with ``$ python -m pip freeze | grep aiohttp``).
Without it, async support is not available.

What about the ``elasticsearch-async`` package?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Previously asyncio was supported separately via the ``elasticsearch-async``
package. The ``elasticsearch-async`` package has been deprecated in favor of
``AsyncElasticsearch``, which is provided by the ``elasticsearch`` package in
v7.8 and onwards.

Receiving 'Unclosed client session / connector' warning?
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This warning is emitted by ``aiohttp`` when an open HTTP connection is garbage
collected. You'll typically run into this when closing your application. To
resolve the issue, ensure that :meth:`~elasticsearch.AsyncElasticsearch.close`
is called before the :py:class:`~elasticsearch.AsyncElasticsearch` instance is
garbage collected.

For example, if using FastAPI that might look like this:

.. code-block:: python

    import os
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from elasticsearch import AsyncElasticsearch

    ELASTICSEARCH_URL = os.environ["ELASTICSEARCH_URL"]
    client = None

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Create the client on startup and close it on shutdown.
        global client
        client = AsyncElasticsearch(ELASTICSEARCH_URL)
        yield
        await client.close()

    app = FastAPI(lifespan=lifespan)

    @app.get("/")
    async def main():
        return await client.info()

You can run this example by saving it to ``main.py`` and executing
``ELASTICSEARCH_URL=http://localhost:9200 uvicorn main:app``.

Async Helpers
-------------

Async variants of all helpers are available in ``elasticsearch.helpers`` and are all prefixed with ``async_*``.
These APIs mirror the ones described in the sync :ref:`helpers` documentation.
All async helpers that accept an iterator or generator also accept async
iterators and async generators.

.. py:module:: elasticsearch.helpers
   :no-index:

Bulk and Streaming Bulk
~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: async_bulk

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_bulk

    client = AsyncElasticsearch()

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "doc": {"word": word},
            }

    async def main():
        await async_bulk(client, gendata())

    asyncio.run(main())

.. autofunction:: async_streaming_bulk

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_streaming_bulk

    client = AsyncElasticsearch()

    async def gendata():
        mywords = ['foo', 'bar', 'baz']
        for word in mywords:
            yield {
                "_index": "mywords",
                "word": word,
            }

    async def main():
        async for ok, result in async_streaming_bulk(client, gendata()):
            # Each result is a single-item dict keyed by the action type.
            action, result = result.popitem()
            if not ok:
                print("failed to %s document: %r" % (action, result))

    asyncio.run(main())

Scan
~~~~

.. autofunction:: async_scan

.. code-block:: python

    import asyncio
    from elasticsearch import AsyncElasticsearch
    from elasticsearch.helpers import async_scan

    client = AsyncElasticsearch()

    async def main():
        async for doc in async_scan(
            client=client,
            query={"query": {"match": {"title": "python"}}},
            index="orders-*"
        ):
            print(doc)

    asyncio.run(main())

Reindex
~~~~~~~

.. autofunction:: async_reindex

API Reference
-------------

.. py:module:: elasticsearch
   :no-index:

The API of :class:`~elasticsearch.AsyncElasticsearch` is nearly identical to the
API of :class:`~elasticsearch.Elasticsearch`, with the exception that every API
call like :py:meth:`~elasticsearch.AsyncElasticsearch.search` is an ``async``
function and must be awaited to obtain the response body.

AsyncElasticsearch
~~~~~~~~~~~~~~~~~~

.. note::

   To reference Elasticsearch APIs that are namespaced like ``.indices.create()``
   refer to the sync API reference. These APIs are identical between sync and async.

.. autoclass:: AsyncElasticsearch
   :members: