# yoink/yoink.py
#
# 137 lines
# 4.5 KiB
# Python

# Author: perp
import argparse
import aiohttp
import asyncio
from rich.tree import Tree
from rich.text import Text
from rich import print as rprint
class Yoink:
    """
    Yoink is a program to verify Shodan keys.

    Keys are read from a file (one per line), checked concurrently against
    the Shodan ``api-info`` endpoint, and every key that still has credits
    (and matches the optional plan filter) is added to a rich tree and,
    optionally, appended to an output file.
    """

    # Endpoint queried to verify a key
    API_URL = "https://api.shodan.io/api-info"
    # Seconds to wait before retrying a ratelimited (HTTP 429) request
    RETRY_DELAY = 2

    def __init__(self, file, filt, output):
        """
        Constructor.

        Args:
            file: str, Path to the file holding the Shodan keys
            filt: str or None, Plan to keep (None keeps every plan)
            output: str or None, Output file to append valid keys to
        """
        self.file = file
        self.filter = filt
        self.output = output
        self.tree = Tree("[bold yellow]Shodan Keys", guide_style="bold yellow")
        self.tasks = []
        self.valid = []
        self.checked = 0

    def _plan_allowed(self, plan):
        """
        Tell whether a key on the given plan passes the plan filter.

        Args:
            plan: str, Plan reported by the API for the key
        Returns:
            bool, True when no filter is set or the plan equals the filter
        """
        return self.filter is None or plan == self.filter

    async def add_tree(self, key, query_credits, scan_credits, plan):
        """
        Record the key as valid and add it to the tree.

        Args:
            key: str, Current valid key
            query_credits: int, Query credits left
            scan_credits: int, Scan credits left
            plan: str, Current plan
        """
        self.valid.append(key)
        branch = self.tree.add(f"[green]Key: [bold blue]{key}")
        branch.add(f"[green]Plan: [blue]{plan}")
        branch.add(f"[green]Query Credits: [blue]{query_credits}")
        # Same label/value colours as the rows above (they were swapped here)
        branch.add(f"[green]Scan Credits: [blue]{scan_credits}")

    async def verify(self, session, key):
        """
        Verify the key by a GET request.

        A ratelimited request (HTTP 429) is retried after RETRY_DELAY seconds;
        any other status ends the check, so a rejected key is requested once.

        Args:
            session: object, AioHTTP ClientSession
            key: str, Shodan key
        """
        while True:
            # Pass the key through params so it is URL-encoded
            async with session.get(self.API_URL, params={"key": key}) as response:
                status = response.status
                if status == 200:
                    info = await response.json()
                    query_credits = info["query_credits"]
                    scan_credits = info["scan_credits"]
                    plan = info["plan"]
                    # We don't want keys with no credits left at all
                    has_credits = scan_credits != 0 or query_credits != 0
                    if has_credits and self._plan_allowed(plan):
                        await self.add_tree(key, query_credits, scan_credits, plan)
            # Only a ratelimit is worth retrying; anything else is final
            if status != 429:
                break
            # Sleep after the response is released, not while holding it
            await asyncio.sleep(self.RETRY_DELAY)
        self.checked += 1

    async def run(self):
        """
        Creates AioHTTP ClientSession, open the file, add each key as a task,
        gathers the tasks, prints the results and writes the output file.
        """
        # Create a reusable session
        async with aiohttp.ClientSession() as session:
            # Open the keys file
            with open(self.file, "r") as file:
                for line in file:
                    key = line.strip()
                    # Blank lines are not keys; don't spend a request on them
                    if not key:
                        continue
                    self.tasks.append(asyncio.create_task(self.verify(session, key)))
            # Wait for every check while the session is still open
            await asyncio.gather(*self.tasks)
        rprint(self.tree)
        rprint(f"Checked [bold blue]{self.checked}[reset], Valid/Filtered [bold green]{len(self.valid)}[reset]")
        if self.output is not None:
            # Append the valid keys to the output file, one per line
            with open(self.output, "a") as file:
                file.writelines(f"{key}\n" for key in self.valid)
if __name__ == "__main__":
    # Add arguments & parse
    parser = argparse.ArgumentParser(description="Yoink is a program to verify Shodan keys")
    parser.add_argument("-f", "--file", help="Path to Shodan keys", required=True)
    parser.add_argument("-t", "--filter", help="Filter type of Shodan plan (oss, dev, edu)", required=False)
    parser.add_argument("-o", "--output", help="Path to output", required=False)
    args = parser.parse_args()
    # asyncio.run creates the event loop, runs the coroutine to completion
    # and closes the loop afterwards (the manual loop was never closed)
    asyncio.run(Yoink(args.file, args.filter, args.output).run())