"""Http related parsers and protocol.""" import asyncio import zlib from typing import Any, Awaitable, Callable, NamedTuple, Optional, Union # noqa from multidict import CIMultiDict from .abc import AbstractStreamWriter from .base_protocol import BaseProtocol from .compression_utils import ZLibCompressor from .helpers import NO_EXTENSIONS __all__ = ("StreamWriter", "HttpVersion", "HttpVersion10", "HttpVersion11") class HttpVersion(NamedTuple): major: int minor: int HttpVersion10 = HttpVersion(1, 0) HttpVersion11 = HttpVersion(1, 1) _T_OnChunkSent = Optional[Callable[[bytes], Awaitable[None]]] _T_OnHeadersSent = Optional[Callable[["CIMultiDict[str]"], Awaitable[None]]] class StreamWriter(AbstractStreamWriter): def __init__( self, protocol: BaseProtocol, loop: asyncio.AbstractEventLoop, on_chunk_sent: _T_OnChunkSent = None, on_headers_sent: _T_OnHeadersSent = None, ) -> None: self._protocol = protocol self.loop = loop self.length = None self.chunked = False self.buffer_size = 0 self.output_size = 0 self._eof = False self._compress: Optional[ZLibCompressor] = None self._drain_waiter = None self._on_chunk_sent: _T_OnChunkSent = on_chunk_sent self._on_headers_sent: _T_OnHeadersSent = on_headers_sent @property def transport(self) -> Optional[asyncio.Transport]: return self._protocol.transport @property def protocol(self) -> BaseProtocol: return self._protocol def enable_chunking(self) -> None: self.chunked = True def enable_compression( self, encoding: str = "deflate", strategy: int = zlib.Z_DEFAULT_STRATEGY ) -> None: self._compress = ZLibCompressor(encoding=encoding, strategy=strategy) def _write(self, chunk: bytes) -> None: size = len(chunk) self.buffer_size += size self.output_size += size transport = self.transport if not self._protocol.connected or transport is None or transport.is_closing(): raise ConnectionResetError("Cannot write to closing transport") transport.write(chunk) async def write( self, chunk: bytes, *, drain: bool = True, LIMIT: int = 0x10000 ) -> 
None: """Writes chunk of data to a stream. write_eof() indicates end of stream. writer can't be used after write_eof() method being called. write() return drain future. """ if self._on_chunk_sent is not None: await self._on_chunk_sent(chunk) if isinstance(chunk, memoryview): if chunk.nbytes != len(chunk): # just reshape it chunk = chunk.cast("c") if self._compress is not None: chunk = await self._compress.compress(chunk) if not chunk: return if self.length is not None: chunk_len = len(chunk) if self.length >= chunk_len: self.length = self.length - chunk_len else: chunk = chunk[: self.length] self.length = 0 if not chunk: return if chunk: if self.chunked: chunk_len_pre = ("%x\r\n" % len(chunk)).encode("ascii") chunk = chunk_len_pre + chunk + b"\r\n" self._write(chunk) if self.buffer_size > LIMIT and drain: self.buffer_size = 0 await self.drain() async def write_headers( self, status_line: str, headers: "CIMultiDict[str]" ) -> None: """Write request/response status and headers.""" if self._on_headers_sent is not None: await self._on_headers_sent(headers) # status + headers buf = _serialize_headers(status_line, headers) self._write(buf) async def write_eof(self, chunk: bytes = b"") -> None: if self._eof: return if chunk and self._on_chunk_sent is not None: await self._on_chunk_sent(chunk) if self._compress: if chunk: chunk = await self._compress.compress(chunk) chunk += self._compress.flush() if chunk and self.chunked: chunk_len = ("%x\r\n" % len(chunk)).encode("ascii") chunk = chunk_len + chunk + b"\r\n0\r\n\r\n" else: if self.chunked: if chunk: chunk_len = ("%x\r\n" % len(chunk)).encode("ascii") chunk = chunk_len + chunk + b"\r\n0\r\n\r\n" else: chunk = b"0\r\n\r\n" if chunk: self._write(chunk) await self.drain() self._eof = True async def drain(self) -> None: """Flush the write buffer. 
The intended use is to write await w.write(data) await w.drain() """ if self._protocol.transport is not None: await self._protocol._drain_helper() def _safe_header(string: str) -> str: if "\r" in string or "\n" in string: raise ValueError( "Newline or carriage return detected in headers. " "Potential header injection attack." ) return string def _py_serialize_headers(status_line: str, headers: "CIMultiDict[str]") -> bytes: headers_gen = (_safe_header(k) + ": " + _safe_header(v) for k, v in headers.items()) line = status_line + "\r\n" + "\r\n".join(headers_gen) + "\r\n\r\n" return line.encode("utf-8") _serialize_headers = _py_serialize_headers try: import aiohttp._http_writer as _http_writer # type: ignore[import-not-found] _c_serialize_headers = _http_writer._serialize_headers if not NO_EXTENSIONS: _serialize_headers = _c_serialize_headers except ImportError: pass
Name | Type | Size | Permission | Actions |
---|---|---|---|---|
.hash | Folder | | 0755 |
|
|
__pycache__ | Folder | | 0755 |
|
|
__init__.py | File | 7.58 KB | 0644 |
|
_cparser.pxd | File | 4.22 KB | 0644 |
|
_find_header.pxd | File | 68 B | 0644 |
|
_headers.pxi | File | 1.96 KB | 0644 |
|
_helpers.cpython-311-x86_64-linux-gnu.so | File | 86.01 KB | 0755 |
|
_helpers.pyi | File | 202 B | 0644 |
|
_helpers.pyx | File | 1.02 KB | 0644 |
|
_http_parser.cpython-311-x86_64-linux-gnu.so | File | 569.59 KB | 0755 |
|
_http_parser.pyx | File | 27.4 KB | 0644 |
|
_http_writer.cpython-311-x86_64-linux-gnu.so | File | 75.01 KB | 0755 |
|
_http_writer.pyx | File | 4.47 KB | 0644 |
|
_websocket.cpython-311-x86_64-linux-gnu.so | File | 53.37 KB | 0755 |
|
_websocket.pyx | File | 1.52 KB | 0644 |
|
abc.py | File | 5.37 KB | 0644 |
|
base_protocol.py | File | 2.68 KB | 0644 |
|
client.py | File | 46.17 KB | 0644 |
|
client_exceptions.py | File | 9.19 KB | 0644 |
|
client_proto.py | File | 8.45 KB | 0644 |
|
client_reqrep.py | File | 38.75 KB | 0644 |
|
client_ws.py | File | 10.75 KB | 0644 |
|
compression_utils.py | File | 4.9 KB | 0644 |
|
connector.py | File | 51.56 KB | 0644 |
|
cookiejar.py | File | 13.69 KB | 0644 |
|
formdata.py | File | 5.96 KB | 0644 |
|
hdrs.py | File | 4.5 KB | 0644 |
|
helpers.py | File | 29.55 KB | 0644 |
|
http.py | File | 1.8 KB | 0644 |
|
http_exceptions.py | File | 2.65 KB | 0644 |
|
http_parser.py | File | 34.66 KB | 0644 |
|
http_websocket.py | File | 26.09 KB | 0644 |
|
http_writer.py | File | 5.79 KB | 0644 |
|
locks.py | File | 1.11 KB | 0644 |
|
log.py | File | 325 B | 0644 |
|
multipart.py | File | 31.71 KB | 0644 |
|
payload.py | File | 13.22 KB | 0644 |
|
payload_streamer.py | File | 2.04 KB | 0644 |
|
py.typed | File | 7 B | 0644 |
|
pytest_plugin.py | File | 11.33 KB | 0644 |
|
resolver.py | File | 4.95 KB | 0644 |
|
streams.py | File | 20.35 KB | 0644 |
|
tcp_helpers.py | File | 961 B | 0644 |
|
test_utils.py | File | 19.71 KB | 0644 |
|
tracing.py | File | 14.78 KB | 0644 |
|
typedefs.py | File | 1.44 KB | 0644 |
|
web.py | File | 18.81 KB | 0644 |
|
web_app.py | File | 17.88 KB | 0644 |
|
web_exceptions.py | File | 10.12 KB | 0644 |
|
web_fileresponse.py | File | 11.15 KB | 0644 |
|
web_log.py | File | 7.62 KB | 0644 |
|
web_middlewares.py | File | 3.94 KB | 0644 |
|
web_protocol.py | File | 22.5 KB | 0644 |
|
web_request.py | File | 28.08 KB | 0644 |
|
web_response.py | File | 27.08 KB | 0644 |
|
web_routedef.py | File | 5.99 KB | 0644 |
|
web_runner.py | File | 11.46 KB | 0644 |
|
web_server.py | File | 2.53 KB | 0644 |
|
web_urldispatcher.py | File | 39.12 KB | 0644 |
|
web_ws.py | File | 18.21 KB | 0644 |
|
worker.py | File | 7.78 KB | 0644 |
|