sim16/matteo_env/Lib/site-packages/aiohttp/http_websocket.py

702 lines
25 KiB
Python
Raw Normal View History

2020-12-20 00:08:09 +00:00
"""WebSocket protocol versions 13 and 8."""
import asyncio
import collections
import json
import random
import re
import sys
import zlib
from enum import IntEnum
from struct import Struct
2022-09-18 13:17:20 +00:00
from typing import Any, Callable, List, Optional, Pattern, Set, Tuple, Union, cast
2020-12-20 00:08:09 +00:00
from .base_protocol import BaseProtocol
from .helpers import NO_EXTENSIONS
from .streams import DataQueue
2022-09-18 13:17:20 +00:00
from .typedefs import Final
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
# Names exported by ``from <this module> import *``.
__all__ = (
    "WS_CLOSED_MESSAGE",
    "WS_CLOSING_MESSAGE",
    "WS_KEY",
    "WebSocketReader",
    "WebSocketWriter",
    "WSMessage",
    "WebSocketError",
    "WSMsgType",
    "WSCloseCode",
)
2020-12-20 00:08:09 +00:00
class WSCloseCode(IntEnum):
    """Integer status codes that may appear in a WebSocket CLOSE frame."""

    OK = 1000
    GOING_AWAY = 1001
    PROTOCOL_ERROR = 1002
    UNSUPPORTED_DATA = 1003
    ABNORMAL_CLOSURE = 1006
    INVALID_TEXT = 1007
    POLICY_VIOLATION = 1008
    MESSAGE_TOO_BIG = 1009
    MANDATORY_EXTENSION = 1010
    INTERNAL_ERROR = 1011
    SERVICE_RESTART = 1012
    TRY_AGAIN_LATER = 1013
    BAD_GATEWAY = 1014


# Plain-int copy of every WSCloseCode value, for cheap membership tests.
ALLOWED_CLOSE_CODES: Final[Set[int]] = set(map(int, WSCloseCode))
2020-12-20 00:08:09 +00:00
class WSMsgType(IntEnum):
    """Message types: wire opcodes plus aiohttp-internal pseudo types."""

    # websocket spec types
    CONTINUATION = 0x0
    TEXT = 0x1
    BINARY = 0x2
    PING = 0x9
    PONG = 0xA
    CLOSE = 0x8

    # aiohttp specific types
    CLOSING = 0x100
    CLOSED = 0x101
    ERROR = 0x102

    # Lowercase names bound to the same values; the enum machinery turns
    # them into aliases of the uppercase members defined above.
    text = TEXT
    binary = BINARY
    ping = PING
    pong = PONG
    close = CLOSE
    closing = CLOSING
    closed = CLOSED
    error = ERROR
2022-09-18 13:17:20 +00:00
# Fixed GUID that RFC 6455 defines for the opening handshake.
WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

# Pre-compiled network-byte-order codecs; the bound methods below are what
# the rest of the module calls.
_UINT16 = Struct("!H")
_UINT64 = Struct("!Q")

UNPACK_LEN2 = _UINT16.unpack_from
UNPACK_LEN3 = _UINT64.unpack_from
UNPACK_CLOSE_CODE = _UINT16.unpack
PACK_LEN1 = Struct("!BB").pack
PACK_LEN2 = Struct("!BBH").pack
PACK_LEN3 = Struct("!BBQ").pack
PACK_CLOSE_CODE = _UINT16.pack

MSG_SIZE: Final[int] = 1 << 14
DEFAULT_LIMIT: Final[int] = 1 << 16
2020-12-20 00:08:09 +00:00
2022-09-18 13:17:20 +00:00
_WSMessageBase = collections.namedtuple("_WSMessageBase", "type data extra")


class WSMessage(_WSMessageBase):
    """One WebSocket message as a ``(type, data, extra)`` named tuple."""

    def json(self, *, loads: Callable[[Any], Any] = json.loads) -> Any:
        """Decode ``data`` with *loads* and return the result.

        *loads* defaults to :func:`json.loads`; whatever it raises
        propagates to the caller.

        .. versionadded:: 0.22
        """
        decoded = loads(self.data)
        return decoded
