Viewport onto your services
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

206 lines
5.9 KiB

#!/usr/bin/env python3
# coding=utf-8
#
# run.py: Run cupola
# Copyright (C) 2015-2017 Sam Black <samwwwblack@lapwing.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import click
from datetime import datetime
from flask import current_app
from flask_babelex import gettext
from flask_migrate import upgrade as db_upgrade
from flask_security.utils import hash_password
from sqlalchemy import exc
from sqlalchemy_utils.functions import create_database
from sqlalchemy_utils.functions import database_exists
from wtforms.validators import Email
from wtforms.validators import ValidationError
from cupola.models import db
from cupola.models.users import User
from cupola.models.users import user_datastore
from cupola.web import create_app
# Application instance built by cupola.web.create_app(); the CLI commands
# defined in this file are registered on it through ``app.cli``.
app = create_app()
def init_db():
    """
    Create the database and populate the required data.

    The database URL is read from ``current_app.config``.  If the database
    does not exist yet (or probing for it raises ``OperationalError``) it is
    created.  If it exists and already contains a ``user`` table, the
    installation is treated as already initialised: a message is printed
    and the process exits with status 1.

    Afterwards the migrations are applied and the "Admin" and "Member"
    roles are created and committed.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    from sqlalchemy import inspect as sa_inspect

    db_url = current_app.config["SQLALCHEMY_DATABASE_URI"]

    # Guard only the existence probe.  An OperationalError raised anywhere
    # else must not be mistaken for "database is missing".
    try:
        exists = database_exists(db_url)
    except exc.OperationalError:
        exists = False

    if exists:
        # Inspect the tables that are really present in the database.
        # ``db.metadata.tables`` only lists the declared models, so it is
        # the same whether or not this database was initialised before.
        if "user" in sa_inspect(db.engine).get_table_names():
            click.echo(gettext(
                "Database already exists, check your installation."))
            sys.exit(1)
    else:
        create_database(db_url)

    db_upgrade()

    # User roles
    # TODO: This list should probably be in the models file
    user_datastore.create_role(name="Admin", description="Site Administrator")
    user_datastore.create_role(name="Member", description="Member")
    db.session.commit()
class _DummyEmailField(object):
"""
This is a dummy WTForms Field class to use with the validator.
"""
def __init__(self, data):
self.data = data
def gettext(self, string):
return string
class AddUserError(Exception):
    """
    Signals that a new user could not be added.
    """
def add_user(email, password, roles, admin_first_run=False):
    """
    Create a user account and attach the requested roles.

    The email is validated and checked against existing users first; if
    either check fails a message is printed and the process exits with
    status 2.  The user's full name is taken from the part of the email
    before the first "@".

    :param email: email of the new user
    :type email: str
    :param password: password of the new user (stored hashed)
    :type password: str
    :param roles: names of the site roles the new user should have;
                  names that do not resolve to a role are skipped
    :type roles: list or tuple
    :param admin_first_run: if first run,
                            automatically set the user "confirmed_at" field
    :type admin_first_run: bool
    :raises AddUserError: if committing the user or the role assignments
                          fails with an integrity error
    """
    candidate_field = _DummyEmailField(email)
    check_email = Email()
    try:
        check_email(None, candidate_field)
    except ValidationError:
        click.echo(gettext("Email is invalid"))
        sys.exit(2)

    already_registered = User.query.filter_by(email=email).first()
    if already_registered:
        click.echo(gettext("User already exists"))
        sys.exit(2)

    fullname = email.split("@")[0]
    user_fields = {
        "email": email,
        "password": hash_password(password),
        "fullname": fullname,
    }
    if admin_first_run:
        # The first administrator is marked as confirmed straight away.
        user_fields["confirmed_at"] = datetime.now()
    user_datastore.create_user(**user_fields)

    try:
        db.session.commit()
    except exc.IntegrityError:
        db.session.rollback()
        raise AddUserError("User already exists")

    for role_name in roles:
        matched_role = user_datastore.find_role(role_name)
        if not matched_role:
            continue
        user_datastore.add_role_to_user(email, matched_role)

    try:
        db.session.commit()
    except exc.IntegrityError:
        db.session.rollback()
        raise AddUserError("User roles not added correctly")
@app.cli.command()
def create_user():
    """
    Create a new user.

    Prompts for an email until one is given that is valid and not already
    registered, then for a (confirmed, hidden) password, and finally asks
    whether the user should be an administrator.
    """
    while True:
        candidate = click.prompt(gettext("Email"))
        candidate_field = _DummyEmailField(candidate)
        check_email = Email()
        try:
            check_email(None, candidate_field)
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue
        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue
        email = candidate
        break

    # Keep asking until a non-empty password comes back.
    while True:
        password = click.prompt(gettext("Password"), hide_input=True,
                                confirmation_prompt=True)
        if password:
            break

    if click.confirm(gettext("Administrator?")):
        roles = ("Admin",)
    else:
        roles = ()

    add_user(email, password, roles)
    click.echo(gettext("User added"))
@app.cli.command()
def first_run():
    """
    Initialise the database, then interactively create the first
    administrator account.

    The administrator email is prompted for until it is valid and unused;
    the password is prompted for (hidden, with confirmation) until it is
    non-empty.  The account is added with the "Admin" role and with the
    first-run flag set.
    """
    init_db()

    while True:
        candidate = click.prompt(gettext("Administrator email"))
        candidate_field = _DummyEmailField(candidate)
        check_email = Email()
        try:
            check_email(None, candidate_field)
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue
        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue
        email = candidate
        break

    # Keep asking until a non-empty password comes back.
    while True:
        password = click.prompt(gettext("Administrator Password"),
                                hide_input=True, confirmation_prompt=True)
        if password:
            break

    add_user(email, password, ("Admin",), True)
    click.echo(gettext("Setup complete"))