Source code for invenio_sip2.ext

# -*- coding: utf-8 -*-
#
# INVENIO-SIP2
# Copyright (C) 2020 UCLouvain
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Flask extension for Invenio-SIP2."""

from __future__ import absolute_import, print_function

import logging
from copy import deepcopy
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler

from flask import current_app
from invenio_base.utils import obj_or_import_string
from werkzeug.utils import cached_property

from . import config, handlers
from .actions.actions import Action
from .errors import SelfCheckActionError
from .helpers import MessageTypeFixedField, MessageTypeVariableField
from .utils import convert_bool_to_char

logger = logging.getLogger('invenio-sip2')


[docs]def load_fixed_field(app): """Load fixed field configuration.""" for name, field in app.config["SIP2_FIXED_FIELD_DEFINITION"].items(): setattr(MessageTypeFixedField, name, MessageTypeFixedField( name=name, field=field ))
[docs]def load_variable_field(app): """Load variable field configuration.""" for name, field \ in app.config["SIP2_VARIABLE_FIELD_DEFINITION"].items(): setattr(MessageTypeVariableField, name, MessageTypeVariableField( name=name, field=field ))
[docs]class InvenioSIP2(object): """Invenio-SIP2 extension.""" def __init__(self, app=None): """Extension initialization.""" self.datastore = None if app: self.init_app(app)
[docs] def init_app(self, app): """Flask application initialization.""" # TODO: refactoring app init self.init_config(app) self._state = _Sip2State(app) # Set SIP2 datastore if not self.datastore: datastore_factory = obj_or_import_string( app.config['SIP2_DATASTORE_HANDLER']) self.datastore = datastore_factory(app) # Initialize logging if app.config['SIP2_LOGGING_CONSOLE']: self.add_console_handler(app) if app.config['SIP2_LOGGING_FS_LOGFILE']: self.add_fs_handler(app) app.extensions['invenio-sip2'] = self self.app = app
[docs] def add_console_handler(self, app): """Add console handler to logger.""" handler = logging.StreamHandler() handler.setFormatter(self.get_logging_formatter()) self._add_logger_handler(handler, app.config['SIP2_LOGGING_CONSOLE_LEVEL'])
[docs] def add_fs_handler(self, app): """Add file handler to logger.""" handler = RotatingFileHandler( app.config['SIP2_LOGGING_FS_LOGFILE'], backupCount=app.config['SIP2_LOGGING_FS_BACKUPCOUNT'], maxBytes=app.config['SIP2_LOGGING_FS_MAXBYTES'], delay=True, ) handler.setFormatter(self.get_logging_formatter()) self._add_logger_handler(handler, app.config['SIP2_LOGGING_FS_LEVEL'])
[docs] def init_config(self, app): """Initialize configuration.""" # Use theme's base template if theme is installed app.config.setdefault( 'SIP2_BASE_TEMPLATE', app.config.get('BASE_TEMPLATE', 'invenio_sip2/base.html') ) for k in dir(config): if k.startswith('SIP2_'): app.config.setdefault(k, getattr(config, k)) load_fixed_field(app) load_variable_field(app)
def _add_logger_handler(self, handler, level): """Add handler to logger.""" for h in logger.handlers: if isinstance(h, handler.__class__): return logger.setLevel(level) logger.addHandler(handler)
[docs] @classmethod def get_logging_formatter(cls): """Return logging formatter.""" return logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
[docs] @cached_property def sip2(self): """Return the SIP2 action machine.""" return _SIP2( action_config=deepcopy( current_app.config["SIP2_MESSAGE_ACTIONS"] ) )
# TODO: reorganize extension implementation
[docs] @cached_property def sip2_handlers(self): """Return the SIP2 handler machine.""" return self._state
[docs] @cached_property def sip2_message_types(self): """Message type configuration.""" return _Sip2MessageType( message_type_config=deepcopy( current_app.config['SIP2_MESSAGE_TYPES'] ) )
@property def sip2_current_date(self): """Get current date from system.""" return datetime.now(timezone.utc).strftime( current_app.config['SIP2_DATE_FORMAT'] )
[docs] @cached_property def supported_protocol(self): """Supported protocol by the automated circulation system.""" return current_app.config['SIP2_PROTOCOL']
[docs] @cached_property def support_checkin(self): """Support of checkin by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_CHECKIN'] )
[docs] @cached_property def support_checkout(self): """Support of checkout by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_CHECKOUT'] )
[docs] @cached_property def support_online_status(self): """Support of online status by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_ONLINE_STATUS'] )
[docs] @cached_property def support_offline_status(self): """Support of offline status by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_OFFLINE_STATUS'] )
[docs] @cached_property def support_status_update(self): """Support of status update by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_STATUS_UPDATE'] )
[docs] @cached_property def support_renewal_policy(self): """Support of renewal policy by the automated circulation system.""" return convert_bool_to_char( current_app.config['SIP2_SUPPORT_RENEWAL_POLICY'] )
[docs] @cached_property def timeout_period(self): """Timeout period allowed by the automated circulation system.""" return current_app.config['SIP2_TIMEOUT_PERIOD']
[docs] @cached_property def retries_allowed(self): """Number of retries allowed by the automated circulation system.""" return current_app.config['SIP2_RETRIES_ALLOWED']
[docs] @cached_property def supported_messages(self): """Supported messages by the automated circulation system.""" # TODO: return supported message type from config return 'YYYYYYYYYYYYYYYY'
class _SIP2(object): """SIP2 action machine.""" def __init__(self, action_config): """Constructor.""" self.actions = {} for src_command, action in action_config.items(): self.actions.setdefault(src_command, []) _cls = action.pop('action', Action) instance = _cls(**dict(action, command=src_command)) self.actions[src_command] = instance def execute(self, msg, **kwargs): """Execute action on message.""" try: action = self.actions[msg.command] return action.execute(msg, **kwargs) except SelfCheckActionError: pass class _Sip2MessageType(object): message_types = {} def __init__(self, message_type_config): """Constructor.""" for command, message_type in message_type_config.items(): self.message_types[command] = _MessageType(command, **message_type) def get_by_command(self, command): command = self.message_types.get(command) if command: return command raise NotImplementedError class _MessageType(object): def __init__(self, command, **kwargs): self.command = command self.required_fields = [] self.optional_fields = [] self.fixed_fields = [] self.variable_fields = [] self.label = kwargs.pop('label'), required_fields = kwargs.pop('required_fields', []) fixed_fields = kwargs.pop('fixed_fields', []) variable_fields = kwargs.pop('variable_fields', []) for required_field in required_fields: if required_field in fixed_fields: field = MessageTypeFixedField.get(required_field) self.fixed_fields.append(field) else: field = MessageTypeVariableField.get(required_field) self.required_fields.append(field) for variable_field in variable_fields: field = MessageTypeVariableField.get(variable_field) self.variable_fields.append(field) if variable_field not in required_fields: self.optional_fields.append(field) for key, value in kwargs.items(): setattr(self, key, value) class _Sip2State(object): """SIP2 state storing registered action handlers.""" def __init__(self, app): """Initialize state.""" self.app = app self.handlers = {} self.login_handler = {} self.system_status_handler = {} self.patron_handlers = {} self.item_handlers = {} self.circulation_handlers = {} # TODO: configure automatically which command is supported by ACS # register api handlers for remote, conf in app.config['SIP2_REMOTE_ACTION_HANDLERS'].items(): # register login handler self.login_handler[remote] = handlers.make_api_handler( conf.get('login_handler'), with_data=True ) self.system_status_handler[remote] = handlers.make_api_handler( conf.get('system_status_handler'), with_data=False ) # register patron handlers patron_handlers = conf.get('patron_handlers', dict()) validate_patron_handler = handlers.make_api_handler( patron_handlers.get('validate_patron'), with_data=True ) authorize_patron_handler = handlers.make_api_handler( patron_handlers.get('authorize_patron'), with_data=True ) enable_patron_handler = handlers.make_api_handler( patron_handlers.get('enable_patron'), with_data=True ) account_handler = handlers.make_api_handler( patron_handlers.get('account'), with_data=True ) patron_status_handler = handlers.make_api_handler( patron_handlers.get('patron_status'), with_data=True ) self.patron_handlers[remote] = dict( validate=validate_patron_handler, authorize=authorize_patron_handler, enable=enable_patron_handler, account=account_handler, patron_status=patron_status_handler, ) # register item handlers item_handlers = conf.get('item_handlers', dict()) item_handler = handlers.make_api_handler( item_handlers.get('item'), with_data=True ) self.item_handlers[remote] = dict( item=item_handler, ) # register circulation handlers circulation_handlers = conf.get('circulation_handlers', dict()) checkout_handler = handlers.make_api_handler( circulation_handlers.get('checkout'), with_data=True ) checkin_handler = handlers.make_api_handler( circulation_handlers.get('checkin'), with_data=True ) hold_handler = handlers.make_api_handler( circulation_handlers.get('hold'), with_data=True ) renew_handler = handlers.make_api_handler( circulation_handlers.get('renew'), with_data=True ) self.circulation_handlers[remote] = dict( checkout=checkout_handler, checkin=checkin_handler, hold=hold_handler, renew=renew_handler, )