Browse Source

Merge commit 'refs/pull/401/head' of https://github.com/CybroOdoo/CybroAddons into 18.0

pull/401/head
docker-odoo 5 days ago
parent
commit
4b46df1735
  1. 3
      rest_api_odoo/__manifest__.py
  2. 2
      rest_api_odoo/controllers/__init__.py
  3. 179
      rest_api_odoo/controllers/jwt_auth.py
  4. 856
      rest_api_odoo/controllers/rest_api_odoo.py
  5. 998
      rest_api_odoo/controllers/swagger_controller.py
  6. 259
      rest_api_odoo/models/connection_api.py
  7. 251
      rest_api_odoo/models/res_users.py
  8. 3
      rest_api_odoo/security/ir.model.access.csv
  9. 176
      rest_api_odoo/views/api_dashboard_views.xml
  10. 228
      rest_api_odoo/views/connection_api_views.xml
  11. 158
      rest_api_odoo/views/res_users_views.xml

3
rest_api_odoo/__manifest__.py

@ -35,7 +35,8 @@
"data": [ "data": [
'security/ir.model.access.csv', 'security/ir.model.access.csv',
'views/res_users_views.xml', 'views/res_users_views.xml',
'views/connection_api_views.xml' 'views/connection_api_views.xml',
'views/api_dashboard_views.xml',
], ],
'images': ['static/description/banner.jpg'], 'images': ['static/description/banner.jpg'],
'license': 'LGPL-3', 'license': 'LGPL-3',

2
rest_api_odoo/controllers/__init__.py

@ -20,3 +20,5 @@
# #
############################################################################# #############################################################################
from . import rest_api_odoo from . import rest_api_odoo
from . import swagger_controller
from . import jwt_auth

179
rest_api_odoo/controllers/jwt_auth.py

@ -0,0 +1,179 @@
# -*- coding: utf-8 -*-
import jwt
import base64
import secrets
import logging
from datetime import datetime, timedelta
from odoo import fields
from odoo.http import request
_logger = logging.getLogger(__name__)
class JWTAuthMixin:
"""Mixin para autenticación JWT reutilizable"""
def _get_jwt_secret(self):
"""Obtiene la clave secreta para JWT desde configuración del sistema"""
secret = request.env['ir.config_parameter'].sudo().get_param('rest_api.jwt_secret')
if not secret:
# Generar y guardar una nueva clave secreta
try:
secret = base64.b64encode(secrets.token_bytes(32)).decode('utf-8')
except (ImportError, AttributeError):
# Fallback si secrets no está disponible
import uuid
import hashlib
secret = hashlib.sha256(str(uuid.uuid4()).encode()).hexdigest()
request.env['ir.config_parameter'].sudo().set_param('rest_api.jwt_secret', secret)
_logger.info("Generated new JWT secret key")
return secret
def _generate_jwt_token(self, user_id, expires_in_hours=24):
"""
Genera un JWT token para el usuario
Args:
user_id: ID del usuario
expires_in_hours: Horas hasta expiración (default: 24h)
Returns:
str: JWT token o None si hay error
"""
try:
now = datetime.utcnow()
payload = {
'user_id': user_id,
'iat': now, # Issued at
'exp': now + timedelta(hours=expires_in_hours), # Expiration
'iss': 'odoo-rest-api', # Issuer
'aud': 'odoo-client', # Audience
'jti': f"{user_id}_{int(now.timestamp())}" # JWT ID
}
secret = self._get_jwt_secret()
token = jwt.encode(payload, secret, algorithm='HS256')
_logger.info(f"Generated JWT token for user {user_id}, expires in {expires_in_hours}h")
return token
except Exception as e:
_logger.error(f"Error generating JWT token: {str(e)}")
return None
def _validate_jwt_token(self, token):
"""
Valida un JWT token
Args:
token: Token JWT (puede incluir 'Bearer ' al inicio)
Returns:
tuple: (success: bool, user_id: int or None, error_message: str or None)
"""
if not token:
return False, None, "Token no proporcionado"
# Limpiar token (remover 'Bearer ' si está presente)
if token.startswith('Bearer '):
token = token[7:]
elif token.startswith('bearer '):
token = token[7:]
try:
secret = self._get_jwt_secret()
# Decodificar y validar token
payload = jwt.decode(
token,
secret,
algorithms=['HS256'],
audience='odoo-client',
issuer='odoo-rest-api'
)
user_id = payload.get('user_id')
if not user_id:
return False, None, "Token inválido: user_id no encontrado"
# Verificar que el usuario existe y está activo
user = request.env['res.users'].sudo().browse(user_id)
if not user.exists():
return False, None, "Usuario no encontrado"
if not user.active:
return False, None, "Usuario inactivo"
# Configurar contexto de sesión
request.session.uid = user_id
if hasattr(request, 'env'):
request.env.user = user
# Log successful authentication
_logger.debug(f"JWT authentication successful for user {user_id} ({user.login})")
return True, user_id, None
except jwt.ExpiredSignatureError:
_logger.warning("JWT token expired")
return False, None, "Token expirado"
except jwt.InvalidTokenError as e:
_logger.warning(f"Invalid JWT token: {str(e)}")
return False, None, f"Token inválido: {str(e)}"
except jwt.InvalidAudienceError:
_logger.warning("JWT token has invalid audience")
return False, None, "Token inválido: audiencia incorrecta"
except jwt.InvalidIssuerError:
_logger.warning("JWT token has invalid issuer")
return False, None, "Token inválido: emisor incorrecto"
except Exception as e:
_logger.error(f"Error validating JWT token: {str(e)}")
return False, None, "Error interno validando token"
def _decode_jwt_payload(self, token):
"""
Decodifica un JWT token sin validar (útil para debugging)
Args:
token: Token JWT
Returns:
dict: Payload del token o None si hay error
"""
try:
if token.startswith('Bearer '):
token = token[7:]
# Decodificar sin verificar (solo para obtener payload)
payload = jwt.decode(token, options={"verify_signature": False})
return payload
except Exception as e:
_logger.error(f"Error decoding JWT payload: {str(e)}")
return None
def _get_token_info(self, token):
"""
Obtiene información de un JWT token
Args:
token: Token JWT
Returns:
dict: Información del token
"""
payload = self._decode_jwt_payload(token)
if not payload:
return None
try:
exp_timestamp = payload.get('exp')
iat_timestamp = payload.get('iat')
info = {
'user_id': payload.get('user_id'),
'issued_at': datetime.fromtimestamp(iat_timestamp) if iat_timestamp else None,
'expires_at': datetime.fromtimestamp(exp_timestamp) if exp_timestamp else None,
'issuer': payload.get('iss'),
'audience': payload.get('aud'),
'jwt_id': payload.get('jti'),
'is_expired': datetime.utcnow() > datetime.fromtimestamp(exp_timestamp) if exp_timestamp else True
}
return info
except Exception as e:
_logger.error(f"Error getting token info: {str(e)}")
return None

856
rest_api_odoo/controllers/rest_api_odoo.py

