You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
206 lines
5.9 KiB
206 lines
5.9 KiB
#!/usr/bin/env python3 |
|
# coding=utf-8 |
|
# |
|
# run.py: Run cupola |
|
# Copyright (C) 2015-2017 Sam Black <samwwwblack@lapwing.org> |
|
# |
|
# This program is free software: you can redistribute it and/or modify |
|
# it under the terms of the GNU Affero General Public License as published by |
|
# the Free Software Foundation, either version 3 of the License, or |
|
# (at your option) any later version. |
|
# |
|
# This program is distributed in the hope that it will be useful, |
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of |
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|
# GNU Affero General Public License for more details. |
|
# |
|
# You should have received a copy of the GNU Affero General Public License |
|
# along with this program. If not, see <http://www.gnu.org/licenses/>. |
|
# |
|
import sys |
|
import click |
|
from datetime import datetime |
|
|
|
from flask import current_app |
|
from flask_babelex import gettext |
|
from flask_migrate import upgrade as db_upgrade |
|
from flask_security.utils import hash_password |
|
from sqlalchemy import exc |
|
from sqlalchemy_utils.functions import create_database |
|
from sqlalchemy_utils.functions import database_exists |
|
from wtforms.validators import Email |
|
from wtforms.validators import ValidationError |
|
|
|
from cupola.models import db |
|
from cupola.models.users import User |
|
from cupola.models.users import user_datastore |
|
from cupola.web import create_app |
|
|
|
# Application instance; the CLI commands in this file are registered on
# ``app.cli``.
app = create_app()
|
|
|
|
|
def init_db():
    """
    Create the database, run the migrations and seed the user roles.

    The database URI is read from the current app's
    ``SQLALCHEMY_DATABASE_URI`` setting.  If the database already exists and
    a ``"user"`` table is present in ``db.metadata``, a message is printed
    and the process exits with status 1.  Otherwise the database is created
    when missing, migrations are applied and the "Admin" and "Member" roles
    are added and committed.
    """
    uri = current_app.config["SQLALCHEMY_DATABASE_URI"]

    try:
        if not database_exists(uri):
            create_database(uri)
        elif "user" in db.metadata.tables:
            # NOTE(review): ``db.metadata`` presumably describes the declared
            # models rather than the live database -- confirm this check
            # detects what it is meant to detect.
            click.echo(gettext(
                "Database already exists, check your installation."))
            sys.exit(1)
    except exc.OperationalError:
        # Probing (or creating) the database failed at the driver level;
        # fall back to attempting the creation.
        create_database(uri)

    db_upgrade()

    # User roles
    # TODO: This list should probably be in the models file
    default_roles = (
        ("Admin", "Site Administrator"),
        ("Member", "Member"),
    )
    for role_name, role_description in default_roles:
        user_datastore.create_role(name=role_name,
                                   description=role_description)

    db.session.commit()
|
|
|
|
|
class _DummyEmailField:
    """
    Minimal stand-in for a WTForms field, used to run a validator
    outside of a form.

    It only offers what the validator calls in this file rely on: a ``data``
    attribute holding the value under test and a ``gettext`` method.
    """

    def __init__(self, data):
        # Value the validator will inspect.
        self.data = data

    def gettext(self, string):
        # No translation is performed here; the message is returned as is.
        return string
|
|
|
|
|
class AddUserError(Exception):
    """
    Raised when a new user (or the user's roles) could not be stored.
    """
|
|
|
|
|
def add_user(email, password, roles, admin_first_run=False):
    """
    Add a new user.

    The email is checked with the WTForms ``Email`` validator and against
    the existing users; on either failure a message is printed and the
    process exits with status 2.  The user is then created with a hashed
    password and a ``fullname`` taken from the part of the email before
    the first "@", and finally the requested roles are attached.

    :param email: email of the new user
    :type email: str
    :param password: password of the new user
    :type password: str
    :param roles: site roles the new user should have
    :type roles: list or tuple
    :param admin_first_run: if first run,
        automatically set the user "confirmed_at" field
    :type admin_first_run: bool
    :raises AddUserError: if committing the user or the user's roles
        fails with a database integrity error
    """
    email_test = _DummyEmailField(email)
    email_validator = Email()
    try:
        email_validator(None, email_test)
    except ValidationError:
        click.echo(gettext("Email is invalid"))
        sys.exit(2)

    if User.query.filter_by(email=email).first():
        click.echo(gettext("User already exists"))
        sys.exit(2)

    # Build the keyword arguments once; only "confirmed_at" depends on
    # whether this is the first-run administrator.
    user_fields = {
        "email": email,
        "password": hash_password(password),
        "fullname": email.split("@")[0],
    }
    if admin_first_run:
        user_fields["confirmed_at"] = datetime.now()
    user_datastore.create_user(**user_fields)

    try:
        db.session.commit()
    except exc.IntegrityError as err:
        db.session.rollback()
        # Chain the database error so the root cause stays visible.
        raise AddUserError("User already exists") from err

    for role in roles:
        role_obj = user_datastore.find_role(role)
        # Role names that cannot be found are skipped.
        if role_obj:
            user_datastore.add_role_to_user(email, role_obj)

    try:
        db.session.commit()
    except exc.IntegrityError as err:
        db.session.rollback()
        raise AddUserError("User roles not added correctly") from err
|
|
|
|
|
@app.cli.command()
def create_user():
    """
    Interactively create a new user.

    Prompts for an email until it passes the WTForms ``Email`` validator
    and no existing user has it, then for a (hidden, confirmed) password,
    then asks whether the user is an administrator.  The user is stored
    through ``add_user`` with the "Admin" role when confirmed, or with no
    roles otherwise.
    """
    address = None
    while not address:
        candidate = click.prompt(gettext("Email"))

        probe = _DummyEmailField(candidate)
        check_email = Email()
        try:
            check_email(None, probe)
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=candidate).first():
            click.echo(gettext("User already exists"))
            continue

        address = candidate

    # Keep asking until a non-empty password comes back.
    secret = None
    while not secret:
        secret = click.prompt(gettext("Password"), hide_input=True,
                              confirmation_prompt=True)

    if click.confirm(gettext("Administrator?")):
        roles = ("Admin",)
    else:
        roles = ()

    add_user(address, secret, roles)

    click.echo(gettext("User added"))
|
|
|
|
|
@app.cli.command()
def first_run():
    """
    Initialise the database and create the first administrator.

    Runs ``init_db``, then prompts for an administrator email until it
    passes the WTForms ``Email`` validator and no existing user has it,
    then for a (hidden, confirmed) password.  The account is stored
    through ``add_user`` with the "Admin" role and ``admin_first_run``
    set.
    """
    init_db()

    admin_email = None
    while not admin_email:
        entered = click.prompt(gettext("Administrator email"))

        probe = _DummyEmailField(entered)
        check_email = Email()
        try:
            check_email(None, probe)
        except ValidationError:
            click.echo(gettext("Email is invalid"))
            continue

        if User.query.filter_by(email=entered).first():
            click.echo(gettext("User already exists"))
            continue

        admin_email = entered

    # Keep asking until a non-empty password comes back.
    admin_password = None
    while not admin_password:
        admin_password = click.prompt(gettext("Administrator Password"),
                                      hide_input=True,
                                      confirmation_prompt=True)

    add_user(admin_email, admin_password, ("Admin",), admin_first_run=True)

    click.echo(gettext("Setup complete"))
|
|
|