You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							161 lines
						
					
					
						
							6.9 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							161 lines
						
					
					
						
							6.9 KiB
						
					
					
				
								"""Microsoft login"""
							 | 
						|
								# -*- coding: utf-8 -*-
							 | 
						|
								#############################################################################
							 | 
						|
								#
							 | 
						|
								#    Cybrosys Technologies Pvt. Ltd.
							 | 
						|
								#
							 | 
						|
								#    Copyright (C) 2023-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
							 | 
						|
								#    Author: Cybrosys Techno Solutions(<https://www.cybrosys.com>)
							 | 
						|
								#
							 | 
						|
								#    You can modify it under the terms of the GNU LESSER
							 | 
						|
								#    GENERAL PUBLIC LICENSE (LGPL v3), Version 3.
							 | 
						|
								#
							 | 
						|
								#    This program is distributed in the hope that it will be useful,
							 | 
						|
								#    but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						|
								#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						|
								#    GNU LESSER GENERAL PUBLIC LICENSE (LGPL v3) for more details.
							 | 
						|
								#
							 | 
						|
								#    You should have received a copy of the GNU LESSER GENERAL PUBLIC LICENSE
							 | 
						|
								#    (LGPL v3) along with this program.
							 | 
						|
								#    If not, see <http://www.gnu.org/licenses/>.
							 | 
						|
								#
							 | 
						|
								#############################################################################
							 | 
						|
								import json, logging, requests
							 | 
						|
								from odoo.http import request
							 | 
						|
								from odoo import api, models
							 | 
						|
								from odoo import exceptions
							 | 
						|
								from odoo.addons import base
							 | 
						|
								from odoo.addons.auth_signup.models.res_users import SignupError
							 | 
						|
								
							 | 
						|
								# Add 'oauth_access_token' to the base module's USER_PRIVATE_FIELDS list.
								base.models.res_users.USER_PRIVATE_FIELDS.append('oauth_access_token')
								_logger = logging.getLogger(__name__)

								# PyJWT is an optional dependency: when it cannot be imported, ``jwt`` is
								# bound to None so the rest of the module can test for its availability.
								try:
								    import jwt
								except ImportError:
								    jwt = None
								    _logger.warning(
								        "Login with Microsoft account won't be available.Please install PyJWT "
								        "python library, ")
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								class ResUsers(models.Model):
								    """Extend ``res.users`` with an authorization-code based OAuth sign-in.

								    The overrides below exchange an OAuth ``code`` for tokens, derive the
								    identity claims from the response and sign the matching user in,
								    creating or linking the user by e-mail address when needed.
								    """
								    _inherit = 'res.users'

								    @api.model
								    def _auth_oauth_rpc(self, endpoint, access_token):
								        """Return the JSON document served by ``endpoint`` for the token.

								        :param endpoint: URL to query (str); when empty, the call is
								            delegated to the parent implementation
								        :param access_token: value sent as the ``access_token`` query
								            parameter
								        :return: decoded JSON body of the response
								        """
								        if not endpoint:
								            return super()._auth_oauth_rpc(endpoint, access_token)
								        # Query the endpoint exactly once. Previously the parent call was
								        # issued first and its result discarded, doubling the round-trips.
								        # The timeout keeps a stalled provider from blocking the worker.
								        response = requests.get(
								            endpoint, params={'access_token': access_token}, timeout=10)
								        return response.json()

								    @api.model
								    def _auth_oauth_code_validate(self, provider, code):
								        """Exchange an authorization ``code`` for tokens and identity data.

								        :param provider: id of the ``auth.oauth.provider`` record (int)
								        :param code: authorization code received on the redirect (str)
								        :return: dict holding ``access_token`` merged with either the
								            decoded ``id_token`` claims or the data endpoint response
								        :raise Exception: when the token endpoint reports an ``error``
								        :raise AccessDenied: when an ``id_token`` is returned but PyJWT is
								            not installed
								        """
								        oauth_provider = self.env['auth.oauth.provider'].browse(provider)
								        token_request = {
								            'client_id': oauth_provider.client_id,
								            'client_secret': oauth_provider.client_secret_id,
								            'grant_type': 'authorization_code',
								            'code': code,
								            'redirect_uri':
								                request.httprequest.url_root + 'auth_oauth/signin',
								        }
								        token_info = requests.post(
								            oauth_provider.validation_endpoint,
								            headers={'Accept': 'application/json'},
								            data=token_request,
								            timeout=10,
								        ).json()
								        if token_info.get("error"):
								            raise Exception(token_info['error'])
								        access_token = token_info.get('access_token')
								        validation = {'access_token': access_token}
								        id_token = token_info.get('id_token')
								        if id_token:
								            if not jwt:
								                raise exceptions.AccessDenied()
								            # ``options`` is understood by PyJWT 1.x and 2.x, whereas the
								            # legacy ``verify=False`` keyword is rejected by 2.x.
								            # NOTE(review): the signature is deliberately not verified,
								            # presumably because the token comes straight from the
								            # provider's token endpoint - confirm this is acceptable.
								            claims = jwt.decode(
								                id_token, options={'verify_signature': False})
								        else:
								            claims = self._auth_oauth_rpc(
								                oauth_provider.data_endpoint, access_token)
								        validation.update(claims)
								        return validation

								    @api.model
								    def _auth_oauth_signin(self, provider, validation, params):
								        """Retrieve and sign in the user matching the validated identity.

								        When the validation data carries an e-mail address, the user whose
								        login equals it is linked to the provider (and created first if it
								        does not exist, copying the contractor flags and groups of the
								        provider's template user). The user is then looked up by OAuth
								        uid; if none is found a regular signup is attempted. The parent
								        implementation is never invoked by this override.

								        :param provider: oauth provider id (int)
								        :param validation: result of validation of access token (dict)
								        :param params: oauth parameters (dict)
								        :return: user login (str), or None when no user matches and the
								            context key ``no_user_creation`` is set
								        :raise: AccessDenied if signin failed
								        """
								        oauth_uid = validation['user_id']
								        access_token = params['access_token']

								        email = validation.get('email')
								        # Only link by e-mail when one was supplied: str(None) would
								        # otherwise map every e-mail-less identity onto a 'None' login.
								        if email:
								            login = str(email)
								            user = self.search([('login', '=', login)])
								            if not user:
								                user = self.create({
								                    'login': login,
								                    # Fall back to the login rather than the text 'None'.
								                    'name': str(validation.get('name') or login),
								                })
								                template_user = self.env['auth.oauth.provider'].sudo(
								                ).browse(provider).template_user_id
								                if template_user:
								                    user.is_contractor = template_user.is_contractor
								                    user.contractor = template_user.contractor
								                    user.groups_id = [(6, 0, template_user.groups_id.ids)]
								            user.write({
								                'oauth_provider_id': provider,
								                'oauth_uid': oauth_uid,
								                'oauth_access_token': access_token,
								            })

								        oauth_user = self.search([("oauth_uid", "=", oauth_uid),
								                                  ('oauth_provider_id', '=', provider)])
								        if oauth_user:
								            assert len(oauth_user) == 1
								            oauth_user.write({'oauth_access_token': access_token})
								            return oauth_user.login

								        # No linked user: sign up a new one unless the caller forbids it.
								        if self.env.context.get('no_user_creation'):
								            return None
								        state = json.loads(params['state'])
								        token = state.get('t')
								        values = self._generate_signup_values(provider, validation, params)
								        try:
								            _, login, _ = self.signup(values, token)
								        except SignupError as signup_error:
								            # ``exceptions.access_denied_exception`` does not exist in
								            # odoo.exceptions; raise a real AccessDenied instead.
								            raise exceptions.AccessDenied() from signup_error
								        return login

								    @api.model
								    def auth_oauth(self, provider, params):
								        """Authenticate through OAuth and return the session credentials.

								        :param provider: oauth provider id (int)
								        :param params: oauth parameters (dict); a ``code`` entry triggers
								            the authorization-code exchange and ``access_token`` is then
								            stored back into ``params``
								        :return: tuple ``(dbname, login, access_token)``
								        :raise AccessDenied: when no user identifier can be derived from
								            the validation data or no login is obtained
								        """
								        if params.get('code'):
								            validation = self._auth_oauth_code_validate(
								                provider, params['code'])
								            access_token = validation.pop('access_token')
								            params['access_token'] = access_token
								        else:
								            access_token = params.get('access_token')
								            validation = self._auth_oauth_validate(provider, access_token)
								        if not validation.get('user_id'):
								            # Fall back to the 'id' key, then to the 'oid' key.
								            user_id = validation.get('id') or validation.get('oid')
								            if not user_id:
								                raise exceptions.AccessDenied()
								            validation['user_id'] = user_id
								        login = self._auth_oauth_signin(provider, validation, params)
								        if not login:
								            raise exceptions.AccessDenied()
								        if provider and params:
								            return (self.env.cr.dbname, login, access_token)
								        return super().auth_oauth(provider, params)
							 | 
						|
								
							 |