@ -1,257 +1,657 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
#############################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Ayana KP (odoo@cybrosys.com)
#
# You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details.
#
# You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE
# (LGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
#############################################################################
import json import json
import logging import logging
from odoo import http import base64
import ast
from datetime import datetime, date, timedelta
from odoo import http, fields
from odoo.http import request from odoo.http import request
from datetime import datetime, date from .jwt_auth import JWTAuthMixin
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
class RestApi(http.Controller): class RestApi(http.Controller, JWTAuthMixin):
"""This is a controller which is used to generate responses based on the """Controlador API REST mejorado con JWT y filtrado avanzado"""
api requests"""
def auth_api_key(self, api_key): def _json_response(self, data, status=200):
"""This function is used to authenticate the api-key when sending a """Genera respuesta JSON estandarizada"""
request""" try:
user_id = request.env['res.users'].sudo().search([('api_key', '=', api_key)]) response = request.make_response(
if api_key is not None and user_id: json.dumps(data, ensure_ascii=False, indent=2, default=str),
response = True headers=[('Content-Type', 'application/json; charset=utf-8')]
elif not user_id: )
response = ('<html><body><h2>Invalid <i>API Key</i> ' response.status_code = status
'!</h2></body></html>')
else:
response = ("<html><body><h2>No <i>API Key</i> Provided "
"!</h2></body></html>")
return response return response
except Exception as e:
_logger.error(f"Error creating JSON response: {str(e)}")
fallback_data = {
'error': True,
'message': 'Error interno creando respuesta JSON',
'status_code': 500
}
return request.make_response(
json.dumps(fallback_data, indent=2),
status=500,
headers=[('Content-Type', 'application/json; charset=utf-8')]
)
def _error_response(self, message, status=400, error_code=None):
"""Genera respuesta de error estandarizada"""
error_data = {
'error': True,
'message': message,
'status_code': status
}
if error_code:
error_data['error_code'] = error_code
def generate_response(self, method, model, rec_id): return self._json_response(error_data, status)
"""This function is used to generate the response based on the type
of request and the parameters given""" def _serialize_record_values(self, records):
option = request.env['connection.api'].search( """Serializa los valores de los registros para JSON"""
[('model_id', '=', model)], limit=1) if not records:
model_name = option.model_id.model return []
if method != 'DELETE':
data = json.loads(request.httprequest.data) serialized_records = []
for record in records:
serialized_record = {}
for key, value in record.items():
try:
if isinstance(value, (datetime, date)):
serialized_record[key] = value.isoformat()
elif isinstance(value, bytes):
serialized_record[key] = base64.b64encode(value).decode('utf-8')
elif isinstance(value, tuple) and len(value) == 2:
# Para relaciones Many2one que vienen como (id, name)
serialized_record[key] = list(value)
elif hasattr(value, '__iter__') and not isinstance(value, (str, dict, bytes)):
try:
serialized_record[key] = list(value) if value else []
except:
serialized_record[key] = str(value)
else: else:
serialized_record[key] = value
except Exception as e:
_logger.warning(f"Error serializing field {key}: {str(e)}")
serialized_record[key] = str(value) if value is not None else None
serialized_records.append(serialized_record)
return serialized_records
def _get_model_config(self, model_name):
"""Obtiene la configuración de la API para un modelo"""
try:
model_obj = request.env['ir.model'].sudo().search([('model', '=', model_name)], limit=1)
if not model_obj:
return None, "Modelo no encontrado"
api_config = request.env['connection.api'].sudo().search([
('model_id', '=', model_obj.id),
('active', '=', True)
], limit=1)
if not api_config:
return None, "Modelo no configurado para API REST"
return api_config, None
except Exception as e:
_logger.error(f"Error getting model config for {model_name}: {str(e)}")
return None, f"Error obteniendo configuración del modelo: {str(e)}"
def _parse_request_data(self, method):
"""Parsea los datos de la request con parámetros avanzados"""
data = {} data = {}
fields = [] fields = []
if data: domain = []
for field in data['fields']: limit = None
fields.append(field) offset = None
if not fields and method != 'DELETE': order = None
return ("<html><body><h2>No fields selected for the model"
"</h2></body></html>")
if not option:
return ("<html><body><h2>No Record Created for the model"
"</h2></body></html>")
try: try:
if method == 'GET': if method == 'GET':
fields = [] query_params = dict(request.httprequest.args)
for field in data['fields']:
fields.append(field) # Parsear domain
if not option.is_get: if 'domain' in query_params:
return ("<html><body><h2>Method Not Allowed" try:
"</h2></body></html>") domain = ast.literal_eval(query_params['domain'])
else: if not isinstance(domain, list):
datas = [] domain = []
if rec_id != 0:
partner_records = request.env[
str(model_name)].search_read(
domain=[('id', '=', rec_id)],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
data = json.dumps({
'records': partner_records
})
datas.append(data)
return request.make_response(data=datas)
else:
partner_records = request.env[
str(model_name)].search_read(
domain=[],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
data = json.dumps({
'records': partner_records
})
datas.append(data)
return request.make_response(data=datas)
except: except:
return ("<html><body><h2>Invalid JSON Data" _logger.warning("Invalid domain format, ignoring")
"</h2></body></html>") domain = []
if method == 'POST':
if not option.is_post: # Parsear fields
return ("<html><body><h2>Method Not Allowed" if 'fields' in query_params:
"</h2></body></html>") fields = [field.strip() for field in query_params['fields'].split(',') if field.strip()]
else:
# Parsear limit
if 'limit' in query_params:
try: try:
data = json.loads(request.httprequest.data) limit = int(query_params['limit'])
datas = [] if limit <= 0:
new_resource = request.env[str(model_name)].create( limit = None
data['values'])
partner_records = request.env[
str(model_name)].search_read(
domain=[('id', '=', new_resource.id)],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
new_data = json.dumps({'New resource': partner_records, })
datas.append(new_data)
return request.make_response(data=datas)
except: except:
return ("<html><body><h2>Invalid JSON Data" pass
"</h2></body></html>")
if method == 'PUT': # Parsear offset
if not option.is_put: if 'offset' in query_params:
return ("<html><body><h2>Method Not Allowed"
"</h2></body></html>")
else:
if rec_id == 0:
return ("<html><body><h2>No ID Provided"
"</h2></body></html>")
else:
resource = request.env[str(model_name)].browse(
int(rec_id))
if not resource.exists():
return ("<html><body><h2>Resource not found"
"</h2></body></html>")
else:
try: try:
datas = [] offset = int(query_params['offset'])
data = json.loads(request.httprequest.data) if offset < 0:
resource.write(data['values']) offset = None
partner_records = request.env[ except:
str(model_name)].search_read( pass
domain=[('id', '=', resource.id)],
fields=fields
)
for record in partner_records:
for key, value in record.items():
if isinstance(value, (datetime, date)):
record[key] = value.isoformat()
new_data = json.dumps(
{'Updated resource': partner_records,
})
datas.append(new_data)
return request.make_response(data=datas)
# Parsear order
if 'order' in query_params:
order = query_params['order'].strip()
if not order:
order = None
# También intentar JSON body para GET (opcional)
try:
if request.httprequest.data:
json_data = json.loads(request.httprequest.data.decode('utf-8'))
data.update(json_data)
if 'fields' in json_data and not fields:
fields = json_data['fields']
if 'domain' in json_data and not domain:
domain = json_data['domain']
except: except:
return ("<html><body><h2>Invalid JSON Data " pass
"!</h2></body></html>")
if method == 'DELETE': elif method in ['POST', 'PUT']:
if not option.is_delete: try:
return ("<html><body><h2>Method Not Allowed" if request.httprequest.data:
"</h2></body></html>") data = json.loads(request.httprequest.data.decode('utf-8'))
else: if 'fields' in data:
if rec_id == 0: fields = data['fields']
return ("<html><body><h2>No ID Provided"
"</h2></body></html>")
else: else:
resource = request.env[str(model_name)].browse( return None, None, None, None, None, None, "No se proporcionaron datos JSON"
int(rec_id)) except (json.JSONDecodeError, UnicodeDecodeError) as e:
if not resource.exists(): return None, None, None, None, None, None, f"JSON inválido: {str(e)}"
return ("<html><body><h2>Resource not found"
"</h2></body></html>") return data, fields, domain, limit, offset, order, None
except Exception as e:
_logger.error(f"Error parsing request data: {str(e)}")
return None, None, None, None, None, None, f"Error procesando datos de la request: {str(e)}"
@http.route(['/api/v1/auth'], type='http', auth='none', methods=['POST'], csrf=False)
def authenticate(self, **kw):
"""Endpoint de autenticación que genera JWT token"""
try:
if request.httprequest.data:
data = json.loads(request.httprequest.data.decode('utf-8'))
else: else:
data = {}
except (json.JSONDecodeError, UnicodeDecodeError):
return self._error_response("JSON inválido", 400)
records = request.env[ username = data.get('username') or request.httprequest.headers.get('username')
str(model_name)].search_read( password = data.get('password') or request.httprequest.headers.get('password')
domain=[('id', '=', resource.id)], database = data.get('database') or request.httprequest.headers.get('database') or request.env.cr.dbname
fields=['id', 'display_name'] expires_in = data.get('expires_in_hours', 24)
)
remove = json.dumps( if not all([username, password]):
{"Resource deleted": records, return self._error_response("Username y password son requeridos", 400)
})
resource.unlink() # Validar expires_in
return request.make_response(data=remove) if not isinstance(expires_in, int) or expires_in < 1 or expires_in > 168: # Max 7 días
expires_in = 24
@http.route(['/send_request'], type='http',
auth='none', try:
methods=['GET', 'POST', 'PUT', 'DELETE'], csrf=False) # Autenticar credenciales
def fetch_data(self, **kw):
"""This controller will be called when sending a request to the
specified url, and it will authenticate the api-key and then will
generate the result"""
http_method = request.httprequest.method
api_key = request.httprequest.headers.get('api-key')
auth_api = self.auth_api_key(api_key)
model = kw.get('model')
username = request.httprequest.headers.get('login')
password = request.httprequest.headers.get('password')
credential = {'login': username, 'password': password, 'type': 'password'} credential = {'login': username, 'password': password, 'type': 'password'}
request.session.authenticate(request.session.db, credential)
model_id = request.env['ir.model'].search( auth_result = request.session.authenticate(database, credential)
[('model', '=', model)]) if not auth_result:
if not model_id: return self._error_response("Credenciales inválidas", 401)
return ("<html><body><h3>Invalid model, check spelling or maybe "
"the related " uid = auth_result['uid']
"module is not installed"
"</h3></body></html>") if not uid:
return self._error_response("Credenciales inválidas", 401)
if auth_api == True:
if not kw.get('Id'): # Generar JWT token
rec_id = 0 user = request.env['res.users'].browse(uid)
else: token = self._generate_jwt_token(uid, expires_in)
rec_id = int(kw.get('Id'))
result = self.generate_response(http_method, model_id.id, rec_id) if not token:
return result return self._error_response("Error generando token de acceso", 500)
response_data = {
"success": True,
"message": "Autenticación exitosa",
"data": {
"user_id": user.id,
"username": user.login,
"name": user.name,
"access_token": token,
"token_type": "Bearer",
"expires_in": expires_in * 3600, # En segundos
"database": database
}
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en autenticación: {str(e)}")
return self._error_response("Error interno de autenticación", 500)
@http.route(['/api/v1/refresh'], type='http', auth='none', methods=['POST'], csrf=False)
def refresh_token(self, **kw):
"""Endpoint para refrescar un JWT token"""
try:
success, user_id, error_msg = self._authenticate_request()
if not success:
return self._error_response(error_msg, 401, "TOKEN_INVALID")
# Generar nuevo token
try:
data = json.loads(request.httprequest.data.decode('utf-8')) if request.httprequest.data else {}
expires_in = data.get('expires_in_hours', 24)
if not isinstance(expires_in, int) or expires_in < 1 or expires_in > 168:
expires_in = 24
except:
expires_in = 24
new_token = self._generate_jwt_token(user_id, expires_in)
if not new_token:
return self._error_response("Error generando nuevo token", 500)
user = request.env['res.users'].browse(user_id)
response_data = {
"success": True,
"message": "Token renovado exitosamente",
"data": {
"access_token": new_token,
"token_type": "Bearer",
"expires_in": expires_in * 3600,
"user_id": user.id,
"username": user.login
}
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error refreshing token: {str(e)}")
return self._error_response("Error interno renovando token", 500)
def _authenticate_request(self):
"""
Autentica la request usando JWT token
Returns: (success: bool, user_id: int or None, error_message: str or None)
"""
# Buscar token en headers
auth_header = request.httprequest.headers.get('Authorization')
if not auth_header:
# Fallback a headers alternativos para compatibilidad
token = request.httprequest.headers.get('X-API-Key') or request.httprequest.headers.get('api-key')
if token:
auth_header = f"Bearer {token}"
if not auth_header:
return False, None, "Token de autorización no proporcionado (use Authorization: Bearer <token>)"
return self._validate_jwt_token(auth_header)
@http.route(['/api/v1/<model_name>', '/api/v1/<model_name>/<int:record_id>'],
type='http', auth='none', methods=['GET', 'POST', 'PUT', 'DELETE'], csrf=False)
def api_handler(self, model_name, record_id=None, **kw):
"""Endpoint principal de la API REST con JWT y filtrado avanzado"""
method = request.httprequest.method
# Autenticación usando JWT
success, user_id, error_msg = self._authenticate_request()
if not success:
return self._error_response(error_msg, 401, "AUTHENTICATION_FAILED")
# Obtener configuración del modelo
api_config, error_msg = self._get_model_config(model_name)
if not api_config:
return self._error_response(error_msg, 404, "MODEL_NOT_CONFIGURED")
# Verificar permisos del método
method_permissions = {
'GET': api_config.is_get,
'POST': api_config.is_post,
'PUT': api_config.is_put,
'DELETE': api_config.is_delete
}
if not method_permissions.get(method, False):
return self._error_response(f"Método {method} no permitido para este modelo", 405, "METHOD_NOT_ALLOWED")
# Parsear datos de la request con parámetros avanzados
data, fields, domain, limit, offset, order, error_msg = self._parse_request_data(method)
if error_msg:
return self._error_response(error_msg, 400, "INVALID_REQUEST_DATA")
try:
return self._handle_request(method, api_config.model_id.model, record_id, data, fields, domain, limit, offset, order, api_config)
except Exception as e:
_logger.error(f"Error procesando request {method} para {model_name}: {str(e)}")
return self._error_response("Error interno del servidor", 500, "INTERNAL_SERVER_ERROR")
def _handle_request(self, method, model_name, record_id, data, fields, domain, limit, offset, order, api_config=None):
"""Maneja las diferentes operaciones CRUD con parámetros avanzados"""
try:
model = request.env[model_name]
if method == 'GET':
return self._handle_get(model, record_id, fields, domain, limit, offset, order, api_config)
elif method == 'POST':
return self._handle_post(model, data, fields)
elif method == 'PUT':
return self._handle_put(model, record_id, data, fields)
elif method == 'DELETE':
return self._handle_delete(model, record_id)
except Exception as e:
_logger.error(f"Error in _handle_request: {str(e)}")
raise
def _handle_get(self, model, record_id, fields, domain, limit, offset, order, api_config=None):
"""Maneja requests GET con filtrado avanzado"""
try:
# Aplicar límites de la configuración
max_limit = getattr(api_config, 'max_records_limit', 1000) if api_config else 1000
if limit and limit > max_limit:
limit = max_limit
if record_id:
# Obtener registro específico
search_domain = [('id', '=', record_id)]
search_fields = fields if fields else []
records = model.search_read(domain=search_domain, fields=search_fields)
total_count = len(records)
response_data = {
"success": True,
"count": len(records),
"total": total_count,
"data": self._serialize_record_values(records)
}
else: else:
return auth_api # Obtener registros con filtros
search_domain = domain if domain else []
@http.route(['/odoo_connect'], type="http", auth="none", csrf=False, search_fields = fields if fields else ['id', 'display_name']
methods=['GET'])
def odoo_connect(self, **kw): # Contar total sin límite para metadatos
"""This is the controller which initializes the api transaction by
generating the api-key for specific user and database"""
username = request.httprequest.headers.get('login')
password = request.httprequest.headers.get('password')
db = request.httprequest.headers.get('db')
try: try:
request.session.update(http.get_default_session(), db=db) total_count = model.search_count(search_domain)
credential = {'login': username, 'password': password,
'type': 'password'}
auth = request.session.authenticate(db, credential)
user = request.env['res.users'].browse(auth['uid'])
api_key = request.env.user.generate_api(username)
datas = json.dumps({"Status": "auth successful",
"User": user.name,
"api-key": api_key})
return request.make_response(data=datas)
except: except:
return ("<html><body><h2>wrong login credentials" total_count = None
"</h2></body></html>")
# Búsqueda con parámetros
search_params = {
'domain': search_domain,
'fields': search_fields
}
if limit:
search_params['limit'] = limit
if offset:
search_params['offset'] = offset
if order:
search_params['order'] = order
records = model.search_read(**search_params)
response_data = {
"success": True,
"count": len(records),
"data": self._serialize_record_values(records)
}
# Agregar metadatos de paginación
if total_count is not None:
response_data["total"] = total_count
if offset:
response_data["offset"] = offset
if limit:
response_data["limit"] = limit
# Información de paginación
if limit and total_count is not None:
current_offset = offset or 0
has_more = (current_offset + limit) < total_count
response_data["has_more"] = has_more
if has_more:
response_data["next_offset"] = current_offset + limit
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en GET: {str(e)}")
return self._error_response(f"Error obteniendo registros: {str(e)}", 500)
def _handle_post(self, model, data, fields):
"""Maneja requests POST (crear)"""
if not data.get('values'):
return self._error_response("Se requiere 'values' para crear registro", 400)
try:
new_record = model.create(data['values'])
# Obtener el registro creado con los campos especificados
search_fields = fields if fields else ['id', 'display_name']
record_data = new_record.read(search_fields)[0]
response_data = {
"success": True,
"message": "Registro creado exitosamente",
"count": 1,
"data": self._serialize_record_values([record_data])
}
return self._json_response(response_data, 201)
except Exception as e:
_logger.error(f"Error en POST: {str(e)}")
return self._error_response(f"Error creando registro: {str(e)}", 400)
def _handle_put(self, model, record_id, data, fields):
"""Maneja requests PUT (actualizar)"""
if not record_id:
return self._error_response("ID de registro requerido para actualización", 400)
if not data.get('values'):
return self._error_response("Se requiere 'values' para actualizar registro", 400)
try:
record = model.browse(record_id)
if not record.exists():
return self._error_response("Registro no encontrado", 404)
record.write(data['values'])
# Obtener el registro actualizado
search_fields = fields if fields else ['id', 'display_name']
record_data = record.read(search_fields)[0]
response_data = {
"success": True,
"message": "Registro actualizado exitosamente",
"count": 1,
"data": self._serialize_record_values([record_data])
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en PUT: {str(e)}")
return self._error_response(f"Error actualizando registro: {str(e)}", 400)
def _handle_delete(self, model, record_id):
"""Maneja requests DELETE"""
if not record_id:
return self._error_response("ID de registro requerido para eliminación", 400)
try:
record = model.browse(record_id)
if not record.exists():
return self._error_response("Registro no encontrado", 404)
# Guardar información del registro antes de eliminarlo
record_info = {
"id": record.id,
"display_name": record.display_name if hasattr(record, 'display_name') else str(record)
}
record.unlink()
response_data = {
"success": True,
"message": "Registro eliminado exitosamente",
"deleted_record": record_info
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error en DELETE: {str(e)}")
return self._error_response(f"Error eliminando registro: {str(e)}", 400)
@http.route(['/api/v1/models'], type='http', auth='none', methods=['GET'], csrf=False)
def list_available_models(self, **kw):
"""Endpoint para listar modelos disponibles en la API"""
success, user_id, error_msg = self._authenticate_request()
if not success:
return self._error_response(error_msg, 401)
try:
api_configs = request.env['connection.api'].sudo().search([('active', '=', True)])
models_data = []
for config in api_configs:
model_info = {
"model": config.model_id.model,
"name": config.model_id.name,
"description": config.description or f"API REST para el modelo {config.model_id.name}",
"methods": {
"GET": config.is_get,
"POST": config.is_post,
"PUT": config.is_put,
"DELETE": config.is_delete
},
"max_records_limit": config.max_records_limit,
"endpoints": {
"collection": f"/api/v1/{config.model_id.model}",
"item": f"/api/v1/{config.model_id.model}/{{id}}",
"schema": f"/api/v1/schema/{config.model_id.model}"
}
}
models_data.append(model_info)
# Agregar información de documentación
base_url = request.env['ir.config_parameter'].sudo().get_param('web.base.url', 'http://localhost:8069')
response_data = {
"success": True,
"count": len(models_data),
"data": models_data,
"documentation": {
"swagger_ui": f"{base_url}/api/v1/docs",
"openapi_spec": f"{base_url}/api/v1/openapi.json"
},
"authentication": {
"type": "JWT Bearer Token",
"header": "Authorization: Bearer <token>",
"auth_endpoint": f"{base_url}/api/v1/auth",
"refresh_endpoint": f"{base_url}/api/v1/refresh"
}
}
return self._json_response(response_data)
except Exception as e:
_logger.error(f"Error listando modelos: {str(e)}")
return self._error_response("Error interno del servidor", 500)
@http.route(['/api/v1/health'], type='http', auth='none', methods=['GET'], csrf=False)
def health_check(self, **kwargs):
"""Endpoint de verificación de salud de la API"""
try:
# Verificar conexión a BD
request.env.cr.execute("SELECT 1")
# Verificar configuraciones activas
active_configs = len(request.env["connection.api"].sudo().search([("active", "=", True)]))
# Verificar configuración JWT
jwt_secret = request.env['ir.config_parameter'].sudo().get_param('rest_api.jwt_secret')
health_status = {
"status": "healthy",
"timestamp": fields.Datetime.now().isoformat(),
"database": request.env.cr.dbname,
"active_models": active_configs,
"version": "2.0.0",
"auth_method": "JWT Bearer Token",
"jwt_configured": bool(jwt_secret),
"features": {
"dynamic_schemas": True,
"advanced_filtering": True,
"jwt_authentication": True,
"pagination": True,
"field_selection": True
}
}
return self._json_response(health_status)
except Exception as e:
error_status = {
"status": "unhealthy",
"error": str(e),
"timestamp": fields.Datetime.now().isoformat()
}
return request.make_response(
json.dumps(error_status),
status=503,
headers=[("Content-Type", "application/json; charset=utf-8")]
)
@http.route(['/api', '/api/'], type='http', auth='none', methods=['GET'], csrf=False)
def api_root(self, **kw):
"""Endpoint raíz de la API que proporciona información básica"""
try:
base_url = request.env['ir.config_parameter'].sudo().get_param('web.base.url', 'http://localhost:8069')
api_info = {
"message": "Bienvenido a la REST API de Odoo v2.0",
"version": "2.0.0",
"status": "active",
"documentation": f"{base_url}/api/v1/docs",
"features": [
"JWT Bearer Token Authentication",
"Dynamic Schema Generation",
"Advanced Filtering with Domain",
"Pagination Support",
"Field Selection",
"Interactive Swagger Documentation"
],
"endpoints": {
"auth": f"{base_url}/api/v1/auth",
"refresh": f"{base_url}/api/v1/refresh",
"models": f"{base_url}/api/v1/models",
"health": f"{base_url}/api/v1/health",
"docs": f"{base_url}/api/v1/docs",
"openapi": f"{base_url}/api/v1/openapi.json"
},
"authentication": {
"type": "JWT Bearer Token",
"header": "Authorization",
"format": "Bearer <token>",
"expires_in_hours": "configurable (default: 24h)"
}
}
return self._json_response(api_info)
except Exception as e:
_logger.error(f"Error en api_root: {str(e)}")
return self._error_response("Error interno del servidor", 500)

998
rest_api_odoo/controllers/swagger_controller.py

@ -0,0 +1,998 @@
# -*- coding: utf-8 -*-
import json
import logging
from datetime import datetime
from odoo import http, fields
from odoo.http import request
_logger = logging.getLogger(__name__)
class SwaggerController(http.Controller):
"""Controlador Swagger/OpenAPI con esquemas dinámicos mejorados"""
@http.route(["/api/v1/docs", "/api/docs"], type="http", auth="none", methods=["GET"], csrf=False)
def swagger_ui(self, **kwargs):
"""Muestra la interfaz de Swagger UI mejorada"""
try:
base_url = self._get_base_url()
swagger_html = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Odoo REST API - Documentation</title>
<link rel="stylesheet" type="text/css" href="https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui.css" />
<style>
html {{ box-sizing: border-box; overflow: -moz-scrollbars-vertical; overflow-y: scroll; }}
body {{ margin: 0; background: #fafafa; font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; }}
#swagger-ui {{ max-width: 1400px; margin: 0 auto; padding: 20px; }}
.topbar {{ background-color: #89bf04 !important; }}
.swagger-ui .info .title {{ color: #89bf04; font-size: 2.2em; }}
.swagger-ui .info .description {{ font-size: 1.1em; line-height: 1.6; }}
.auth-banner {{
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white; padding: 15px; border-radius: 8px; margin: 20px 0;
text-align: center;
}}
</style>
</head>
<body>
<div class="auth-banner">
<h3>🔐 Autenticación JWT</h3>
<p>Esta API usa <strong>JWT Bearer Tokens</strong>. Usa el endpoint <code>/auth</code> para obtener tu token.</p>
<p>Luego agrega: <code>Authorization: Bearer tu_token_aqui</code></p>
</div>
<div id="swagger-ui"></div>
<script src="https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui-bundle.js"></script>
<script src="https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui-standalone-preset.js"></script>
<script>
window.onload = function() {{
const ui = SwaggerUIBundle({{
url: '{base_url}/api/v1/openapi.json',
dom_id: '#swagger-ui',
deepLinking: true,
presets: [SwaggerUIBundle.presets.apis, SwaggerUIStandalonePreset],
plugins: [SwaggerUIBundle.plugins.DownloadUrl],
layout: "StandaloneLayout",
validatorUrl: null,
tryItOutEnabled: true,
supportedSubmitMethods: ['get', 'post', 'put', 'delete'],
docExpansion: 'list',
defaultModelsExpandDepth: 2,
defaultModelExpandDepth: 3,
requestInterceptor: function(req) {{
// Auto-agregar Content-Type para requests que no sean GET
if (req.method !== 'GET' && !req.headers['Content-Type']) {{
req.headers['Content-Type'] = 'application/json';
}}
return req;
}},
responseInterceptor: function(res) {{
// Log responses para debugging
if (res.status >= 400) {{
console.warn('API Error:', res.status, res.statusText, res.body);
}}
return res;
}}
}});
window.ui = ui;
}}
</script>
</body>
</html>"""
return request.make_response(swagger_html, headers=[("Content-Type", "text/html; charset=utf-8")])
except Exception as e:
_logger.error(f"Error serving Swagger UI: {str(e)}")
return request.make_response(f"<h1>Error</h1><p>Could not load documentation: {str(e)}</p>", status=500)
@http.route(["/api/v1/openapi.json"], type="http", auth="none", methods=["GET"], csrf=False)
def openapi_spec(self, **kwargs):
"""Genera la especificación OpenAPI/Swagger con mejoras"""
try:
base_url = self._get_base_url()
api_configs = []
try:
api_configs = request.env["connection.api"].sudo().search([("active", "=", True)])
except Exception as e:
_logger.warning(f"Could not load API configurations: {str(e)}")
# Generar esquemas dinámicos mejorados
dynamic_schemas = self._generate_enhanced_schemas(api_configs)
openapi_spec = {
"openapi": "3.0.3",
"info": {
"title": "Odoo REST API",
"description": self._get_enhanced_description(base_url),
"version": "2.0.0",
"contact": {"name": "API Support", "email": "support@example.com"},
"license": {"name": "LGPL-3", "url": "https://www.gnu.org/licenses/lgpl-3.0.html"}
},
"servers": [{"url": f"{base_url}/api/v1", "description": "Production server"}],
"security": [{"BearerAuth": []}],
"components": {
"securitySchemes": {
"BearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "JWT Bearer token obtenido del endpoint /auth"
}
},
"schemas": {
**self._get_base_schemas(),
**dynamic_schemas
},
"responses": self._get_common_responses()
},
"paths": self._generate_enhanced_paths(api_configs),
"tags": self._generate_enhanced_tags(api_configs)
}
return request.make_response(
json.dumps(openapi_spec, indent=2, ensure_ascii=False),
headers=[("Content-Type", "application/json; charset=utf-8")]
)
except Exception as e:
_logger.error(f"Error generating OpenAPI spec: {str(e)}")
return request.make_response(
json.dumps({"error": f"Error loading API specification: {str(e)}"}),
headers=[("Content-Type", "application/json; charset=utf-8")]
)
@http.route(['/api/v1/schema/<model_name>'], type='http', auth='none', methods=['GET'], csrf=False)
def get_model_schema(self, model_name, **kwargs):
"""Obtiene el esquema de un modelo específico"""
try:
# Obtener configuración del modelo
model_obj = request.env['ir.model'].sudo().search([('model', '=', model_name)], limit=1)
if not model_obj:
return self._error_response("Modelo no encontrado", 404)
api_config = request.env['connection.api'].sudo().search([
('model_id', '=', model_obj.id),
('active', '=', True)
], limit=1)
if not api_config:
return self._error_response("Modelo no configurado para API REST", 404)
model_class = request.env[model_name].sudo()
schema = self._generate_enhanced_model_schema(model_class, api_config)
response_data = {
"model": model_name,
"display_name": model_obj.name,
"schema": schema,
"endpoints": {
"collection": f"/api/v1/{model_name}",
"item": f"/api/v1/{model_name}/{{id}}"
},
"available_methods": {
"GET": api_config.is_get,
"POST": api_config.is_post,
"PUT": api_config.is_put,
"DELETE": api_config.is_delete
}
}
return request.make_response(
json.dumps(response_data, indent=2, ensure_ascii=False),
headers=[("Content-Type", "application/json; charset=utf-8")]
)
except Exception as e:
return request.make_response(
json.dumps({"error": f"Error getting schema: {str(e)}"}),
status=500,
headers=[("Content-Type", "application/json; charset=utf-8")]
)
def _get_base_url(self):
"""Obtiene la URL base del servidor"""
try:
if request.env:
return request.env["ir.config_parameter"].sudo().get_param("web.base.url", "http://localhost:8069")
return request.httprequest.host_url.rstrip('/')
except:
return "http://localhost:8069"
def _get_enhanced_description(self, base_url):
"""Descripción mejorada de la API"""
return f"""
# Odoo REST API v2.0
API REST completa para Odoo con autenticación JWT y documentación dinámica.
## 🔐 Autenticación
Esta API utiliza **JWT Bearer Tokens** para autenticación:
1. **Obtener token:**
```bash
curl -X POST {base_url}/api/v1/auth \\
-H "Content-Type: application/json" \\
-d '{{"username": "admin", "password": "admin"}}'
```
2. **Usar token en requests:**
```bash
curl -X GET {base_url}/api/v1/models \\
-H "Authorization: Bearer YOUR_JWT_TOKEN"
```
## 📊 Características
- **Esquemas dinámicos** basados en modelos reales de Odoo
- **Filtrado avanzado** con domain, limit, offset
- **Autenticación JWT** segura con expiración configurable
- **Validación automática** de tipos de datos
- **Documentación interactiva** con Swagger UI
## 🚀 Endpoints Principales
- `POST /auth` - Autenticación y obtención de token
- `POST /refresh` - Renovar token JWT
- `GET /models` - Lista de modelos disponibles
- `GET /health` - Estado de la API
- `GET /schema/{{model}}` - Esquema específico de un modelo
## 📝 Formato de Respuestas
Todas las respuestas siguen un formato consistente:
**Éxito:**
```json
{{
"success": true,
"count": 10,
"data": [...]
}}
```
**Error:**
```json
{{
"error": true,
"message": "Descripción del error",
"status_code": 400,
"error_code": "ERROR_CODE"
}}
```
"""
def _get_base_schemas(self):
"""Esquemas base mejorados"""
return {
"ErrorResponse": {
"type": "object",
"required": ["error", "message", "status_code"],
"properties": {
"error": {"type": "boolean", "example": True},
"message": {"type": "string", "example": "Error description"},
"status_code": {"type": "integer", "example": 400},
"error_code": {"type": "string", "example": "VALIDATION_ERROR"}
}
},
"AuthRequest": {
"type": "object",
"required": ["username", "password"],
"properties": {
"username": {"type": "string", "example": "admin"},
"password": {"type": "string", "format": "password", "example": "admin"},
"database": {"type": "string", "example": "odoo"},
"expires_in_hours": {"type": "integer", "minimum": 1, "maximum": 168, "default": 24, "example": 24}
}
},
"AuthResponse": {
"type": "object",
"properties": {
"success": {"type": "boolean", "example": True},
"message": {"type": "string", "example": "Authentication successful"},
"data": {
"type": "object",
"properties": {
"user_id": {"type": "integer", "example": 2},
"username": {"type": "string", "example": "admin"},
"name": {"type": "string", "example": "Administrator"},
"access_token": {"type": "string", "example": "eyJ0eXAiOiJKV1QiLCJhbGc..."},
"token_type": {"type": "string", "example": "Bearer"},
"expires_in": {"type": "integer", "example": 86400},
"database": {"type": "string", "example": "odoo"}
}
}
}
},
"HealthResponse": {
"type": "object",
"properties": {
"status": {"type": "string", "enum": ["healthy", "unhealthy"]},
"timestamp": {"type": "string", "format": "date-time"},
"database": {"type": "string"},
"active_models": {"type": "integer"},
"version": {"type": "string"},
"auth_method": {"type": "string", "example": "JWT Bearer Token"}
}
}
}
def _get_common_responses(self):
"""Respuestas comunes reutilizables"""
return {
"UnauthorizedError": {
"description": "Token JWT missing, invalid, or expired",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/ErrorResponse"},
"example": {
"error": True,
"message": "Token expirado",
"status_code": 401,
"error_code": "TOKEN_EXPIRED"
}
}
}
},
"NotFoundError": {
"description": "Resource not found",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/ErrorResponse"}}
}
},
"ValidationError": {
"description": "Invalid request data",
"content": {
"application/json": {"schema": {"$ref": "#/components/schemas/ErrorResponse"}}
}
}
}
def _generate_enhanced_schemas(self, api_configs):
"""Genera esquemas dinámicos mejorados"""
schemas = {}
for config in api_configs:
try:
if not hasattr(config, 'model_id') or not config.model_id:
continue
model_name = config.model_id.model
try:
model_class = request.env[model_name].sudo()
except KeyError:
_logger.warning(f"Model {model_name} not found")
continue
# Generar esquema mejorado del modelo
model_schema = self._generate_enhanced_model_schema(model_class, config)
schemas[f"{model_name}_values"] = model_schema
# Esquemas de request
schemas[f"{model_name}_create_request"] = {
"type": "object",
"required": ["values"],
"properties": {
"values": {"$ref": f"#/components/schemas/{model_name}_values"},
"fields": {
"type": "array",
"items": {"type": "string"},
"description": "Campos específicos a retornar en la respuesta",
"example": list(model_schema.get("properties", {}).keys())[:5]
}
},
"example": {
"values": self._generate_example_values(model_class, config),
"fields": ["id", "display_name"]
}
}
# Esquema de respuesta de lectura
read_properties = dict(model_schema.get("properties", {}))
read_properties.update({
"id": {"type": "integer", "description": "ID único del registro", "example": 1},
"display_name": {"type": "string", "description": "Nombre para mostrar"}
})
schemas[f"{model_name}_read_response"] = {
"type": "object",
"properties": read_properties
}
# Respuesta de colección con metadatos
schemas[f"{model_name}_collection_response"] = {
"type": "object",
"properties": {
"success": {"type": "boolean", "example": True},
"count": {"type": "integer", "example": 1},
"total": {"type": "integer", "description": "Total de registros (sin limit)"},
"offset": {"type": "integer", "description": "Registros omitidos"},
"limit": {"type": "integer", "description": "Límite aplicado"},
"data": {
"type": "array",
"items": {"$ref": f"#/components/schemas/{model_name}_read_response"}
}
}
}
except Exception as e:
_logger.error(f"Error generating enhanced schema for model {config.model_id.model}: {str(e)}")
continue
return schemas
def _generate_enhanced_model_schema(self, model_class, config):
"""Genera esquema mejorado para un modelo específico"""
properties = {}
required = []
try:
allowed_fields = self._get_allowed_fields(config)
forbidden_fields = self._get_forbidden_fields(config)
for field_name, field_obj in model_class._fields.items():
if field_name in forbidden_fields:
continue
if allowed_fields and field_name not in allowed_fields:
continue
field_schema = self._odoo_field_to_enhanced_json_schema(field_obj, field_name, model_class)
if field_schema:
properties[field_name] = field_schema
if getattr(field_obj, 'required', False):
required.append(field_name)
except Exception as e:
_logger.warning(f"Error processing enhanced model fields: {str(e)}")
schema = {
"type": "object",
"properties": properties
}
if required:
schema["required"] = required
return schema
def _odoo_field_to_enhanced_json_schema(self, field_obj, field_name, model_class):
"""Convierte un campo de Odoo a esquema JSON mejorado"""
field_type = type(field_obj).__name__
base_schema = {
"description": getattr(field_obj, 'help', '') or getattr(field_obj, 'string', field_name)
}
# Mapeo de tipos mejorado
type_mapping = {
'Char': {"type": "string"},
'Text': {"type": "string"},
'Html': {"type": "string", "format": "html"},
'Boolean': {"type": "boolean"},
'Integer': {"type": "integer"},
'Float': {"type": "number", "format": "float"},
'Monetary': {"type": "number", "format": "currency"},
'Date': {"type": "string", "format": "date"},
'Datetime': {"type": "string", "format": "date-time"},
'Binary': {"type": "string", "format": "binary"},
'Selection': {"type": "string"},
'Many2one': {"type": "integer"},
'One2many': {"type": "array", "items": {"type": "integer"}},
'Many2many': {"type": "array", "items": {"type": "integer"}},
}
schema = type_mapping.get(field_type, {"type": "string"})
schema.update(base_schema)
# Mejoras específicas por tipo de campo
if field_type == 'Char' and hasattr(field_obj, 'size') and field_obj.size:
schema["maxLength"] = field_obj.size
# Campos Selection con opciones reales
if field_type == 'Selection' and hasattr(field_obj, 'selection'):
try:
if callable(field_obj.selection):
try:
# Intentar obtener opciones dinámicas
options = field_obj.selection(model_class, field_name)
if options:
schema["enum"] = [opt[0] for opt in options if opt[0]]
schema["example"] = options[0][0] if options else None
schema["x-options"] = [{"value": opt[0], "label": opt[1]} for opt in options]
except:
schema["description"] += " (opciones dinámicas)"
else:
options = [opt[0] for opt in field_obj.selection if opt[0]]
if options:
schema["enum"] = options
schema["example"] = options[0]
schema["x-options"] = [{"value": opt[0], "label": opt[1]} for opt in field_obj.selection]
except Exception as e:
_logger.warning(f"Error getting selection options for {field_name}: {str(e)}")
# Campos relacionales con información del modelo relacionado
if field_type == 'Many2one':
comodel_name = getattr(field_obj, 'comodel_name', None)
if comodel_name:
schema.update({
"description": f"ID del registro relacionado del modelo {comodel_name}",
"x-related-model": comodel_name,
"minimum": 1
})
# Campos One2many y Many2many
if field_type in ['One2many', 'Many2many']:
comodel_name = getattr(field_obj, 'comodel_name', None)
if comodel_name:
schema["description"] = f"Lista de IDs de registros del modelo {comodel_name}"
schema["x-related-model"] = comodel_name
schema["items"]["minimum"] = 1
# Mejores ejemplos basados en el nombre del campo
if "example" not in schema:
schema["example"] = self._get_smart_example(field_name, schema.get("type"), field_type)
# Propiedades adicionales
if hasattr(field_obj, 'required') and field_obj.required:
schema["x-required"] = True
if hasattr(field_obj, 'readonly') and field_obj.readonly:
schema["readOnly"] = True
return schema
def _get_smart_example(self, field_name, json_type, odoo_type):
"""Genera ejemplos inteligentes basados en el nombre del campo"""
field_lower = field_name.lower()
# Ejemplos específicos por nombre de campo
smart_examples = {
'name': 'Ejemplo de nombre',
'email': 'usuario@ejemplo.com',
'phone': '+34123456789',
'mobile': '+34987654321',
'website': 'https://ejemplo.com',
'url': 'https://ejemplo.com',
'street': 'Calle Ejemplo 123',
'city': 'Madrid',
'zip': '28001',
'description': 'Descripción detallada del elemento',
'note': 'Nota adicional',
'comment': 'Comentario del usuario',
'reference': 'REF-001',
'code': 'COD123',
'login': 'usuario',
'password': 'contraseña_segura',
'price': 99.99,
'amount': 100.0,
'quantity': 1,
'qty': 5,
}
# Buscar coincidencias en el nombre del campo
for pattern, example in smart_examples.items():
if pattern in field_lower:
return example
# Ejemplos por tipo JSON
type_examples = {
"string": f"Valor de {field_name}",
"integer": 1,
"number": 10.5,
"boolean": True,
"array": [1, 2, 3]
}
return type_examples.get(json_type, None)
def _generate_example_values(self, model_class, config):
"""Genera valores de ejemplo para un modelo"""
example_values = {}
try:
allowed_fields = self._get_allowed_fields(config)
forbidden_fields = self._get_forbidden_fields(config)
for field_name, field_obj in model_class._fields.items():
if field_name in forbidden_fields:
continue
if allowed_fields and field_name not in allowed_fields:
continue
if field_name in ['id', 'create_date', 'write_date', 'create_uid', 'write_uid']:
continue
field_type = type(field_obj).__name__
example = self._get_smart_example(field_name,
self._get_json_type_for_odoo_field(field_type),
field_type)
if example is not None:
example_values[field_name] = example
except Exception as e:
_logger.warning(f"Error generating example values: {str(e)}")
return example_values
def _get_json_type_for_odoo_field(self, odoo_type):
"""Mapeo simple de tipo Odoo a tipo JSON"""
mapping = {
'Char': 'string', 'Text': 'string', 'Html': 'string', 'Selection': 'string',
'Integer': 'integer', 'Many2one': 'integer',
'Float': 'number', 'Monetary': 'number',
'Boolean': 'boolean',
'One2many': 'array', 'Many2many': 'array'
}
return mapping.get(odoo_type, 'string')
def _generate_enhanced_paths(self, api_configs):
"""Genera paths mejorados con parámetros adicionales"""
paths = {
"/auth": {
"post": {
"tags": ["Authentication"],
"summary": "Authenticate and get JWT token",
"description": "Autentica credenciales y devuelve un JWT token para usar en requests subsiguientes",
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/AuthRequest"}
}
}
},
"responses": {
"200": {
"description": "Authentication successful",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/AuthResponse"}
}
}
},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
},
"security": []
}
},
"/refresh": {
"post": {
"tags": ["Authentication"],
"summary": "Refresh JWT token",
"description": "Genera un nuevo JWT token usando el token actual",
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"expires_in_hours": {"type": "integer", "default": 24}
}
}
}
}
},
"responses": {
"200": {
"description": "Token refreshed successfully",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/AuthResponse"}
}
}
},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
},
"/health": {
"get": {
"tags": ["System"],
"summary": "API health check",
"description": "Verifica el estado de salud de la API y conexiones",
"responses": {
"200": {
"description": "API is healthy",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HealthResponse"}
}
}
},
"503": {
"description": "API is unhealthy",
"content": {
"application/json": {
"schema": {"$ref": "#/components/schemas/HealthResponse"}
}
}
}
},
"security": []
}
}
}
# Generar paths para cada modelo configurado
for config in api_configs:
try:
if not hasattr(config, 'model_id') or not config.model_id:
continue
model_name = config.model_id.model
model_display_name = config.model_id.name
collection_path = f"/{model_name}"
item_path = f"/{model_name}/{{id}}"
schema_path = f"/schema/{model_name}"
# Endpoint para obtener esquema del modelo
paths[schema_path] = {
"get": {
"tags": ["Schemas"],
"summary": f"Get {model_display_name} schema",
"description": f"Obtiene el esquema completo del modelo {model_display_name}",
"responses": {
"200": {"description": "Schema retrieved successfully"},
"404": {"$ref": "#/components/responses/NotFoundError"}
},
"security": []
}
}
if collection_path not in paths:
paths[collection_path] = {}
if item_path not in paths:
paths[item_path] = {}
# GET endpoints con parámetros mejorados
if config.is_get:
paths[collection_path]["get"] = {
"tags": [model_name],
"summary": f"Get all {model_display_name} records",
"description": f"Obtiene registros del modelo {model_display_name} con filtrado avanzado",
"parameters": [
{
"name": "domain",
"in": "query",
"description": "Filtros en formato Odoo domain",
"schema": {"type": "string"},
"example": "[['active', '=', True]]"
},
{
"name": "fields",
"in": "query",
"description": "Campos específicos a retornar (separados por comas)",
"schema": {"type": "string"},
"example": "id,name,email"
},
{
"name": "limit",
"in": "query",
"description": "Número máximo de registros",
"schema": {"type": "integer", "minimum": 1, "maximum": config.max_records_limit},
"example": 10
},
{
"name": "offset",
"in": "query",
"description": "Número de registros a omitir",
"schema": {"type": "integer", "minimum": 0},
"example": 0
},
{
"name": "order",
"in": "query",
"description": "Ordenamiento (ej: 'name asc', 'create_date desc')",
"schema": {"type": "string"},
"example": "name asc"
}
],
"responses": {
"200": {
"description": "List of records",
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_collection_response"}
}
}
},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
paths[item_path]["get"] = {
"tags": [model_name],
"summary": f"Get specific {model_display_name} record",
"description": f"Obtiene un registro específico del modelo {model_display_name}",
"parameters": [
{
"name": "id",
"in": "path",
"required": True,
"description": "ID del registro",
"schema": {"type": "integer", "minimum": 1}
},
{
"name": "fields",
"in": "query",
"description": "Campos específicos a retornar",
"schema": {"type": "string"}
}
],
"responses": {
"200": {
"description": "Record found",
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_collection_response"}
}
}
},
"404": {"$ref": "#/components/responses/NotFoundError"},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
# POST endpoint
if config.is_post:
paths[collection_path]["post"] = {
"tags": [model_name],
"summary": f"Create new {model_display_name} record",
"description": f"Crea un nuevo registro en el modelo {model_display_name}",
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_create_request"}
}
}
},
"responses": {
"201": {
"description": "Record created successfully",
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_collection_response"}
}
}
},
"400": {"$ref": "#/components/responses/ValidationError"},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
# PUT endpoint
if config.is_put:
paths[item_path]["put"] = {
"tags": [model_name],
"summary": f"Update {model_display_name} record",
"description": f"Actualiza un registro existente del modelo {model_display_name}",
"parameters": [
{
"name": "id",
"in": "path",
"required": True,
"description": "ID del registro a actualizar",
"schema": {"type": "integer", "minimum": 1}
}
],
"requestBody": {
"required": True,
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_create_request"}
}
}
},
"responses": {
"200": {
"description": "Record updated successfully",
"content": {
"application/json": {
"schema": {"$ref": f"#/components/schemas/{model_name}_collection_response"}
}
}
},
"404": {"$ref": "#/components/responses/NotFoundError"},
"400": {"$ref": "#/components/responses/ValidationError"},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
# DELETE endpoint
if config.is_delete:
paths[item_path]["delete"] = {
"tags": [model_name],
"summary": f"Delete {model_display_name} record",
"description": f"Elimina un registro del modelo {model_display_name}",
"parameters": [
{
"name": "id",
"in": "path",
"required": True,
"description": "ID del registro a eliminar",
"schema": {"type": "integer", "minimum": 1}
}
],
"responses": {
"200": {
"description": "Record deleted successfully",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"success": {"type": "boolean", "example": True},
"message": {"type": "string", "example": "Record deleted successfully"},
"deleted_record": {
"type": "object",
"properties": {
"id": {"type": "integer"},
"display_name": {"type": "string"}
}
}
}
}
}
}
},
"404": {"$ref": "#/components/responses/NotFoundError"},
"401": {"$ref": "#/components/responses/UnauthorizedError"}
}
}
except Exception as e:
_logger.warning(f"Error generating enhanced paths for model: {str(e)}")
continue
return paths
def _generate_enhanced_tags(self, api_configs):
"""Genera tags mejorados para agrupar endpoints"""
tags = [
{"name": "Authentication", "description": "Autenticación JWT y gestión de tokens"},
{"name": "System", "description": "Endpoints del sistema (salud, estado)"},
{"name": "Schemas", "description": "Esquemas de modelos disponibles"}
]
for config in api_configs:
if hasattr(config, 'model_id') and config.model_id:
available_methods = []
if config.is_get: available_methods.append("GET")
if config.is_post: available_methods.append("POST")
if config.is_put: available_methods.append("PUT")
if config.is_delete: available_methods.append("DELETE")
tags.append({
"name": config.model_id.model,
"description": f"Operaciones CRUD para {config.model_id.name} - Métodos: {', '.join(available_methods)}",
"externalDocs": {
"description": "Esquema del modelo",
"url": f"/api/v1/schema/{config.model_id.model}"
}
})
return tags
def _get_allowed_fields(self, config):
"""Obtiene campos permitidos de la configuración"""
if not hasattr(config, 'allowed_fields') or not config.allowed_fields:
return None
return [f.strip() for f in config.allowed_fields.split(',') if f.strip()]
def _get_forbidden_fields(self, config):
"""Obtiene campos prohibidos de la configuración"""
default_forbidden = ['__last_update', 'create_uid', 'create_date', 'write_uid', 'write_date']
if not hasattr(config, 'forbidden_fields') or not config.forbidden_fields:
return default_forbidden
return [f.strip() for f in config.forbidden_fields.split(',') if f.strip()]

259
rest_api_odoo/models/connection_api.py

@ -5,6 +5,7 @@
# #
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>) # Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Ayana KP (odoo@cybrosys.com) # Author: Ayana KP (odoo@cybrosys.com)
# Modified by: Broigm - API configuration improvements
# #
# You can modify it under the terms of the GNU LESSER # You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3. # GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
@ -19,29 +20,247 @@
# If not, see <http://www.gnu.org/licenses/>. # If not, see <http://www.gnu.org/licenses/>.
# #
############################################################################# #############################################################################
from odoo import fields, models from odoo import fields, models, api, _
from odoo.exceptions import ValidationError
class ConnectionApi(models.Model): class ConnectionApi(models.Model):
"""This class is used to create an api model in which we can create """Configuración de modelos para REST API con mejores controles"""
records with models and fields, and also we can specify methods."""
_name = 'connection.api' _name = 'connection.api'
_description = 'Connection Rest Api' _description = 'REST API Configuration'
_rec_name = 'model_id' _rec_name = 'display_name'
model_id = fields.Many2one('ir.model', string="Model", model_id = fields.Many2one(
'ir.model',
string="Model",
required=True,
ondelete='cascade',
domain="[('transient', '=', False)]", domain="[('transient', '=', False)]",
help="Select model which can be accessed by " help="Modelo que será accesible a través de la REST API."
"REST api requests.") )
is_get = fields.Boolean(string='GET',
help="Select this to enable GET method " display_name = fields.Char(
"while sending requests.") string="Name",
is_post = fields.Boolean(string='POST', compute='_compute_display_name',
help="Select this to enable POST method" store=True
"while sending requests.") )
is_put = fields.Boolean(string='PUT',
help="Select this to enable PUT method " # Permisos de métodos HTTP
"while sending requests.") is_get = fields.Boolean(
is_delete = fields.Boolean(string='DELETE', string='GET (Read)',
help="Select this to enable DELETE method " default=True,
"while sending requests.") help="Permite operaciones de lectura (GET) en este modelo."
)
is_post = fields.Boolean(
string='POST (Create)',
default=False,
help="Permite operaciones de creación (POST) en este modelo."
)
is_put = fields.Boolean(
string='PUT (Update)',
default=False,
help="Permite operaciones de actualización (PUT) en este modelo."
)
is_delete = fields.Boolean(
string='DELETE',
default=False,
help="Permite operaciones de eliminación (DELETE) en este modelo."
)
# Configuraciones adicionales
active = fields.Boolean(
string="Active",
default=True,
help="Si está desactivado, el modelo no estará disponible en la API."
)
allowed_fields = fields.Text(
string="Allowed Fields",
help="Lista de campos permitidos separados por comas. Si está vacío, todos los campos son permitidos."
)
forbidden_fields = fields.Text(
string="Forbidden Fields",
default="__last_update,create_uid,create_date,write_uid,write_date",
help="Lista de campos prohibidos separados por comas."
)
max_records_limit = fields.Integer(
string="Max Records Limit",
default=1000,
help="Límite máximo de registros que se pueden obtener en una sola request GET."
)
require_record_id_for_write = fields.Boolean(
string="Require ID for Write Operations",
default=True,
help="Si está marcado, las operaciones PUT/DELETE requieren un ID específico."
)
# Campos informativos
api_endpoint = fields.Char(
string="API Endpoint",
compute='_compute_api_endpoint',
help="URL del endpoint de la API para este modelo."
)
description = fields.Text(
string="Description",
help="Descripción del propósito de esta configuración de API."
)
@api.depends('model_id')
def _compute_display_name(self):
"""Calcula el nombre para mostrar"""
for record in self:
if record.model_id:
record.display_name = f"API: {record.model_id.name}"
else:
record.display_name = "API Configuration"
@api.depends('model_id')
def _compute_api_endpoint(self):
"""Calcula la URL del endpoint de la API"""
for record in self:
if record.model_id:
base_url = self.env['ir.config_parameter'].sudo().get_param('web.base.url', 'http://localhost:8069')
record.api_endpoint = f"{base_url}/api/v1/{record.model_id.model}"
else:
record.api_endpoint = ""
@api.constrains('model_id')
def _check_unique_model(self):
"""Valida que no haya configuraciones duplicadas para el mismo modelo"""
for record in self:
if record.model_id:
existing = self.search([
('model_id', '=', record.model_id.id),
('id', '!=', record.id)
])
if existing:
raise ValidationError(
_("Ya existe una configuración de API para el modelo %s") % record.model_id.name
)
@api.constrains('max_records_limit')
def _check_max_records_limit(self):
"""Valida el límite máximo de registros"""
for record in self:
if record.max_records_limit <= 0:
raise ValidationError(_("El límite máximo de registros debe ser mayor a 0"))
if record.max_records_limit > 10000:
raise ValidationError(_("El límite máximo de registros no puede exceder 10,000"))
def get_allowed_fields(self):
"""Obtiene la lista de campos permitidos para este modelo"""
self.ensure_one()
if not self.allowed_fields:
# Si no hay campos específicos permitidos, obtener todos los campos del modelo
model_obj = self.env[self.model_id.model]
all_fields = list(model_obj._fields.keys())
else:
all_fields = [field.strip() for field in self.allowed_fields.split(',') if field.strip()]
# Remover campos prohibidos
forbidden = []
if self.forbidden_fields:
forbidden = [field.strip() for field in self.forbidden_fields.split(',') if field.strip()]
allowed = [field for field in all_fields if field not in forbidden]
return allowed
def get_forbidden_fields(self):
"""Obtiene la lista de campos prohibidos"""
self.ensure_one()
if not self.forbidden_fields:
return []
return [field.strip() for field in self.forbidden_fields.split(',') if field.strip()]
def is_method_allowed(self, method):
"""Verifica si un método HTTP está permitido"""
self.ensure_one()
if not self.active:
return False
method_map = {
'GET': self.is_get,
'POST': self.is_post,
'PUT': self.is_put,
'DELETE': self.is_delete
}
return method_map.get(method.upper(), False)
def action_test_api_endpoint(self):
"""Acción para probar el endpoint de la API (útil para botón en vista)"""
self.ensure_one()
if not self.model_id:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': _('No model selected'),
'type': 'warning',
}
}
return {
'type': 'ir.actions.act_url',
'url': self.api_endpoint,
'target': 'new',
}
@api.model
def get_api_statistics(self):
"""Obtiene estadísticas de uso de la API"""
stats = {}
# Estadísticas por modelo
api_configs = self.search([('active', '=', True)])
stats['active_models'] = len(api_configs)
stats['total_models'] = len(self.search([]))
# Estadísticas de usuarios con API keys
users_with_keys = self.env['res.users'].search([('api_key', '!=', False)])
stats['users_with_api_keys'] = len(users_with_keys)
stats['total_api_requests'] = sum(users_with_keys.mapped('api_requests_count'))
return stats
def toggle_active(self):
"""Alterna el estado activo de la configuración"""
for record in self:
record.active = not record.active
@api.model
def create_default_configurations(self):
"""Crea configuraciones por defecto para modelos comunes"""
default_models = [
('res.partner', {'is_get': True, 'is_post': True, 'is_put': True, 'is_delete': False}),
('product.product', {'is_get': True, 'is_post': True, 'is_put': True, 'is_delete': False}),
('sale.order', {'is_get': True, 'is_post': True, 'is_put': True, 'is_delete': False}),
('account.move', {'is_get': True, 'is_post': False, 'is_put': False, 'is_delete': False}),
]
created_configs = []
for model_name, config in default_models:
model_obj = self.env['ir.model'].search([('model', '=', model_name)], limit=1)
if model_obj:
existing = self.search([('model_id', '=', model_obj.id)])
if not existing:
api_config = self.create({
'model_id': model_obj.id,
'description': f'Default API configuration for {model_obj.name}',
**config
})
created_configs.append(api_config)
return created_configs

251
rest_api_odoo/models/res_users.py

@ -5,6 +5,7 @@
# #
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>) # Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Ayana KP (odoo@cybrosys.com) # Author: Ayana KP (odoo@cybrosys.com)
# Modified by: Broigm - Improvements in API key management
# #
# You can modify it under the terms of the GNU LESSER # You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3. # GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
@ -20,25 +21,245 @@
# #
############################################################################# #############################################################################
import uuid import uuid
from odoo import fields, models import secrets
import string
import logging
from datetime import datetime, timedelta
from odoo import fields, models, api
_logger = logging.getLogger(__name__)
class ResUsers(models.Model): class ResUsers(models.Model):
"""This class is used to inherit users and add api key generation""" """Extensión del modelo de usuarios para gestión de API keys"""
_inherit = 'res.users' _inherit = 'res.users'
api_key = fields.Char(string="API Key", readonly=True, api_key = fields.Char(
help="Api key for connecting with the " string="API Key",
"Database.The key will be " readonly=True,
"generated when authenticating " help="Clave API para autenticación en REST API. Se genera automáticamente."
"rest api.") )
api_key_expiry = fields.Datetime(
string="API Key Expiry",
help="Fecha de expiración de la API key (opcional)"
)
api_key_created = fields.Datetime(
string="API Key Created",
help="Fecha de creación de la API key"
)
api_key_last_used = fields.Datetime(
string="API Key Last Used",
help="Última vez que se usó la API key"
)
api_requests_count = fields.Integer(
string="API Requests Count",
default=0,
help="Contador de requests realizados con esta API key"
)
def generate_api(self, username): def generate_api_key(self, force_new=False):
"""This function is used to generate api-key for each user""" """
users = self.env['res.users'].sudo().search([('login', '=', username)]) Genera una nueva API key o devuelve la existente
if not users.api_key: Args:
users.api_key = str(uuid.uuid4()) force_new (bool): Fuerza la generación de una nueva key
key = users.api_key Returns:
str: API key generada
"""
self.ensure_one()
if not self.api_key or force_new:
try:
# Generar una API key más segura usando secrets si está disponible
alphabet = string.ascii_letters + string.digits
api_key = ''.join(secrets.choice(alphabet) for _ in range(64))
except (ImportError, AttributeError):
# Fallback usando uuid si secrets no está disponible
api_key = str(uuid.uuid4()).replace('-', '') + str(uuid.uuid4()).replace('-', '')[:32]
self.sudo().write({
'api_key': api_key,
'api_key_created': fields.Datetime.now(),
'api_key_last_used': None,
'api_requests_count': 0
})
return self.api_key
def regenerate_api_key(self):
"""Regenera la API key (útil para botón en interfaz)"""
self.ensure_one()
return self.generate_api_key(force_new=True)
def revoke_api_key(self):
"""Revoca la API key actual"""
self.ensure_one()
self.sudo().write({
'api_key': False,
'api_key_expiry': False,
'api_key_created': False,
'api_key_last_used': False
})
return True
def set_api_key_expiry(self, days=None):
"""
Establece fecha de expiración para la API key
Args:
days (int): Días hasta la expiración (default: sin expiración)
"""
self.ensure_one()
if days:
expiry_date = fields.Datetime.now() + timedelta(days=days)
self.sudo().write({'api_key_expiry': expiry_date})
else: else:
key = users.api_key self.sudo().write({'api_key_expiry': False})
return key
def update_api_key_usage(self):
"""Actualiza estadísticas de uso de la API key"""
self.ensure_one()
try:
self.sudo().write({
'api_key_last_used': fields.Datetime.now(),
'api_requests_count': self.api_requests_count + 1
})
except Exception as e:
_logger.warning(f"Could not update API key usage for user {self.id}: {str(e)}")
@api.model
def cleanup_expired_api_keys(self):
"""Limpia API keys expiradas (para ejecutar en cron)"""
try:
expired_users = self.search([
('api_key_expiry', '!=', False),
('api_key_expiry', '<', fields.Datetime.now())
])
for user in expired_users:
try:
user.revoke_api_key()
except Exception as e:
_logger.warning(f"Could not revoke API key for user {user.id}: {str(e)}")
return len(expired_users)
except Exception as e:
_logger.error(f"Error in cleanup_expired_api_keys: {str(e)}")
return 0
def is_api_key_valid(self):
"""Verifica si la API key es válida y no ha expirado"""
self.ensure_one()
if not self.api_key:
return False
if self.api_key_expiry and self.api_key_expiry < fields.Datetime.now():
return False
return True
def action_generate_api_key(self):
"""Acción para generar API key desde la interfaz"""
self.ensure_one()
try:
api_key = self.generate_api_key()
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': f'API Key generada exitosamente: {api_key}',
'type': 'success',
'sticky': True,
}
}
except Exception as e:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': f'Error generando API Key: {str(e)}',
'type': 'danger',
'sticky': True,
}
}
def action_regenerate_api_key(self):
"""Acción para regenerar API key desde la interfaz"""
self.ensure_one()
try:
api_key = self.regenerate_api_key()
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': f'API Key regenerada exitosamente: {api_key}',
'type': 'success',
'sticky': True,
}
}
except Exception as e:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': f'Error regenerando API Key: {str(e)}',
'type': 'danger',
'sticky': True,
}
}
def action_revoke_api_key(self):
"""Acción para revocar API key desde la interfaz"""
self.ensure_one()
try:
self.revoke_api_key()
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': 'API Key revocada exitosamente',
'type': 'warning',
'sticky': False,
}
}
except Exception as e:
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': f'Error revocando API Key: {str(e)}',
'type': 'danger',
'sticky': True,
}
}
# Método legacy para compatibilidad con código anterior
def generate_api(self, username):
"""Método de compatibilidad con la versión anterior"""
return self.generate_api_key()
@api.model
def get_api_statistics(self):
"""Obtiene estadísticas de uso de API keys"""
try:
users_with_keys = self.search([('api_key', '!=', False)])
active_keys = users_with_keys.filtered(lambda u: u.is_api_key_valid())
expired_keys = users_with_keys.filtered(lambda u: not u.is_api_key_valid())
total_requests = sum(users_with_keys.mapped('api_requests_count'))
return {
'total_users_with_keys': len(users_with_keys),
'active_keys': len(active_keys),
'expired_keys': len(expired_keys),
'total_api_requests': total_requests,
'average_requests_per_user': total_requests / len(users_with_keys) if users_with_keys else 0
}
except Exception as e:
_logger.error(f"Error getting API statistics: {str(e)}")
return {
'total_users_with_keys': 0,
'active_keys': 0,
'expired_keys': 0,
'total_api_requests': 0,
'average_requests_per_user': 0
}

3
rest_api_odoo/security/ir.model.access.csv

@ -1,2 +1,3 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_connection_api_user,access.connection.api.user,model_connection_api,,1,1,1,1 access_connection_api_user,access_connection_api_user,model_connection_api,base.group_user,1,0,0,0
access_connection_api_manager,access_connection_api_manager,model_connection_api,base.group_system,1,1,1,1

1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
2 access_connection_api_user access.connection.api.user access_connection_api_user model_connection_api base.group_user 1 1 0 1 0 1 0
3 access_connection_api_manager access_connection_api_manager model_connection_api base.group_system 1 1 1 1

176
rest_api_odoo/views/api_dashboard_views.xml

@ -0,0 +1,176 @@
<?xml version="1.0" encoding="utf-8"?>
<odoo>
<!-- Modelo para el dashboard de la API -->
<record id="api_dashboard_action" model="ir.actions.client">
<field name="name">API Dashboard</field>
<field name="tag">api_dashboard</field>
</record>
<!-- Template para el dashboard de la API -->
<template id="api_dashboard_template">
<div class="container-fluid">
<div class="row mb-4">
<div class="col-12">
<div class="card">
<div class="card-header bg-primary text-white">
<h3 class="card-title mb-0">
<i class="fa fa-code"/> Odoo REST API Dashboard
</h3>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-8">
<h4>Bienvenido a la REST API de Odoo</h4>
<p class="lead">
Esta API te permite integrar aplicaciones externas con Odoo
usando estándares REST y autenticación basada en API Keys.
</p>
<div class="row mt-4">
<div class="col-md-6">
<div class="card border-success">
<div class="card-body text-center">
<i class="fa fa-book fa-3x text-success mb-3"/>
<h5>Documentación Interactiva</h5>
<p>Explora y prueba todos los endpoints disponibles</p>
<a href="/api/v1/docs" target="_blank" class="btn btn-success">
Ver Swagger UI
</a>
</div>
</div>
</div>
<div class="col-md-6">
<div class="card border-info">
<div class="card-body text-center">
<i class="fa fa-cogs fa-3x text-info mb-3"/>
<h5>Configuración de Modelos</h5>
<p>Gestiona qué modelos están disponibles en la API</p>
<a href="/web#action=rest_api_root_action" class="btn btn-info">
Configurar API
</a>
</div>
</div>
</div>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
<h5><i class="fa fa-info-circle"/> Información Rápida</h5>
</div>
<div class="card-body">
<ul class="list-unstyled">
<li><strong>Base URL:</strong> <code id="base-url">/api/v1</code></li>
<li><strong>Autenticación:</strong> API Key</li>
<li><strong>Formato:</strong> JSON</li>
<li><strong>Versión:</strong> 1.0.0</li>
</ul>
<hr/>
<h6><i class="fa fa-external-link"/> Enlaces Útiles</h6>
<div class="list-group list-group-flush">
<a href="/api" target="_blank" class="list-group-item list-group-item-action">
<i class="fa fa-home"/> API Root
</a>
<a href="/api/v1/openapi.json" target="_blank" class="list-group-item list-group-item-action">
<i class="fa fa-file-code-o"/> OpenAPI Spec
</a>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col-12">
<div class="card">
<div class="card-header">
<h5><i class="fa fa-rocket"/> Comenzar Rápidamente</h5>
</div>
<div class="card-body">
<div class="row">
<div class="col-md-6">
<h6>1. Obtener API Key</h6>
<pre class="bg-light p-3"><code>curl -X POST <span id="auth-url">/api/v1/auth</span> \
-H "Content-Type: application/json" \
-d '{
"username": "tu_usuario",
"password": "tu_contraseña"
}'</code></pre>
</div>
<div class="col-md-6">
<h6>2. Usar la API</h6>
<pre class="bg-light p-3"><code>curl -X GET <span id="models-url">/api/v1/models</span> \
-H "X-API-Key: tu_api_key_aqui"</code></pre>
</div>
</div>
</div>
</div>
</div>
</div>
</div>
<script>
// Actualizar URLs con la URL base actual
document.addEventListener('DOMContentLoaded', function() {
const baseUrl = window.location.origin;
document.getElementById('base-url').textContent = baseUrl + '/api/v1';
document.getElementById('auth-url').textContent = baseUrl + '/api/v1/auth';
document.getElementById('models-url').textContent = baseUrl + '/api/v1/models';
});
</script>
</template>
<!-- Acción para el dashboard principal -->
<record id="api_main_dashboard_action" model="ir.actions.act_url">
<field name="name">API Documentation</field>
<field name="url">/api/v1/docs</field>
<field name="target">new</field>
</record>
<!-- Vista para estadísticas de la API -->
<record id="api_stats_view" model="ir.ui.view">
<field name="name">api.stats.view</field>
<field name="model">connection.api</field>
<field name="arch" type="xml">
<list create="false" edit="false" delete="false">
<field name="display_name"/>
<field name="model_id"/>
<field name="active"/>
<field name="is_get"/>
<field name="is_post"/>
<field name="is_put"/>
<field name="is_delete"/>
<field name="max_records_limit"/>
</list>
</field>
</record>
<!-- Acción para estadísticas -->
<record id="api_stats_action" model="ir.actions.act_window">
<field name="name">API Statistics</field>
<field name="res_model">connection.api</field>
<field name="view_mode">list</field>
<field name="view_id" ref="api_stats_view"/>
<field name="context">{'search_default_active': 1}</field>
</record>
<!-- Menú principal actualizado con el dashboard -->
<menuitem id="rest_api_dashboard_menu"
name="API Dashboard"
parent="rest_api_root"
action="api_main_dashboard_action"
sequence="50"/>
<menuitem id="rest_api_stats_menu"
name="API Statistics"
parent="rest_api_root"
action="api_stats_action"
sequence="35"
groups="base.group_system"/>
</odoo>

228
rest_api_odoo/views/connection_api_views.xml

@ -1,61 +1,237 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<odoo> <odoo>
<!-- Form view for 'connection.api' model. --> <!-- Form view para 'connection.api' -->
<record id="connection_api_view_form" model="ir.ui.view"> <record id="connection_api_view_form" model="ir.ui.view">
<field name="name">connection.api.view.form</field> <field name="name">connection.api.view.form</field>
<field name="model">connection.api</field> <field name="model">connection.api</field>
<field name="arch" type="xml"> <field name="arch" type="xml">
<form> <form>
<header>
<button name="toggle_active" type="object"
string="Activate" class="btn-primary"
invisible="active"/>
<button name="toggle_active" type="object"
string="Deactivate" class="btn-secondary"
invisible="not active"/>
<button name="action_test_api_endpoint" type="object"
string="Test Endpoint" class="btn-info"
invisible="not model_id"/>
<field name="active" widget="boolean_toggle"/>
</header>
<sheet> <sheet>
<div class="oe_title">
<h1>
<field name="display_name" readonly="1"/>
</h1>
<h3>
<field name="api_endpoint" readonly="1"
widget="url" invisible="not api_endpoint"/>
</h3>
</div>
<group> <group>
<group string="Resource"> <group string="Model Configuration">
<field name="model_id" string="Model"/> <field name="model_id" options="{'no_create': True, 'no_open': True}"/>
<field name="description" placeholder="Describe the purpose of this API configuration..."/>
</group>
<group string="HTTP Methods">
<field name="is_get" string="GET (Read)"/>
<field name="is_post" string="POST (Create)"/>
<field name="is_put" string="PUT (Update)"/>
<field name="is_delete" string="DELETE"/>
</group>
</group>
<notebook>
<page string="Field Configuration" name="fields">
<group>
<group string="Allowed Fields">
<field name="allowed_fields"
placeholder="field1, field2, field3... (leave empty for all fields)"
widget="text" nolabel="1"/>
<div class="text-muted">
<small>Lista de campos permitidos separados por comas.</small>
</div>
</group>
<group string="Forbidden Fields">
<field name="forbidden_fields" widget="text" nolabel="1"/>
<div class="text-muted">
<small>Lista de campos prohibidos separados por comas.</small>
</div>
</group>
</group>
</page>
<page string="Security &amp; Limits" name="security">
<group>
<group string="Request Limits">
<field name="max_records_limit"/>
<field name="require_record_id_for_write"/>
</group>
</group>
</page>
<page string="API Information" name="api_info">
<group>
<group string="Endpoint Information">
<field name="api_endpoint" readonly="1" widget="url"/>
</group> </group>
<group string="Methods"> <group string="Documentation">
<field name="is_get"/> <div>
<field name="is_post"/> <a href="/api/v1/docs" target="_blank" class="btn btn-info">
<field name="is_put"/> <i class="fa fa-book"/> View Swagger Documentation
<field name="is_delete"/> </a>
<br/><br/>
<a href="/api/v1/openapi.json" target="_blank" class="btn btn-secondary">
<i class="fa fa-file-code-o"/> View OpenAPI Spec
</a>
</div>
</group> </group>
</group> </group>
<div class="alert alert-info" role="alert" invisible="not api_endpoint">
<h4>Usage Examples:</h4>
<p><strong>Get all records:</strong><br/>
<code>GET <field name="api_endpoint" readonly="1" nolabel="1"/></code></p>
<p><strong>Get specific record:</strong><br/>
<code>GET <field name="api_endpoint" readonly="1" nolabel="1"/>/123</code></p>
<p><strong>Create record:</strong><br/>
<code>POST <field name="api_endpoint" readonly="1" nolabel="1"/></code></p>
<p><strong>Update record:</strong><br/>
<code>PUT <field name="api_endpoint" readonly="1" nolabel="1"/>/123</code></p>
<p><strong>Delete record:</strong><br/>
<code>DELETE <field name="api_endpoint" readonly="1" nolabel="1"/>/123</code></p>
<p><em>Remember to include your API Key in the header: <code>X-API-Key: your_api_key_here</code></em></p>
</div>
</page>
</notebook>
</sheet> </sheet>
</form> </form>
</field> </field>
</record> </record>
<!-- List view for 'connection.api' model. -->
<!-- List view para 'connection.api' -->
<record id="connection_api_view_list" model="ir.ui.view"> <record id="connection_api_view_list" model="ir.ui.view">
<field name="name">connection.api.view.list</field> <field name="name">connection.api.view.list</field>
<field name="model">connection.api</field> <field name="model">connection.api</field>
<field name="arch" type="xml"> <field name="arch" type="xml">
<list> <list decoration-muted="not active" create="true" edit="true" delete="true">
<field name="model_id" string="Model"/> <field name="active" column_invisible="1"/>
<field name="is_get"/> <field name="display_name"/>
<field name="is_post"/> <field name="model_id"/>
<field name="is_put"/> <field name="is_get" widget="boolean_toggle"/>
<field name="is_delete"/> <field name="is_post" widget="boolean_toggle"/>
<field name="is_put" widget="boolean_toggle"/>
<field name="is_delete" widget="boolean_toggle"/>
<field name="max_records_limit"/>
<field name="active" widget="boolean_toggle"/>
<button name="action_test_api_endpoint" type="object"
string="Test" icon="fa-external-link"
title="Test API Endpoint"/>
<button name="toggle_active" type="object"
string="Toggle Active" icon="fa-toggle-on"
title="Activate/Deactivate"/>
</list> </list>
</field> </field>
</record> </record>
<!-- Action for 'connection.api' model with List and form views. -->
<!-- Vista de búsqueda -->
<record id="connection_api_view_search" model="ir.ui.view">
<field name="name">connection.api.view.search</field>
<field name="model">connection.api</field>
<field name="arch" type="xml">
<search>
<field name="model_id" string="Model"/>
<field name="display_name" string="Name"/>
<separator/>
<filter name="active" string="Active" domain="[('active', '=', True)]"/>
<filter name="inactive" string="Inactive" domain="[('active', '=', False)]"/>
<separator/>
<group expand="0" string="Group By">
<filter name="group_by_model" string="Model" context="{'group_by': 'model_id'}"/>
<filter name="group_by_active" string="Status" context="{'group_by': 'active'}"/>
</group>
</search>
</field>
</record>
<!-- Acción principal -->
<record id="rest_api_root_action" model="ir.actions.act_window"> <record id="rest_api_root_action" model="ir.actions.act_window">
<field name="name">Rest API Records</field> <field name="name">REST API Configuration</field>
<field name="type">ir.actions.act_window</field>
<field name="res_model">connection.api</field> <field name="res_model">connection.api</field>
<field name="view_mode">list,form</field> <field name="view_mode">list,form</field>
<field name="context">{'search_default_active': 1}</field>
<field name="help" type="html"> <field name="help" type="html">
<p class="o_view_nocontent_smiling_face"> <p class="o_view_nocontent_smiling_face">
Create! Configure your first REST API endpoint!
</p>
<p>
Create API configurations to expose Odoo models through REST endpoints.
</p>
<p>
<a href="/api/v1/docs" target="_blank" class="btn btn-primary">
<i class="fa fa-book"/> View API Documentation
</a>
</p> </p>
</field> </field>
</record> </record>
<!-- Menu items for the REST API. -->
<menuitem id="rest_api_root" <!-- Server action SIN f-strings -->
name="Rest API" <record id="action_create_default_api_configs" model="ir.actions.server">
sequence="10" <field name="name">Create Default API Configurations</field>
web_icon="rest_api_odoo,static/description/icon.png"/> <field name="model_id" ref="model_connection_api"/>
<menuitem id="rest_api_details_root" <field name="state">code</field>
name="Rest API" <field name="code"><![CDATA[
# Crear configuraciones básicas sin f-strings
default_models = ['res.partner', 'product.product', 'sale.order']
created = []
for model_name in default_models:
model_obj = env['ir.model'].search([('model', '=', model_name)], limit=1)
if model_obj:
existing = model.search([('model_id', '=', model_obj.id)])
if not existing:
config = model.create({
'model_id': model_obj.id,
'description': 'Default API configuration for ' + model_obj.name,
'is_get': True,
'is_post': True,
'is_put': True,
'is_delete': False
})
created.append(config)
if created:
message = "Created " + str(len(created)) + " default configurations"
msg_type = 'success'
else:
message = "Default configurations already exist"
msg_type = 'info'
action = {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': message,
'type': msg_type,
'sticky': False,
}
}
]]></field>
</record>
<!-- Menús -->
<menuitem id="rest_api_root" name="REST API" sequence="10"/>
<menuitem id="rest_api_config_menu"
name="API Configuration"
parent="rest_api_root" parent="rest_api_root"
action="rest_api_root_action" action="rest_api_root_action"
sequence="10"/> sequence="10"/>
<menuitem id="rest_api_create_defaults_menu"
name="Create Default Configs"
parent="rest_api_root"
action="action_create_default_api_configs"
sequence="20"/>
</odoo> </odoo>

158
rest_api_odoo/views/res_users_views.xml

@ -1,18 +1,166 @@
<?xml version="1.0" encoding="utf-8"?> <?xml version="1.0" encoding="utf-8"?>
<odoo> <odoo>
<!-- Inherited user view for Adding API key. --> <!-- Vista de usuario para gestión de API keys -->
<record id="view_users_form" model="ir.ui.view"> <record id="view_users_form_api_enhanced" model="ir.ui.view">
<field name="name">view.users.form.inherit.rest.api.odoo</field> <field name="name">view.users.form.inherit.rest.api.enhanced</field>
<field name="inherit_id" ref="base.view_users_form"/> <field name="inherit_id" ref="base.view_users_form"/>
<field name="model">res.users</field> <field name="model">res.users</field>
<field name="arch" type="xml"> <field name="arch" type="xml">
<xpath expr="//page[@name='access_rights']" position="after"> <xpath expr="//page[@name='access_rights']" position="after">
<page string="API" name="rest-api"> <page string="REST API" name="rest-api" groups="base.group_system">
<group> <group>
<field name="api_key" groups="base.group_user"/> <group string="API Key Management">
<field name="api_key" readonly="1" password="True"/>
<field name="api_key_created" readonly="1"/>
<field name="api_key_expiry" readonly="1"/>
</group> </group>
<group string="Usage Statistics">
<field name="api_requests_count" readonly="1"/>
<field name="api_key_last_used" readonly="1"/>
</group>
</group>
<div class="oe_button_box">
<button name="action_generate_api_key" type="object"
string="Generate API Key"
class="btn-primary"
invisible="api_key"
confirm="This will generate a new API key. Continue?"/>
<button name="action_regenerate_api_key" type="object"
string="Regenerate API Key"
class="btn-warning"
invisible="not api_key"
confirm="This will invalidate the current API key. Continue?"/>
<button name="action_revoke_api_key" type="object"
string="Revoke API Key"
class="btn-danger"
invisible="not api_key"
confirm="This will permanently revoke the API key. Continue?"/>
</div>
<div class="alert alert-info" role="alert" invisible="not api_key">
<h4>API Usage Instructions</h4>
<p><strong>Authentication:</strong> Include your API key in the request header:</p>
<pre><code>X-API-Key: [Your API Key Here]</code></pre>
<p><strong>Available Endpoints:</strong></p>
<ul>
<li><code>GET /api/v1/models</code> - List available models</li>
<li><code>GET /api/v1/{model}</code> - Get all records</li>
<li><code>GET /api/v1/{model}/{id}</code> - Get specific record</li>
<li><code>POST /api/v1/{model}</code> - Create new record</li>
<li><code>PUT /api/v1/{model}/{id}</code> - Update record</li>
<li><code>DELETE /api/v1/{model}/{id}</code> - Delete record</li>
</ul>
<p><strong>Example:</strong></p>
<pre><code>curl -X GET http://your-server/api/v1/res.partner -H "X-API-Key: [Key]"</code></pre>
</div>
<div class="alert alert-warning" role="alert" invisible="api_key">
<h4>No API Key Generated</h4>
<p>Click "Generate API Key" to create an API key for this user.</p>
</div>
</page> </page>
</xpath> </xpath>
</field> </field>
</record> </record>
<!-- Vista de lista corregida - Usando base.view_users_tree -->
<record id="view_users_list_api_info" model="ir.ui.view">
<field name="name">view.users.list.api.info</field>
<field name="inherit_id" ref="base.view_users_tree"/>
<field name="model">res.users</field>
<field name="arch" type="xml">
<field name="login_date" position="after">
<field name="api_key" string="Has API Key" widget="boolean" optional="hide"/>
<field name="api_requests_count" string="API Requests" optional="hide"/>
<field name="api_key_last_used" string="API Last Used" optional="hide"/>
</field>
</field>
</record>
<!-- Server action SIMPLIFICADO sin f-strings -->
<record id="action_cleanup_expired_api_keys" model="ir.actions.server">
<field name="name">Cleanup Expired API Keys</field>
<field name="model_id" ref="base.model_res_users"/>
<field name="state">code</field>
<field name="code"><![CDATA[
# Código simple sin imports prohibidos y sin f-strings
expired_users = model.search([
('api_key_expiry', '!=', False),
('api_key_expiry', '&lt;', fields.Datetime.now())
])
cleaned_count = 0
for user in expired_users:
user.write({
'api_key': False,
'api_key_expiry': False,
'api_key_created': False,
'api_key_last_used': False
})
cleaned_count += 1
if cleaned_count > 0:
message = "Cleaned " + str(cleaned_count) + " expired API keys."
msg_type = 'success'
else:
message = "No expired API keys found."
msg_type = 'info'
action = {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'message': message,
'type': msg_type,
'sticky': False,
}
}
]]></field>
</record>
<!-- Menús -->
<menuitem id="rest_api_users_menu"
name="API Users Management"
parent="rest_api_root"
action="base.action_res_users"
sequence="30"
groups="base.group_system"/>
<menuitem id="rest_api_cleanup_menu"
name="Cleanup Expired Keys"
parent="rest_api_root"
action="action_cleanup_expired_api_keys"
sequence="40"
groups="base.group_system"/>
<!-- Cron job SIMPLIFICADO -->
<record id="ir_cron_cleanup_expired_api_keys" model="ir.cron">
<field name="name">Cleanup Expired API Keys</field>
<field name="model_id" ref="base.model_res_users"/>
<field name="state">code</field>
<field name="code"><![CDATA[
# Código simple para cron
try:
expired_users = model.search([
('api_key_expiry', '!=', False),
('api_key_expiry', '&lt;', fields.Datetime.now())
])
for user in expired_users:
user.write({
'api_key': False,
'api_key_expiry': False,
'api_key_created': False,
'api_key_last_used': False
})
except Exception:
pass
]]></field>
<field name="interval_number">1</field>
<field name="interval_type">days</field>
<field name="active">True</field>
</record>
</odoo> </odoo>

Loading…
Cancel
Save