#!/usr/bin/env python3
# coding=utf-8
#
# run.py: Run cupola
# Copyright (C) 2015-2017 Sam Black <samwwwblack@lapwing.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
import sys
import click
from datetime import datetime

from flask import current_app
from flask_babelex import gettext
from flask_migrate import upgrade as db_upgrade
from flask_security.utils import hash_password
from sqlalchemy import exc
from sqlalchemy_utils.functions import create_database
from sqlalchemy_utils.functions import database_exists
from wtforms.validators import Email
from wtforms.validators import ValidationError

from cupola.models import db
from cupola.models.users import User
from cupola.models.users import user_datastore
from cupola.web import create_app

# Module-level application instance; the CLI commands in this file are
# registered on it through ``@app.cli.command()``.
app = create_app()


def init_db():
    """
    Create the database and populate the required data.

    The database named by the ``SQLALCHEMY_DATABASE_URI`` setting is created
    if it does not exist, the migrations are applied and the "Admin" and
    "Member" roles are added and committed.

    If the database already exists and already contains a "user" table, a
    message is printed and the process exits with status 1.
    """
    # Local import: keeps this function self-contained with respect to the
    # module's import block.
    from sqlalchemy import inspect

    db_url = current_app.config["SQLALCHEMY_DATABASE_URI"]

    try:
        db_present = database_exists(db_url)
    except exc.OperationalError:
        # Same fallback as before: a failing existence probe is treated as
        # "database missing" and creation is attempted.
        db_present = False

    if db_present:
        # Inspect the live database rather than ``db.metadata``: the metadata
        # is built from the model definitions, so it lists the models' tables
        # whether or not they have been created in the database yet.
        # NOTE(review): assumes ``db`` exposes a bound ``engine`` attribute
        # (as a Flask-SQLAlchemy instance does) - confirm.
        if "user" in inspect(db.engine).get_table_names():
            click.echo(gettext(
                "Database already exists, check your installation."))
            sys.exit(1)
    else:
        create_database(db_url)

    db_upgrade()
    # User roles
    # TODO: This list should probably be in the models file
    user_datastore.create_role(name="Admin", description="Site Administrator")
    user_datastore.create_role(name="Member", description="Member")

    db.session.commit()


class _DummyEmailField(object):
    """
    This is a dummy WTForms Field class to use with the validator.
    """
    def __init__(self, data):
        self.data = data

    def gettext(self, string):
        return string


class AddUserError(Exception):
    """
    Signals that a new user could not be added.
    """


def add_user(email, password, roles, admin_first_run=False):
    """
    Add a new user.

    The email is validated and checked against existing users first; if
    either check fails a message is printed and the process exits with
    status 2. The user is then created and committed, after which the
    requested roles are attached and committed in a second step.

    Roles that cannot be found are skipped with a printed warning.

    :param email: email of the new user
    :type email: str
    :param password: password of the new user
    :type password: str
    :param roles: site roles the new user should have
    :type roles: list or tuple
    :param admin_first_run: if first run,
        automatically set the user "confirmed_at" field
    :type admin_first_run: bool
    :raises AddUserError: if committing the user, or committing the user's
        roles, fails with an integrity error
    """
    try:
        Email()(None, _DummyEmailField(email))
    except ValidationError:
        click.echo(gettext("Email is invalid"))
        sys.exit(2)

    if User.query.filter_by(email=email).first():
        click.echo(gettext("User already exists"))
        sys.exit(2)

    # The part of the address before the "@" doubles as the initial full name.
    user_fields = {
        "email": email,
        "password": hash_password(password),
        "fullname": email.split("@")[0],
    }
    if admin_first_run:
        # NOTE(review): this is a naive local timestamp - confirm that it
        # matches how "confirmed_at" is written elsewhere.
        user_fields["confirmed_at"] = datetime.now()

    user_datastore.create_user(**user_fields)
    try:
        db.session.commit()
    except exc.IntegrityError as err:
        db.session.rollback()
        raise AddUserError("User already exists") from err

    for role in roles:
        role_obj = user_datastore.find_role(role)
        if role_obj:
            user_datastore.add_role_to_user(email, role_obj)
        else:
            # Do not drop the role silently: the caller would otherwise
            # believe the user holds a role that was never granted.
            click.echo(gettext("Role %(role)s does not exist, skipping",
                               role=role))

    try:
        db.session.commit()
    except exc.IntegrityError as err:
        db.session.rollback()
        raise AddUserError("User roles not added correctly") from err


@app.cli.command()
def create_user():
    """
    Create a new user.

    Prompts for an email until one is entered that is valid and not already
    in use, then for a password (with confirmation) and whether the user
    should be an administrator. A failure reported by ``add_user`` is shown
    as a command error instead of a traceback.
    """
    email_validator = Email()
    while True:
        candidate = click.prompt(gettext("Email"))

        try:
            email_validator(None, _DummyEmailField(candidate))
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue

        email = candidate
        break

    password = None
    while not password:
        password = click.prompt(gettext("Password"), hide_input=True,
                                confirmation_prompt=True)

    admin = click.confirm(gettext("Administrator?"))

    roles = ("Admin",) if admin else ()

    try:
        add_user(email, password, roles)
    except AddUserError as err:
        # Let click report the failure and set a non-zero exit status.
        raise click.ClickException(str(err)) from err

    click.echo(gettext("User added"))


@app.cli.command()
def first_run():
    """
    Init the database and
    query the user for an administrator email/password.

    The database is initialised before any prompt is shown. The
    administrator is added with the "Admin" role and with the first-run flag
    set. A failure reported by ``add_user`` is shown as a command error
    instead of a traceback.
    """
    init_db()

    email_validator = Email()
    while True:
        candidate = click.prompt(gettext("Administrator email"))

        try:
            email_validator(None, _DummyEmailField(candidate))
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue

        email = candidate
        break

    password = None
    while not password:
        password = click.prompt(gettext("Administrator Password"),
                                hide_input=True, confirmation_prompt=True)

    try:
        add_user(email, password, ("Admin",), True)
    except AddUserError as err:
        # Let click report the failure and set a non-zero exit status.
        raise click.ClickException(str(err)) from err

    click.echo(gettext("Setup complete"))