Commit 1204ec96 authored by Clark Lin's avatar Clark Lin
Browse files

committed currnet version of fastapi-for-apex from prod

parents
{
"logging": {
"log_level": "INFO",
"log_dir": "./logs",
"filename_template": "%Y%m%d.log"
}
}
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: logging_manager.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-03-18 Clark Lin Initial version
#
# Main features summary:
# - Feature 1
# - Feature 2
# - Feature 3
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
import json
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import datetime
class LoggingManager:
_instance = None
@staticmethod
def get_instance(config_rel_path='../config/logging.json'):
"""Get the singleton instance of LoggingManager."""
if LoggingManager._instance is None:
LoggingManager._instance = LoggingManager(config_rel_path)
return LoggingManager._instance
def __init__(self, config_rel_path='../config/logging.json'):
"""
Initialize the LoggingManager with the relative path to the configuration file.
By default, the configuration file is assumed to be located at ../config/logging.json.
Args:
config_rel_path (str, optional): Relative path to the JSON configuration file.
Defaults to '../config/logging.json'.
"""
if self._instance is not None:
raise RuntimeError("LoggingManager is a singleton class. Use get_instance() instead of directly instantiating.")
script_dir = os.path.dirname(os.path.abspath(__file__))
config_absolute_path = os.path.join(script_dir, config_rel_path)
self.config = self.load_config(config_absolute_path)
self.logger = self.setup_logger()
def load_config(self, config_path):
"""
Load logging configuration from the provided JSON file.
Args:
config_path (str): Absolute path to the JSON configuration file.
Returns:
dict: The loaded logging configuration.
"""
with open(config_path, 'r') as f:
config_data = json.load(f)
return config_data.get('logging', {})
def setup_logger(self):
"""
Setup the logger with the configuration settings.
Returns:
logging.Logger: A configured logger instance.
"""
log_level = self.config.get('log_level', logging.INFO)
log_dir = self.config.get('log_dir', './logs')
os.makedirs(log_dir, exist_ok=True)
# Use today's date to create a new file name each day
today_date = datetime.datetime.today().strftime('%Y%m%d')
filename = f"{today_date}.log"
# Construct the full path for the log file
log_file_path = os.path.join(log_dir, filename)
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
# Create a file handler specifically for today's log file
file_handler = logging.FileHandler(log_file_path)
file_handler.setLevel(log_level)
# Create and set a log formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def log(self, log_level, module, message, *args, **kwargs):
"""
Log a message at the specified log level.
Args:
log_level (int): One of the logging levels (e.g., logging.INFO).
message (str): The log message to be recorded.
*args, **kwargs: Additional arguments passed to the underlying logging function.
Raises:
ValueError: If the given log_level is invalid.
"""
if log_level not in [
logging.NOTSET,
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
logging.CRITICAL,
]:
raise ValueError(f'Invalid log level: {log_level}')
# Init message
message = module + ' ' + message
# Append additional content
arguments = list()
for arg in args:
arguments.append(arg)
add_info = ','.join(arguments)
if len(add_info) != 0:
message += '(' + add_info + ')'
else:
pass
# self.logger.log(log_level, message, *args, **kwargs)
self.logger.log(log_level, message)
# Example usage
if __name__ == "__main__":
lm = LoggingManager()
lm.log(logging.INFO, 'This is an info message.')
lm.log(logging.ERROR, 'An error has occurred.')
# For dynamic log level change, you could add a method like this:
# def set_log_level(self, new_level):
# self.logger.setLevel(new_level)
# And then call it:
# lm.set_log_level(logging.DEBUG)
{
"oss": {
"<qcloud oss bucket name>": {
"oss-program": {
"secret_id": "<qcloud oss secret_id>",
"secret_key": "<qcloud oss secret_key>",
"description": "object-storage-all"
}
}
},
"ocr": {
"ocr-program": {
"secret_id": "<qcloud ocr secret_id>",
"secret_key": "<qcloud ocr secret_key>",
"description": "文字识别 (ocr)"
}
},
"asr": {
"asr-program": {
"app_id": "<qcloud ASR app id>",
"secret_id": "<qcloud ASR secret_id>",
"secret_key": "<qcloud ASR secret_key>",
"description": "语音识别 (asr)"
}
},
"paddle": {
"aliyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>"
},
"ctyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>"
}
},
"whisper": {
"aliyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>"
},
"ctyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>"
}
},
"gen-ai": {
"aliyun":{
"api_key": "<Gen AI api_key>",
"description": "Tongyi"
}
}
}
# Import FastAPI Libs
from pydantic import BaseModel
# Import Other Common Libs
import json
import base64
import os
import requests
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
credential_file = '/home/oracle/python/apex/credential/credential.json'
# env = 'aliyun_ecs'
env = 'ctyun_ecs'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class ocr_req_basemodel(BaseModel):
ocr_tool: str # Q: Qcloud, LP: Local PaddleOCR
dir: str
file_name: str
class ocr_res_basemodel(BaseModel):
result: int
result_message: str
detected_text: list
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, env: str) -> tuple:
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
base_url = data['paddle'][env]['base_url']
client_id = data['paddle'][env]['client_id']
client_secret = data['paddle'][env]['client_secret']
return base_url, client_id, client_secret
except Exception as e:
print(e)
return None, None, None
# ------------------------------------------------
# Function of OCR request to Paddle OCR
# ------------------------------------------------
def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple:
ret_code = c_ret_code_success
err_msg = ""
# ------------------------------------------------
# Get URL and credential
# ------------------------------------------------
base_url, client_id, client_secret = get_credentials(credential_file, env);
if base_url == None or client_id == None or client_secret == None:
ret_code = c_ret_code_error
err_msg = "Failed to get credential from file [" + credential_file + "] and env [" + env + "]"
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
# ------------------------------------------------
# Get Token
# ------------------------------------------------
url = base_url + "/token"
access_token = ""
headers = {
"Authorization": "Bearer " + access_token,
"Content-type": "application/x-www-form-urlencoded"
}
req_body = "grant_type=&username={0}&password={1}".format(client_id, client_secret)
try:
response = requests.post(
url = url,
headers = headers,
data = req_body,
verify = False)
response.raise_for_status
response_dict = response.json()
print(response.text)
access_token = response_dict["access_token"]
print(access_token)
except requests.exceptions.HTTPError as httpError:
print("httpError")
print(httpError)
ret_code = c_ret_code_error
err_msg = "HTTP Error: Failed to get access token: " + str(httpError)
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
except Exception as err:
print("err")
print(err)
ret_code = c_ret_code_error
err_msg = "Unexpected Error: Failed to get access token: " + str(err)
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
# ------------------------------------------------
# Get Detected Text
# ------------------------------------------------
url = base_url + "/paddleocr/read-image"
# 打开文件
with open(os.path.normpath(os.path.join(ocr_req.dir, ocr_req.file_name)), "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
# print(encoded_string)
headers = {
"accept": "application/json",
"Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}
data = {
"image_b64_string": encoded_string
}
json_data = json.dumps(data)
try:
response = requests.post(
url = url,
headers = headers,
data = json_data,
verify = False)
response.raise_for_status
response_dict = response.json()
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = response_dict
)
return ocr_res
except requests.exceptions.HTTPError as httpError:
ret_code = c_ret_code_error
err_msg = "HTTP Error: Failed to get detected text: " + str(httpError)
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
except Exception as err:
ret_code = c_ret_code_error
if (response.text):
err_msg = "Unexpected Error: Failed to get detected text: " + str(err) + ", " + response.text
else:
err_msg = "Unexpected Error: Failed to get detected text: " + str(err)
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
# Import FastAPI Libs
from pydantic import BaseModel
# Import Other Common Libs
import json
import base64
import os
import requests
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
credential_file = '/home/oracle/python/apex/credential/credential.json'
# env = 'aliyun_ecs'
env = 'ctyun_ecs'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class asr_req_basemodel(BaseModel):
asr_tool: str # W: Whisper
model: str
audio_b64_string: str
initial_prompt: str
class asr_res_basemodel(BaseModel):
result: int
result_message: str
detected_text: str
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, env: str) -> tuple:
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
base_url = data['whisper'][env]['base_url']
client_id = data['whisper'][env]['client_id']
client_secret = data['whisper'][env]['client_secret']
return base_url, client_id, client_secret
except Exception as e:
print(e)
return None, None, None
# ------------------------------------------------
# Function of ASR request to Whisper ASR
# ------------------------------------------------
def get_detected_text(asr_req: asr_req_basemodel) -> tuple:
# print(asr_req)
ret_code = c_ret_code_success
err_msg = ""
# ------------------------------------------------
# Get URL and credential
# ------------------------------------------------
base_url, client_id, client_secret = get_credentials(credential_file, env);
if base_url == None or client_id == None or client_secret == None:
ret_code = c_ret_code_error
err_msg = "Failed to get credential from file [" + credential_file + "] and env [" + env + "]"
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = ""
)
return asr_res
# ------------------------------------------------
# Get Token
# ------------------------------------------------
url = base_url + "/token"
access_token = ""
headers = {
"Authorization": "Bearer " + access_token,
"Content-type": "application/x-www-form-urlencoded"
}
req_body = "grant_type=&username={0}&password={1}".format(client_id, client_secret)
try:
response = requests.post(
url = url,
headers = headers,
data = req_body,
verify = False)
response.raise_for_status
response_dict = response.json()
# print(response.text)
access_token = response_dict["access_token"]
# print(access_token)
except requests.exceptions.HTTPError as httpError:
print("httpError")
print(httpError)
ret_code = c_ret_code_error
err_msg = "HTTP Error: Failed to get access token: " + str(httpError)
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = ""
)
return asr_res
except Exception as err:
print("err")
print(err)
ret_code = c_ret_code_error
err_msg = "Unexpected Error: Failed to get access token: " + str(err)
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = ""
)
return asr_res
# ------------------------------------------------
# Get Detected Text
# ------------------------------------------------
url = base_url + "/whisper/read-audio"
headers = {
"accept": "application/json",
"Authorization": "Bearer " + access_token,
"Content-Type": "application/json"
}
data = {
"model": asr_req.model,
"audio_b64_string": asr_req.audio_b64_string,
"initial_prompt": asr_req.initial_prompt
}
json_data = json.dumps(data)
try:
response = requests.post(
url = url,
headers = headers,
data = json_data,
verify = False)
response.raise_for_status
response_dict = response.json()
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = response_dict["text"]
)
return asr_res
except requests.exceptions.HTTPError as httpError:
ret_code = c_ret_code_error
err_msg = "HTTP Error: Failed to get detected text: " + str(httpError)
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = ""
)
return asr_res
except Exception as err:
ret_code = c_ret_code_error
if (response.text):
err_msg = "Unexpected Error: Failed to get detected text: " + str(err) + ", " + response.text
else:
err_msg = "Unexpected Error: Failed to get detected text: " + str(err)
asr_res = asr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = ""
)
return asr_res
# Import FastAPI Libs
from pydantic import BaseModel
# Import Qcloud OCR Service Libs
import sys
# sys.path.append("../qcloud")
from qcloud.common import credential
from qcloud.asr import flash_recognizer
# Common log part
import os
import logging
from common.script.logging_manager import LoggingManager
curr_module = os.path.basename(__file__) # Initialize logging manager
lm = LoggingManager.get_instance() # lm = LoggingManager()
# Import Other Common Libs
import json
import base64
import os
import random
import string
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
user_name = 'asr-program'
credential_file = 'credential/credential.json'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class asr_req_basemodel(BaseModel):
asr_tool: str # W: Whisper; Q: Qcloud
model: str
audio_b64_string: str
initial_prompt: str
class asr_res_basemodel(BaseModel):
result: int
result_message: str
detected_text: str
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, user_name: str) -> tuple:
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
app_id = data['asr'][user_name]['app_id']
secret_id = data['asr'][user_name]['secret_id']
secret_key = data['asr'][user_name]['secret_key']
return app_id, secret_id, secret_key
except Exception as e:
lm.log(logging.INFO, curr_module, str(e))
return None, None, None
# ------------------------------------------------
# Function of ASR request to Qcloud COS
# ------------------------------------------------
def get_detected_text(asr_req: asr_req_basemodel) -> tuple:
lm.log(logging.INFO, curr_module, "qcloud get_detected_text start")
# 初始化处理结果
ret_code = c_ret_code_success
err_msg = ""
# 初始化ID
app_id = ""
secret_id = ""
secret_key = ""
engine_type = "16k_en"
# 初始化返回对象
asr_resp = asr_res_basemodel(
result = c_ret_code_success,
result_message = '',
detected_text = ''
)
try:
app_id, secret_id, secret_key = get_credentials(credential_file, user_name)
cred = credential.Credential(secret_id, secret_key)
if (app_id == None or secret_id == None or secret_key == None):
lm.log(logging.ERROR, curr_module, 'Failed to get credential')
asr_resp.result = c_ret_code_error
asr_resp.detected_text = ''
asr_resp.result_message = 'Failed to get credential'
return asr_resp
# 新建FlashRecognizer,一个recognizer可以执行N次识别请求
recognizer = flash_recognizer.FlashRecognizer(app_id, cred)
# 新建识别请求
req = flash_recognizer.FlashRecognitionRequest(engine_type)
req.set_filter_modal(0)
req.set_filter_punc(0)
req.set_filter_dirty(0)
# 目前aac格式可用于解码MacOS Safari/Chrome的录音文件,IOS Safari待确认
# req.set_voice_format("wav")
req.set_voice_format("aac")
req.set_word_info(0)
req.set_convert_num_mode(1)
# 将Base64字符串写到临时文件
# Decode base64 string
decoded_data = base64.b64decode(asr_req.audio_b64_string)
# Write the decoded data to a file
characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for i in range(16))
tmp_file = '/tmp/whisper_' + random_string
with open(tmp_file, 'wb') as file:
file.write(decoded_data)
with open(tmp_file, 'rb') as f:
#读取音频数据
data = f.read()
#执行识别
resultData = recognizer.recognize(req, data)
resp = json.loads(resultData)
request_id = resp["request_id"]
code = resp["code"]
if code != 0:
lm.log(logging.ERROR, curr_module, "recognize faild! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
resp.result = c_ret_code_error
resp.detected_text = ''
resp.result_message = 'Recognize faild! Please find detail in log.'
return resp
lm.log(logging.INFO, curr_module, "request_id: ", request_id)
#一个channl_result对应一个声道的识别结果
#大多数音频是单声道,对应一个channl_result
for channl_result in resp["flash_result"]:
lm.log(logging.INFO, curr_module, "channel_id: " + str(channl_result["channel_id"]))
lm.log(logging.INFO, curr_module, channl_result["text"])
asr_resp.result = c_ret_code_success
asr_resp.detected_text = channl_result["text"]
asr_resp.result_message = ''
return asr_resp
lm.log(logging.INFO, curr_module, "qcloud get_detected_text complete")
except Exception as e:
lm.log(logging.ERROR, curr_module, str(e))
asr_resp.result = c_ret_code_error
asr_resp.detected_text = ''
asr_resp.result_message = str(e)
return asr_resp
# Import FastAPI Libs
from pydantic import BaseModel
# Import Qcloud COS Service Libs
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
# Import Other Common Libs
import sys
import os
import logging
from datetime import datetime
import mimetypes
import json
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
region = 'ap-shanghai'
bucket = 'clark-apex-saz-1320304559'
user_name = 'oss-program'
credential_file = '/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, bucket_name: str, user_name: str) -> tuple:
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
secret_id = data['oss'][bucket_name][user_name]['secret_id']
secret_key = data['oss'][bucket_name][user_name]['secret_key']
return secret_id, secret_key
except Exception as e:
print(e)
return None, None
# ------------------------------------------------
# Function of upload single file to Qcloud COS
# ------------------------------------------------
def upload_single_file(dir: str, file_name: str, rename_prefix: str, client: CosS3Client, bucket: str, index: int) -> tuple:
file_key = ""
file_etag = ""
ret_code = c_ret_code_success
err_msg = ""
try:
# Check if file is empty
if file_name == None or file_name == "":
file_key = ""
else:
# Check file existance
file_location = os.path.join(dir, file_name)
mime_type, _ = mimetypes.guess_type(file_location)
if os.path.exists(file_location):
file_key = rename_prefix + "_" + str(index) + "_" + file_name
# Call COS service to upload
with open(file_location, 'rb') as fp:
response = client.put_object(
Bucket=bucket, # Bucket 由 BucketName-APPID 组成
Body=fp,
Key=file_key,
StorageClass='STANDARD',
ContentType=mime_type
)
file_etag = response['ETag'].strip('"')
# delete file after upload
os.remove(file_location)
else:
file_key = ""
return ret_code, err_msg, file_key, file_etag
except Exception as e:
ret_code = c_ret_code_error
err_msg = str(e)
return ret_code, err_msg, file_key, file_etag
# ------------------------------------------------
# Define File Upload Service Request and Response
# ------------------------------------------------
class file_upload_req_basemodel(BaseModel):
dir: str
file1_name: str
file2_name: str
file3_name: str
file4_name: str
file5_name: str
class file_upload_res_basemodel(BaseModel):
result: int
result_message: str
file1_key: str
file1_etag: str
file2_key: str
file2_etag: str
file3_key: str
file3_etag: str
file4_key: str
file4_etag: str
file5_key: str
file5_etag: str
# ------------------------------------------------
# Define File Upload Service Process Detail
# ------------------------------------------------
def upload(file_uplaod_req: file_upload_req_basemodel):
secret_id, secret_key = get_credentials(credential_file, bucket, user_name)
#secret_id = # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
#secret_key = # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# ---------------------------------------------------
# 10. Set Logging Level
# ---------------------------------------------------
# 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# ---------------------------------------------------
# 20. Set Credential
# ---------------------------------------------------
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
# ---------------------------------------------------
# 30. Config Client
# ---------------------------------------------------
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
# ---------------------------------------------------
# 40. Compose File Keys
# ---------------------------------------------------
# Get current date yyyy/mm/dd
current_time = datetime.now()
formatted_date = current_time.strftime("%Y/%m/%d/%H%M%S")
# ---------------------------------------------------
# 50. Upload Files
# ---------------------------------------------------
result = c_ret_code_success
error_list = []
# File 1
ret_code, err_msg, file1_key, file1_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file1_name, formatted_date, client, bucket, 1)
if ret_code != c_ret_code_success:
result = c_ret_code_error
error_list.append("File 1 upload failed: " + err_msg)
# File 2
ret_code, err_msg, file2_key, file2_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file2_name, formatted_date, client, bucket, 2)
if ret_code != c_ret_code_success:
result = c_ret_code_error
error_list.append("File 2 upload failed: " + err_msg)
# File 3
ret_code, err_msg, file3_key, file3_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file3_name, formatted_date, client, bucket, 3)
if ret_code != c_ret_code_success:
result = c_ret_code_error
error_list.append("File 3 upload failed: " + err_msg)
# File 4
ret_code, err_msg, file4_key, file4_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file4_name, formatted_date, client, bucket, 4)
if ret_code != c_ret_code_success:
result = c_ret_code_error
error_list.append("File 4 upload failed: " + err_msg)
# File 5
ret_code, err_msg, file5_key, file5_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file5_name, formatted_date, client, bucket, 5)
if ret_code != c_ret_code_success:
result = c_ret_code_error
error_list.append("File 5 upload failed: " + err_msg)
file_upload_res = file_upload_res_basemodel(
result = result,
result_message = ', '.join(error_list),
file1_key = file1_key,
file1_etag = file1_etag,
file2_key = file2_key,
file2_etag = file2_etag,
file3_key = file3_key,
file3_etag = file3_etag,
file4_key = file4_key,
file4_etag = file4_etag,
file5_key = file5_key,
file5_etag = file5_etag
)
return file_upload_res
# ------------------------------------------------
# Define File Download URL Service Request and Response
# ------------------------------------------------
class file_download_req_basemodel(BaseModel):
key: str
class file_download_res_basemodel(BaseModel):
result: int
result_message: str
download_url: str
# ------------------------------------------------
# Define File Download URL Service Process Detail
# ------------------------------------------------
def get_download_url(file_download_req: file_download_req_basemodel):
# Define constants
secret_id, secret_key = get_credentials(credential_file, bucket, user_name)
# ---------------------------------------------------
# 10. Set Logging Level
# ---------------------------------------------------
# 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# ---------------------------------------------------
# 20. Set Credential
# ---------------------------------------------------
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
# ---------------------------------------------------
# 30. Config Client
# ---------------------------------------------------
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config)
try:
# 生成下载 URL,未限制请求头部和请求参数
url = client.get_presigned_url(
Method='GET',
Bucket=bucket,
Key=file_download_req.key,
Expired=120 # 120秒后过期,过期时间请根据自身场景定义
)
ret_code = c_ret_code_success
err_msg = ""
except Exception as e:
ret_code = c_ret_code_error
err_msg = str(e)
file_download_res = file_download_res_basemodel(
result = ret_code,
result_message = err_msg,
download_url = url
)
return file_download_res
# Import FastAPI Libs
from pydantic import BaseModel
# Import Qcloud OCR Service Libs
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.ocr.v20181119 import ocr_client, models
# Import Other Common Libs
import json
import base64
import os
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
region = 'ap-shanghai'
user_name = 'ocr-program'
credential_file = '/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class ocr_req_basemodel(BaseModel):
ocr_tool: str # Q: Qcloud, LP: Local PaddleOCR
dir: str
file_name: str
class ocr_res_basemodel(BaseModel):
result: int
result_message: str
detected_text: list
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, user_name: str) -> tuple:
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
secret_id = data['ocr'][user_name]['secret_id']
secret_key = data['ocr'][user_name]['secret_key']
return secret_id, secret_key
except Exception as e:
print(e)
return None, None
# ------------------------------------------------
# Function of OCR request to Qcloud COS
# ------------------------------------------------
def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple:
ret_code = c_ret_code_success
err_msg = ""
try:
secret_id, secret_key = get_credentials(credential_file, user_name)
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred = credential.Credential(secret_id, secret_key)
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "ocr.tencentcloudapi.com"
# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的
client = ocr_client.OcrClient(cred, region, clientProfile)
# 打开文件
file_location = os.path.normpath(os.path.join(ocr_req.dir, ocr_req.file_name))
with open(file_location, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
# print(encoded_string)
# Remove file
os.remove(file_location)
# 实例化一个请求对象,每个接口都会对应一个request对象
req = models.GeneralHandwritingOCRRequest()
params = {
"ImageBase64": encoded_string
}
req.from_json_string(json.dumps(params))
# 返回的resp是一个GeneralHandwritingOCRResponse的实例,与请求对象对应
resp = client.GeneralHandwritingOCR(req)
# 输出json格式的字符串回包
# print(resp.to_json_string())
list_detected_text = []
for text_detection in (json.loads(resp.to_json_string()))['TextDetections']:
# print(text_detection['DetectedText'])
list_detected_text.append(text_detection['DetectedText'])
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = list_detected_text
)
return ocr_res
except TencentCloudSDKException as err:
ret_code = c_ret_code_error
err_msg = str(err)
ocr_res = ocr_res_basemodel(
result = ret_code,
result_message = err_msg,
detected_text = []
)
return ocr_res
# Import FastAPI Libs
from fastapi import FastAPI
from pydantic import BaseModel
import qcloud_cos_service
import qcloud_ocr_service
# ------------------------------------------------
# Define FastAPI
# ------------------------------------------------
app = FastAPI()
# ------------------------------------------------
# Call File Upload Service Process
# ------------------------------------------------
@app.post("/upload/")
async def upload(file_uplaod_req: qcloud_cos_service.file_upload_req_basemodel):
return qcloud_cos_service.upload(file_uplaod_req)
# ------------------------------------------------
# Call File Download URL Service Process
# ------------------------------------------------
@app.post("/get_download_url/")
async def get_download_url(file_download_req: qcloud_cos_service.file_download_req_basemodel):
return qcloud_cos_service.get_download_url(file_download_req)
# ------------------------------------------------
# Call OCR Service Process
# ------------------------------------------------
@app.post("/get_detected_text/")
async def get_detected_text(ocr_req: qcloud_ocr_service.ocr_req_basemodel):
return qcloud_ocr_service.get_detected_text(ocr_req)
\ No newline at end of file
# Import FastAPI Libs
from pydantic import BaseModel
# Import Dashscope Service Libs
from http import HTTPStatus
from dashscope import Generation
from dashscope.api_entities.dashscope_response import Role
import dashscope
import json
import os
# Import your custom logging manager
import logging
from common.script.logging_manager import LoggingManager
# Initialize logging manager
curr_module = os.path.basename(__file__)
lm = LoggingManager.get_instance()
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
gen_ai_id = 'aliyun'
credential_file = '/home/oracle/python/apex/credential/credential.json'
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, gen_ai_id: str) -> str:
try:
# Open and read the JSON file
with open(file, 'r') as credential_file:
data = json.load(credential_file)
# Access data from the JSON
api_key = data['gen-ai'][gen_ai_id]['api_key']
return api_key
except Exception as e:
lm.log(logging.ERROR, "Exception occurred in get_credentials: %s", str(e), exc_info=True)
return None
# ------------------------------------------------
# Define GenAI Message Service Request and Response
# ------------------------------------------------
class Massage(BaseModel):
role: str
text: str
class send_message_req_basemodel(BaseModel):
model: str
messages: list[Massage]
class send_message_res_basemodel(BaseModel):
result: int
result_message: str
message_res: str
# ------------------------------------------------
# Function to call dashscope SDK
# ------------------------------------------------
def send_message(messages_req: send_message_req_basemodel) -> send_message_res_basemodel:
result = c_ret_code_success
result_message = ""
try:
# Get API Key
api_key = get_credentials(credential_file, gen_ai_id)
if (api_key == None):
result = c_ret_code_error
# Set return tuple for normal result
send_message_res = send_message_res_basemodel(
result = result,
result_message = 'Failed to get API key',
message_res = ''
)
return send_message_res
else:
dashscope.api_key = api_key
# Set system prompt
messages = [{'role': Role.SYSTEM, 'content': 'You are a helpful assistant.'}]
# Loop input message list
for message in messages_req.messages:
# Set user and assistant prompt
messages.append({'role': message.role, 'content': message.text})
lm.log(logging.INFO, curr_module, 'message: ', str(messages))
# Get Model
# model = Models()
match messages_req.model:
case 'qwen-max':
model = Generation.Models.qwen_max
case 'qwen-turbo':
model = Generation.Models.qwen_turbo
case 'qwen-plus':
model = Generation.Models.qwen_plus
# Use SDK to get answer
response = Generation.call(
model, # Generation.Models.qwen_turbo,
messages=messages,
result_format='message', # set the result to be 'message' format.
)
if response.status_code == HTTPStatus.OK:
lm.log(logging.INFO, curr_module, 'Normal response', str(response))
send_message_res = send_message_res_basemodel(
result = result,
result_message = '',
message_res = response['output']['choices'][0]['message']['content']
)
return send_message_res
else:
lm.log(logging.INFO, curr_module, 'Error response', str(response))
send_message_res = send_message_res_basemodel(
result = result,
result_message = '',
message_res = response.message
)
return send_message_res
except Exception as e:
lm.log(logging.ERROR, "Exception occurred: %s", str(e), exc_info=True)
result = c_ret_code_error
# Set return tuple for abnormal result
send_message_res = send_message_res_basemodel(
result = result,
result_message = str(e),
message_res = ''
)
return send_message_res
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment