You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							244 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							244 lines
						
					
					
						
							12 KiB
						
					
					
				| # -*- coding: utf-8 -*- | |
| ################################################################################ | |
| # | |
| #    Cybrosys Technologies Pvt. Ltd. | |
| # | |
| #    Copyright (C) 2025-TODAY Cybrosys Technologies(<https://www.cybrosys.com>). | |
| #    Author: Unnimaya C O (odoo@cybrosys.com) | |
| # | |
| #    You can modify it under the terms of the GNU AFFERO | |
| #    GENERAL PUBLIC LICENSE (AGPL v3), Version 3. | |
| # | |
| #    This program is distributed in the hope that it will be useful, | |
| #    but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | |
| #    GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details. | |
| # | |
| #    You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE | |
| #    (AGPL v3) along with this program. | |
| #    If not, see <http://www.gnu.org/licenses/>. | |
| # | |
| ################################################################################ | |
| import mysql.connector | |
| from odoo import api, fields, models | |
| from odoo.exceptions import ValidationError | |
| 
 | |
| 
 | |
| class MysqlConnector(models.Model): | |
|     """Model for connecting with Mysql""" | |
|     _name = 'mysql.connector' | |
|     _description = 'Mysql Connector' | |
| 
 | |
|     name = fields.Char(string='Name', help='Name of the record', | |
|                        required=True) | |
|     credential_id = fields.Many2one('mysql.credential', | |
|                                     string='Connection', domain="[('state', '=', 'connect')]", | |
|                                     help='Choose the My Sql connection', | |
|                                     required=True) | |
|     sql_table = fields.Char(string='Mysql Table Name', | |
|                             help='Name of the table in Mysql database', | |
|                             required=True) | |
|     model_id = fields.Many2one('ir.model', string='Odoo Table Name', | |
|                                help='Database table in Odoo to which you have' | |
|                                     ' to map the data', | |
|                                domain=lambda self: [( | |
|                                    'access_ids', 'not in', | |
|                                    self.env.user.groups_id.ids)], | |
|                                required=True, ondelete="cascade") | |
|     sync_ids = fields.One2many('sync.table', | |
|                                'connection_id', | |
|                                string='Field Mapping', | |
|                                help='Select the fields to be mapped') | |
|     is_fetched = fields.Boolean(string='Is Fetched', | |
|                                 help='True if once data fetched from mysql') | |
|     state = fields.Selection([ | |
|         ('draft', 'Draft'), | |
|         ('fetched', 'Fetched'), | |
|         ('sync', 'Synced') | |
|     ], string='Status', readonly=True, copy=False, default='draft', help="State of the record") | |
| 
 | |
|     @api.onchange('model_id', 'sql_table') | |
|     def _onchange_model_id(self): | |
|         """Method for reloading the one2many field""" | |
|         self.sync_ids = False | |
|         self.is_fetched = False | |
| 
 | |
