# -*- coding:utf-8 -*-
"""
Copyright (c) 2020 Baidu.com, Inc. All Rights Reserved

This module is for flash firmware of Baidu server

Author: Li,Jinfeng(ACG Cloud)
Date: 2020/05/13 08:30:49

change list:
----2025/12/31----
1)Add the function of creating a new thread to ping the target machine.
2)add optional param to support do mc reset cold before update bmc firmware
3)add optional param to support do power off before update bios firmware,
  and do power on after finish update
----2026/03/17----
1)Support --version and -V to get tool version
----2026/03/27----
1)BUG Fix:only root can ping ilo_ip
"""
import argparse
import logging
import re
import time
import sys
import json
import socket
import random
from retrying import retry
import requests
from requests_toolbelt.adapters.source import SourceAddressAdapter
import urllib3
import threading
import subprocess
import os
from datetime import datetime
import ipaddress
from requests_toolbelt.multipart.encoder import MultipartEncoder, MultipartEncoderMonitor

urllib3.disable_warnings(urllib3.exceptions.HeaderParsingError)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# suppress overly verbose logs from libraries that aren't helpful
logging.getLogger("urllib3.connectionpool").setLevel(logging.WARNING)

# Kept for reference: how to force a fixed source address for all connections.
# real_create_conn = socket.create_connection
#
# def set_src_addr(*args):
#     address, timeout = args[0], args[1]
#     source_address = ('1.1.3.1', 0)
#     return real_create_conn(address, timeout, source_address)
# socket.create_connection = set_src_addr

# Timestamp shared by all log prints. It is refreshed by ping_ilo_ip() on every
# ping, by retry_if_error() and by Flasher.upload().
GLOBAL_UPLOAD_TIMESTAMP = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
# mod by zgf 20260317 begin:Support --version and -V to get tool version
VERSION = "2.0"
# mod by zgf 20260317 end
# Extracts the round-trip time from ping output (English or Chinese locale).
TIME_PATTERN = re.compile(r'(?:time|时间)[=<](\d+\.?\d*)\s*ms', re.IGNORECASE)


def retry_if_error(exception):
    """Retry predicate for @retry: retry only on FlasherException.

    Also refreshes GLOBAL_UPLOAD_TIMESTAMP and prints the exception.
    """
    global GLOBAL_UPLOAD_TIMESTAMP
    GLOBAL_UPLOAD_TIMESTAMP = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
    print(f"[{GLOBAL_UPLOAD_TIMESTAMP}]retrying... ", exception)
    return isinstance(exception, FlasherException)


def ping_ilo_ip(ip_address, log_file, interval=1, count=None):
    """Ping ``ip_address`` once per ``interval`` seconds and log every result.

    :param ip_address: target address (IPv4 or IPv6 literal; anything that is
        not an IP literal is passed to ``ping`` unchanged)
    :param log_file: path of the log file, opened in append mode
    :param interval: seconds to sleep between two pings
    :param count: number of pings to send; if falsy, ping until the process
        ends (intended for use in a daemon thread)
    :return: number of successful pings when ``count`` is given, else None
    """
    global GLOBAL_UPLOAD_TIMESTAMP
    success_count = 0
    ping_count = 0
    # mod by zgf 20260327 begin:BUG Fix:only root can ping ilo_ip
    try:
        is_ipv6 = ipaddress.ip_address(ip_address).version == 6
    except ValueError:
        # Not an IP literal (e.g. a hostname): let plain ping resolve it.
        is_ipv6 = False
    cmd = ["ping6" if is_ipv6 else "ping", "-c", "1", "-W", "2", ip_address]
    # Taken before the try block so the summary in `finally` can always
    # compute the runtime, even when the log file cannot be opened.
    start_time = datetime.now()
    try:
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(f"\n{'='*50}\n")
            f.write(f"ping {ip_address} at {start_time}\n")
            f.write(f"count={count if count else 'None'}\n")
            f.write(f"{'='*50}\n\n")
            while True:
                GLOBAL_UPLOAD_TIMESTAMP = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
                try:
                    res = subprocess.run(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        universal_newlines=True,
                        encoding='utf-8',
                        errors='replace',
                        timeout=3
                    )
                    if res.returncode == 0:
                        match = TIME_PATTERN.search(res.stdout)
                        delay = float(match.group(1)) if match else 0.0
                        success_count += 1
                        log_line = f"[{GLOBAL_UPLOAD_TIMESTAMP}] Ping success,延迟: {delay:.2f} ms (成功次数: {success_count})\n"
                    else:
                        log_line = f"[{GLOBAL_UPLOAD_TIMESTAMP}] Ping 失败! 错误信息:{res.stderr.strip() or res.stdout.strip()}\n"
                except Exception as e:
                    # Best effort: a failed ping attempt is logged, never fatal.
                    log_line = f"[{GLOBAL_UPLOAD_TIMESTAMP}] error: {str(e)}\n"
                f.write(log_line)
                f.flush()
                ping_count += 1
                if count and ping_count >= count:
                    break
                time.sleep(interval)
    except Exception as e:
        print(f"error: {e}")
    finally:
        end_time = datetime.now()
        duration = end_time - start_time
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"\n{'='*50}\n")
                f.write(f"ping {ip_address} end at {end_time}\n")
                f.write(f"runtime: {duration}\n")
                f.write(f"ping count: {ping_count}\n")
                f.write(f"success count: {success_count}\n")
                f.write(f"{'='*50}\n")
        except Exception as e:
            print(f"write into file fail: {e}")
        print(f" ping count: {ping_count}")
        print(f" success count: {success_count}")
        print(f" run time: {duration}")
        print(f" Ping log file: {log_file}")
    if count:
        return success_count
    return None


