m4pl1mp/plugins/twitter_plugin.py
2022-02-01 22:44:42 -06:00

194 lines
12 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import irc3
from datetime import datetime
import twitter
import re
import os
import time
import timeago
import os
import requests
from lxml.html import fromstring
from difflib import SequenceMatcher
###########################################################################################
###########################################################################################
# Twitter API credentials are read from the environment at import time; a
# missing variable raises KeyError here, before the plugin is ever loaded.
CONSUMER_KEY = os.environ['CONSUMER_KEY']
CONSUMER_SECRET = os.environ['CONSUMER_SECRET']
ACCESS_TOKEN_KEY = os.environ['ACCESS_TOKEN_KEY']
ACCESS_TOKEN_SECRET = os.environ['ACCESS_TOKEN_SECRET']
TOO_LONG = 2000

# The patterns are raw strings so that sequences such as "\/", "\." and "\w"
# reach the regex engine untouched instead of being read as (invalid) string
# escapes, which newer Python versions warn about.
# Group 1 of YOUTUBE_REGEX is the video id.
YOUTUBE_REGEX = re.compile(
    r'http(?:s?):\/\/(?:www\.)?youtu(?:be\.com\/watch\?v=|\.be\/)([\w\-\_]*)(&(amp;)?[\w\?=]*)?',
    re.IGNORECASE,
)
# Groups: (screen name, optional "es" suffix, status id). The trailing "$"
# means the status URL must sit at the very end of the searched text.
TWITTER_REGEX = re.compile(
    r'https?:\/\/twitter\.com\/(?:#!\/)?(\w+)\/status(es)?\/(\d+)$',
    re.IGNORECASE,
)
URL_REGEX = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+',
    re.IGNORECASE,
)

