You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

1106 lines
57 KiB

# -*- coding: utf-8 -*-
###############################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2023-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Cybrosys Techno Solutions(<https://www.cybrosys.com>)
#
# You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details.
#
# You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE
# (LGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
import boto3
import dropbox
import errno
import ftplib
import json
import logging
import nextcloud_client
import os
import paramiko
import requests
import tempfile
import odoo
from datetime import timedelta
from nextcloud import NextCloud
from requests.auth import HTTPBasicAuth
from werkzeug import urls
from odoo import _, api, fields, models
from odoo.exceptions import UserError, ValidationError
from odoo.http import request
from odoo.service import db
_logger = logging.getLogger(__name__)
ONEDRIVE_SCOPE = ['offline_access openid Files.ReadWrite.All']
MICROSOFT_GRAPH_END_POINT = "https://graph.microsoft.com"
GOOGLE_AUTH_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
GOOGLE_TOKEN_ENDPOINT = 'https://accounts.google.com/o/oauth2/token'
GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
headers = {"content-type": "application/x-www-form-urlencoded"}
class DbBackupConfigure(models.Model):
"""Model for configuring automatic database backup settings.
Methods:
action_nextcloud: Perform a Nextcloud connection test and display a
notification based on the result.
action_s3cloud:Perform an Amazon connection test and display a
notification based on the result.
_compute_redirect_uri: Compute the redirect URIs based on the backup
destination.
_compute_is_google_drive_token_generated: Set True if the Google Drive
refresh token is generated.
generate_gdrive_refresh_token: Generate Google Drive access token from
refresh token if expired.
get_gdrive_tokens: Generate onedrive tokens from authorization code.
_compute_is_onedrive_token_generated: Set true if onedrive tokens are
generated.
_compute_is_dropbox_token_generated: Set True if the dropbox refresh
token is generated.
action_get_dropbox_auth_code : Open a wizard to set up dropbox
Authorization code.
action_get_onedrive_auth_code: Generate onedrive authorization code
generate_onedrive_refresh_token: Generate onedrive access token from
refresh token if expired.
get_onedrive_tokens: Generate onedrive tokens from authorization code
get_dropbox_auth_url: Return dropbox authorization url.
set_dropbox_refresh_token: Generate and set the dropbox refresh token
from authorization code.
_check_db_credentials: Validate entered database name and master
password.
action_sftp_connection: Test the sftp and ftp connection using entered
credentials.
_schedule_auto_backup: Function for generating and storing backup
Database backup for all the active records in backup configuration
model will be created
"""
_name = 'db.backup.configure'
_description = 'Automatic Database Backup'
name = fields.Char(string='Name', required=True, help='Name of the Record')
db_name = fields.Char(string='Database Name', required=True,
help='Name of the current database')
master_pwd = fields.Char(string='Master Password', required=True,
help='Master password of the database')
backup_format = fields.Selection([
('zip', 'Zip'),
('dump', 'Dump')
], string='Backup Format', default='zip', required=True,
help='The format in which the backup should be created')
backup_destination = fields.Selection([
('local', 'Local Storage'),
('google_drive', 'Google Drive'),
('ftp', 'FTP'),
('sftp', 'SFTP'),
('dropbox', 'Dropbox'),
('onedrive', 'Onedrive'),
('next_cloud', 'Next Cloud'),
('amazon_s3', 'Amazon S3')
], string='Backup Destination',
help='The destination where the backup should be stored')
backup_path = fields.Char(string='Backup Path',
help='Local storage directory path')
sftp_host = fields.Char(string='SFTP Host',
help='The host of the SFTP server')
sftp_port = fields.Char(string='SFTP Port', default=22,
help='The port number for the SFTP connection')
sftp_user = fields.Char(string='SFTP User', copy=False,
help='The username for the SFTP connection')
sftp_password = fields.Char(string='SFTP Password', copy=False,
help='The password for the SFTP connection')
sftp_path = fields.Char(string='SFTP Path',
help='The path to the destination directory on '
'the SFTP server')
ftp_host = fields.Char(string='FTP Host',
help='The hostname or IP address of the FTP server')
ftp_port = fields.Char(string='FTP Port', default=21,
help='The port number used for FTP connections')
ftp_user = fields.Char(string='FTP User', copy=False,
help='The username used for FTP authentication')
ftp_password = fields.Char(string='FTP Password', copy=False,
help='The password used for FTP authentication')
ftp_path = fields.Char(string='FTP Path',
help='The path to the directory on the FTP server '
'where the backup should be stored')
dropbox_client_key = fields.Char(string='Dropbox Client ID', copy=False,
help='The client ID for accessing Dropbox '
'API')
dropbox_client_secret = fields.Char(string='Dropbox Client Secret',
copy=False,
help='The client secret for accessing '
'Dropbox API')
dropbox_refresh_token = fields.Char(string='Dropbox Refresh Token',
copy=False,
help='The refresh token for accessing '
'Dropbox API')
is_dropbox_token_generated = fields.Boolean(
string='Dropbox Token Generated',
compute='_compute_is_dropbox_token_generated', copy=False,
help='A boolean indicating whether the Dropbox token has been '
'generated or not')
dropbox_folder = fields.Char(string='Dropbox Folder',
help='The name of the Dropbox folder where '
'backups should be stored')
active = fields.Boolean(string='Active',
help='Set this field to True to activate the backup'
'configuration.')
hide_active = fields.Boolean(string="Hide Active",
help="Make active field to readonly")
auto_remove = fields.Boolean(string='Remove Old Backups',
help='Set this field to True to automatically '
'remove old backups.')
days_to_remove = fields.Integer(string='Remove After',
help='Automatically delete stored backups '
'after this specified number of days')
google_drive_folder_key = fields.Char(string='Drive Folder ID',
help='Folder ID inside Google Drive')
notify_user = fields.Boolean(string='Notify User',
help='Send an email notification to user '
'when the backup operation is '
'successful or failed')
user_id = fields.Many2one('res.users', string='User',
help='Select the user to send email')
backup_filename = fields.Char(string='Backup Filename',
help='For Storing generated backup filename')
generated_exception = fields.Char(string='Exception',
help='Exception Encountered while '
'Backup generation')
onedrive_client_key = fields.Char(string='Onedrive Client ID', copy=False,
help='The client ID of your Onedrive '
'application.')
onedrive_client_secret = fields.Char(string='Onedrive Client Secret',
copy=False,
help='The client secret of your '
'Onedrive application.')
onedrive_access_token = fields.Char(string='Onedrive Access Token',
copy=False,
help='The access token required to '
'authenticate with Onedrive API.'
)
onedrive_refresh_token = fields.Char(string='Onedrive Refresh Token',
copy=False,
help='The refresh token required to '
'obtain a new access token from '
'Onedrive API.')
onedrive_token_validity = fields.Datetime(string='Onedrive Token Validity',
copy=False,
help='The date and time when '
'the Onedrive access token '
'expires.')
onedrive_folder_key = fields.Char(string='Folder ID',
help='The unique identifier of the '
'Onedrive folder where the backup '
'file will be stored.')
is_onedrive_token_generated = fields.Boolean(
string='onedrive Tokens Generated',
compute='_compute_is_onedrive_token_generated', copy=False,
help='Indicates whether the Onedrive tokens have been generated.')
gdrive_refresh_token = fields.Char(string='Google drive Refresh Token',
copy=False,
help='The refresh token for the Google '
'Drive account')
gdrive_access_token = fields.Char(string='Google Drive Access Token',
copy=False,
help='The access token for the Google '
'Drive account')
is_google_drive_token_generated = fields.Boolean(
string='Google drive Token Generated',
compute='_compute_is_google_drive_token_generated', copy=False,
help='A flag to indicate whether a token has been generated for the '
'Google Drive account')
gdrive_client_key = fields.Char(string='Google Drive Client ID', copy=False,
help='The client ID for the Google Drive API'
)
gdrive_client_secret = fields.Char(string='Google Drive Client Secret',
copy=False,
help='The client secret for the Google '
'Drive API')
gdrive_token_validity = fields.Datetime(
string='Google Drive Token Validity', copy=False,
help='Specify the validity period of the Google Drive access token.')
gdrive_redirect_uri = fields.Char(string='Google Drive Redirect URI',
compute='_compute_redirect_uri',
help='The redirect URI used for the '
'Google Drive API authentication '
'flow')
onedrive_redirect_uri = fields.Char(string='Onedrive Redirect URI',
compute='_compute_redirect_uri',
help='The redirect URI used for the '
'Onedrive API authentication flow')
aws_access_key = fields.Char(string="Amazon S3 Access Key",
help="Field used to store the Access Key "
"for an Amazon S3 bucket.")
aws_secret_access_key = fields.Char(string='Amazon S3 Secret Key',
help="Field used to store the Secret "
"Key for an Amazon S3 bucket.")
bucket_file_name = fields.Char(string='Bucket Name',
help="Field used to store the name of an "
"Amazon S3 bucket.")
aws_folder_name = fields.Char(string='File Name',
help="field used to store the name of a "
"folder in an Amazon S3 bucket.")
domain = fields.Char(string='Domain Name', help="Field used to store the "
"name of a domain")
next_cloud_user_name = fields.Char(string='User Name',
help="Field used to store the user "
"name for a Nextcloud account.")
next_cloud_password = fields.Char(string='Password',
help="Field used to store the password "
"for a Nextcloud account.")
nextcloud_folder_key = fields.Char(string='Next Cloud Folder Id',
help="Field used to store the unique "
"identifier for a Nextcloud folder.")
gdrive_backup_error_test = fields.Boolean(string="Google Drive Error Test")
onedrive_backup_error_test = fields.Boolean(string="OneDrive Error Test")
@api.onchange('backup_destination')
def _onchange_backup_destination(self):
self.write({
"gdrive_backup_error_test": False,
"onedrive_backup_error_test": False
})
@api.onchange('gdrive_client_key', 'gdrive_client_secret',
'google_drive_folder', 'onedrive_client_key',
'onedrive_client_secret', 'onedrive_folder_key')
def _onchange_gdrive_backup_error_test(self):
if self.backup_destination == 'google_drive':
if self.gdrive_backup_error_test:
self.write({
"gdrive_backup_error_test": False
})
if self.backup_destination == 'onedrive':
if self.onedrive_backup_error_test:
self.write({
"onedrive_backup_error_test": False
})
def action_nextcloud(self):
"""If it has next_cloud_password, domain, and next_cloud_user_name
which will perform an action for nextcloud connection test"""
if self.domain and self.next_cloud_password and \
self.next_cloud_user_name:
try:
ncx = NextCloud(self.domain,
auth=HTTPBasicAuth(self.next_cloud_user_name,
self.next_cloud_password))
data = ncx.list_folders('/').__dict__
if data['raw'].status_code == 207:
self.active = True
self.hide_active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'success',
'title': _("Connection Test Succeeded!"),
'message': _("Everything seems properly set up!"),
'sticky': False,
}
}
else:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
except Exception:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
def action_s3cloud(self):
"""If it has aws_secret_access_key, which will perform s3 cloud
operations for connection test"""
if self.aws_access_key and self.aws_secret_access_key:
try:
bo3 = boto3.client(
's3',
aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_access_key)
response = bo3.list_buckets()
for bucket in response['Buckets']:
if self.bucket_file_name == bucket['Name']:
self.active = True
self.hide_active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'success',
'title': _("Connection Test Succeeded!"),
'message': _(
"Everything seems properly set up!"),
'sticky': False,
}
}
raise UserError(
_("Bucket not found. Please check the bucket name and"
" try again."))
except Exception:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
def _compute_redirect_uri(self):
"""Compute the redirect URIs based on the backup destination"""
for rec in self:
base_url = request.env['ir.config_parameter'].get_param(
'web.base.url')
rec.onedrive_redirect_uri = base_url + '/onedrive/authentication'
rec.gdrive_redirect_uri = base_url + '/google_drive/authentication'
@api.depends('gdrive_access_token', 'gdrive_refresh_token')
def _compute_is_google_drive_token_generated(self):
"""Set True if the Google Drive refresh token is generated"""
for rec in self:
rec.is_google_drive_token_generated = bool(
rec.gdrive_access_token) and bool(rec.gdrive_refresh_token)
def action_get_gdrive_auth_code(self):
"""Generate Google drive authorization code"""
action = self.env["ir.actions.act_window"].sudo()._for_xml_id(
"auto_database_backup.db_backup_configure_action")
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
url_return = base_url + '/web#id=%d&action=%d&view_type=form&model=%s' % (
self.id, action['id'], 'db.backup.configure')
state = {
'backup_config_id': self.id,
'url_return': url_return
}
encoded_params = urls.url_encode({
'response_type': 'code',
'client_id': self.gdrive_client_key,
'scope': 'https://www.googleapis.com/auth/drive https://www.googleapis.com/auth/drive.file',
'redirect_uri': base_url + '/google_drive/authentication',
'access_type': 'offline',
'state': json.dumps(state),
'approval_prompt': 'force',
})
auth_url = "%s?%s" % (GOOGLE_AUTH_ENDPOINT, encoded_params)
return {
'type': 'ir.actions.act_url',
'target': 'self',
'url': auth_url,
}
def generate_gdrive_refresh_token(self):
"""Generate Google Drive access token from refresh token if expired"""
data = {
'refresh_token': self.gdrive_refresh_token,
'client_id': self.gdrive_client_key,
'client_secret': self.gdrive_client_secret,
'grant_type': 'refresh_token',
}
try:
res = requests.post(GOOGLE_TOKEN_ENDPOINT, data=data,
headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'gdrive_access_token': response.get('access_token'),
'gdrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError as error:
error_key = error.response.json().get("error", "nc")
error_msg = _(
"An error occurred while generating the token. Your "
"authorization code may be invalid or has already expired [%s]."
"You should check your Client ID and secret on the Google "
"APIs plateform or try to stop and restart your calendar "
"synchronisation.",
error_key)
raise UserError(error_msg)
def get_gdrive_tokens(self, authorize_code):
"""Generate onedrive tokens from authorization code"""
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
data = {
'code': authorize_code,
'client_id': self.gdrive_client_key,
'client_secret': self.gdrive_client_secret,
'grant_type': 'authorization_code',
'redirect_uri': base_url + '/google_drive/authentication'
}
try:
res = requests.post(GOOGLE_TOKEN_ENDPOINT, params=data,
headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'gdrive_access_token': response.get('access_token'),
'gdrive_refresh_token': response.get('refresh_token'),
'gdrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
if self.gdrive_backup_error_test:
self.write({
'gdrive_backup_error_test': False
})
except Exception:
if not self.gdrive_backup_error_test:
self.write({"gdrive_backup_error_test": True})
@api.depends('onedrive_access_token', 'onedrive_refresh_token')
def _compute_is_onedrive_token_generated(self):
"""Set true if onedrive tokens are generated"""
for rec in self:
rec.is_onedrive_token_generated = bool(
rec.onedrive_access_token) and bool(rec.onedrive_refresh_token)
@api.depends('dropbox_refresh_token')
def _compute_is_dropbox_token_generated(self):
"""Set True if the dropbox refresh token is generated"""
for rec in self:
rec.is_dropbox_token_generated = bool(rec.dropbox_refresh_token)
def action_get_dropbox_auth_code(self):
"""Open a wizard to set up dropbox Authorization code"""
return {
'type': 'ir.actions.act_window',
'name': 'Dropbox Authorization Wizard',
'res_model': 'dropbox.auth.code',
'view_mode': 'form',
'target': 'new',
}
def action_get_onedrive_auth_code(self):
"""Generate onedrive authorization code"""
authority = \
'https://login.microsoftonline.com/common/oauth2/v2.0/authorize'
action = self.env["ir.actions.act_window"].sudo()._for_xml_id(
"auto_database_backup.db_backup_configure_action")
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
url_return = base_url + '/web#id=%d&action=%d&view_type=form&model=%s' % (
self.id, action['id'], 'db.backup.configure')
state = {
'backup_config_id': self.id,
'url_return': url_return
}
encoded_params = urls.url_encode({
'response_type': 'code',
'client_id': self.onedrive_client_key,
'state': json.dumps(state),
'scope': ONEDRIVE_SCOPE,
'redirect_uri': base_url + '/onedrive/authentication',
'prompt': 'consent',
'access_type': 'offline'
})
auth_url = "%s?%s" % (authority, encoded_params)
return {
'type': 'ir.actions.act_url',
'target': 'self',
'url': auth_url,
}
def generate_onedrive_refresh_token(self):
"""Generate onedrive access token from refresh token if expired"""
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
data = {
'client_id': self.onedrive_client_key,
'client_secret': self.onedrive_client_secret,
'scope': ONEDRIVE_SCOPE,
'grant_type': "refresh_token",
'redirect_uri': base_url + '/onedrive/authentication',
'refresh_token': self.onedrive_refresh_token
}
try:
res = requests.post(
"https://login.microsoftonline.com/common/oauth2/v2.0/token",
data=data, headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'onedrive_access_token': response.get('access_token'),
'onedrive_refresh_token': response.get('refresh_token'),
'onedrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError as error:
_logger.exception("Bad microsoft onedrive request : %s !",
error.response.content)
raise error
def get_onedrive_tokens(self, authorize_code):
"""Generate onedrive tokens from authorization code"""
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
data = {
'code': authorize_code,
'client_id': self.onedrive_client_key,
'client_secret': self.onedrive_client_secret,
'grant_type': 'authorization_code',
'scope': ONEDRIVE_SCOPE,
'redirect_uri': base_url + '/onedrive/authentication'
}
try:
res = requests.post(
"https://login.microsoftonline.com/common/oauth2/v2.0/token",
data=data, headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'onedrive_access_token': response.get('access_token'),
'onedrive_refresh_token': response.get('refresh_token'),
'onedrive_token_validity': fields.Datetime.now() + timedelta
(seconds=expires_in) if expires_in else False,
})
if self.onedrive_backup_error_test:
self.write({
'onedrive_backup_error_test': False
})
except Exception:
if not self.onedrive_backup_error_test:
self.write({"onedrive_backup_error_test": True})
def get_dropbox_auth_url(self):
"""Return dropbox authorization url"""
dbx_auth = dropbox.oauth.DropboxOAuth2FlowNoRedirect(
self.dropbox_client_key, self.dropbox_client_secret,
token_access_type='offline')
return dbx_auth.start()
def set_dropbox_refresh_token(self, auth_code):
"""Generate and set the dropbox refresh token from authorization code"""
try:
dbx_auth = dropbox.oauth.DropboxOAuth2FlowNoRedirect(
self.dropbox_client_key, self.dropbox_client_secret,
token_access_type='offline')
outh_result = dbx_auth.finish(auth_code)
self.dropbox_refresh_token = outh_result.refresh_token
except Exception:
raise ValidationError(
'Please Enter Valid Authentication Code')
@api.constrains('db_name')
def _check_db_credentials(self):
"""Validate entered database name and master password"""
database_list = db.list_dbs()
if self.db_name not in database_list:
raise ValidationError(_("Invalid Database Name!"))
try:
odoo.service.db.check_super(self.master_pwd)
except Exception:
raise ValidationError(_("Invalid Master Password!"))
def action_sftp_connection(self):
"""Test the sftp and ftp connection using entered credentials"""
if self.backup_destination == 'sftp':
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=self.sftp_host,
username=self.sftp_user,
password=self.sftp_password,
port=self.sftp_port)
sftp = client.open_sftp()
sftp.close()
except Exception as e:
raise UserError(_("SFTP Exception: %s", e))
finally:
client.close()
elif self.backup_destination == 'ftp':
try:
ftp_server = ftplib.FTP()
ftp_server.connect(self.ftp_host, int(self.ftp_port))
ftp_server.login(self.ftp_user, self.ftp_password)
ftp_server.quit()
except Exception as e:
raise UserError(_("FTP Exception: %s", e))
self.hide_active = True
self.active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _("Connection Test Succeeded!"),
'message': _("Everything seems properly set up!"),
'sticky': False,
}
}
@api.onchange('backup_destination')
def _onchange_back_up_local(self):
"""
On change handler for the 'backup_destination' field. This method is
triggered when the value of 'backup_destination' is changed. If the
chosen backup destination is 'local', it sets the 'hide_active' field
to True which make active field to readonly to False.
"""
if self.backup_destination == 'local':
self.hide_active = True
def _schedule_auto_backup(self):
"""Function for generating and storing backup Database backup for all
the active records in backup configuration model will be created"""
records = self.search([])
mail_template_success = self.env.ref(
'auto_database_backup.mail_template_data_db_backup_successful')
mail_template_failed = self.env.ref(
'auto_database_backup.mail_template_data_db_backup_failed')
for rec in records:
backup_time = fields.datetime.utcnow().strftime(
"%Y-%m-%d_%H-%M-%S")
backup_filename = "%s_%s.%s" % (
rec.db_name, backup_time, rec.backup_format)
rec.backup_filename = backup_filename
# Local backup
if rec.backup_destination == 'local':
try:
if not os.path.isdir(rec.backup_path):
os.makedirs(rec.backup_path)
backup_file = os.path.join(rec.backup_path, backup_filename)
fds = open(backup_file, "wb")
odoo.service.db.dump_db(rec.db_name, fds, rec.backup_format)
fds.close()
# remove older backups
if rec.auto_remove:
for filename in os.listdir(rec.backup_path):
file = os.path.join(rec.backup_path, filename)
create_time = fields.datetime.fromtimestamp(
os.path.getctime(file))
backup_duration = \
fields.datetime.utcnow() - create_time
if backup_duration.days >= rec.days_to_remove:
os.remove(file)
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('FTP Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# FTP backup
elif rec.backup_destination == 'ftp':
try:
ftp_server = ftplib.FTP()
ftp_server.connect(rec.ftp_host, int(rec.ftp_port))
ftp_server.login(rec.ftp_user, rec.ftp_password)
ftp_server.encoding = "utf-8"
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
try:
ftp_server.cwd(rec.ftp_path)
except ftplib.error_perm:
ftp_server.mkd(rec.ftp_path)
ftp_server.cwd(rec.ftp_path)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp,
rec.backup_format)
ftp_server.storbinary('STOR %s' % backup_filename,
open(temp.name, "rb"))
if rec.auto_remove:
files = ftp_server.nlst()
for file in files:
create_time = fields.datetime.strptime(
ftp_server.sendcmd('MDTM ' + file)[4:],
"%Y%m%d%H%M%S")
diff_days = (
fields.datetime.now() - create_time).days
if diff_days >= rec.days_to_remove:
ftp_server.delete(file)
ftp_server.quit()
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('FTP Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# SFTP backup
elif rec.backup_destination == 'sftp':
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=rec.sftp_host,
username=rec.sftp_user,
password=rec.sftp_password,
port=rec.sftp_port)
sftp = client.open_sftp()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp,
rec.backup_format)
try:
sftp.chdir(rec.sftp_path)
except IOError as e:
if e.errno == errno.ENOENT:
sftp.mkdir(rec.sftp_path)
sftp.chdir(rec.sftp_path)
sftp.put(temp.name, backup_filename)
if rec.auto_remove:
files = sftp.listdir()
expired = list(filter(
lambda fl: (fields.datetime.now() -
fields.datetime.fromtimestamp(
sftp.stat(fl).st_mtime)).days >=
rec.days_to_remove, files))
for file in expired:
sftp.unlink(file)
sftp.close()
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('SFTP Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
finally:
client.close()
# Google Drive backup
elif rec.backup_destination == 'google_drive':
try:
if (rec.gdrive_token_validity is not False and
rec.gdrive_token_validity <= fields.Datetime.now()):
rec.generate_gdrive_refresh_token()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp, rec.backup_format)
try:
headers = {
"Authorization": "Bearer %s" % rec.gdrive_access_token}
para = {
"name": backup_filename,
"parents": [rec.google_drive_folder_key],
}
files = {
'data': ('metadata', json.dumps(para),
'application/json; charset=UTF-8'),
'file': open(temp.name, "rb")
}
requests.post(
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
headers=headers,
files=files
)
if rec.auto_remove:
query = "parents = '%s'" % rec.google_drive_folder_key
files_req = requests.get(
"https://www.googleapis.com/drive/v3/files?q=%s" % query,
headers=headers)
files = files_req.json()['files']
for file in files:
file_date_req = requests.get(
"https://www.googleapis.com/drive/v3/files/%s?fields=createdTime" %
file['id'], headers=headers)
create_time = file_date_req.json()['createdTime'][
:19].replace('T', ' ')
diff_days = (
fields.datetime.now()
- fields.datetime.strptime(
create_time, '%Y-%m-%d %H:%M:%S')).days
if diff_days >= rec.days_to_remove:
requests.delete(
"https://www.googleapis.com/drive/v3/files/%s" %
file['id'], headers=headers)
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('Google Drive Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
except Exception:
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
raise ValidationError(
'Please check the credentials before activation')
else:
raise ValidationError('Please check connection')
# Dropbox backup
elif rec.backup_destination == 'dropbox':
try:
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp, rec.backup_format)
try:
drop_connection = dropbox.Dropbox(
app_key=rec.dropbox_client_key,
app_secret=rec.dropbox_client_secret,
oauth2_refresh_token=rec.dropbox_refresh_token)
dropbox_destination = rec.dropbox_folder + '/' + backup_filename
drop_connection.files_upload(temp.read(),
dropbox_destination)
if rec.auto_remove:
files = drop_connection.files_list_folder(
rec.dropbox_folder)
file_entries = files.entries
expired_files = list(filter(
lambda fl: (fields.datetime.now() -
fl.client_modified).days >=
rec.days_to_remove,
file_entries))
for file in expired_files:
drop_connection.files_delete_v2(file.path_display)
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('Dropbox Exception: %s', error)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
except Exception:
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
raise ValidationError(
'Please check the credentials before activation')
else:
raise ValidationError('Please check connection')
# Onedrive Backup
elif rec.backup_destination == 'onedrive':
try:
if (rec.onedrive_token_validity is not False and
rec.onedrive_token_validity <= fields.Datetime.now()):
rec.generate_onedrive_refresh_token()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp, rec.backup_format)
headers = {
'Authorization': 'Bearer %s' % rec.onedrive_access_token,
'Content-Type': 'application/json'}
upload_session_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s:/%s:/createUploadSession" % (
rec.onedrive_folder_key, backup_filename)
try:
upload_session = requests.post(upload_session_url,
headers=headers)
upload_url = upload_session.json().get('uploadUrl')
requests.put(upload_url, data=temp.read())
if rec.auto_remove:
list_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s/children" % rec.onedrive_folder_key
response = requests.get(list_url, headers=headers)
files = response.json().get('value')
for file in files:
create_time = file['createdDateTime'][:19].replace(
'T', ' ')
diff_days = (
fields.datetime.now() - fields.datetime.strptime(
create_time, '%Y-%m-%d %H:%M:%S')).days
if diff_days >= rec.days_to_remove:
delete_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s" % \
file['id']
requests.delete(delete_url, headers=headers)
if rec.notify_user:
mail_template_success.send_mail(rec.id, force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('Onedrive Exception: %s', error)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
except Exception:
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
raise ValidationError(
'Please check the credentials before activation')
else:
raise ValidationError('Please check connection')
# amazon S3 backup
elif rec.backup_destination == 'amazon_s3':
if rec.aws_access_key and rec.aws_secret_access_key:
try:
# create a boto3 client for Amazon S3 with provided
# access key id and secret access key
bo3 = boto3.client(
's3',
aws_access_key_id=rec.aws_access_key,
aws_secret_access_key=rec.aws_secret_access_key)
# create a boto3 resource for Amazon S3 with provided
# access key id and secret access key
s3 = boto3.resource(
's3',
aws_access_key_id=rec.aws_access_key,
aws_secret_access_key=rec.aws_secret_access_key, )
# create a folder in the specified bucket, if it
# doesn't already exist
s3.Object(rec.bucket_file_name,
rec.aws_folder_name + '/').put()
bucket = s3.Bucket(rec.bucket_file_name)
# get all the prefixes in the bucket
prefixes = set()
for obj in bucket.objects.all():
key = obj.key
if key.endswith('/'):
prefix = key[:-1] # Remove the trailing slash
prefixes.add(prefix)
# if the specified folder is present in the bucket,
# take a backup of the database and upload it to the
# S3 bucket
if rec.aws_folder_name in prefixes:
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = \
f"{rec.aws_folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
s3.Object(rec.bucket_file_name,
remote_file_path).upload_file(
backup_file_path)
# If notify_user is enabled, send a success email
# user notifying them about the successful backup
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
# if auto_remove is enabled, remove the backups that
# are older than specified days from the S3 bucket
if rec.auto_remove:
folder_path = rec.aws_folder_name
response = bo3.list_objects(
Bucket=rec.bucket_file_name,
Prefix=folder_path)
today = fields.date.today()
for file in response['Contents']:
file_path = file['Key']
last_modified = file['LastModified']
date = last_modified.date()
age_in_days = (today - date).days
if age_in_days >= rec.days_to_remove:
bo3.delete_object(
Bucket=rec.bucket_file_name,
Key=file_path)
except Exception as error:
# if any error occurs, set the 'generated_exception'
# field to the error message and log the error
rec.generated_exception = error
_logger.info('Amazon S3 Exception: %s', error)
# If an exception occurs, send a failed email
# notifying them about the failed backup
if rec.notify_user:
mail_template_failed.send_mail(rec.id,
force_send=True)
# nextcloud backup
elif rec.backup_destination == 'next_cloud':
try:
if rec.domain and rec.next_cloud_password and \
rec.next_cloud_user_name:
try:
# Connect to NextCloud using the provided username
# and password
ncx = NextCloud(rec.domain,
auth=HTTPBasicAuth(
rec.next_cloud_user_name,
rec.next_cloud_password))
# Connect to NextCloud again to perform additional
# operations
nc = nextcloud_client.Client(rec.domain)
nc.login(rec.next_cloud_user_name,
rec.next_cloud_password)
# Get the folder name from the NextCloud folder ID
folder_name = rec.nextcloud_folder_key
# If auto_remove is enabled, remove backup files
# older than specified days
if rec.auto_remove:
folder_path = "/" + folder_name
for item in nc.list(folder_path):
backup_file_name = item.path.split("/")[-1]
backup_date_str = backup_file_name.split("_")[
2]
backup_date = fields.datetime.strptime(
backup_date_str, '%Y-%m-%d').date()
if (fields.date.today() - backup_date).days \
>= rec.days_to_remove:
nc.delete(item.path)
# If notify_user is enabled, send a success email
# notification
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('NextCloud Exception: %s', error)
if rec.notify_user:
# If an exception occurs, send a failed email
# notification
mail_template_failed.send_mail(rec.id,
force_send=True)
# Get the list of folders in the root directory of NextCloud
data = ncx.list_folders('/').__dict__
folders = [
[file_name['href'].split('/')[-2], file_name['file_id']]
for file_name in data['data'] if
file_name['href'].endswith('/')]
# If the folder name is not found in the list of folders,
# create the folder
if folder_name.replace('/', '') not in [file[0] for file in
folders]:
nc.mkdir(folder_name)
# Dump the database to a temporary file
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = f"/{folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
nc.put_file(remote_file_path, backup_file_path)
else:
# Dump the database to a temporary file
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
odoo.service.db.dump_db(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = f"/{folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
nc.put_file(remote_file_path, backup_file_path)
except Exception:
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
raise ValidationError(
'Please check the credentials before activation')
else:
raise ValidationError('Please check connection')