Source code for ib3.mixins

#
# This file is part of IRC Bot Behavior Bundle (IB3)
# Copyright (C) 2017 Bryan Davis and contributors
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
# more details.
#
# You should have received a copy of the GNU General Public License along with
# this program.  If not, see <http://www.gnu.org/licenses/>.

import functools
import logging

import irc.client

logger = logging.getLogger(__name__)


[docs]class PingServer: """Add checks for connection liveness using PING commands."""
[docs] def __init__(self, max_pings=2, ping_interval=300, *args, **kwargs): """ :param max_pings: Maximum numer of missed pings to tolerate :param ping_interval: Seconds between ping attempts """ super().__init__(*args, **kwargs) self._unanswered_pings = 0 self._unanswered_pings_limit = max_pings self.reactor.scheduler.execute_every( period=ping_interval, func=self._ping_server, ) self.connection.add_global_handler("pong", self._handle_pong)
[docs] def _ping_server(self): """Send a ping or disconnect if too many pings are outstanding.""" if self._unanswered_pings >= self._unanswered_pings_limit: logger.warning("Connection timed out. Disconnecting.") self.disconnect() self._unanswered_pings = 0 else: try: self.connection.ping("keep-alive") self._unanswered_pings += 1 except irc.client.ServerNotConnectedError: pass
[docs] def _handle_pong(self, conn, event): # noqa: U100 Unused argument """Clear ping count when a pong is received.""" self._unanswered_pings = 0
[docs]class DisconnectOnError: """Handle ERROR message by logging and disconnecting."""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.connection.add_global_handler("error", self._handle_error)
def _handle_error(self, conn, event): logger.warning(str(event)) conn.disconnect()
[docs]class RejoinOnKick: """Handle KICK by attempting to rejoin channel."""
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.connection.add_global_handler("kick", self._handle_kick)
def _handle_kick(self, conn, event): nick = event.arguments[0] channel = event.target if nick == conn.get_nickname(): logger.warn( "Kicked from %s by %s", channel, event.source.nick, ) self.reactor.scheduler.execute_after( 30, functools.partial(conn.join, channel), )
[docs]class RejoinOnBan: """Handle ERR_BANNEDFROMCHAN by attempting to rejoin channel."""
[docs] def __init__(self, *args, **kwargs): super(RejoinOnKick, self).__init__(*args, **kwargs) self.connection.add_global_handler( "bannedfromchan", self._handle_bannedfromchan, )
def _handle_bannedfromchan(self, conn, event): logger.warning(str(event)) self.reactor.scheduler.execute_after( 60, functools.partial(conn.join, event.arguments[0]), )
[docs]class JoinChannels: """Join channels one at a time to avoid flooding."""
[docs] def join_channels(self, channels): """Join a list of channels, one at a time.""" try: car, cdr = channels[0], channels[1:] except (IndexError, TypeError): logger.exception("Failed to find channel to join.") else: logger.info("Joining %s", car) self.connection.join(car) if cdr: self.reactor.scheduler.execute_after( 1, functools.partial(self.join_channels, cdr), )