Spaces:
Runtime error
Runtime error
from cpython.bytes cimport PyBytes_FromStringAndSize | |
from cpython.exc cimport PyErr_NoMemory | |
from cpython.mem cimport PyMem_Free, PyMem_Malloc, PyMem_Realloc | |
from cpython.object cimport PyObject_Str | |
from libc.stdint cimport uint8_t, uint64_t | |
from libc.string cimport memcpy | |
from multidict import istr | |
DEF BUF_SIZE = 16 * 1024 # 16KiB | |
cdef char BUFFER[BUF_SIZE] | |
cdef object _istr = istr | |
# ----------------- writer --------------------------- | |
cdef struct Writer: | |
char *buf | |
Py_ssize_t size | |
Py_ssize_t pos | |
cdef inline void _init_writer(Writer* writer): | |
writer.buf = &BUFFER[0] | |
writer.size = BUF_SIZE | |
writer.pos = 0 | |
cdef inline void _release_writer(Writer* writer): | |
if writer.buf != BUFFER: | |
PyMem_Free(writer.buf) | |
cdef inline int _write_byte(Writer* writer, uint8_t ch): | |
cdef char * buf | |
cdef Py_ssize_t size | |
if writer.pos == writer.size: | |
size = writer.size + BUF_SIZE | |
if writer.buf == BUFFER: | |
buf = <char*>PyMem_Malloc(size) | |
if buf == NULL: | |
PyErr_NoMemory() | |
return -1 | |
memcpy(buf, writer.buf, writer.size) | |
else: | |
buf = <char*>PyMem_Realloc(writer.buf, size) | |
if buf == NULL: | |
PyErr_NoMemory() | |
return -1 | |
writer.buf = buf | |
writer.size = size | |
writer.buf[writer.pos] = <char>ch | |
writer.pos += 1 | |
return 0 | |
cdef inline int _write_utf8(Writer* writer, Py_UCS4 symbol): | |
cdef uint64_t utf = <uint64_t> symbol | |
if utf < 0x80: | |
return _write_byte(writer, <uint8_t>utf) | |
elif utf < 0x800: | |
if _write_byte(writer, <uint8_t>(0xc0 | (utf >> 6))) < 0: | |
return -1 | |
return _write_byte(writer, <uint8_t>(0x80 | (utf & 0x3f))) | |
elif 0xD800 <= utf <= 0xDFFF: | |
return 0 | |
elif utf < 0x10000: | |
if _write_byte(writer, <uint8_t>(0xe0 | (utf >> 12))) < 0: | |
return -1 | |
if _write_byte(writer, <uint8_t>(0x80 | ((utf >> 6) & 0x3f))) < 0: | |
return -1 | |
return _write_byte(writer, <uint8_t>(0x80 | (utf & 0x3f))) | |
elif utf > 0x10FFFF: | |
return 0 | |
else: | |
if _write_byte(writer, <uint8_t>(0xf0 | (utf >> 18))) < 0: | |
return -1 | |
if _write_byte(writer, | |
<uint8_t>(0x80 | ((utf >> 12) & 0x3f))) < 0: | |
return -1 | |
if _write_byte(writer, | |
<uint8_t>(0x80 | ((utf >> 6) & 0x3f))) < 0: | |
return -1 | |
return _write_byte(writer, <uint8_t>(0x80 | (utf & 0x3f))) | |
cdef inline int _write_str(Writer* writer, str s): | |
cdef Py_UCS4 ch | |
for ch in s: | |
if _write_utf8(writer, ch) < 0: | |
return -1 | |
# --------------- _serialize_headers ---------------------- | |
cdef str to_str(object s): | |
typ = type(s) | |
if typ is str: | |
return <str>s | |
elif typ is _istr: | |
return PyObject_Str(s) | |
elif not isinstance(s, str): | |
raise TypeError("Cannot serialize non-str key {!r}".format(s)) | |
else: | |
return str(s) | |
cdef void _safe_header(str string) except *: | |
if "\r" in string or "\n" in string: | |
raise ValueError( | |
"Newline or carriage return character detected in HTTP status message or " | |
"header. This is a potential security issue." | |
) | |
def _serialize_headers(str status_line, headers): | |
cdef Writer writer | |
cdef object key | |
cdef object val | |
cdef bytes ret | |
_init_writer(&writer) | |
for key, val in headers.items(): | |
_safe_header(to_str(key)) | |
_safe_header(to_str(val)) | |
try: | |
if _write_str(&writer, status_line) < 0: | |
raise | |
if _write_byte(&writer, b'\r') < 0: | |
raise | |
if _write_byte(&writer, b'\n') < 0: | |
raise | |
for key, val in headers.items(): | |
if _write_str(&writer, to_str(key)) < 0: | |
raise | |
if _write_byte(&writer, b':') < 0: | |
raise | |
if _write_byte(&writer, b' ') < 0: | |
raise | |
if _write_str(&writer, to_str(val)) < 0: | |
raise | |
if _write_byte(&writer, b'\r') < 0: | |
raise | |
if _write_byte(&writer, b'\n') < 0: | |
raise | |
if _write_byte(&writer, b'\r') < 0: | |
raise | |
if _write_byte(&writer, b'\n') < 0: | |
raise | |
return PyBytes_FromStringAndSize(writer.buf, writer.pos) | |
finally: | |
_release_writer(&writer) | |