Привет.
Сижу на больничном, решил обломать ботов долбящих ssh на моем сервачке.
Оцените кому не лень пару скриптов на питоне, может пользу принесут.Сервер:
Запускается, проверяет наличие открытого ключа, если его нет просит ввести и пишет в базу, слушает порт 9999.
Тем кто подключается выдает строку 512 байт и ждет в ответ её escda подпись.
Проверяет подпись, если верна создает отсутствующий файл /tmp/_Good_check_ или удаляет его если он существует.
Если подпись не верна или передан мусор рвет соединение.
Клиент:
Содает пару ключей, пишет в базу закрытый, открытый показывает для ввода на сервере.
подключается к серверу, получает строку, подписывает, отправляет обратно.
Смысл этого в том чтобы на наличие или отсутствие файла /tmp/_Good_check_ повесить задание в кроне,
например файл существует - запускаем sshd, файла нет - выключаем sshd, или правила файрвола менять.
Скрипт не требует привилегий можно обмазать firejail-ом или чем-то подобным, протестил на скармливание больших данных, мусора, большую нагрузку.
#==========================================>server:
#!/usr/bin/env python3
import sqlite3
import socket
import os
import tempfile
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
def init_db():
conn = sqlite3.connect('server.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS server_config (id INTEGER PRIMARY KEY, public_key BLOB)''')
conn.commit()
conn.close()
def get_public_key():
conn = sqlite3.connect('server.db')
c = conn.cursor()
c.execute("SELECT public_key FROM server_config LIMIT 1")
result = c.fetchone()
conn.close()
return result[0] if result else None
def main():
init_db()
public_key_bytes = get_public_key()
if public_key_bytes is None:
print("No public key found. Enter client's public key (DER hex):")
user_input = input().strip()
try:
public_key_bytes = bytes.fromhex(user_input)
if len(public_key_bytes) < 10:
raise ValueError("Invalid public key length")
conn = sqlite3.connect('server.db')
c = conn.cursor()
c.execute("INSERT INTO server_config (public_key) VALUES (?)", (public_key_bytes,))
conn.commit()
conn.close()
print("Public key stored.")
except ValueError as e:
print(f"Invalid hex input: {e}. Exiting.")
exit(1)
#ipv4
# server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# server_socket.bind(('0.0.0.0', 9999)) #ipv4
# server_socket.listen()
#ipv6
# server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# server_socket.bind(('::', 9999)) # :: - это все IPv6 адреса
# server_socket.listen()
#universal Используем AF_INET6 с IPV6_V6ONLY = 0 для совместимости
server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) # Позволяет IPv4 через IPv6
server_socket.bind(('::', 9999))
server_socket.listen()
print("Server listening on port 9999...")
print("Server will reject connections if signature exceeds 80 bytes")
while True:
try:
client_socket, addr = server_socket.accept()
print(f"Connection from {addr}")
challenge = os.urandom(512)
client_socket.sendall(challenge)
# Ограничение на размер подписи до 80 байт #от 70 до 72 видел, может больше бывает, ставим 80.
signature = b''
total_received = 0
max_signature_size = 80
try:
while total_received < max_signature_size:
chunk = client_socket.recv(1024)
if not chunk:
break
signature += chunk
total_received += len(chunk)
# Проверка на превышение лимита
if total_received >= max_signature_size:
print(f"Signature size limit exceeded: {total_received} bytes")
client_socket.close()
break
except Exception as e:
print(f"Error receiving signature: {e}")
client_socket.close()
continue
# Закрываем соединение после получения данных
client_socket.close()
# Проверяем размер подписи
if total_received > max_signature_size:
print(f"Signature too large: {total_received} bytes (limit: {max_signature_size})")
continue
# Проверяем, что получили данные
if len(signature) == 0:
print("Empty signature received")
continue
print(f"Received signature: {len(signature)} bytes")
try:
public_key = serialization.load_der_public_key(public_key_bytes)
public_key.verify(signature, challenge, ec.ECDSA(hashes.SHA256()))
if os.path.isfile('/tmp/_Good_check_'):
# Файл существует - удаляем его
os.remove('/tmp/_Good_check_')
print(f"Файл /tmp/_Good_check_ удален.")
else:
# Файла нет - создаем его
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.check') as f:
f.write('Signature valid')
temp_path = f.name
os.rename(temp_path, '/tmp/_Good_check_')
print(f"Файл /tmp/_Good_check_ создан.")
print("Signature verified successfully")
except Exception as e:
print(f"Verification failed: {str(e)}")
except Exception as e:
print(f"Error handling connection: {e}")
continue
if __name__ == '__main__':
main()
#============================================>client:
#!/usr/bin/env python3
import sqlite3
import socket
import os
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
def init_db():
conn = sqlite3.connect('client.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS client_keys (id INTEGER PRIMARY KEY, private_key BLOB, public_key BLOB)''')
conn.commit()
conn.close()
def get_client_keys():
conn = sqlite3.connect('client.db')
c = conn.cursor()
c.execute("SELECT private_key, public_key FROM client_keys LIMIT 1")
result = c.fetchone()
conn.close()
return result if result else (None, None)
def generate_keys():
priv_key = ec.generate_private_key(ec.SECP256K1(), backend=default_backend())
pub_key = priv_key.public_key()
private_bytes = priv_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_bytes = pub_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
conn = sqlite3.connect('client.db')
c = conn.cursor()
c.execute("INSERT INTO client_keys (private_key, public_key) VALUES (?, ?)", (private_bytes, public_bytes))
conn.commit()
conn.close()
print(f"Public key (DER hex): {public_bytes.hex()}")
def main():
init_db()
private_key_bytes, public_key_bytes = get_client_keys()
if private_key_bytes is None:
generate_keys()
private_key_bytes, public_key_bytes = get_client_keys()
try:
private_key = serialization.load_der_private_key(
private_key_bytes,
password=None,
backend=default_backend()
)
except Exception as e:
print(f"Key load failed: {str(e)}")
exit(1)
#ipv4
# client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# client_socket.connect(('localhost', 9999))
#ipv6
# client_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
# client_socket.connect(('x:x:x:x:x:x:x:x', 9999))
#universal
addr_info = socket.getaddrinfo('127.0.0.1', 9999, socket.AF_UNSPEC, socket.SOCK_STREAM)
client_socket = socket.socket(addr_info[0][0], addr_info[0][1])
client_socket.connect(addr_info[0][4])
challenge = client_socket.recv(4096)
if not challenge:
print("No challenge received.")
exit(1)
try:
signature = private_key.sign(challenge, ec.ECDSA(hashes.SHA256()))
except Exception as e:
print(f"Signing failed: {str(e)}")
exit(1)
client_socket.sendall(signature) #sig
# client_socket.sendall(challenge) #trash
client_socket.close()
sig_len=len(signature)
print("Signature sent. Sig_len:", sig_len)
# chall = challenge.hex()
# print("c:",chall)
# sig = signature.hex()
# print("s:",sig)
if __name__ == '__main__':
main()