# Module-level message instances for the CLOSED / CLOSING pseudo types;
# both have ``data`` and ``extra`` set to None.
WS_CLOSED_MESSAGE = WSMessage(WSMsgType.CLOSED, None, None)
WS_CLOSING_MESSAGE = WSMessage(WSMsgType.CLOSING, None, None)
class WebSocketError(Exception):
    """Error raised by the WebSocket protocol parser.

    ``code`` is the close code associated with the failure; ``str(exc)``
    yields only the human-readable message.
    """

    def __init__(self, code: int, message: str) -> None:
        # Both values are kept in ``args``; the code is also exposed directly.
        super().__init__(code, message)
        self.code = code

    def __str__(self) -> str:
        message = self.args[1]
        return cast(str, message)
2020-12-20 00:08:09 +00:00
class WSHandshakeError(Exception):
    """WebSocket protocol handshake error.

    Within this module it is raised by ``ws_ext_parse`` when, on the
    non-server side, the ``permessage-deflate`` parameters are not
    recognised or request an unsupported window size.
    """
2022-09-18 13:17:20 +00:00
native_byteorder: Final[str] = sys.byteorder


# Lookup tables for _websocket_mask_python: entry ``k`` is a 256-byte
# translation table mapping every byte value ``v`` to ``v ^ k``.
_XOR_TABLE: Final[List[bytes]] = [
    bytes(value ^ key for value in range(256)) for key in range(256)
]


def _websocket_mask_python(mask: bytes, data: bytearray) -> None:
    """Mask (or unmask) ``data`` in place per RFC 6455 section 5.3.

    ``mask`` must be exactly four bytes and ``data`` must be a
    ``bytearray``; byte ``i`` of ``data`` is XOR-ed with ``mask[i % 4]``.
    Nothing is returned because ``data`` itself is modified.

    This is the pure-Python fallback; an optimized implementation may be
    substituted for it when available.
    """
    assert isinstance(data, bytearray), data
    assert len(mask) == 4, mask

    if not data:
        return

    # Unpack first so a mask of the wrong length still fails loudly.
    key0, key1, key2, key3 = mask
    for offset, key in enumerate((key0, key1, key2, key3)):
        # Every fourth byte starting at ``offset`` shares the same key byte,
        # so one table-driven translate() handles the whole stride.
        data[offset::4] = data[offset::4].translate(_XOR_TABLE[key])
# Start from the pure-Python masker and upgrade to the compiled one only
# when extensions are allowed and the compiled module can be imported.
_websocket_mask = _websocket_mask_python
if not NO_EXTENSIONS:
    try:
        from ._websocket import _websocket_mask_cython  # type: ignore[import]
    except ImportError:  # pragma: no cover
        pass
    else:
        _websocket_mask = _websocket_mask_cython
2022-09-18 13:17:20 +00:00
# Four-byte tail that the writer strips from, and the reader re-appends to,
# every deflated message.
_WS_DEFLATE_TRAILING: Final[bytes] = b"\x00\x00\xff\xff"


# Parameter list following ``permessage-deflate``.  Capture groups:
#   1 server_no_context_takeover      2 client_no_context_takeover
#   3 server_max_window_bits[=N]      4 N of group 3
#   5 client_max_window_bits[=N]      6 N of group 5
_WS_EXT_RE: Final[Pattern[str]] = re.compile(
    r"^(?:;\s*(?:"
    r"(server_no_context_takeover)|"
    r"(client_no_context_takeover)|"
    r"(server_max_window_bits(?:=(\d+))?)|"
    r"(client_max_window_bits(?:=(\d+))?)))*$"
)

# Finds each ``permessage-deflate`` offer; group 1 is its raw parameter text.
_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?")


