## Copyright (c) 2011-2023, Art Obrezan. ## All rights reserved. See the license at the end of the file. import hashlib import socket import struct def test(): with socket.create_connection(('127.0.0.1', 3306)) as connection: packet = read_packet(connection) if not is_error_packet(packet): scramble = handshake_scramble(packet) packet = auth_packet(scramble, 'username', 'password', 'mysql') write_packet(connection, packet, 1) packet = read_packet(connection) # query res = doquery(connection, 'show tables') return res ##----------------------------------------------------------------------------- ## HANDSHAKE PARSING ##----------------------------------------------------------------------------- def handshake_protocol(packet): return (packet[0]) def handshake_scramble(packet): pos = packet.find(0) scramble = bytearray(20) for i in range(8): scramble[i] = packet[pos+5+i] for i in range(12): scramble[i+8] = packet[pos+32+i] return bytes(scramble) def handshake_version(packet): pos = packet.find(0) return str(packet[1:pos], encoding='latin1') def handshake_threadid(packet): pos = packet.find(0) return (packet[pos+1] + packet[pos+2]*0xFF + packet[pos+3]*0xFFFF + packet[pos+4]*0xFFFFFF) def handshake_capabilities(packet): pos = packet.find(0) return (packet[pos+14] + packet[pos+15]*0xFF) def handshake_language(packet): pos = packet.find(0) return (packet[pos+16]) def handshake_status(packet): pos = packet.find(0) return (packet[pos+17] + packet[pos+18]*0xFF) ##----------------------------------------------------------------------------- TOKEN_LEN = 20 MAX_PACKET_SIZE = 1024 * 1024 # in bytes UTF8_GENERAL_CI = 33 def auth_packet(scramble, user, password=None, database=None): buf_user = bytes(user, encoding='latin1') + b'\0' buf_database = b'' if database: buf_database = bytes(database, encoding='latin1') + b'\0' buf_auth = b'\0' if password: token = password_to_token(password, scramble) buf_auth = bytes(bytearray([TOKEN_LEN]) + token) flags = client_capabilities() if database: flags = flags + 0x08 # client-connect-with-db buf_flags = struct.pack('= packet_length: break return row def nth_field_entry(n, buf): pos = 0 len = 0 offset = 0 for i in n: pos = pos + offset + len len, offset = decode_length_coded_binary(buf, pos) pos = pos + offset return (pos, len) def send_query(connection, query): COM_QUERY = b'\3' data = COM_QUERY + bytes(query, encoding='utf8') packet_number = 0 write_packet(connection, data, packet_number) ##----------------------------------------------------------------------------- ## LENGTH CODED BINARY ##----------------------------------------------------------------------------- def decode_length_coded_binary(buf, pos): len = None val = buf[pos] if 251 > val: len = (val, 1) elif 251 == val: len = (-1, 1) elif 252 == val: int16 = decode_int16(buf, pos) len = (int16, 3) elif 253 == val: int24 = decode_int24(buf, pos) len = (int24, 4) elif 254 == val: int64 = decode_int64(buf, pos) len = (int64, 9) return len def decode_int16(buf, pos): data = (buf[pos+1] + buf[pos+2]*0xFF) return data def decode_int24(buf, pos): data = (buf[pos+1] + buf[pos+2]*0xFF + buf[pos+3]*0xFFFF) return data def decode_int64(buf, pos): data = (buf[pos+1] + buf[pos+2]*0xFF + buf[pos+3]*0xFFFF + buf[pos+4]*0xFFFFFF + buf[pos+5]*0XFFFFFFFF + buf[pos+6]*0xFFFFFFFFFF + buf[pos+7]*0XFFFFFFFFFFFF + buf[pos+8]*0XFFFFFFFFFFFFFF) return data ##----------------------------------------------------------------------------- ## PACKET READING/WRITING/CHECKING ##----------------------------------------------------------------------------- def read_packet(connection): len, num = read_packet_header(connection) buf = connection.recv(len) return buf def read_packet_header(connection): buf = connection.recv(4) len = buf[0] + buf[1]*0xFF + buf[2]*0xFFFF num = buf[3] return len, num def write_packet(connection, data, packet_number): write_packet_header(connection, data, packet_number) connection.sendall(data) def write_packet_header(connection, data, packet_number): len4bytes = struct.pack('