yehey's 공부 노트 \n ο(=•ω<=)ρ⌒☆

포트 스캐너 (스레드, Nmap, optparse) 본문

개발/Python

포트 스캐너 (스레드, Nmap, optparse)

yehey 2020. 10. 11. 14:34
#기본적인 TCP 포트 스캔
 
import socket
 
ip = "192.168.245.131"  #취약한 서버 IP 주소
portNums = list(range(1023))

with open("portScanResult.txt",'w') as f:
    for portNum in portNums:
        try:
            with socket.socket() as s:
                s.settimeout(2)
                print("[*] try to connect to {}:{}".format(ip, portNum))
                s.connect((ip, portNum))
                s.send("Python Connect\n".encode())
                banner = s.recv(1024) 
                if banner:
                    f.write("[+] {} port is opened: {}\n".format(portNum, banner.decode()[:20]))
                else :
                    f.write("[+] {} port is opened: None\n".format(portNum))
                    
        except Exception as e:
            if str(e) == "timed out":
                f.write("[+] {} port is opened: {}\n".format(portNum, str(e)))
            else : print(e)
 
# Result
with open("portScanResult.txt",'r') as f:
    print("\n\n\n########## the result ##########\n")
    print(f.read())
    print(">> the result in portScanResult.txt")
#UDP 스캔

import socket

port = 3702
addr = ("127.0.0.1",port) # DNS
socket.setdefaulttimeout(2)
#UDP
#소켓 오픈 : 응답이 없음 (반응없음)
#소켓 닫힘 : 응답이 있음 (소켓이 닫힘을 알림) 
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    try :
        s.sendto("Data".encode(),addr) # 데이터 전송
        s.recvfrom(1024)
        print("[+]{} udp port is opened".format(port))
    except Exception as e:
        print(e)
        if str(e) == "timed out":#소켓이 열려있지만 응답이 없는 경우, 방화벽이 활성화되어 있는 경우
            print("[+]{} udp port is opened".format(port))
        else :#다른 에러 발생 시 소켓 닫힘으로 간주
            print("[-]{} udp port is closed".format(port))

 

Thread: 

  • 프로그램에서 수행하는 명령의 흐름을 동시에 처리하도록 할 수 있음
  • 일반적인 프로세스의 경우 , 프로그램 명령이 단일적 순차적으로 처리되나 , 스레드를 활용하면 병렬적으로 명령을 처리할 수 있어 빠른 작업이 가능함
  • 하지만 작업을 수행하는 도중 , 동시에 특정 메모리에 접근하게 되면 문제를 일으킬 수 있음
  • 여러 개의 스레드 중 하나라도 잘못된 연산을 하게 되면 문제를 일으킬 수 있음
    디버깅 작업이 어려움