def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]:
    """Parse a ``Sec-WebSocket-Extensions`` value for ``permessage-deflate``.

    Returns ``(compress, notakeover)``: ``compress`` is the window size in
    bits (``0`` when compression is not negotiated) and ``notakeover``
    tells whether the relevant ``*_no_context_takeover`` flag was present.

    With ``isserver=True`` unacceptable offers are skipped silently; with
    ``isserver=False`` they raise ``WSHandshakeError``.
    """
    if not extstr:
        return 0, False

    wbits = 0
    no_takeover = False

    for offer in _WS_EXT_RE_SPLIT.finditer(extstr):
        params = offer.group(1)

        # A bare ``permessage-deflate`` means the default window of 15 bits.
        if not params:
            wbits = 15
            break

        parsed = _WS_EXT_RE.match(params)
        if parsed is None:
            # Unknown parameters: fatal on the client side, ignored (try the
            # next offer) on the server side.
            if not isserver:
                raise WSHandshakeError(
                    "Extension for deflate not supported" + offer.group(1)
                )
            continue

        wbits = 15

        if isserver:
            # The server never fails the handshake over compression and has
            # no need to send its max window bits to the client.
            requested = parsed.group(4)
            if requested:
                wbits = int(requested)
                # zlib cannot do wbits=8; on an unsupported size move on to
                # the next offer.
                if wbits > 15 or wbits < 9:
                    wbits = 0
                    continue
            if parsed.group(1):
                no_takeover = True
            # Groups 5 and 6 (client_max_window_bits) are ignored here.
            break

        requested = parsed.group(6)
        if requested:
            wbits = int(requested)
            # zlib cannot do wbits=8; an unsupported size fails the parse.
            if wbits > 15 or wbits < 9:
                raise WSHandshakeError("Invalid window size")
        if parsed.group(2):
            no_takeover = True
        break

    return wbits, no_takeover
2022-09-18 13:17:20 +00:00
def ws_ext_gen(
    compress: int = 15, isserver: bool = False, server_notakeover: bool = False
) -> str:
    """Build a ``permessage-deflate`` extension header value.

    ``compress`` is the window size in bits and must lie in 9..15 (zlib
    cannot handle 8), otherwise ``ValueError`` is raised.  When
    ``isserver`` is false, ``client_max_window_bits`` is advertised.  A
    window below 15 adds ``server_max_window_bits=<compress>`` and
    ``server_notakeover`` adds ``server_no_context_takeover``.  No
    ``client_no_context_takeover`` parameter is ever produced.
    """
    if compress > 15 or compress < 9:
        raise ValueError(
            "Compress wbits must between 9 and 15, zlib does not support wbits=8"
        )

    parts = ["permessage-deflate"]
    if not isserver:
        parts.append("client_max_window_bits")
    if compress < 15:
        parts.append(f"server_max_window_bits={compress!s}")
    if server_notakeover:
        parts.append("server_no_context_takeover")

    return "; ".join(parts)
2020-12-20 00:08:09 +00:00
class WSParserState(IntEnum):
    """Stages of the frame parser in ``WebSocketReader.parse_frame``."""

    READ_HEADER = 1
    READ_PAYLOAD_LENGTH = 2
    READ_PAYLOAD_MASK = 3
    READ_PAYLOAD = 4
