You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
5.9 KiB
206 lines
5.9 KiB
#!/usr/bin/env python3
|
|
# coding=utf-8
|
|
#
|
|
# run.py: Run cupola
|
|
# Copyright (C) 2015-2017 Sam Black <samwwwblack@lapwing.org>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
import sys
|
|
import click
|
|
from datetime import datetime
|
|
|
|
from flask import current_app
|
|
from flask_babelex import gettext
|
|
from flask_migrate import upgrade as db_upgrade
|
|
from flask_security.utils import hash_password
|
|
from sqlalchemy import exc
|
|
from sqlalchemy_utils.functions import create_database
|
|
from sqlalchemy_utils.functions import database_exists
|
|
from wtforms.validators import Email
|
|
from wtforms.validators import ValidationError
|
|
|
|
from cupola.models import db
|
|
from cupola.models.users import User
|
|
from cupola.models.users import user_datastore
|
|
from cupola.web import create_app
|
|
|
|
# Application instance returned by create_app(); the CLI commands defined in
# this file are registered through ``app.cli``.
app = create_app()
|
|
|
|
|
|
def init_db():
    """
    Set up the database and seed the data the site needs.

    Creates the database named by ``SQLALCHEMY_DATABASE_URI`` when it is
    missing, applies the migrations and stores the "Admin" and "Member"
    roles. Prints a message and exits with status 1 when the database is
    present and a "user" table is known.
    """
    uri = current_app.config["SQLALCHEMY_DATABASE_URI"]

    try:
        if not database_exists(uri):
            create_database(uri)
        # NOTE(review): db.metadata presumably lists the tables declared by
        # the models rather than the tables found in the database -- confirm
        # this is the intended "already installed" check.
        elif "user" in db.metadata.tables:
            click.echo(gettext(
                "Database already exists, check your installation."))
            sys.exit(1)
    except exc.OperationalError:
        # The existence probe (or the first creation attempt) failed at the
        # database level; try to create the database.
        create_database(uri)

    db_upgrade()

    # User roles
    # TODO: This list should probably be in the models file
    default_roles = (
        ("Admin", "Site Administrator"),
        ("Member", "Member"),
    )
    for role_name, role_description in default_roles:
        user_datastore.create_role(name=role_name,
                                   description=role_description)

    db.session.commit()
|
|
|
|
|
|
class _DummyEmailField(object):
|
|
"""
|
|
This is a dummy WTForms Field class to use with the validator.
|
|
"""
|
|
def __init__(self, data):
|
|
self.data = data
|
|
|
|
def gettext(self, string):
|
|
return string
|
|
|
|
|
|
class AddUserError(Exception):
    """
    Signals that a new user could not be added.
    """
|
|
|
|
|
|
def add_user(email, password, roles, admin_first_run=False):
    """
    Store a new user and attach the requested roles.

    Prints a message and exits with status 2 when the email is invalid or
    already belongs to a user.

    :param email: email of the new user
    :type email: str
    :param password: password of the new user (hashed before it is stored)
    :type password: str
    :param roles: site roles the new user should have;
        names that match no stored role are skipped
    :type roles: list or tuple
    :param admin_first_run: if first run,
        automatically set the user "confirmed_at" field
    :type admin_first_run: bool
    :raises AddUserError: if committing the user or the roles hits an
        integrity error
    """
    candidate = _DummyEmailField(email)
    validate_email = Email()
    try:
        validate_email(None, candidate)
    except ValidationError:
        click.echo(gettext("Email is invalid"))
        sys.exit(2)

    already_there = User.query.filter_by(email=email).first()
    if already_there:
        click.echo(gettext("User already exists"))
        sys.exit(2)

    # The part before the first "@" doubles as the initial full name.
    local_part = email.partition("@")[0]
    user_fields = {
        "email": email,
        "password": hash_password(password),
        "fullname": local_part,
    }
    if admin_first_run:
        # First-run administrator: mark the account as confirmed right away.
        user_fields["confirmed_at"] = datetime.now()
    user_datastore.create_user(**user_fields)

    try:
        db.session.commit()
    except exc.IntegrityError:
        db.session.rollback()
        raise AddUserError("User already exists")

    for role_name in roles:
        found_role = user_datastore.find_role(role_name)
        if not found_role:
            continue
        user_datastore.add_role_to_user(email, found_role)

    try:
        db.session.commit()
    except exc.IntegrityError:
        db.session.rollback()
        raise AddUserError("User roles not added correctly")
|
|
|
|
|
|
@app.cli.command()
def create_user():
    """
    Create a new user.

    Prompts for an email until it validates and is not already taken, then
    for a password (entered twice, hidden) and whether the user is an
    administrator. If storing the user fails, the error message is printed
    and the command exits with status 2.
    """
    # The validator does not depend on the input, so build it once.
    validate_email = Email()

    while True:
        candidate = click.prompt(gettext("Email"))

        try:
            validate_email(None, _DummyEmailField(candidate))
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue

        email = candidate
        break

    password = None
    while not password:
        password = click.prompt(gettext("Password"), hide_input=True,
                                confirmation_prompt=True)

    admin = click.confirm(gettext("Administrator?"))

    roles = ("Admin",) if admin else ()

    try:
        add_user(email, password, roles)
    except AddUserError as err:
        # add_user can still fail at commit time (e.g. the email was taken
        # after the check above); report it instead of dumping a traceback.
        click.echo(str(err))
        sys.exit(2)

    click.echo(gettext("User added"))
|
|
|
|
|
|
@app.cli.command()
def first_run():
    """
    Init the database and
    query the user for an administrator email/password.

    The administrator is stored with the "Admin" role and flagged as a
    first-run user. If storing the administrator fails, the error message is
    printed and the command exits with status 2.
    """
    init_db()

    # The validator does not depend on the input, so build it once.
    validate_email = Email()

    while True:
        candidate = click.prompt(gettext("Administrator email"))

        try:
            validate_email(None, _DummyEmailField(candidate))
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue

        email = candidate
        break

    password = None
    while not password:
        password = click.prompt(gettext("Administrator Password"),
                                hide_input=True, confirmation_prompt=True)

    try:
        add_user(email, password, ("Admin",), True)
    except AddUserError as err:
        # add_user can still fail at commit time; report it instead of
        # dumping a traceback.
        click.echo(str(err))
        sys.exit(2)

    click.echo(gettext("Setup complete"))
|
|
|