AiOS / mmcv /tests /test_fileclient.py
ttxskk
update
d7e58f0
raw
history blame
33.7 kB
# Copyright (c) OpenMMLab. All rights reserved.
import os
import os.path as osp
import sys
import tempfile
from contextlib import contextmanager
from copy import deepcopy
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import mmcv
from mmcv import BaseStorageBackend, FileClient
from mmcv.utils import has_method
sys.modules['ceph'] = MagicMock()
sys.modules['petrel_client'] = MagicMock()
sys.modules['petrel_client.client'] = MagicMock()
sys.modules['mc'] = MagicMock()
@contextmanager
def build_temporary_directory():
"""Build a temporary directory containing many files to test
``FileClient.list_dir_or_file``.
. \n
| -- dir1 \n
| -- | -- text3.txt \n
| -- dir2 \n
| -- | -- dir3 \n
| -- | -- | -- text4.txt \n
| -- | -- img.jpg \n
| -- text1.txt \n
| -- text2.txt \n
"""
with tempfile.TemporaryDirectory() as tmp_dir:
text1 = Path(tmp_dir) / 'text1.txt'
text1.open('w').write('text1')
text2 = Path(tmp_dir) / 'text2.txt'
text2.open('w').write('text2')
dir1 = Path(tmp_dir) / 'dir1'
dir1.mkdir()
text3 = dir1 / 'text3.txt'
text3.open('w').write('text3')
dir2 = Path(tmp_dir) / 'dir2'
dir2.mkdir()
jpg1 = dir2 / 'img.jpg'
jpg1.open('wb').write(b'img')
dir3 = dir2 / 'dir3'
dir3.mkdir()
text4 = dir3 / 'text4.txt'
text4.open('w').write('text4')
yield tmp_dir
@contextmanager
def delete_and_reset_method(obj, method):
method_obj = deepcopy(getattr(type(obj), method))
try:
delattr(type(obj), method)
yield
finally:
setattr(type(obj), method, method_obj)
class MockS3Client:
def __init__(self, enable_mc=True):
self.enable_mc = enable_mc
def Get(self, filepath):
with open(filepath, 'rb') as f:
content = f.read()
return content
class MockPetrelClient:
def __init__(self, enable_mc=True, enable_multi_cluster=False):
self.enable_mc = enable_mc
self.enable_multi_cluster = enable_multi_cluster
def Get(self, filepath):
with open(filepath, 'rb') as f:
content = f.read()
return content
def put(self):
pass
def delete(self):
pass
def contains(self):
pass
def isdir(self):
pass
def list(self, dir_path):
for entry in os.scandir(dir_path):
if not entry.name.startswith('.') and entry.is_file():
yield entry.name
elif osp.isdir(entry.path):
yield entry.name + '/'
class MockMemcachedClient:
def __init__(self, server_list_cfg, client_cfg):
pass
def Get(self, filepath, buffer):
with open(filepath, 'rb') as f:
buffer.content = f.read()
class TestFileClient:
@classmethod
def setup_class(cls):
cls.test_data_dir = Path(__file__).parent / 'data'
cls.img_path = cls.test_data_dir / 'color.jpg'
cls.img_shape = (300, 400, 3)
cls.text_path = cls.test_data_dir / 'filelist.txt'
def test_error(self):
with pytest.raises(ValueError):
FileClient('hadoop')
def test_disk_backend(self):
disk_backend = FileClient('disk')
# test `name` attribute
assert disk_backend.name == 'HardDiskBackend'
# test `allow_symlink` attribute
assert disk_backend.allow_symlink
# test `get`
# input path is Path object
img_bytes = disk_backend.get(self.img_path)
img = mmcv.imfrombytes(img_bytes)
assert self.img_path.open('rb').read() == img_bytes
assert img.shape == self.img_shape
# input path is str
img_bytes = disk_backend.get(str(self.img_path))
img = mmcv.imfrombytes(img_bytes)
assert self.img_path.open('rb').read() == img_bytes
assert img.shape == self.img_shape
# test `get_text`
# input path is Path object
value_buf = disk_backend.get_text(self.text_path)
assert self.text_path.open('r').read() == value_buf
# input path is str
value_buf = disk_backend.get_text(str(self.text_path))
assert self.text_path.open('r').read() == value_buf
with tempfile.TemporaryDirectory() as tmp_dir:
# test `put`
filepath1 = Path(tmp_dir) / 'test.jpg'
disk_backend.put(b'disk', filepath1)
assert filepath1.open('rb').read() == b'disk'
# test the `mkdir_or_exist` behavior in `put`
_filepath1 = Path(tmp_dir) / 'not_existed_dir1' / 'test.jpg'
disk_backend.put(b'disk', _filepath1)
assert _filepath1.open('rb').read() == b'disk'
# test `put_text`
filepath2 = Path(tmp_dir) / 'test.txt'
disk_backend.put_text('disk', filepath2)
assert filepath2.open('r').read() == 'disk'
# test the `mkdir_or_exist` behavior in `put_text`
_filepath2 = Path(tmp_dir) / 'not_existed_dir2' / 'test.txt'
disk_backend.put_text('disk', _filepath2)
assert _filepath2.open('r').read() == 'disk'
# test `isfile`
assert disk_backend.isfile(filepath2)
assert not disk_backend.isfile(Path(tmp_dir) / 'not/existed/path')
# test `remove`
disk_backend.remove(filepath2)
# test `exists`
assert not disk_backend.exists(filepath2)
# test `get_local_path`
# if the backend is disk, `get_local_path` just return the input
with disk_backend.get_local_path(filepath1) as path:
assert str(filepath1) == path
assert osp.isfile(filepath1)
# test `join_path`
disk_dir = '/path/of/your/directory'
assert disk_backend.join_path(disk_dir, 'file') == \
osp.join(disk_dir, 'file')
assert disk_backend.join_path(disk_dir, 'dir', 'file') == \
osp.join(disk_dir, 'dir', 'file')
# test `list_dir_or_file`
with build_temporary_directory() as tmp_dir:
# 1. list directories and files
assert set(disk_backend.list_dir_or_file(tmp_dir)) == {
'dir1', 'dir2', 'text1.txt', 'text2.txt'
}
# 2. list directories and files recursively
assert set(disk_backend.list_dir_or_file(
tmp_dir, recursive=True)) == {
'dir1',
osp.join('dir1', 'text3.txt'), 'dir2',
osp.join('dir2', 'dir3'),
osp.join('dir2', 'dir3', 'text4.txt'),
osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt'
}
# 3. only list directories
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_file=False)) == {'dir1', 'dir2'}
with pytest.raises(
TypeError,
match='`suffix` should be None when `list_dir` is True'):
# Exception is raised among the `list_dir_or_file` of client,
# so we need to invode the client to trigger the exception
disk_backend.client.list_dir_or_file(
tmp_dir, list_file=False, suffix='.txt')
# 4. only list directories recursively
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_file=False, recursive=True)) == {
'dir1', 'dir2',
osp.join('dir2', 'dir3')
}
# 5. only list files
assert set(disk_backend.list_dir_or_file(
tmp_dir, list_dir=False)) == {'text1.txt', 'text2.txt'}
# 6. only list files recursively
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_dir=False, recursive=True)) == {
osp.join('dir1', 'text3.txt'),
osp.join('dir2', 'dir3', 'text4.txt'),
osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt'
}
# 7. only list files ending with suffix
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_dir=False,
suffix='.txt')) == {'text1.txt', 'text2.txt'}
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_dir=False,
suffix=('.txt', '.jpg'))) == {'text1.txt', 'text2.txt'}
with pytest.raises(
TypeError,
match='`suffix` must be a string or tuple of strings'):
disk_backend.client.list_dir_or_file(
tmp_dir, list_dir=False, suffix=['.txt', '.jpg'])
# 8. only list files ending with suffix recursively
assert set(
disk_backend.list_dir_or_file(
tmp_dir, list_dir=False, suffix='.txt',
recursive=True)) == {
osp.join('dir1', 'text3.txt'),
osp.join('dir2', 'dir3', 'text4.txt'), 'text1.txt',
'text2.txt'
}
# 7. only list files ending with suffix
assert set(
disk_backend.list_dir_or_file(
tmp_dir,
list_dir=False,
suffix=('.txt', '.jpg'),
recursive=True)) == {
osp.join('dir1', 'text3.txt'),
osp.join('dir2', 'dir3', 'text4.txt'),
osp.join('dir2', 'img.jpg'), 'text1.txt', 'text2.txt'
}
@patch('ceph.S3Client', MockS3Client)
def test_ceph_backend(self):
ceph_backend = FileClient('ceph')
# test `allow_symlink` attribute
assert not ceph_backend.allow_symlink
# input path is Path object
with pytest.raises(NotImplementedError):
ceph_backend.get_text(self.text_path)
# input path is str
with pytest.raises(NotImplementedError):
ceph_backend.get_text(str(self.text_path))
# input path is Path object
img_bytes = ceph_backend.get(self.img_path)
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# input path is str
img_bytes = ceph_backend.get(str(self.img_path))
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# `path_mapping` is either None or dict
with pytest.raises(AssertionError):
FileClient('ceph', path_mapping=1)
# test `path_mapping`
ceph_path = 's3://user/data'
ceph_backend = FileClient(
'ceph', path_mapping={str(self.test_data_dir): ceph_path})
ceph_backend.client._client.Get = MagicMock(
return_value=ceph_backend.client._client.Get(self.img_path))
img_bytes = ceph_backend.get(self.img_path)
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
ceph_backend.client._client.Get.assert_called_with(
str(self.img_path).replace(str(self.test_data_dir), ceph_path))
@patch('petrel_client.client.Client', MockPetrelClient)
@pytest.mark.parametrize('backend,prefix', [('petrel', None),
(None, 's3')])
def test_petrel_backend(self, backend, prefix):
petrel_backend = FileClient(backend=backend, prefix=prefix)
# test `allow_symlink` attribute
assert not petrel_backend.allow_symlink
# input path is Path object
img_bytes = petrel_backend.get(self.img_path)
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# input path is str
img_bytes = petrel_backend.get(str(self.img_path))
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# `path_mapping` is either None or dict
with pytest.raises(AssertionError):
FileClient('petrel', path_mapping=1)
# test `_map_path`
petrel_dir = 's3://user/data'
petrel_backend = FileClient(
'petrel', path_mapping={str(self.test_data_dir): petrel_dir})
assert petrel_backend.client._map_path(str(self.img_path)) == \
str(self.img_path).replace(str(self.test_data_dir), petrel_dir)
petrel_path = f'{petrel_dir}/test.jpg'
petrel_backend = FileClient('petrel')
# test `_format_path`
assert petrel_backend.client._format_path('s3://user\\data\\test.jpg')\
== petrel_path
# test `get`
with patch.object(
petrel_backend.client._client, 'Get',
return_value=b'petrel') as mock_get:
assert petrel_backend.get(petrel_path) == b'petrel'
mock_get.assert_called_once_with(petrel_path)
# test `get_text`
with patch.object(
petrel_backend.client._client, 'Get',
return_value=b'petrel') as mock_get:
assert petrel_backend.get_text(petrel_path) == 'petrel'
mock_get.assert_called_once_with(petrel_path)
# test `put`
with patch.object(petrel_backend.client._client, 'put') as mock_put:
petrel_backend.put(b'petrel', petrel_path)
mock_put.assert_called_once_with(petrel_path, b'petrel')
# test `put_text`
with patch.object(petrel_backend.client._client, 'put') as mock_put:
petrel_backend.put_text('petrel', petrel_path)
mock_put.assert_called_once_with(petrel_path, b'petrel')
# test `remove`
assert has_method(petrel_backend.client._client, 'delete')
# raise Exception if `delete` is not implemented
with delete_and_reset_method(petrel_backend.client._client, 'delete'):
assert not has_method(petrel_backend.client._client, 'delete')
with pytest.raises(NotImplementedError):
petrel_backend.remove(petrel_path)
with patch.object(petrel_backend.client._client,
'delete') as mock_delete:
petrel_backend.remove(petrel_path)
mock_delete.assert_called_once_with(petrel_path)
# test `exists`
assert has_method(petrel_backend.client._client, 'contains')
assert has_method(petrel_backend.client._client, 'isdir')
# raise Exception if `delete` is not implemented
with delete_and_reset_method(petrel_backend.client._client,
'contains'), delete_and_reset_method(
petrel_backend.client._client,
'isdir'):
assert not has_method(petrel_backend.client._client, 'contains')
assert not has_method(petrel_backend.client._client, 'isdir')
with pytest.raises(NotImplementedError):
petrel_backend.exists(petrel_path)
with patch.object(
petrel_backend.client._client, 'contains',
return_value=True) as mock_contains:
assert petrel_backend.exists(petrel_path)
mock_contains.assert_called_once_with(petrel_path)
# test `isdir`
assert has_method(petrel_backend.client._client, 'isdir')
with delete_and_reset_method(petrel_backend.client._client, 'isdir'):
assert not has_method(petrel_backend.client._client, 'isdir')
with pytest.raises(NotImplementedError):
petrel_backend.isdir(petrel_path)
with patch.object(
petrel_backend.client._client, 'isdir',
return_value=True) as mock_isdir:
assert petrel_backend.isdir(petrel_dir)
mock_isdir.assert_called_once_with(petrel_dir)
# test `isfile`
assert has_method(petrel_backend.client._client, 'contains')
with delete_and_reset_method(petrel_backend.client._client,
'contains'):
assert not has_method(petrel_backend.client._client, 'contains')
with pytest.raises(NotImplementedError):
petrel_backend.isfile(petrel_path)
with patch.object(
petrel_backend.client._client, 'contains',
return_value=True) as mock_contains:
assert petrel_backend.isfile(petrel_path)
mock_contains.assert_called_once_with(petrel_path)
# test `join_path`
assert petrel_backend.join_path(petrel_dir, 'file') == \
f'{petrel_dir}/file'
assert petrel_backend.join_path(f'{petrel_dir}/', 'file') == \
f'{petrel_dir}/file'
assert petrel_backend.join_path(petrel_dir, 'dir', 'file') == \
f'{petrel_dir}/dir/file'
# test `get_local_path`
with patch.object(petrel_backend.client._client, 'Get',
return_value=b'petrel') as mock_get, \
patch.object(petrel_backend.client._client, 'contains',
return_value=True) as mock_contains:
with petrel_backend.get_local_path(petrel_path) as path:
assert Path(path).open('rb').read() == b'petrel'
# exist the with block and path will be released
assert not osp.isfile(path)
mock_get.assert_called_once_with(petrel_path)
mock_contains.assert_called_once_with(petrel_path)
# test `list_dir_or_file`
assert has_method(petrel_backend.client._client, 'list')
with delete_and_reset_method(petrel_backend.client._client, 'list'):
assert not has_method(petrel_backend.client._client, 'list')
with pytest.raises(NotImplementedError):
list(petrel_backend.list_dir_or_file(petrel_dir))
with build_temporary_directory() as tmp_dir:
# 1. list directories and files
assert set(petrel_backend.list_dir_or_file(tmp_dir)) == {
'dir1', 'dir2', 'text1.txt', 'text2.txt'
}
# 2. list directories and files recursively
assert set(
petrel_backend.list_dir_or_file(tmp_dir, recursive=True)) == {
'dir1', '/'.join(('dir1', 'text3.txt')), 'dir2', '/'.join(
('dir2', 'dir3')), '/'.join(
('dir2', 'dir3', 'text4.txt')), '/'.join(
('dir2', 'img.jpg')), 'text1.txt', 'text2.txt'
}
# 3. only list directories
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_file=False)) == {'dir1', 'dir2'}
with pytest.raises(
TypeError,
match=('`list_dir` should be False when `suffix` is not '
'None')):
# Exception is raised among the `list_dir_or_file` of client,
# so we need to invode the client to trigger the exception
petrel_backend.client.list_dir_or_file(
tmp_dir, list_file=False, suffix='.txt')
# 4. only list directories recursively
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_file=False, recursive=True)) == {
'dir1', 'dir2', '/'.join(('dir2', 'dir3'))
}
# 5. only list files
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_dir=False)) == {'text1.txt', 'text2.txt'}
# 6. only list files recursively
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_dir=False, recursive=True)) == {
'/'.join(('dir1', 'text3.txt')), '/'.join(
('dir2', 'dir3', 'text4.txt')), '/'.join(
('dir2', 'img.jpg')), 'text1.txt', 'text2.txt'
}
# 7. only list files ending with suffix
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_dir=False,
suffix='.txt')) == {'text1.txt', 'text2.txt'}
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_dir=False,
suffix=('.txt', '.jpg'))) == {'text1.txt', 'text2.txt'}
with pytest.raises(
TypeError,
match='`suffix` must be a string or tuple of strings'):
petrel_backend.client.list_dir_or_file(
tmp_dir, list_dir=False, suffix=['.txt', '.jpg'])
# 8. only list files ending with suffix recursively
assert set(
petrel_backend.list_dir_or_file(
tmp_dir, list_dir=False, suffix='.txt',
recursive=True)) == {
'/'.join(('dir1', 'text3.txt')), '/'.join(
('dir2', 'dir3', 'text4.txt')), 'text1.txt',
'text2.txt'
}
# 7. only list files ending with suffix
assert set(
petrel_backend.list_dir_or_file(
tmp_dir,
list_dir=False,
suffix=('.txt', '.jpg'),
recursive=True)) == {
'/'.join(('dir1', 'text3.txt')), '/'.join(
('dir2', 'dir3', 'text4.txt')), '/'.join(
('dir2', 'img.jpg')), 'text1.txt', 'text2.txt'
}
@patch('mc.MemcachedClient.GetInstance', MockMemcachedClient)
@patch('mc.pyvector', MagicMock)
@patch('mc.ConvertBuffer', lambda x: x.content)
def test_memcached_backend(self):
mc_cfg = dict(server_list_cfg='', client_cfg='', sys_path=None)
mc_backend = FileClient('memcached', **mc_cfg)
# test `allow_symlink` attribute
assert not mc_backend.allow_symlink
# input path is Path object
with pytest.raises(NotImplementedError):
mc_backend.get_text(self.text_path)
# input path is str
with pytest.raises(NotImplementedError):
mc_backend.get_text(str(self.text_path))
# input path is Path object
img_bytes = mc_backend.get(self.img_path)
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# input path is str
img_bytes = mc_backend.get(str(self.img_path))
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
def test_lmdb_backend(self):
lmdb_path = self.test_data_dir / 'demo.lmdb'
# db_path is Path object
lmdb_backend = FileClient('lmdb', db_path=lmdb_path)
# test `allow_symlink` attribute
assert not lmdb_backend.allow_symlink
with pytest.raises(NotImplementedError):
lmdb_backend.get_text(self.text_path)
img_bytes = lmdb_backend.get('baboon')
img = mmcv.imfrombytes(img_bytes)
assert img.shape == (120, 125, 3)
# db_path is str
lmdb_backend = FileClient('lmdb', db_path=str(lmdb_path))
with pytest.raises(NotImplementedError):
lmdb_backend.get_text(str(self.text_path))
img_bytes = lmdb_backend.get('baboon')
img = mmcv.imfrombytes(img_bytes)
assert img.shape == (120, 125, 3)
@pytest.mark.parametrize('backend,prefix', [('http', None),
(None, 'http')])
def test_http_backend(self, backend, prefix):
http_backend = FileClient(backend=backend, prefix=prefix)
img_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \
'master/tests/data/color.jpg'
text_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \
'master/tests/data/filelist.txt'
# test `allow_symlink` attribute
assert not http_backend.allow_symlink
# input is path or Path object
with pytest.raises(Exception):
http_backend.get(self.img_path)
with pytest.raises(Exception):
http_backend.get(str(self.img_path))
with pytest.raises(Exception):
http_backend.get_text(self.text_path)
with pytest.raises(Exception):
http_backend.get_text(str(self.text_path))
# input url is http image
img_bytes = http_backend.get(img_url)
img = mmcv.imfrombytes(img_bytes)
assert img.shape == self.img_shape
# input url is http text
value_buf = http_backend.get_text(text_url)
assert self.text_path.open('r').read() == value_buf
# test `_get_local_path`
# exist the with block and path will be released
with http_backend.get_local_path(img_url) as path:
assert mmcv.imread(path).shape == self.img_shape
assert not osp.isfile(path)
def test_new_magic_method(self):
class DummyBackend1(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath, encoding='utf-8'):
return filepath
FileClient.register_backend('dummy_backend', DummyBackend1)
client1 = FileClient(backend='dummy_backend')
client2 = FileClient(backend='dummy_backend')
assert client1 is client2
# if a backend is overwrote, it will disable the singleton pattern for
# the backend
class DummyBackend2(BaseStorageBackend):
def get(self, filepath):
pass
def get_text(self, filepath):
pass
FileClient.register_backend('dummy_backend', DummyBackend2, force=True)
client3 = FileClient(backend='dummy_backend')
client4 = FileClient(backend='dummy_backend')
assert client2 is not client3
assert client3 is client4
def test_parse_uri_prefix(self):
# input path is None
with pytest.raises(AssertionError):
FileClient.parse_uri_prefix(None)
# input path is list
with pytest.raises(AssertionError):
FileClient.parse_uri_prefix([])
# input path is Path object
assert FileClient.parse_uri_prefix(self.img_path) is None
# input path is str
assert FileClient.parse_uri_prefix(str(self.img_path)) is None
# input path starts with https
img_url = 'https://raw.githubusercontent.com/open-mmlab/mmcv/' \
'master/tests/data/color.jpg'
assert FileClient.parse_uri_prefix(img_url) == 'https'
# input path starts with s3
img_url = 's3://your_bucket/img.png'
assert FileClient.parse_uri_prefix(img_url) == 's3'
# input path starts with clusterName:s3
img_url = 'clusterName:s3://your_bucket/img.png'
assert FileClient.parse_uri_prefix(img_url) == 's3'
def test_infer_client(self):
# HardDiskBackend
file_client_args = {'backend': 'disk'}
client = FileClient.infer_client(file_client_args)
assert client.name == 'HardDiskBackend'
client = FileClient.infer_client(uri=self.img_path)
assert client.name == 'HardDiskBackend'
# PetrelBackend
file_client_args = {'backend': 'petrel'}
client = FileClient.infer_client(file_client_args)
assert client.name == 'PetrelBackend'
uri = 's3://user_data'
client = FileClient.infer_client(uri=uri)
assert client.name == 'PetrelBackend'
def test_register_backend(self):
# name must be a string
with pytest.raises(TypeError):
class TestClass1:
pass
FileClient.register_backend(1, TestClass1)
# module must be a class
with pytest.raises(TypeError):
FileClient.register_backend('int', 0)
# module must be a subclass of BaseStorageBackend
with pytest.raises(TypeError):
class TestClass1:
pass
FileClient.register_backend('TestClass1', TestClass1)
class ExampleBackend(BaseStorageBackend):
def get(self, filepath):
return filepath
def get_text(self, filepath, encoding='utf-8'):
return filepath
FileClient.register_backend('example', ExampleBackend)
example_backend = FileClient('example')
assert example_backend.get(self.img_path) == self.img_path
assert example_backend.get_text(self.text_path) == self.text_path
assert 'example' in FileClient._backends
class Example2Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes2'
def get_text(self, filepath, encoding='utf-8'):
return 'text2'
# force=False
with pytest.raises(KeyError):
FileClient.register_backend('example', Example2Backend)
FileClient.register_backend('example', Example2Backend, force=True)
example_backend = FileClient('example')
assert example_backend.get(self.img_path) == b'bytes2'
assert example_backend.get_text(self.text_path) == 'text2'
@FileClient.register_backend(name='example3')
class Example3Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes3'
def get_text(self, filepath, encoding='utf-8'):
return 'text3'
example_backend = FileClient('example3')
assert example_backend.get(self.img_path) == b'bytes3'
assert example_backend.get_text(self.text_path) == 'text3'
assert 'example3' in FileClient._backends
# force=False
with pytest.raises(KeyError):
@FileClient.register_backend(name='example3')
class Example4Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes4'
def get_text(self, filepath, encoding='utf-8'):
return 'text4'
@FileClient.register_backend(name='example3', force=True)
class Example5Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes5'
def get_text(self, filepath, encoding='utf-8'):
return 'text5'
example_backend = FileClient('example3')
assert example_backend.get(self.img_path) == b'bytes5'
assert example_backend.get_text(self.text_path) == 'text5'
# prefixes is a str
class Example6Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes6'
def get_text(self, filepath, encoding='utf-8'):
return 'text6'
FileClient.register_backend(
'example4',
Example6Backend,
force=True,
prefixes='example4_prefix')
example_backend = FileClient('example4')
assert example_backend.get(self.img_path) == b'bytes6'
assert example_backend.get_text(self.text_path) == 'text6'
example_backend = FileClient(prefix='example4_prefix')
assert example_backend.get(self.img_path) == b'bytes6'
assert example_backend.get_text(self.text_path) == 'text6'
example_backend = FileClient('example4', prefix='example4_prefix')
assert example_backend.get(self.img_path) == b'bytes6'
assert example_backend.get_text(self.text_path) == 'text6'
# prefixes is a list of str
class Example7Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes7'
def get_text(self, filepath, encoding='utf-8'):
return 'text7'
FileClient.register_backend(
'example5',
Example7Backend,
force=True,
prefixes=['example5_prefix1', 'example5_prefix2'])
example_backend = FileClient('example5')
assert example_backend.get(self.img_path) == b'bytes7'
assert example_backend.get_text(self.text_path) == 'text7'
example_backend = FileClient(prefix='example5_prefix1')
assert example_backend.get(self.img_path) == b'bytes7'
assert example_backend.get_text(self.text_path) == 'text7'
example_backend = FileClient(prefix='example5_prefix2')
assert example_backend.get(self.img_path) == b'bytes7'
assert example_backend.get_text(self.text_path) == 'text7'
# backend has a higher priority than prefixes
class Example8Backend(BaseStorageBackend):
def get(self, filepath):
return b'bytes8'
def get_text(self, filepath, encoding='utf-8'):
return 'text8'
FileClient.register_backend(
'example6',
Example8Backend,
force=True,
prefixes='example6_prefix')
example_backend = FileClient('example6')
assert example_backend.get(self.img_path) == b'bytes8'
assert example_backend.get_text(self.text_path) == 'text8'
example_backend = FileClient('example6', prefix='example4_prefix')
assert example_backend.get(self.img_path) == b'bytes8'
assert example_backend.get_text(self.text_path) == 'text8'