class WebSocketReader:
    """Incremental WebSocket parser feeding ``WSMessage`` objects to a queue.

    Raw bytes go in through :meth:`feed_data`; complete frames are cut out by
    :meth:`parse_frame` and assembled into messages that are pushed to
    ``queue``.  Parser state lives on the instance, so input may be split at
    arbitrary byte boundaries.
    """

    def __init__(
        self, queue: DataQueue[WSMessage], max_msg_size: int, compress: bool = True
    ) -> None:
        """Set up an idle parser.

        ``max_msg_size`` limits the size of an assembled message; a falsy
        value disables the check.  ``compress`` controls whether frames with
        the RSV1 bit set are accepted.
        """
        self.queue = queue
        self._max_msg_size = max_msg_size

        # First exception raised while parsing; once set, feed_data() stops
        # parsing.
        self._exc = None  # type: Optional[BaseException]
        # Payload bytes of the message currently being assembled.
        self._partial = bytearray()
        self._state = WSParserState.READ_HEADER

        # Opcode of the first fragment of an unfinished fragmented message.
        self._opcode = None  # type: Optional[int]
        # Per-frame values captured from the header of the frame being read.
        self._frame_fin = False
        self._frame_opcode = None  # type: Optional[int]
        self._frame_payload = bytearray()

        # Bytes received but not yet consumed by parse_frame().
        self._tail = b""
        self._has_mask = False
        self._frame_mask = None  # type: Optional[bytes]
        # Payload bytes of the current frame that are still missing.
        self._payload_length = 0
        # Raw 7-bit length field from the header (126/127 = extended length).
        self._payload_length_flag = 0
        self._compressed = None  # type: Optional[bool]
        self._decompressobj = None  # type: Any  # zlib.decompressobj actually
        self._compress = compress

    def feed_eof(self) -> None:
        """Forward end-of-stream to the queue."""
        self.queue.feed_eof()

    def feed_data(self, data: bytes) -> Tuple[bool, bytes]:
        """Parse ``data`` and push resulting messages to the queue.

        Returns ``(False, b"")`` on success.  If parsing raises, the
        exception is stored, handed to ``queue.set_exception`` and
        ``(True, b"")`` is returned.  Once an exception has been stored,
        later calls return ``(True, data)`` without parsing.
        """
        if self._exc:
            return True, data

        try:
            return self._feed_data(data)
        except Exception as exc:
            self._exc = exc
            self.queue.set_exception(exc)
            return True, b""

    def _feed_data(self, data: bytes) -> Tuple[bool, bytes]:
        """Turn every complete frame in ``data`` into queue messages.

        Raises ``WebSocketError`` on invalid close frames, unexpected
        opcodes, oversized messages and undecodable text.
        """
        for fin, opcode, payload, compressed in self.parse_frame(data):
            # The inflater is created lazily and then reused for all later
            # compressed messages.
            if compressed and not self._decompressobj:
                self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
            if opcode == WSMsgType.CLOSE:
                if len(payload) >= 2:
                    close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
                    # Codes below 3000 must be one of the known close codes;
                    # codes of 3000 and above are accepted as-is.
                    if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            f"Invalid close code: {close_code}",
                        )
                    try:
                        close_message = payload[2:].decode("utf-8")
                    except UnicodeDecodeError as exc:
                        raise WebSocketError(
                            WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                        ) from exc
                    msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)
                elif payload:
                    # A one-byte payload cannot hold a close code.
                    raise WebSocketError(
                        WSCloseCode.PROTOCOL_ERROR,
                        f"Invalid close frame: {fin} {opcode} {payload!r}",
                    )
                else:
                    msg = WSMessage(WSMsgType.CLOSE, 0, "")

                self.queue.feed_data(msg, 0)

            elif opcode == WSMsgType.PING:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PING, payload, ""), len(payload)
                )

            elif opcode == WSMsgType.PONG:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PONG, payload, ""), len(payload)
                )

            elif (
                opcode not in (WSMsgType.TEXT, WSMsgType.BINARY)
                and self._opcode is None
            ):
                # Anything else (including CONTINUATION) is only valid while
                # a fragmented message is in progress.
                raise WebSocketError(
                    WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"
                )
            else:
                # load text/binary
                if not fin:
                    # got partial frame payload
                    if opcode != WSMsgType.CONTINUATION:
                        self._opcode = opcode
                    self._partial.extend(payload)
                    # NOTE(review): the comparison is ``>=``, so a message
                    # whose size equals the limit is rejected as well.
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )
                else:
                    # previous frame was non finished
                    # we should get continuation opcode
                    if self._partial:
                        if opcode != WSMsgType.CONTINUATION:
                            raise WebSocketError(
                                WSCloseCode.PROTOCOL_ERROR,
                                "The opcode in non-fin frame is expected "
                                "to be zero, got {!r}".format(opcode),
                            )

                    if opcode == WSMsgType.CONTINUATION:
                        # The final fragment takes the type of the first one.
                        assert self._opcode is not None
                        opcode = self._opcode
                        self._opcode = None

                    self._partial.extend(payload)
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )

                    # Decompression must be done only after all fragments
                    # of the message have been received.
                    if compressed:
                        self._partial.extend(_WS_DEFLATE_TRAILING)
                        # The second argument is zlib's ``max_length``: output
                        # is capped and unprocessed input is left in
                        # ``unconsumed_tail``.
                        payload_merged = self._decompressobj.decompress(
                            self._partial, self._max_msg_size
                        )
                        if self._decompressobj.unconsumed_tail:
                            # ``left`` counts leftover *compressed* bytes, so
                            # the size in the message is a lower-bound hint.
                            left = len(self._decompressobj.unconsumed_tail)
                            raise WebSocketError(
                                WSCloseCode.MESSAGE_TOO_BIG,
                                "Decompressed message size {} exceeds limit {}".format(
                                    self._max_msg_size + left, self._max_msg_size
                                ),
                            )
                    else:
                        payload_merged = bytes(self._partial)

                    self._partial.clear()

                    if opcode == WSMsgType.TEXT:
                        try:
                            text = payload_merged.decode("utf-8")
                            # The size reported to the queue is the length in
                            # characters, not bytes.
                            self.queue.feed_data(
                                WSMessage(WSMsgType.TEXT, text, ""), len(text)
                            )
                        except UnicodeDecodeError as exc:
                            raise WebSocketError(
                                WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                            ) from exc
                    else:
                        self.queue.feed_data(
                            WSMessage(WSMsgType.BINARY, payload_merged, ""),
                            len(payload_merged),
                        )

        return False, b""

    def parse_frame(
        self, buf: bytes
    ) -> List[Tuple[bool, Optional[int], bytearray, Optional[bool]]]:
        """Return every frame completed by ``buf``.

        ``buf`` is appended to bytes left over from the previous call.  Each
        result is ``(fin, opcode, payload, compressed)`` with the payload
        already unmasked.  Bytes of an incomplete header, length or mask
        field are kept in ``self._tail``; partially received payload is kept
        in ``self._frame_payload``.  Raises ``WebSocketError`` for reserved
        bits, fragmented control frames and oversized control payloads.
        """
        frames = []
        if self._tail:
            buf, self._tail = self._tail + buf, b""

        start_pos = 0
        buf_length = len(buf)

        # Each ``if`` below handles one parser state and falls through to the
        # next; ``break`` means "need more input".
        while True:
            # read header
            if self._state == WSParserState.READ_HEADER:
                if buf_length - start_pos >= 2:
                    data = buf[start_pos : start_pos + 2]
                    start_pos += 2
                    first_byte, second_byte = data

                    fin = (first_byte >> 7) & 1
                    rsv1 = (first_byte >> 6) & 1
                    rsv2 = (first_byte >> 5) & 1
                    rsv3 = (first_byte >> 4) & 1
                    opcode = first_byte & 0xF

                    # frame-fin = %x0 ; more frames of this message follow
                    #           / %x1 ; final frame of this message
                    # frame-rsv1 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    # frame-rsv2 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    # frame-rsv3 = %x0 ;
                    #    1 bit, MUST be 0 unless negotiated otherwise
                    #
                    # rsv1 is tolerated when compression is enabled: it is
                    # the "compressed" flag of permessage-deflate.
                    if rsv2 or rsv3 or (rsv1 and not self._compress):
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received frame with non-zero reserved bits",
                        )

                    # Opcodes above 0x7 are control frames.
                    if opcode > 0x7 and fin == 0:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received fragmented control frame",
                        )

                    has_mask = (second_byte >> 7) & 1
                    length = second_byte & 0x7F

                    # Control frames MUST have a payload
                    # length of 125 bytes or less
                    if opcode > 0x7 and length > 125:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Control frame payload cannot be " "larger than 125 bytes",
                        )

                    # Set compress status if the previous frame was FIN
                    # OR set compress status if this is the first fragment.
                    # Raise error if not first fragment with rsv1 = 0x1
                    #
                    # NOTE(review): ``self._frame_fin`` still describes the
                    # previous frame here, control frames included.  A
                    # control frame (FIN=1, RSV1=0) between fragments of a
                    # compressed message therefore appears to make the next
                    # continuation frame reset ``_compressed`` to False --
                    # confirm whether that case is handled elsewhere.
                    if self._frame_fin or self._compressed is None:
                        self._compressed = True if rsv1 else False
                    elif rsv1:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            "Received frame with non-zero reserved bits",
                        )

                    self._frame_fin = bool(fin)
                    self._frame_opcode = opcode
                    self._has_mask = bool(has_mask)
                    self._payload_length_flag = length
                    self._state = WSParserState.READ_PAYLOAD_LENGTH
                else:
                    break

            # read payload length
            if self._state == WSParserState.READ_PAYLOAD_LENGTH:
                length = self._payload_length_flag
                if length == 126:
                    # 16-bit extended length follows.
                    if buf_length - start_pos >= 2:
                        data = buf[start_pos : start_pos + 2]
                        start_pos += 2
                        length = UNPACK_LEN2(data)[0]
                        self._payload_length = length
                        self._state = (
                            WSParserState.READ_PAYLOAD_MASK
                            if self._has_mask
                            else WSParserState.READ_PAYLOAD
                        )
                    else:
                        break
                elif length > 126:
                    # 64-bit extended length follows.
                    if buf_length - start_pos >= 8:
                        data = buf[start_pos : start_pos + 8]
                        start_pos += 8
                        length = UNPACK_LEN3(data)[0]
                        self._payload_length = length
                        self._state = (
                            WSParserState.READ_PAYLOAD_MASK
                            if self._has_mask
                            else WSParserState.READ_PAYLOAD
                        )
                    else:
                        break
                else:
                    # The 7-bit field already is the payload length.
                    self._payload_length = length
                    self._state = (
                        WSParserState.READ_PAYLOAD_MASK
                        if self._has_mask
                        else WSParserState.READ_PAYLOAD
                    )

            # read payload mask
            if self._state == WSParserState.READ_PAYLOAD_MASK:
                if buf_length - start_pos >= 4:
                    self._frame_mask = buf[start_pos : start_pos + 4]
                    start_pos += 4
                    self._state = WSParserState.READ_PAYLOAD
                else:
                    break

            if self._state == WSParserState.READ_PAYLOAD:
                length = self._payload_length
                payload = self._frame_payload

                chunk_len = buf_length - start_pos
                if length >= chunk_len:
                    # Everything left in the buffer belongs to this frame.
                    self._payload_length = length - chunk_len
                    payload.extend(buf[start_pos:])
                    start_pos = buf_length
                else:
                    # The frame ends inside the buffer; more frames follow.
                    self._payload_length = 0
                    payload.extend(buf[start_pos : start_pos + length])
                    start_pos = start_pos + length

                if self._payload_length == 0:
                    # Unmask only once the whole payload is present so the
                    # mask offsets line up.
                    if self._has_mask:
                        assert self._frame_mask is not None
                        _websocket_mask(self._frame_mask, payload)

                    frames.append(
                        (self._frame_fin, self._frame_opcode, payload, self._compressed)
                    )

                    self._frame_payload = bytearray()
                    self._state = WSParserState.READ_HEADER
                else:
                    break

        # Keep whatever was not consumed for the next call.
        self._tail = buf[start_pos:]

        return frames
