Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Clark Lin
Study Performance Tracking App
Commits
5d1e2830
Commit
5d1e2830
authored
May 22, 2024
by
Clark Lin
Browse files
optimized structure of fastapi services
parent
efa62dc7
Changes
9
Show whitespace changes
Inline
Side-by-side
python/common/config/logging.json
0 → 100644
View file @
5d1e2830
{
"logging"
:
{
"log_level"
:
"INFO"
,
"log_dir"
:
"./logs"
,
"filename_template"
:
"%Y%m%d.log"
}
}
python/common/script/logging_manager.py
0 → 100644
View file @
5d1e2830
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: logging_manager.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-03-18 Clark Lin Initial version
#
# Main features summary:
# - Feature 1
# - Feature 2
# - Feature 3
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
import
json
import
logging
from
logging.handlers
import
TimedRotatingFileHandler
import
os
import
datetime
class
LoggingManager
:
_instance
=
None
@
staticmethod
def
get_instance
(
config_rel_path
=
'../config/logging.json'
):
"""Get the singleton instance of LoggingManager."""
if
LoggingManager
.
_instance
is
None
:
LoggingManager
.
_instance
=
LoggingManager
(
config_rel_path
)
return
LoggingManager
.
_instance
def
__init__
(
self
,
config_rel_path
=
'../config/logging.json'
):
"""
Initialize the LoggingManager with the relative path to the configuration file.
By default, the configuration file is assumed to be located at ../config/logging.json.
Args:
config_rel_path (str, optional): Relative path to the JSON configuration file.
Defaults to '../config/logging.json'.
"""
if
self
.
_instance
is
not
None
:
raise
RuntimeError
(
"LoggingManager is a singleton class. Use get_instance() instead of directly instantiating."
)
script_dir
=
os
.
path
.
dirname
(
os
.
path
.
abspath
(
__file__
))
config_absolute_path
=
os
.
path
.
join
(
script_dir
,
config_rel_path
)
self
.
config
=
self
.
load_config
(
config_absolute_path
)
self
.
logger
=
self
.
setup_logger
()
def
load_config
(
self
,
config_path
):
"""
Load logging configuration from the provided JSON file.
Args:
config_path (str): Absolute path to the JSON configuration file.
Returns:
dict: The loaded logging configuration.
"""
with
open
(
config_path
,
'r'
)
as
f
:
config_data
=
json
.
load
(
f
)
return
config_data
.
get
(
'logging'
,
{})
def
setup_logger
(
self
):
"""
Setup the logger with the configuration settings.
Returns:
logging.Logger: A configured logger instance.
"""
log_level
=
self
.
config
.
get
(
'log_level'
,
logging
.
INFO
)
log_dir
=
self
.
config
.
get
(
'log_dir'
,
'./logs'
)
os
.
makedirs
(
log_dir
,
exist_ok
=
True
)
# Use today's date to create a new file name each day
today_date
=
datetime
.
datetime
.
today
().
strftime
(
'%Y%m%d'
)
filename
=
f
"
{
today_date
}
.log"
# Construct the full path for the log file
log_file_path
=
os
.
path
.
join
(
log_dir
,
filename
)
logger
=
logging
.
getLogger
(
__name__
)
logger
.
setLevel
(
log_level
)
# Create a file handler specifically for today's log file
file_handler
=
logging
.
FileHandler
(
log_file_path
)
file_handler
.
setLevel
(
log_level
)
# Create and set a log formatter
formatter
=
logging
.
Formatter
(
'%(asctime)s - %(levelname)s - %(message)s'
)
file_handler
.
setFormatter
(
formatter
)
logger
.
addHandler
(
file_handler
)
return
logger
def
log
(
self
,
log_level
,
module
,
message
,
*
args
,
**
kwargs
):
"""
Log a message at the specified log level.
Args:
log_level (int): One of the logging levels (e.g., logging.INFO).
message (str): The log message to be recorded.
*args, **kwargs: Additional arguments passed to the underlying logging function.
Raises:
ValueError: If the given log_level is invalid.
"""
if
log_level
not
in
[
logging
.
NOTSET
,
logging
.
DEBUG
,
logging
.
INFO
,
logging
.
WARNING
,
logging
.
ERROR
,
logging
.
CRITICAL
,
]:
raise
ValueError
(
f
'Invalid log level:
{
log_level
}
'
)
# Init message
message
=
module
+
' '
+
message
# Append additional content
arguments
=
list
()
for
arg
in
args
:
arguments
.
append
(
arg
)
add_info
=
','
.
join
(
arguments
)
if
len
(
add_info
)
!=
0
:
message
+=
'('
+
add_info
+
')'
else
:
pass
# self.logger.log(log_level, message, *args, **kwargs)
self
.
logger
.
log
(
log_level
,
message
)
# Example usage
if
__name__
==
"__main__"
:
lm
=
LoggingManager
()
lm
.
log
(
logging
.
INFO
,
'This is an info message.'
)
lm
.
log
(
logging
.
ERROR
,
'An error has occurred.'
)
# For dynamic log level change, you could add a method like this:
# def set_log_level(self, new_level):
# self.logger.setLevel(new_level)
# And then call it:
# lm.set_log_level(logging.DEBUG)
python/fastapi_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
fastapi
import
FastAPI
from
pydantic
import
BaseModel
import
qcloud_cos_service
import
qcloud_ocr_service
import
qcloud_asr_service
import
local_paddle_ocr_service
import
local_whisper_asr_service
import
tongyi_genai_service
# ------------------------------------------------
# Define FastAPI
# ------------------------------------------------
app
=
FastAPI
()
# ------------------------------------------------
# Call File Upload Service Process
# ------------------------------------------------
@
app
.
post
(
"/upload/"
)
async
def
upload
(
file_uplaod_req
:
qcloud_cos_service
.
file_upload_req_basemodel
):
return
qcloud_cos_service
.
upload
(
file_uplaod_req
)
# ------------------------------------------------
# Call File Download URL Service Process
# ------------------------------------------------
@
app
.
post
(
"/get_download_url/"
)
async
def
get_download_url
(
file_download_req
:
qcloud_cos_service
.
file_download_req_basemodel
):
return
qcloud_cos_service
.
get_download_url
(
file_download_req
)
# ------------------------------------------------
# Call OCR Service Process
# ------------------------------------------------
@
app
.
post
(
"/get_detected_text/"
)
async
def
get_detected_text
(
ocr_req
:
qcloud_ocr_service
.
ocr_req_basemodel
):
if
ocr_req
.
ocr_tool
==
'Q'
:
# Call Qcloud API
return
qcloud_ocr_service
.
get_detected_text
(
ocr_req
)
elif
ocr_req
.
ocr_tool
==
'LP'
:
# Call Local PaddleOCR API
return
local_paddle_ocr_service
.
get_detected_text
(
ocr_req
)
# ------------------------------------------------
# Call ASR Service Process
# ------------------------------------------------
@
app
.
post
(
"/get_detected_asr_text/"
)
async
def
get_detected_text
(
asr_req
:
local_whisper_asr_service
.
asr_req_basemodel
):
if
asr_req
.
asr_tool
==
'W'
:
# Call Local Whisper ASR API
return
local_whisper_asr_service
.
get_detected_text
(
asr_req
)
elif
asr_req
.
asr_tool
==
'Q'
:
# Call Qcloud ASR API
return
qcloud_asr_service
.
get_detected_text
(
asr_req
)
# ------------------------------------------------
# Call File Upload Service Process
# ------------------------------------------------
@
app
.
post
(
"/send_message_to_gen_ai/tongyi/"
)
async
def
send_message
(
send_message_req
:
tongyi_genai_service
.
send_message_req_basemodel
):
return
tongyi_genai_service
.
send_message
(
send_message_req
)
python/local_paddle_ocr_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
pydantic
import
BaseModel
# Import Other Common Libs
import
json
import
base64
import
os
import
requests
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success
=
0
c_ret_code_error
=
1
credential_file
=
'/home/oracle/python/apex/credential/credential.json'
# env = 'aliyun_ecs'
env
=
'ctyun_ecs'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class
ocr_req_basemodel
(
BaseModel
):
ocr_tool
:
str
# Q: Qcloud, LP: Local PaddleOCR
dir
:
str
file_name
:
str
class
ocr_res_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
detected_text
:
list
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def
get_credentials
(
file
:
str
,
env
:
str
)
->
tuple
:
try
:
# Open and read the JSON file
with
open
(
file
,
"r"
)
as
credential_file
:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
base_url
=
data
[
'paddle'
][
env
][
'base_url'
]
client_id
=
data
[
'paddle'
][
env
][
'client_id'
]
client_secret
=
data
[
'paddle'
][
env
][
'client_secret'
]
return
base_url
,
client_id
,
client_secret
except
Exception
as
e
:
print
(
e
)
return
None
,
None
,
None
# ------------------------------------------------
# Function of OCR request to Paddle OCR
# ------------------------------------------------
def
get_detected_text
(
ocr_req
:
ocr_req_basemodel
)
->
tuple
:
ret_code
=
c_ret_code_success
err_msg
=
""
# ------------------------------------------------
# Get URL and credential
# ------------------------------------------------
base_url
,
client_id
,
client_secret
=
get_credentials
(
credential_file
,
env
);
if
base_url
==
None
or
client_id
==
None
or
client_secret
==
None
:
ret_code
=
c_ret_code_error
err_msg
=
"Failed to get credential from file ["
+
credential_file
+
"] and env ["
+
env
+
"]"
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
# ------------------------------------------------
# Get Token
# ------------------------------------------------
url
=
base_url
+
"/token"
access_token
=
""
headers
=
{
"Authorization"
:
"Bearer "
+
access_token
,
"Content-type"
:
"application/x-www-form-urlencoded"
}
req_body
=
"grant_type=&username={0}&password={1}"
.
format
(
client_id
,
client_secret
)
try
:
response
=
requests
.
post
(
url
=
url
,
headers
=
headers
,
data
=
req_body
,
verify
=
False
)
response
.
raise_for_status
response_dict
=
response
.
json
()
print
(
response
.
text
)
access_token
=
response_dict
[
"access_token"
]
print
(
access_token
)
except
requests
.
exceptions
.
HTTPError
as
httpError
:
print
(
"httpError"
)
print
(
httpError
)
ret_code
=
c_ret_code_error
err_msg
=
"HTTP Error: Failed to get access token: "
+
str
(
httpError
)
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
except
Exception
as
err
:
print
(
"err"
)
print
(
err
)
ret_code
=
c_ret_code_error
err_msg
=
"Unexpected Error: Failed to get access token: "
+
str
(
err
)
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
# ------------------------------------------------
# Get Detected Text
# ------------------------------------------------
url
=
base_url
+
"/paddleocr/read-image"
# 打开文件
with
open
(
os
.
path
.
normpath
(
os
.
path
.
join
(
ocr_req
.
dir
,
ocr_req
.
file_name
)),
"rb"
)
as
image_file
:
encoded_string
=
base64
.
b64encode
(
image_file
.
read
()).
decode
(
'utf-8'
)
# print(encoded_string)
headers
=
{
"accept"
:
"application/json"
,
"Authorization"
:
"Bearer "
+
access_token
,
"Content-Type"
:
"application/json"
}
data
=
{
"image_b64_string"
:
encoded_string
}
json_data
=
json
.
dumps
(
data
)
try
:
response
=
requests
.
post
(
url
=
url
,
headers
=
headers
,
data
=
json_data
,
verify
=
False
)
response
.
raise_for_status
response_dict
=
response
.
json
()
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
response_dict
)
return
ocr_res
except
requests
.
exceptions
.
HTTPError
as
httpError
:
ret_code
=
c_ret_code_error
err_msg
=
"HTTP Error: Failed to get detected text: "
+
str
(
httpError
)
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
except
Exception
as
err
:
ret_code
=
c_ret_code_error
if
(
response
.
text
):
err_msg
=
"Unexpected Error: Failed to get detected text: "
+
str
(
err
)
+
", "
+
response
.
text
else
:
err_msg
=
"Unexpected Error: Failed to get detected text: "
+
str
(
err
)
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
python/local_whisper_asr_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
pydantic
import
BaseModel
# Import Other Common Libs
import
json
import
base64
import
os
import
requests
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success
=
0
c_ret_code_error
=
1
credential_file
=
'/home/oracle/python/apex/credential/credential.json'
# env = 'aliyun_ecs'
env
=
'ctyun_ecs'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class
asr_req_basemodel
(
BaseModel
):
asr_tool
:
str
# W: Whisper
model
:
str
audio_b64_string
:
str
initial_prompt
:
str
class
asr_res_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
detected_text
:
str
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def
get_credentials
(
file
:
str
,
env
:
str
)
->
tuple
:
try
:
# Open and read the JSON file
with
open
(
file
,
"r"
)
as
credential_file
:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
base_url
=
data
[
'whisper'
][
env
][
'base_url'
]
client_id
=
data
[
'whisper'
][
env
][
'client_id'
]
client_secret
=
data
[
'whisper'
][
env
][
'client_secret'
]
return
base_url
,
client_id
,
client_secret
except
Exception
as
e
:
print
(
e
)
return
None
,
None
,
None
# ------------------------------------------------
# Function of ASR request to Whisper ASR
# ------------------------------------------------
def
get_detected_text
(
asr_req
:
asr_req_basemodel
)
->
tuple
:
# print(asr_req)
ret_code
=
c_ret_code_success
err_msg
=
""
# ------------------------------------------------
# Get URL and credential
# ------------------------------------------------
base_url
,
client_id
,
client_secret
=
get_credentials
(
credential_file
,
env
);
if
base_url
==
None
or
client_id
==
None
or
client_secret
==
None
:
ret_code
=
c_ret_code_error
err_msg
=
"Failed to get credential from file ["
+
credential_file
+
"] and env ["
+
env
+
"]"
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
""
)
return
asr_res
# ------------------------------------------------
# Get Token
# ------------------------------------------------
url
=
base_url
+
"/token"
access_token
=
""
headers
=
{
"Authorization"
:
"Bearer "
+
access_token
,
"Content-type"
:
"application/x-www-form-urlencoded"
}
req_body
=
"grant_type=&username={0}&password={1}"
.
format
(
client_id
,
client_secret
)
try
:
response
=
requests
.
post
(
url
=
url
,
headers
=
headers
,
data
=
req_body
,
verify
=
False
)
response
.
raise_for_status
response_dict
=
response
.
json
()
# print(response.text)
access_token
=
response_dict
[
"access_token"
]
# print(access_token)
except
requests
.
exceptions
.
HTTPError
as
httpError
:
print
(
"httpError"
)
print
(
httpError
)
ret_code
=
c_ret_code_error
err_msg
=
"HTTP Error: Failed to get access token: "
+
str
(
httpError
)
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
""
)
return
asr_res
except
Exception
as
err
:
print
(
"err"
)
print
(
err
)
ret_code
=
c_ret_code_error
err_msg
=
"Unexpected Error: Failed to get access token: "
+
str
(
err
)
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
""
)
return
asr_res
# ------------------------------------------------
# Get Detected Text
# ------------------------------------------------
url
=
base_url
+
"/whisper/read-audio"
headers
=
{
"accept"
:
"application/json"
,
"Authorization"
:
"Bearer "
+
access_token
,
"Content-Type"
:
"application/json"
}
data
=
{
"model"
:
asr_req
.
model
,
"audio_b64_string"
:
asr_req
.
audio_b64_string
,
"initial_prompt"
:
asr_req
.
initial_prompt
}
json_data
=
json
.
dumps
(
data
)
try
:
response
=
requests
.
post
(
url
=
url
,
headers
=
headers
,
data
=
json_data
,
verify
=
False
)
response
.
raise_for_status
response_dict
=
response
.
json
()
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
response_dict
[
"text"
]
)
return
asr_res
except
requests
.
exceptions
.
HTTPError
as
httpError
:
ret_code
=
c_ret_code_error
err_msg
=
"HTTP Error: Failed to get detected text: "
+
str
(
httpError
)
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
""
)
return
asr_res
except
Exception
as
err
:
ret_code
=
c_ret_code_error
if
(
response
.
text
):
err_msg
=
"Unexpected Error: Failed to get detected text: "
+
str
(
err
)
+
", "
+
response
.
text
else
:
err_msg
=
"Unexpected Error: Failed to get detected text: "
+
str
(
err
)
asr_res
=
asr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
""
)
return
asr_res
python/qcloud_asr_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
pydantic
import
BaseModel
# Import Qcloud OCR Service Libs
import
sys
# sys.path.append("../qcloud")
from
qcloud.common
import
credential
from
qcloud.asr
import
flash_recognizer
# Common log part
import
os
import
logging
from
common.script.logging_manager
import
LoggingManager
curr_module
=
os
.
path
.
basename
(
__file__
)
# Initialize logging manager
lm
=
LoggingManager
.
get_instance
()
# lm = LoggingManager()
# Import Other Common Libs
import
json
import
base64
import
os
import
random
import
string
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success
=
0
c_ret_code_error
=
1
user_name
=
'asr-program'
credential_file
=
'credential/credential.json'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class
asr_req_basemodel
(
BaseModel
):
asr_tool
:
str
# W: Whisper; Q: Qcloud
model
:
str
audio_b64_string
:
str
initial_prompt
:
str
class
asr_res_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
detected_text
:
str
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def
get_credentials
(
file
:
str
,
user_name
:
str
)
->
tuple
:
try
:
# Open and read the JSON file
with
open
(
file
,
"r"
)
as
credential_file
:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
app_id
=
data
[
'asr'
][
user_name
][
'app_id'
]
secret_id
=
data
[
'asr'
][
user_name
][
'secret_id'
]
secret_key
=
data
[
'asr'
][
user_name
][
'secret_key'
]
return
app_id
,
secret_id
,
secret_key
except
Exception
as
e
:
lm
.
log
(
logging
.
INFO
,
curr_module
,
str
(
e
))
return
None
,
None
,
None
# ------------------------------------------------
# Function of ASR request to Qcloud COS
# ------------------------------------------------
def
get_detected_text
(
asr_req
:
asr_req_basemodel
)
->
tuple
:
lm
.
log
(
logging
.
INFO
,
curr_module
,
"qcloud get_detected_text start"
)
# 初始化处理结果
ret_code
=
c_ret_code_success
err_msg
=
""
# 初始化ID
app_id
=
""
secret_id
=
""
secret_key
=
""
engine_type
=
"16k_en"
# 初始化返回对象
asr_resp
=
asr_res_basemodel
(
result
=
c_ret_code_success
,
result_message
=
''
,
detected_text
=
''
)
try
:
app_id
,
secret_id
,
secret_key
=
get_credentials
(
credential_file
,
user_name
)
cred
=
credential
.
Credential
(
secret_id
,
secret_key
)
if
(
app_id
==
None
or
secret_id
==
None
or
secret_key
==
None
):
lm
.
log
(
logging
.
ERROR
,
curr_module
,
'Failed to get credential'
)
asr_resp
.
result
=
c_ret_code_error
asr_resp
.
detected_text
=
''
asr_resp
.
result_message
=
'Failed to get credential'
return
asr_resp
# 新建FlashRecognizer,一个recognizer可以执行N次识别请求
recognizer
=
flash_recognizer
.
FlashRecognizer
(
app_id
,
cred
)
# 新建识别请求
req
=
flash_recognizer
.
FlashRecognitionRequest
(
engine_type
)
req
.
set_filter_modal
(
0
)
req
.
set_filter_punc
(
0
)
req
.
set_filter_dirty
(
0
)
# 目前aac格式可用于解码MacOS Safari/Chrome的录音文件,IOS Safari待确认
# req.set_voice_format("wav")
req
.
set_voice_format
(
"aac"
)
req
.
set_word_info
(
0
)
req
.
set_convert_num_mode
(
1
)
# 将Base64字符串写到临时文件
# Decode base64 string
decoded_data
=
base64
.
b64decode
(
asr_req
.
audio_b64_string
)
# Write the decoded data to a file
characters
=
string
.
ascii_letters
+
string
.
digits
random_string
=
''
.
join
(
random
.
choice
(
characters
)
for
i
in
range
(
16
))
tmp_file
=
'/tmp/whisper_'
+
random_string
with
open
(
tmp_file
,
'wb'
)
as
file
:
file
.
write
(
decoded_data
)
with
open
(
tmp_file
,
'rb'
)
as
f
:
#读取音频数据
data
=
f
.
read
()
#执行识别
resultData
=
recognizer
.
recognize
(
req
,
data
)
resp
=
json
.
loads
(
resultData
)
request_id
=
resp
[
"request_id"
]
code
=
resp
[
"code"
]
if
code
!=
0
:
lm
.
log
(
logging
.
ERROR
,
curr_module
,
"recognize faild! request_id: "
,
request_id
,
" code: "
,
code
,
", message: "
,
resp
[
"message"
])
resp
.
result
=
c_ret_code_error
resp
.
detected_text
=
''
resp
.
result_message
=
'Recognize faild! Please find detail in log.'
return
resp
lm
.
log
(
logging
.
INFO
,
curr_module
,
"request_id: "
,
request_id
)
#一个channl_result对应一个声道的识别结果
#大多数音频是单声道,对应一个channl_result
for
channl_result
in
resp
[
"flash_result"
]:
lm
.
log
(
logging
.
INFO
,
curr_module
,
"channel_id: "
+
str
(
channl_result
[
"channel_id"
]))
lm
.
log
(
logging
.
INFO
,
curr_module
,
channl_result
[
"text"
])
asr_resp
.
result
=
c_ret_code_success
asr_resp
.
detected_text
=
channl_result
[
"text"
]
asr_resp
.
result_message
=
''
return
asr_resp
lm
.
log
(
logging
.
INFO
,
curr_module
,
"qcloud get_detected_text complete"
)
except
Exception
as
e
:
lm
.
log
(
logging
.
ERROR
,
curr_module
,
str
(
e
))
asr_resp
.
result
=
c_ret_code_error
asr_resp
.
detected_text
=
''
asr_resp
.
result_message
=
str
(
e
)
return
asr_resp
python/qcloud
-
cos
-
service.py
→
python/qcloud
_
cos
_
service.py
View file @
5d1e2830
# Import FastAPI Libs
from
fastapi
import
FastAPI
from
pydantic
import
BaseModel
# Import Qcloud COS Service Libs
...
...
@@ -24,6 +23,7 @@ region = 'ap-shanghai'
bucket
=
'clark-apex-saz-1320304559'
user_name
=
'oss-program'
credential_file
=
'/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------
# Get credentials
...
...
@@ -36,8 +36,8 @@ def get_credentials(file: str, bucket_name: str, user_name: str) -> tuple:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
secret_id
=
data
[
bucket_name
][
user_name
][
'secret_id'
]
secret_key
=
data
[
bucket_name
][
user_name
][
'secret_key'
]
secret_id
=
data
[
'oss'
][
bucket_name
][
user_name
][
'secret_id'
]
secret_key
=
data
[
'oss'
][
bucket_name
][
user_name
][
'secret_key'
]
return
secret_id
,
secret_key
...
...
@@ -94,7 +94,7 @@ def upload_single_file(dir: str, file_name: str, rename_prefix: str, client: Cos
# ------------------------------------------------
# Define File Upload Service Request and Response
# ------------------------------------------------
class
F
ile_
U
pload_
R
eq
(
BaseModel
):
class
f
ile_
u
pload_
r
eq
_basemodel
(
BaseModel
):
dir
:
str
file1_name
:
str
file2_name
:
str
...
...
@@ -102,7 +102,7 @@ class File_Upload_Req(BaseModel):
file4_name
:
str
file5_name
:
str
class
F
ile_
U
pload_
R
es
(
BaseModel
):
class
f
ile_
u
pload_
r
es
_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
file1_key
:
str
...
...
@@ -116,13 +116,11 @@ class File_Upload_Res(BaseModel):
file5_key
:
str
file5_etag
:
str
app
=
FastAPI
()
# ------------------------------------------------
# Define File Upload Service Process Detail
# ------------------------------------------------
@
app
.
post
(
"/upload/"
)
async
def
upload
(
file_uplaod_req
:
File_Upload_Req
):
def
upload
(
file_uplaod_req
:
file_upload_req_basemodel
):
secret_id
,
secret_key
=
get_credentials
(
credential_file
,
bucket
,
user_name
)
#secret_id = # 用户的 SecretId,建议使用子账号密钥,授权遵循最小权限指引,降低使用风险。子账号密钥获取可参见 https://cloud.tencent.com/document/product/598/37140
...
...
@@ -195,7 +193,7 @@ async def upload(file_uplaod_req: File_Upload_Req):
result
=
c_ret_code_error
error_list
.
append
(
"File 5 upload failed: "
+
err_msg
)
file_upload_res
=
F
ile_
U
pload_
R
es
(
file_upload_res
=
f
ile_
u
pload_
r
es
_basemodel
(
result
=
result
,
result_message
=
', '
.
join
(
error_list
),
file1_key
=
file1_key
,
...
...
@@ -216,10 +214,10 @@ async def upload(file_uplaod_req: File_Upload_Req):
# ------------------------------------------------
# Define File Download URL Service Request and Response
# ------------------------------------------------
class
F
ile_
D
ownload_
R
eq
(
BaseModel
):
class
f
ile_
d
ownload_
r
eq
_basemodel
(
BaseModel
):
key
:
str
class
F
ile_
D
ownload_
R
es
(
BaseModel
):
class
f
ile_
d
ownload_
r
es
_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
download_url
:
str
...
...
@@ -228,8 +226,7 @@ class File_Download_Res(BaseModel):
# ------------------------------------------------
# Define File Download URL Service Process Detail
# ------------------------------------------------
@
app
.
post
(
"/get_download_url/"
)
async
def
get_download_url
(
file_download_req
:
File_Download_Req
):
def
get_download_url
(
file_download_req
:
file_download_req_basemodel
):
# Define constants
secret_id
,
secret_key
=
get_credentials
(
credential_file
,
bucket
,
user_name
)
...
...
@@ -273,7 +270,7 @@ async def get_download_url(file_download_req: File_Download_Req):
ret_code
=
c_ret_code_error
err_msg
=
str
(
e
)
file_download_res
=
F
ile_
D
ownload_
R
es
(
file_download_res
=
f
ile_
d
ownload_
r
es
_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
download_url
=
url
...
...
python/qcloud_ocr_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
pydantic
import
BaseModel
# Import Qcloud OCR Service Libs
from
tencentcloud.common
import
credential
from
tencentcloud.common.profile.client_profile
import
ClientProfile
from
tencentcloud.common.profile.http_profile
import
HttpProfile
from
tencentcloud.common.exception.tencent_cloud_sdk_exception
import
TencentCloudSDKException
from
tencentcloud.ocr.v20181119
import
ocr_client
,
models
# Import Other Common Libs
import
json
import
base64
import
os
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success
=
0
c_ret_code_error
=
1
region
=
'ap-shanghai'
user_name
=
'ocr-program'
credential_file
=
'/home/oracle/python/apex/credential/credential.json'
# credential_file = 'C://PyCharmProjects//qcloud-service//config//credential.json'
# ------------------------------------------------
# Define OCR Request Service Request and Response
# ------------------------------------------------
class
ocr_req_basemodel
(
BaseModel
):
ocr_tool
:
str
# Q: Qcloud, LP: Local PaddleOCR
dir
:
str
file_name
:
str
class
ocr_res_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
detected_text
:
list
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def
get_credentials
(
file
:
str
,
user_name
:
str
)
->
tuple
:
try
:
# Open and read the JSON file
with
open
(
file
,
"r"
)
as
credential_file
:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
secret_id
=
data
[
'ocr'
][
user_name
][
'secret_id'
]
secret_key
=
data
[
'ocr'
][
user_name
][
'secret_key'
]
return
secret_id
,
secret_key
except
Exception
as
e
:
print
(
e
)
return
None
,
None
# ------------------------------------------------
# Function of OCR request to Qcloud COS
# ------------------------------------------------
def
get_detected_text
(
ocr_req
:
ocr_req_basemodel
)
->
tuple
:
ret_code
=
c_ret_code_success
err_msg
=
""
try
:
secret_id
,
secret_key
=
get_credentials
(
credential_file
,
user_name
)
# 实例化一个认证对象,入参需要传入腾讯云账户 SecretId 和 SecretKey,此处还需注意密钥对的保密
# 代码泄露可能会导致 SecretId 和 SecretKey 泄露,并威胁账号下所有资源的安全性。以下代码示例仅供参考,建议采用更安全的方式来使用密钥,请参见:https://cloud.tencent.com/document/product/1278/85305
# 密钥可前往官网控制台 https://console.cloud.tencent.com/cam/capi 进行获取
cred
=
credential
.
Credential
(
secret_id
,
secret_key
)
# 实例化一个http选项,可选的,没有特殊需求可以跳过
httpProfile
=
HttpProfile
()
httpProfile
.
endpoint
=
"ocr.tencentcloudapi.com"
# 实例化一个client选项,可选的,没有特殊需求可以跳过
clientProfile
=
ClientProfile
()
clientProfile
.
httpProfile
=
httpProfile
# 实例化要请求产品的client对象,clientProfile是可选的
client
=
ocr_client
.
OcrClient
(
cred
,
region
,
clientProfile
)
# 打开文件
file_location
=
os
.
path
.
normpath
(
os
.
path
.
join
(
ocr_req
.
dir
,
ocr_req
.
file_name
))
with
open
(
file_location
,
"rb"
)
as
image_file
:
encoded_string
=
base64
.
b64encode
(
image_file
.
read
()).
decode
(
'utf-8'
)
# print(encoded_string)
# Remove file
os
.
remove
(
file_location
)
# 实例化一个请求对象,每个接口都会对应一个request对象
req
=
models
.
GeneralHandwritingOCRRequest
()
params
=
{
"ImageBase64"
:
encoded_string
}
req
.
from_json_string
(
json
.
dumps
(
params
))
# 返回的resp是一个GeneralHandwritingOCRResponse的实例,与请求对象对应
resp
=
client
.
GeneralHandwritingOCR
(
req
)
# 输出json格式的字符串回包
# print(resp.to_json_string())
list_detected_text
=
[]
for
text_detection
in
(
json
.
loads
(
resp
.
to_json_string
()))[
'TextDetections'
]:
# print(text_detection['DetectedText'])
list_detected_text
.
append
(
text_detection
[
'DetectedText'
])
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
list_detected_text
)
return
ocr_res
except
TencentCloudSDKException
as
err
:
ret_code
=
c_ret_code_error
err_msg
=
str
(
err
)
ocr_res
=
ocr_res_basemodel
(
result
=
ret_code
,
result_message
=
err_msg
,
detected_text
=
[]
)
return
ocr_res
python/tongyi_genai_service.py
0 → 100644
View file @
5d1e2830
# Import FastAPI Libs
from
pydantic
import
BaseModel
# Import Dashscope Service Libs
from
http
import
HTTPStatus
from
dashscope
import
Generation
from
dashscope.api_entities.dashscope_response
import
Role
import
dashscope
import
json
import
os
# Import your custom logging manager
import
logging
from
common.script.logging_manager
import
LoggingManager
# Initialize logging manager
curr_module
=
os
.
path
.
basename
(
__file__
)
lm
=
LoggingManager
.
get_instance
()
# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success
=
0
c_ret_code_error
=
1
gen_ai_id
=
'aliyun'
credential_file
=
'/home/oracle/python/apex/credential/credential.json'
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def
get_credentials
(
file
:
str
,
gen_ai_id
:
str
)
->
str
:
try
:
# Open and read the JSON file
with
open
(
file
,
'r'
)
as
credential_file
:
data
=
json
.
load
(
credential_file
)
# Access data from the JSON
api_key
=
data
[
'gen-ai'
][
gen_ai_id
][
'api_key'
]
return
api_key
except
Exception
as
e
:
lm
.
log
(
logging
.
ERROR
,
"Exception occurred in get_credentials: %s"
,
str
(
e
),
exc_info
=
True
)
return
None
# ------------------------------------------------
# Define GenAI Message Service Request and Response
# ------------------------------------------------
class
Massage
(
BaseModel
):
role
:
str
text
:
str
class
send_message_req_basemodel
(
BaseModel
):
model
:
str
messages
:
list
[
Massage
]
class
send_message_res_basemodel
(
BaseModel
):
result
:
int
result_message
:
str
message_res
:
str
# ------------------------------------------------
# Function to call dashscope SDK
# ------------------------------------------------
def
send_message
(
messages_req
:
send_message_req_basemodel
)
->
send_message_res_basemodel
:
result
=
c_ret_code_success
result_message
=
""
try
:
# Get API Key
api_key
=
get_credentials
(
credential_file
,
gen_ai_id
)
if
(
api_key
==
None
):
result
=
c_ret_code_error
# Set return tuple for normal result
send_message_res
=
send_message_res_basemodel
(
result
=
result
,
result_message
=
'Failed to get API key'
,
message_res
=
''
)
return
send_message_res
else
:
dashscope
.
api_key
=
api_key
# Set system prompt
messages
=
[{
'role'
:
Role
.
SYSTEM
,
'content'
:
'You are a helpful assistant.'
}]
# Loop input message list
for
message
in
messages_req
.
messages
:
# Set user and assistant prompt
messages
.
append
({
'role'
:
message
.
role
,
'content'
:
message
.
text
})
lm
.
log
(
logging
.
INFO
,
curr_module
,
'message: '
,
str
(
messages
))
# Get Model
# model = Models()
match
messages_req
.
model
:
case
'qwen-max'
:
model
=
Generation
.
Models
.
qwen_max
case
'qwen-turbo'
:
model
=
Generation
.
Models
.
qwen_turbo
case
'qwen-plus'
:
model
=
Generation
.
Models
.
qwen_plus
# Use SDK to get answer
response
=
Generation
.
call
(
model
,
# Generation.Models.qwen_turbo,
messages
=
messages
,
result_format
=
'message'
,
# set the result to be 'message' format.
)
if
response
.
status_code
==
HTTPStatus
.
OK
:
lm
.
log
(
logging
.
INFO
,
curr_module
,
'Normal response'
,
str
(
response
))
send_message_res
=
send_message_res_basemodel
(
result
=
result
,
result_message
=
''
,
message_res
=
response
[
'output'
][
'choices'
][
0
][
'message'
][
'content'
]
)
return
send_message_res
else
:
lm
.
log
(
logging
.
INFO
,
curr_module
,
'Error response'
,
str
(
response
))
send_message_res
=
send_message_res_basemodel
(
result
=
result
,
result_message
=
''
,
message_res
=
response
.
message
)
return
send_message_res
except
Exception
as
e
:
lm
.
log
(
logging
.
ERROR
,
"Exception occurred: %s"
,
str
(
e
),
exc_info
=
True
)
result
=
c_ret_code_error
# Set return tuple for abnormal result
send_message_res
=
send_message_res_basemodel
(
result
=
result
,
result_message
=
str
(
e
),
message_res
=
''
)
return
send_message_res
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment