Source code for ib3.auth

#
# This file is part of IRC Bot Behavior Bundle (IB3)
# Copyright (C) 2017 Bryan Davis and contributors
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import logging

import irc.events

from .mixins import JoinChannels

logger = logging.getLogger(__name__)


[docs]class AbstractAuth(JoinChannels): """Base class for authentication mixins."""
[docs] def __init__( self, server_list, nickname, realname, ident_password, channels=None, username=None, **kwargs, ): """ :param server_list: List of servers the bot will use. :param nickname: The bot's nickname :param realname: The bot's realname :param ident_password: The bot's password :param channels: List of channels to join after authenticating :param username: IRC username (default: nickname) """ self._primary_nick = nickname self._username = username or nickname self._ident_password = ident_password self._channels = channels or [] super().__init__( server_list=server_list, nickname=nickname, realname=realname, username=self._username, **kwargs, )
[docs]class NickServ(AbstractAuth): """Authenticate with NickServ before joining channels."""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) for event in ["welcome", "privnotice"]: self.connection.add_global_handler( event, getattr(self, f"_handle_{event}"), )
[docs] def _handle_welcome(self, conn, event): # noqa: U100 Unused argument """Handle WELCOME message. Starts authentication handshake by sending NickServ an ``identify`` message. """ logger.info("Connected to server %s", conn.get_server_name()) self._identify_to_nickserv()
[docs] def _handle_privnotice(self, conn, event): # noqa: U100 Unused argument """Handle NOTICE sent directly to user. Check for messages from NickServ requesting auth, warning of password failures, and acknowledging successful auth. """ msg = event.arguments[0] if event.source.nick == "NickServ": if "NickServ identify" in msg: logger.info("Authentication requested by Nickserv: %s", msg) self._identify_to_nickserv() elif "You are now identified" in msg: logger.debug("Authentication succeeded") self.join_channels(self._channels) elif "Invalid password" in msg: logger.error("Password invalid. Check your config!") self.die()
[docs] def _identify_to_nickserv(self): """Send NickServ our username and password.""" logger.info("Authenticating to NickServ") self.connection.privmsg( "NickServ", "identify {} {}".format( self._primary_nick, self._ident_password, ), )
[docs]class SASL(AbstractAuth): """Authenticate using SASL before joining channels."""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) listen = { "cap": "cap", "authenticate": "authenticate", "saslsuccess": irc.events.numeric.get("903", "903"), "saslmechs": irc.events.numeric.get("908", "908"), "welcome": "welcome", } self.reactor._on_connect = self._handle_connect for handler, event in listen.items(): logger.debug("Registering for %s", event) self.connection.add_global_handler( event, getattr(self, f"_handle_{handler}"), )
[docs] def _handle_connect(self, sock): # noqa: U100 Unused argument """Send CAP REQ :sasl on connect.""" self.connection.cap("REQ", "sasl")
[docs] def _handle_cap(self, conn, event): """Handle CAP responses.""" if event.arguments and event.arguments[0] == "ACK": conn.send_raw("AUTHENTICATE PLAIN") else: logger.warning("Unexpected CAP response: %s", event) conn.disconnect()
[docs] def _handle_authenticate(self, conn, event): """Handle AUTHENTICATE responses.""" if event.target == "+": creds = "{username}\0{username}\0{password}".format( username=self._username, password=self._ident_password, ) conn.send_raw( "AUTHENTICATE {}".format( base64.b64encode(creds.encode("utf8")).decode("utf8"), ), ) else: logger.warning("Unexpected AUTHENTICATE response: %s", event) conn.disconnect()
[docs] def _handle_saslsuccess(self, conn, event): # noqa: U100 Unused argument """Handle 903 RPL_SASLSUCCESS responses.""" self.connection.cap("END")
[docs] def _handle_saslmechs(self, conn, event): # noqa: U100 Unused argument """Handle 908 RPL_SASLMECHS responses.""" logger.warning("SASL PLAIN not supported: %s", event) self.die()
[docs] def _handle_welcome(self, conn, event): # noqa: U100 Unused argument """Handle WELCOME message.""" logger.info("Connected to server %s", conn.get_server_name()) self.join_channels(self._channels)