[Gelöst] Python aiohttp - Vermutlich grundsätzliches Missverständnis und "This event loop is already running"

Vom einfachen Programm zum fertigen Debian-Paket, Fragen rund um Programmiersprachen, Scripting und Lizenzierung.
Antworten
buhtz
Beiträge: 1105
Registriert: 04.12.2015 17:54:49
Kontaktdaten:

[Gelöst] Python aiohttp - Vermutlich grundsätzliches Missverständnis und "This event loop is already running"

Beitrag von buhtz » 04.09.2021 09:07:01

X-Post
Ich nutze schon länger aiohttp. Aktuell restrukturiere ich den betreffenden Code und stoße dabei auf Probleme, die mich vermuten lassen, dass ich einige grundlegende Dinge bzgl. aiohttp bzw. asyncio missverstehe.

Die vielen Tutorials und StackOverflow Beiträge sind sehr aufschlussreich. Aber sobald ich diese Beispiele in meine eigene Programmstruktur übertragen möchte, stoße ich auf Probleme - genauer 1 Error und 2 Warnings.

Code: Alles auswählen

RuntimeError: This event loop is already running.
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
Die Ursache für den RuntimeError ist mir schon klar, aber ich weiß nicht, wie ich das umschiffen kann.

Das folgende MWE repliziert relativ gut die Struktur meines Real-Codes.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp


class FetchAsync:
    def __init__(self):
        pass

    def _get_loop(self):
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        finally:
            loop.set_debug(True)
            return loop

    async def _receive_via_aiohttp(self, session, url, headers):
        async with session.get(url, headers=headers) as response:
            content = await response.read()
            return response, content

    async def _fetch(self,
                     url,
                     session):
        headers = {'User-Agent': 'MyAgent'}

        # use aiohttp to get feed/xml content and response object
        response, content = await self._receive_via_aiohttp(session,
                                                            url,
                                                            headers)

        # do a lot more stuff...

    def run(self):
        loop = self._get_loop()
        asyncio.run(self._run_async())
        loop.close()

    async def _run_async(self):
        async with aiohttp.ClientSession() as session:
            # in real there are much more URLs
            urls = ['https://cnn.com',
                    'https://fsfe.org']

            # create the "jobs" (futures)
            futures = [self._fetch(url, session)
                       for url
                       in urls]

            # run the "jobs" asynchrone
            self._get_loop().run_until_complete(asyncio.wait(futures))


if __name__ == '__main__':
    obj = FetchAsync()
    obj.run()
Der komplette Output sieht so aus:

Code: Alles auswählen

Traceback (most recent call last):
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 62, in <module>
    obj.run()
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 43, in run
    asyncio.run(self._run_async())
  File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/user/share/work/aiotest/./fetchfeeds.py", line 58, in _run_async
    self._get_loop().run_until_complete(asyncio.wait(futures))
  File "/usr/lib/python3.9/asyncio/base_events.py", line 618, in run_until_complete
    self._check_running()
  File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
Vielleicht noch eine kleine FAQ zum Code.
  • run() ist dort, weil die Klasse eigentlich ein Thread ist.
  • _run_async() ist da, weil ich async with ClientSession() sonst nicht aufrufen kann.
  • Das Session Objekt soll für alle get() Aufrufe genutzt werden.
  • Im Real-Code sind es zwischen 100 und 300 URLs/Jobs die hier laufen sollen.
  • _receive_via_aiohttp() "kapselt" aiohttp, damit ich das bei unittests einfach mocken kann. So das in einem unittest, das reale aiohttp gar nicht berührt wird und ich den content und response fest definieren kann.
Zuletzt geändert von buhtz am 08.09.2021 00:03:41, insgesamt 2-mal geändert.
Debian 11 & 12; Desktop-PC, Headless-NAS, Raspberry Pi 4
Teil des Upstream Betreuer Teams von Back In Time (Debianbackintime)

JTH
Moderator
Beiträge: 3023
Registriert: 13.08.2008 17:01:41
Wohnort: Berlin

Re: Python aiohttp - Vermutlich grundsätzliches Missverständnis und "This event loop is already running"

Beitrag von JTH » 04.09.2021 11:09:59

run_until_complete() will die betreffende Loop überhaupt erst starten – deine läuft in der Zeile

Code: Alles auswählen

            self._get_loop().run_until_complete(asyncio.wait(futures))
durch das vorher in FetchAsync.run() ausgeführte asyncio.run() aber schon. Du müsstest in der zitierten Zeile etwas wie asyncio.gather() benutzen, um einfach eine Reihe von Futures Koroutinenaufrufen innerhalb der laufenden Loop abzuwarten:

Code: Alles auswählen

            await asyncio.gather(*futures)

Nebenbei: asyncio.run() erstellt bei Bedarf selbst eine neue Eventloop im laufenden Thread, das musst du nicht von Hand machen. Also selbst wenn du das Ganze, wie du schreibst, irgendwie noch in einen Thread verpacken willst. (Warum? Gibts einen zwingenden Grund dafür?)

Es könnte also reichen:

Code: Alles auswählen

    def run(self):
        asyncio.run(self._run_async())
FetchAsync._get_loop() könnte damit komplett wegfallen. Debugging kannst du dann in FetchAsync._run_async() anschalten:

Code: Alles auswählen

        asyncio.get_running_loop().set_debug(True)

Selbst falls das nicht passt: asyncio.get_event_loop() startet bei Bedarf auch selbst eine Loop, die Exception in FetchAsync._get_loop() dürfte nie auftreten. Damit könntest du get_event_loop() auch, statt _get_loop(), direkt in FetchAsync.run() aufrufen.

Als „Warnung“ im Kleingedruckten: Ich bin eher C-geprägt und tu mich immer noch schwer mit diesem (eigentlich gar nicht so) neumodischen asynchronen Programmieren mit Koroutinen, wenn ich nicht selbst Threads anlegen und synchronisieren darf. Bitte um Verbesserung, falls ich Quatsch schreibe :D
Manchmal bekannt als Just (another) Terminal Hacker.

buhtz
Beiträge: 1105
Registriert: 04.12.2015 17:54:49
Kontaktdaten:

Re: Python aiohttp - Vermutlich grundsätzliches Missverständnis und "This event loop is already running"

Beitrag von buhtz » 04.09.2021 13:03:39

Grossartig! In dem MWE funktioniert es erst mal. Ich werde es heute Nacht versuchen es in meinen Real-Code zu überführen.

Möchtest du das evtl. in den betreffenden StackOverflow Beitrag als Antwort verfassen und die Punkte einkassieren?
JTH hat geschrieben: ↑ zum Beitrag ↑
04.09.2021 11:09:59
irgendwie noch in einen Thread verpacken willst. (Warum? Gibts einen zwingenden Grund dafür?)
Die GUI soll nicht blockieren. Und ja, ich weiß, dass der asyncio-loop auch automatisch in einem separaten Thread läuft; das reicht in meinem realen Fall aber nicht.
JTH hat geschrieben: ↑ zum Beitrag ↑
04.09.2021 11:09:59
Als „Warnung“ im Kleingedruckten: Ich bin eher C-geprägt und tu mich immer noch schwer mit diesem (eigentlich gar nicht so) neumodischen asynchronen Programmieren mit Koroutinen
So schwerfällig wirkt das bei dir gar nicht! :hail:

EDIT: Jo, es läuft auch im Real-Code sehr gut. Dickes Danke nochmal!
Debian 11 & 12; Desktop-PC, Headless-NAS, Raspberry Pi 4
Teil des Upstream Betreuer Teams von Back In Time (Debianbackintime)

Antworten