首页 > 代码库 > python黑帽编程笔记

python黑帽编程笔记

在书中已知通过向主机不常用端口发送UDP数据包,通过是否有不可达的信息返回包来判断主机是否存活。 等待ICMP响应

 

Windows和Linux上的包嗅探

 

import socket
import os
# 监听的主机
host = "172.16.0.143"

# 创建原始套接字,然后绑定在公开接口上
if os.name == "nt":       #windows os.name =nt     windows允许我们嗅探所有协议的所有数据包 LINUX智能嗅探ICMP
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# 设置在捕获的数据包中包含IP头
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# 如果在windows下 需要设置IOCTL以启用混杂模式
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

# 读取一个单独的数据包
print sniffer.recvfrom(65565)

# 在windows下关闭混杂模式
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

运行脚本 ping一下 可以抓到相应ICMP数据包

 

技术分享

 

在此模式下 嗅探器可以接收任何高层协议  如TCP、UDP或ICMP的所有的IP头信息。这些信息被打包成二进制数的形式

 如上图。

接下来的一个脚本 是来解析这些信息 并把IP头的部分 提取并翻译成人类能懂的形式。

# -*- coding: UTF-8 -*-
import socket
import os
import struct
from ctypes import *

# 监听的端口
host   = "172.16.0.143"

class IP(Structure):
    _fields_ = [
        ("ihl",           c_ubyte, 4),
        ("version",       c_ubyte, 4),
        ("tos",           c_ubyte),
        ("len",           c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",           c_ubyte),
        ("protocol_num",  c_ubyte),
        ("sum",           c_ushort),
        ("src",           c_uint),
        ("dst",           c_uint)
    ]  #定义一个dic

    def __new__(self, socket_buffer=None):    #传递一下
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # 协议字段与协议名称对应
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}     #定义一个字典  分析数据里的数字 对照对应的协议

        # 可读性更强的IP地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))  # 把一堆数据 按照人类可以理解的IP地址去转换  源地址IP
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst)) #目标地址IP

        #协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num] #把数字转换成名字
        except:
            self.protocol = str(self.protocol_num)

# 跟上一段代码类似
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# we want the IP headers included in the capture
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# if we‘re on Windows we need to send some ioctls
# to setup promiscuous mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:
    while True:

        # read in a single packet
        raw_buffer = sniffer.recvfrom(65565)[0]

        # 将缓冲区的前20个字节按IP头进行解析
        ip_header = IP(raw_buffer[0:20])
        #输出协议和通信双方的IP地址
        print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

except KeyboardInterrupt:
    # if we‘re on Windows turn off promiscuous mode
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

解码ICMP

有了上一段代码我们就能嗅探解码任何数据的IP层,因为发送UDP数据到关闭的端口时会产生ICMP相应,所以我们还需要对ICMP数据进行解码。ICMP内容中包含的信息非常繁杂,但每条信息都包含三个固定的字段

所以我们要解码ICMP  

我们扫描器的目标是查找类型值位3,代码值也位3的ICMP数据包。这种ICMP的响应数据意味着目标不可达,而代码值为3是由于目标主机产生了端口不可达的错误

 

技术分享

 

添加代码实现ICMP数据包的解码功能。

import socket
import os
import struct
import threading

from ctypes import *

# host to listen on
host   = "172.16.0.50"

class IP(Structure):

    _fields_ = [
        ("ihl",           c_ubyte, 4),
        ("version",       c_ubyte, 4),
        ("tos",           c_ubyte),
        ("len",           c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",           c_ubyte),
        ("protocol_num",  c_ubyte),
        ("sum",           c_ushort),
        ("src",           c_uint),
        ("dst",           c_uint)
    ]

    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # map protocol constants to their names
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # human readable IP addresses
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        # human readable protocol
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)



class ICMP(Structure):

    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]

    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        pass

# create a raw socket and bind it to the public interface
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# we want the IP headers included in the capture
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# if we‘re on Windows we need to send some ioctls
# to setup promiscuous mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)



try:
    while True:

        # read in a single packet
        raw_buffer = sniffer.recvfrom(65565)[0]

        # create an IP header from the first 20 bytes of the buffer
        ip_header = IP(raw_buffer[0:20])

        print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

        # if it‘s ICMP we want it
        if ip_header.protocol == "ICMP":

            # calculate where our ICMP packet starts
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            # create our ICMP structure50
            icmp_header = ICMP(buf)

            print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

# handle CTRL-C
except KeyboardInterrupt:
    # if we‘re on Windows turn off promiscuous mode
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
import socket
import os
import struct
import threading

from netaddr import IPNetwork,IPAddress
from ctypes import *

# host to listen on
host   = "172.16.0.36"

# subnet to target
subnet = "172.16.0.1/24"  #扫描的目标子网

# 自定义的字符串 ,我们将在ICMP响应中进行核对。
magic_message = "PYTHONRULES!"

def udp_sender(subnet,magic_message):
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # 批量发送UDP数据包

    for ip in IPNetwork(subnet):
        try:
            sender.sendto(magic_message,("%s" % ip,65212))
        except:
            pass


class IP(Structure):

    _fields_ = [
        ("ihl",           c_ubyte, 4),
        ("version",       c_ubyte, 4),
        ("tos",           c_ubyte),
        ("len",           c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",           c_ubyte),
        ("protocol_num",  c_ubyte),
        ("sum",           c_ushort),
        ("src",           c_uint32),
        ("dst",           c_uint32)
    ]

    def __new__(self, socket_buffer=None):
            return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):

        # map protocol constants to their names
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # human readable IP addresses
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))

        # human readable protocol
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)



class ICMP(Structure):

    _fields_ = [
        ("type",         c_ubyte),
        ("code",         c_ubyte),
        ("checksum",     c_ushort),
        ("unused",       c_ushort),
        ("next_hop_mtu", c_ushort)
        ]

    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer):
        pass

# create a raw socket and bind it to the public interface
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

# we want the IP headers included in the capture
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

# if we‘re on Windows we need to send some ioctls
# to setup promiscuous mode
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)


# start sending packets
t = threading.Thread(target=udp_sender,args=(subnet,magic_message))
t.start()

try:
    i=1
    while True:
        # read in a single packet
        raw_buffer = sniffer.recvfrom(65565)[0]

        # create an IP header from the first 20 bytes of the buffer
        ip_header = IP(raw_buffer[0:20])

        #print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address)

        # if it‘s ICMP we want it
        if ip_header.protocol == "ICMP":

            # calculate where our ICMP packet starts
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            # create our ICMP structure
            icmp_header = ICMP(buf)

            #print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code)

            # now check for the TYPE 3 and CODE 3 which indicates
            # a host is up but no port available to talk to
            if icmp_header.code == 3 and icmp_header.type == 3:
                # check to make sure we are receiving the response
                # that lands in our subnet
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):

                    # test for our magic message
                    if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message:
                        print "%d Host Up: %s" % (i,ip_header.src_address)
# handle CTRL-C
except KeyboardInterrupt:
    # if we‘re on Windows turn off promiscuous mode
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

 

python黑帽编程笔记