|     def action_sync_table(self): | |
|         """Method for syncing tables""" | |
|         field_list = self.mysql_connect(f"SHOW COLUMNS FROM {self.sql_table};") | |
|         records = self.mysql_connect(f"SELECT * FROM {self.sql_table};") | |
|         required_fields = [] | |
|         for field in self.env['ir.model.fields'].search( | |
|                 [('model', '=', self.model_id.model), | |
|                  ('required', '=', True), | |
|                  ('ttype', 'not in', | |
|                   ['one2many', 'many2many'])]).mapped('name'): | |
|             if not self.env[self.model_id.model].default_get([field]): | |
|                 required_fields.append(field) | |
|         if not set(required_fields).issubset( | |
|                 self.sync_ids.ir_field_id.mapped('name')): | |
|             missing_fields = set(required_fields) - set( | |
|                 self.sync_ids.ir_field_id.mapped('name')) | |
|             raise ValidationError( | |
|                 "Must provide values for the Odoo fields {}".format(", ".join( | |
|                     missing_fields))) | |
|         vals = {} | |
|         unique = next( | |
|             (rec['Field'] for rec in field_list if | |
|              rec.get('Key', '').upper() == 'PRI'), | |
|             next((rec['Field'] for rec in field_list if | |
|                   rec.get('Key', '').upper() == 'UNI'), None)) | |
|         if not unique: | |
|             raise ValidationError( | |
|                 f"The MySQL table {self.sql_table} cannot be imported " | |
|                 f"because it " | |
|                 "doesn't have a Unique or Primary key.") | |
|         if records: | |
|             for item in records: | |
|                 imported = self.env['imported.data'].sudo().search( | |
|                     [('model_id', '=', self.model_id.id), | |
|                      ('mysql_ref', '=', item[unique]), | |
|                      ('mysql_table', '=', self.sql_table), | |
|                      ('log_note', '=', 'Success')]) | |
|                 if not imported: | |
|                     mysql_to_odoo_type_mapping = { | |
|                         'tinyint': ['boolean'], | |
|                         'smallint': ['integer', 'Many2one'], | |
|                         'mediumint': ['integer', 'Many2one'], | |
|                         'int': ['integer', 'Many2one'], | |
|                         'bigint': ['integer', 'Many2one'], | |
|                         'float': ['float'], | |
|                         'double': ['float'], | |
|                         'decimal': ['float'], | |
|                         'numeric': ['float'], | |
|                         'char': ['char'], | |
|                         'varchar': ['char'], | |
|                         'text': ['text'], | |
|                         'mediumtext': ['text'], | |
|                         'longtext': ['text'], | |
|                         'binary': ['binary'], | |
|                         'varbinary': ['binary'], | |
|                         'blob': ['binary'], | |
|                         'tinyblob': ['binary'], | |
|                         'mediumblob': ['binary'], | |
|                         'longblob': ['binary'], | |
|                         'date': ['date'], | |
|                         'datetime': ['datetime'], | |
|                         'timestamp': ['datetime'], | |
|                     } | |
|                     for rec in self.sync_ids: | |
|                         if rec.ir_field_id: | |
|                             mysql_data_type = rec.data_type.lower().split('(')[0] | |
|                             if not rec.foreign_key and not ( | |
|                                     rec.ir_field_id.ttype in | |
|                                     mysql_to_odoo_type_mapping[ | |
|                                         mysql_data_type]): | |
|                                 raise ValidationError( | |
|                                     f'Data type of {rec.mysql_field} ' | |
|                                     f'cannot be converted to the data' | |
|                                     f'type of {rec.ir_field_id.name}') | |
|                             if rec.foreign_key: | |
|                                 foreign_record = self.env[ | |
|                                     'imported.data'].sudo().search( | |
|                                     [('mysql_table', '=', rec.ref_table), | |
|                                      ('mysql_ref', '=', item[rec.mysql_field])]) | |
|                                 if foreign_record: | |
|                                     if rec.ir_field_id.ttype == 'many2one': | |
|                                         vals[ | |
|                                             rec.ir_field_id.name] = foreign_record.odoo_ref | |
|                                     else: | |
|                                         foreign_record_browse = self.env[foreign_record.model_id.model].browse( | |
|                                             foreign_record.odoo_ref) | |
|                                         rec_name_field = self.env[foreign_record.model_id.model]._rec_name | |
|                                         rec_name_value = getattr(foreign_record_browse, rec_name_field, None) | |
|                                         if rec_name_value is not None: | |
|                                             vals[rec.ir_field_id.name] = rec_name_value | |
|                                 if not foreign_record: | |
|                                     raise ValidationError( | |
|                                         f'The {rec.mysql_field} column ' | |
|                                         f'of {self.sql_table} table establishes a ' | |
|                                         f'foreign key relationship with the ' | |
|                                         f'{rec.ref_table} table in  MySQL. Please ' | |
|                                         f' synchronize the {rec.ref_table} table' | |
|                                         f' first.') | |
|                             else: | |
|                                 vals[rec.ir_field_id.name] = item[rec.mysql_field] | |
|                     if vals: | |
|                         record = self.env[self.model_id.model].create(vals) | |
|                         imported.sudo().create({ | |
|                             'model_id': self.model_id.id, | |
|                             'mysql_ref': item[unique], | |
|                             'mysql_table': self.sql_table, | |
|                             'odoo_ref': record.id, | |
|                             'log_note': 'Success' | |
|                         }) | |
|             self.write({ | |
|                 'state': 'sync' | |
|             }) | |
| 
 | |