class Flasher:
    """Class for managing flash procedure of firmware"""

    def __init__(self, ilo_ip, username, password, fw_type, filename, mode=4, action=3, reset=0, poweroff=0,
                 secure=True):
        """Store the connection parameters and prepare the HTTP session.

        ``mode`` and ``action`` are stored as strings because they are sent
        as the "Action" value of the config / upgrade requests.
        """
        self.ilo_ip = ilo_ip
        self.username = username
        self.password = password
        self.fw_type = fw_type
        self.filename = filename
        self.mode = str(mode)
        self.action = str(action)
        self.reset = reset
        self.poweroff = poweroff
        self.secure = secure
        self._session = requests.Session()
        # Disable the transport-level automatic retry: with a streamed upload
        # the file pointer cannot be rewound, so a low-level retry would send
        # an empty file. Let the exception reach the application layer, where
        # the outer @retry re-opens the file and uploads again.
        adapter = requests.adapters.HTTPAdapter(max_retries=0)
        self._session.mount('http://', adapter)
        self._session.mount('https://', adapter)
        self._token = None

    def __create_url(self, path):
        """Build the full URL for ``path``; IPv6 literals are bracketed."""
        scheme = "https" if self.secure else "http"
        if ":" in self.ilo_ip:
            return "{}://[{}]{}".format(scheme, self.ilo_ip, path)
        return "{}://{}{}".format(scheme, self.ilo_ip, path)

    @staticmethod
    def _to_flasher_error(error, method, url):
        """Map any exception raised while sending a request to FlasherException."""
        if isinstance(error, FlasherException):
            return error
        if isinstance(error, requests.ConnectionError):
            return FlasherException("{} not reachable:{}".format(url, error))
        if isinstance(error, requests.Timeout):
            return FlasherException("{} {} timeout".format(method, url))
        if isinstance(error, requests.RequestException):
            return FlasherException("{} {}".format(error, url))
        if isinstance(error, AttributeError):
            return FlasherException("AttributeError:{}".format(error))
        return FlasherException("unexpected error during {} {}: {}".format(method, url, error))

    @staticmethod
    def _parse_json_body(response, error_cls):
        """Read the raw response body once and decode it as UTF-8 JSON.

        :raises error_cls: when the body is not valid UTF-8 / JSON; the first
            200 raw bytes are included in the message.
        """
        raw = response.raw.read()
        try:
            return json.loads(raw.decode('utf-8'))
        except ValueError as error:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
            raise error_cls("invalid JSON response ({}): {!r}".format(error, raw[:200])) from error

    def __request(self, method, path, headers=None, files=None, data=None):
        """Send one HTTP request, retrying up to 3 times with a random 3-10s wait.

        Streamed bodies (MultipartEncoder / MultipartEncoderMonitor) are never
        retried here because the stream is exhausted after the first attempt;
        the caller has to re-open the file and call again.

        :return: the ``requests`` response (opened with ``stream=True``)
        :raises FlasherInternalError: unsupported HTTP method
        :raises FlasherException: the request failed (after the last attempt,
            or immediately for streamed bodies)
        """
        url = self.__create_url(path)
        method = method.lower()
        if method not in ("get", "post", "delete"):
            raise FlasherInternalError("unsupported method: {}".format(method))

        is_stream_data = isinstance(data, (MultipartEncoder, MultipartEncoderMonitor))
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            try:
                func = getattr(self._session, method)
                response = func(url, headers=headers, files=files, data=data,
                                stream=True, verify=False, timeout=300)
                try:
                    response.raise_for_status()
                except requests.exceptions.HTTPError:
                    raise FlasherException(
                        "{} unexpected return status code: {}, header: {}".format(
                            url, response.status_code, response.request.headers))
                return response
            except Exception as error:
                # Every failure ends up as a FlasherException so that callers
                # never receive None instead of a response.
                failure = self._to_flasher_error(error, method, url)
                if attempt == max_attempts or is_stream_data:
                    if failure is error:
                        raise
                    raise failure from error
                print(f"[{datetime.now().strftime('%Y-%m-%d-%H:%M:%S')}] __request failed, retrying... {error}")
                time.sleep(random.uniform(3, 10))
        # Not reachable: the last attempt either returns or raises.
        raise FlasherInternalError("request loop ended without a result")

    def create_session(self):
        """Log in and store the CSRF token.

        :return: the CSRF token
        :raises FlasherCreateSessionError: login failed or the reply has no token
        """
        data = {"username": self.username, "password": self.password}
        path = "/baidu/session"
        headers = {"Content-Encoding": "gzip"}
        try:
            response = self.__request("post", path, data=data, headers=headers)
        except FlasherException as error:
            raise FlasherCreateSessionError("{}".format(error))
        body = self._parse_json_body(response, FlasherCreateSessionError)
        try:
            self._token = body["CSRFToken"]
        except (KeyError, TypeError) as error:
            raise FlasherCreateSessionError("{},{}".format(error, body))
        return self._token

    def version_summary(self):
        """Query the firmware versions.

        :return: tuple ``(bmc_version, bios_version, scp_version)``
        :raises FlasherUploadError: request failed or the reply is not usable
        """
        headers = {"X-CSRFTOKEN": self._token}
        path = "/api/version_summary"
        try:
            response = self.__request("get", path, headers=headers)
        except FlasherException as error:
            raise FlasherUploadError("{}".format(error))
        body = self._parse_json_body(response, FlasherUploadError)
        try:
            bmc_version = body[1].get("dev_version")
            bios_version = body[2].get("dev_version")
            scp_version = body[3].get("dev_version")
        except (IndexError, KeyError, TypeError, AttributeError) as error:
            raise FlasherUploadError("unexpected version summary: {}".format(body)) from error
        if bios_version is None and scp_version is None:
            raise FlasherUploadError("unexpected version summary: {}".format(body))
        return (bmc_version, bios_version, scp_version)

    @retry(stop_max_attempt_number=3, wait_random_min=3000, wait_random_max=10000,
           retry_on_exception=retry_if_error)
    def upload(self):
        """Upload the firmware image as a streamed multipart request.

        The whole method is retried by @retry (3 attempts), which re-opens the
        file each time.

        :return: ``(True, None)`` on success
        :raises FlasherUploadError: upload failed or the BMC reported an error
        """
        global GLOBAL_UPLOAD_TIMESTAMP
        GLOBAL_UPLOAD_TIMESTAMP = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
        headers = {"X-CSRFTOKEN": self._token}
        path = "/baidu/maintenance/{}/firmware".format(self.fw_type.upper())
        file_path = self.filename

        def create_progress_callback(total_size):
            """Return a callback printing progress once per whole percent."""
            last_percent = -1

            def callback(monitor):
                nonlocal last_percent
                bytes_read = monitor.bytes_read
                if total_size:
                    current_percent = min((bytes_read / total_size) * 100, 100)
                else:
                    current_percent = 100
                integer_percent = int(current_percent)
                if integer_percent > last_percent:
                    print(
                        f"\r[{GLOBAL_UPLOAD_TIMESTAMP}][progress] {current_percent:.2f}% ({bytes_read}/{total_size} bytes)\n",
                        end="",
                        flush=True
                    )
                    last_percent = integer_percent
                    if bytes_read >= total_size:
                        print()
            return callback

        try:
            file_size = os.path.getsize(file_path)
            print(f"[{GLOBAL_UPLOAD_TIMESTAMP}][file upload] filepath:{file_path},filesize:{file_size} bytes")
            with open(file_path, "rb") as f:
                encoder = MultipartEncoder(
                    fields={
                        "fwimage": (
                            os.path.basename(file_path),
                            f,
                            "application/octet-stream"
                        )
                    }
                )
                # monitor.bytes_read counts the encoded multipart body, so the
                # progress total must be the encoder length, not the file size.
                monitor = MultipartEncoderMonitor(encoder, create_progress_callback(encoder.len))
                request_headers = headers.copy()
                request_headers['Content-Type'] = monitor.encoder.content_type
                response = self.__request(
                    method="post",
                    path=path,
                    data=monitor,
                    headers=request_headers
                )
        except FileNotFoundError:
            raise FlasherUploadError(f"file not exist:{file_path}")
        except PermissionError:
            raise FlasherUploadError(f"no file access:{file_path}")
        except FlasherException as e:
            raise FlasherUploadError(f"upload fail:{str(e)}")
        except Exception as e:
            raise FlasherUploadError(f"unknown error:{str(e)}")

        print("response", response)
        try:
            raw = response.raw.read()
        except Exception as e:
            raise FlasherUploadError(f"Failed to parse the response:{str(e)}")
        try:
            body = json.loads(raw.decode('utf-8'))
        except ValueError:
            raise FlasherUploadError(
                f"error format(not JSON):\n"
                f"Headers: {response.headers}\n"
                f"Content: {raw[:200]!r}..."
            )
        try:
            status = body.get("status") or body.get("stats")
        except AttributeError as e:
            raise FlasherUploadError(f"Failed to parse the response:{str(e)}")
        if status != 1:
            raise FlasherUploadError(f"upload fail(response_code error):status={status},body:{body}")
        print(f"[success] file_name:{file_path},response_code:{body}")
        return (True, None)

    def config(self):
        """Send the flash mode (``self.mode``) to the BMC.

        :raises FlasherConfigError: request failed or status is not 1
        """
        headers = {"X-CSRFTOKEN": self._token, "Content-Type": "application/json"}
        data = json.dumps({"Action": self.mode})
        path = "/baidu/maintenance/{}/configuration".format(self.fw_type.upper())
        try:
            response = self.__request("post", path, headers=headers, data=data)
        except FlasherException as error:
            raise FlasherConfigError("{}".format(error))
        body = self._parse_json_body(response, FlasherConfigError)
        status = body.get("status") or body.get("stats")
        if status != 1:
            raise FlasherConfigError("path:{},data:{} unexpected status: {}".format(path, data, status))

    def upgrade(self):
        """Start the upgrade (``self.action``).

        If the request itself fails, the progress endpoint is queried: when it
        answers, the upgrade is considered running and the method returns.

        :raises FlasherUpgradeError: request failed and no progress is
            available, or the returned status is not 1
        """
        headers = {"X-CSRFTOKEN": self._token, "Content-Type": "application/json"}
        data = json.dumps({"Action": self.action})
        path = "/baidu/maintenance/{}/upgrade".format(self.fw_type.upper())
        try:
            response = self.__request("post", path, headers=headers, data=data)
        except FlasherException as error:
            # The upgrade request timed out; the BMC may already be flashing,
            # so query the progress to confirm.
            print(f"Upgrade request failed: {error}, checking progress...")
            try:
                progress = self.get_progress()
                if progress >= 0:
                    print(f"Upgrade is in progress ({progress}%), continuing...")
                    return
            except Exception as probe_error:
                # Best effort only; the original request error is reported.
                print(f"Progress check failed: {probe_error}")
            raise FlasherUpgradeError("{}".format(error))
        body = self._parse_json_body(response, FlasherUpgradeError)
        status = body.get("status") or body.get("stats")
        if status != 1:
            raise FlasherUpgradeError("unexpected status: {}".format(status))

    def get_progress(self):
        """Query the upgrade progress.

        :return: progress as an int percentage (100 when finished)
        :raises FlasherProgreessError: request failed, the BMC reported an
            error code, or the reply cannot be interpreted
        """
        headers = {"X-CSRFTOKEN": self._token}
        path = "/baidu/maintenance/{}/status".format(self.fw_type.upper())
        try:
            response = self.__request("get", path, headers=headers)
        except FlasherException as error:
            raise FlasherProgreessError("{}".format(error))
        body = self._parse_json_body(response, FlasherProgreessError)

        # Reply format with a task list: use the first task.
        if "UpgradeTasks" in body and len(body["UpgradeTasks"]) > 0:
            task = body["UpgradeTasks"][0]
            percentage_str = task.get("Percentage", "0%")
            task_state = task.get("TaskState", "Unknown")
            error_code = task.get("ErrorCode", 0)
            if error_code != 0:
                raise FlasherProgreessError("Upgrade failed with error code: {}".format(error_code))
            match = re.search(r'(\d+)%', str(percentage_str))
            progress = int(match.group(1)) if match else 0
            if task_state == "Completed":
                progress = 100
            return progress

        # Reply format with status / progress fields.
        status = body.get("status") or body.get("stats")
        if status not in (1, 2, 3):
            raise FlasherProgreessError("unexpected status: get_progress {} {}".format(status, str(body)))
        progress_content = body.get("progress") or body.get("Progress")
        if progress_content is None:
            return 100 if status in (2, 3) else 0
        if isinstance(progress_content, int):
            return progress_content
        if isinstance(progress_content, str):
            match = re.search("[0-9]+|Complete|Success", progress_content)
            if not match:
                raise FlasherProgreessError("unexpected progress: {}".format(progress_content))
            token = match.group()
            # A number is the percentage itself; "Complete"/"Success" mean 100.
            return int(token) if token.isdigit() else 100
        raise FlasherProgreessError("unexpected progress: {}".format(progress_content))

    def send_ipmi_command(self, command):
        """Run ``ipmitool -I lanplus`` against the BMC with ``command``.

        :param command: ipmitool sub-command, split on whitespace
        :return: stdout and stderr joined by a newline, stripped
        :raises FlasherSendIPMIError: ipmitool could not be started
        """
        # The password is masked so it does not end up in console logs.
        print(f"> ipmitool -I lanplus -H {self.ilo_ip} -U {self.username} -P **** {command}")
        try:
            result = subprocess.run(
                ["ipmitool", "-I", "lanplus", "-H", self.ilo_ip, "-U", self.username, "-P", self.password]
                + command.split(),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                check=False
            )
        except OSError as error:
            raise FlasherSendIPMIError("failed to run ipmitool: {}".format(error)) from error
        print(f"< returncode: {result.returncode}, stdout: {result.stdout.strip()}")
        if result.stderr:
            print(f"< error: {result.stderr.strip()}")
        return f"{result.stdout}\n{result.stderr}".strip()

    def mc_reset(self, args, log_file, interval=1, count=5, max_retries=2):
        """Do "mc reset cold" when ``args.type == "bmc"`` and ``args.reset == 1``.

        After the reset the method sleeps 420s, then pings the BMC ``count``
        times, up to ``max_retries`` rounds (60s apart).

        :return: True when a ping round fully succeeds, otherwise False
            (also False when no reset was requested)
        :raises FlasherDoMcRest: the IPMI command could not be sent
        """
        try:
            if args.type == "bmc" and args.reset == 1:
                print(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
                print(f"do mc reset cold")
                self.send_ipmi_command("mc reset cold")
                # sleep 420s for bmc ready
                time.sleep(420)
                for attempt in range(max_retries):
                    if attempt > 0:
                        time.sleep(60)
                    else:
                        print(f"\nping count:{attempt}...")
                    success_count = ping_ilo_ip(self.ilo_ip, log_file, interval=interval, count=count)
                    if success_count == count:
                        print(f"bmc is ready")
                        with open(log_file, 'a', encoding='utf-8') as f:
                            f.write(f"BMC is ready\n")
                        return True
                    print(f"count:{attempt+1} fail,(success:{success_count}/{count})")
                    with open(log_file, 'a', encoding='utf-8') as f:
                        f.write(f"count:{attempt+1} fail,(success:{success_count}/{count})\n")
                print(f"all ping test fail")
                with open(log_file, 'a', encoding='utf-8') as f:
                    f.write(f"\n all ping test fail!\n")
                return False
        except FlasherSendIPMIError as error:
            error_msg = f"MC Reset Cold fail: {error}"
            print(error_msg)
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"\n error: {error_msg}\n")
            raise FlasherDoMcRest("Failed to do mc reset cold: {}".format(error)) from error
        return False

    def set_power_S5(self):
        """Power the host off via IPMI.

        Uses the Baidu OEM raw command when supported, otherwise the standard
        "power off" command.

        :return: True when the host is reported off, otherwise False
        :raises FlasherSetPowerS5: the IPMI command could not be sent
        """
        try:
            result = self.send_ipmi_command("raw 0x30 0x03 0x61 0x7e 0x00")
            print("set_power_S5:", result)
            if "Unable to send RAW command" in result:
                # Baidu OEM raw command not supported.
                print("not support Baidu oem cmd, use power off")
                print(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
                self.send_ipmi_command("power off")
                time.sleep(60)
                result = self.send_ipmi_command("power status")
                data_parts = result.strip().split()
                if len(data_parts) >= 4 and data_parts[3] == "off":
                    print(" The power supply is already in the off state, proceed with the upgrade.")
                    return True
                print(" Timeout: Power status did not change to off within 60 seconds.", end='', flush=True)
                return False

            # Baidu OEM raw command supported.
            print("do raw off cmd")
            data_parts = result.strip().split()
            if len(data_parts) < 4:
                print(" Unexpected response format:", result, end='', flush=True)
                return False
            power_status = data_parts[3]
            print("current power status", power_status)
            if power_status in ("00", "10"):
                print(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
                self.send_ipmi_command("raw 0x30 0x02 0x61 0x7E 0x00 0x00")
                time.sleep(60)
                result = self.send_ipmi_command("raw 0x30 0x03 0x61 0x7e 0x00")
                data_parts = result.strip().split()
                if len(data_parts) >= 4 and data_parts[3] in ("05", "15"):
                    print(" The power supply is already in the off state, proceed with the upgrade.")
                    return True
                print(" Timeout: Power status did not change to 0x15 within 60 seconds.", end='', flush=True)
                return False
            if power_status in ("05", "15"):
                # Same values the code accepts as "off" after sending the command.
                print(" The power supply is already in the off state, proceed with the upgrade.")
                return True
        except FlasherSendIPMIError as error:
            raise FlasherSetPowerS5("Failed to check power status: {}".format(error)) from error
        return False

    def _retry_power_on(self, on_command, status_command, on_states, retry_wait, timeout_message,
                        max_attempts=10):
        """Send ``on_command`` up to ``max_attempts`` times until it is accepted.

        Once accepted, wait 60s and check ``status_command``: return True when
        the 4th token of the reply is in ``on_states``, else False.
        """
        print("now is in S5 status, send power on cmd...")
        for _ in range(max_attempts):
            print(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
            time.sleep(retry_wait)
            result = self.send_ipmi_command(on_command)
            print("send power on cmd and result is", result)
            if "Command not supported in present state" in result:
                print("need to try again...")
                continue
            print("send power on cmd success, wait 60 seconds...")
            time.sleep(60)
            result = self.send_ipmi_command(status_command)
            data_parts = result.strip().split()
            if len(data_parts) >= 4 and data_parts[3] in on_states:
                print(" The power supply is already in the on state, proceed with the upgrade.")
                return True
            print(timeout_message, end='', flush=True)
            return False
        return False

    def set_power_S0(self):
        """Power the host on via IPMI.

        Uses the Baidu OEM raw command when supported, otherwise the standard
        "power on" command.

        :return: True when the host is reported on, otherwise False
        :raises FlasherSetPowerS0: the IPMI command could not be sent
        """
        try:
            result = self.send_ipmi_command("raw 0x30 0x03 0x61 0x7e 0x00")
            print("set_power_S0:", result)
            if "Unable to send RAW command" in result:
                # Baidu OEM raw command not supported.
                print("not support Baidu oem cmd, use power on")
                print(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
                self.send_ipmi_command("power on")
                time.sleep(60)
                result = self.send_ipmi_command("power status")
                data_parts = result.strip().split()
                if len(data_parts) < 4:
                    print(" Unexpected response format:", result, end='', flush=True)
                    return False
                if data_parts[3] == "off":
                    # Still off: resend power on every 30s, max 10 times.
                    return self._retry_power_on(
                        "power on", "power status", ("on",), 30,
                        " Timeout: Power status did not change to on within 60 seconds.")
                print("power status is on")
                return True

            # Baidu OEM raw command supported.
            print("do raw on cmd")
            data_parts = result.strip().split()
            if len(data_parts) < 4:
                print(" Unexpected response format:", result, end='', flush=True)
                return False
            power_status = data_parts[3]
            print("current power status", power_status)
            if power_status in ("05", "15"):
                return self._retry_power_on(
                    "raw 0x30 0x02 0x61 0x7e 0x00 0x01", "raw 0x30 0x03 0x61 0x7e 0x00", ("00", "10"), 60,
                    " Timeout: Power status did not change to 0x10 within 30 seconds.")
            print("power is on")
            return True
        except FlasherSendIPMIError as error:
            raise FlasherSetPowerS0("Failed to check power status: {}".format(error)) from error

    def get_mc_info(self, args):
        """Return the BMC "Firmware Revision" from ``ipmitool mc info``.

        Only queried when ``args.type == "bios"``, ``args.config == 4`` and
        ``args.when == 3``.

        :return: the revision string, or False when not queried / not found /
            the command could not be sent
        """
        try:
            if args.type == "bios" and args.config == 4 and args.when == 3:
                result = self.send_ipmi_command("mc info")
                for line in result.splitlines():
                    if "Firmware Revision" in line:
                        _, separator, value = line.partition(":")
                        if separator:
                            return value.strip()
        except (FlasherSendIPMIError, FalsherGetMcInfo) as error:
            print(" Failed to get mc info:", error, end='', flush=True)
        return False

    def compare_versions(self, current, required):
        """Return True when dotted version ``current`` is >= ``required``.

        Missing trailing components count as 0, so "1.2" equals "1.2.0" and
        is lower than "1.2.1".

        :raises ValueError: a component is not an integer
        """
        current_parts = [int(part) for part in current.split('.')]
        required_parts = [int(part) for part in required.split('.')]
        width = max(len(current_parts), len(required_parts))
        current_parts += [0] * (width - len(current_parts))
        required_parts += [0] * (width - len(required_parts))
        return current_parts >= required_parts

    def run(self):
        """one command to do all flash operations"""
        self.create_session()
        self.upload()
        self.config()
        self.upgrade()


class FlasherException(Exception):
    """Base exception for Flasher"""


class FlasherInternalError(FlasherException):
    """Flasher internal error, indicating programming error"""


class FlasherCreateSessionError(FlasherException):
    """Failed to create session"""


class FlasherUploadError(FlasherException):
    """Failed to upload firmware image"""


class FlasherConfigError(FlasherException):
    """Failed to config flash mode"""


class FlasherUpgradeError(FlasherException):
    """Failed to upgrade"""


class FlasherProgreessError(FlasherException):
    """Failed to query progreess"""


class FlasherSetPowerS5(FlasherException):
    """Failed to set power status to S5"""


class FlasherSetPowerS0(FlasherException):
    """Failed to set power status to S0"""


class FlasherDoMcRest(FlasherException):
    """Failed to do mc reset cold"""


class FalsherGetMcInfo(FlasherException):
    """Failed to get mc info"""


class FlasherSendIPMIError(FlasherException):
    """Failed to send IPMI command"""


def main():
    """Entry point: parse the command line and run the whole flash procedure.

    Exits with 0 on success and 1 when a FlasherException is raised.
    """
    global GLOBAL_UPLOAD_TIMESTAMP
    parser = argparse.ArgumentParser(description="Tool for flashing firmware of Baidu servers using Web API")
    parser.add_argument("bmc_ip", help="BMC IP")
    parser.add_argument("type", choices=["bios", "bmc"], help="firmware type")
    parser.add_argument("image", help="firmware image file")
    parser.add_argument("-u", "--username", help="username", default="ADMIN")
    parser.add_argument("-p", "--password", help="password", default="ADMIN")
    parser.add_argument("-c", "--config", help="flash which areas", type=int, default=4)
    parser.add_argument("-w", "--when", help="when to flash", type=int, default=3)
    # -r --reset: whether to do a bmc cold reset before updating bmc firmware
    #             (default 0: no reset)
    # -f --poweroff: whether to power off before updating bios firmware and
    #             power on when finished (default 0: no power off)
    parser.add_argument("-r", "--reset", help="bmc reset cold", type=int, default=0)
    parser.add_argument("-f", "--poweroff", help="do power off", type=int, default=0)
    # mod by zgf 20260317 begin:Support --version and -V to get tool version
    parser.add_argument("-V", "--version", action="version", version=f"%(prog)s {VERSION}", help="show version")
    # mod by zgf 20260317 end
    args = parser.parse_args()

    print("Flash {} of {} to {}".format(args.type.upper(), args.bmc_ip, args.image))
    flasher = Flasher(args.bmc_ip, args.username, args.password, args.type, args.image,
                      args.config, args.when, args.reset, args.poweroff)
    try:
        GLOBAL_UPLOAD_TIMESTAMP = datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
        log_filename = "ping_ilo_ip_" + args.bmc_ip + "_" + GLOBAL_UPLOAD_TIMESTAMP + ".log"
        # Background ping for the whole update; it also keeps
        # GLOBAL_UPLOAD_TIMESTAMP current for the prints below.
        thread = threading.Thread(
            target=ping_ilo_ip,
            args=(args.bmc_ip, log_filename, 1, None),
            daemon=True)
        if args.type == "bmc" and args.reset == 1:
            flasher.mc_reset(args, log_filename)
        thread.start()
        if args.type == "bios" and args.poweroff == 1:
            flasher.set_power_S5()
        print(f"[{GLOBAL_UPLOAD_TIMESTAMP}] Create session...")
        flasher.create_session()
        print(f"[{GLOBAL_UPLOAD_TIMESTAMP}] Upload...")
        flasher.upload()
        print(f"[{GLOBAL_UPLOAD_TIMESTAMP}] Config...")
        flasher.config()
        print(f"[{GLOBAL_UPLOAD_TIMESTAMP}] Upgrade...")
        flasher.upgrade()
        while True:
            time.sleep(3.0)
            progress = flasher.get_progress()
            print(f"\r[{GLOBAL_UPLOAD_TIMESTAMP}] Progress: [{progress}]\n", end='')
            if progress == 100:
                break
        if args.type == "bios" and args.poweroff == 1:
            flasher.set_power_S0()
        return_code = 0
        print("")
    except FlasherException as error:
        print("")
        print(error)
        return_code = 1
    sys.exit(return_code)


if __name__ == "__main__":
    main()