proxies/test.py
2022-03-29 18:00:00 +00:00

172 lines
5.5 KiB
Python

from asyncio import Queue, get_running_loop, new_event_loop, gather, Event
from ipaddress import AddressValueError, IPv4Address
from logging import basicConfig, log
from re import compile
from python_socks.async_.asyncio import ProxyType
from python_socks.async_.asyncio.v2 import Proxy
from python_socks.async_.asyncio.v2._stream import AsyncioSocketStream
try:
from uvloop import install
install()
except ImportError:
print("Unable to find uvloop, defaulting to normal event loop")
CHECK_PORT = 80
try:
from ssl import (CERT_NONE, PROTOCOL_TLS_CLIENT, _create_unverified_context)
except ImportError:
print("Unable to find ssl, we will not be able to make any connections using 443 unless ssl is installed")
CHECK_PORT = 80
CHECK_ADDR: str | IPv4Address = IPv4Address("198.204.232.125")
CHECK_HOST_NAME: str = "tcp.ac"
CHECK_TIMEOUT: float = 10.0
WORKERS = 1000
CHECK_PROXIES = "./proxies_to_check.txt"
VALID_PROXIES = "./valid_proxies.txt"
DEAD_PROXIES = "./dead_proxies.txt"
CHECKING_LOG = "./proxy_log.log"
LOG_FMT = "%(asctime)s:%(levelname)s:%(levelno)s:%(lineno)d - %(message)s"
basicConfig(filename=CHECKING_LOG, filemode="w+", level=1, format=LOG_FMT, datefmt='[%D %H:%M:%S]', )
loop = new_event_loop()
TASKS = []
PROXIES = set()
PROXY_QUEUE = Queue(WORKERS)
LIVE_PROXIES = set()
UNRESPONSIVE = set()
CLOSING = Event()
proxy_re = compile(
r"(?=^(?:(?P<protocol>socks[4-5]):\/\/)?(?:(?P<username>[^:]+):(?P<password>[^@]+)@)?(?P<ipaddr>[^:]+):(?P<port>[^$]+)$)"
)
with open(CHECK_PROXIES, "r") as f:
for line in f:
match = proxy_re.match(line)
if not match:
continue
# TODO: Pretty sure could identify by port used but just check all anyways for now
PROXIES.add((ProxyType.HTTP, match.group("ipaddr"), match.group("port"), match.groupdict().get("username", None), match.groupdict().get("password", None),))
PROXIES.add((ProxyType.SOCKS4, match.group("ipaddr"), match.group("port"), match.groupdict().get("username", None), match.groupdict().get("password", None),))
PROXIES.add((ProxyType.SOCKS5, match.group("ipaddr"), match.group("port"), match.groupdict().get("username", None), match.groupdict().get("password", None),))
REQUEST_PAGE = f"""GET /ip HTTP/1.1
Host: {CHECK_HOST_NAME}
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0
Accept: text/plain;q=0.9,image/avif,image/webp,*/*;q=0.8
Accept-Language: en-US,en;q=0.5
Accept-Encoding: gzip, deflate
Connection: keep-alive
Upgrade-Insecure-Requests: 0
Cache-Control: max-age=0""".encode("utf8")
def save_proxies():
with open(VALID_PROXIES, "w+") as f:
f.writelines(LIVE_PROXIES)
with open(DEAD_PROXIES, "w+") as f:
f.writelines(UNRESPONSIVE)
async def proxy_check():
_loop = get_running_loop()
while _loop.is_running() and not CLOSING.is_set():
page_requested = False
protocol, ipaddr, port, username, password = await PROXY_QUEUE.get()
proxy = Proxy.create(protocol, ipaddr, port, username, password, loop=_loop)
try:
if isinstance(CHECK_ADDR, IPv4Address):
check_addr = str(CHECK_ADDR)
elif isinstance(CHECK_ADDR, str):
IPv4Address(CHECK_ADDR)
check_addr = CHECK_ADDR
except AddressValueError:
print(f"CHECK_ADDRESS should be either an IPv4Address, or a string of the form x.x.x.x")
exit(1)
ctx = None
if CHECK_PORT == 443:
ctx = _create_unverified_context(PROTOCOL_TLS_CLIENT)
ctx.check_hostname = False
ctx.verify_mode = CERT_NONE
ctx.hostname_checks_common_name = False
ctx.get_ca_certs(False)
try:
_stream: AsyncioSocketStream | None = None
try:
_stream = await proxy.connect(check_addr, CHECK_PORT, timeout=CHECK_TIMEOUT, dest_ssl=ctx) # type: ignore
# await _stream.start_tls("tcp.ac", ctx)
LIVE_PROXIES.add(f"{protocol.name.lower()}://{ipaddr}:{port}")
log(10, f"proxy connected: {protocol.name.lower()}://{ipaddr}:{port}")
except Exception as e:
UNRESPONSIVE.add(f"{protocol.name.lower()}://{ipaddr}:{port}")
log(10, f"Connect Failed : {protocol.name.lower()}://{ipaddr}:{port}")
try:
if _stream:
await _stream.close()
except Exception as e:
log(10, f"Error when closing stream | {type(e)} - {e}")
finally:
continue
reading = True
while reading and _stream:
try:
if not page_requested:
await _stream.write_all(REQUEST_PAGE)
page_requested = True
resp = await _stream.reader.readline()
except Exception as e:
log(10, f"Exception when interacting with stream | {type(e)} - {e}")
reading = False
finally:
PROXY_QUEUE.task_done()
async def begin():
_loop = get_running_loop()
for _ in range(WORKERS):
TASKS.append(_loop.create_task(proxy_check()))
for proxy in PROXIES:
await PROXY_QUEUE.put(proxy)
CLOSING.set()
await gather(*TASKS)
save_proxies()
loop.stop()
try:
loop.run_until_complete(begin())
except KeyboardInterrupt:
print("KeyboardInterrupt Received, closing event loop early.")
save_proxies()
finally:
loop.close()