class WebSocketWriter:
    """Encode WebSocket frames and write them to a transport.

    Handles optional ``permessage-deflate`` compression, optional payload
    masking and a simple output-size based drain of the protocol.
    """

    def __init__(
        self,
        protocol: BaseProtocol,
        transport: asyncio.Transport,
        *,
        use_mask: bool = False,
        limit: int = DEFAULT_LIMIT,
        random: Any = random.Random(),
        compress: int = 0,
        notakeover: bool = False,
    ) -> None:
        # Collaborators.
        self.protocol = protocol
        self.transport = transport

        # Framing options.
        self.use_mask = use_mask
        self.compress = compress
        self.notakeover = notakeover
        # Only ``randrange`` of the supplied generator is ever used.
        self.randrange = random.randrange

        # Bookkeeping.
        self._limit = limit
        self._output_size = 0
        self._closing = False
        self._compressobj = None  # type: Any  # actually compressobj

    async def _send_frame(
        self, message: bytes, opcode: int, compress: Optional[int] = None
    ) -> None:
        """Frame ``message`` with ``opcode`` and write it out.

        ``compress``, when truthy, is the window size for a one-off
        compressor used for this frame only; otherwise the writer-wide
        ``self.compress`` setting (and its shared compressor) applies.
        Raises ``ConnectionResetError`` when the writer is closing and the
        opcode does not have the CLOSE bit set.
        """
        if self._closing and not (opcode & WSMsgType.CLOSE):
            raise ConnectionResetError("Cannot write to closing transport")

        # FIN is always set: every message goes out as a single frame.
        first_byte = 0x80 | opcode

        # Only data frames (opcode < 8) are ever compressed, whatever
        # their size.
        if opcode < 8 and (compress or self.compress):
            if compress:
                # Per-call compressor; deliberately not stored on self.
                deflater = zlib.compressobj(level=zlib.Z_BEST_SPEED, wbits=-compress)
            else:
                if not self._compressobj:
                    self._compressobj = zlib.compressobj(
                        level=zlib.Z_BEST_SPEED, wbits=-self.compress
                    )
                deflater = self._compressobj

            flush_mode = zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH
            message = deflater.compress(message) + deflater.flush(flush_mode)
            if message.endswith(_WS_DEFLATE_TRAILING):
                message = message[:-4]
            # RSV1 marks the frame as compressed.
            first_byte |= 0x40

        payload_len = len(message)
        masked = self.use_mask
        mask_flag = 0x80 if masked else 0

        # Pick the shortest length encoding that fits.
        if payload_len < 126:
            header = PACK_LEN1(first_byte, payload_len | mask_flag)
        elif payload_len < (1 << 16):
            header = PACK_LEN2(first_byte, 126 | mask_flag, payload_len)
        else:
            header = PACK_LEN3(first_byte, 127 | mask_flag, payload_len)

        if masked:
            mask_key = self.randrange(0, 0xFFFFFFFF).to_bytes(4, "big")
            masked_payload = bytearray(message)
            _websocket_mask(mask_key, masked_payload)
            self._write(header + mask_key + masked_payload)
            self._output_size += len(header) + len(mask_key) + len(masked_payload)
        elif payload_len > MSG_SIZE:
            # Large payloads are written separately instead of being copied
            # into one joined buffer.
            self._write(header)
            self._write(message)
            self._output_size += len(header) + payload_len
        else:
            self._write(header + message)
            self._output_size += len(header) + payload_len

        # Apply back-pressure once enough bytes have been queued.
        if self._output_size > self._limit:
            self._output_size = 0
            await self.protocol._drain_helper()

    def _write(self, data: bytes) -> None:
        """Write ``data`` to the transport, refusing a missing/closing one."""
        transport = self.transport
        if transport is None or transport.is_closing():
            raise ConnectionResetError("Cannot write to closing transport")
        transport.write(data)

    async def pong(self, message: bytes = b"") -> None:
        """Send a PONG frame; ``str`` payloads are UTF-8 encoded first."""
        payload = message.encode("utf-8") if isinstance(message, str) else message
        await self._send_frame(payload, WSMsgType.PONG)

    async def ping(self, message: bytes = b"") -> None:
        """Send a PING frame; ``str`` payloads are UTF-8 encoded first."""
        payload = message.encode("utf-8") if isinstance(message, str) else message
        await self._send_frame(payload, WSMsgType.PING)

    async def send(
        self,
        message: Union[str, bytes],
        binary: bool = False,
        compress: Optional[int] = None,
    ) -> None:
        """Send ``message`` as a BINARY frame if ``binary`` else a TEXT frame.

        ``str`` input is UTF-8 encoded; ``compress`` is passed through to
        ``_send_frame``.
        """
        payload = message.encode("utf-8") if isinstance(message, str) else message
        opcode = WSMsgType.BINARY if binary else WSMsgType.TEXT
        await self._send_frame(payload, opcode, compress)

    async def close(self, code: int = 1000, message: bytes = b"") -> None:
        """Send a CLOSE frame carrying ``code`` and ``message``.

        The writer is marked as closing afterwards even if sending fails.
        """
        reason = message.encode("utf-8") if isinstance(message, str) else message
        try:
            await self._send_frame(
                PACK_CLOSE_CODE(code) + reason, opcode=WSMsgType.CLOSE
            )
        finally:
            self._closing = True