# Import FastAPI Libs
from pydantic import BaseModel
import fastapi_security_util
from jose import JWTError, jwt
from fastapi import HTTPException, status

# Import Dashscope Service Libs
from http import HTTPStatus
from dashscope import Generation
from dashscope.api_entities.dashscope_response import Role
import dashscope

import json
import os
import asyncio
import uuid

# Import your custom logging manager
import logging
from common.script.logging_manager import LoggingManager

# Initialize logging manager
curr_module = os.path.basename(__file__)
lm = LoggingManager.get_instance()

# ------------------------------------------------
# Define constants
# ------------------------------------------------
c_ret_code_success = 0
c_ret_code_error = 1
gen_ai_id = 'aliyun'
credential_file = '/home/oracle/python/apex/credential/credential.json'


# ------------------------------------------------
# Get credentials
# ------------------------------------------------
def get_credentials(file: str, gen_ai_id: str) -> str | None:
    """Read the API key for *gen_ai_id* from the JSON credential file.

    Returns the key string, or None when the file cannot be read or the
    expected keys ('gen-ai' -> gen_ai_id -> 'api_key') are absent; the
    failure is logged.
    """
    try:
        # Handle renamed to 'cred_fp': the original reused the name
        # 'credential_file', shadowing the module-level path constant.
        with open(file, 'r') as cred_fp:
            data = json.load(cred_fp)
        return data['gen-ai'][gen_ai_id]['api_key']
    except Exception as e:
        lm.log(logging.ERROR, "Exception occurred in get_credentials: %s", str(e), exc_info=True)
        return None


# ------------------------------------------------
# Sub Function - Verify Access Token
# ------------------------------------------------
def verify_token(token: str) -> bool:
    """Return True when *token* decodes as a valid JWT carrying a 'sub' claim."""
    secret_key, client_db = fastapi_security_util.get_credentials(fastapi_security_util.credential_file)
    try:
        payload = jwt.decode(token, secret_key, algorithms=[fastapi_security_util.algorithm])
        # Logged at INFO (was ERROR): a successfully decoded payload is not an error.
        lm.log(logging.INFO, curr_module, 'payload: ', str(payload))
        return payload.get("sub") is not None
    except JWTError as e:
        # Log the caught exception instance; str(JWTError) only printed the class name.
        lm.log(logging.ERROR, curr_module, 'JWTError: ', str(e))
        return False
# ------------------------------------------------
# Define GenAI Message Service Request and Response
# ------------------------------------------------
class Massage(BaseModel):
    """A single chat message. NOTE: class name kept as 'Massage' (sic) for caller compatibility."""
    role: str   # e.g. 'user' / 'assistant'
    text: str   # message content


class send_message_req_basemodel(BaseModel):
    """Request payload: target Qwen model name plus the conversation history."""
    model: str
    messages: list[Massage]


class send_message_res_basemodel(BaseModel):
    """Response payload returned to callers of this module."""
    result: int          # c_ret_code_success or c_ret_code_error
    result_message: str  # error detail; empty on success
    message_res: str     # assistant answer text
    # Default added: send_message() never supplies a uuid, and the previously
    # required field raised a pydantic ValidationError on every construction
    # without it (including inside send_message's own except handler).
    uuid: str = ''


# Supported model names. A dict lookup with a fallback replaces the unguarded
# match/case that left 'model' unbound (NameError) for unknown names.
_MODEL_MAP = {
    'qwen-max': Generation.Models.qwen_max,
    'qwen-turbo': Generation.Models.qwen_turbo,
    'qwen-plus': Generation.Models.qwen_plus,
}

# The original literal 'don''t' was implicit string concatenation producing
# "dont"; the intended apostrophe is restored here.
_SYSTEM_PROMPT = ("You are a helpful assistant. If you are not sure the answer, "
                  "please don't reply with wrong answer.")

_STAGE_DIR = '/tmp/genai-stage'


def _build_messages(messages_req: send_message_req_basemodel) -> list:
    """Build the DashScope message list: system prompt first, then the caller's history."""
    messages = [{'role': Role.SYSTEM, 'content': _SYSTEM_PROMPT}]
    for message in messages_req.messages:
        messages.append({'role': message.role, 'content': message.text})
    lm.log(logging.INFO, curr_module, 'message: ', str(messages))
    return messages


