Browse Source

Improvements in authentication and structure

pull/401/head
Bernat Roig 2 weeks ago
committed by GitHub
parent
commit
92d2823a53
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 660
      rest_api_odoo/controllers/rest_api_odoo.py

660
rest_api_odoo/controllers/rest_api_odoo.py

@ -5,6 +5,7 @@
#
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Ayana KP (odoo@cybrosys.com)
# Modified by: Broigm - Improvements in authentication and structure
#
# You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
@ -22,347 +23,394 @@
import json
import logging
import base64
from datetime import datetime, date, timedelta
from odoo import http
from odoo.http import request
from datetime import datetime, date
from werkzeug.exceptions import BadRequest, Unauthorized, NotFound, MethodNotAllowed
_logger = logging.getLogger(__name__)
class RestApi(http.Controller):
"""This is a controller which is used to generate responses based on the
api requests"""
def auth_api_key(self, api_key):
"""This function is used to authenticate the api-key when sending a
request"""
user_id = request.env['res.users'].sudo().search([('api_key', '=', api_key)])
if api_key is not None and user_id:
response = True
elif not user_id:
response = ('<html><body><h2>Invalid <i>API Key</i> '
'!</h2></body></html>')
else:
response = ("<html><body><h2>No <i>API Key</i> Provided "
"!</h2></body></html>")
"""Controlador API REST mejorado con autenticación basada en API Key solamente"""
def _json_response(self, data, status=200):
"""Genera respuesta JSON estandarizada"""
response = request.make_response(
json.dumps(data, ensure_ascii=False, indent=2),
headers=[('Content-Type', 'application/json')]
)
response.status_code = status
return response
def generate_response(self, method, model, rec_id, request_params=None):
"""This function is used to generate the response based on the type
of request and the parameters given"""
option = request.env['connection.api'].search(
[('model_id', '=', model)], limit=1)
model_name = option.model_id.model
def _error_response(self, message, status=400, error_code=None):
"""Genera respuesta de error estandarizada"""
error_data = {
'error': True,
'message': message,
'status_code': status
}
if error_code:
error_data['error_code'] = error_code
# Handle data based on method
if method == 'GET':
# For GET requests, check both JSON body and query parameters
data = request_params or {}
return self._json_response(error_data, status)
def _authenticate_api_key(self, api_key):
"""
Autentica usando solo la API key y configura la sesión del usuario
Returns: (success: bool, user_id: int or None, error_message: str or None)
"""
if not api_key:
return False, None, "API Key no proporcionada"
try:
user = request.env['res.users'].sudo().search([
('api_key', '=', api_key),
('active', '=', True)
], limit=1)
if not user:
return False, None, "API Key inválida o usuario inactivo"
# Verificar si la API key no ha expirado (si implementas expiración)
if hasattr(user, 'api_key_expiry') and user.api_key_expiry:
if user.api_key_expiry < datetime.now():
return False, None, "API Key expirada"
# Configurar la sesión con el usuario autenticado
request.session.uid = user.id
request.env.user = user
return True, user.id, None
except Exception as e:
_logger.error(f"Error en autenticación API: {str(e)}")
return False, None, "Error interno de autenticación"
def _serialize_record_values(self, records):
"""Serializa los valores de los registros para JSON"""
if not records:
return []
serialized_records = []
for record in records:
serialized_record = {}
for key, value in record.items():
if isinstance(value, (datetime, date)):
serialized_record[key] = value.isoformat()
elif isinstance(value, bytes):
serialized_record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
try:
serialized_record[key] = list(value) if value else []
except:
serialized_record[key] = str(value)
else:
serialized_record[key] = value
serialized_records.append(serialized_record)
return serialized_records
def _get_model_config(self, model_name):
"""Obtiene la configuración de la API para un modelo"""
model_obj = request.env['ir.model'].sudo().search([('model', '=', model_name)], limit=1)
if not model_obj:
return None, "Modelo no encontrado"
api_config = request.env['connection.api'].sudo().search([
('model_id', '=', model_obj.id)
], limit=1)
if not api_config:
return None, "Modelo no configurado para API REST"
return api_config, None
def _parse_request_data(self, method):
"""Parsea los datos de la request según el método HTTP"""
data = {}
fields = []
# First, try to get fields from JSON body
if method == 'GET':
# Para GET, intentar obtener campos del query string o JSON body
query_params = dict(request.httprequest.args)
try:
if request.httprequest.data:
json_data = json.loads(request.httprequest.data)
data.update(json_data)
if 'fields' in json_data:
fields = json_data['fields']
except json.JSONDecodeError:
pass # If JSON is invalid, continue with query params
# If no fields from JSON, try query parameters
if not fields:
fields_param = data.get('fields', '')
if fields_param:
fields = [field.strip() for field in fields_param.split(',')]
# If still no fields, use defaults based on record type
if not fields:
if rec_id != 0:
# For specific record, get all fields
fields = None # This will get all available fields
else:
# For all records, use minimal fields
fields = ['id', 'display_name']
elif method != 'DELETE':
# For POST/PUT requests, parse JSON from body
pass
if not fields and 'fields' in query_params:
fields = [field.strip() for field in query_params['fields'].split(',')]
elif method in ['POST', 'PUT']:
try:
if request.httprequest.data:
data = json.loads(request.httprequest.data)
if 'fields' in data:
fields = data['fields']
else:
data = {}
return None, None, "No se proporcionaron datos JSON"
except json.JSONDecodeError:
return ("<html><body><h2>Invalid JSON Data"
"</h2></body></html>")
else:
# DELETE method
data = {}
fields = []
return None, None, "JSON inválido"
# Extract fields for POST/PUT methods
if method in ['POST', 'PUT'] and data:
fields = []
if 'fields' in data:
for field in data['fields']:
fields.append(field)
if not fields and method != 'DELETE' and method != 'GET':
return ("<html><body><h2>No fields selected for the model"
"</h2></body></html>")
if not option:
return ("<html><body><h2>No Record Created for the model"
"</h2></body></html>")
try:
if method == 'GET':
if not option.is_get:
return ("<html><body><h2>Method Not Allowed"
"</h2></body></html>")
else:
datas = []
if rec_id != 0:
# For specific record
search_fields = fields if fields is not None else []
partner_records = request.env[
str(model_name)].search_read(
domain=[('id', '=', rec_id)],
fields=search_fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
return data, fields, None
@http.route(['/api/v1/auth'], type='http', auth='none', methods=['POST'], csrf=False)
def authenticate(self, **kw):
"""Endpoint de autenticación que genera API key"""
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
data = json.loads(request.httprequest.data or '{}')
except json.JSONDecodeError:
return self._error_response("JSON inválido", 400)
username = data.get('username') or request.httprequest.headers.get('username')
password = data.get('password') or request.httprequest.headers.get('password')
database = data.get('database') or request.httprequest.headers.get('database', request.env.cr.dbname)
if not all([username, password]):
return self._error_response("Username y password son requeridos", 400)
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
data = json.dumps({
'records': partner_records
})
datas.append(data)
return request.make_response(data=datas)
else:
# For all records
search_fields = fields if fields is not None else ['id', 'display_name']
partner_records = request.env[
str(model_name)].search_read(
domain=[],
fields=search_fields
# Actualizar sesión con la base de datos
request.session.update(http.get_default_session(), db=database)
# Autenticar credenciales
auth_result = request.session.authenticate(
database,
{'login': username, 'password': password, 'type': 'password'}
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
data = json.dumps({
'records': partner_records
})
datas.append(data)
return request.make_response(data=datas)
if not auth_result:
return self._error_response("Credenciales inválidas", 401)
# Generar o recuperar API key
user = request.env['res.users'].browse(auth_result['uid'])
api_key = user.generate_api_key()
response_data = {
"success": True,
"message": "Autenticación exitosa",
"data": {
"user_id": user.id,
"username": user.login,
"name": user.name,
"api_key": api_key,
"database": database
}
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error in GET method: {str(e)}")
return ("<html><body><h2>Error processing request"
"</h2></body></html>")
if method == 'POST':
if not option.is_post:
return ("<html><body><h2>Method Not Allowed"
"</h2></body></html>")
else:
try:
datas = []
new_resource = request.env[str(model_name)].create(
data['values'])
partner_records = request.env[
str(model_name)].search_read(
domain=[('id', '=', new_resource.id)],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
_logger.error(f"Error en autenticación: {str(e)}")
return self._error_response("Error interno de autenticación", 500)
@http.route(['/api/v1/<model_name>', '/api/v1/<model_name>/<int:record_id>'],
type='http', auth='none', methods=['GET', 'POST', 'PUT', 'DELETE'], csrf=False)
def api_handler(self, model_name, record_id=None, **kw):
"""Endpoint principal de la API REST"""
method = request.httprequest.method
# Autenticación usando API key
api_key = request.httprequest.headers.get('X-API-Key') or request.httprequest.headers.get('api-key')
success, user_id, error_msg = self._authenticate_api_key(api_key)
if not success:
return self._error_response(error_msg, 401, "AUTHENTICATION_FAILED")
# Obtener configuración del modelo
api_config, error_msg = self._get_model_config(model_name)
if not api_config:
return self._error_response(error_msg, 404, "MODEL_NOT_CONFIGURED")
# Verificar permisos del método
method_permissions = {
'GET': api_config.is_get,
'POST': api_config.is_post,
'PUT': api_config.is_put,
'DELETE': api_config.is_delete
}
if not method_permissions.get(method, False):
return self._error_response(f"Método {method} no permitido para este modelo", 405, "METHOD_NOT_ALLOWED")
# Parsear datos de la request
data, fields, error_msg = self._parse_request_data(method)
if error_msg:
return self._error_response(error_msg, 400, "INVALID_REQUEST_DATA")
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
new_data = json.dumps({'New resource': partner_records, })
datas.append(new_data)
return request.make_response(data=datas)
return self._handle_request(method, api_config.model_id.model, record_id, data, fields)
except Exception as e:
_logger.error(f"Error in POST method: {str(e)}")
return ("<html><body><h2>Invalid JSON Data"
"</h2></body></html>")
if method == 'PUT':
if not option.is_put:
return ("<html><body><h2>Method Not Allowed"
"</h2></body></html>")
else:
if rec_id == 0:
return ("<html><body><h2>No ID Provided"
"</h2></body></html>")
else:
resource = request.env[str(model_name)].browse(
int(rec_id))
if not resource.exists():
return ("<html><body><h2>Resource not found"
"</h2></body></html>")
_logger.error(f"Error procesando request {method} para {model_name}: {str(e)}")
return self._error_response("Error interno del servidor", 500, "INTERNAL_SERVER_ERROR")
def _handle_request(self, method, model_name, record_id, data, fields):
"""Maneja las diferentes operaciones CRUD"""
model = request.env[model_name]
if method == 'GET':
return self._handle_get(model, record_id, fields)
elif method == 'POST':
return self._handle_post(model, data, fields)
elif method == 'PUT':
return self._handle_put(model, record_id, data, fields)
elif method == 'DELETE':
return self._handle_delete(model, record_id)
def _handle_get(self, model, record_id, fields):
"""Maneja requests GET"""
try:
if record_id:
# Obtener registro específico
domain = [('id', '=', record_id)]
search_fields = fields if fields else []
else:
# Obtener todos los registros
domain = []
search_fields = fields if fields else ['id', 'display_name']
records = model.search_read(domain=domain, fields=search_fields)
serialized_records = self._serialize_record_values(records)
response_data = {
"success": True,
"count": len(serialized_records),
"data": serialized_records
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en GET: {str(e)}")
return self._error_response("Error obteniendo registros", 500)
def _handle_post(self, model, data, fields):
"""Maneja requests POST (crear)"""
if not data.get('values'):
return self._error_response("Se requiere 'values' para crear registro", 400)
try:
datas = []
resource.write(data['values'])
partner_records = request.env[
str(model_name)].search_read(
domain=[('id', '=', resource.id)],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
new_record = model.create(data['values'])
# Obtener el registro creado con los campos especificados
search_fields = fields if fields else ['id', 'display_name']
record_data = new_record.read(search_fields)[0]
serialized_record = self._serialize_record_values([record_data])
response_data = {
"success": True,
"message": "Registro creado exitosamente",
"data": serialized_record[0] if serialized_record else {}
}
return self._json_response(response_data, 201)
except Exception as e:
_logger.error(f"Error en POST: {str(e)}")
return self._error_response(f"Error creando registro: {str(e)}", 400)
def _handle_put(self, model, record_id, data, fields):
"""Maneja requests PUT (actualizar)"""
if not record_id:
return self._error_response("ID de registro requerido para actualización", 400)
if not data.get('values'):
return self._error_response("Se requiere 'values' para actualizar registro", 400)
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
elif isinstance(value, bytes):
# Convert bytes to base64 string for JSON serialization
import base64
record[key] = base64.b64encode(value).decode('utf-8')
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict)):
# Handle other non-serializable iterables
record = model.browse(record_id)
if not record.exists():
return self._error_response("Registro no encontrado", 404)
record.write(data['values'])
# Obtener el registro actualizado
search_fields = fields if fields else ['id', 'display_name']
record_data = record.read(search_fields)[0]
serialized_record = self._serialize_record_values([record_data])
response_data = {
"success": True,
"message": "Registro actualizado exitosamente",
"data": serialized_record[0] if serialized_record else {}
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en PUT: {str(e)}")
return self._error_response(f"Error actualizando registro: {str(e)}", 400)
def _handle_delete(self, model, record_id):
"""Maneja requests DELETE"""
if not record_id:
return self._error_response("ID de registro requerido para eliminación", 400)
try:
record[key] = list(value) if value else []
except:
record[key] = str(value)
new_data = json.dumps(
{'Updated resource': partner_records,
})
datas.append(new_data)
return request.make_response(data=datas)
record = model.browse(record_id)
if not record.exists():
return self._error_response("Registro no encontrado", 404)
# Guardar información del registro antes de eliminarlo
record_info = {
"id": record.id,
"display_name": record.display_name if hasattr(record, 'display_name') else str(record)
}
record.unlink()
response_data = {
"success": True,
"message": "Registro eliminado exitosamente",
"deleted_record": record_info
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error in PUT method: {str(e)}")
return ("<html><body><h2>Invalid JSON Data "
"!</h2></body></html>")
if method == 'DELETE':
if not option.is_delete:
return ("<html><body><h2>Method Not Allowed"
"</h2></body></html>")
else:
if rec_id == 0:
return ("<html><body><h2>No ID Provided"
"</h2></body></html>")
else:
resource = request.env[str(model_name)].browse(
int(rec_id))
if not resource.exists():
return ("<html><body><h2>Resource not found"
"</h2></body></html>")
else:
records = request.env[
str(model_name)].search_read(
domain=[('id', '=', resource.id)],
fields=['id', 'display_name']
)
remove = json.dumps(
{"Resource deleted": records,
})
resource.unlink()
return request.make_response(data=remove)
@http.route(['/send_request'], type='http',
auth='none',
methods=['GET', 'POST', 'PUT', 'DELETE'], csrf=False)
def fetch_data(self, **kw):
"""This controller will be called when sending a request to the
specified url, and it will authenticate the api-key and then will
generate the result"""
http_method = request.httprequest.method
api_key = request.httprequest.headers.get('api-key')
auth_api = self.auth_api_key(api_key)
model = kw.get('model')
username = request.httprequest.headers.get('login')
password = request.httprequest.headers.get('password')
credential = {'login': username, 'password': password, 'type': 'password'}
request.session.authenticate(request.session.db, credential)
model_id = request.env['ir.model'].search(
[('model', '=', model)])
if not model_id:
return ("<html><body><h3>Invalid model, check spelling or maybe "
"the related "
"module is not installed"
"</h3></body></html>")
if auth_api == True:
if not kw.get('Id'):
rec_id = 0
else:
rec_id = int(kw.get('Id'))
# Pass the query parameters for GET requests
result = self.generate_response(http_method, model_id.id, rec_id, kw)
return result
else:
return auth_api
@http.route(['/odoo_connect'], type="http", auth="none", csrf=False,
methods=['GET'])
def odoo_connect(self, **kw):
"""This is the controller which initializes the api transaction by
generating the api-key for specific user and database"""
username = request.httprequest.headers.get('login')
password = request.httprequest.headers.get('password')
db = request.httprequest.headers.get('db')
_logger.error(f"Error en DELETE: {str(e)}")
return self._error_response(f"Error eliminando registro: {str(e)}", 400)
@http.route(['/api/v1/models'], type='http', auth='none', methods=['GET'], csrf=False)
def list_available_models(self, **kw):
"""Endpoint para listar modelos disponibles en la API"""
api_key = request.httprequest.headers.get('X-API-Key') or request.httprequest.headers.get('api-key')
success, user_id, error_msg = self._authenticate_api_key(api_key)
if not success:
return self._error_response(error_msg, 401)
try:
request.session.update(http.get_default_session(), db=db)
credential = {'login': username, 'password': password,
'type': 'password'}
auth = request.session.authenticate(db, credential)
user = request.env['res.users'].browse(auth['uid'])
api_key = request.env.user.generate_api(username)
datas = json.dumps({"Status": "auth successful",
"User": user.name,
"api-key": api_key})
return request.make_response(data=datas)
api_configs = request.env['connection.api'].sudo().search([])
models_data = []
for config in api_configs:
model_info = {
"model": config.model_id.model,
"name": config.model_id.name,
"methods": {
"GET": config.is_get,
"POST": config.is_post,
"PUT": config.is_put,
"DELETE": config.is_delete
}
}
models_data.append(model_info)
response_data = {
"success": True,
"count": len(models_data),
"data": models_data
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error in authentication: {str(e)}")
return ("<html><body><h2>wrong login credentials"
"</h2></body></html>")
_logger.error(f"Error listando modelos: {str(e)}")
return self._error_response("Error interno del servidor", 500)

Loading…
Cancel
Save