# aicheckv2-api/utils/huawei_obs.py
# (Python source, 60 lines, 1.8 KiB; last changed 2025-04-11 14:30:48 +08:00)
from application.settings import HUAWEI_OBS
from obs import ObsClient
from obs import DeleteObjectsRequest
from obs import Object
class MyObs:
    """Small wrapper around the Huawei OBS ``ObsClient``.

    Credentials and the endpoint are read from the ``HUAWEI_OBS`` settings
    mapping, and every operation targets ``HUAWEI_OBS["bucketName"]``.
    """

    def __init__(self):
        """Create the underlying ``ObsClient`` from the ``HUAWEI_OBS`` settings."""
        self.obsClient = ObsClient(
            access_key_id=HUAWEI_OBS['AccessKeyID'],
            secret_access_key=HUAWEI_OBS['SecretAccessKey'],
            server=HUAWEI_OBS["server"])

    @staticmethod
    def _upload_result(resp, object_key):
        """Turn an upload response into ``(ok, object_key, object_url)``.

        A status below 300 counts as success; anything else yields
        ``(False, None, None)``.
        """
        if resp.status < 300:
            print("objectKey", object_key)
            print("url", resp.body.objectUrl)
            return True, object_key, resp.body.objectUrl
        return False, None, None

    def put_file(self, object_key: str, file_path: str):
        """Upload the local file at ``file_path`` under ``object_key``.

        Returns:
            ``(True, object_key, object_url)`` on success, otherwise
            ``(False, None, None)``.
        """
        resp = self.obsClient.putFile(
            bucketName=HUAWEI_OBS["bucketName"],
            objectKey=object_key,
            file_path=file_path)
        return self._upload_result(resp, object_key)

    def put_object(self, objectKey: str, file_content):
        """Upload in-memory ``file_content`` under ``objectKey``.

        Returns:
            ``(True, objectKey, object_url)`` on success, otherwise
            ``(False, None, None)``.
        """
        # The SDK's streaming upload is ``putContent`` and takes the payload
        # as ``content``; the previous ``put_object(file_cotent=...)`` call
        # used a misspelled keyword and could not succeed.
        resp = self.obsClient.putContent(
            bucketName=HUAWEI_OBS["bucketName"],
            objectKey=objectKey,
            content=file_content)
        return self._upload_result(resp, objectKey)

    def del_objects(self, object_keys: list):
        """Delete every key in ``object_keys`` with one batch request.

        Returns:
            ``True`` when the response status is below 300, else ``False``.
        """
        objects = [Object(key=object_key, versionId=None)
                   for object_key in object_keys]
        resp = self.obsClient.deleteObjects(
            bucketName=HUAWEI_OBS["bucketName"],
            deleteObjectsRequest=DeleteObjectsRequest(
                quiet=False, objects=objects, encoding_type='url'))
        return resp.status < 300

    async def close(self):
        """Close the underlying client.

        Kept as a coroutine so existing ``await obs.close()`` callers keep
        working. ``ObsClient.close()`` is a plain synchronous call, so its
        result must not be awaited (awaiting ``None`` raises ``TypeError``).
        """
        self.obsClient.close()