-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathaiohttp_fetch.py
More file actions
84 lines (61 loc) · 2.12 KB
/
aiohttp_fetch.py
File metadata and controls
84 lines (61 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import asyncio
import sys
import aiohttp
# from PyQt6.QtWidgets import (
from PySide6.QtWidgets import (
QApplication,
QLabel,
QLineEdit,
QPushButton,
QTextEdit,
QVBoxLayout,
QWidget,
)
from qasync import QEventLoop, asyncClose, asyncSlot
class MainWindow(QWidget):
    """Demo window that downloads a URL with aiohttp and displays the body.

    The layout stacks, top to bottom: a status label, a URL input, a text
    area for the response, and a "Fetch" button. The aiohttp session is not
    created in ``__init__``; call :meth:`boot` from a running event loop
    before fetching.
    """

    _DEF_URL: str = "https://jsonplaceholder.typicode.com/todos/1"
    """Default URL."""

    def __init__(self):
        """Build the widgets and wire the fetch button to its async slot."""
        super().__init__()

        self.setLayout(QVBoxLayout())

        self.lbl_status = QLabel("Idle", self)
        self.layout().addWidget(self.lbl_status)

        self.edit_url = QLineEdit(self._DEF_URL, self)
        self.layout().addWidget(self.edit_url)

        self.edit_response = QTextEdit("", self)
        self.layout().addWidget(self.edit_response)

        self.btn_fetch = QPushButton("Fetch", self)
        self.btn_fetch.clicked.connect(self.on_btn_fetch_clicked)
        self.layout().addWidget(self.btn_fetch)

        # Start as None (rather than leaving the attribute undefined) so that
        # closing the window or clicking "Fetch" before boot() has run is
        # handled explicitly instead of raising AttributeError.
        self.session: aiohttp.ClientSession | None = None

    @asyncClose
    async def closeEvent(self, event):  # noqa:N802
        """Close the HTTP session, if one was created, when the window closes.

        Args:
            event: The Qt close event; it is not modified here.
        """
        if self.session is not None:
            await self.session.close()

    async def boot(self):
        """Create the aiohttp session used by the fetch handler."""
        self.session = aiohttp.ClientSession()

    @asyncSlot()
    async def on_btn_fetch_clicked(self):
        """Fetch the URL from the input field and show the response text.

        The button is disabled while the request is in flight. Any exception
        raised by the request is reported in the status label rather than
        propagated.
        """
        if self.session is None:
            # boot() has not run yet; report it instead of failing obscurely.
            self.lbl_status.setText("Error: HTTP session is not ready yet")
            return

        self.btn_fetch.setEnabled(False)
        self.lbl_status.setText("Fetching...")

        try:
            async with self.session.get(self.edit_url.text()) as r:
                self.edit_response.setText(await r.text())
        except Exception as exc:
            # UI boundary: surface every failure to the user.
            self.lbl_status.setText(f"Error: {exc}")
        else:
            self.lbl_status.setText("Finished!")
        finally:
            # Re-enable the button whether the request succeeded or failed.
            self.btn_fetch.setEnabled(True)
if __name__ == "__main__":
    app = QApplication(sys.argv)

    # Set when Qt is about to quit, which lets async_main() return and the
    # event loop shut down.
    app_close_event = asyncio.Event()
    app.aboutToQuit.connect(app_close_event.set)

    main_window = MainWindow()
    main_window.show()

    async def async_main():
        """Create the HTTP session, then keep the loop alive until Qt quits."""
        # Await boot() directly instead of spawning an unreferenced task:
        # a task with no saved reference may be garbage-collected, and an
        # exception raised inside it would never be surfaced.
        await main_window.boot()
        await app_close_event.wait()

    # asyncio.run() accepts loop_factory only on Python 3.12+; on 3.11 or
    # older use qasync.run(async_main()) instead.
    asyncio.run(async_main(), loop_factory=QEventLoop)