ytdl-patched/yt_dlp/longname.py

286 lines
9.4 KiB
Python

# coding: utf-8
from __future__ import unicode_literals
import re
from typing import Union
from os import PathLike, fsdecode, remove, rename, sep, stat, utime, unlink, makedirs, replace
from os.path import exists, isfile, getsize, normpath, basename, dirname, isabs
from .utils import (
sanitize_open,
get_filesystem_encoding,
)
# this file is to escape long file names in following manner:
# 1. split each path segment in 255-N bytes (N=byte length of DEFAULT_DELIMITER below)
# 2. append DEFAULT_DELIMITER, so path segments are split within filesystem limit,
# with a marker on each split chunks
FS_LENGTH_LIMIT = 255 # length limit from filesystem
DEFAULT_DELIMITER = "~~"
# http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/dc4322602480/src/share/classes/java/lang/Character.java
# Constants from JDK
MIN_HIGH_SURROGATE = 0xD800
MAX_HIGH_SURROGATE = 0xDBFF
MIN_LOW_SURROGATE = 0xDC00
MAX_LOW_SURROGATE = 0xDFFF
def split_longname(input: Union[bytes, str, PathLike], encoding: str = get_filesystem_encoding()) -> bytes:
if PathLike and isinstance(input, PathLike):
input = fsdecode(input)
was_bytes = isinstance(input, bytes)
if was_bytes:
input = input.decode(encoding)
result = split_longname_str(input, encoding)
if was_bytes:
result = result.encode(encoding)
return result
def combine_longname(input: Union[bytes, str, PathLike], encoding: str = get_filesystem_encoding()) -> bytes:
if PathLike and isinstance(input, PathLike):
input = fsdecode(input)
was_bytes = isinstance(input, bytes)
if was_bytes:
input = input.decode(encoding)
result = combine_longname_str(input)
if was_bytes:
result = result.encode(encoding)
return result
def split_longname_str(input: str, encoding: str = get_filesystem_encoding()) -> str:
# https://docs.python.org/3/library/codecs.html
chunks = re.split(r'[\\/]', input)
result = []
if encoding in ('utf_8', 'U8', 'UTF', 'utf8', 'cp65001'):
# fast(er) path: UTF-8
CHUNK_LENGTH = FS_LENGTH_LIMIT - 2
for chunk in chunks:
if utf8_byte_length_all_chr(chunk) <= FS_LENGTH_LIMIT:
result.append(chunk)
continue
current_split, current_length = '', 0
for chr in chunk:
chrlen = utf8_byte_length(chr)
print(current_split)
if current_length + chrlen > CHUNK_LENGTH:
if current_split:
result.append(current_split + DEFAULT_DELIMITER)
current_split, current_length = '', 0
current_split += chr
current_length += chrlen
if current_split:
result.append(current_split)
elif encoding in ('utf_16', 'U16', 'utf16', 'utf_16_be', 'UTF-16BE', 'utf_16_le', 'UTF-16LE'):
# fast path: UTF-16 ANY Endian
CHUNK_LENGTH = FS_LENGTH_LIMIT - 4
for chunk in chunks:
if len(chunk) * 2 <= FS_LENGTH_LIMIT:
result.append(chunk)
continue
current_split, current_length = '', 0
for chr in chunk:
chrord = ord(chr)
chrlen = 2
if chrord >= MIN_HIGH_SURROGATE and chrord < (MAX_HIGH_SURROGATE + 1):
chrlen = 4
elif chrord >= MIN_LOW_SURROGATE and chrord < (MAX_LOW_SURROGATE + 1):
chrlen = 0 # same reason as UTF-8 does
if current_length + chrlen > CHUNK_LENGTH:
if current_split:
result.append(current_split + DEFAULT_DELIMITER)
current_split, current_length = '', 0
current_split += chr
current_length += chrlen
if current_split:
result.append(current_split)
elif encoding in ('utf_32', 'U32', 'utf32', 'utf_32_be', 'UTF-32BE', 'utf_32_le', 'UTF-32LE'):
# (very) fast path: UTF-32 ANY Endian
CHUNK_LENGTH = FS_LENGTH_LIMIT - 8
for chunk in chunks:
chunk_len = len(chunk)
if chunk_len * 4 <= FS_LENGTH_LIMIT:
result.append(chunk)
continue
for i in range(0, chunk_len, 4):
if chunk_len < i + 4:
result.append(chunk[i:i + 4] + DEFAULT_DELIMITER)
else:
result.append(chunk[i:i + 4])
else:
# slow path: encode each charaters
# any encoding with header/marking will break this (e.g. UTF-16 with BOM, 'idna')
CHUNK_LENGTH = FS_LENGTH_LIMIT - len(DEFAULT_DELIMITER.encode(encoding))
for chunk in chunks:
if len(chunk.encode(encoding)) <= FS_LENGTH_LIMIT:
result.append(chunk)
continue
current_split, current_length = '', 0
for chr in chunk:
chrlen = len(chr.encode(encoding))
if current_length + chrlen > CHUNK_LENGTH:
if current_split:
result.append(current_split + DEFAULT_DELIMITER)
current_split, current_length = '', 0
current_split += chr
current_length += chrlen
if current_split:
result.append(current_split)
return sep.join(result)
def combine_longname_str(input: str) -> str:
result = []
for part in re.split(r'[\\/]', input):
if result and result[-1].endswith(DEFAULT_DELIMITER):
result[-1] = result[-1][:-2] + part
else:
result.append(part)
return sep.join(result)
def utf8_byte_length(chr: Union[str, int]) -> int:
"Calculates byte length in UTF-8 without encode/decode"
if isinstance(chr, str):
chr = ord(chr[0])
if chr <= 0x7F:
return 1
if chr <= 0x7FF:
return 2
# refer to Character.isHighSurrogate from Java
if chr >= MIN_HIGH_SURROGATE and chr < (MAX_HIGH_SURROGATE + 1):
return 4 # HIGH+LOW, low should be added later without cost
if chr >= MIN_LOW_SURROGATE and chr < (MAX_LOW_SURROGATE + 1):
return 0 # length for this is already accounted at high surrogate
return 3
def utf8_byte_length_all_chr(string: str) -> int:
"Calculates byte length in UTF-8 without encode/decode"
result = 0
for chr in string:
chr = ord(chr[0])
if chr <= 0x7F:
result += 1
elif chr <= 0x7FF:
result += 2
# refer to Character.isHighSurrogate from Java
elif chr >= MIN_HIGH_SURROGATE and chr < (MAX_HIGH_SURROGATE + 1):
result += 4 # HIGH+LOW, low should be added later without cost
elif chr >= MIN_LOW_SURROGATE and chr < (MAX_LOW_SURROGATE + 1):
result += 0 # length for this is already accounted at high surrogate
else:
result += 3
return result
def ensure_directory(filename):
split = split_longname(filename, get_filesystem_encoding())
if split != filename:
try:
makedirs(normpath(dirname(split)))
except FileExistsError:
pass
return split
def escaped_open(filename, open_mode, **kwargs):
"open() that escapes long names"
split = ensure_directory(filename)
return open(split, open_mode, **kwargs)
def escaped_sanitize_open(filename, open_mode):
"sanitized_open() that escapes long names"
split = ensure_directory(filename)
a, b = sanitize_open(split, open_mode)
b = combine_longname(b)
return a, b
def escaped_stat(path, *args, **kwargs):
"os.stat() that escapes long names"
return stat(split_longname(path, get_filesystem_encoding()), *args, **kwargs)
def escaped_unlink(path, *args, **kwargs):
"os.unlink() that escapes long names"
unlink(split_longname(path, get_filesystem_encoding()), *args, **kwargs)
def escaped_path_isfile(path):
"os.path.isfile() that escapes long names"
return isfile(split_longname(path, get_filesystem_encoding()))
def escaped_path_exists(path):
"os.path.exists() that escapes long names"
return exists(split_longname(path, get_filesystem_encoding()))
def escaped_path_getsize(filename):
"os.path.getsize() that escapes long names"
return getsize(split_longname(filename, get_filesystem_encoding()))
def escaped_utime(path, *args, **kwargs):
"os.utime() that escapes long names"
utime(split_longname(path, get_filesystem_encoding()), *args, **kwargs)
def escaped_rename(src, dst, *args, **kwargs):
"os.rename() that escapes long names"
dst = ensure_directory(dst)
rename(
split_longname(src, get_filesystem_encoding()),
dst, *args, **kwargs)
def escaped_replace(src, dst, *args, **kwargs):
"os.replace() that escapes long names"
dst = ensure_directory(dst)
replace(
split_longname(src, get_filesystem_encoding()),
dst, *args, **kwargs)
def escaped_remove(path, *args, **kwargs):
"os.remove() that escapes long names"
remove(split_longname(path, get_filesystem_encoding()), *args, **kwargs)
def escaped_basename(path):
"os.path.basename() that escapes long names"
return basename(combine_longname(path, get_filesystem_encoding()))
def escaped_dirname(path):
"os.path.dirname() that escapes long names"
return dirname(combine_longname(path, get_filesystem_encoding()))
def escaped_isabs(path):
"os.path.isabs() that escapes long names"
return isabs(combine_longname(path, get_filesystem_encoding()))