实例探究Python以并发方式编写高性能端口扫描器的方法

yipeiwu_com5年前Python基础

关于端口扫描器
端口扫描工具(Port Scanner)指用于探测服务器或主机开放端口情况的工具。常被计算机管理员用于确认安全策略,同时被攻击者用于识别目标主机上的可运作的网络服务。

端口扫描定义是客户端向一定范围的服务器端口发送对应请求,以此确认可使用的端口。虽然其本身并不是恶意的网络活动,但也是网络攻击者探测目标主机服务,以利用该服务的已知漏洞的重要手段。端口扫描的主要用途仍然只是确认远程机器某个服务的可用性。

扫描多个主机以获取特定的某个端口被称为端口清扫(Portsweep),以此获取特定的服务。例如,基于SQL服务的计算机蠕虫就会清扫大量主机的同一端口以在 1433 端口上建立TCP连接。

Python实现

端口扫描器原理很简单,无非就是操作socket,能connect就认定这个端口开放着。

import socket 
def scan(port): 
  s = socket.socket() 
  if s.connect_ex(('localhost', port)) == 0: 
    print port, 'open' 
  s.close() 
if __name__ == '__main__': 
  map(scan,range(1,65536)) 

这样一个最简单的端口扫描器出来了。
等等喂,半天都没反应,那是因为socket是阻塞的,每次连接要等很久才超时。
我们自己给它加上的超时。

s.settimeout(0.1)

再跑一遍,感觉快多了。

多线程版本

import socket 
import threading 
def scan(port): 
  s = socket.socket() 
  s.settimeout(0.1) 
  if s.connect_ex(('localhost', port)) == 0: 
    print port, 'open' 
  s.close() 
 
if __name__ == '__main__': 
  threads = [threading.Thread(target=scan, args=(i,)) for i in xrange(1,65536)] 
  map(lambda x:x.start(),threads) 

运行一下,哇,好快,快到抛出错误了。thread.error: can't start new thread。
想一下,这个进程开启了65535个线程,有两种可能,一种是超过最大线程数了,一种是超过最大socket句柄数了。在linux可以通过ulimit来修改。
如果不修改最大限制,怎么用多线程不报错呢?
加个queue,变成生产者-消费者模式,开固定线程。

多线程+队列版本

import socket 
import threading 
from Queue import Queue 
def scan(port): 
  s = socket.socket() 
  s.settimeout(0.1) 
  if s.connect_ex(('localhost', port)) == 0: 
    print port, 'open' 
  s.close() 
 
def worker(): 
  while not q.empty(): 
    port = q.get() 
    try: 
      scan(port) 
    finally: 
      q.task_done() 
 
if __name__ == '__main__': 
  q = Queue() 
  map(q.put,xrange(1,65535)) 
  threads = [threading.Thread(target=worker) for i in xrange(500)] 
  map(lambda x:x.start(),threads) 
  q.join() 

这里开500个线程,不停的从队列取任务来做。

multiprocessing+队列版本
总不能开65535个进程吧?还是用生产者消费者模式

import multiprocessing 
def scan(port): 
  s = socket.socket() 
  s.settimeout(0.1) 
  if s.connect_ex(('localhost', port)) == 0: 
    print port, 'open' 
  s.close() 
 
def worker(q): 
  while not q.empty(): 
    port = q.get() 
    try: 
      scan(port) 
    finally: 
      q.task_done() 
 
if __name__ == '__main__': 
  q = multiprocessing.JoinableQueue() 
  map(q.put,xrange(1,65535)) 
  jobs = [multiprocessing.Process(target=worker, args=(q,)) for i in xrange(100)] 
  map(lambda x:x.start(),jobs) 

注意这里把队列作为一个参数传入到worker中去,因为是process safe的queue,不然会报错。
还有用的是JoinableQueue(),顾名思义就是可以join()的。

gevent的spawn版本

from gevent import monkey; monkey.patch_all(); 
import gevent 
import socket 
... 
if __name__ == '__main__': 
  threads = [gevent.spawn(scan, i) for i in xrange(1,65536)] 
  gevent.joinall(threads) 

注意monkey patch必须在被patch的东西之前import,不然会Exception KeyError.比如不能先import threading,再monkey patch.

gevent的Pool版本

from gevent import monkey; monkey.patch_all(); 
import socket 
from gevent.pool import Pool 
... 
if __name__ == '__main__': 
  pool = Pool(500) 
  pool.map(scan,xrange(1,65536)) 
  pool.join() 

concurrent.futures版本

import socket 
from Queue import Queue 
from concurrent.futures import ThreadPoolExecutor 
... 
if __name__ == '__main__': 
  q = Queue() 
  map(q.put,xrange(1,65536)) 
  with ThreadPoolExecutor(max_workers=500) as executor: 
    for i in range(500): 
      executor.submit(worker,q) 

相关文章

python 数据提取及拆分的实现代码

python 数据提取及拆分的实现代码

K线数据提取 依据原有数据集格式,按要求生成新表: 1、每分钟的close数据的第一条、最后一条、最大值及最小值, 2、每分钟vol数据的增长量(每分钟vol的最后一条数据减第一条数据...

使用python和Django完成博客数据库的迁移方法

使用python和Django完成博客数据库的迁移方法

上一讲完成了基本博客的配置和项目工程的生成。这次开始将博客一些基本的操作主要是数据库方面学习。 1.设计博客数据库表结构 博客最主要的功能就是展示我们写的文章,它需要从某个地方获取博客文...

python统计指定目录内文件的代码行数

python统计指定目录内文件的代码行数

python统计指定目录内文件的代码行数,程序实现统计指定目录内各个python文件的代码总行数,注释行数,空行数,并算出所占百分比 这符合一些公司的小需求,实际代码量的统计工作 效果如...

python连接数据库的方法

python连接数据库的方法

MYSQL模块暂时还不支持python3.0以上的版本,由于我下载的python是3.0版本的,所以想要连接数据库只能利用其它的方法。 Python3.x连接MySQL的方案有:ours...

分享一个pycharm专业版安装的永久使用方法

分享一个pycharm专业版安装的永久使用方法

刚开始接触Python,首先要解决的就是Python开发环境的搭建。 目前比较好用的Python开发工具是PyCharm,他有社区办和专业版两个版本,但是社区版支持有限,我们既然想好好学...