from flask import Flask, request, json, Blueprint, current_app, render_template, jsonify, request, g
from ownchatbot.db import get_db
from ownchatbot.owncast_com import send_chat, send_private_chat
from ownchatbot.user_handlers import add_user_to_points, change_name, get_users_points, remove_duplicates, get_email_code, set_email_code
from ownchatbot.bot_messages import do_reward, help_message
from ownchatbot.reward_handlers import all_active_goals, all_active_votes, all_active_rewards
from ownchatbot.kofi_handlers import accept_donation
import json
import random
ocb = Blueprint('webhooks', __name__)
def format(rawjson): # Make data legible
formatted_data = json.dumps(rawjson, indent=4)
return formatted_data
@ocb.route('/kofiHook', methods=["POST"])
def kofiHook():
current_app.logger.info(f'----------------------------------------------------------------------------')
current_app.logger.info(f'Kofi request')
if request.content_type == 'application/x-www-form-urlencoded':
raw_data = request.form.get('data') # Get the kofi data
if raw_data:
raw_data = json.loads(raw_data)
is_authed = raw_data['verification_token']
if is_authed == current_app.config['KOFI_TOKEN']:
type = raw_data['type']
is_public = raw_data['is_public']
if is_public:
from_name = raw_data['from_name']
new_sub = raw_data['is_first_subscription_payment']
message = raw_data['message']
shop_items = raw_data['shop_items']
name = raw_data['from_name']
email = raw_data['email']
amount = raw_data['amount']
sub_payment = raw_data['is_subscription_payment']
first_sub = raw_data['is_first_subscription_payment']
tier_name = raw_data['tier_name']
if type == 'Shop Order':
current_app.logger.info(f'\n{name} purchased {format(shop_items)}\nMessage: {message}\n')
if type == 'Donation':
donation_info = [is_public, name, email, amount, message]
tip_points = current_app.config['KOFI_SETTINGS']['tip_points']
accept_donation(donation_info, tip_points)
if type == 'Subscription':
if first_sub:
if tier_name:
current_app.logger.info(f'\n{name} <{email}> subscribed as a {tier_name} tier member.')
else:
current_app.logger.info(f'\n{name} <{email}> subscribed.')
else:
if tier_name:
current_app.logger.info(f'\n{name} <{email}> renewed their {tier_name} tier membership.')
else:
current_app.logger.info(f'\n{name} <{email}> renewed their membership.')
return jsonify({'status': 'success'}), 200
else:
current_app.logger.info(f'Token invalid. Rejecting.')
return jsonify({'status': 'unauthorized'}), 401
@ocb.route('/chatHook', methods=['POST'])
def chat_hook():
prefix = current_app.config['PREFIX']
data = request.json
db = get_db()
if data['type'] in ['CHAT', 'NAME_CHANGED', 'USER_JOINED']: # Check if the viewer is in the chatbot database
user_id = data['eventData']['user']['id']
authed = data['eventData']['user']['authenticated']
display_name = data['eventData']['user']['displayName']
if add_user_to_points(db, user_id, display_name, authed):
current_app.logger.debug(f'Added/updated {user_id} database.')
if data['type'] == 'USER_JOINED': # Do username house cleaning when a viewer joins
if data['eventData']['user']['authenticated']:
remove_duplicates(db, user_id, display_name)
elif data['type'] == 'NAME_CHANGE':
user_id = data['eventData']['user']['id']
new_name = data['eventData']['newName']
change_name(db, user_id, new_name)
if data['eventData']['user']['authenticated']:
remove_duplicates(db, user_id, new_name)
elif data['type'] == 'CHAT': # If a chat message, sort out what command it is
user_id = data['eventData']['user']['id']
display_name = data['eventData']['user']['displayName']
current_app.logger.debug(f'Chat message from {display_name}:')
current_app.logger.debug(f'{data["eventData"]["rawBody"]}')
lowercase_msg = data['eventData']['rawBody'].lower() # Convert body to lower case to match reward case
if lowercase_msg.startswith(f'{prefix}help'): # Send the help message
help_message(user_id)
elif lowercase_msg.startswith(f'{prefix}points'): # Get the viewer's current points
points = get_users_points(db, user_id)
if points is None:
send_private_chat(user_id, f'{display_name}, couldn\'t get your points, for some highly technical reason.')
else:
send_private_chat(user_id, f'{display_name}, you have {points} points.')
elif lowercase_msg.startswith(f'{prefix}reg_mail'): # Generate a code to verify users account for email registration
mail_reg_code = get_email_code(db, user_id)
if mail_reg_code: # If the viewer already has a code waiting
send_private_chat(user_id, f'{display_name}, your code is {mail_reg_code}. Enter it into the form, with your email address, to enable Kofi perks!')
else: # if not
mail_reg_code = random.randint(100000, 999999)
if set_email_code(db, user_id, mail_reg_code):
send_private_chat(user_id, f'{display_name}, your code is {mail_reg_code}. Enter it into the form on the , with your email address, to enable Kofi perks!')
elif lowercase_msg.startswith(f'{prefix}rewards'): # Send rewards list
if current_app.config['REWARDS']:
rewards_msg = f'Currently active rewards:'
for reward, details in current_app.config['REWARDS'].items():
if details.get('categories'):
if not (set(details['categories']) & set(current_app.config['ACTIVE_CAT'])): # If there are no common categories, continue
continue
if 'type' in details and details['type'] == 'goal':
rewards_msg = f'{rewards_msg}
* {prefix}{reward} goal at {details["target"]} contributed points.'
else:
rewards_msg = f'{rewards_msg}
* {prefix}{reward} for {details["price"]} points.'
if 'info' in details:
rewards_msg = f'{rewards_msg}
{details["info"]}'
else:
rewards_msg = f'{rewards_msg}'
else:
rewards_msg = 'There are currently no active rewards.'
send_private_chat(user_id, rewards_msg)
elif lowercase_msg.startswith(f'{prefix}'): # Send to handle rewards
do_reward(lowercase_msg, user_id)
return data
@ocb.route('/goals', methods=['GET']) # Route for goals overlay
def goals():
db = get_db()
return render_template('goals.html',
goals=all_active_goals(db),
rewards=all_active_rewards())
@ocb.route('/votes', methods=['GET']) # Route for votes overlay
def votes():
db = get_db()
return render_template('votes.html',
votes=all_active_votes(db))