odoo 防暴力破解_on_login_cooldown

网友投稿 1004 2022-10-11 11:00:17

odoo 防暴力破解_on_login_cooldown

源码:

如果用户多次登录错误, 则登录IP会被封一段时间

def _on_login_cooldown(self, failures, previous): """ Decides whether the user trying to log in is currently "on cooldown" and not even allowed to attempt logging in. The default cooldown function simply puts the user on cooldown for seconds after each failure following the th (0 to disable). Can be overridden to implement more complex backoff strategies, or e.g. wind down or reset the cooldown period as the previous failure recedes into the far past. :param int failures: number of recorded failures (since last success) :param previous: timestamp of previous failure :type previous: datetime.datetime :returns: whether the user is currently in cooldown phase (true if cooldown, false if no cooldown and login can continue) :rtype: bool """ cfg = self.env['ir.config_parameter'].sudo() min_failures = int(cfg.get_param('base.login_cooldown_after', 5)) if min_failures == 0: return False delay = int(cfg.get_param('base.login_cooldown_duration', 60)) return failures >= min_failures and (datetime.datetime.now() - previous) < datetime.timedelta(seconds=delay) @contextlib.contextmanager def _assert_can_auth(self): """ Checks that the current environment even allows the current auth request to happen. The baseline implementation is a simple linear login cooldown: after a number of failures trying to log-in, the user (by login) is put on cooldown. During the cooldown period, login *attempts* are ignored and logged. .. warning:: The login counter is not shared between workers and not specifically thread-safe, the feature exists mostly for rate-limiting on large number of login attempts (brute-forcing passwords) so that should not be much of an issue. For a more complex strategy (e.g. database or distribute storage) override this method. To simply change the cooldown criteria (configuration, ...) override _on_login_cooldown instead. .. note:: This is a *context manager* so it can be called around the login procedure without having to call it itself. """ # needs request for remote address if not request: yield return reg = self.env.registry failures_map = getattr(reg, '_login_failures', None) if failures_map is None: failures_map = reg._login_failures = collections.defaultdict(lambda : (0, datetime.datetime.min)) source = request. (failures, previous) = failures_map[source] if self._on_login_cooldown(failures, previous): _logger.warn( "Login attempt ignored for %s on %s: " "%d failures since last success, last failure at %s. " "You can configure the number of login failures before a " "user is put on cooldown as well as the duration in the " "System Parameters. Disable this feature by setting " "\"base.login_cooldown_after\" to 0.", source, self.env.cr.dbname, failures, previous) if ipaddress.ip_address(source).is_private: _logger.warn( "The rate-limited IP address %s is classified as private " "and *might* be a proxy. If your Odoo is behind a proxy, " "it may be mis-configured. Check that you are running " "Odoo in Proxy Mode and that the proxy is properly configured, see " "for details.", source ) raise AccessDenied(_("Too many login failures, please wait a bit before trying again.")) try: yield except AccessDenied: (failures, __) = reg._login_failures[source] reg._login_failures[source] = (failures + 1, datetime.datetime.now()) raise else: reg._login_failures.pop(source, None)

数据库管理界面需要改源码并配合fail2ban

@type='auth="none", methods=['POST'], csrf=False) def create(self, master_pwd, name, lang, password, **post): try: if not re.match(DBNAME_PATTERN, name): raise Exception(_('Invalid database name. Only alphanumerical characters, underscore, hyphen and dot are allowed.')) # country code could be = "False" which is actually True in python country_code = post.get('country_code') or False dispatch_rpc('db', 'create_database', [master_pwd, name, bool(post.get('demo')), lang, password, post['login'], country_code, post['phone']]) request.session.authenticate(name, post['login'], password) return except Exception as e: error = "Database creation error: %s" % (str(e) or repr(e)) return self._render_template(error=error) @type='auth="none", methods=['POST'], csrf=False) def duplicate(self, master_pwd, name, new_name): try: if not re.match(DBNAME_PATTERN, new_name): raise Exception(_('Invalid database name. Only alphanumerical characters, underscore, hyphen and dot are allowed.')) dispatch_rpc('db', 'duplicate_database', [master_pwd, name, new_name]) return except Exception as e: error = "Database duplication error: %s" % (str(e) or repr(e)) return self._render_template(error=error) @type='auth="none", methods=['POST'], csrf=False) def drop(self, master_pwd, name): try: dispatch_rpc('db','drop', [master_pwd, name]) request._cr = None # dropping a database leads to an unusable cursor return except Exception as e: error = "Database deletion error: %s" % (str(e) or repr(e)) return self._render_template(error=error) @type='auth="none", methods=['POST'], csrf=False) def backup(self, master_pwd, name, backup_format = 'zip'): try: odoo.service.db.check_super(master_pwd) ts = datetime.datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S") filename = "%s_%s.%s" % (name, ts, backup_format) headers = [ ('Content-Type', 'application/octet-stream; charset=binary'), ('Content-Disposition', content_disposition(filename)), ] dump_stream = odoo.service.db.dump_db(name, None, backup_format) response = werkzeug.wrappers.Response(dump_stream, headers=headers, direct_passthrough=True) return response except Exception as e: _logger.exception('Database.backup') error = "Database backup error: %s" % (str(e) or repr(e)) return self._render_template(error=error) @type='auth="none", methods=['POST'], csrf=False) def restore(self, master_pwd, backup_file, name, copy=False): try: data_file = None db.check_super(master_pwd) with tempfile.NamedTemporaryFile(delete=False) as data_file: backup_file.save(data_file) db.restore_db(name, data_file.name, str2bool(copy)) return except Exception as e: error = "Database restore error: %s" % (str(e) or repr(e)) return self._render_template(error=error) finally: if data_file: os.unlink(data_file.name) @type='auth="none", methods=['POST'], csrf=False) def change_password(self, master_pwd, master_pwd_new): try: dispatch_rpc('db', 'change_admin_password', [master_pwd, master_pwd_new]) return except Exception as e: error = "Master password update error: %s" % (str(e) or repr(e)) return self._render_template(error=error) ```

懂得,原来世界如此简单!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:章鱼小程序开发总结及经验分享
下一篇:elasticsearch通过guice注入Node组装启动过程
相关文章