Commit 9090bdcf authored by Administrator's avatar Administrator
Browse files

initial version including oauth2 authentication REST API and simple paddleocr read image REST API

parents
Install Step
python3 -m venv venv
source venv/bin/activate
pip3 install --upgrade pip
pip3 install fastapi
pip3 install python-multipart
pip3 install "python-jose[cryptography]"
pip3 install "passlib[bcrypt]"
For GPU: pip3 install paddlepaddle-gpu -i https://mirror.baidu.com/pypi/simple
For CPU: pip3 install paddlepaddle -i https://mirror.baidu.com/pypi/simple
pip3 install "paddleocr>=2.0.1"
pip3 install uvicorn
pip3 install gunicorn
Running Step
source venv/bin/activate
nohup gunicorn fastapi_service:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind localhost:8000 &
{
"logging": {
"log_level": "INFO",
"log_dir": "./logs",
"filename_template": "%Y%m%d.log"
}
}
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: logging_manager.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-03-18 Clark Lin Initial version
#
# Main features summary:
# - Feature 1
# - Feature 2
# - Feature 3
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
import json
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import datetime
class LoggingManager:
_instance = None
@staticmethod
def get_instance(config_rel_path='../config/logging.json'):
"""Get the singleton instance of LoggingManager."""
if LoggingManager._instance is None:
LoggingManager._instance = LoggingManager(config_rel_path)
return LoggingManager._instance
def __init__(self, config_rel_path='../config/logging.json'):
"""
Initialize the LoggingManager with the relative path to the configuration file.
By default, the configuration file is assumed to be located at ../config/logging.json.
Args:
config_rel_path (str, optional): Relative path to the JSON configuration file.
Defaults to '../config/logging.json'.
"""
if self._instance is not None:
raise RuntimeError("LoggingManager is a singleton class. Use get_instance() instead of directly instantiating.")
script_dir = os.path.dirname(os.path.abspath(__file__))
config_absolute_path = os.path.join(script_dir, config_rel_path)
self.config = self.load_config(config_absolute_path)
self.logger = self.setup_logger()
def load_config(self, config_path):
"""
Load logging configuration from the provided JSON file.
Args:
config_path (str): Absolute path to the JSON configuration file.
Returns:
dict: The loaded logging configuration.
"""
with open(config_path, 'r') as f:
config_data = json.load(f)
return config_data.get('logging', {})
def setup_logger(self):
"""
Setup the logger with the configuration settings.
Returns:
logging.Logger: A configured logger instance.
"""
log_level = self.config.get('log_level', logging.INFO)
log_dir = self.config.get('log_dir', './logs')
os.makedirs(log_dir, exist_ok=True)
# Use today's date to create a new file name each day
today_date = datetime.datetime.today().strftime('%Y%m%d')
filename = f"{today_date}.log"
# Construct the full path for the log file
log_file_path = os.path.join(log_dir, filename)
logger = logging.getLogger(__name__)
logger.setLevel(log_level)
# Create a file handler specifically for today's log file
file_handler = logging.FileHandler(log_file_path)
file_handler.setLevel(log_level)
# Create and set a log formatter
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
return logger
def log(self, log_level, module, message, *args, **kwargs):
"""
Log a message at the specified log level.
Args:
log_level (int): One of the logging levels (e.g., logging.INFO).
message (str): The log message to be recorded.
*args, **kwargs: Additional arguments passed to the underlying logging function.
Raises:
ValueError: If the given log_level is invalid.
"""
if log_level not in [
logging.NOTSET,
logging.DEBUG,
logging.INFO,
logging.WARNING,
logging.ERROR,
logging.CRITICAL,
]:
raise ValueError(f'Invalid log level: {log_level}')
# Init message
message = module + ' ' + message
# Append additional content
arguments = list()
for arg in args:
arguments.append(arg)
add_info = ','.join(arguments)
if len(add_info) != 0:
message += '(' + add_info + ')'
else:
pass
# self.logger.log(log_level, message, *args, **kwargs)
self.logger.log(log_level, message)
# Example usage
if __name__ == "__main__":
lm = LoggingManager()
lm.log(logging.INFO, 'This is an info message.')
lm.log(logging.ERROR, 'An error has occurred.')
# For dynamic log level change, you could add a method like this:
# def set_log_level(self, new_level):
# self.logger.setLevel(new_level)
# And then call it:
# lm.set_log_level(logging.DEBUG)
{
"oauth2_secret_key": "<generate random value by openssl rand -hex 32>",
"client_db": {
"client_id: generate random value by openssl rand -hex 32": {
"hashed_client_secret": "<generate by CryptContext(schemes=["bcrypt"], deprecated="auto").hash(password)>",
"name": "fastapi_service_test",
"description": "This client is for fastapi service test"
}
}
}
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: fastapi_security_util.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-04-24 Clark Lin Initial version
#
# Main features summary:
# - Implementation of OAuth2 Authentication
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
from datetime import datetime, timedelta, timezone
from typing import Annotated
from pydantic import BaseModel
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
# Import your custom logging manager
import os
import logging
from common.script.logging_manager import LoggingManager
import json
# Initialize logging manager
curr_module = os.path.basename(__file__)
# lm = LoggingManager()
lm = LoggingManager.get_instance()
# ------------------------------------------------
# Init Global Variables
# ------------------------------------------------
# Credential file
credential_file = '/home/ecs-user/paddleocr/credential/oauth2.json'
# to get a string like this run:
# openssl rand -hex 32
secret_key = ""
algorithm = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Init crypt context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Init Oauth2 schema, specify token endpoint
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ------------------------------------------------
# Sample Repository for Client Verification
# ------------------------------------------------
client_db = {}
# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str) -> tuple:
global secret_key
global client_db
try:
# Open and read the JSON file
with open(file, "r") as credential_file:
data = json.load(credential_file)
# Access data from the JSON
secret_key = data['oauth2_secret_key']
client_db = data['client_db']
return secret_key, client_db
except Exception as e:
print(e)
return None, None
# ------------------------------------------------
# Model Definition
# ------------------------------------------------
class Token(BaseModel):
access_token: str
token_type: str
# ------------------------------------------------
# Sub Function - Verify Password
# ------------------------------------------------
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# ------------------------------------------------
# Sub Function - Use Cript Context to Get Hash
# ------------------------------------------------
def get_password_hash(password):
return pwd_context.hash(password)
# ------------------------------------------------
# Sub Function - Get Client from Dict by Client ID
# ------------------------------------------------
def get_client(db: dict, client_id: str):
if client_id in db:
client_dict = db[client_id]
return client_dict
# ------------------------------------------------
# Sub Function - Authentication Process
# ------------------------------------------------
def authenticate_client(client_db: dict, client_id: str, client_secret: str):
client = get_client(client_db, client_id)
if not client:
return False
if not verify_password(client_secret, client["hashed_client_secret"]):
return False
return client
# ------------------------------------------------
# Sub Function - Generate Access Token
# ------------------------------------------------
def create_access_token(data: dict, expires_delta: timedelta | None = None):
global secret_key
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
lm.log(logging.INFO, curr_module, "algorithm: " + algorithm)
lm.log(logging.INFO, curr_module, "secret_key: " + secret_key)
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=algorithm)
return encoded_jwt
# ------------------------------------------------
# Sub Function - Generate Access Token
# ------------------------------------------------
def get_access_token(form_data: OAuth2PasswordRequestForm) -> Token:
global secret_key
global client_db
# Get credentials
secret_key, client_db = get_credentials(credential_file)
if secret_key == "" or client_db == {}:
lm.log(logging.INFO, curr_module, "Failed to get credentials from file [" + credential_file + "]")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get credentials from file [" + credential_file + "]",
)
# Call authentication process
client = authenticate_client(client_db, form_data.username, form_data.password)
if not client:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data = {
"sub": form_data.username,
"name": client["name"],
"description": client["description"]
},
expires_delta = access_token_expires
)
lm.log(logging.INFO, curr_module, 'get_access_token complated with normal')
return Token(access_token=access_token, token_type="bearer")
\ No newline at end of file
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: fastapi_service.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-04-24 Clark Lin Initial version
#
# Main features summary:
# - REST API for OAuth2 Authentication
# - REST API for PaddleOCR
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
# Import FastAPI Libs
from fastapi import FastAPI
from fastapi import Depends
from pydantic import BaseModel
import fastapi_security_util
from fastapi_security_util import Token
import paddleocr_service
from paddleocr_service import RawImage
from typing import Annotated
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# ------------------------------------------------
# Init Global Variables
# ------------------------------------------------
# Init Oauth2 schema, specify token endpoint
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# ------------------------------------------------
# Define FastAPI
# ------------------------------------------------
app = FastAPI()
# ------------------------------------------------
# Call Token Service
# ------------------------------------------------
@app.post("/token")
def get_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
return fastapi_security_util.get_access_token(form_data)
# ------------------------------------------------
# Call OCR Service
# ------------------------------------------------
@app.post("/paddleocr/read-image")
def read_image(token: Annotated[str, Depends(oauth2_scheme)], image:RawImage):
return paddleocr_service.read_image(token, image)
\ No newline at end of file
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------
# File Name: paddleocr_service.py
# Original Author: Clark Lin
# Email: clark_lin@outlook.com
#
# Change History
# Version Date By Description
# 0.01 2024-04-24 Clark Lin Initial version
#
# Main features summary:
# - Implementation of PaddleOCR Invocation
#
# Copyright Information:
# Copyright © 2024 Oasis
# Licensed TBD
# ------------------------------------------------------------------------------
from paddleocr import PaddleOCR
from io import BytesIO
from PIL import Image
import numpy as np
import base64
import fastapi_security_util
from pydantic import BaseModel
from jose import JWTError, jwt
from fastapi import HTTPException, status
# Import your custom logging manager
import os
import logging
from common.script.logging_manager import LoggingManager
# Initialize logging manager
curr_module = os.path.basename(__file__)
# lm = LoggingManager()
lm = LoggingManager.get_instance()
# ------------------------------------------------
# Model Definition
# ------------------------------------------------
class RawImage(BaseModel):
image_b64_string: str
# ------------------------------------------------
# Sub Function - Verify Access Token
# ------------------------------------------------
def verify_token(token: str):
secret_key, client_db = fastapi_security_util.get_credentials(fastapi_security_util.credential_file)
try:
payload = jwt.decode(token, secret_key, algorithms=[fastapi_security_util.algorithm])
username: str = payload.get("sub")
if username is None:
return False
return True
except JWTError:
lm.log(logging.ERROR, curr_module, 'JWTError: ', str(JWTError))
return False
# ------------------------------------------------
# Sub Function - Read Image
# ------------------------------------------------
def read_image(token: str, image: RawImage):
if not verify_token(token = token):
raise HTTPException(
status_code = status.HTTP_401_UNAUTHORIZED,
detail = "Authentication Failed",
headers={"WWW-Authenticate": "Bearer"},
)
l_text = []
try:
ocr = PaddleOCR(use_angle_cls=True, lang="ch")
# Convert the base64 string to a byte array
image_data = base64.b64decode(image.image_b64_string)
# 将解码后的二进制数据转换为 PIL Image 对象
image_buffer = BytesIO(image_data)
pil_image = Image.open(image_buffer)
# 将 PIL Image 对象转换为 numpy 数组
image_array = np.array(pil_image)
# 初始化 PaddleOCR 并加载图像数组
result = ocr.ocr(image_array, det=True, rec=True)
for idx in range(len(result)):
res = result[idx]
for line in res:
l_text.append(line[1][0])
lm.log(logging.INFO, curr_module, 'read_image completed with normal')
except Exception as e:
lm.log(logging.ERROR, curr_module, 'Exception: ', str(e))
return l_text
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment