You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							244 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							244 lines
						
					
					
						
							12 KiB
						
					
					
				
								# -*- coding: utf-8 -*-
							 | 
						|
								################################################################################
							 | 
						|
								#
							 | 
						|
								#    Cybrosys Technologies Pvt. Ltd.
							 | 
						|
								#
							 | 
						|
								#    Copyright (C) 2025-TODAY Cybrosys Technologies(<https://www.cybrosys.com>).
							 | 
						|
								#    Author: Unnimaya C O (odoo@cybrosys.com)
							 | 
						|
								#
							 | 
						|
								#    You can modify it under the terms of the GNU AFFERO
							 | 
						|
								#    GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
							 | 
						|
								#
							 | 
						|
								#    This program is distributed in the hope that it will be useful,
							 | 
						|
								#    but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						|
								#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						|
								#    GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
							 | 
						|
								#
							 | 
						|
								#    You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
							 | 
						|
								#    (AGPL v3) along with this program.
							 | 
						|
								#    If not, see <http://www.gnu.org/licenses/>.
							 | 
						|
								#
							 | 
						|
								################################################################################
							 | 
						|
								import mysql.connector
							 | 
						|
								from odoo import api, fields, models
							 | 
						|
								from odoo.exceptions import ValidationError
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								class MysqlConnector(models.Model):
								    """Map one MySQL table onto an Odoo model and import its rows.

								    A record stores which MySQL connection (``credential_id``) and table
								    (``sql_table``) to read, which Odoo model (``model_id``) to create
								    records in, and the column-to-field mapping (``sync_ids``).

								    Workflow implemented by this class:

								    * ``action_fetch_data`` reads the table's columns and foreign keys and
								      rebuilds ``sync_ids`` (state ``fetched``).
								    * ``action_sync_table`` creates one Odoo record per MySQL row that has
								      no successful ``imported.data`` entry yet (state ``sync``).
								    """
								    _name = 'mysql.connector'
								    _description = 'Mysql Connector'

								    # MySQL base column type (as reported by SHOW COLUMNS, without length
								    # or modifiers) -> Odoo field types (``ir.model.fields.ttype``) that a
								    # column of that type may be written to. ``ttype`` values are lowercase,
								    # so 'many2one' must be spelled in lowercase to ever match.
								    _MYSQL_TO_ODOO_TYPES = {
								        'tinyint': ('boolean',),
								        'smallint': ('integer', 'many2one'),
								        'mediumint': ('integer', 'many2one'),
								        'int': ('integer', 'many2one'),
								        'bigint': ('integer', 'many2one'),
								        'float': ('float',),
								        'double': ('float',),
								        'decimal': ('float',),
								        'numeric': ('float',),
								        'char': ('char',),
								        'varchar': ('char',),
								        'text': ('text',),
								        'mediumtext': ('text',),
								        'longtext': ('text',),
								        'binary': ('binary',),
								        'varbinary': ('binary',),
								        'blob': ('binary',),
								        'tinyblob': ('binary',),
								        'mediumblob': ('binary',),
								        'longblob': ('binary',),
								        'date': ('date',),
								        'datetime': ('datetime',),
								        'timestamp': ('datetime',),
								    }

								    name = fields.Char(string='Name', help='Name of the record',
								                       required=True)
								    credential_id = fields.Many2one('mysql.credential',
								                                    string='Connection', domain="[('state', '=', 'connect')]",
								                                    help='Choose the My Sql connection',
								                                    required=True)
								    sql_table = fields.Char(string='Mysql Table Name',
								                            help='Name of the table in Mysql database',
								                            required=True)
								    # NOTE(review): this domain compares ``access_ids`` (access-rule ids)
								    # with the current user's *group* ids - confirm that this is the
								    # intended filter.
								    model_id = fields.Many2one('ir.model', string='Odoo Table Name',
								                               help='Database table in Odoo to which you have'
								                                    ' to map the data',
								                               domain=lambda self: [(
								                                   'access_ids', 'not in',
								                                   self.env.user.groups_id.ids)],
								                               required=True, ondelete="cascade")
								    sync_ids = fields.One2many('sync.table',
								                               'connection_id',
								                               string='Field Mapping',
								                               help='Select the fields to be mapped')
								    is_fetched = fields.Boolean(string='Is Fetched',
								                                help='True if once data fetched from mysql')
								    state = fields.Selection([
								        ('draft', 'Draft'),
								        ('fetched', 'Fetched'),
								        ('sync', 'Synced')
								    ], string='Status', readonly=True, copy=False, default='draft', help="State of the record")

								    @api.onchange('model_id', 'sql_table')
								    def _onchange_model_id(self):
								        """Drop the field mapping when the Odoo model or MySQL table changes."""
								        self.sync_ids = False
								        self.is_fetched = False

								    @staticmethod
								    def _quote_identifier(identifier):
								        """Return ``identifier`` quoted for safe use in a MySQL statement.

								        Each dot-separated part (``schema.table``) is wrapped in backticks
								        and embedded backticks are doubled, so the value cannot break out
								        of the identifier position.

								        :param identifier: table name, optionally schema-qualified
								        :raises ValidationError: if the name or one of its parts is empty
								        """
								        parts = [part.strip() for part in (identifier or '').split('.')]
								        if not all(parts):
								            raise ValidationError(
								                "Invalid MySQL table name: %r" % (identifier,))
								        return '.'.join(
								            '`%s`' % part.replace('`', '``') for part in parts)

								    @staticmethod
								    def _find_unique_column(columns):
								        """Return the first primary-key column, else the first unique one.

								        :param columns: rows returned by ``SHOW COLUMNS`` (dicts with the
								            keys ``Field`` and ``Key``)
								        :return: the column name, or ``None`` if the table has neither
								        """
								        for key_type in ('PRI', 'UNI'):
								            for column in columns:
								                if (column.get('Key') or '').upper() == key_type:
								                    return column['Field']
								        return None

								    def _check_type_compatibility(self, mapped_lines):
								        """Ensure every mapped non-foreign-key column fits its Odoo field.

								        :param mapped_lines: ``sync.table`` lines that have ``ir_field_id``
								        :raises ValidationError: if a column type is unknown or is not
								            allowed for the ``ttype`` of the mapped Odoo field
								        """
								        for line in mapped_lines:
								            if line.foreign_key:
								                continue
								            # "int(11) unsigned" and "int unsigned" both reduce to "int".
								            tokens = (line.data_type or '').lower().split('(')[0].split()
								            mysql_type = tokens[0] if tokens else ''
								            allowed = self._MYSQL_TO_ODOO_TYPES.get(mysql_type, ())
								            if line.ir_field_id.ttype not in allowed:
								                raise ValidationError(
								                    f'Data type of {line.mysql_field} '
								                    f'cannot be converted to the data '
								                    f'type of {line.ir_field_id.name}')

								    def _apply_foreign_key(self, line, item, vals):
								        """Fill ``vals`` for a mapping line that is a MySQL foreign key.

								        The referenced row is looked up in ``imported.data``. For a
								        many2one target the stored ``odoo_ref`` is used; for any other
								        field type the value of the referenced record's ``_rec_name``
								        field is used.

								        :param line: ``sync.table`` line with ``foreign_key`` set
								        :param item: current MySQL row (dict keyed by column name)
								        :param vals: creation values of the current row, updated in place
								        :raises ValidationError: if the referenced row was not imported yet
								        """
								        field = line.ir_field_id
								        mysql_value = item[line.mysql_field]
								        if mysql_value is None:
								            # A NULL foreign key references nothing; there is no row to
								            # look up, so leave the field empty instead of failing.
								            vals[field.name] = False
								            return
								        # limit=1: several matches would otherwise raise a singleton
								        # error when ``odoo_ref`` is read.
								        foreign_record = self.env['imported.data'].sudo().search(
								            [('mysql_table', '=', line.ref_table),
								             ('mysql_ref', '=', mysql_value)], limit=1)
								        if not foreign_record:
								            raise ValidationError(
								                f'The {line.mysql_field} column '
								                f'of {self.sql_table} table establishes a '
								                f'foreign key relationship with the '
								                f'{line.ref_table} table in MySQL. Please '
								                f'synchronize the {line.ref_table} table'
								                f' first.')
								        if field.ttype == 'many2one':
								            vals[field.name] = foreign_record.odoo_ref
								            return
								        foreign_model = self.env[foreign_record.model_id.model]
								        rec_name_field = foreign_model._rec_name
								        if not rec_name_field:
								            # Model without a name field: nothing sensible to copy.
								            return
								        rec_name_value = getattr(
								            foreign_model.browse(foreign_record.odoo_ref),
								            rec_name_field, None)
								        if rec_name_value is not None:
								            vals[field.name] = rec_name_value

								    def action_sync_table(self):
								        """Import the rows of the MySQL table into the mapped Odoo model.

								        Rows that already have a successful ``imported.data`` entry for
								        this model and table are skipped. For every other row one record
								        is created and a matching ``imported.data`` entry is logged. The
								        state becomes ``sync`` when the table returned at least one row.

								        :raises ValidationError: if a required Odoo field without default
								            is not mapped, the table has no primary/unique key, a column
								            type does not fit its Odoo field, or a referenced table has
								            not been synchronized yet
								        """
								        self.ensure_one()
								        table = self._quote_identifier(self.sql_table)
								        field_list = self.mysql_connect(f"SHOW COLUMNS FROM {table};")
								        records = self.mysql_connect(f"SELECT * FROM {table};")

								        target_model = self.env[self.model_id.model]
								        mapped_names = set(self.sync_ids.ir_field_id.mapped('name'))
								        required_names = self.env['ir.model.fields'].search(
								            [('model', '=', self.model_id.model),
								             ('required', '=', True),
								             ('ttype', 'not in',
								              ['one2many', 'many2many'])]).mapped('name')
								        # Required fields that are neither mapped nor covered by a default.
								        missing_fields = [
								            field_name for field_name in required_names
								            if field_name not in mapped_names
								            and not target_model.default_get([field_name])]
								        if missing_fields:
								            raise ValidationError(
								                "Must provide values for the Odoo fields {}".format(", ".join(
								                    missing_fields)))

								        unique = self._find_unique_column(field_list)
								        if not unique:
								            raise ValidationError(
								                f"The MySQL table {self.sql_table} cannot be imported "
								                f"because it "
								                "doesn't have a Unique or Primary key.")
								        if not records:
								            return

								        imported_model = self.env['imported.data'].sudo()
								        mapped_lines = self.sync_ids.filtered('ir_field_id')
								        types_checked = False
								        for item in records:
								            already_imported = imported_model.search(
								                [('model_id', '=', self.model_id.id),
								                 ('mysql_ref', '=', item[unique]),
								                 ('mysql_table', '=', self.sql_table),
								                 ('log_note', '=', 'Success')], limit=1)
								            if already_imported:
								                continue
								            if not types_checked:
								                # The mapping is the same for every row: validate it once,
								                # and only when there is actually something to import.
								                self._check_type_compatibility(mapped_lines)
								                types_checked = True
								            # Fresh dict per row so no value leaks from the previous row.
								            vals = {}
								            for line in mapped_lines:
								                if line.foreign_key:
								                    self._apply_foreign_key(line, item, vals)
								                else:
								                    vals[line.ir_field_id.name] = item[line.mysql_field]
								            if vals:
								                record = target_model.create(vals)
								                imported_model.create({
								                    'model_id': self.model_id.id,
								                    'mysql_ref': item[unique],
								                    'mysql_table': self.sql_table,
								                    'odoo_ref': record.id,
								                    'log_note': 'Success'
								                })
								        self.write({
								            'state': 'sync'
								        })

								    def action_fetch_data(self):
								        """Rebuild the field mapping from the columns of the MySQL table.

								        Existing ``sync_ids`` lines are removed and one line is created per
								        column, flagged as foreign key (with referenced table and column)
								        when ``INFORMATION_SCHEMA.KEY_COLUMN_USAGE`` reports a reference
								        for it. The state becomes ``fetched``.

								        :raises ValidationError: if the table has no primary/unique key or
								            the MySQL server cannot be queried
								        """
								        self.ensure_one()
								        table = self._quote_identifier(self.sql_table)
								        records = self.mysql_connect(f"SHOW COLUMNS FROM {table};")
								        if not self._find_unique_column(records):
								            raise ValidationError(
								                "The MySQL table cannot be imported because it "
								                "doesn't have a Unique or Primary key.")
								        # One query for all columns instead of one connection per column.
								        # TABLE_SCHEMA restricts the lookup to the connected database so a
								        # same-named table in another schema cannot contribute references.
								        constraints = self.mysql_connect(
								            "SELECT CONSTRAINT_NAME, COLUMN_NAME,"
								            " REFERENCED_TABLE_NAME, "
								            "REFERENCED_COLUMN_NAME FROM "
								            "INFORMATION_SCHEMA.KEY_COLUMN_USAGE "
								            "WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s "
								            "AND REFERENCED_TABLE_NAME IS NOT NULL AND "
								            "REFERENCED_COLUMN_NAME IS NOT NULL",
								            (self.sql_table,))
								        references = {}
								        for constraint in constraints:
								            # Keep the first reference reported for each column.
								            references.setdefault(constraint['COLUMN_NAME'], constraint)

								        self.sync_ids.unlink()
								        commands = []
								        seen_columns = set()
								        for rec in records:
								            column = rec['Field']
								            if column in seen_columns:
								                continue
								            seen_columns.add(column)
								            reference = references.get(column)
								            vals = {
								                'connection_id': self.id,
								                'data_type': rec['Type'],
								                'mysql_field': column,
								                'model_id': self.model_id.id,
								                'foreign_key': bool(reference)
								            }
								            if reference:
								                vals['ref_table'] = reference['REFERENCED_TABLE_NAME']
								                vals['ref_col'] = reference['REFERENCED_COLUMN_NAME']
								            commands.append((0, 0, vals))
								        if commands:
								            self.sudo().write({
								                'sync_ids': commands
								            })
								        self.write({
								            'state': 'fetched'
								        })

								    def mysql_connect(self, query, params=None):
								        """Run ``query`` on the configured MySQL database and return its rows.

								        A new connection is opened with the host, user, password and
								        database name stored on ``credential_id`` and is always closed
								        again, also when the query fails. ``is_fetched`` is set to True
								        after a successful fetch.

								        :param query: SQL statement to execute
								        :param params: optional sequence or dict of values bound to the
								            ``%s`` / ``%(name)s`` placeholders of ``query``
								        :return: list of rows, each a dict keyed by column name
								        :raises ValidationError: if connecting or querying fails
								        """
								        try:
								            connection = mysql.connector.connect(
								                host=self.credential_id.host,
								                user=self.credential_id.user,
								                password=self.credential_id.password,
								                database=self.credential_id.name
								            )
								            try:
								                if not connection.is_connected():
								                    raise ValidationError("Error connecting to MySQL")
								                cursor = connection.cursor(dictionary=True)
								                try:
								                    cursor.execute(query, params)
								                    results = cursor.fetchall()
								                finally:
								                    cursor.close()
								            finally:
								                # Release the connection even if the query raised.
								                connection.close()
								        except mysql.connector.Error as e:
								            # Surface driver errors as a user-facing validation error.
								            raise ValidationError(
								                f"Error connecting to MySQL: {e}") from e
								        # Set the flag after successfully fetching the data
								        self.is_fetched = True
								        return results
							 | 
						|
								
							 |