# NOTE: this rebinds the name ``twitter`` from the imported module to the API
# client instance; code below calls ``twitter.GetStatus(...)`` on that client.
twitter = twitter.Api(
    consumer_key=CONSUMER_KEY,
    consumer_secret=CONSUMER_SECRET,
    access_token_key=ACCESS_TOKEN_KEY,
    access_token_secret=ACCESS_TOKEN_SECRET,
)
###########################################################################################
###########################################################################################
@irc3.plugin
class Plugin:
    """irc3 plugin that announces tweets linked in channel messages.

    When a message ends with a twitter.com status URL, the tweet is fetched
    and summarised to the channel, and the links/media it carries are
    "unrolled" (expanded) in follow-up messages.

    All outgoing text that the original code decorated is passed through
    ``self.bot.emo`` and sent with ``self.bot.privmsg``; both, like
    ``self.bot.yt`` and ``self.bot.ignore_list``, are provided elsewhere.
    """

    # Size of each body chunk pulled while looking for a page <title>.
    _TITLE_CHUNK_SIZE = 2000
    # Stop looking for a <title> once this many bytes have been buffered.
    _TITLE_READ_LIMIT = 2000 * 10
    # Pulls the numeric id out of a ".../status/<id>" style URL.
    _STATUS_ID_REGEX = re.compile(r'/status(?:es)?/(\d+)')

    ###################################################################################
    def __init__(self, bot):
        """Keep a reference to the bot this plugin is attached to."""
        self.bot = bot

    ###################################################################################
    @irc3.extend
    def _similar(self, a, b):
        """Return the difflib similarity ratio (0.0 - 1.0) of *a* and *b*."""
        return SequenceMatcher(None, a, b).ratio()

    ###################################################################################
    @irc3.extend
    def _check_for_url(self, og_tweet, d_nick, d_url, d_unrolled, d_text, d_target):
        """Unroll one link found in a tweet and report it to *d_target*.

        og_tweet   -- the tweet the link came from (its ``text`` is compared
                      against a linked tweet to suppress near-duplicates)
        d_nick     -- screen name used as the message prefix
        d_url      -- the shortened URL as it appears in the tweet
        d_unrolled -- the expanded URL that is actually fetched
        d_text     -- text of the originating tweet (compared against the
                      page title to suppress near-duplicates)
        d_target   -- channel or nick that receives the output

        Nothing is returned; failures are reported to *d_target* and printed.
        """
        if not URL_REGEX.findall(d_unrolled):
            return
        try:
            if 'https://twitter.com/' in d_unrolled and 'status' in d_unrolled:
                self._unroll_linked_tweet(og_tweet, d_nick, d_unrolled, d_target)
                return
            self._unroll_web_page(d_nick, d_url, d_unrolled, d_text, d_target)
        except Exception as e:
            # The target must be passed explicitly; without it the error
            # report itself raised and the original failure was lost.
            self.bot.privmsg(d_target, "_debug_check_for_url_error: {}".format(e))
            print("original: {} nick: {} url: {} unrolled: {} text: {} error: {}".format(og_tweet,d_nick,d_url,d_unrolled,d_text,e))

    def _status_id_from_url(self, url):
        """Return the status id contained in a twitter.com status URL."""
        found = self._STATUS_ID_REGEX.search(url)
        if found:
            return found.group(1)
        # Fall back to the last path segment, minus any query string.
        return url.split('/')[-1].split('?')[0]

    def _unroll_linked_tweet(self, og_tweet, d_nick, d_unrolled, d_target):
        """Announce a tweet that another tweet links to (a quoted tweet)."""
        try:
            e_tweet = twitter.GetStatus(self._status_id_from_url(d_unrolled))
            e_text = e_tweet.text
            # Skip links that merely point back at (nearly) the same text.
            if self._similar(og_tweet.text, e_text) > 0.7:
                return
            msg = "\x02\x0302{nick1:}:{nick2:}\x0F\x02\x0303 ▶▶ {e_text:} \x0F".format(nick1=d_nick,nick2='UNROLLED',e_text=e_text)
            for media in e_tweet.media or ():
                msg += " ▶▶ [media] \x0F\x02\x0312{media_url:}".format(media_url=media.media_url)
            for link in e_tweet.urls or ():
                msg += " ▶▶ [url] \x0F\x02\x0312{e_url:}".format(e_url=link.expanded_url)
            msg = self.bot.emo(msg)
            self.bot.privmsg(d_target, msg)
        except Exception as e:
            msg = "wu/tang >>>>>>>>>>> sub-unrolling: {}".format(e)
            msg = self.bot.emo(msg)
            self.bot.privmsg(d_target, msg)

    def _unroll_web_page(self, d_nick, d_url, d_unrolled, d_text, d_target):
        """Fetch *d_unrolled* and announce it as media or by its page title."""
        response = requests.get(d_unrolled, timeout=3, stream=True)
        try:
            # Either header may be absent; fall back to harmless defaults.
            content_type = response.headers.get("Content-Type") or ""
            content_length = response.headers.get('Content-Length') or 0
            if content_type.startswith('image'):
                msg = "\x02\x0302{nick1:}:{nick2:}\x0F\x02\x0303 ▶▶ [media] {media:} \x0F".format(nick1=d_nick,nick2='UNROLLED',media=d_unrolled)
                msg = self.bot.emo(msg)
                self.bot.privmsg(d_target, msg)
                return
            if not content_type.startswith("text/html"):
                return
            if int(content_length) > 200000:
                self.bot.privmsg(d_target, "pre-fetch aborted -> fuck your large ass content -> {} -> {}".format(d_url,d_unrolled))
                # Actually abort, as the message says.
                return
            title = self._read_title(response)
        finally:
            # stream=True keeps the connection open until it is released.
            response.close()
        if title is None:
            return
        print('title: {}'.format(title))
        if self._similar(title, d_text) > 0.4:
            print('wu/tang: similarity')
            return
        msg = "\x02\x0302{nick1:}:{nick2:}\x0F\x02\x0304 ▶▶ \x0F\x1D\x0314{url:}\x0F\x0304 ▶▶ \x0F\x0303{unrolled:} \x0F\x0304▶▶ \x0F\x1D\x0314{title:}\x0F".format(nick1=d_nick,nick2='UNROLLED',url=d_url,unrolled=d_unrolled,title=title)
        msg = self.bot.emo(msg)
        self.bot.privmsg(d_target, msg)

    def _read_title(self, response):
        """Return the page <title> (max 100 chars) or None if none is found.

        The body is buffered chunk by chunk so a title split across chunk
        boundaries is still found, and reading stops as soon as the closing
        title tag has arrived or the read limit is reached.
        """
        buffered = bytearray()
        for chunk in response.iter_content(chunk_size=self._TITLE_CHUNK_SIZE):
            buffered.extend(chunk)
            if b"</title" in buffered.lower():
                break
            if len(buffered) >= self._TITLE_READ_LIMIT:
                break
        if not buffered.strip():
            return None
        node = fromstring(bytes(buffered)).find(".//title")
        if node is None or not node.text:
            return None
        return node.text.strip()[:100]

    ###################################################################################
    def _check_for_twitter(self, mask=None, data=None, target=None, **kw):
        """Announce the tweet linked in *data* (if any) and unroll its links.

        mask   -- sender mask of the triggering message (forwarded to the
                  ``yt`` handler for YouTube links)
        data   -- message text searched with TWITTER_REGEX
        target -- channel or nick that receives the output
        """
        match_list = TWITTER_REGEX.findall(data)
        if not match_list:
            return
        status_id = match_list[0][2]
        try:
            tweet = twitter.GetStatus(status_id=status_id)
            tweet_text = tweet.text
            user = tweet.user.screen_name
            location = tweet.coordinates if tweet.coordinates else ""
            tweet_time = time.strptime(tweet.created_at, '%a %b %d %H:%M:%S +0000 %Y')
            # created_at is UTC (+0000), so "now" must be UTC as well or the
            # age is off by the host's UTC offset.
            utc_now = datetime(*time.gmtime()[:6])
            time_since = timeago.format(time.strftime('%Y-%m-%d %H:%M:%S', tweet_time), utc_now)
            msg = "\x02\x0302{} \x0F\x0303▶\x0F \x02\x0301{}\x0F\x0314 | Retweets:\x0F \x1D\x0306{}\x0F\x0314 Favorites:\x0F\x1D\x0306 {} \x0F\x1D\x0314\x1D {} {}".format(user, tweet_text, tweet.retweet_count, tweet.favorite_count, time_since, location)
            msg = self.bot.emo(msg)
            self.bot.privmsg(target, msg)
            self._unroll_tweet(tweet, user, tweet_text, URL_REGEX.findall(msg), mask, target)
        except Exception as e:
            self._report_twitter_error(e, target)

    def _unroll_tweet(self, tweet, user, tweet_text, match_list, mask, target):
        """Expand the media or links of *tweet*; *match_list* holds the URLs
        found in the announced message."""
        if not match_list:
            return
        try:
            if not tweet.urls:
                # No links: show attached media instead (urls/media may be None).
                for media in tweet.media or ():
                    msg = "\x02\x0302{nick1:}:{nick2:}\x0F\x02\x0304 ▶▶ [media] \x0F\x02\x0312{m_turl:}\x0F\x0304 ▶▶ \x0F\x0303{m_murl:} \x0F\x0304▶▶ \x0F\x1D\x0314{m_eurl:}\x0F".format(nick1=user,nick2='UNROLLED',m_turl=media.url,m_murl=media.media_url,m_eurl=media.expanded_url)
                    msg = self.bot.emo(msg)
                    self.bot.privmsg(target, msg)
                return
            for link in tweet.urls:
                t_turl = link.url
                t_eurl = link.expanded_url
                try:
                    match_list.remove(t_turl)
                except ValueError:
                    print('cant remove from matchlist, does not exist')
                yt_match = YOUTUBE_REGEX.search(t_eurl)
                if yt_match:
                    # Group 1 is the video id; this also covers youtu.be
                    # links and ignores trailing "&..." parameters.
                    self.bot.madjust = "{}:UNROLLED".format(user)
                    try:
                        self.bot.yt(mask,target,{'<keyword>': [yt_match.group(1)],'yt': True})
                    finally:
                        # Always clear the prefix, even if yt() fails.
                        self.bot.madjust = ""
                elif str(tweet.id) != t_eurl.split('/')[-1]:
                    self.bot._check_for_url(tweet,user,t_turl,t_eurl,tweet_text,target)
                else:
                    print('bypassing original tweet')
            if match_list:
                print('items in matchlist remain')
                print(match_list)
        except Exception as e:
            self.bot.privmsg(target,'twitter_plugin <> _debug_unrolling -> wu/tang: {}'.format(e))

    def _report_twitter_error(self, error, target):
        """Send a one-line description of *error* to *target*.

        Errors carrying a ``message`` list of ``{'message', 'code'}`` dicts
        are reported with that code; anything else falls back to the
        exception's type name and text instead of failing inside the handler.
        """
        try:
            detail = error.message[0]
            _msg = detail['message']
            _code = detail['code']
        except (AttributeError, TypeError, KeyError, IndexError):
            _msg = str(error)
            _code = type(error).__name__
        _erid = "PROTECTED TW33T" if _code == 179 else "DON'T CARE EXCEPTION"
        msg = "\x02\x0302{} \x0F\x0304▶ \x0F\x02\x0312{} aka {}\x0F\x0303".format(_code,_msg,_erid)
        msg = self.bot.emo(msg)
        self.bot.privmsg(target, msg)

    ###################################################################################
    @irc3.event(irc3.rfc.PRIVMSG)
    def on_privmsg_search_for_twitter(self, mask=None, target=None, data=None, **kw):
        """PRIVMSG hook: look for a tweet link unless the message starts
        with "?" or the sender's nick is in ``self.bot.ignore_list``."""
        if data.startswith("?"):
            return
        if mask.nick.lower() not in self.bot.ignore_list:
            self._check_for_twitter(mask, data, target)
    ###################################################################################
###########################################################################################
###########################################################################################