Commit f2a08f6b authored by Clark Lin's avatar Clark Lin
Browse files

Merge branch 'development' of https://linyinghao.cn/gitlab/clark/fastapi-for-apex into development

parents 1204ec96 753bfe8e
# Fastapi for APEX
{ {
"oss": { "oss": {
"<qcloud oss bucket name>": { "<qcloud oss bucket name>": {
"oss-program": { "oss-program": {
"secret_id": "<qcloud oss secret_id>", "secret_id": "<qcloud oss secret_id>",
"secret_key": "<qcloud oss secret_key>", "secret_key": "<qcloud oss secret_key>",
"description": "object-storage-all" "description": "object-storage-all"
} }
} }
}, },
"ocr": { "ocr": {
"ocr-program": { "ocr-program": {
"secret_id": "<qcloud ocr secret_id>", "secret_id": "<qcloud ocr secret_id>",
"secret_key": "<qcloud ocr secret_key>", "secret_key": "<qcloud ocr secret_key>",
"description": "文字识别 (ocr)" "description": "文字识别 (ocr)"
} }
}, },
"asr": { "asr": {
"asr-program": { "asr-program": {
"app_id": "<qcloud ASR app id>", "app_id": "<qcloud ASR app id>",
"secret_id": "<qcloud ASR secret_id>", "secret_id": "<qcloud ASR secret_id>",
"secret_key": "<qcloud ASR secret_key>", "secret_key": "<qcloud ASR secret_key>",
"description": "语音识别 (asr)" "description": "语音识别 (asr)"
} }
}, },
"paddle": { "paddle": {
"aliyun_ecs": { "aliyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx", "base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>", "client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>" "client_secret": "<OP fastapi client_secret>"
}, },
"ctyun_ecs": { "ctyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx", "base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>", "client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>" "client_secret": "<OP fastapi client_secret>"
} }
}, },
"whisper": { "whisper": {
"aliyun_ecs": { "aliyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx", "base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>", "client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>" "client_secret": "<OP fastapi client_secret>"
}, },
"ctyun_ecs": { "ctyun_ecs": {
"base_url": "https://xxx.xxx.xxx.xxx:xxxx", "base_url": "https://xxx.xxx.xxx.xxx:xxxx",
"client_id": "<OP fastapi client_id>", "client_id": "<OP fastapi client_id>",
"client_secret": "<OP fastapi client_secret>" "client_secret": "<OP fastapi client_secret>"
} }
}, },
"gen-ai": { "gen-ai": {
"aliyun":{ "aliyun":{
"api_key": "<Gen AI api_key>", "api_key": "<Gen AI api_key>",
"description": "Tongyi" "description": "Tongyi"
} }
} }
} }
{
"oauth2_secret_key": "<generate random value by openssl rand -hex 32>",
"client_db": {
"client_id: generate random value by openssl rand -hex 32": {
"hashed_client_secret": "<generate by CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password)>",
"name": "fastapi_service_test",
"description": "This client is for fastapi service test"
}
}
}
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: fastapi_security_util.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-04-24 Clark Lin Initial version
#
# Main features summary:
# - Implementation of OAuth2 Authentication
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
from datetime import datetime, timedelta, timezone
from typing import Annotated
from pydantic import BaseModel
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
# Import your custom logging manager
import os
import logging
from common.script.logging_manager import LoggingManager
import json
# Initialize logging manager
curr_module = os.path.basename(__file__)
# lm = LoggingManager()
lm = LoggingManager.get_instance()
# ------------------------------------------------
# Init Global Variables
# ------------------------------------------------
# Credential file
credential_file = './credential/oauth2.json'
# to get a string like this run:
# openssl rand -hex 32
secret_key = ""
algorithm = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Init crypt context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Init Oauth2 schema, specify token endpoint
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ------------------------------------------------
# Sample Repository for Client Verification
# ------------------------------------------------
client_db = {}
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str) -> tuple:
global secret_key
global client_db
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
secret_key = data['oauth2_secret_key']
client_db = data['client_db']
return secret_key, client_db
except Exception as e:
print(e)
return None, None
# ------------------------------------------------
# Model Definition
# ------------------------------------------------
class Token(BaseModel):
access_token: str
token_type: str
# ------------------------------------------------
# Sub Function - Verify Password
# ------------------------------------------------
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# ------------------------------------------------
# Sub Function - Use Cript Context to Get Hash
# ------------------------------------------------
def get_password_hash(password):
return pwd_context.hash(password)
# ------------------------------------------------
# Sub Function - Get Client from Dict by Client ID
# ------------------------------------------------
def get_client(db: dict, client_id: str):
if client_id in db:
client_dict = db[client_id]
return client_dict
# ------------------------------------------------
# Sub Function - Authentication Process
# ------------------------------------------------
def authenticate_client(client_db: dict, client_id: str, client_secret: str):
client = get_client(client_db, client_id)
if not client:
return False
if not verify_password(client_secret, client["hashed_client_secret"]):
return False
return client
# ------------------------------------------------
# Sub Function - Generate Access Token
# ------------------------------------------------
def create_access_token(data: dict, expires_delta: timedelta | None = None):
global secret_key
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
# ------------------------------------------------
# Sub Function - Generate Access Token
# ------------------------------------------------
def get_access_token(
form_data: OAuth2PasswordRequestForm,
without_credential: bool = False) -> Token:
global secret_key
global client_db
# Get credentials
secret_key, client_db = get_credentials(credential_file)
if secret_key == None or client_db == None:
lm.log(logging.INFO, curr_module, "Failed to get credentials from file [" + credential_file + "]")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get credentials from file [" + credential_file + "]",
)
# Call authentication process
if (not without_credential):
client = authenticate_client(client_db, form_data.username, form_data.password)
if not client:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
if (not without_credential):
access_token = create_access_token(
data = {
"sub": form_data.username,
"name": client["name"],
"description": client["description"]
},
expires_delta = access_token_expires
)
else:
client_id = list(client_db.keys())[0]
access_token = create_access_token(
data = {
"sub": client_id,
"name": client_db[client_id]["name"],
"description": client_db[client_id]["description"]
},
expires_delta = access_token_expires
)
lm.log(logging.INFO, curr_module, 'get_access_token completed with normal')
return Token(access_token=access_token, token_type="bearer")
# ------------------------------------------------
# Sub Function - Generate Access Token
# ------------------------------------------------
def get_access_token_without_credential() -> Token:
return get_access_token(None, True);
# Import FastAPI Libs
from fastapi import FastAPI, Query
from pydantic import BaseModel
import qcloud_cos_service
import qcloud_ocr_service
import tongyi_genai_service
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from typing import Annotated
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import fastapi_security_util
from fastapi_security_util import Token
# ------------------------------------------------
# Init Global Variables
# ------------------------------------------------
# Init Oauth2 schema, specify token endpoint
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ------------------------------------------------
# Define FastAPI
# ------------------------------------------------
app = FastAPI()
# ------------------------------------------------
# Call Token Service
# ------------------------------------------------
@app.post("/token")
def get_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
return fastapi_security_util.get_access_token(form_data)
# ------------------------------------------------
# Call Token Service without Credential
# ------------------------------------------------
@app.post("/token_without_credential")
def get_access_token_without_credential() -> Token:
return fastapi_security_util.get_access_token_without_credential()
# ------------------------------------------------
# Define CORS Options
# ------------------------------------------------
# origins = [
# "http://localhost",
# "http://localhost:8000",
# "https://yourdomain.com",
# # Add more origins if needed
# ]
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # List of allowed origins
allow_credentials=True, # Whether to allow credentials (cookies, authorization headers, etc.)
allow_methods=["*"], # List of allowed methods (GET, POST, PUT, DELETE, etc.)
allow_headers=["*"], # List of allowed headers
)
# ------------------------------------------------
# Call File Upload Service Process
# ------------------------------------------------
@app.post("/upload/")
async def upload(file_uplaod_req: qcloud_cos_service.file_upload_req_basemodel):
return qcloud_cos_service.upload(file_uplaod_req)
# ------------------------------------------------
# Call File Download URL Service Process
# ------------------------------------------------
@app.post("/get_download_url/")
async def get_download_url(file_download_req: qcloud_cos_service.file_download_req_basemodel):
return qcloud_cos_service.get_download_url(file_download_req)
# ------------------------------------------------
# Call OCR Service Process
# ------------------------------------------------
@app.post("/get_detected_text/")
async def get_detected_text(ocr_req: qcloud_ocr_service.ocr_req_basemodel):
if ocr_req.tool == 'Q':
# Call Qcloud API
return qcloud_ocr_service.get_qcloud_detected_text(ocr_req)
elif ocr_req.tool == 'LP':
# Call Local PaddleOCR API
return qcloud_ocr_service.get_paddleocr_detected_text(ocr_req)
# ------------------------------------------------
# Call Gen AI Service Process
# ------------------------------------------------
@app.post("/send_message_to_gen_ai/tongyi/")
async def send_message(send_message_req: tongyi_genai_service.send_message_req_basemodel):
return tongyi_genai_service.send_message(send_message_req)
# ------------------------------------------------
# Call Gen AI Service Process (send question)
# ------------------------------------------------
@app.post("/send_message_to_gen_ai_async/tongyi/send_question")
async def send_question(
token: Annotated[str, Depends(oauth2_scheme)],
send_message_req: tongyi_genai_service.send_message_req_basemodel):
print('token', token)
return tongyi_genai_service.send_question(token, send_message_req)
# ------------------------------------------------
# Call Gen AI Service Process (get answer)
# ------------------------------------------------
@app.get("/send_message_to_gen_ai_async/tongyi/get_answer")
async def get_answer(
uuid: str = Query(None, title="UUID", description="UUID linked to request payload")):
return StreamingResponse(tongyi_genai_service.get_answer(uuid), media_type="text/event-stream")
...@@ -203,4 +203,3 @@ def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple: ...@@ -203,4 +203,3 @@ def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple:
return ocr_res return ocr_res
...@@ -203,4 +203,3 @@ def get_detected_text(asr_req: asr_req_basemodel) -> tuple: ...@@ -203,4 +203,3 @@ def get_detected_text(asr_req: asr_req_basemodel) -> tuple:
return asr_res return asr_res
# Import FastAPI Libs # Import FastAPI Libs
from pydantic import BaseModel from pydantic import BaseModel
# Import Qcloud OCR Service Libs # Import Qcloud OCR Service Libs
import sys import sys
# sys.path.append("../qcloud") # sys.path.append("../qcloud")
from qcloud.common import credential from qcloud.common import credential
from qcloud.asr import flash_recognizer from qcloud.asr import flash_recognizer
# Common log part # Common log part
import os import os
import logging import logging
from common.script.logging_manager import LoggingManager from common.script.logging_manager import LoggingManager
curr_module = os.path.basename(__file__) # Initialize logging manager curr_module = os.path.basename(__file__) # Initialize logging manager
lm = LoggingManager.get_instance() # lm = LoggingManager() lm = LoggingManager.get_instance() # lm = LoggingManager()
# Import Other Common Libs # Import Other Common Libs
import json import json
import base64 import base64
import os import os
import random import random
import string import string
# ------------------------------------------------ # ------------------------------------------------
# Define constants # Define constants
# ------------------------------------------------ # ------------------------------------------------
c_ret_code_success = 0 c_ret_code_success = 0
c_ret_code_error = 1 c_ret_code_error = 1
user_name = 'asr-program' user_name = 'asr-program'
credential_file = 'credential/credential.json' credential_file = 'credential/credential.json'
# ------------------------------------------------ # ------------------------------------------------
# Define OCR Request Service Request and Response # Define OCR Request Service Request and Response
# ------------------------------------------------ # ------------------------------------------------
class asr_req_basemodel(BaseModel): class asr_req_basemodel(BaseModel):
asr_tool: str # W: Whisper; Q: Qcloud asr_tool: str # W: Whisper; Q: Qcloud
model: str model: str
audio_b64_string: str audio_b64_string: str
initial_prompt: str initial_prompt: str
class asr_res_basemodel(BaseModel): class asr_res_basemodel(BaseModel):
result: int result: int
result_message: str result_message: str
detected_text: str detected_text: str
# ------------------------------------------------ # ------------------------------------------------
# Get credentials # Get credentials
# ------------------------------------------------ # ------------------------------------------------
def get_credentials(file: str, user_name: str) -> tuple: def get_credentials(file: str, user_name: str) -> tuple:
try: try:
# Open and read the JSON file # Open and read the JSON file
with open(file, "r") as credential_file: with open(file, "r") as credential_file:
data = json.load(credential_file) data = json.load(credential_file)
# Access data from the JSON # Access data from the JSON
app_id = data['asr'][user_name]['app_id'] app_id = data['asr'][user_name]['app_id']
secret_id = data['asr'][user_name]['secret_id'] secret_id = data['asr'][user_name]['secret_id']
secret_key = data['asr'][user_name]['secret_key'] secret_key = data['asr'][user_name]['secret_key']
return app_id, secret_id, secret_key return app_id, secret_id, secret_key
except Exception as e: except Exception as e:
lm.log(logging.INFO, curr_module, str(e)) lm.log(logging.INFO, curr_module, str(e))
return None, None, None return None, None, None
# ------------------------------------------------ # ------------------------------------------------
# Function of ASR request to Qcloud COS # Function of ASR request to Qcloud COS
# ------------------------------------------------ # ------------------------------------------------
def get_detected_text(asr_req: asr_req_basemodel) -> tuple: def get_detected_text(asr_req: asr_req_basemodel) -> tuple:
lm.log(logging.INFO, curr_module, "qcloud get_detected_text start") lm.log(logging.INFO, curr_module, "qcloud get_detected_text start")
# 初始化处理结果 # 初始化处理结果
ret_code = c_ret_code_success ret_code = c_ret_code_success
err_msg = "" err_msg = ""
# 初始化ID # 初始化ID
app_id = "" app_id = ""
secret_id = "" secret_id = ""
secret_key = "" secret_key = ""
engine_type = "16k_en" engine_type = "16k_en"
# 初始化返回对象 # 初始化返回对象
asr_resp = asr_res_basemodel( asr_resp = asr_res_basemodel(
result = c_ret_code_success, result = c_ret_code_success,
result_message = '', result_message = '',
detected_text = '' detected_text = ''
) )
try: try:
app_id, secret_id, secret_key = get_credentials(credential_file, user_name) app_id, secret_id, secret_key = get_credentials(credential_file, user_name)
cred = credential.Credential(secret_id, secret_key) cred = credential.Credential(secret_id, secret_key)
if (app_id == None or secret_id == None or secret_key == None): if (app_id == None or secret_id == None or secret_key == None):
lm.log(logging.ERROR, curr_module, 'Failed to get credential') lm.log(logging.ERROR, curr_module, 'Failed to get credential')
asr_resp.result = c_ret_code_error asr_resp.result = c_ret_code_error
asr_resp.detected_text = '' asr_resp.detected_text = ''
asr_resp.result_message = 'Failed to get credential' asr_resp.result_message = 'Failed to get credential'
return asr_resp return asr_resp
# 新建FlashRecognizer,一个recognizer可以执行N次识别请求 # 新建FlashRecognizer,一个recognizer可以执行N次识别请求
recognizer = flash_recognizer.FlashRecognizer(app_id, cred) recognizer = flash_recognizer.FlashRecognizer(app_id, cred)
# 新建识别请求 # 新建识别请求
req = flash_recognizer.FlashRecognitionRequest(engine_type) req = flash_recognizer.FlashRecognitionRequest(engine_type)
req.set_filter_modal(0) req.set_filter_modal(0)
req.set_filter_punc(0) req.set_filter_punc(0)
req.set_filter_dirty(0) req.set_filter_dirty(0)
# 目前aac格式可用于解码MacOS Safari/Chrome的录音文件,IOS Safari待确认 # 目前aac格式可用于解码MacOS Safari/Chrome的录音文件,IOS Safari待确认
# req.set_voice_format("wav") # req.set_voice_format("wav")
req.set_voice_format("aac") req.set_voice_format("aac")
req.set_word_info(0) req.set_word_info(0)
req.set_convert_num_mode(1) req.set_convert_num_mode(1)
# 将Base64字符串写到临时文件 # 将Base64字符串写到临时文件
# Decode base64 string # Decode base64 string
decoded_data = base64.b64decode(asr_req.audio_b64_string) decoded_data = base64.b64decode(asr_req.audio_b64_string)
# Write the decoded data to a file # Write the decoded data to a file
characters = string.ascii_letters + string.digits characters = string.ascii_letters + string.digits
random_string = ''.join(random.choice(characters) for i in range(16)) random_string = ''.join(random.choice(characters) for i in range(16))
tmp_file = '/tmp/whisper_' + random_string tmp_file = '/tmp/whisper_' + random_string
with open(tmp_file, 'wb') as file: with open(tmp_file, 'wb') as file:
file.write(decoded_data) file.write(decoded_data)
with open(tmp_file, 'rb') as f: with open(tmp_file, 'rb') as f:
#读取音频数据 #读取音频数据
data = f.read() data = f.read()
#执行识别 #执行识别
resultData = recognizer.recognize(req, data) resultData = recognizer.recognize(req, data)
resp = json.loads(resultData) resp = json.loads(resultData)
request_id = resp["request_id"] request_id = resp["request_id"]
code = resp["code"] code = resp["code"]
if code != 0: if code != 0:
lm.log(logging.ERROR, curr_module, "recognize faild! request_id: ", request_id, " code: ", code, ", message: ", resp["message"]) lm.log(logging.ERROR, curr_module, "recognize faild! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
resp.result = c_ret_code_error resp.result = c_ret_code_error
resp.detected_text = '' resp.detected_text = ''
resp.result_message = 'Recognize faild! Please find detail in log.' resp.result_message = 'Recognize faild! Please find detail in log.'
return resp return resp
lm.log(logging.INFO, curr_module, "request_id: ", request_id) lm.log(logging.INFO, curr_module, "request_id: ", request_id)
#一个channl_result对应一个声道的识别结果 #一个channl_result对应一个声道的识别结果
#大多数音频是单声道,对应一个channl_result #大多数音频是单声道,对应一个channl_result
for channl_result in resp["flash_result"]: for channl_result in resp["flash_result"]:
lm.log(logging.INFO, curr_module, "channel_id: " + str(channl_result["channel_id"])) lm.log(logging.INFO, curr_module, "channel_id: " + str(channl_result["channel_id"]))
lm.log(logging.INFO, curr_module, channl_result["text"]) lm.log(logging.INFO, curr_module, channl_result["text"])
asr_resp.result = c_ret_code_success asr_resp.result = c_ret_code_success
asr_resp.detected_text = channl_result["text"] asr_resp.detected_text = channl_result["text"]
asr_resp.result_message = '' asr_resp.result_message = ''
return asr_resp return asr_resp
lm.log(logging.INFO, curr_module, "qcloud get_detected_text complete") lm.log(logging.INFO, curr_module, "qcloud get_detected_text complete")
except Exception as e: except Exception as e:
lm.log(logging.ERROR, curr_module, str(e)) lm.log(logging.ERROR, curr_module, str(e))
asr_resp.result = c_ret_code_error asr_resp.result = c_ret_code_error
asr_resp.detected_text = '' asr_resp.detected_text = ''
asr_resp.result_message = str(e) asr_resp.result_message = str(e)
return asr_resp return asr_resp
# Import FastAPI Libs # Import FastAPI Libs
from pydantic import BaseModel from pydantic import BaseModel
# Import Qcloud COS Service Libs # Import Qcloud COS Service Libs
from qcloud_cos import CosConfig from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client from qcloud_cos import CosS3Client
# Import Other Common Libs # Import Other Common Libs
import sys import sys
import os import os
import logging import logging
from datetime import datetime from datetime import datetime
import mimetypes import mimetypes
import json import json
# ------------------------------------------------ # ------------------------------------------------
# Define constants # Define constants
# ------------------------------------------------ # ------------------------------------------------
c_ret_code_success = 0 c_ret_code_success = 0
c_ret_code_error = 1 c_ret_code_error = 1
region = 'ap-shanghai' region = 'ap-shanghai'
bucket = 'clark-apex-saz-1320304559' bucket = 'clark-apex-saz-1320304559'
user_name = 'oss-program' user_name = 'oss-program'
credential_file = '/home/oracle/python/apex/credential/credential.json' credential_file = '/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json' # credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------ # ------------------------------------------------
# Get credentials # Get credentials
# ------------------------------------------------ # ------------------------------------------------
def get_credentials(file: str, bucket_name: str, user_name: str) -> tuple: def get_credentials(file: str, bucket_name: str, user_name: str) -> tuple:
try: try:
# Open and read the JSON file # Open and read the JSON file
with open(file, "r") as credential_file: with open(file, "r") as credential_file:
data = json.load(credential_file) data = json.load(credential_file)
# Access data from the JSON # Access data from the JSON
secret_id = data['oss'][bucket_name][user_name]['secret_id'] secret_id = data['oss'][bucket_name][user_name]['secret_id']
secret_key = data['oss'][bucket_name][user_name]['secret_key'] secret_key = data['oss'][bucket_name][user_name]['secret_key']
return secret_id, secret_key return secret_id, secret_key
except Exception as e: except Exception as e:
print(e) print(e)
return None, None return None, None
# ------------------------------------------------ # ------------------------------------------------
# Function of upload single file to Qcloud COS # Function of upload single file to Qcloud COS
# ------------------------------------------------ # ------------------------------------------------
def upload_single_file(dir: str, file_name: str, rename_prefix: str, client: CosS3Client, bucket: str, index: int) -> tuple: def upload_single_file(dir: str, file_name: str, rename_prefix: str, client: CosS3Client, bucket: str, index: int) -> tuple:
file_key = "" file_key = ""
file_etag = "" file_etag = ""
ret_code = c_ret_code_success ret_code = c_ret_code_success
err_msg = "" err_msg = ""
try: try:
# Check if file is empty # Check if file is empty
if file_name == None or file_name == "": if file_name == None or file_name == "":
file_key = "" file_key = ""
else: else:
# Check file existance # Check file existance
file_location = os.path.join(dir, file_name) file_location = os.path.join(dir, file_name)
mime_type, _ = mimetypes.guess_type(file_location) mime_type, _ = mimetypes.guess_type(file_location)
if os.path.exists(file_location): if os.path.exists(file_location):
file_key = rename_prefix + "_" + str(index) + "_" + file_name file_key = rename_prefix + "_" + str(index) + "_" + file_name
# Call COS service to upload # Call COS service to upload
with open(file_location, 'rb') as fp: with open(file_location, 'rb') as fp:
response = client.put_object( response = client.put_object(
Bucket=bucket, # Bucket 由 BucketName-APPID 组成 Bucket=bucket, # Bucket 由 BucketName-APPID 组成
Body=fp, Body=fp,
Key=file_key, Key=file_key,
StorageClass='STANDARD', StorageClass='STANDARD',
ContentType=mime_type ContentType=mime_type
) )
file_etag = response['ETag'].strip('"') file_etag = response['ETag'].strip('"')
# delete file after upload # delete file after upload
os.remove(file_location) os.remove(file_location)
else: else:
file_key = "" file_key = ""
return ret_code, err_msg, file_key, file_etag return ret_code, err_msg, file_key, file_etag
except Exception as e: except Exception as e:
ret_code = c_ret_code_error ret_code = c_ret_code_error
err_msg = str(e) err_msg = str(e)
return ret_code, err_msg, file_key, file_etag return ret_code, err_msg, file_key, file_etag
# ------------------------------------------------ # ------------------------------------------------
# Define File Upload Service Request and Response # Define File Upload Service Request and Response
# ------------------------------------------------ # ------------------------------------------------
class file_upload_req_basemodel(BaseModel): class file_upload_req_basemodel(BaseModel):
dir: str dir: str
file1_name: str file1_name: str
file2_name: str file2_name: str
file3_name: str file3_name: str
file4_name: str file4_name: str
file5_name: str file5_name: str
class file_upload_res_basemodel(BaseModel): class file_upload_res_basemodel(BaseModel):
result: int result: int
result_message: str result_message: str
file1_key: str file1_key: str
file1_etag: str file1_etag: str
file2_key: str file2_key: str
file2_etag: str file2_etag: str
file3_key: str file3_key: str
file3_etag: str file3_etag: str
file4_key: str file4_key: str
file4_etag: str file4_etag: str
file5_key: str file5_key: str
file5_etag: str file5_etag: str
# ------------------------------------------------ # ------------------------------------------------
# Define File Upload Service Process Detail # Define File Upload Service Process Detail
# ------------------------------------------------ # ------------------------------------------------
def upload(file_uplaod_req: file_upload_req_basemodel): def upload(file_uplaod_req: file_upload_req_basemodel):
secret_id, secret_key = get_credentials(credential_file, bucket, user_name) secret_id, secret_key = get_credentials(credential_file, bucket, user_name)
#secret_id = # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 #secret_id = # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
#secret_key = # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 #secret_key = # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# --------------------------------------------------- # ---------------------------------------------------
# 10. Set Logging Level # 10. Set Logging Level
# --------------------------------------------------- # ---------------------------------------------------
# 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息 # 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息
logging.basicConfig(level=logging.INFO, stream=sys.stdout) logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# --------------------------------------------------- # ---------------------------------------------------
# 20. Set Credential # 20. Set Credential
# --------------------------------------------------- # ---------------------------------------------------
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成 # 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket # region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224 # COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048 token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填 scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
# --------------------------------------------------- # ---------------------------------------------------
# 30. Config Client # 30. Config Client
# --------------------------------------------------- # ---------------------------------------------------
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config) client = CosS3Client(config)
# --------------------------------------------------- # ---------------------------------------------------
# 40. Compose File Keys # 40. Compose File Keys
# --------------------------------------------------- # ---------------------------------------------------
# Get current date yyyy/mm/dd # Get current date yyyy/mm/dd
current_time = datetime.now() current_time = datetime.now()
formatted_date = current_time.strftime("%Y/%m/%d/%H%M%S") formatted_date = current_time.strftime("%Y/%m/%d/%H%M%S")
# --------------------------------------------------- # ---------------------------------------------------
# 50. Upload Files # 50. Upload Files
# --------------------------------------------------- # ---------------------------------------------------
result = c_ret_code_success result = c_ret_code_success
error_list = [] error_list = []
# File 1 # File 1
ret_code, err_msg, file1_key, file1_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file1_name, formatted_date, client, bucket, 1) ret_code, err_msg, file1_key, file1_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file1_name, formatted_date, client, bucket, 1)
if ret_code != c_ret_code_success: if ret_code != c_ret_code_success:
result = c_ret_code_error result = c_ret_code_error
error_list.append("File 1 upload failed: " + err_msg) error_list.append("File 1 upload failed: " + err_msg)
# File 2 # File 2
ret_code, err_msg, file2_key, file2_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file2_name, formatted_date, client, bucket, 2) ret_code, err_msg, file2_key, file2_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file2_name, formatted_date, client, bucket, 2)
if ret_code != c_ret_code_success: if ret_code != c_ret_code_success:
result = c_ret_code_error result = c_ret_code_error
error_list.append("File 2 upload failed: " + err_msg) error_list.append("File 2 upload failed: " + err_msg)
# File 3 # File 3
ret_code, err_msg, file3_key, file3_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file3_name, formatted_date, client, bucket, 3) ret_code, err_msg, file3_key, file3_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file3_name, formatted_date, client, bucket, 3)
if ret_code != c_ret_code_success: if ret_code != c_ret_code_success:
result = c_ret_code_error result = c_ret_code_error
error_list.append("File 3 upload failed: " + err_msg) error_list.append("File 3 upload failed: " + err_msg)
# File 4 # File 4
ret_code, err_msg, file4_key, file4_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file4_name, formatted_date, client, bucket, 4) ret_code, err_msg, file4_key, file4_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file4_name, formatted_date, client, bucket, 4)
if ret_code != c_ret_code_success: if ret_code != c_ret_code_success:
result = c_ret_code_error result = c_ret_code_error
error_list.append("File 4 upload failed: " + err_msg) error_list.append("File 4 upload failed: " + err_msg)
# File 5 # File 5
ret_code, err_msg, file5_key, file5_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file5_name, formatted_date, client, bucket, 5) ret_code, err_msg, file5_key, file5_etag = upload_single_file(file_uplaod_req.dir, file_uplaod_req.file5_name, formatted_date, client, bucket, 5)
if ret_code != c_ret_code_success: if ret_code != c_ret_code_success:
result = c_ret_code_error result = c_ret_code_error
error_list.append("File 5 upload failed: " + err_msg) error_list.append("File 5 upload failed: " + err_msg)
file_upload_res = file_upload_res_basemodel( file_upload_res = file_upload_res_basemodel(
result = result, result = result,
result_message = ', '.join(error_list), result_message = ', '.join(error_list),
file1_key = file1_key, file1_key = file1_key,
file1_etag = file1_etag, file1_etag = file1_etag,
file2_key = file2_key, file2_key = file2_key,
file2_etag = file2_etag, file2_etag = file2_etag,
file3_key = file3_key, file3_key = file3_key,
file3_etag = file3_etag, file3_etag = file3_etag,
file4_key = file4_key, file4_key = file4_key,
file4_etag = file4_etag, file4_etag = file4_etag,
file5_key = file5_key, file5_key = file5_key,
file5_etag = file5_etag file5_etag = file5_etag
) )
return file_upload_res return file_upload_res
# ------------------------------------------------ # ------------------------------------------------
# Define File Download URL Service Request and Response # Define File Download URL Service Request and Response
# ------------------------------------------------ # ------------------------------------------------
class file_download_req_basemodel(BaseModel): class file_download_req_basemodel(BaseModel):
key: str key: str
class file_download_res_basemodel(BaseModel): class file_download_res_basemodel(BaseModel):
result: int result: int
result_message: str result_message: str
download_url: str download_url: str
# ------------------------------------------------ # ------------------------------------------------
# Define File Download URL Service Process Detail # Define File Download URL Service Process Detail
# ------------------------------------------------ # ------------------------------------------------
def get_download_url(file_download_req: file_download_req_basemodel): def get_download_url(file_download_req: file_download_req_basemodel):
# Define constants # Define constants
secret_id, secret_key = get_credentials(credential_file, bucket, user_name) secret_id, secret_key = get_credentials(credential_file, bucket, user_name)
# --------------------------------------------------- # ---------------------------------------------------
# 10. Set Logging Level # 10. Set Logging Level
# --------------------------------------------------- # ---------------------------------------------------
# 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息 # 正常情况日志级别使用 INFO,需要定位时可以修改为 DEBUG,此时 SDK 会打印和服务端的通信信息
logging.basicConfig(level=logging.INFO, stream=sys.stdout) logging.basicConfig(level=logging.INFO, stream=sys.stdout)
# --------------------------------------------------- # ---------------------------------------------------
# 20. Set Credential # 20. Set Credential
# --------------------------------------------------- # ---------------------------------------------------
# 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成 # 1. 设置用户属性, 包括 secret_id, secret_key, region等。Appid 已在 CosConfig 中移除,请在参数 Bucket 中带上 Appid。Bucket 由 BucketName-Appid 组成
# secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_id = os.environ['COS_SECRET_ID'] # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_key = os.environ['COS_SECRET_KEY'] # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_id = 'AKIDQjrAHBsKctxoSzNYXSHtITVKH0Yv64tb' # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140 # secret_key = 'd2NZ1KXcfrriPwRnxvhKpFlHiy0w6CD2' # 用户的 SecretKey,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
# region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket # region = 'ap-shanghai' # 替换为用户的 region,已创建桶归属的 region 可以在控制台查看,https://console.cloud.tencent.com/cos5/bucket
# COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224 # COS 支持的所有 region 列表参见 https://cloud.tencent.com/document/product/436/6224
token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048 token = None # 如果使用永久密钥不需要填入 token,如果使用临时密钥需要填入,临时密钥生成和使用指引参见 https://cloud.tencent.com/document/product/436/14048
scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填 scheme = 'https' # 指定使用 http/https 协议来访问 COS,默认为 https,可不填
# --------------------------------------------------- # ---------------------------------------------------
# 30. Config Client # 30. Config Client
# --------------------------------------------------- # ---------------------------------------------------
config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme) config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token, Scheme=scheme)
client = CosS3Client(config) client = CosS3Client(config)
try: try:
# 生成下载 URL,未限制请求头部和请求参数 # 生成下载 URL,未限制请求头部和请求参数
url = client.get_presigned_url( url = client.get_presigned_url(
Method='GET', Method='GET',
Bucket=bucket, Bucket=bucket,
Key=file_download_req.key, Key=file_download_req.key,
Expired=120 # 120秒后过期,过期时间请根据自身场景定义 Expired=120 # 120秒后过期,过期时间请根据自身场景定义
) )
ret_code = c_ret_code_success ret_code = c_ret_code_success
err_msg = "" err_msg = ""
except Exception as e: except Exception as e:
ret_code = c_ret_code_error ret_code = c_ret_code_error
err_msg = str(e) err_msg = str(e)
file_download_res = file_download_res_basemodel( file_download_res = file_download_res_basemodel(
result = ret_code, result = ret_code,
result_message = err_msg, result_message = err_msg,
download_url = url download_url = url
) )
return file_download_res return file_download_res
# Import FastAPI Libs # Import FastAPI Libs
from pydantic import BaseModel from pydantic import BaseModel
# Import Qcloud OCR Service Libs # Import Qcloud OCR Service Libs
from tencentcloud.common import credential from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.ocr.v20181119 import ocr_client, models from tencentcloud.ocr.v20181119 import ocr_client, models
# Import Other Common Libs # Import Other Common Libs
import json import json
import base64 import base64
import os import os
# ------------------------------------------------ # ------------------------------------------------
# Define constants # Define constants
# ------------------------------------------------ # ------------------------------------------------
c_ret_code_success = 0 c_ret_code_success = 0
c_ret_code_error = 1 c_ret_code_error = 1
region = 'ap-shanghai' region = 'ap-shanghai'
user_name = 'ocr-program' user_name = 'ocr-program'
credential_file = '/home/oracle/python/apex/credential/credential.json' credential_file = '/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json' # credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------ # ------------------------------------------------
# Define OCR Request Service Request and Response # Define OCR Request Service Request and Response
# ------------------------------------------------ # ------------------------------------------------
class ocr_req_basemodel(BaseModel): class ocr_req_basemodel(BaseModel):
ocr_tool: str # Q: Qcloud, LP: Local PaddleOCR ocr_tool: str # Q: Qcloud, LP: Local PaddleOCR
dir: str dir: str
file_name: str file_name: str
class ocr_res_basemodel(BaseModel): class ocr_res_basemodel(BaseModel):
result: int result: int
result_message: str result_message: str
detected_text: list detected_text: list
# ------------------------------------------------ # ------------------------------------------------
# Get credentials # Get credentials
# ------------------------------------------------ # ------------------------------------------------
def get_credentials(file: str, user_name: str) -> tuple: def get_credentials(file: str, user_name: str) -> tuple:
try: try:
# Open and read the JSON file # Open and read the JSON file
with open(file, "r") as credential_file: with open(file, "r") as credential_file:
data = json.load(credential_file) data = json.load(credential_file)
# Access data from the JSON # Access data from the JSON
secret_id = data['ocr'][user_name]['secret_id'] secret_id = data['ocr'][user_name]['secret_id']
secret_key = data['ocr'][user_name]['secret_key'] secret_key = data['ocr'][user_name]['secret_key']
return secret_id, secret_key return secret_id, secret_key
except Exception as e: except Exception as e:
print(e) print(e)
return None, None return None, None
# ------------------------------------------------ # ------------------------------------------------
# Function of OCR request to Qcloud COS # Function of OCR request to Qcloud COS
# ------------------------------------------------ # ------------------------------------------------
def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple: def get_detected_text(ocr_req: ocr_req_basemodel) -> tuple:
ret_code = c_ret_code_success ret_code = c_ret_code_success
err_msg = "" err_msg = ""
try: try:
secret_id, secret_key = get_credentials(credential_file, user_name) secret_id, secret_key = get_credentials(credential_file, user_name)
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密 # 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305 # 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取 # 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred = credential.Credential(secret_id, secret_key) cred = credential.Credential(secret_id, secret_key)
# 实例化一个http选项,可选的,没有特殊需求可以跳过 # 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile = HttpProfile() httpProfile = HttpProfile()
httpProfile.endpoint = "ocr.tencentcloudapi.com" httpProfile.endpoint = "ocr.tencentcloudapi.com"
# 实例化一个client选项,可选的,没有特殊需求可以跳过 # 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile = ClientProfile() clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的 # 实例化要请求产品的client对象,clientProfile是可选的
client = ocr_client.OcrClient(cred, region, clientProfile) client = ocr_client.OcrClient(cred, region, clientProfile)
# 打开文件 # 打开文件
file_location = os.path.normpath(os.path.join(ocr_req.dir, ocr_req.file_name)) file_location = os.path.normpath(os.path.join(ocr_req.dir, ocr_req.file_name))
with open(file_location, "rb") as image_file: with open(file_location, "rb") as image_file:
encoded_string = base64.b64encode(image_file.read()).decode('utf-8') encoded_string = base64.b64encode(image_file.read()).decode('utf-8')
# print(encoded_string) # print(encoded_string)
# Remove file # Remove file
os.remove(file_location) os.remove(file_location)
# 实例化一个请求对象,每个接口都会对应一个request对象 # 实例化一个请求对象,每个接口都会对应一个request对象
req = models.GeneralHandwritingOCRRequest() req = models.GeneralHandwritingOCRRequest()
params = { params = {
"ImageBase64": encoded_string "ImageBase64": encoded_string
} }
req.from_json_string(json.dumps(params)) req.from_json_string(json.dumps(params))
# 返回的resp是一个GeneralHandwritingOCRResponse的实例,与请求对象对应 # 返回的resp是一个GeneralHandwritingOCRResponse的实例,与请求对象对应
resp = client.GeneralHandwritingOCR(req) resp = client.GeneralHandwritingOCR(req)
# 输出json格式的字符串回包 # 输出json格式的字符串回包
# print(resp.to_json_string()) # print(resp.to_json_string())
list_detected_text = [] list_detected_text = []
for text_detection in (json.loads(resp.to_json_string()))['TextDetections']: for text_detection in (json.loads(resp.to_json_string()))['TextDetections']:
# print(text_detection['DetectedText']) # print(text_detection['DetectedText'])
list_detected_text.append(text_detection['DetectedText']) list_detected_text.append(text_detection['DetectedText'])
ocr_res = ocr_res_basemodel( ocr_res = ocr_res_basemodel(
result = ret_code, result = ret_code,
result_message = err_msg, result_message = err_msg,
detected_text = list_detected_text detected_text = list_detected_text
) )
return ocr_res return ocr_res
except TencentCloudSDKException as err: except TencentCloudSDKException as err:
ret_code = c_ret_code_error ret_code = c_ret_code_error
err_msg = str(err) err_msg = str(err)
ocr_res = ocr_res_basemodel( ocr_res = ocr_res_basemodel(
result = ret_code, result = ret_code,
result_message = err_msg, result_message = err_msg,
detected_text = [] detected_text = []
) )
return ocr_res return ocr_res
# Import FastAPI Libs # Import FastAPI Libs
from pydantic import BaseModel from pydantic import BaseModel
<<<<<<< HEAD
=======
import fastapi_security_util
from jose import JWTError, jwt
from fastapi import HTTPException, status
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
# Import Dashscope Service Libs # Import Dashscope Service Libs
from http import HTTPStatus from http import HTTPStatus
...@@ -8,6 +14,11 @@ from dashscope.api_entities.dashscope_response import Role ...@@ -8,6 +14,11 @@ from dashscope.api_entities.dashscope_response import Role
import dashscope import dashscope
import json import json
import os import os
<<<<<<< HEAD
=======
import asyncio
import uuid
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
# Import your custom logging manager # Import your custom logging manager
import logging import logging
...@@ -47,6 +58,25 @@ def get_credentials(file: str, gen_ai_id: str) -> str: ...@@ -47,6 +58,25 @@ def get_credentials(file: str, gen_ai_id: str) -> str:
return None return None
# ------------------------------------------------
# Sub Function - Verify Access Token
# ------------------------------------------------
def verify_token(token: str):
secret_key, client_db = fastapi_security_util.get_credentials(fastapi_security_util.credential_file)
try:
payload = jwt.decode(token, secret_key, algorithms=[fastapi_security_util.algorithm])
lm.log(logging.ERROR, curr_module, 'payload: ', str(payload))
username: str = payload.get("sub")
if username is None:
return False
return True
except JWTError:
lm.log(logging.ERROR, curr_module, 'JWTError: ', str(JWTError))
return False
# ------------------------------------------------ # ------------------------------------------------
# Define GenAI Message Service Request and Response # Define GenAI Message Service Request and Response
# ------------------------------------------------ # ------------------------------------------------
...@@ -62,6 +92,10 @@ class send_message_res_basemodel(BaseModel): ...@@ -62,6 +92,10 @@ class send_message_res_basemodel(BaseModel):
result: int result: int
result_message: str result_message: str
message_res: str message_res: str
<<<<<<< HEAD
=======
uuid: str
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
# ------------------------------------------------ # ------------------------------------------------
...@@ -89,7 +123,11 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b ...@@ -89,7 +123,11 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b
dashscope.api_key = api_key dashscope.api_key = api_key
# Set system prompt # Set system prompt
<<<<<<< HEAD
messages = [{'role': Role.SYSTEM, 'content': 'You are a helpful assistant.'}] messages = [{'role': Role.SYSTEM, 'content': 'You are a helpful assistant.'}]
=======
messages = [{'role': Role.SYSTEM, 'content': 'You are a helpful assistant. If you are not sure the answer, please don''t reply with wrong answer.'}]
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
# Loop input message list # Loop input message list
for message in messages_req.messages: for message in messages_req.messages:
...@@ -109,11 +147,34 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b ...@@ -109,11 +147,34 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b
model = Generation.Models.qwen_plus model = Generation.Models.qwen_plus
# Use SDK to get answer # Use SDK to get answer
<<<<<<< HEAD
response = Generation.call( response = Generation.call(
model, # Generation.Models.qwen_turbo, model, # Generation.Models.qwen_turbo,
messages=messages, messages=messages,
result_format='message', # set the result to be 'message' format. result_format='message', # set the result to be 'message' format.
) )
=======
responses = Generation.call(
model, # Generation.Models.qwen_turbo,
messages=messages,
result_format='message', # set the result to be 'message' format.
# Use stream style
stream=True,
incremental_output=True
)
for debug_response in responses:
lm.log(logging.INFO, curr_module, 'Normal response', str(debug_response.output))
response = debug_response
# if response.status_code == HTTPStatus.OK:
# print(response.output.choices[0]['message']['content'],end='')
# else:
# print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
# response.request_id, response.status_code,
# response.code, response.message
# ))
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
if response.status_code == HTTPStatus.OK: if response.status_code == HTTPStatus.OK:
lm.log(logging.INFO, curr_module, 'Normal response', str(response)) lm.log(logging.INFO, curr_module, 'Normal response', str(response))
...@@ -151,3 +212,150 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b ...@@ -151,3 +212,150 @@ def send_message(messages_req: send_message_req_basemodel) -> send_message_res_b
return send_message_res return send_message_res
<<<<<<< HEAD
=======
# ------------------------------------------------
# Function to store question (async)
# ------------------------------------------------
def send_question(token: str, send_message_req: send_message_req_basemodel):
if not verify_token(token = token):
raise HTTPException(
status_code = status.HTTP_401_UNAUTHORIZED,
detail = "Authentication Failed",
headers={"WWW-Authenticate": "Bearer"},
)
result = c_ret_code_success
result_message = ""
# Store questions and uuid at somewhere
# Write to file named by UUID
uuid_value = str(uuid.uuid4())
uuid_file = os.path.join('/tmp/genai-stage', uuid_value)
with open(uuid_file, 'w') as file:
file.write(json.dumps(send_message_req.dict()))
send_message_res = send_message_res_basemodel(
result = result,
result_message = result_message,
message_res = '',
uuid = uuid_value
)
return send_message_res
# ------------------------------------------------
# Function to call dashscope SDK (async)
# ------------------------------------------------
async def get_answer(uuid: str):
result = c_ret_code_success
result_message = ""
# Try to read uuid file
uuid_file = os.path.join('/tmp/genai-stage', uuid)
with open(uuid_file, 'r') as file:
content = file.read()
lm.log(logging.INFO, curr_module, 'content: ', content)
file.close()
# Init basemodel
messages = []
messages_req = send_message_req_basemodel(
model = "",
messages = messages
)
# Try to deserialize
try:
data = json.loads(content)
messages_req = send_message_req_basemodel(**data)
lm.log(logging.INFO, curr_module, 'Deserilized: ', str(messages_req))
except json.JSONDecodeError as e:
lm.log(logging.INFO, curr_module, 'Failed to decode JSON: ', str(e))
except Exception as e:
lm.log(logging.INFO, curr_module, 'Failed to create model instance: ', str(e))
try:
# Get API Key
api_key = get_credentials(credential_file, gen_ai_id)
if (api_key == None):
result = c_ret_code_error
# Set return tuple for normal result
send_message_res = send_message_res_basemodel(
result = result,
result_message = 'Failed to get API key',
message_res = ''
)
# return send_message_res
yield send_message_res
return
else:
dashscope.api_key = api_key
# Set system prompt
messages = [{'role': Role.SYSTEM, 'content': 'You are a helpful assistant. If you are not sure the answer, please don''t reply with wrong answer.'}]
# Loop input message list
for message in messages_req.messages:
# Set user and assistant prompt
messages.append({'role': message.role, 'content': message.text})
lm.log(logging.INFO, curr_module, 'message: ', str(messages))
# Get Model
# model = Models()
match messages_req.model:
case 'qwen-max':
model = Generation.Models.qwen_max
case 'qwen-turbo':
model = Generation.Models.qwen_turbo
case 'qwen-plus':
model = Generation.Models.qwen_plus
# Use SDK to get answer
responses = Generation.call(
model, # Generation.Models.qwen_turbo,
messages=messages,
result_format='message', # set the result to be 'message' format.
# Use stream style
stream=True,
incremental_output=True
)
for debug_response in responses:
lm.log(logging.INFO, curr_module, 'Normal response', str(debug_response.output))
# yield f"data: {debug_response.output.choices[0]['message']['content']}\n\n"
raw_data = debug_response.output.choices[0]['message']['content']
# 针对Markdown格式进行优化
raw_data = raw_data.replace('\n\n', '\n')
raw_data = raw_data.replace('\n', '\n\n')
if raw_data.startswith('\n'):
raw_data = raw_data[1:]
lines = raw_data.splitlines()
for line in lines:
yield f"data: {line}\n\n"
# response = debug_response
await asyncio.sleep(0.1)
# if response.status_code == HTTPStatus.OK:
# print(response.output.choices[0]['message']['content'],end='')
# else:
# print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
# response.request_id, response.status_code,
# response.code, response.message
# ))
# yield 'data: null\n\n'
yield 'event: end\ndata: The stream is about to end\n\n'
os.remove(uuid_file)
except Exception as e:
result = c_ret_code_error
result_message = str(e)
lm.log(logging.ERROR, 'Exception', result_message)
yield f"data: Exception - {result_message}\n\n"
>>>>>>> 753bfe8e87baa5e77942adc934d18c6febdf34fa
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment