|
|
@ -22,222 +22,244 @@ |
|
|
|
|
|
|
|
import json |
|
|
|
import logging |
|
|
|
from odoo import http |
|
|
|
from odoo.http import request |
|
|
|
|
|
|
|
from datetime import datetime |
|
|
|
from odoo import http, models |
|
|
|
from odoo.http import request, Response |
|
|
|
from ast import literal_eval |
|
|
|
|
|
|
|
_logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
class RestApi(http.Controller): |
|
|
|
"""This is a controller which is used to generate responses based on the |
|
|
|
api requests""" |
|
|
|
class IrHttp(models.AbstractModel): |
|
|
|
"""This model is used to authenticate the api-key when sending a request""" |
|
|
|
_inherit = "ir.http" |
|
|
|
|
|
|
|
def auth_api_key(self, api_key): |
|
|
|
@classmethod |
|
|
|
def _auth_method_rest_api(cls): |
|
|
|
"""This function is used to authenticate the api-key when sending a |
|
|
|
request""" |
|
|
|
|
|
|
|
user_id = request.env['res.users'].search([('api_key', '=', api_key)]) |
|
|
|
if api_key is not None and user_id: |
|
|
|
response = True |
|
|
|
elif not user_id: |
|
|
|
response = ('<html><body><h2>Invalid <i>API Key</i> ' |
|
|
|
'!</h2></body></html>') |
|
|
|
else: |
|
|
|
response = ("<html><body><h2>No <i>API Key</i> Provided " |
|
|
|
"!</h2></body></html>") |
|
|
|
try: |
|
|
|
api_key = request.httprequest.headers.get("Authorization").lstrip("Bearer ") |
|
|
|
if not api_key: |
|
|
|
raise PermissionError("Invalid API key provided.") |
|
|
|
|
|
|
|
user_id = request.env["res.users.apikeys"]._check_credentials( |
|
|
|
scope="rpc", key=api_key |
|
|
|
) |
|
|
|
request.update_env(user_id) |
|
|
|
except Exception as e: |
|
|
|
_logger.error(e) |
|
|
|
return Response( |
|
|
|
json.dumps({"message": e.args[0]}), |
|
|
|
status=401, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
class RestApi(http.Controller): |
|
|
|
"""This is a controller which is used to generate responses based on the |
|
|
|
api requests""" |
|
|
|
|
|
|
|
def simplest_type(self, input): |
|
|
|
"""Try cast input into native Python class, otherwise return as string""" |
|
|
|
try: |
|
|
|
return literal_eval(input) |
|
|
|
except Exception: |
|
|
|
# Handle lowercase booleans |
|
|
|
if input == "true": |
|
|
|
return True |
|
|
|
if input == "false": |
|
|
|
return False |
|
|
|
return input |
|
|
|
|
|
|
|
return response |
|
|
|
def sanitize_records(self, records): |
|
|
|
"""Sanitize records for response""" |
|
|
|
for record in records: |
|
|
|
for key, value in record.items(): |
|
|
|
# Manually convert datetime fields to string format |
|
|
|
if isinstance(value, datetime): |
|
|
|
record[key] = value.isoformat() |
|
|
|
return records |
|
|
|
|
|
|
|
def generate_response(self, method, model, rec_id): |
|
|
|
def generate_response(self, method, **query): |
|
|
|
"""This function is used to generate the response based on the type |
|
|
|
of request and the parameters given""" |
|
|
|
option = request.env['connection.api'].search( |
|
|
|
[('model_id', '=', model)], limit=1) |
|
|
|
try: |
|
|
|
model = query.pop("model") |
|
|
|
option = request.env["connection.api"].search( |
|
|
|
[("model_id", "=", model)], limit=1 |
|
|
|
) |
|
|
|
model_name = option.model_id.model |
|
|
|
model_display_name = option.model_id.name |
|
|
|
|
|
|
|
if method != 'DELETE': |
|
|
|
try: |
|
|
|
data = json.loads(request.httprequest.data) |
|
|
|
else: |
|
|
|
except Exception: |
|
|
|
data = {} |
|
|
|
|
|
|
|
fields = [] |
|
|
|
if data: |
|
|
|
for field in data['fields']: |
|
|
|
for field in data["fields"]: |
|
|
|
fields.append(field) |
|
|
|
if not fields and method != 'DELETE': |
|
|
|
return ("<html><body><h2>No fields selected for the model" |
|
|
|
"</h2></body></html>") |
|
|
|
if not option: |
|
|
|
return ("<html><body><h2>No Record Created for the model" |
|
|
|
"</h2></body></html>") |
|
|
|
try: |
|
|
|
if method == 'GET': |
|
|
|
|
|
|
|
# Return records' ID by default if not specified |
|
|
|
if not fields: |
|
|
|
fields.append("id") |
|
|
|
|
|
|
|
# Get all model's fields if wildcard is used |
|
|
|
if "*" in fields: |
|
|
|
fields = [] |
|
|
|
for field in data['fields']: |
|
|
|
record_fields = request.env[str(model_name)].fields_get( |
|
|
|
[], attributes=["type"] |
|
|
|
) |
|
|
|
for field, value in record_fields.items(): |
|
|
|
value_type = value.get("type") |
|
|
|
if not (value_type == "binary"): |
|
|
|
fields.append(field) |
|
|
|
if not option: |
|
|
|
raise NotImplementedError("No Record Created for the model. ") |
|
|
|
if method == "GET": |
|
|
|
if not option.is_get: |
|
|
|
return ("<html><body><h2>Method Not Allowed" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
datas = [] |
|
|
|
if rec_id != 0: |
|
|
|
partner_records = request.env[ |
|
|
|
str(model_name)].search_read( |
|
|
|
domain=[('id', '=', rec_id)], |
|
|
|
fields=fields |
|
|
|
) |
|
|
|
data = json.dumps({ |
|
|
|
'records': partner_records |
|
|
|
}) |
|
|
|
datas.append(data) |
|
|
|
return request.make_response(data=datas) |
|
|
|
else: |
|
|
|
partner_records = request.env[ |
|
|
|
str(model_name)].search_read( |
|
|
|
domain=[], |
|
|
|
fields=fields |
|
|
|
) |
|
|
|
data = json.dumps({ |
|
|
|
'records': partner_records |
|
|
|
}) |
|
|
|
datas.append(data) |
|
|
|
return request.make_response(data=datas) |
|
|
|
except: |
|
|
|
return ("<html><body><h2>Invalid JSON Data" |
|
|
|
"</h2></body></html>") |
|
|
|
if method == 'POST': |
|
|
|
raise NameError() |
|
|
|
limit = 0 |
|
|
|
if query.get("limit"): |
|
|
|
limit = int(str(query.get("limit"))) |
|
|
|
offset = 0 |
|
|
|
if query.get("offset"): |
|
|
|
offset = int(str(query.get("offset"))) |
|
|
|
|
|
|
|
domains = [] |
|
|
|
for key, value in query.items(): |
|
|
|
if not (key == "limit" or key == "offset"): |
|
|
|
domains.append((key, "=", self.simplest_type(value))) |
|
|
|
partner_records = request.env[str(model_name)].search_read( |
|
|
|
domains, fields, limit=limit, offset=offset |
|
|
|
) |
|
|
|
|
|
|
|
return Response( |
|
|
|
json.dumps({"records": self.sanitize_records(partner_records)}), |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
if method == "POST": |
|
|
|
if not option.is_post: |
|
|
|
return ("<html><body><h2>Method Not Allowed" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
try: |
|
|
|
raise NotImplementedError() |
|
|
|
if not data or "values" not in data: |
|
|
|
raise ValueError("No Data Provided") |
|
|
|
|
|
|
|
data = json.loads(request.httprequest.data) |
|
|
|
datas = [] |
|
|
|
new_resource = request.env[str(model_name)].create( |
|
|
|
data['values']) |
|
|
|
partner_records = request.env[ |
|
|
|
str(model_name)].search_read( |
|
|
|
domain=[('id', '=', new_resource.id)], |
|
|
|
fields=fields |
|
|
|
) |
|
|
|
new_data = json.dumps({'New resource': partner_records, }) |
|
|
|
datas.append(new_data) |
|
|
|
return request.make_response(data=datas) |
|
|
|
except: |
|
|
|
return ("<html><body><h2>Invalid JSON Data" |
|
|
|
"</h2></body></html>") |
|
|
|
if method == 'PUT': |
|
|
|
new_resource = request.env[str(model_name)].create(data["values"]) |
|
|
|
partner_records = request.env[str(model_name)].search_read( |
|
|
|
[("id", "=", new_resource.id)], fields |
|
|
|
) |
|
|
|
return Response( |
|
|
|
json.dumps({"new_record": self.sanitize_records(partner_records)}), |
|
|
|
status=201, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
if method == "PUT": |
|
|
|
if not option.is_put: |
|
|
|
return ("<html><body><h2>Method Not Allowed" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
if rec_id == 0: |
|
|
|
return ("<html><body><h2>No ID Provided" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
resource = request.env[str(model_name)].browse( |
|
|
|
int(rec_id)) |
|
|
|
raise NotImplementedError() |
|
|
|
|
|
|
|
if "id" not in query: |
|
|
|
raise ValueError("No ID Provided") |
|
|
|
if not data or "values" not in data: |
|
|
|
raise ValueError("No Data Provided") |
|
|
|
|
|
|
|
resource_id = str(query.get("id")) |
|
|
|
resource = request.env[str(model_name)].browse(int(resource_id)) |
|
|
|
if not resource.exists(): |
|
|
|
return ("<html><body><h2>Resource not found" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
try: |
|
|
|
datas = [] |
|
|
|
raise ValueError("Resource not found") |
|
|
|
|
|
|
|
data = json.loads(request.httprequest.data) |
|
|
|
resource.write(data['values']) |
|
|
|
partner_records = request.env[ |
|
|
|
str(model_name)].search_read( |
|
|
|
domain=[('id', '=', resource.id)], |
|
|
|
fields=fields |
|
|
|
) |
|
|
|
new_data = json.dumps( |
|
|
|
{'Updated resource': partner_records, |
|
|
|
}) |
|
|
|
datas.append(new_data) |
|
|
|
return request.make_response(data=datas) |
|
|
|
|
|
|
|
except: |
|
|
|
return ("<html><body><h2>Invalid JSON Data " |
|
|
|
"!</h2></body></html>") |
|
|
|
if method == 'DELETE': |
|
|
|
resource.write(data["values"]) |
|
|
|
partner_records = request.env[str(model_name)].search_read( |
|
|
|
[("id", "=", resource.id)], fields |
|
|
|
) |
|
|
|
return Response( |
|
|
|
json.dumps( |
|
|
|
{"updated_record": self.sanitize_records(partner_records)} |
|
|
|
), |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
if method == "DELETE": |
|
|
|
if not option.is_delete: |
|
|
|
return ("<html><body><h2>Method Not Allowed" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
if rec_id == 0: |
|
|
|
return ("<html><body><h2>No ID Provided" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
resource = request.env[str(model_name)].browse( |
|
|
|
int(rec_id)) |
|
|
|
raise NotImplementedError() |
|
|
|
|
|
|
|
if "id" not in query: |
|
|
|
raise ValueError("No ID Provided") |
|
|
|
|
|
|
|
resource_id = str(query.get("id")) |
|
|
|
resource = request.env[str(model_name)].browse(int(resource_id)) |
|
|
|
if not resource.exists(): |
|
|
|
return ("<html><body><h2>Resource not found" |
|
|
|
"</h2></body></html>") |
|
|
|
else: |
|
|
|
|
|
|
|
records = request.env[ |
|
|
|
str(model_name)].search_read( |
|
|
|
domain=[('id', '=', resource.id)], |
|
|
|
fields=['id', 'display_name'] |
|
|
|
) |
|
|
|
remove = json.dumps( |
|
|
|
{"Resource deleted": records, |
|
|
|
}) |
|
|
|
raise ValueError("Resource not found") |
|
|
|
|
|
|
|
partner_records = request.env[str(model_name)].search_read( |
|
|
|
[("id", "=", resource.id)], fields |
|
|
|
) |
|
|
|
resource.unlink() |
|
|
|
return request.make_response(data=remove) |
|
|
|
return Response( |
|
|
|
json.dumps( |
|
|
|
{ |
|
|
|
"message": "Resource deleted", |
|
|
|
"data": self.sanitize_records(partner_records), |
|
|
|
} |
|
|
|
), |
|
|
|
status=202, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
|
|
|
|
@http.route(['/send_request'], type='http', |
|
|
|
auth='none', |
|
|
|
methods=['GET', 'POST', 'PUT', 'DELETE'], csrf=False) |
|
|
|
# If not using any method above, simply return an error |
|
|
|
raise NotImplementedError() |
|
|
|
except ValueError as e: |
|
|
|
return Response( |
|
|
|
json.dumps({"message": e.args[0]}), |
|
|
|
status=403, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
except NotImplementedError as e: |
|
|
|
return Response( |
|
|
|
json.dumps( |
|
|
|
{ |
|
|
|
"message": f"Method not allowed. {e.args[0]}Please contact your admininstrator to enable {method} method for {model_display_name or 'this'} record." |
|
|
|
} |
|
|
|
), |
|
|
|
status=405, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
except Exception as e: |
|
|
|
_logger.error(e) |
|
|
|
return Response( |
|
|
|
json.dumps({"message": f"Internal server error. {e.args[0]}"}), |
|
|
|
status=500, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
|
|
|
|
@http.route( |
|
|
|
["/send_request"], |
|
|
|
type="http", |
|
|
|
auth="rest_api", |
|
|
|
methods=["GET", "POST", "PUT", "DELETE"], |
|
|
|
csrf=False, |
|
|
|
) |
|
|
|
def fetch_data(self, **kw): |
|
|
|
"""This controller will be called when sending a request to the |
|
|
|
specified url, and it will authenticate the api-key and then will |
|
|
|
generate the result""" |
|
|
|
|
|
|
|
http_method = request.httprequest.method |
|
|
|
api_key = request.httprequest.headers.get('api-key') |
|
|
|
auth_api = self.auth_api_key(api_key) |
|
|
|
model = kw.get('model') |
|
|
|
username = request.httprequest.headers.get('login') |
|
|
|
password = request.httprequest.headers.get('password') |
|
|
|
request.session.authenticate(request.session.db, username, |
|
|
|
password) |
|
|
|
model_id = request.env['ir.model'].search( |
|
|
|
[('model', '=', model)]) |
|
|
|
model = kw.pop("model") |
|
|
|
model_id = request.env["ir.model"].search([("model", "=", model)]) |
|
|
|
if not model_id: |
|
|
|
return ("<html><body><h3>Invalid model, check spelling or maybe " |
|
|
|
"the related " |
|
|
|
"module is not installed" |
|
|
|
"</h3></body></html>") |
|
|
|
|
|
|
|
if auth_api == True: |
|
|
|
if not kw.get('Id'): |
|
|
|
rec_id = 0 |
|
|
|
else: |
|
|
|
rec_id = int(kw.get('Id')) |
|
|
|
result = self.generate_response(http_method, model_id.id, rec_id) |
|
|
|
return result |
|
|
|
else: |
|
|
|
return auth_api |
|
|
|
|
|
|
|
@http.route(['/odoo_connect'], type="http", auth="none", csrf=False, |
|
|
|
methods=['GET']) |
|
|
|
def odoo_connect(self, **kw): |
|
|
|
"""This is the controller which initializes the api transaction by |
|
|
|
generating the api-key for specific user and database""" |
|
|
|
|
|
|
|
username = request.httprequest.headers.get('login') |
|
|
|
password = request.httprequest.headers.get('password') |
|
|
|
db = request.httprequest.headers.get('db') |
|
|
|
try: |
|
|
|
request.session.update(http.get_default_session(), db=db) |
|
|
|
auth = request.session.authenticate(request.session.db, username, |
|
|
|
password) |
|
|
|
user = request.env['res.users'].browse(auth) |
|
|
|
api_key = request.env.user.generate_api(username) |
|
|
|
datas = json.dumps({"Status": "auth successful", |
|
|
|
"User": user.name, |
|
|
|
"api-key": api_key}) |
|
|
|
return request.make_response(data=datas) |
|
|
|
except: |
|
|
|
return ("<html><body><h2>wrong login credentials" |
|
|
|
"</h2></body></html>") |
|
|
|
return Response( |
|
|
|
json.dumps( |
|
|
|
{ |
|
|
|
"message": "Invalid model, check spelling or maybe the related module is not installed" |
|
|
|
} |
|
|
|
), |
|
|
|
status=403, |
|
|
|
content_type="application/json", |
|
|
|
) |
|
|
|
|
|
|
|
return self.generate_response(http_method, model=model_id.id, **kw) |
|
|
|