|     def action_fetch_data(self): | |
|         """Method for fetching the columns of Mysql table""" | |
|         records = self.mysql_connect(f"SHOW COLUMNS FROM {self.sql_table};") | |
|         if not any(key in rec.get('Key', '') for rec in records for | |
|                    key in ['PRI', 'UNI']): | |
|             raise ValidationError( | |
|                 "The MySQL table cannot be imported because it " | |
|                 "doesn't have a Unique or Primary key.") | |
|         self.sync_ids.unlink() | |
|         if records: | |
|             for rec in records: | |
|                 constraints = self.mysql_connect( | |
|                     f"SELECT CONSTRAINT_NAME, COLUMN_NAME," | |
|                     f" REFERENCED_TABLE_NAME, " | |
|                     f"REFERENCED_COLUMN_NAME FROM " | |
|                     f"INFORMATION_SCHEMA.KEY_COLUMN_USAGE " | |
|                     f"WHERE TABLE_NAME = " | |
|                     f"'{self.sql_table}' and COLUMN_NAME = " | |
|                     f"'{rec['Field']}' and REFERENCED_TABLE_NAME != 'None' and " | |
|                     f"REFERENCED_COLUMN_NAME != 'None'") | |
|                 vals = { | |
|                     'connection_id': self.id, | |
|                     'data_type': rec['Type'], | |
|                     'mysql_field': rec['Field'], | |
|                     'model_id': self.model_id.id, | |
|                     'foreign_key': True if constraints else False | |
|                 } | |
|                 if constraints: | |
|                     vals['ref_table'] = constraints[0]['REFERENCED_TABLE_NAME'] | |
|                     vals['ref_col'] = constraints[0]['REFERENCED_COLUMN_NAME'] | |
|                 if rec['Field'] not in self.sync_ids.sudo( | |
|                 ).search([('connection_id', | |
|                            '=', self.id)]).mapped('mysql_field'): | |
|                     self.sudo().write({ | |
|                         'sync_ids': [ | |
|                             (0, 0, vals)] | |
|                     }) | |
|             self.write({ | |
|                 'state': 'fetched' | |
|             }) | |
| 
 | |
|     def mysql_connect(self, query): | |
|         """Method for connecting with Mysql""" | |
|         try: | |
|             connection = mysql.connector.connect( | |
|                 host=self.credential_id.host, | |
|                 user=self.credential_id.user, | |
|                 password=self.credential_id.password, | |
|                 database=self.credential_id.name | |
|             ) | |
|             if not connection.is_connected(): | |
|                 raise ValidationError(f"Error connecting to MySQL") | |
|             cursor = connection.cursor(dictionary=True) | |
|             # Execute your MySQL query | |
|             cursor.execute(query) | |
|             # Fetch the results and store them in a variable | |
|             results = cursor.fetchall() | |
|             # Set the flag after successfully fetching the data | |
|             self.is_fetched = True | |
|             # Close cursor and connection | |
|             cursor.close() | |
|             connection.close() | |
|             return results | |
|         except mysql.connector.Error as e: | |
|             # Handle any connection errors | |
|             raise ValidationError(f"Error connecting to MySQL: {e}")
 | |
| 
 |