from flask import flash, render_template, Blueprint, current_app, redirect, request, url_for, session, g, send_from_directory from datetime import timezone from ownchatbot.db import get_db, reread_goals, reread_votes, rem_vote, reset_vote, reset_goal, clear_fulfilled_rewards, clear_reward_queue, rem_cool, rem_from_queue from ownchatbot.reward_handlers import all_active_votes, all_active_goals, all_active_rewards, get_queue, fulfill_reward, save_rewards, activate_category, deactivate_category, refund_reward, reread_categories, save_config, save_alerts, del_alert_file from ownchatbot.user_handlers import get_all_users, get_all_users_by_name, refund_points, adjust_points, change_email, get_email_code, del_email_code, save_todolist from ownchatbot.bot_messages import save_announce from ownchatbot.owncast_com import send_private_chat import json import emoji from ownchatbot.kofi_handlers import save_kofi_settings, kofi_pngs import random import pkce import requests from functools import wraps import os ocb = Blueprint('web_panels', __name__) state_value = '' def requires_login(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user' not in session: return redirect(url_for('web_panels.login')) return f(*args, **kwargs) return decorated_function @ocb.route('/login') # Remove this comment def login(): # Verify the streamer using indieauth, to their owncast instance code_verifier, code_challenge = pkce.generate_pkce_pair() # Generate a code verifier and code challenge global state_value state_value = code_verifier owncast_url = current_app.config['OWNCAST_URL'] client_id = current_app.config['ACCESS_ID'] redirect_url = f'{owncast_url}/api/auth/provider/indieauth?client_id={client_id}&redirect_uri={url_for("web_panels.auth_response", _external=True)}&response_type=code&code_challenge_method=S256&code_challenge={code_challenge}&state={code_verifier}' return redirect(redirect_url) @ocb.route('/auth_response') def auth_response(): code = request.args.get('code') state = request.args.get('state') if state == state_value: # Check that the state value returned matches the state value sent current_app.logger.info(f'CSRF code is valid.') owncast_url = current_app.config['OWNCAST_URL'] owncast_auth_url = f'{owncast_url}/api/auth/provider/indieauth' access_id = current_app.config['ACCESS_ID'] access_token = current_app.config['ACCESS_TOKEN'] # https://owncast.online/api/latest/#tag/Auth # https://aaronparecki.com/2021/04/13/26/indieauth token_response = requests.post(owncast_auth_url, data={ 'client_id': access_id, 'client_secret': access_token, 'code': code, 'redirect_uri': url_for("web_panels.auth_response", _external=True), 'grant_type': 'authorization_code', 'code_verifier': state }) return_data = token_response.json() f_return_data = json.dumps(return_data, indent=4) auth_photo = return_data['profile']['photo'] session['user'] = f_return_data current_app.logger.info(f'Authenticated.') return redirect(url_for('web_panels.mgmt')) else: current_app.logger.info(f'Invalid CSRF Code.') return 'Not Authorized' @ocb.route('/logout') def logout(): session.pop('user', None) return redirect(url_for('web_panels.user_panel')) @ocb.route('/mgmt', methods=['GET']) # The streamer's management panel @requires_login def mgmt(): owncast_url = current_app.config['OWNCAST_URL'] db = get_db() users = get_all_users(db) utc_timezone = timezone.utc rewards = current_app.config['REWARDS'] active_rewards = [] for each_reward in all_active_rewards(): # Get the name of all active rewards active_rewards.append(each_reward) active_categories = current_app.config['ACTIVE_CAT'] inactive_categories = current_app.config['INACTIVE_CAT'] all_cats = current_app.config['ALL_CAT'] points_interval = current_app.config['POINTS_INTERVAL'] points_award = current_app.config['POINTS_AWARD'] gunicorn_logging = current_app.config['GUNICORN'] prefix = current_app.config['PREFIX'] access_id = current_app.config['ACCESS_ID'] access_token = current_app.config['ACCESS_TOKEN'] kofi_token = current_app.config['KOFI_TOKEN'] kofi_integration = current_app.config['KOFI_INTEGRATION'] announce_enable = current_app.config['ANNOUNCE_ENABLE'] announce_interval = current_app.config['ANNOUNCE_INTERVAL'] announcements = current_app.config['ANNOUNCEMENTS'] todolist_items = current_app.config['LIST'] active_tab = request.args.get('activeTab') alerts_dict = current_app.config['ALERTS'] settings_info = [ access_id, points_interval, points_award, gunicorn_logging, prefix, access_token, owncast_url, kofi_token, kofi_integration, announce_enable, announce_interval ] return render_template('mgmt.html', queue=get_queue(db), votes=all_active_votes(db), goals=all_active_goals(db), rewards=rewards, active_rewards=active_rewards, prefix=current_app.config['PREFIX'], kofi_settings=current_app.config['KOFI_SETTINGS'], kofi_integration=kofi_integration, announcements=announcements, users=users, utc_timezone=utc_timezone, active_categories=active_categories, inactive_categories=inactive_categories, settings_info=settings_info, items=todolist_items, alerts_dict=alerts_dict, activeTab=active_tab) @ocb.route('/mgmt_queue', methods=['GET']) # The streamer's management panel @requires_login def mgmtqueue(): owncast_url = current_app.config['OWNCAST_URL'] db = get_db() users = get_all_users(db) utc_timezone = timezone.utc rewards = current_app.config['REWARDS'] active_rewards = [] for each_reward in all_active_rewards(): # Get the name of all active rewards active_rewards.append(each_reward) return render_template('queue.html', queue=get_queue(db), votes=all_active_votes(db), goals=all_active_goals(db), rewards=rewards, utc_timezone=utc_timezone) @ocb.route('/userpanel', methods=['GET']) # The viewers panel def user_panel(): db = get_db() instance = request.args.get('instance') all_rewards = rewards = current_app.config['REWARDS'] username = request.args.get('username') points_interval = current_app.config['POINTS_INTERVAL'] points_award = current_app.config['POINTS_AWARD'] if username is not None: users = get_all_users_by_name(db, username) else: users = [] utc_timezone = timezone.utc return render_template('userpanel.html', queue=get_queue(db), votes=all_active_votes(db), goals=all_active_goals(db), rewards=all_active_rewards(), all_rewards=all_rewards, prefix=current_app.config['PREFIX'], kofi_settings=current_app.config['KOFI_SETTINGS'], kofi_integration=current_app.config['KOFI_INTEGRATION'], points_interval=points_interval, points_award=points_award, username=username, users=users, instance=instance, utc_timezone=utc_timezone) @ocb.route('/mgmt/fulfill', methods=['GET']) @requires_login def fulfilled(): db = get_db() reward_id = request.args.get('reward_id') username = request.args.get('username') fulfill_reward(db, reward_id) return redirect(url_for('web_panels.mgmtqueue')) @ocb.route('/mgmt/refund', methods=['GET']) @requires_login def refund(): db = get_db() reward_id = request.args.get('reward_id') reward = request.args.get('reward') rewards = current_app.config['REWARDS'] points = rewards[reward]['price'] username = request.args.get('username') user_id = request.args.get('rewarder_id') refund_points(db, user_id, points) # resets points refund_reward(db, reward_id) # marks the reward as refunded return redirect(url_for('web_panels.mgmtqueue')) @ocb.route('/mgmt/edit_account/', methods=['GET', 'POST']) # Streamer manually edit user's account @requires_login def edit_account(user_id): db = get_db() name = request.args.get('name') points = request.args.get('points') email = request.args.get('email') if request.method == 'POST': user_id = request.form['user_id'] name = request.form['name'] newpoints = request.form['newpoints'] adjust_points(db, user_id, newpoints) newemail = request.form['newemail'] if newemail == 'None': current_app.logger.info(f'No email change requested') else: if change_email(db, user_id, newemail): if newemail == '': current_app.logger.info(f'Removed {name}\'s email') else: current_app.logger.info(f'Changed {name}\'s email to {newemail}') return redirect(url_for('web_panels.mgmt', activeTab='accounts')) return render_template('edit_account.html', name=name, user_id=user_id, points=points, email=email) @ocb.route('/mgmt/delete/', methods=['GET', 'POST']) @requires_login def delete(reward_name): del_reward = current_app.config['REWARDS'] del_reward.pop(reward_name) if save_rewards(del_reward): if rem_cool(reward_name): rem_from_queue(reward_name) if reread_votes(): if reread_goals(): pass return redirect(url_for('web_panels.mgmt', activeTab='managerewards')) @ocb.route('/mgmt/edit/', methods=['GET', 'POST']) @requires_login def edit(reward_name): active_categories = current_app.config['ACTIVE_CAT'] all_the_rewards = current_app.config['REWARDS'] reward_data = all_the_rewards[reward_name] all_cats = current_app.config['ALL_CAT'] if request.method == 'POST': reward_data['cooldown'] = int(request.form['cooldown']) reward_data['type'] = request.form['type'] if reward_data['type'] == 'goal': reward_data['target'] = int(request.form['target']) if "milestones" not in reward_data: # If using old rewards.py, and no milestones key exists, create one reward_data["milestones"] = {"milestone1": [], "milestone2": [], "milestone3": []} if request.form['milestone1_points'] == '': reward_data['milestones']['milestone1'] = [] else: milestone1_points = int(request.form['milestone1_points']) reward_data['milestones']['milestone1'] = [request.form['milestone1_desc'], milestone1_points] if request.form['milestone2_points'] == '': reward_data['milestones']['milestone2'] = [] else: milestone2_points = int(request.form['milestone2_points']) reward_data['milestones']['milestone2'] = [request.form['milestone2_desc'], milestone2_points] if request.form['milestone3_points'] == '': reward_data['milestones']['milestone3'] = [] else: milestone3_points = int(request.form['milestone3_points']) reward_data['milestones']['milestone3'] = [request.form['milestone3_desc'], milestone3_points] else: reward_data['price'] = int(request.form['price']) reward_data['info'] = emoji.demojize(request.form['info']) if reward_data['type'] == 'special': reward_data['cmd'] = request.form['cmd'] reward_data['categories'] = request.form.getlist('category') reward_data['cooldown'] = int(request.form['cooldown']) all_the_rewards[reward_name] = reward_data save_rewards(all_the_rewards) if reward_data['type'] == 'goal': # Sync goals and votes in the db with rewards.py reread_goals() if reward_data['type'] == 'vote': reread_votes() return redirect(url_for('web_panels.mgmt', activeTab='managerewards')) return render_template('edit.html', all_cats=all_cats, reward_name=reward_name, active_categories=active_categories, reward_data=reward_data) @ocb.route('/mgmt/settings', methods=['GET', 'POST']) # OwnchatBot settings panel @requires_login def settings(): todolist_items = current_app.config['LIST'] points_interval = int(request.form['points_interval']) points_award = int(request.form['points_award']) gunicorn_logging = 'gunicorn_logging' in request.form prefix = request.form['prefix'] access_id = request.form['access_id'] access_token = request.form['access_token'] owncast_url = request.form['owncast_url'] kofi_integration = 'kofi_integration' in request.form kofi_token = request.form['kofi_token'] config_dict = { 'POINTS_INTERVAL': points_interval, 'POINTS_AWARD': points_award, 'GUNICORN': gunicorn_logging, 'PREFIX': prefix, 'ACCESS_ID': access_id, 'ACCESS_TOKEN': access_token, 'OWNCAST_URL': owncast_url, 'KOFI_TOKEN': kofi_token, 'KOFI_INTEGRATION': kofi_integration } if save_config(config_dict): # Save new config.py current_app.logger.info('Saved new config.') return redirect(url_for('web_panels.mgmt', activeTab='settings')) @ocb.route('/mgmt/announcements', methods=['GET', 'POST']) # OwnchatBot settings panel @requires_login def announcements(): announce_enable = 'announce_enable' in request.form announce_interval = int(request.form['announce_interval']) new_announcements = [] new_announcements = request.form['announcements'].strip().split('\n') announce_dict = { 'ANNOUNCEMENTS': new_announcements, 'ANNOUNCE_ENABLE': announce_enable, 'ANNOUNCE_INTERVAL': announce_interval } if save_announce(announce_dict): # Save new announce.py current_app.logger.info('Saved new announcements.') return redirect(url_for('web_panels.mgmt', activeTab='announcements')) @ocb.route('/mgmt/ksettings', methods=['GET', 'POST']) # OwnchatBot settings panel @requires_login def ksettings(): kofi_settings_dict = current_app.config['KOFI_SETTINGS'] if request.method == 'POST': enable_donations = 'enable_donations' in request.form set_donation_points = request.form['set_donation_points'] enable_subs = 'enable_subs' in request.form sub_points = int(request.form['sub_points']) kofi_url = request.form['kofi_url'] kofi_settings_dict['donations'] = enable_donations kofi_settings_dict['subs'] = enable_subs kofi_settings_dict['sub_points'] = sub_points kofi_settings_dict['kofi_url'] = kofi_url if save_kofi_settings(kofi_settings_dict): current_app.logger.info(f'Saved Kofi settings') return redirect(url_for('web_panels.mgmt', activeTab='kofi-settings')) @ocb.route('/mgmt/add/', methods=['GET', 'POST']) @requires_login def add(reward_type): all_cats = current_app.config['ALL_CAT'] active_categories = current_app.config['ACTIVE_CAT'] all_the_rewards = current_app.config['REWARDS'] if request.method == 'POST': name = request.form['name'] name = name.lower() # Force the name to all lower case name = emoji.demojize(name) # Remove any emojis name = name.replace(" ", "") # Remove any spaces from the name type = request.form['type'] if name in all_the_rewards: # Check for duplicate reward names flash("A reward with this name already exists.", "error") # Flash an error message return redirect(url_for('web_panels.add', reward_type=reward_type)) # Redirect back to the add page if type != 'category': # If we're only adding a category, skip all of this cooldown = int(request.form['cooldown']) if type == 'redeem' or type == 'special' or type == 'vote': price = int(request.form['price']) if type == 'goal': target = int(request.form['target']) milestone1_desc = request.form['milestone1_desc'] if request.form['milestone1_points'] == '': milestone1_points = '' else: milestone1_points = int(request.form['milestone1_points']) milestone2_desc = request.form['milestone2_desc'] if request.form['milestone2_points'] == '': milestone2_points = '' else: milestone2_points = int(request.form['milestone2_points']) milestone3_desc = request.form['milestone3_desc'] if request.form['milestone3_points'] == '': milestone3_points = '' else: milestone3_points = int(request.form['milestone3_points']) info = request.form['info'] info = emoji.demojize(info) # Remove any emojis if type == 'special': cmd = request.form['cmd'] categories = request.form.getlist('category') if type == 'redeem': if categories == ['']: all_the_rewards[name] = {'price': price, 'type': type, 'info': info, 'cooldown': cooldown} else: all_the_rewards[name] = {'price': price, 'type': type, 'info': info, 'categories': categories, 'cooldown': cooldown} if type == 'goal': if categories == ['']: all_the_rewards[name] = {'target': target, 'type': type, 'info': info, 'cooldown': cooldown} else: all_the_rewards[name] = {'target': target, 'type': type, 'info': info, 'categories': categories, 'cooldown': cooldown} all_the_rewards[name]["milestones"] = {"milestone1": [], "milestone2": [], "milestone3": []} # Create empty milestones key if milestone1_points: all_the_rewards[name]["milestones"]["milestone1"] = [milestone1_desc, milestone1_points] if milestone2_points: all_the_rewards[name]["milestones"]["milestone2"] = [milestone2_desc, milestone2_points] if milestone3_points: all_the_rewards[name]["milestones"]["milestone3"] = [milestone3_desc, milestone3_points] if type == 'vote': if categories == ['']: all_the_rewards[name] = {'price': price, 'type': type, 'info': info} else: all_the_rewards[name] = {'price': price, 'type': type, 'info': info, 'categories': categories, 'cooldown': cooldown} if type == 'special': if categories == ['']: all_the_rewards[name] = {'price': price, 'type': type, 'info': info, 'cmd': cmd, 'cooldown': cooldown} else: all_the_rewards[name] = {'price': price, 'type': type, 'info': info, 'cmd': cmd, 'categories': categories, 'cooldown': cooldown} save_rewards(all_the_rewards) if type == 'goal': # Remove old goals and votes from the database reread_goals() if type == 'vote': reread_votes() else: # If we're only adding a category inactive_categories = current_app.config['INACTIVE_CAT'] inactive_categories.append(name) # Add it to the INACTIVE_CAT variable reread_categories() # Write it to categories.py return redirect(url_for('web_panels.mgmt', activeTab='managerewards')) return render_template('add.html', all_cats=all_cats, reward_type=reward_type, active_categories=active_categories) @ocb.route('/set_viewer_email', methods=['GET', 'POST']) def set_viewer_email(): db = get_db() mail_reg_code = int(request.form['code']) user_id = request.form['user_id'] db_mail_reg_code = get_email_code(db, user_id) new_email = request.form['new_email'] instance = request.form['instance'] user_name = request.form['user_name'] if mail_reg_code == db_mail_reg_code: if change_email(db, user_id, new_email): del_email_code(db, user_id) flash(f"Email Address \"{new_email}\" successfully registered.", "success") send_private_chat(user_id, f'{user_name}, thanks for registering for Kofi perks! I appreciate your support!') current_app.logger.info(f'Changed {user_id}\'s email to {new_email}') else: flash(f"Incorrect code. Email Address \"{new_email}\" was not registered.", "failure") current_app.logger.info(f'The code entered, \"{mail_reg_code}\", does not match \"{db_mail_reg_code}\" found in database.') return redirect(url_for('web_panels.user_panel', instance=instance, username=user_name)) @ocb.route('/mgmt/activate/', methods=['GET', 'POST']) @requires_login def activate(category): activate_category(category) return redirect(url_for('web_panels.mgmt', activeTab='categories')) @ocb.route('/mgmt/deactivate/', methods=['GET', 'POST']) @requires_login def deactivate(category): deactivate_category(category) return redirect(url_for('web_panels.mgmt', activeTab='categories')) @ocb.route('/mgmt/delcat//', methods=['GET', 'POST']) @requires_login def delcat(cat_name, cat_act): active_categories = current_app.config['ACTIVE_CAT'] inactive_categories = current_app.config['INACTIVE_CAT'] if cat_act == 'inactive': inactive_categories.remove(cat_name) else: active_categories.remove(cat_name) reread_categories() current_rewards = current_app.config['REWARDS'] for reward, details in current_rewards.items(): # Remove from rewards.py as well if cat_name in details['categories']: details['categories'].remove(cat_name) save_rewards(current_rewards) return redirect(url_for('web_panels.mgmt', activeTab='categories')) @ocb.route('/mgmt/reset//', methods=['GET', 'POST']) # Reset votes and goals to zero @requires_login def reset(reward_name, reward_type): if reward_type == "goal": reset_goal(reward_name) if reward_type == "vote": reset_vote(reward_name) return redirect(url_for('web_panels.mgmt', activeTab='managerewards')) @ocb.route('/mgmt/rereadvotes', methods=['GET', 'POST']) def rereadv(): reread_votes() return redirect(url_for('web_panels.mgmt')) @ocb.route('/mgmt/clearfulfilled', methods=['GET', 'POST']) @requires_login def clearfulfilled(): if clear_fulfilled_rewards(): current_app.logger.info('Cleared fulfilled rewards.') return redirect(url_for('web_panels.mgmtqueue')) @ocb.route('/mgmt/clearqueue', methods=['GET', 'POST']) @requires_login def clear_queue(): clear_reward_queue() return redirect(url_for('web_panels.mgmtqueue')) @ocb.route('/mgmt/addtodoitem', methods=['POST']) @requires_login def add_todo_item(): if request.method == 'POST': todolist_items = current_app.config['LIST'] item = request.form.get('item') if item: todolist_items.append({'name': item, 'crossed': 'no'}) if save_todolist(todolist_items): # Save todo list current_app.logger.info('Saved to-do list.') return redirect(url_for('web_panels.mgmt', activeTab='todolist')) return redirect(url_for('web_panels.mgmt', activeTab='todolist')) @ocb.route('/mgmt/cross/') @requires_login def cross(item_id): todolist_items = current_app.config['LIST'] if 0 <= item_id < len(todolist_items): # Make sure the item exists todolist_items[item_id]['crossed'] = 'yes' if save_todolist(todolist_items): # Save todo list current_app.logger.info('Saved to-do list.') return redirect(url_for('web_panels.mgmt', activeTab='todolist')) @ocb.route('/mgmt/uncross/') @requires_login def uncross(item_id): todolist_items = current_app.config['LIST'] if 0 <= item_id < len(todolist_items): todolist_items[item_id]['crossed'] = 'no' if save_todolist(todolist_items): # Save todo list current_app.logger.info('Saved to-do list.') return redirect(url_for('web_panels.mgmt', activeTab='todolist')) @ocb.route('/mgmt/remtodoitem/') @requires_login def rem_todo_item(item_id): todolist_items = current_app.config['LIST'] if 0 <= item_id < len(todolist_items): # Make sure the item exists removed = todolist_items.pop(item_id) current_app.logger.info(f'Removed \"{removed}\" from the to-do list.') if save_todolist(todolist_items): # Save todo list current_app.logger.info('Saved to-do list.') return redirect(url_for('web_panels.mgmt', activeTab='todolist')) @ocb.route('/mgmt/clearlist') @requires_login def clear_list(): todolist_items = current_app.config['LIST'] todolist_items = [] # Clear the list if save_todolist(todolist_items): # Save todo list current_app.logger.info('Saved to-do list.') return redirect(url_for('web_panels.mgmt', activeTab='todolist')) @ocb.route('/mgmt/alertupload/', methods=['POST']) @requires_login def alert_upload(alert_type): alerts_dict = current_app.config['ALERTS'] alert_file = request.files[alert_type] if alert_type in alerts_dict.keys(): # If the alert already exists, delete the existing alert file old_file = alerts_dict[alert_type.upper()] current_app.logger.info(f'{alert_type} already set.') if del_alert_file(old_file): pass filepath = os.path.join(current_app.config['ASSETS_FOLDER'], alert_file.filename) alert_file.save(filepath) alerts_dict[alert_type] = alert_file.filename if save_alerts(alerts_dict): # Save new alerts.py return redirect(url_for('web_panels.mgmt', activeTab='alerts')) return redirect(url_for('web_panels.mgmt', activeTab='alerts')) @ocb.route('/mgmt/delalert/') @requires_login def del_alert(alert_type): alerts_dict = current_app.config['ALERTS'] try: alert_file = alerts_dict[alert_type] alerts_dict.pop(alert_type) if save_alerts(alerts_dict): current_app.logger.info(f'Removed {alert_type} from alerts.py.') if del_alert_file(alert_file): pass except KeyError: current_app.logger.info(f'No {alert_type} alert set. Nothing to do.') return redirect(url_for('web_panels.mgmt', activeTab='alerts')) @ocb.route('/assets/') # Route for displaying alert images def assets(asset_name): return send_from_directory(current_app.config['ASSETS_FOLDER'], asset_name) @ocb.route('/alert/') # Route for alerts overlay def ocb_alert(alert_type): alert_type = alert_type.upper()+'_ALERT' alerts_dict = current_app.config['ALERTS'] try: alert_name = alerts_dict[alert_type] except KeyError: current_app.logger.info(f'No {alert_type} alert set. Nothing to do.') return f'You have not configured alert type \"{alert_type}\"' if alert_type == 'FOLLOWER_ALERT': return render_template('follower.html', alert_name=alert_name) elif alert_type == 'MILESTONE_ALERT': return render_template('rmilestone.html', alert_name=alert_name) elif alert_type == 'GOAL_ALERT': return render_template('rgoal.html', alert_name=alert_name) @ocb.route('/goals', methods=['GET']) # Route for goals overlay def goals(): db = get_db() return render_template('goals.html', goals=all_active_goals(db), rewards=all_active_rewards()) @ocb.route('/votes', methods=['GET']) # Route for votes overlay def votes(): db = get_db() return render_template('votes.html', votes=all_active_votes(db)) @ocb.route('/todo') def todo(): todolist_items = current_app.config['LIST'] return render_template('list.html', items=todolist_items)