# ------------------------------------------------
# Function to call dashscope SDK
# ------------------------------------------------
def send_message(messages_req: send_message_req_basemodel) -> send_message_res_basemodel:
    """Call the DashScope Generation API and return the complete answer.

    Returns result == c_ret_code_success with the assistant text on success,
    or c_ret_code_error with a diagnostic in result_message. Never raises.
    """
    try:
        # Get API Key
        api_key = get_credentials(credential_file, gen_ai_id)
        if api_key is None:
            return send_message_res_basemodel(
                result=c_ret_code_error,
                result_message='Failed to get API key',
                message_res='',
            )
        dashscope.api_key = api_key

        messages = _build_messages(messages_req)
        model = _MODEL_MAP.get(messages_req.model, Generation.Models.qwen_turbo)

        # Use SDK to get answer (streaming, incremental chunks).
        responses = Generation.call(
            model,
            messages=messages,
            result_format='message',  # set the result to be 'message' format.
            stream=True,
            incremental_output=True,
        )

        # With incremental_output=True each chunk carries only newly generated
        # text, so the chunks must be concatenated; the original kept only the
        # last increment and therefore returned a truncated answer.
        parts = []
        response = None
        for chunk in responses:
            lm.log(logging.INFO, curr_module, 'Normal response', str(chunk.output))
            response = chunk
            if chunk.status_code == HTTPStatus.OK:
                parts.append(chunk.output.choices[0]['message']['content'])

        if response is not None and response.status_code == HTTPStatus.OK:
            lm.log(logging.INFO, curr_module, 'Normal response', str(response))
            return send_message_res_basemodel(
                result=c_ret_code_success,
                result_message='',
                message_res=''.join(parts),
            )

        # Error path: report failure (the original returned the success code
        # here with an empty result_message).
        lm.log(logging.INFO, curr_module, 'Error response', str(response))
        return send_message_res_basemodel(
            result=c_ret_code_error,
            result_message=str(getattr(response, 'message', 'empty response stream')),
            message_res='',
        )
    except Exception as e:
        lm.log(logging.ERROR, "Exception occurred: %s", str(e), exc_info=True)
        return send_message_res_basemodel(
            result=c_ret_code_error,
            result_message=str(e),
            message_res='',
        )


# ------------------------------------------------
# Function to store question (async)
# ------------------------------------------------
def send_question(token: str, send_message_req: send_message_req_basemodel):
    """Stage a question to disk under a fresh UUID, to be answered later by get_answer().

    Raises HTTPException(401) when the bearer token does not verify.
    Returns a send_message_res_basemodel carrying the generated uuid.
    """
    if not verify_token(token=token):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication Failed",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Store question keyed by UUID in the staging directory.
    uuid_value = str(uuid.uuid4())
    # Ensure the directory exists; previously a missing /tmp/genai-stage made
    # the open() below fail with FileNotFoundError.
    os.makedirs(_STAGE_DIR, exist_ok=True)
    uuid_file = os.path.join(_STAGE_DIR, uuid_value)
    with open(uuid_file, 'w') as file:
        file.write(json.dumps(send_message_req.dict()))

    return send_message_res_basemodel(
        result=c_ret_code_success,
        result_message='',
        message_res='',
        uuid=uuid_value,
    )


# ------------------------------------------------
# Function to call dashscope SDK (async)
# ------------------------------------------------
async def get_answer(uuid: str):
    """Async generator yielding the staged question's answer as SSE 'data:' frames.

    Reads the request staged by send_question() from /tmp/genai-stage/<uuid>,
    streams the model output chunk by chunk, emits an 'event: end' frame, and
    removes the stage file on success.
    NOTE: the parameter keeps the name 'uuid' for caller compatibility, which
    shadows the uuid module inside this function.
    """
    # Try to read the uuid stage file (the redundant file.close() inside the
    # original 'with' block has been dropped).
    uuid_file = os.path.join(_STAGE_DIR, uuid)
    with open(uuid_file, 'r') as file:
        content = file.read()
    lm.log(logging.INFO, curr_module, 'content: ', content)

    # Fall back to an empty request when deserialization fails.
    messages_req = send_message_req_basemodel(model="", messages=[])
    try:
        data = json.loads(content)
        messages_req = send_message_req_basemodel(**data)
        lm.log(logging.INFO, curr_module, 'Deserilized: ', str(messages_req))
    except json.JSONDecodeError as e:
        lm.log(logging.INFO, curr_module, 'Failed to decode JSON: ', str(e))
    except Exception as e:
        lm.log(logging.INFO, curr_module, 'Failed to create model instance: ', str(e))

    try:
        # Get API Key
        api_key = get_credentials(credential_file, gen_ai_id)
        if api_key is None:
            # Yield an SSE string like every other frame; the original yielded
            # a pydantic model object, which breaks a str-based SSE stream.
            yield 'data: Failed to get API key\n\n'
            return

        dashscope.api_key = api_key
        messages = _build_messages(messages_req)
        model = _MODEL_MAP.get(messages_req.model, Generation.Models.qwen_turbo)

        # Use SDK to get answer (streaming, incremental chunks).
        responses = Generation.call(
            model,
            messages=messages,
            result_format='message',  # set the result to be 'message' format.
            stream=True,
            incremental_output=True,
        )
        for chunk in responses:
            lm.log(logging.INFO, curr_module, 'Normal response', str(chunk.output))
            raw_data = chunk.output.choices[0]['message']['content']
            # Normalize blank lines so Markdown paragraphs survive SSE framing
            # (collapse doubles, then re-expand every newline to a blank line).
            raw_data = raw_data.replace('\n\n', '\n')
            raw_data = raw_data.replace('\n', '\n\n')
            if raw_data.startswith('\n'):
                raw_data = raw_data[1:]
            for line in raw_data.splitlines():
                yield f"data: {line}\n\n"
            await asyncio.sleep(0.1)

        yield 'event: end\ndata: The stream is about to end\n\n'
        os.remove(uuid_file)
    except Exception as e:
        result_message = str(e)
        lm.log(logging.ERROR, 'Exception', result_message)
        yield f"data: Exception - {result_message}\n\n"