# proxies/run.py
#
# 182 lines
# 5.7 KiB
# Python

from asyncio import Queue, get_running_loop, new_event_loop, gather, Event
from ipaddress import AddressValueError, IPv4Address
from logging import basicConfig, log
from re import compile
from tqdm import tqdm
from python_socks.async_.asyncio import ProxyType
from python_socks.async_.asyncio.v2 import Proxy
from python_socks.async_.asyncio.v2._stream import AsyncioSocketStream
# Destination port used for the connectivity check (80 = plain HTTP).
CHECK_PORT = 80

# uvloop is optional: when it is importable it is installed right away,
# otherwise the stock asyncio event loop is used.
try:
    from uvloop import install
    install()
except ImportError:
    print("Unable to find uvloop, defaulting to normal event loop")

# The ssl names are only used when CHECK_PORT is 443; without ssl the check
# falls back to port 80.
try:
    from ssl import CERT_NONE, PROTOCOL_TLS_CLIENT, _create_unverified_context
except ImportError:
    print("Unable to find ssl, we will not be able to make any connections using 443 unless ssl is installed")
    CHECK_PORT = 80
CHECK_PROXIES = "./pulled_proxies.txt"
VALID_PROXIES = "./valid_proxies.txt"
DEAD_PROXIES = "./dead_proxies.txt"
CHECKING_LOG = "./proxy_log.log"
LOG_FMT = "%(asctime)s:%(levelname)s:%(levelno)s:%(lineno)d - %(message)s"
basicConfig(filename=CHECKING_LOG, filemode="w+", level=1, format=LOG_FMT, datefmt='[%D %H:%M:%S]', )
CHECK_ADDR: str | IPv4Address = IPv4Address("198.204.232.125")
CHECK_HOST_NAME: str = "tcp.ac"
CHECK_TIMEOUT: float = 10.0
WORKERS = 1000
loop = new_event_loop()
TASKS = []
PROXIES = set()
PROXY_QUEUE = Queue(WORKERS)
LIVE_PROXIES = set()
UNRESPONSIVE = set()
CLOSING = Event()
proxy_re = compile(
r"(?=^(?:(?P<protocol>socks[4-5]):\/\/)?(?:(?P<username>[^:]+):(?P<password>[^@]+)@)?(?P<ipaddr>[^:]+):(?P<port>[^\n]+)\n)"
)
# Parse the candidate list. Lines that do not match proxy_re are skipped; each
# remaining line is queued once per SOCKS version.
with open(CHECK_PROXIES, "r") as f:
    for line in f:
        match = proxy_re.match(line)
        if match is None:
            continue
        fields = match.groupdict()
        endpoint = (
            fields["ipaddr"],
            fields["port"],
            fields["username"],
            fields["password"],
        )
        # TODO: Pretty sure could identify by port used but just check all anyways for now
        # ProxyType.HTTP candidates are currently disabled; only SOCKS4 and SOCKS5 are tried.
        for kind in (ProxyType.SOCKS4, ProxyType.SOCKS5):
            PROXIES.add((kind, *endpoint))
# Raw HTTP/1.1 request written to the target through each live proxy.
# HTTP requires CRLF line endings and an empty line after the last header;
# the two trailing empty strings make the joined text end with "\r\n\r\n" so
# the server sees a complete header section.
REQUEST_PAGE = "\r\n".join(
    (
        "GET /ip HTTP/1.1",
        f"Host: {CHECK_HOST_NAME}",
        "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0",
        "Accept: text/plain;q=0.9,image/avif,image/webp,*/*;q=0.8",
        "Accept-Language: en-US,en;q=0.5",
        "Accept-Encoding: gzip, deflate",
        "Connection: keep-alive",
        "Upgrade-Insecure-Requests: 0",
        "Cache-Control: max-age=0",
        "",
        "",
    )
).encode("utf8")
def _merge_into_file(path, entries):
    """Merge the lines already stored at *path* into *entries* and rewrite it.

    *entries* (a set of strings) is updated in place with every non-blank
    line found in the file; a missing file is treated as empty. The file is
    then rewritten with one entry per line, sorted for stable output.
    """
    try:
        with open(path, "r") as f:
            entries.update(text for line in f if (text := line.strip()))
    except FileNotFoundError:
        pass  # first run: nothing to merge yet
    # Rewrite rather than append, so repeated saves never duplicate entries,
    # and terminate every entry with a newline so the file can be read back.
    with open(path, "w") as f:
        f.writelines(f"{entry.strip()}\n" for entry in sorted(entries))


def save_proxies():
    """Persist the check results to VALID_PROXIES and DEAD_PROXIES.

    Results from earlier runs that are already in those files are merged
    into LIVE_PROXIES / UNRESPONSIVE before the files are rewritten, so
    calling this more than once is safe.
    """
    _merge_into_file(VALID_PROXIES, LIVE_PROXIES)
    _merge_into_file(DEAD_PROXIES, UNRESPONSIVE)
