import os import platform import socket import subprocess import tempfile import warnings import requests import wmi import xmltodict import json from cryptography.fernet import Fernet # 去掉warnings warnings.filterwarnings("ignore", message="Unverified HTTPS request is being made to host") base_url = 'https://www.abc.com' configuration_url = f'{base_url}/logon/LogonPoint/Home/Configuration' get_ecdetails_url = f'{base_url}/nf/auth/getECdetails' list_url = f'{base_url}/logon/LogonPoint/Resources/List' resource_list_url = f'{base_url}/cgi/resources/list' get_auth_methods_url = f'{base_url}/cgi/GetAuthMethods' get_authentication_requirements_url = f'{base_url}/nf/auth/getAuthenticationRequirements.do' get_homepage_config_url = f'{base_url}/cgi/getHomepageConfig' personal_bookmark_pl_url = f'{base_url}/vpns/portal/scripts/PersonalBookmark.pl' cvpnize_url = f'{base_url}/cgi/cvpnizeUrl' # print("-------------", os.path) user_info_file = tempfile.gettempdir() + '/user_info.json' rdp_file = tempfile.gettempdir() + '/remote_desktop.rdp' # 提供合法的证书文件路径 cert_file = os.getcwd() + '/abc.pem' cert_file_cer = os.getcwd() + '/abc.cer' nsc_tmas = 'NSC_TMAS' nsc_aaac = 'NSC_AAAC' nsc_temp = 'NSC_TEMP' cache = {} # 忽略配置 exclude_list = ['redirectclipboard', 'drivestoredirect'] # 访问https://www.abc.com/ def connect_configuration(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '0', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.post(url, headers=headers, verify=False, cert=cert_file) # print('------------------------', response.headers) return response.text, response.headers def connect_get_ecdetails(url): headers = { 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.get(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_list(url, **kwargs): headers = { 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '35', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } if 'Cookie' in kwargs.keys(): headers['Cookie'] = kwargs.get('Cookie') # print('connect_list.headers', headers) # print(url) response = requests.post(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_auth_methods(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '0', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.post(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_authentication_requirements(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '0', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.post(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers # 开始验证 def connect_do_cert(): authentication_requirements_text = cache['authentication_requirements_text'] authenticate_response = authentication_requirements_text.get('AuthenticateResponse') # print(type(authenticate_response)) authentication_requirements = authenticate_response.get('AuthenticationRequirements') # print('authentication_requirements', authentication_requirements) post_back_url = authentication_requirements['PostBack'] # print(post_back_url) headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '0', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } response = requests.post(f'{base_url}{post_back_url}', headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_do_authentication(username, password): nsc_tmas_value = cache.get(nsc_tmas) headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '76', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Cookie': f'NSC_TMAS={nsc_tmas_value}', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } do_cert_text = cache['do_cert_text'] # print('do_cert_text', do_cert_text) authenticate_response = do_cert_text.get('AuthenticateResponse') authentication_requirements = authenticate_response.get('AuthenticationRequirements') # print('authentication_requirements', authentication_requirements) post_back_url = authentication_requirements.get('PostBack') # print(post_back_url) response = requests.post(f'{base_url}{post_back_url}', data={ 'login': username, 'passwd': password, 'domain': 'ss_corp', 'loginBtn': 'Log On', 'StateContext': None }, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_set_client(): nsc_aaac_value = cache.get(nsc_aaac) headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '76', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Cookie': f'NSC_AAAC={nsc_aaac_value}', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } do_authentication_text = cache['do_authentication_text'] authenticate_response = do_authentication_text.get('AuthenticateResponse') state_context = do_authentication_text.get('StateContext') authentication_requirements = authenticate_response.get('AuthenticationRequirements') post_back_url = authentication_requirements.get('PostBack') # print(post_back_url) response = requests.post(f'{base_url}{post_back_url}', data={ 'nsg-setclient': 'cvpn', 'StateContext': state_context }, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_get_homepage_config(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '35', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'Cookie': f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.get(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_personal_bookmark_pl(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '26', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Cookie': f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # 获得nsc_nonce get_homepage_config_text = cache.get('get_homepage_config_text') nsc_nonce = get_homepage_config_text.get('NSC_NONCE') # print('nsc_nonce', nsc_nonce) response = requests.post(url, data={ 'nsc_nonce': nsc_nonce, }, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_cvpnize_url(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '26', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Cookie': f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } response = requests.post(url, data={ 'cvpnizeUrl': {"urls:": []}, }, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def connect_resource_list(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '35', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'Cookie': f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } # print(url) response = requests.get(url, headers=headers, verify=False, cert=cert_file) return response.text, response.headers def download_remote_desktop_file(url): headers = { 'Accept': 'application/xml, text/xml, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, br', 'Accept-Language': 'zh-CN,zh;q=0.9', 'Connection': 'keep-alive', 'Content-Length': '26', 'Host': 'www.abc.com', 'Origin': 'https://www.abc.com', 'Referer': 'https://www.abc.com/', 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8', 'Cookie': f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}', 'sec-ch-ua': '"Microsoft Edge";v="99", "Google Chrome";v="103", "Chromium";v="103"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': 'Windows', 'Sec-Fetch-Dest': 'empty', 'Sec-Fetch-Mode': 'cors', 'Sec-Fetch-Site': 'same-origin', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.5060.66 Safari/537.36', 'X-Citrix-AM-CredentialTypes': 'none, username, domain, password, newpassword, passcode, savecredentials, textcredential, webview, nsg-epa, nsg-x1, nsg-setclient, nsg-eula, nsg-tlogin, nsg-fullvpn, nsg-hidden, nsg-auth-failure, nsg-auth-success, nsg-epa-success, nsg-l20n, GoBack, nf-recaptcha, ns-dialogue, nf-gw-test, nf-poll, nsg_qrcode, nsg_manageotp, negotiate, nsg_push, nsg_push_otp, nf_sspr_rem', 'X-Citrix-AM-LabelTypes': 'none, plain, heading, information, warning, error, confirmation, image, nsg-epa, nsg-epa-failure, nsg-login-label, tlogin-failure-msg, nsg-tlogin-heading, nsg-tlogin-single-res, nsg-tlogin-multi-res, nsg-tlogin, nsg-login-heading, nsg-fullvpn, nsg-l20n, nsg-l20n-error, certauth-failure-msg, dialogue-label, nsg-change-pass-assistive-text, nsg_confirmation, nsg_kba_registration_heading, nsg_email_registration_heading, nsg_kba_validation_question, nsg_sspr_success, nf-manage-otp', 'X-Citrix-IsUsingHTTPS': 'Yes', 'X-Requested-With': 'XMLHttpRequest' } response = requests.get(url, headers=headers, verify=False, cert=cert_file) if response.status_code == 200: if os.path.exists(rdp_file): # 设置可读写执行 os.chmod(rdp_file, 0o700) os.remove(rdp_file) try: content = response.content.decode() # 修改用户分屏配置 content = find_and_replace_config(content) # print(type(content)) # 将响应内容写入文件 with open(rdp_file, "w") as f: f.write(str(content)) # 设置文件为只读属性 # os.chmod(rdp_file, 0o400) print("file download success!") except Exception as e: print(e) print('file download fail!') else: print('file download fail!') def find_and_replace_config(content): vm_setup_file = f'{os.getcwd()}/vm_setup.txt' if os.path.exists(vm_setup_file): # 读取配置 with open(vm_setup_file, 'r') as file: context = file.read() config_list = context.split('\n') for config in config_list: # 替换 if config.split(':')[0] in content: content_list = content.split('\n') for context_item in content_list: if config.split(':')[0] in context_item and config.split(':')[0] not in exclude_list: # 替换 content = content.replace(context_item, config) elif config.split(':')[0] not in exclude_list: # 直接添加 content += f'\n{config}' return content def run_rdp_file(): resource_list = get_user_resources() if len(resource_list) > 1: # 执行命令 创建新的进程 subprocess.Popen(["mstsc", rdp_file]) # 设置只读 os.chmod(rdp_file, 0o400) else: # 执行命令 当前进程执行 subprocess.run(["mstsc", rdp_file]) # 提取tmas def get_nsc_tmas(): do_cert_header = cache['do_cert_header'] set_cookie = do_cert_header.get('Set-Cookie') value_list = set_cookie.split(';') for value in value_list: if nsc_tmas in value: tmas_list = value.split('=') return tmas_list[1] return None def get_nsc_aaac(): do_authentication_header = cache['do_authentication_header'] set_cookie = do_authentication_header.get('Set-Cookie') value_list = set_cookie.split(';') for value in value_list: if nsc_aaac in value: aaac_list = value.split('=') return aaac_list[1] return None def get_nsc_temp(): do_authentication_header = cache['set_client_header'] set_cookie = do_authentication_header.get('Set-Cookie') value_list = set_cookie.split(';') for value in value_list: if nsc_temp in value: temp_list = value.split('=') return temp_list[1] return None def save_user_info_to_file(username, password): user_info = {'username': username, 'password': password, 'login_ip': get_local_ip(), 'login_cpu_serial': get_cpu_serial()} with open(user_info_file, 'wb') as file: json_str = json.dumps(user_info) encoded_string = encrypt_message(json_str.encode('utf-8'), generate_key()) file.write(encoded_string) def get_user_info(): try: if os.path.exists(user_info_file): with open(user_info_file, 'rb') as file: encoded_string = file.read() # 一次性读取整个文件内容,并去除空白字符 decoded_string = decrypt_message(encoded_string, generate_key()) user_info = json.loads(decoded_string) # 解析JSON字符串 # 获得用户登录ip login_ip = user_info.get('login_ip') login_cpu_serial = user_info.get('login_cpu_serial') # 用户电脑CPU序列号 cpu_serial = get_cpu_serial() local_ip = get_local_ip() if login_ip and login_ip == local_ip and cpu_serial and cpu_serial == login_cpu_serial: return user_info except Exception as e: print(e) os.remove(user_info_file) return None # 获得本地ip def get_local_ip(): try: # 获取主机名 hostname = socket.gethostname() # 通过主机名获取本地 IP 地址 local_ip_address = socket.gethostbyname(hostname) return local_ip_address except Exception as e: print("Error:", e) return None def get_cpu_serial(): try: # 连接到 Windows 管理信息 c = wmi.WMI() # 获取处理器信息 for processor in c.Win32_Processor(): cpu_serial = processor.ProcessorId.strip() return cpu_serial except Exception as e: print("Error:", e) return None def get_current_system(): system = platform.system() return system def json_to_dict(json_str): return json.loads(json_str) def xml_to_dict(xml_str): return xmltodict.parse(xml_str) def get_user_resources(): resource_list_dict = cache.get('resource_list_text') resource_user_list = resource_list_dict.get('resources') return resource_user_list def generate_key(): # 生成一个随机的密钥 # key = Fernet.generate_key() # return key return b'xxxxxxxxxxx' def encrypt_message(message, key): # 使用密钥初始化 Fernet 对象 cipher_suite = Fernet(key) # 对消息进行加密 encrypted_message = cipher_suite.encrypt(message) return encrypted_message def decrypt_message(encrypted_message, key): # 使用密钥初始化 Fernet 对象 cipher_suite = Fernet(key) # 对加密的消息进行解密 decrypted_message = cipher_suite.decrypt(encrypted_message).decode() return decrypted_message def auto_login_vm(): while True: try: print('auto login running...') configuration_text_and_header = connect_configuration(configuration_url) print('connect_configuration finish...') configuration_text = xml_to_dict(configuration_text_and_header[0]) configuration_header = configuration_text_and_header[1] # print(configuration_text_and_header) # call configuration cache['configuration_text'] = configuration_text cache['configuration_header'] = configuration_header ecdetails_text_and_header = connect_get_ecdetails(get_ecdetails_url) print('connect_get_ecdetails finish...') # print(ecdetails_text_and_header) # call ecdetails cache['ecdetails_text'] = json_to_dict(ecdetails_text_and_header[0]) cache['ecdetails_header'] = ecdetails_text_and_header[1] list_text_and_header = connect_list(list_url) print('connect_list finish...') # print(list_text_and_header) # call list cache['list_text'] = json_to_dict(list_text_and_header[0]) cache['list_header'] = list_text_and_header[1] auth_method_text_and_header = connect_auth_methods(get_auth_methods_url) print('connect_auth_methods finish...') # print(auth_method_text_and_header) # call auth method cache['auth_text'] = xml_to_dict(auth_method_text_and_header[0]) cache['auth_header'] = auth_method_text_and_header[1] authentication_requirements_text_and_header = connect_authentication_requirements( get_authentication_requirements_url) print('connect_authentication_requirements finish...') # print(authentication_requirements_text_and_header) # call authentication requirements cache['authentication_requirements_text'] = xml_to_dict(authentication_requirements_text_and_header[0]) cache['authentication_requirements_header'] = authentication_requirements_text_and_header[1] # 开始验证工作 do_cert_text_and_header = connect_do_cert() print('connect_do_cert finish...') # call do cert cache['do_cert_text'] = xml_to_dict(do_cert_text_and_header[0]) cache['do_cert_header'] = do_cert_text_and_header[1] # 提取NSC_TMAS tmas_value = get_nsc_tmas() # print(tmas_value) cache[nsc_tmas] = tmas_value # 判断是否有配置文件 user_info = get_user_info() if user_info is None: username = input('please enter corp id:(e.g SGE080):') password = input('please enter login password:') else: username = user_info.get('username') password = user_info.get('password') if (username is None or len(username.strip()) == 0) or (password is None or len(password.strip()) == 0): continue is_save_user_info = None do_authentication_text_and_header = connect_do_authentication(username, password) print('connect_do_authentication finish...') cache['do_authentication_text'] = xml_to_dict(do_authentication_text_and_header[0]) cache['do_authentication_header'] = do_authentication_text_and_header[1] aaac_value = get_nsc_aaac() # print('aaac_value', aaac_value) cache[nsc_aaac] = aaac_value set_client_text_and_header = connect_set_client() print('connect_set_client finish...') cache['set_client_text'] = xml_to_dict(set_client_text_and_header[0]) cache['set_client_header'] = set_client_text_and_header[1] temp_value = get_nsc_temp() cache[nsc_temp] = temp_value if user_info is None: # 询问是否保存用户信息 is_save_user_info = input('whether to save user information? Y or N:') if is_save_user_info is not None and is_save_user_info.upper() == 'Y': save_user_info_to_file(username, password) resource_list_text_and_header = connect_list(list_url, Cookie=f'NSC_AAAC={cache.get(nsc_aaac)};NSC_TEMP={cache.get(nsc_temp)}') # print(resource_list_text_and_header) print('connect_list finish...') cache['resource_list_text'] = resource_list_text_and_header[0] cache['resource_list_header'] = resource_list_text_and_header[1] # 获得homepage config get_homepage_config_text_and_header = connect_get_homepage_config(get_homepage_config_url) print('connect_get_homepage_config finish...') cache['get_homepage_config_text'] = json_to_dict(get_homepage_config_text_and_header[0]) cache['get_homepage_config_header'] = get_homepage_config_text_and_header[1] # print(get_homepage_config_text_and_header) personal_bookmark_pl_text_and_header = connect_personal_bookmark_pl(personal_bookmark_pl_url) print('connect_personal_bookmark_pl finish...') # print('personal_bookmark_pl_text_and_header', personal_bookmark_pl_text_and_header) cache['personal_bookmark_pl_text'] = personal_bookmark_pl_text_and_header[0] cache['personal_bookmark_pl_header'] = personal_bookmark_pl_text_and_header[1] cvpnize_url_text_and_header = connect_cvpnize_url(cvpnize_url) # print('cvpnize_url_text_and_header', cvpnize_url_text_and_header) cache['cvpnize_url_text'] = cvpnize_url_text_and_header[0] cache['cvpnize_url_header'] = cvpnize_url_text_and_header[1] # 获得VM列表 resource_list_text_and_header = connect_resource_list(resource_list_url) print('connect_resource_list finish...') # print('resource_list_text_and_header', resource_list_text_and_header) cache['resource_list_text'] = json_to_dict(resource_list_text_and_header[0]) cache['resource_list_header'] = resource_list_text_and_header[1] resource_list = get_user_resources() print('\033[31mplease choose your vm:\033[0m') # 输出VM for index, item in enumerate(resource_list): print(index, item.get('description')) vm_index = eval(input('vm index:')) resource_item = resource_list[vm_index] # print(resource_item.get('content')) down_load_url = resource_item.get('content') download_remote_desktop_file(down_load_url) print('opening remote desktop...') run_rdp_file() except Exception as e: print(e) print('\033[31mlogin fail,please confirm your username or password\033[0m') os.remove(user_info_file) if __name__ == '__main__': auto_login_vm()