Source code for tomoscan.tomoscan_2bm

"""Software for tomography scanning with EPICS at APS beamline 2-BM

   Classes
   -------
   TomoScan2BM
     Derived class for tomography scanning with EPICS at APS beamline 2-BM
"""
import time
import os
import h5py 
import sys
import traceback
import numpy as np
import cv2
import json
import pathlib

import sys,os
import time
import re
import serial
import telnetlib
import http.client as httplib
import base64
import string
import threading

from epics import PV
from pathlib import Path

from tomoscan import data_management as dm
from tomoscan.tomoscan_helical import TomoScanHelical
from tomoscan import log

EPSILON = .001
CREDENTIALS_FILE_NAME = os.path.join(str(pathlib.Path.home()), '.webcam_credentials')


[docs] class TomoScan2BM(TomoScanHelical): """Derived class used for tomography scanning with EPICS at APS beamline 2-BM Parameters ---------- pv_files : list of str List of files containing EPICS pvNames to be used. macros : dict Dictionary of macro definitions to be substituted when reading the pv_files """ def __init__(self, pv_files, macros): super().__init__(pv_files, macros) prefix = self.pv_prefixes['MctOptics'] self.epics_pvs['ImagePixelSize'] = PV(prefix + 'ImagePixelSize') # set TomoScan xml files self.epics_pvs['CamNDAttributesFile'].put('TomoScanDetectorAttributes.xml') self.epics_pvs['FPXMLFileName'].put('TomoScanLayout.xml') macro = 'DET=' + self.pv_prefixes['Camera'] + ',' + 'TS=' + self.epics_pvs['Testing'].__dict__['pvname'].replace('Testing', '', 1) self.control_pvs['CamNDAttributesMacros'].put(macro) # Enable auto-increment on file writer self.epics_pvs['FPAutoIncrement'].put('Yes') # # Set standard file template on file writer # self.epics_pvs['FPFileTemplate'].put("%s%s_%3.3d.h5", wait=True) # Disable over writing warning self.epics_pvs['OverwriteWarning'].put('Yes') log.setup_custom_logger("./tomoscan.log") # try to read username/password for pdu and webcam access_fname = os.path.join(str(pathlib.Path.home()), 'access.json') with open(access_fname, 'r') as fp: self.access_dic = json.load(fp) # Set AD plugins self.epics_pvs['PVANDArrayPort'].put('OVER1') self.epics_pvs['PVAEnableCallbacks'].put('Enable') self.epics_pvs['ROIEnableCallbacks'].put('Disable') self.epics_pvs['CBEnableCallbacks'].put('Disable') # Configure callbacks for mctoptics prefix = self.pv_prefixes['MctOptics'] self.epics_pvs['CameraSelect'] = PV(prefix + 'CameraSelect') camera_select = self.epics_pvs['CameraSelect'].value if camera_select == None: log.error('mctOptics is down. Please start mctOptics first') else: self.epics_pvs['Camera0'] = PV(prefix + 'Camera0PVPrefix') self.epics_pvs['Camera1'] = PV(prefix + 'Camera1PVPrefix') self.epics_pvs['FilePlugin0'] = PV(prefix + 'FilePlugin0PVPrefix') self.epics_pvs['FilePlugin1'] = PV(prefix + 'FilePlugin1PVPrefix') self.epics_pvs['PvaPlugin1'] = PV(prefix + 'PvaPlugin1PVPrefix') self.epics_pvs['RoiPlugin0'] = PV(prefix + 'RoiPlugin0PVPrefix') self.epics_pvs['RoiPlugin1'] = PV(prefix + 'RoiPlugin1PVPrefix') self.epics_pvs['CbPlugin0'] = PV(prefix + 'CbPlugin0PVPrefix') self.epics_pvs['CbPlugin1'] = PV(prefix + 'CbPlugin1PVPrefix') self.epics_pvs['CameraSelect'].add_callback(self.pv_callback_2bm)
[docs] def pv_callback_2bm(self, pvname=None, value=None, char_value=None, **kw): """Callback function that is called by pyEpics when certain EPICS PVs are changed """ log.debug('pv_callback_2bm pvName=%s, value=%s, char_value=%s', pvname, value, char_value) if (pvname.find('CameraSelect') != -1): thread = threading.Thread(target=self.reinit_camera, args=()) thread.start()
[docs] def reinit_camera(self): """Init camera PVs based on the mctOptics selection. Parameters ---------- camera : int, optional The camera to use. Optique Peter system support 2 cameras """ if not self.scan_is_running: ######## prefix = self.pv_prefixes['MctOptics'] self.epics_pvs['CameraSelect'] = PV(prefix + 'CameraSelect') camera_select = self.epics_pvs['CameraSelect'].value log.info('changing camera prefix to camera %s', camera_select) if camera_select == None: log.error('mctOptics is down. Please start mctOptics first') else: self.epics_pvs['Camera0'] = PV(prefix + 'Camera0PVPrefix') self.epics_pvs['Camera1'] = PV(prefix + 'Camera1PVPrefix') self.epics_pvs['FilePlugin0'] = PV(prefix + 'FilePlugin0PVPrefix') self.epics_pvs['FilePlugin1'] = PV(prefix + 'FilePlugin1PVPrefix') self.epics_pvs['PvaPlugin0'] = PV(prefix + 'PvaPlugin0PVPrefix') self.epics_pvs['PvaPlugin1'] = PV(prefix + 'PvaPlugin1PVPrefix') self.epics_pvs['RoiPlugin0'] = PV(prefix + 'RoiPlugin0PVPrefix') self.epics_pvs['RoiPlugin1'] = PV(prefix + 'RoiPlugin1PVPrefix') self.epics_pvs['CbPlugin0'] = PV(prefix + 'CbPlugin0PVPrefix') self.epics_pvs['CbPlugin1'] = PV(prefix + 'CbPlugin1PVPrefix') if camera_select == 0: camera_prefix = self.epics_pvs['Camera0'].get(as_string=True) hdf_prefix = self.epics_pvs['FilePlugin0'].get(as_string=True) pva_prefix = self.epics_pvs['PvaPlugin0'].get(as_string=True) roi_prefix = self.epics_pvs['RoiPlugin0'].get(as_string=True) cb_prefix = self.epics_pvs['CbPlugin0'].get(as_string=True) else: camera_prefix = self.epics_pvs['Camera1'].get(as_string=True) hdf_prefix = self.epics_pvs['FilePlugin1'].get(as_string=True) pva_prefix = self.epics_pvs['PvaPlugin1'].get(as_string=True) roi_prefix = self.epics_pvs['RoiPlugin1'].get(as_string=True) cb_prefix = self.epics_pvs['CbPlugin1'].get(as_string=True) self.epics_pvs['CameraPVPrefix'].put(camera_prefix) log.info(camera_prefix) self.epics_pvs['FilePluginPVPrefix'].put(hdf_prefix) log.info(hdf_prefix) self.epics_pvs['PvaPluginPVPrefix'].put(pva_prefix) log.info(pva_prefix) self.epics_pvs['RoiPluginPVPrefix'].put(roi_prefix) log.info(roi_prefix) self.epics_pvs['CbPluginPVPrefix'].put(cb_prefix) log.info(cb_prefix) self.pv_prefixes['FilePlugin'] = hdf_prefix # need to update TomoScan PV Prefix to the new camera / hdf plugin self.epics_pvs['CameraPVPrefix'].put(camera_prefix, wait=True) self.epics_pvs['FilePluginPVPrefix'].put(hdf_prefix, wait=True) # Update PVPrefix PV camera_prefix = camera_prefix + 'cam1:' self.control_pvs['CamManufacturer'] = PV(camera_prefix + 'Manufacturer_RBV') self.control_pvs['CamModel'] = PV(camera_prefix + 'Model_RBV') self.control_pvs['CamAcquire'] = PV(camera_prefix + 'Acquire') self.control_pvs['CamAcquireBusy'] = PV(camera_prefix + 'AcquireBusy') self.control_pvs['CamImageMode'] = PV(camera_prefix + 'ImageMode') self.control_pvs['CamTriggerMode'] = PV(camera_prefix + 'TriggerMode') self.control_pvs['CamNumImages'] = PV(camera_prefix + 'NumImages') self.control_pvs['CamNumImagesCounter'] = PV(camera_prefix + 'NumImagesCounter_RBV') self.control_pvs['CamAcquireTime'] = PV(camera_prefix + 'AcquireTime') self.control_pvs['CamAcquireTimeRBV'] = PV(camera_prefix + 'AcquireTime_RBV') self.control_pvs['CamBinX'] = PV(camera_prefix + 'BinX') self.control_pvs['CamBinY'] = PV(camera_prefix + 'BinY') self.control_pvs['CamWaitForPlugins'] = PV(camera_prefix + 'WaitForPlugins') self.control_pvs['PortNameRBV'] = PV(camera_prefix + 'PortName_RBV') self.control_pvs['CamNDAttributesFile'] = PV(camera_prefix + 'NDAttributesFile') self.control_pvs['CamNDAttributesMacros'] = PV(camera_prefix + 'NDAttributesMacros') # If this is a Point Grey camera then assume we are running ADSpinnaker # and create some PVs specific to that driver manufacturer = self.control_pvs['CamManufacturer'].get(as_string=True) model = self.control_pvs['CamModel'].get(as_string=True) if (manufacturer.find('Point Grey') != -1) or (manufacturer.find('FLIR') != -1): self.control_pvs['CamExposureMode'] = PV(camera_prefix + 'ExposureMode') self.control_pvs['CamTriggerOverlap'] = PV(camera_prefix + 'TriggerOverlap') self.control_pvs['CamPixelFormat'] = PV(camera_prefix + 'PixelFormat') self.control_pvs['CamArrayCallbacks'] = PV(camera_prefix + 'ArrayCallbacks') self.control_pvs['CamFrameRateEnable'] = PV(camera_prefix + 'FrameRateEnable') self.control_pvs['CamTriggerSource'] = PV(camera_prefix + 'TriggerSource') self.control_pvs['CamTriggerSoftware'] = PV(camera_prefix + 'TriggerSoftware') if model.find('Grasshopper3 GS3-U3-23S6M') != -1: self.control_pvs['CamVideoMode'] = PV(camera_prefix + 'GC_VideoMode_RBV') if model.find('Blackfly S BFS-PGE-161S7M') != -1: self.control_pvs['GC_ExposureAuto'] = PV(camera_prefix + 'GC_ExposureAuto') prefix = hdf_prefix self.control_pvs['FPNDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['FPFileWriteMode'] = PV(prefix + 'FileWriteMode') self.control_pvs['FPNumCapture'] = PV(prefix + 'NumCapture') self.control_pvs['FPNumCaptured'] = PV(prefix + 'NumCaptured_RBV') self.control_pvs['FPCapture'] = PV(prefix + 'Capture') self.control_pvs['FPCaptureRBV'] = PV(prefix + 'Capture_RBV') self.control_pvs['FPFilePath'] = PV(prefix + 'FilePath') self.control_pvs['FPFilePathRBV'] = PV(prefix + 'FilePath_RBV') self.control_pvs['FPFilePathExists'] = PV(prefix + 'FilePathExists_RBV') self.control_pvs['FPFileName'] = PV(prefix + 'FileName') self.control_pvs['FPFileNameRBV'] = PV(prefix + 'FileName_RBV') self.control_pvs['FPFileNumber'] = PV(prefix + 'FileNumber') self.control_pvs['FPAutoIncrement'] = PV(prefix + 'AutoIncrement') self.control_pvs['FPFileTemplate'] = PV(prefix + 'FileTemplate') self.control_pvs['FPFullFileName'] = PV(prefix + 'FullFileName_RBV') self.control_pvs['FPAutoSave'] = PV(prefix + 'AutoSave') self.control_pvs['FPEnableCallbacks'] = PV(prefix + 'EnableCallbacks') self.control_pvs['FPXMLFileName'] = PV(prefix + 'XMLFileName') self.control_pvs['FPWriteStatus'] = PV(prefix + 'WriteStatus') # Set some initial PV values file_path = self.config_pvs['FilePath'].get(as_string=True) self.control_pvs['FPFilePath'].put(file_path) file_name = self.config_pvs['FileName'].get(as_string=True) self.control_pvs['FPFileName'].put(file_name) self.control_pvs['FPAutoSave'].put('No') self.control_pvs['FPFileWriteMode'].put('Stream') self.control_pvs['FPEnableCallbacks'].put('Enable') prefix = pva_prefix self.control_pvs['PVANDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['PVAEnableCallbacks'] = PV(prefix + 'EnableCallbacks') # Set some initial PV values self.control_pvs['PVANDArrayPort'].put('OVER1') self.control_pvs['PVAEnableCallbacks'].put('Enable') prefix = roi_prefix self.control_pvs['ROINDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['ROIScale'] = PV(prefix + 'Scale') self.control_pvs['ROIBinX'] = PV(prefix + 'BinX') self.control_pvs['ROIBinY'] = PV(prefix + 'BinY') self.control_pvs['ROIEnableCallbacks'] = PV(prefix + 'EnableCallbacks') # Set some initial PV values self.control_pvs['ROIEnableCallbacks'].put('Disable') prefix = cb_prefix self.control_pvs['CBPortNameRBV'] = PV(prefix + 'PortName_RBV') self.control_pvs['CBNDArrayPort'] = PV(prefix + 'NDArrayPort') self.control_pvs['CBPreCount'] = PV(prefix + 'PreCount') self.control_pvs['CBPostCount'] = PV(prefix + 'PostCount') self.control_pvs['CBCapture'] = PV(prefix + 'Capture') self.control_pvs['CBCaptureRBV'] = PV(prefix + 'Capture_RBV') self.control_pvs['CBTrigger'] = PV(prefix + 'Trigger') self.control_pvs['CBTriggerRBV'] = PV(prefix + 'Trigger_RBV') self.control_pvs['CBCurrentQtyRBV'] = PV(prefix + 'CurrentQty_RBV') self.control_pvs['CBEnableCallbacks'] = PV(prefix + 'EnableCallbacks') self.control_pvs['CBStatusMessage'] = PV(prefix + 'StatusMessage') # Set some initial PV values self.control_pvs['CBEnableCallbacks'].put('Disable') self.epics_pvs = {**self.config_pvs, **self.control_pvs} # Wait 1 second for all PVs to connect time.sleep(1) self.check_pvs_connected()
[docs] def open_frontend_shutter(self): """Opens the shutters to collect flat fields or projections. This does the following: - Checks if we are in testing mode. If we are, do nothing else opens the 2-BM front-end shutter. """ if self.epics_pvs['Testing'].get(): log.warning('In testing mode, so not opening shutters.') else: # Open 2-BM front-end shutter if not self.epics_pvs['OpenShutter'] is None: pv = self.epics_pvs['OpenShutter'] value = self.epics_pvs['OpenShutterValue'].get(as_string=True) status = self.epics_pvs['ShutterStatus'].get(as_string=True) log.info('shutter status: %s', status) log.info('open shutter: %s, value: %s', pv, value) self.epics_pvs['OpenShutter'].put(value, wait=True) self.wait_frontend_shutter_open() # self.wait_pv(self.epics_pvs['ShutterStatus'], 1) status = self.epics_pvs['ShutterStatus'].get(as_string=True) log.info('shutter status: %s', status)
[docs] def open_shutter(self): """Opens the shutters to collect flat fields or projections. This does the following: - Opens the 2-BM fast shutter. """ # Open 2-BM fast shutter if not self.epics_pvs['OpenFastShutter'] is None: pv = self.epics_pvs['OpenFastShutter'] value = self.epics_pvs['OpenFastShutterValue'].get(as_string=True) log.info('open fast shutter: %s, value: %s', pv, value) self.epics_pvs['OpenFastShutter'].put(value, wait=True) log.warning("Wait 2s - Temporarily while there is no fast shutter at 2bmb ") time.sleep(2)
[docs] def close_frontend_shutter(self): """Closes the shutters to collect dark fields. This does the following: - Closes the 2-BM front-end shutter. """ if self.epics_pvs['Testing'].get(): log.warning('In testing mode, so not opening shutters.') else: # Close 2-BM front-end shutter if not self.epics_pvs['CloseShutter'] is None: pv = self.epics_pvs['CloseShutter'] value = self.epics_pvs['CloseShutterValue'].get(as_string=True) status = self.epics_pvs['ShutterStatus'].get(as_string=True) log.info('shutter status: %s', status) log.info('close shutter: %s, value: %s', pv, value) self.epics_pvs['CloseShutter'].put(value, wait=True) self.wait_pv(self.epics_pvs['ShutterStatus'], 0) status = self.epics_pvs['ShutterStatus'].get(as_string=True) log.info('shutter status: %s', status)
[docs] def close_shutter(self): """Closes the shutters to collect dark fields. This does the following: - Closes the 2-BM fast shutter. """ # Close 2-BM fast shutter if not self.epics_pvs['CloseFastShutter'] is None: pv = self.epics_pvs['CloseFastShutter'] value = self.epics_pvs['CloseFastShutterValue'].get(as_string=True) log.info('close fast shutter: %s, value: %s', pv, value) self.epics_pvs['CloseFastShutter'].put(value, wait=True) log.warning("Wait 2s - Temporarily while there is no fast shutter at 2bmb ") time.sleep(2)
[docs] def set_trigger_mode(self, trigger_mode, num_images): """Sets the trigger mode SIS3820 and the camera. Parameters ---------- trigger_mode : str Choices are: "FreeRun", "Internal", or "PSOExternal" num_images : int Number of images to collect. Ignored if trigger_mode="FreeRun". This is used to set the ``NumImages`` PV of the camera. """ camera_model = self.epics_pvs['CamModel'].get(as_string=True) if(camera_model=='Oryx ORX-10G-51S5M' or camera_model=='Oryx ORX-10G-310S9M'): self.set_trigger_mode_oryx(trigger_mode, num_images) elif(camera_model=='Grasshopper3 GS3-U3-23S6M'): self.set_trigger_mode_grasshopper(trigger_mode, num_images) elif(camera_model=='Q-12A180-Fm/CXP-6'): self.set_trigger_mode_adimec(trigger_mode, num_images) else: log.error('Camera is not supported') exit(1)
[docs] def set_trigger_mode_oryx(self, trigger_mode, num_images): self.epics_pvs['CamAcquire'].put('Done') ### self.wait_pv(self.epics_pvs['CamAcquire'], 0) ### log.info('set trigger mode: %s', trigger_mode) if trigger_mode == 'FreeRun': self.epics_pvs['CamImageMode'].put('Continuous', wait=True) self.epics_pvs['CamTriggerMode'].put('Off', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 0) # self.epics_pvs['CamAcquire'].put('Acquire') elif trigger_mode == 'Internal': self.epics_pvs['CamTriggerMode'].put('Off', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 0) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamNumImages'].put(num_images, wait=True) else: # set camera to external triggering # These are just in case the scan aborted with the camera in another state self.epics_pvs['CamTriggerMode'].put('Off', wait=True) # VN: For FLIR we first switch to Off and then change overlap. any reason of that? self.epics_pvs['CamTriggerSource'].put('Line2', wait=True) self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True) self.epics_pvs['CamExposureMode'].put('Timed', wait=True) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamArrayCallbacks'].put('Enable') self.epics_pvs['CamFrameRateEnable'].put(0) self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True) self.epics_pvs['CamTriggerMode'].put('On', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def set_trigger_mode_grasshopper(self, trigger_mode, num_images): self.epics_pvs['CamAcquire'].put('Done') ### self.wait_pv(self.epics_pvs['CamAcquire'], 0) ### log.info('set trigger mode: %s', trigger_mode) if trigger_mode == 'FreeRun': self.epics_pvs['CamImageMode'].put('Continuous', wait=True) self.epics_pvs['CamTriggerMode'].put('Off', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 0) # self.epics_pvs['CamAcquire'].put('Acquire') elif trigger_mode == 'Internal': self.epics_pvs['CamTriggerMode'].put('Off', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 0) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamNumImages'].put(num_images, wait=True) else: # set camera to external triggering # These are just in case the scan aborted with the camera in another state self.epics_pvs['CamTriggerMode'].put('On', wait=True) # VN: For PG we need to switch to On to be able to switch to readout overlap mode self.epics_pvs['CamTriggerSource'].put('Line0', wait=True) self.epics_pvs['CamTriggerOverlap'].put('ReadOut', wait=True) self.epics_pvs['CamExposureMode'].put('Timed', wait=True) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamArrayCallbacks'].put('Enable') self.epics_pvs['CamFrameRateEnable'].put(0) self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True) self.epics_pvs['CamTriggerMode'].put('On', wait=True) self.wait_pv(self.epics_pvs['CamTriggerMode'], 1)
[docs] def set_trigger_mode_adimec(self, trigger_mode, num_images): self.epics_pvs['CamAcquire'].put('Done') ### self.wait_pv(self.epics_pvs['CamAcquire'], 0) ### log.info('set trigger mode: %s', trigger_mode) if trigger_mode == 'FreeRun': self.epics_pvs['CamImageMode'].put('Continuous', wait=True) self.epics_pvs['CamExposureMode'].put('Timed', wait=True) self.wait_pv(self.epics_pvs['CamExposureMode'], 0) elif trigger_mode == 'Internal': self.epics_pvs['CamExposureMode'].put('Timed', wait=True) self.wait_pv(self.epics_pvs['CamExposureMode'], 0) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamNumImages'].put(num_images, wait=True) else: # set camera to external triggering self.epics_pvs['CamExposureMode'].put('TimedTriggerCont', wait=True) self.wait_pv(self.epics_pvs['CamExposureMode'], 3) self.epics_pvs['CamImageMode'].put('Multiple') self.epics_pvs['CamNumImages'].put(self.num_angles, wait=True)
[docs] def begin_scan(self): """Performs the operations needed at the very start of a scan. This does the following: - Set data directory. - Set the TomoScan xml files - Calls the base class method. - Opens the front-end shutter. - Sets the PSO controller. - Creates theta array using list from PSO. - Turns on data capture. """ log.info('begin scan') # Set data directory file_path = self.epics_pvs['DetectorTopDir'].get(as_string=True) + self.epics_pvs['ExperimentYearMonth'].get(as_string=True) + os.path.sep + self.epics_pvs['UserLastName'].get(as_string=True) + os.path.sep self.epics_pvs['FilePath'].put(file_path, wait=True) # NetBooter = NetBooter_Control(mode='telnet',id=self.access_dic['pdu_username'],password=self.access_dic['pdu_password'],ip=self.access_dic['pdu_ip_address']) # NetBooter.power_off(1) # Call the base class method super().begin_scan() # Opens the front-end shutter self.open_frontend_shutter()
[docs] def end_scan(self): """Performs the operations needed at the very end of a scan. This does the following: - Calls ``save_configuration()``. - Put the camera back in "FreeRun" mode and acquiring so the user sees live images. - Sets the speed of the rotation stage back to the maximum value. - Calls ``move_sample_in()``. - Calls the base class method. - Closes shutter. - Add theta to the raw data file. - Copy raw data to data analysis computer. """ if self.return_rotation == 'Yes': # Reset rotation position by mod 360 , the actual return # to start position is handled by super().end_scan() # allow stage to stop log.info('wait until the stage is stopped') time.sleep(self.epics_pvs['RotationAccelTime'].get()*1.2) ang = self.epics_pvs['RotationRBV'].get() current_angle = np.sign(ang)*(np.abs(ang)%360) self.epics_pvs['RotationSet'].put('Set', wait=True) self.epics_pvs['Rotation'].put(current_angle, wait=True) self.epics_pvs['RotationSet'].put('Use', wait=True) # Close shutter self.close_shutter() # Stop the file plugin self.epics_pvs['FPCapture'].put('Done') self.wait_pv(self.epics_pvs['FPCaptureRBV'], 0) # Add theta in the hdf file self.add_theta() log.info('Adding a frame from the IP camera') with open(CREDENTIALS_FILE_NAME, 'r') as file: for line in file: username, password = line.strip().split('|') ret, frame = cv2.VideoCapture('http://' + username +':' + password + '@10.54.113.162/cgi-bin/mjpeg?stream=1').read() #station A # NetBooter = NetBooter_Control(mode='telnet',id=self.access_dic['pdu_username'],password=self.access_dic['pdu_password'],ip=self.access_dic['pdu_ip_address']) # NetBooter.power_on(1) # log.info('wait 10 sec while the web camera has focused') # time.sleep(10) # ret, frame = cv2.VideoCapture('http://remotecam02bma:Cam-02-bm-a@164.54.113.137/cgi-bin/mjpeg?stream=1').read()# we should hide the password #ret, frame = cv2.VideoCapture('http://' + self.access_dic['webcam_username'] +':' + self.access_dic['webcam_password'] + '@' + self.access_dic['webcam_ip_address'] + '/cgi-bin/mjpeg?stream=1').read() # NetBooter.power_off(1) if ret==True: full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True) with h5py.File(full_file_name,'r+') as fid: fid.create_dataset('exchange/web_camera_frame', data=frame) log.info('The frame was added') else: log.warning('The frame was not added') # Copy raw data to data analysis computer log.info('Automatic data trasfer to data analysis computer is enabled.') full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True) remote_analysis_dir = self.epics_pvs['RemoteAnalysisDir'].get(as_string=True) copy_to_analysis_dir = self.epics_pvs['CopyToAnalysisDir'].get() if copy_to_analysis_dir == 1: log.info('Using FDT') dm.fdt_scp(full_file_name, remote_analysis_dir, Path(self.epics_pvs['DetectorTopDir'].get())) self.epics_pvs['ScanStatus'].put('fdt file transfer complete') elif copy_to_analysis_dir == 2: log.info('Using scp') dm.scp(full_file_name, remote_analysis_dir) self.epics_pvs['ScanStatus'].put('scp file transfer complete') else: log.warning('Automatic data trasfer to data analysis computer is disabled.') # Call the base class method super().end_scan()
[docs] def set_scan_exposure_time(self, exposure_time=None): camera_model = self.epics_pvs['CamModel'].get(as_string=True) if(camera_model=='Q-12A180-Fm/CXP-6'): if exposure_time is None: exposure_time = self.epics_pvs['ExposureTime'].value self.epics_pvs['CamAcquisitionFrameRate'].put(1/exposure_time, wait=True, timeout=10.0) self.epics_pvs['CamAcquireTime'].put(exposure_time, wait=True, timeout = 10.0) else: super().set_scan_exposure_time(exposure_time)
[docs] def add_theta(self): """Add theta at the end of a scan. """ log.info('add theta') full_file_name = self.epics_pvs['FPFullFileName'].get(as_string=True) if os.path.exists(full_file_name): try: with h5py.File(full_file_name, "a") as f: if self.theta is not None: unique_ids = f['/defaults/NDArrayUniqueId'] hdf_location = f['/defaults/HDF5FrameLocation'] total_dark_fields = self.num_dark_fields * ((self.dark_field_mode in ('Start', 'Both')) + (self.dark_field_mode in ('End', 'Both'))) total_flat_fields = self.num_flat_fields * ((self.flat_field_mode in ('Start', 'Both')) + (self.flat_field_mode in ('End', 'Both'))) proj_ids = unique_ids[hdf_location[:] == b'/exchange/data'] flat_ids = unique_ids[hdf_location[:] == b'/exchange/data_white'] dark_ids = unique_ids[hdf_location[:] == b'/exchange/data_dark'] # create theta dataset in hdf5 file if len(proj_ids) > 0: theta_ds = f.create_dataset('/exchange/theta', (len(proj_ids),)) theta_ds[:] = self.theta[proj_ids - proj_ids[0]] # warnings that data is missing if len(proj_ids) != len(self.theta): log.warning(f'There are {len(self.theta) - len(proj_ids)} missing data frames') missed_ids = [ele for ele in range(len(self.theta)) if ele not in proj_ids-proj_ids[0]] missed_theta = self.theta[missed_ids] # log.warning(f'Missed ids: {list(missed_ids)}') log.warning(f'Missed theta: {list(missed_theta)}') if len(flat_ids) != total_flat_fields: log.warning(f'There are {total_flat_fields - len(flat_ids)} missing flat field frames') if (len(dark_ids) != total_dark_fields): log.warning(f'There are {total_dark_fields - len(dark_ids)} missing dark field frames') except: log.error('Add theta: Failed accessing: %s', full_file_name) traceback.print_exc(file=sys.stdout) else: log.error('Failed adding theta. %s file does not exist', full_file_name)
[docs] def wait_pv(self, epics_pv, wait_val, timeout=-1): """Wait on a pv to be a value until max_timeout (default forever) delay for pv to change """ time.sleep(.01) start_time = time.time() while True: pv_val = epics_pv.get() if isinstance(pv_val, float): if abs(pv_val - wait_val) < EPSILON: return True if pv_val != wait_val: if timeout > -1: current_time = time.time() diff_time = current_time - start_time if diff_time >= timeout: log.error(' *** ERROR: DROPPED IMAGES ***') log.error(' *** wait_pv(%s, %d, %5.2f reached max timeout. Return False', epics_pv.pvname, wait_val, timeout) return False time.sleep(.01) else: return True
[docs] def wait_frontend_shutter_open(self, timeout=-1): """Waits for the front end shutter to open, or for ``abort_scan()`` to be called. While waiting this method periodically tries to open the shutter.. Parameters ---------- timeout : float The maximum number of seconds to wait before raising a ShutterTimeoutError exception. Raises ------ ScanAbortError If ``abort_scan()`` is called ShutterTimeoutError If the open shutter has not completed within timeout value. """ start_time = time.time() pv = self.epics_pvs['OpenShutter'] value = self.epics_pvs['OpenShutterValue'].get(as_string = True) log.info('open shutter: %s, value: %s', pv, value) elapsed_time = 0 while True: if self.epics_pvs['ShutterStatus'].get() == int(value): log.warning("Shutter is open in %f s", elapsed_time) return if not self.scan_is_running: exit() value = self.epics_pvs['OpenShutterValue'].get() time.sleep(1.0) current_time = time.time() elapsed_time = current_time - start_time log.warning("Waiting on shutter to open: %f s", elapsed_time) self.epics_pvs['OpenShutter'].put(value, wait=True) if timeout > 0: if elapsed_time >= timeout: exit()
[docs] class NetBooter_Control: ''' Offer NetBooter Control class: Support serial/telnet/http control Support outlet status checker / power on / power off / reboot Power on/off return setting success or fail, but reboot no return How to use it: From Serial NetBooter = NetBooter_Control(mode='serial',serial_port='COM1') NetBooter.power_on(1) #Return (True,'') for set Outlet 1 ON success NetBooter.power_off(5) #Return (True,'') for set Outlet 5 OFF success NetBooter.reboot(3) #No return, use NetBooter internal reboot function, don't suggest to use it Outlet3_Status = NetBooter.check_outlet_status(3) #Return (True,'') for Outlet 3 is ON | (False,'') for OFF From HTTP NetBooter = NetBooter_Control(mode='http',ip='192.168.1.101') NetBooter.power_on(2) #Return (True,'') for set Outlet 2 ON success NetBooter.power_off(4) #Return (True,'') for set Outlet 4 OFF success Outlet3_Status = NetBooter.check_outlet_status(3) #Return (True,'') for Outlet 3 is ON | (False,'') for OFF ''' def __init__(self,mode='serial',serial_port='COM1',id='admin',password='admin',ip='0.0.0.0'): ''' Class init Input: mode(serial/telnet/http) id/password [for login NetBooter] For serial: serial_port([Windows]COM1/COM2/COM3/[Linux]/dev/tty...) For telnet/http: ip ''' if not isinstance(mode,str): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid mode '+str(mode)) if not isinstance(id,str): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid id '+str(id)) if not isinstance(password,str): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid password '+str(password)) self.mode = mode.lower() self.id = id self.password = password if self.mode == 'serial': self.NetBooter_serial = serial.Serial() self.NetBooter_serial.port = serial_port self.NetBooter_serial.baudrate = 9600 self.NetBooter_serial.timeout = 3 self.NetBooter_serial.bytesize = serial.EIGHTBITS self.NetBooter_serial.parity = serial.PARITY_NONE self.NetBooter_serial.stopbits = serial.STOPBITS_ONE self.NetBooter_serial.xonxoff = 0 try: self.NetBooter_serial.open() except Exception as e: raise Exception(str(e)) if not self.NetBooter_serial.isOpen(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Fail to open '+str(serial_port)) for outlet in xrange(1,6): self.power_on(outlet) elif self.mode == 'telnet': self.ip = ip self.NetBooter_telnet = telnetlib.Telnet(self.ip) elif self.mode == 'http': self.ip = ip self.auth = base64.b64encode(bytearray(('%s:%s' % (self.id, self.password)).replace('\n', ''), 'utf-8')) self.NetBooter_httpconnection = httplib.HTTPConnection(self.ip,timeout=10) self.__check_netbooter__() def __check_netbooter__(self): if self.mode == 'serial': try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\nsysshow\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() temp1 = self.NetBooter_serial.read(300) self.NetBooter_serial.write('\nsysshow\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() temp2 = self.NetBooter_serial.read(300) status = temp1+temp2 self.NetBooter_serial.flushOutput() except Exception as e: raise Exception(str(e)) if status.find('System Name') == -1: raise Exception('Invalid NetBooter') elif self.mode == 'telnet': pass elif self.mode == 'http': NetBooter_Pattern = re.compile(r'Synaccess.*?NetBooter',re.I) NetBooter_rly_Pattern = re.compile(r'<a onclick="ajxCmd\(\'(.*?rly.*?)\d\'\);">') NetBooter_rb_Pattern = re.compile(r'<a onclick="ajxCmd\(\'(.*?rb.*?)\d\'\);">') try: self.NetBooter_httpconnection.putrequest("POST",'') self.NetBooter_httpconnection.putheader("Authorization", "Basic %s" % self.auth) self.NetBooter_httpconnection.endheaders() response = self.NetBooter_httpconnection.getresponse() res = response.read() except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Init http connection to NetBooter fail: '+str(e)) if response.status != 200: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Init http connection to NetBooter fail: '+str(response.status)) if not NetBooter_Pattern.search(res): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] http connection is not NetBooter: '+str(res)) rly_pair = NetBooter_rly_Pattern.search(res) if rly_pair: self.rly_url = rly_pair.group(1) else: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Fail to find NetBooter rly url: '+str(res)) rb_pair = NetBooter_rb_Pattern.search(res) if rb_pair: self.rb_url = rb_pair.group(1) else: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Fail to find NetBooter rb url: '+str(res)) def __del__(self): if self.mode == 'serial': self.NetBooter_serial.close() elif self.mode == 'telnet': self.NetBooter_telnet.close() elif self.mode == 'http': self.NetBooter_httpconnection.close()
[docs] def check_outlet_status(self,outlet): ''' Check outlet status Input: outlet(1/2/3/4/5) Output: True,''(For ON)/False,''(For OFF)/Exception,Exception Reason ''' if outlet not in (1,2,3,4,5,'1','2','3','4','5'): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid NetBooter outlet: '+str(outlet)) outlet = int(outlet) if self.mode == 'serial': if not self.NetBooter_serial.readable() or not self.NetBooter_serial.writable(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] NetBooter Serial not Readable/Writeable') try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\nsysshow\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() temp1 = self.NetBooter_serial.read(300) self.NetBooter_serial.write('\nsysshow\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() temp2 = self.NetBooter_serial.read(300) status = temp1+temp2 self.NetBooter_serial.flushOutput() except Exception as e: raise Exception(str(e)) try: for line in status.split('\n'): if line.find('Outlet Status(1-On, 0-Off. Outlet 1 to 5):') > -1: #Clean Unrecognizable Code line = line[43:].replace('\x00','') #Outlet list should be ['','0/1','0/1','0/1','0/1','0/1',''] outlets = line.split(' ') if outlets[outlet] == '0': return False,'' elif outlets[outlet] == '1': return True,'' else: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid Status: '+str(outlets)) except Exception as e: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e) return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Not find outlet: '+str(status) elif self.mode == 'telnet': try: self.NetBooter_telnet.write('\r\nsysshow\r\n'.encode('ascii')) temp = self.NetBooter_telnet.read_until(b'Note - use WEB access for more settings',2) except Exception as e: raise Exception(str(e)) try: for line in status.split('\n'): if line.find('Outlet Status(1-On, 0-Off. Outlet 1 to 5):') > -1: #Clean Unrecognizable Code line = line[43:].replace('\x00','') #Outlet list should be ['','0/1','0/1','0/1','0/1','0/1',''] outlets = line.split(' ') if outlets[outlet] == '0': return False,'' elif outlets[outlet] == '1': return True,'' else: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid Status: '+str(outlets)) except Exception as e: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e) return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Not find outlet: '+str(status) elif self.mode == 'http': res = self.NetBooter_httppost(url="/status.xml") if res[0] != True: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] No proper response from NetBooter: '+res[1] swoutlet = outlet - 1 pattern = re.compile(r'<rly%s>(1|0)</rly%s>'%(swoutlet,swoutlet)) if not pattern.search(res[1]): return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Not find proper outlet status: '+res[1] status = pattern.search(res[1]).group()[6:7] if status == '0': return False,'' elif status == '1': return True,'' else: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid Status: '+str(status))
[docs] def login(self): ''' Login NetBooter for serial/telnet mode No output ''' if self.mode == 'serial': if not self.NetBooter_serial.writable(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] NetBooter Serial not Writeable') try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\n!\nlogin\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write(str(self.id)+'\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write(str(self.password)+'\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) elif self.mode == 'telnet': try: self.NetBooter_telnet.write('\r\nlogin\r\n'.encode('ascii')) self.NetBooter_telnet.write((str(self.id)+'\r\n').encode('ascii')) self.NetBooter_telnet.write((str(self.password)+'\r\n').encode('ascii')) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e))
[docs] def power_on(self,outlet): ''' Set specific outlet on Input: outlet(1/2/3/4/5) Output: True,''[Set success]/False,''[Set fail]/Exception,'' ''' if outlet not in (1,2,3,4,5,'1','2','3','4','5'): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid NetBooter outlet: '+str(outlet)) outlet = int(outlet) if self.mode == 'http': current_status = self.check_outlet_status(outlet) if current_status[0] == True: return True,'' elif current_status[0] == False: swoutlet = outlet - 1 url = "/%s%s"%(self.rly_url,swoutlet) res = self.NetBooter_httppost(url) if res[0] == True: if res[1] == 'Success! ': new_status = self.check_outlet_status(outlet) if new_status[0] == True: return True,'' elif new_status[0] == False: return False,'['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Power on outlet fail2: '+new_status[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+new_status[1] else: return False,'['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Power on outlet fail1: '+res[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+res[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+current_status[1] time.sleep(2) self.login() if self.mode == 'serial': if not self.NetBooter_serial.writable(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] NetBooter Serial not Writeable') try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\npset '+str(outlet)+' 1\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() time.sleep(1) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) elif self.mode == 'telnet': try: self.NetBooter_telnet.write(('\r\npset '+str(outlet)+' 1\r\n').encode('ascii')) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) res_on = self.check_outlet_status(outlet) if res_on[0] == True: return True,'' elif res_on[0] == False: return False,'' else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+res_on[1]
[docs] def power_off(self,outlet): ''' Set specific outlet off Input: outlet(1/2/3/4/5) Output: True,''[Set success]/False,''[Set fail]/Exception,'' ''' if outlet not in (1,2,3,4,5,'1','2','3','4','5'): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid NetBooter outlet: '+str(outlet)) outlet = int(outlet) if self.mode == 'http': current_status = self.check_outlet_status(outlet) if current_status[0] == False: return True,'' elif current_status[0] == True: swoutlet = outlet - 1 url = "/%s%s"%(self.rly_url,swoutlet) res = self.NetBooter_httppost(url) if res[0] == True: if res[1] == 'Success! ': new_status = self.check_outlet_status(outlet) if new_status[0] == False: return True,'' elif new_status[0] == True: return False,'['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Power off outlet fail2: '+new_status[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+new_status[1] else: return False,'['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Power off outlet fail1: '+res[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+res[1] else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+current_status[1] time.sleep(2) self.login() if self.mode == 'serial': if not self.NetBooter_serial.writable(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] NetBooter Serial not Writeable') try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\npset '+str(outlet)+' 0\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() time.sleep(1) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) elif self.mode == 'telnet': try: self.NetBooter_telnet.write(('\r\npset '+str(outlet)+' 0\r\n').encode('ascii')) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) res_off = self.check_outlet_status(outlet) if res_off[0] == False: return True,'' elif res_off[0] == True: return False,'' else: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+res_off[1]
[docs] def reboot(self,outlet): ''' Set specific outlet reboot by internal reboot function from NetBooter Input: outlet(1/2/3/4/5) No output ''' if outlet not in (1,2,3,4,5,'1','2','3','4','5'): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Invalid NetBooter outlet: '+str(outlet)) outlet = int(outlet) if self.mode == 'http': current_status = self.check_outlet_status(outlet) swoutlet = outlet - 1 url = "/%s%s"%(self.rb_url,swoutlet) res = self.NetBooter_httppost(url) time.sleep(3) if res[0] == True: if res[1] == 'Success! ': new_status = self.check_outlet_status(outlet) self.login() if self.mode == 'serial': if not self.NetBooter_serial.writable(): raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] NetBooter Serial not Writeable') try: self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() self.NetBooter_serial.write('\nrb '+str(outlet)+'\n') self.NetBooter_serial.flush() self.NetBooter_serial.flushInput() self.NetBooter_serial.flushOutput() #time.sleep(1) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e)) elif self.mode == 'telnet': try: self.NetBooter_telnet.write(('\r\nrb '+str(outlet)+'\r\n').encode('ascii')) except Exception as e: raise Exception('['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e))
[docs] def NetBooter_httppost(self,url): ''' Common NetBooter http post Input: url(/status.xml[for get stauts] or /cmd.cgi?rly=#1[for set power on/off]) ''' try: self.NetBooter_httpconnection.putrequest("POST", url) self.NetBooter_httpconnection.putheader("Authorization", "Basic %s" % self.auth) self.NetBooter_httpconnection.endheaders() response = self.NetBooter_httpconnection.getresponse() res = response.read() except Exception as e: return 'Exception','['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+']'+str(e) if response.status != 200: return False,'['+os.path.basename(__file__)+']['+sys._getframe().f_code.co_name+'] Unknown http connection status: '+str(response.status) return True,res