# -*- coding: utf-8 -*-
###############################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
# Author: Cybrosys Techno Solutions (odoo@cybrosys.com)
#
# You can modify it under the terms of the GNU LESSER
# GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details.
#
# You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE
# (LGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
import boto3
import dropbox
import errno
import ftplib
import json
import logging
import nextcloud_client
import os
import paramiko
import requests
import shutil
import subprocess
import tempfile
import odoo
from datetime import timedelta
from nextcloud import NextCloud
from requests.auth import HTTPBasicAuth
from werkzeug import urls
from odoo import api, fields, models, _
from odoo.exceptions import UserError, ValidationError
from odoo.tools.misc import find_pg_tool, exec_pg_environ
from odoo.http import request
from odoo.service import db
_logger = logging.getLogger(__name__)
ONEDRIVE_SCOPE = ['offline_access openid Files.ReadWrite.All']
MICROSOFT_GRAPH_END_POINT = "https://graph.microsoft.com"
GOOGLE_AUTH_ENDPOINT = 'https://accounts.google.com/o/oauth2/auth'
GOOGLE_TOKEN_ENDPOINT = 'https://accounts.google.com/o/oauth2/token'
GOOGLE_API_BASE_URL = 'https://www.googleapis.com'
class DbBackupConfigure(models.Model):
"""DbBackupConfigure class provides an interface to manage database
backups of Local Server, Remote Server, Google Drive, Dropbox, Onedrive,
Nextcloud and Amazon S3"""
_name = 'db.backup.configure'
_description = 'Automatic Database Backup'
name = fields.Char(string='Name', required=True,
help='Name of the backup configuration')
db_name = fields.Char(string='Database Name', required=True,
help='Name of the database')
master_pwd = fields.Char(string='Master Password', required=True,
help='Master password')
backup_format = fields.Selection([
('zip', 'Zip'),
('dump', 'Dump')
], string='Backup Format', default='zip', required=True,
help='Format of the backup')
backup_destination = fields.Selection([
('local', 'Local Storage'),
('google_drive', 'Google Drive'),
('ftp', 'FTP'),
('sftp', 'SFTP'),
('dropbox', 'Dropbox'),
('onedrive', 'Onedrive'),
('next_cloud', 'Next Cloud'),
('amazon_s3', 'Amazon S3')
], string='Backup Destination', help='Destination of the backup')
backup_path = fields.Char(string='Backup Path',
help='Local storage directory path')
sftp_host = fields.Char(string='SFTP Host', help='SFTP host details')
sftp_port = fields.Char(string='SFTP Port', default=22,
help='SFTP port details')
sftp_user = fields.Char(string='SFTP User', copy=False,
help='SFTP user details')
sftp_password = fields.Char(string='SFTP Password', copy=False,
help='SFTP password')
sftp_path = fields.Char(string='SFTP Path', help='SFTP path details')
ftp_host = fields.Char(string='FTP Host', help='FTP host details')
ftp_port = fields.Char(string='FTP Port', default=21,
help='FTP port details')
ftp_user = fields.Char(string='FTP User', copy=False,
help='FTP user details')
ftp_password = fields.Char(string='FTP Password', copy=False,
help='FTP password')
ftp_path = fields.Char(string='FTP Path', help='FTP path details')
dropbox_client_key = fields.Char(string='Dropbox Client ID', copy=False,
help='Client id of the dropbox')
dropbox_client_secret = fields.Char(string='Dropbox Client Secret',
copy=False,
help='Client secret id of the dropbox')
dropbox_refresh_token = fields.Char(string='Dropbox Refresh Token',
copy=False,
help='Refresh token for the dropbox')
is_dropbox_token_generated = fields.Boolean(
string='Dropbox Token Generated',
compute='_compute_is_dropbox_token_generated',
copy=False, help='Is the dropbox token generated or not?')
dropbox_folder = fields.Char(string='Dropbox Folder',
help='Dropbox folder')
active = fields.Boolean(default=False, string='Active',
help='Activate the Scheduled Action or not')
hide_active = fields.Boolean(string="Hide Active",
help="Make active field to readonly")
auto_remove = fields.Boolean(string='Remove Old Backups',
help='Remove old backups')
days_to_remove = fields.Integer(string='Remove After',
help='Automatically delete stored backups'
' after this specified number of days')
google_drive_folder_key = fields.Char(string='Drive Folder ID',
help='Folder id of the drive')
notify_user = fields.Boolean(string='Notify User',
help='Send an email notification to the user'
' when the backup operation is successful'
' or failed')
user_id = fields.Many2one('res.users', string='User',
help='Name of the user')
backup_filename = fields.Char(string='Backup Filename',
help='For Storing generated backup filename')
generated_exception = fields.Char(string='Exception',
help='Exception encountered during backup'
' generation')
onedrive_client_key = fields.Char(string='Onedrive Client ID', copy=False,
help='Client ID of the onedrive')
onedrive_client_secret = fields.Char(string='Onedrive Client Secret',
copy=False, help='Client secret id of'
' the onedrive')
onedrive_access_token = fields.Char(string='Onedrive Access Token',
copy=False,
help='Access token for one drive')
onedrive_refresh_token = fields.Char(string='Onedrive Refresh Token',
copy=False,
help='Refresh token for one drive')
onedrive_token_validity = fields.Datetime(string='Onedrive Token Validity',
copy=False,
help='Token validity date')
onedrive_folder_key = fields.Char(string='Folder ID',
help='Folder id of the onedrive')
is_onedrive_token_generated = fields.Boolean(
string='Onedrive Tokens Generated',
compute='_compute_is_onedrive_token_generated',
copy=False, help='Whether the Onedrive tokens are generated or not')
gdrive_refresh_token = fields.Char(string='Google drive Refresh Token',
copy=False,
help='Refresh token for google drive')
gdrive_access_token = fields.Char(string='Google Drive Access Token',
copy=False,
help='Access token for google drive')
is_google_drive_token_generated = fields.Boolean(
string='Google drive Token Generated',
compute='_compute_is_google_drive_token_generated', copy=False,
help='Google drive token generated or not')
gdrive_client_key = fields.Char(string='Google Drive Client ID',
copy=False,
help='Client id of the google drive')
gdrive_client_secret = fields.Char(string='Google Drive Client Secret',
copy=False,
help='Client secret id of the google'
' drive')
gdrive_token_validity = fields.Datetime(
string='Google Drive Token Validity', copy=False,
help='Token validity of the google drive')
onedrive_redirect_uri = fields.Char(string='Onedrive Redirect URI',
compute='_compute_redirect_uri',
help='Redirect URI of the onedrive')
gdrive_redirect_uri = fields.Char(string='Google Drive Redirect URI',
compute='_compute_redirect_uri',
help='Redirect URI of the google drive')
domain = fields.Char(string='Domain Name', help="Field used to store the "
"name of a domain")
next_cloud_user_name = fields.Char(string='User Name',
help="Field used to store the user name"
" for a Nextcloud account.")
next_cloud_password = fields.Char(string='Password',
help="Field used to store the password"
" for a Nextcloud account.")
nextcloud_folder_key = fields.Char(string='Next Cloud Folder Id',
help="Field used to store the unique "
"identifier for a Nextcloud "
"folder.")
aws_access_key = fields.Char(string="Amazon S3 Access Key",
help="Field used to store the Access Key"
" for an Amazon S3 bucket.")
aws_secret_access_key = fields.Char(string='Amazon S3 Secret Key',
help="Field used to store the Secret"
" Key for an Amazon S3 bucket.")
bucket_file_name = fields.Char(string='Bucket Name',
help="Field used to store the name of an"
" Amazon S3 bucket.")
aws_folder_name = fields.Char(string='Folder Name',
help="Field used to store the name of a"
" folder in an Amazon S3 bucket.")
def action_s3cloud(self):
"""If it has aws_secret_access_key, which will perform s3cloud
operations for connection test"""
if self.aws_access_key and self.aws_secret_access_key:
try:
s3_client = boto3.client(
's3',
aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_access_key
)
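# head_bucket issues a lightweight HEAD request; an HTTP 200 status in
# the response metadata confirms the bucket exists and is reachable
# with the supplied credentials.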
response = s3_client.head_bucket(Bucket=self.bucket_file_name)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
self.active = True
self.hide_active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'success',
'title': _("Connection Test Succeeded!"),
'message': _(
"Everything seems properly set up!"),
'sticky': False,
}
}
raise UserError(
_("Bucket not found. Please check the bucket name and"
" try again."))
except Exception:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
def action_nextcloud(self):
"""If it has next_cloud_password, domain, and next_cloud_user_name
which will perform an action for nextcloud connection test"""
if self.domain and self.next_cloud_password and \
self.next_cloud_user_name:
try:
ncx = NextCloud(self.domain,
auth=HTTPBasicAuth(self.next_cloud_user_name,
self.next_cloud_password))
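# Listing the root folder goes through Nextcloud's WebDAV endpoint; a
# 207 (Multi-Status) response confirms the domain and credentials work.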
data = ncx.list_folders('/').__dict__
if data['raw'].status_code == 207:
self.active = True
self.hide_active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'success',
'title': _("Connection Test Succeeded!"),
'message': _("Everything seems properly set up!"),
'sticky': False,
}
}
else:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
except Exception:
self.active = False
self.hide_active = False
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'type': 'danger',
'title': _("Connection Test Failed!"),
'message': _("An error occurred while testing the "
"connection."),
'sticky': False,
}
}
@api.depends('onedrive_redirect_uri', 'gdrive_redirect_uri')
def _compute_redirect_uri(self):
"""Compute the redirect URI for onedrive and Google Drive"""
for rec in self:
base_url = request.env['ir.config_parameter'].get_param(
'web.base.url')
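# These URIs must match the redirect URIs registered for the OAuth
# applications in the Microsoft Azure and Google API consoles.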
rec.onedrive_redirect_uri = base_url + '/onedrive/authentication'
rec.gdrive_redirect_uri = base_url + '/google_drive/authentication'
@api.depends('onedrive_access_token', 'onedrive_refresh_token')
def _compute_is_onedrive_token_generated(self):
"""Set true if onedrive tokens are generated"""
for rec in self:
rec.is_onedrive_token_generated = bool(
rec.onedrive_access_token) and bool(rec.onedrive_refresh_token)
@api.depends('dropbox_refresh_token')
def _compute_is_dropbox_token_generated(self):
"""Set True if the dropbox refresh token is generated"""
for rec in self:
rec.is_dropbox_token_generated = bool(rec.dropbox_refresh_token)
@api.depends('gdrive_access_token', 'gdrive_refresh_token')
def _compute_is_google_drive_token_generated(self):
"""Set True if the Google Drive refresh token is generated"""
for rec in self:
rec.is_google_drive_token_generated = bool(
rec.gdrive_access_token) and bool(rec.gdrive_refresh_token)
def action_get_dropbox_auth_code(self):
"""Open a wizards to set up dropbox Authorization code"""
return {
'type': 'ir.actions.act_window',
'name': 'Dropbox Authorization Wizard',
'res_model': 'dropbox.auth.code',
'view_mode': 'form',
'target': 'new',
'context': {'dropbox_auth': True}
}
def action_get_onedrive_auth_code(self):
"""Generate onedrive authorization code"""
AUTHORITY = \
'https://login.microsoftonline.com/common/oauth2/v2.0/authorize'
action = self.env["ir.actions.act_window"].sudo()._for_xml_id(
"auto_database_backup.db_backup_configure_action")
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
url_return = base_url + \
'/web#id=%d&action=%d&view_type=form&model=%s' % (
self.id, action['id'], 'db.backup.configure')
state = {
'backup_config_id': self.id,
'url_return': url_return
}
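# The state payload carries the record id and return URL through the
# OAuth flow; 'prompt=consent' re-displays the consent screen so a
# refresh token is issued for the 'offline_access' scope.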
encoded_params = urls.url_encode({
'response_type': 'code',
'client_id': self.onedrive_client_key,
'state': json.dumps(state),
'scope': ONEDRIVE_SCOPE,
'redirect_uri': base_url + '/onedrive/authentication',
'prompt': 'consent',
'access_type': 'offline'
})
auth_url = "%s?%s" % (AUTHORITY, encoded_params)
return {
'type': 'ir.actions.act_url',
'target': 'self',
'url': auth_url,
}
def action_get_gdrive_auth_code(self):
"""Generate google drive authorization code"""
action = self.env["ir.actions.act_window"].sudo()._for_xml_id(
"auto_database_backup.db_backup_configure_action")
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
url_return = base_url + \
'/web#id=%d&action=%d&view_type=form&model=%s' % (
self.id, action['id'], 'db.backup.configure')
state = {
'backup_config_id': self.id,
'url_return': url_return
}
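# 'access_type=offline' together with 'approval_prompt=force' makes
# Google return a refresh token on every authorization.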
encoded_params = urls.url_encode({
'response_type': 'code',
'client_id': self.gdrive_client_key,
'scope': 'https://www.googleapis.com/auth/drive https://www.googleapis.com/auth/drive.file',
'redirect_uri': base_url + '/google_drive/authentication',
'access_type': 'offline',
'state': json.dumps(state),
'approval_prompt': 'force',
})
auth_url = "%s?%s" % (GOOGLE_AUTH_ENDPOINT, encoded_params)
return {
'type': 'ir.actions.act_url',
'target': 'self',
'url': auth_url,
}
def generate_onedrive_refresh_token(self):
"""Generate onedrive access token from refresh token if expired"""
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
headers = {"Content-type": "application/x-www-form-urlencoded"}
data = {
'client_id': self.onedrive_client_key,
'client_secret': self.onedrive_client_secret,
'scope': ONEDRIVE_SCOPE,
'grant_type': "refresh_token",
'redirect_uri': base_url + '/onedrive/authentication',
'refresh_token': self.onedrive_refresh_token
}
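# Exchange the stored refresh token for a new access token (and a new
# refresh token) at the Microsoft token endpoint, and record the
# token's expiry in 'onedrive_token_validity'.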
try:
res = requests.post(
"https://login.microsoftonline.com/common/oauth2/v2.0/token",
data=data, headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'onedrive_access_token': response.get('access_token'),
'onedrive_refresh_token': response.get('refresh_token'),
'onedrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError as error:
_logger.exception("Bad microsoft onedrive request : %s !",
error.response.content)
raise error
def get_onedrive_tokens(self, authorize_code):
"""Generate onedrive tokens from authorization code."""
headers = {"content-type": "application/x-www-form-urlencoded"}
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
data = {
'code': authorize_code,
'client_id': self.onedrive_client_key,
'client_secret': self.onedrive_client_secret,
'grant_type': 'authorization_code',
'scope': ONEDRIVE_SCOPE,
'redirect_uri': base_url + '/onedrive/authentication'
}
try:
res = requests.post(
"https://login.microsoftonline.com/common/oauth2/v2.0/token",
data=data, headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'onedrive_access_token': response.get('access_token'),
'onedrive_refresh_token': response.get('refresh_token'),
'onedrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError as error:
_logger.exception("Bad microsoft onedrive request : %s !",
error.response.content)
raise error
def generate_gdrive_refresh_token(self):
"""Generate Google Drive access token from refresh token if expired"""
headers = {"content-type": "application/x-www-form-urlencoded"}
data = {
'refresh_token': self.gdrive_refresh_token,
'client_id': self.gdrive_client_key,
'client_secret': self.gdrive_client_secret,
'grant_type': 'refresh_token',
}
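# A refresh-token grant returns a new access token only; the stored
# refresh token stays valid, so only the access token and its expiry
# are updated below.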
try:
res = requests.post(GOOGLE_TOKEN_ENDPOINT, data=data,
headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'gdrive_access_token': response.get('access_token'),
'gdrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError as error:
error_key = error.response.json().get("error", "nc")
error_msg = _(
"An error occurred while generating the token. Your"
"authorization code may be invalid or has already expired [%s]."
"You should check your Client ID and secret on the Google APIs"
" plateform or try to stop and restart your calendar"
" synchronisation.",
error_key)
raise UserError(error_msg)
def get_gdrive_tokens(self, authorize_code):
"""Generate onedrive tokens from authorization code."""
base_url = request.env['ir.config_parameter'].get_param('web.base.url')
headers = {"content-type": "application/x-www-form-urlencoded"}
data = {
'code': authorize_code,
'client_id': self.gdrive_client_key,
'client_secret': self.gdrive_client_secret,
'grant_type': 'authorization_code',
'redirect_uri': base_url + '/google_drive/authentication'
}
try:
res = requests.post(GOOGLE_TOKEN_ENDPOINT, params=data,
headers=headers)
res.raise_for_status()
response = res.content and res.json() or {}
if response:
expires_in = response.get('expires_in')
self.write({
'gdrive_access_token': response.get('access_token'),
'gdrive_refresh_token': response.get('refresh_token'),
'gdrive_token_validity': fields.Datetime.now() + timedelta(
seconds=expires_in) if expires_in else False,
})
except requests.HTTPError:
error_msg = _(
"Something went wrong during your token generation. Maybe your"
" Authorization Code is invalid")
raise UserError(error_msg)
def get_dropbox_auth_url(self):
"""Return dropbox authorization url"""
dbx_auth = dropbox.oauth.DropboxOAuth2FlowNoRedirect(
self.dropbox_client_key,
self.dropbox_client_secret,
token_access_type='offline')
return dbx_auth.start()
def set_dropbox_refresh_token(self, auth_code):
"""Generate and set the dropbox refresh token from authorization code"""
dbx_auth = dropbox.oauth.DropboxOAuth2FlowNoRedirect(
self.dropbox_client_key,
self.dropbox_client_secret,
token_access_type='offline')
oauth_result = dbx_auth.finish(auth_code)
self.dropbox_refresh_token = oauth_result.refresh_token
@api.constrains('db_name')
def _check_db_credentials(self):
"""Validate entered database name and master password"""
database_list = db.list_dbs(force=True)
if self.db_name not in database_list:
raise ValidationError(_("Invalid Database Name!"))
try:
odoo.service.db.check_super(self.master_pwd)
except Exception:
raise ValidationError(_("Invalid Master Password!"))
def action_sftp_connection(self):
"""Test the sftp and ftp connection using entered credentials"""
if self.backup_destination == 'sftp':
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=self.sftp_host,
username=self.sftp_user,
password=self.sftp_password,
port=self.sftp_port)
sftp = client.open_sftp()
sftp.close()
except Exception as e:
raise UserError(_("SFTP Exception: %s", e))
finally:
client.close()
elif self.backup_destination == 'ftp':
try:
ftp_server = ftplib.FTP()
ftp_server.connect(self.ftp_host, int(self.ftp_port))
ftp_server.login(self.ftp_user, self.ftp_password)
ftp_server.quit()
except Exception as e:
raise UserError(_("FTP Exception: %s", e))
self.hide_active = True
self.active = True
return {
'type': 'ir.actions.client',
'tag': 'display_notification',
'params': {
'title': _("Connection Test Succeeded!"),
'message': _("Everything seems properly set up!"),
'sticky': False,
}
}
@api.onchange('backup_destination')
def _onchange_back_up_local(self):
"""
On change handler for the 'backup_destination' field. This method is
triggered when the value of 'backup_destination' is changed. If the
chosen backup destination is 'local', it sets the 'hide_active' field
to True, which makes the 'active' field read-only.
"""
if self.backup_destination == 'local':
self.hide_active = True
def _schedule_auto_backup(self):
"""Function for generating and storing backup.
Database backup for all the active records in backup configuration
model will be created."""
records = self.search([])
mail_template_success = self.env.ref(
'auto_database_backup.mail_template_data_db_backup_successful')
mail_template_failed = self.env.ref(
'auto_database_backup.mail_template_data_db_backup_failed')
for rec in records:
backup_time = fields.datetime.utcnow().strftime(
"%Y-%m-%d_%H-%M-%S")
backup_filename = "%s_%s.%s" % (
rec.db_name, backup_time, rec.backup_format)
rec.backup_filename = backup_filename
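# Backups are named '<db>_<UTC timestamp>.<format>'; the date part of
# the timestamp is later used when pruning old Nextcloud backups.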
# Local backup
if rec.backup_destination == 'local':
try:
if not os.path.isdir(rec.backup_path):
os.makedirs(rec.backup_path)
backup_file = os.path.join(rec.backup_path,
backup_filename)
f = open(backup_file, "wb")
self.dump_data(rec.db_name, f, rec.backup_format)
f.close()
# Remove older backups
if rec.auto_remove:
for filename in os.listdir(rec.backup_path):
file = os.path.join(rec.backup_path, filename)
create_time = fields.datetime.fromtimestamp(
os.path.getctime(file))
backup_duration = fields.datetime.utcnow() - create_time
if backup_duration.days >= rec.days_to_remove:
os.remove(file)
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('Local Backup Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# FTP backup
elif rec.backup_destination == 'ftp':
try:
ftp_server = ftplib.FTP()
ftp_server.connect(rec.ftp_host, int(rec.ftp_port))
ftp_server.login(rec.ftp_user, rec.ftp_password)
ftp_server.encoding = "utf-8"
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
try:
ftp_server.cwd(rec.ftp_path)
except ftplib.error_perm:
ftp_server.mkd(rec.ftp_path)
ftp_server.cwd(rec.ftp_path)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
ftp_server.storbinary('STOR %s' % backup_filename,
open(temp.name, "rb"))
if rec.auto_remove:
files = ftp_server.nlst()
for file in files:
create_time = fields.datetime.strptime(
ftp_server.sendcmd('MDTM ' + file)[4:],
"%Y%m%d%H%M%S")
diff_days = (
fields.datetime.now() - create_time).days
if diff_days >= rec.days_to_remove:
ftp_server.delete(file)
ftp_server.quit()
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('FTP Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# SFTP backup
elif rec.backup_destination == 'sftp':
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(hostname=rec.sftp_host,
username=rec.sftp_user,
password=rec.sftp_password,
port=rec.sftp_port)
sftp = client.open_sftp()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp, rec.backup_format)
try:
sftp.chdir(rec.sftp_path)
except IOError as e:
if e.errno == errno.ENOENT:
sftp.mkdir(rec.sftp_path)
sftp.chdir(rec.sftp_path)
sftp.put(temp.name, backup_filename)
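# Prune remote backups whose modification time (st_mtime) is older
# than the configured retention period.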
if rec.auto_remove:
files = sftp.listdir()
expired = list(filter(
lambda fl: (fields.datetime.now()
- fields.datetime.fromtimestamp(
sftp.stat(fl).st_mtime)).days >=
rec.days_to_remove, files))
for file in expired:
sftp.unlink(file)
sftp.close()
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('SFTP Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
finally:
client.close()
# Google Drive backup
elif rec.backup_destination == 'google_drive':
try:
if rec.gdrive_token_validity <= fields.Datetime.now():
rec.generate_gdrive_refresh_token()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
try:
headers = {
"Authorization": "Bearer %s" % rec.gdrive_access_token}
para = {
"name": backup_filename,
"parents": [rec.google_drive_folder_key],
}
files = {
'data': ('metadata', json.dumps(para),
'application/json; charset=UTF-8'),
'file': open(temp.name, "rb")
}
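# Multipart upload to the Drive v3 files endpoint: the first part is
# the JSON metadata (name and parent folder), the second is the dump
# file itself.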
requests.post(
"https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart",
headers=headers,
files=files
)
if rec.auto_remove:
query = "parents = '%s'" % rec.google_drive_folder_key
files_req = requests.get(
"https://www.googleapis.com/drive/v3/files?q=%s" % query,
headers=headers)
files = files_req.json()['files']
for file in files:
file_date_req = requests.get(
"https://www.googleapis.com/drive/v3/files/%s?fields=createdTime" %
file['id'], headers=headers)
create_time = file_date_req.json()[
'createdTime'][
:19].replace('T', ' ')
diff_days = (
fields.datetime.now() - fields.datetime.strptime(
create_time, '%Y-%m-%d %H:%M:%S')).days
if diff_days >= rec.days_to_remove:
requests.delete(
"https://www.googleapis.com/drive/v3/files/%s" %
file['id'], headers=headers)
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as e:
rec.generated_exception = e
_logger.info('Google Drive Exception: %s', e)
if rec.notify_user:
mail_template_failed.send_mail(rec.id,
force_send=True)
except Exception:
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
raise ValidationError(
'Please check the credentials before activation')
else:
raise ValidationError('Please check connection')
# Dropbox backup
elif rec.backup_destination == 'dropbox':
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
try:
dbx = dropbox.Dropbox(
app_key=rec.dropbox_client_key,
app_secret=rec.dropbox_client_secret,
oauth2_refresh_token=rec.dropbox_refresh_token)
dropbox_destination = (rec.dropbox_folder + '/' +
backup_filename)
dbx.files_upload(temp.read(), dropbox_destination)
if rec.auto_remove:
files = dbx.files_list_folder(rec.dropbox_folder)
file_entries = files.entries
expired_files = list(filter(
lambda fl: (fields.datetime.now() -
fl.client_modified).days >=
rec.days_to_remove,
file_entries))
for file in expired_files:
dbx.files_delete_v2(file.path_display)
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('Dropbox Exception: %s', error)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# Onedrive Backup
elif rec.backup_destination == 'onedrive':
if rec.onedrive_token_validity <= fields.Datetime.now():
rec.generate_onedrive_refresh_token()
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp, rec.backup_format)
headers = {
'Authorization': 'Bearer %s' % rec.onedrive_access_token,
'Content-Type': 'application/json'}
upload_session_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s:/%s:/createUploadSession" % (
rec.onedrive_folder_key, backup_filename)
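# createUploadSession returns a temporary 'uploadUrl'; the backup
# content is then uploaded to that URL with a PUT request.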
try:
upload_session = requests.post(upload_session_url,
headers=headers)
upload_url = upload_session.json().get('uploadUrl')
requests.put(upload_url, data=temp.read())
if rec.auto_remove:
list_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s/children" % rec.onedrive_folder_key
response = requests.get(list_url, headers=headers)
files = response.json().get('value')
for file in files:
create_time = file['createdDateTime'][:19].replace(
'T',
' ')
diff_days = (
fields.datetime.now() - fields.datetime.strptime(
create_time, '%Y-%m-%d %H:%M:%S')).days
if diff_days >= rec.days_to_remove:
delete_url = MICROSOFT_GRAPH_END_POINT + "/v1.0/me/drive/items/%s" % \
file['id']
requests.delete(delete_url, headers=headers)
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('Onedrive Exception: %s', error)
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
# NextCloud Backup
elif rec.backup_destination == 'next_cloud':
try:
if rec.domain and rec.next_cloud_password and \
rec.next_cloud_user_name:
try:
# Connect to NextCloud using the provided username
# and password
ncx = NextCloud(rec.domain,
auth=HTTPBasicAuth(
rec.next_cloud_user_name,
rec.next_cloud_password))
# Connect to NextCloud again to perform additional
# operations
nc = nextcloud_client.Client(rec.domain)
nc.login(rec.next_cloud_user_name,
rec.next_cloud_password)
# Get the folder name from the NextCloud folder ID
folder_name = rec.nextcloud_folder_key
# If auto_remove is enabled, remove backup files
# older than specified days
if rec.auto_remove:
folder_path = "/" + folder_name
for item in nc.list(folder_path):
backup_file_name = item.path.split("/")[-1]
backup_date_str = \
backup_file_name.split("_")[1]
backup_date = fields.datetime.strptime(
backup_date_str, '%Y-%m-%d').date()
if (fields.date.today() - backup_date).days \
>= rec.days_to_remove:
nc.delete(item.path)
# If notify_user is enabled, send a success email
# notification
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as error:
rec.generated_exception = error
_logger.info('NextCloud Exception: %s', error)
if rec.notify_user:
# If an exception occurs, send a failed email
# notification
mail_template_failed.send_mail(rec.id,
force_send=True)
# Get the list of folders in the root directory of NextCloud
data = ncx.list_folders('/').__dict__
folders = [
[file_name['href'].split('/')[-2],
file_name['file_id']]
for file_name in data['data'] if
file_name['href'].endswith('/')]
# If the folder name is not found in the list of folders,
# create the folder
if folder_name not in [file[0] for file in folders]:
nc.mkdir(folder_name)
# Dump the database to a temporary file
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = f"/{folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
nc.put_file(remote_file_path, backup_file_path)
else:
# Dump the database to a temporary file
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = f"/{folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
nc.put_file(remote_file_path, backup_file_path)
except Exception:
raise ValidationError('Please check connection')
# Amazon S3 Backup
elif rec.backup_destination == 'amazon_s3':
if rec.aws_access_key and rec.aws_secret_access_key:
try:
# Create a boto3 client for Amazon S3 with provided
# access key id and secret access key
bo3 = boto3.client(
's3',
aws_access_key_id=rec.aws_access_key,
aws_secret_access_key=rec.aws_secret_access_key)
# If auto_remove is enabled, remove the backups that
# are older than specified days from the S3 bucket
if rec.auto_remove:
folder_path = rec.aws_folder_name
response = bo3.list_objects(
Bucket=rec.bucket_file_name,
Prefix=folder_path)
today = fields.date.today()
for file in response['Contents']:
file_path = file['Key']
last_modified = file['LastModified']
date = last_modified.date()
age_in_days = (today - date).days
if age_in_days >= rec.days_to_remove:
bo3.delete_object(
Bucket=rec.bucket_file_name,
Key=file_path)
# Create a boto3 resource for Amazon S3 with provided
# access key id and secret access key
s3 = boto3.resource(
's3',
aws_access_key_id=rec.aws_access_key,
aws_secret_access_key=rec.aws_secret_access_key)
# Create a folder in the specified bucket, if it
# doesn't already exist
s3.Object(rec.bucket_file_name,
rec.aws_folder_name + '/').put()
bucket = s3.Bucket(rec.bucket_file_name)
# Get all the prefixes in the bucket
prefixes = set()
for obj in bucket.objects.all():
key = obj.key
if key.endswith('/'):
prefix = key[:-1] # Remove the trailing slash
prefixes.add(prefix)
# If the specified folder is present in the bucket,
# take a backup of the database and upload it to the
# S3 bucket
if rec.aws_folder_name in prefixes:
temp = tempfile.NamedTemporaryFile(
suffix='.%s' % rec.backup_format)
with open(temp.name, "wb+") as tmp:
self.dump_data(rec.db_name, tmp,
rec.backup_format)
backup_file_path = temp.name
remote_file_path = f"{rec.aws_folder_name}/{rec.db_name}_" \
f"{backup_time}.{rec.backup_format}"
s3.Object(rec.bucket_file_name,
remote_file_path).upload_file(
backup_file_path)
# If notify_user is enabled, send an email to the
# user notifying them about the successful backup
if rec.notify_user:
mail_template_success.send_mail(rec.id,
force_send=True)
except Exception as error:
# If any error occurs, set the 'generated_exception'
# field to the error message and log the error
rec.generated_exception = error
_logger.info('Amazon S3 Exception: %s', error)
# If notify_user is enabled, email the user
# notifying them about the failed backup
if rec.notify_user:
mail_template_failed.send_mail(rec.id, force_send=True)
def dump_data(self, db_name, stream, backup_format):
"""Dump database `db` into file-like object `stream` if stream is None
return a file object with the dump. """
cron_user_id = self.env.ref('auto_database_backup.ir_cron_auto_db_backup').user_id.id
if cron_user_id != self.env.user.id:
_logger.error(
'Unauthorized database operation. Backups should only be available from the cron job.')
raise ValidationError("Unauthorized database operation. Backups should only be available from the cron job.")
_logger.info('DUMP DB: %s format %s', db_name, backup_format)
cmd = [find_pg_tool('pg_dump'), '--no-owner', db_name]
env = exec_pg_environ()
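# The 'zip' format mirrors Odoo's native backup layout: a plain-SQL
# 'dump.sql', the database filestore and a 'manifest.json' zipped
# together. The 'dump' format is a single pg_dump custom-format file.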
if backup_format == 'zip':
with tempfile.TemporaryDirectory() as dump_dir:
filestore = odoo.tools.config.filestore(db_name)
cmd.insert(-1, '--file=' + os.path.join(dump_dir, 'dump.sql'))
subprocess.run(cmd, env=env, stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT, check=True)
if os.path.exists(filestore):
shutil.copytree(filestore,
os.path.join(dump_dir, 'filestore'))
with open(os.path.join(dump_dir, 'manifest.json'), 'w') as fh:
db = odoo.sql_db.db_connect(db_name)
with db.cursor() as cr:
json.dump(self._dump_db_manifest(cr), fh, indent=4)
if stream:
odoo.tools.osutil.zip_dir(dump_dir, stream,
include_dir=False,
fnct_sort=lambda
file_name: file_name != 'dump.sql')
else:
t = tempfile.TemporaryFile()
odoo.tools.osutil.zip_dir(dump_dir, t, include_dir=False,
fnct_sort=lambda
file_name: file_name != 'dump.sql')
t.seek(0)
return t
else:
cmd.insert(-1, '--format=c')
process = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
stdout, _ = process.communicate()
if stream:
stream.write(stdout)
else:
return stdout
def _dump_db_manifest(self, cr):
""" This function generates a manifest dictionary for database dump."""
pg_version = "%d.%d" % divmod(cr._obj.connection.server_version / 100, 100)
cr.execute(
"SELECT name, latest_version FROM ir_module_module WHERE state = 'installed'")
modules = dict(cr.fetchall())
manifest = {
'odoo_dump': '1',
'db_name': cr.dbname,
'version': odoo.release.version,
'version_info': odoo.release.version_info,
'major_version': odoo.release.major_version,
'pg_version': pg_version,
'modules': modules,
}
return manifest