Semaphore:

  • 특정 작업을 하는 동안에 락 ( 을 걸어 임계구역을 설정하여 다른 스레드가 작업을 하지 않도록 제어함
  • CPU 가 스레드를 돌아가며 실행하기 때문에 결과를 연속해서 출력하게 함
  • 임계구역 설정으로 스레드의 수를 제어할 수 있음
  • 자원의 원자성을 보호할 수 있음
  • 언락 ( 을 실행하면 다시 원래대로 동작함
#Thread를 활용한 port 스캐너

import threading               # 스레드 관련 라이브러리
import socket                  # 소켓 관련 라이브러리
import time                    # 시간 라이브러리
 
resultLock = threading.Semaphore(value=1)    # 결과 출력을 제어할 세마포어
maxConnection = 100             # 스레드 개수를 제어할 세마포어
connection_lock = threading.BoundedSemaphore(value=maxConnection) 
port_result = {}    # 결과를 저장하는 dict
 
# 스레드 함수
def scanPort(tgtHost, portNum):
    try:                        # 접속 시도
        with socket.socket() as s:
            data = None
            
            s.settimeout(2)
            s.connect((tgtHost, portNum))
            s.send("Python Connect\n".encode())

            data = s.recv(1024).decode()
            
    except Exception as e :
        if str(e) == "timed out":
                data = str(e)
        else : data = 'error'
    finally:
        if data is None:
            data = "no_data"
        elif data == 'error':
            connection_lock.release() # 스레드 세마포어 해제
            return
        resultLock.acquire()# 출력 세마포어 설정
        print("[+] Port {} openend: {}".format(portNum, data[:20]).strip())
        resultLock.release()# 출력 세마포어 해제
        port_result[portNum]= data
        connection_lock.release() # 스레드 세마포어 해제
 
# 메인 함수
def main():
    tgtHost = "192.168.245.131" # 스캔 대상 tgtHost

    for portNum in range(1024): # 반복문 수행 0~1024 포트
        connection_lock.acquire()
        t = threading.Thread(target=scanPort, args =(tgtHost, portNum)) # 쓰레드 초기화
        t.start() # 스레드 실행
    time.sleep(5)

    print(port_result)
            
 
    # csv 파일 저장
    with open("portScanResult.csv",'w') as f: # 결과를 저장할 csv 파일 열기 (excel파일)
        f.write("portNum, banner\n") # 컬럼 쓰기
        for p in sorted(port_result.keys()):       
            f.write("{}, {}".format(p, port_result[p]))

    # 결과 출력
    print("\n\n\n+++++++++++ the result +++++++++++")
    print('portNum' + '\t' + 'banner')
    for p in sorted(port_result.keys()):       
        print("{} \t {}".format(p, port_result[p][:20].strip()))
    print(">> the result in portScanResult.csv")
 
if __name__ == "__main__":
    startTime = time.time()
    main()
    endTime = time.time()
    print("exceuted Time:", (endTime-startTime))

 

Nmap:

  • 포트 스캐너 공개 소프트웨어
  • 스텔스 기능과 os 정보, 방화벽 필터링 확인 등의 기능을 제공

주요 옵션

옵션 설명
-sT SYN 오픈 스캔
-sS SYN 하프 오픈 스캔
-sX X-mas 스캔
-sN NULL 스캔
-sU UDP 스캔

 

#Nmap을 사용한 포트 스캐너(결과를 실행화면에 출력)

import nmap

nm = nmap.PortScanner()
result = nm.scan(hosts="127.0.0.1",ports='20-443', arguments='-sV')

for host in nm.all_hosts():
    print('-------------------------------------------------')
    print('Host : %s (%s)' %(host, nm[host].hostnames()))
    print('State : %s' % nm[host].state())
    proto = 'tcp'
    print('--------')
    print('Protocol : %s' %proto)
    lport = nm[host][proto].keys()
    for port in sorted(lport):
        print ('port : %s\tstate : %s' % (port,str(nm[host][proto][port]['state'])))

#Namp을 활용한 포트스캐너 (결과를 csv 파일에 저장)

import nmap

nm = nmap.PortScanner()
result = nm.scan(hosts="127.0.0.1",ports='20-443', arguments='-sV')

print(nm.csv())

with open('result.csv', 'w') as f:
    f.writelines(nm.csv().replace(';',','))
    

 

Optparse:

  • 기본적으로 제공되는 python 라이브러리
  • Argument 를 효과적으로 전달함
  • 대부분의 프로그램에서 사용하는 옵션이 가능함
  • 제작한 툴을 오픈소스화 가능함 github ,
  • 동적인 방법으로 기능 제공
#Optparse를 활용한 포트 스캐너 (csv 파일에 결과 출력)

#! python
# -*- coding: utf-8 -*-
# PortScanProgram with nmap Library
 
import nmap
import time
import datetime
import os.path
import os
import optparse
 
#optparse
parser = optparse.OptionParser('usage PortScanner -t <tgtHost> -p <tgtPort> -n <nmapOption>')
parser.add_option('-t', dest='tgtHost', type='string', help='must specify target Host')
parser.add_option('-p', dest='tgtPort', type='string', help='must specify target Port')
parser.add_option('-n', dest='nmapOption', type='string', help='specify nmapOption')
(option, args) = parser.parse_args()
 
if (option.tgtHost == None) | (option.tgtPort == None):
    print(parser.usage)
    exit(0) 
 
# 전역 변수 초기화
startTime = time.time()                # 시작 시간 기록
nm = nmap.PortScanner()                # nmap 객체 생성
hostList = option.tgtHost.split(',')                # 호스트 리스트 작성
portList = option.tgtPort              # 포트 리스트 작성
if option.nmapOption:
        myArg = option.nmapOption      # 옵션 설정
 
# 폴더 생성        
fileList = [] # 파일 이름 저장할 리스트 생성
dirName = 'scan_result'  # 폴더 이름 지정
if not os.path.exists('./' + dirName): # 폴더 없을 시 폴더 생성
    os.system('mkdir ' + dirName)
    print(dirName + " directory is made\n")
else: # 폴더 존재 시 폴더 생성 안 함
    print(dirName + " directory exists already\n")
if not os.path.isdir(dirName): # 해당 파일이 폴더가 아닐 경우 오류 발생
        print("Err: Same name file exists with directory name")
        exit()  # 프로그램 종료
 
# 스캔 시작
for host in hostList:
        host = host.strip() # 사용자가 공백을 입력했을 경우를 위해 스트립함
        print("scan {}:{}\nOptions: {}".format(host,portList,myArg))
        result = nm.scan(hosts=host,
                         ports=portList,
                         arguments=myArg) # 스캔 수행
        
        ntime = str(datetime.datetime.now()).replace(':','_') # 날짜 기록
        ntime = ntime.replace(' ','_')
        filename=ntime+'_'+host+'_result.csv' # 파일 이름 생성

        with open('./'+dirName+'/'+filename,'w') as f: # 파일 열기
            try: f.write("os info : " + (result['scan'][host]['osmatch'][0]['name']) + '\n')
            except : f.write("os info : unknown\n")
            f.write(nm.csv().replace(';',',')) # ms-csv형식으로 쓰기
        fileList.append(filename) # 파일 리스트에 목록 추가
 
# 결과 출력 : [2) python-nmap 체험하기]에서 사용한 코드를 그대로 사용하였다.
        for host in nm.all_hosts():
                print('\n\n------------------------result-------------------------')
                print('Host : %s ( %s)' %(host, nm[host].hostname()))
                print('State : %s' % nm[host].state())
                try:                            # OS 정보 출력
                        print('OS : '+ result['scan'][host]['osmatch'][0]['name'] + '\n')
                except:
                        print('OS : unknown\n')
                proto = 'tcp'
                print('--------')
                print('Protocol : %s' %proto)
                lport = nm[host][proto].keys()
                sorted(lport)
                for port in lport:
                        print('port : %s\tstate : %s' % (port,
                                str(nm[host][proto][port]['state'])))
                print("\n\n----------------------------------------------------------")
 
print("It's Finished!!")
 
endTime = time.time() # 종료 시간 기록
 
print("\n\n----------------------------------------------------------")
print("executed Time : " + str(endTime - startTime)) # 실행 시간 출력
 
print("\n\n>>>>>>>>>>> please check your result files")
print("This is your path:\n\t" + os.path.realpath(dirName) + '\n')
for fileName in fileList: # 생성한 파일 목록 출력
        print(fileName)

 

Comments