def _resolve_check_addr():
    """Return CHECK_ADDR as a dotted-quad string.

    Prints a hint and raises SystemExit(1) when CHECK_ADDR is neither an
    IPv4Address nor a string that parses as one.
    """
    try:
        if isinstance(CHECK_ADDR, IPv4Address):
            return str(CHECK_ADDR)
        if isinstance(CHECK_ADDR, str):
            IPv4Address(CHECK_ADDR)  # validation only; raises AddressValueError
            return CHECK_ADDR
    except AddressValueError:
        pass
    print("CHECK_ADDRESS should be either an IPv4Address, or a string of the form x.x.x.x")
    raise SystemExit(1)


def _build_tls_context():
    """Return a non-verifying SSL context when CHECK_PORT is 443, else None."""
    if CHECK_PORT != 443:
        return None
    ctx = _create_unverified_context(PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = False
    ctx.verify_mode = CERT_NONE
    ctx.hostname_checks_common_name = False
    return ctx


async def _exchange(stream):
    """Send REQUEST_PAGE and read the reply up to the first blank line or EOF."""
    await stream.write_all(REQUEST_PAGE)
    # readline() returns b"" at EOF instead of raising, so an empty/blank
    # line is the only reliable stop condition.
    while (await stream.reader.readline()).strip():
        pass


async def proxy_check():
    """Worker: take proxies from PROXY_QUEUE and record whether they connect.

    Each proxy is asked to connect to CHECK_ADDR:CHECK_PORT. On success its
    "scheme://host:port" label is added to LIVE_PROXIES and the probe request
    is sent through it; on failure the label is added to UNRESPONSIVE.
    ``PROXY_QUEUE.task_done()`` is called exactly once per item taken.

    The worker returns once CLOSING is set *and* the queue is empty, so items
    still queued when CLOSING is set are processed and no worker is left
    blocked on an empty queue.

    Raises:
        SystemExit: if CHECK_ADDR is not a valid IPv4 address.
    """
    # Function-scope import: these names are not part of the module header.
    from asyncio import QueueEmpty, sleep, wait_for

    _loop = get_running_loop()
    # Both values are the same for every proxy, so compute them once.
    check_addr = _resolve_check_addr()
    ctx = _build_tls_context()
    idle_poll = 0.1  # seconds to wait before re-checking an empty queue

    while True:
        try:
            protocol, ipaddr, port, username, password = PROXY_QUEUE.get_nowait()
        except QueueEmpty:
            if CLOSING.is_set():
                return
            await sleep(idle_poll)
            continue

        label = f"{protocol.name.lower()}://{ipaddr}:{port}"
        _stream: AsyncioSocketStream | None = None
        try:
            try:
                proxy = Proxy.create(protocol, ipaddr, port, username, password, loop=_loop)
                _stream = await proxy.connect(check_addr, CHECK_PORT, timeout=CHECK_TIMEOUT, dest_ssl=ctx)  # type: ignore
            except Exception:
                UNRESPONSIVE.add(label)
                log(10, f"Connect Failed : {label}")
                continue
            LIVE_PROXIES.add(label)
            log(10, f"proxy connected: {label}")
            try:
                # Bound the whole exchange so a silent peer cannot stall the worker.
                await wait_for(_exchange(_stream), timeout=CHECK_TIMEOUT)
            except Exception as e:
                log(10, f"Exception when interacting with stream | {type(e)} - {e}")
        finally:
            # Runs on success, failure, `continue` and cancellation alike.
            if _stream is not None:
                try:
                    await _stream.close()
                except Exception as e:
                    log(10, f"Error when closing stream | {type(e)} - {e}")
            PROXY_QUEUE.task_done()
async def begin():
    """Run the whole check: start workers, feed the queue, save the results.

    Starts WORKERS ``proxy_check`` tasks, puts every entry of PROXIES on
    PROXY_QUEUE (with a tqdm progress bar), waits until every queued item has
    been marked done, then shuts the workers down and calls ``save_proxies``.
    """
    _loop = get_running_loop()
    for _ in range(WORKERS):
        TASKS.append(_loop.create_task(proxy_check()))
    for proxy in tqdm(PROXIES):
        await PROXY_QUEUE.put(proxy)

    # Wait for the queue to drain *before* signalling shutdown; otherwise
    # items still queued at this point could be skipped.
    await PROXY_QUEUE.join()
    CLOSING.set()

    # Idle workers may be parked waiting for another item; cancel them so the
    # gather below cannot wait forever. Cancelling a finished task is a no-op.
    for task in TASKS:
        task.cancel()
    outcomes = await gather(*TASKS, return_exceptions=True)
    for outcome in outcomes:
        # CancelledError derives from BaseException, so only real worker
        # failures are reported here.
        if isinstance(outcome, Exception):
            log(10, f"Worker ended with error | {type(outcome)} - {outcome}")

    save_proxies()
    loop.stop()
# Script entry point: run begin() to completion on the module-level loop.
from asyncio import all_tasks

try:
    loop.run_until_complete(begin())
except KeyboardInterrupt:
    print("KeyboardInterrupt Received, closing event loop early.")
    # Save first so a second Ctrl-C during cleanup cannot lose the results.
    save_proxies()
    # Cancel whatever is still running and let the cancellations finish, so
    # the loop is not closed with pending tasks and open connections.
    pending = all_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        loop.run_until_complete(gather(*pending, return_exceptions=True))
finally:
    loop.close()