#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Tutorial INDI client driving the telescope, CCD and focuser simulators.

In a terminal:
    indiserver -v indi_simulator_ccd indi_simulator_telescope indi_simulator_focus
In another one:
    python my_tuto_indi_client.py

Note that all INDI constants are accessible from the module as
PyIndi.CONSTANTNAME.
"""

import io
import logging
import sys
import threading
import time

import PyIndi

# Device names as used by this tutorial.
TELESCOPE = "Telescope Simulator"
CCD = "CCD Simulator"
# NOTE(review): the stock INDI focuser simulator usually announces itself as
# "Focuser Simulator"; confirm the capitalisation against your indiserver.
FOCUSER = "FOCUSER Simulator"

# Set by IndiClient.updateProperty whenever a BLOB property is updated; the
# exposure loop waits on it to know that a frame has arrived. It lives at
# module level so the client class also works when this file is imported.
blob_event = threading.Event()


# To manipulate the selected device, we must first access it as the device can
# appear dynamically. The properties of the device are also dynamic, that is,
# they can come and go.
# All communication in INDI is done by updating properties.
class IndiClient(PyIndi.BaseClient):
    """INDI client that logs server notifications and signals new BLOBs."""

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger('PyQtIndi.IndiClient')
        self.logger.info('Creating an instance of PyQtIndi.IndiClient')

    def newDevice(self, d):
        """Emitted when a new device is created from INDI server."""
        self.logger.info("New device %s", d.getDeviceName())

    def removeDevice(self, d):
        """Emitted when a device is deleted from INDI server."""
        self.logger.info("remove device %s", d.getDeviceName())

    def newProperty(self, p):
        """Emitted when a new property is created for an INDI driver."""
        self.logger.info("New property %s for device %s",
                         p.getName(), p.getDeviceName())

    def updateProperty(self, p):
        """Emitted when a new property value arrives from INDI server.

        BLOB updates additionally set the module-level ``blob_event``.
        """
        self.logger.info("Update property %s for device %s",
                         p.getName(), p.getDeviceName())
        if p.getType() == PyIndi.INDI_BLOB:
            print("New BLOB ", p.getName())
            blob_event.set()

    def removeProperty(self, p):
        """Emitted when a property is deleted for an INDI driver."""
        self.logger.info("Remove property %s for device %s",
                         p.getName(), p.getDeviceName())

    def newBLOB(self, bp):
        """Log a new BLOB vector (per-type callback)."""
        self.logger.info("New BLOB %s", bp.name)

    def newSwitch(self, svp):
        """Log a new switch vector (per-type callback)."""
        self.logger.info("New Switch %s for device %s", svp.name, svp.device)

    def newNumber(self, nvp):
        """Log a new number vector (per-type callback)."""
        self.logger.info("New Number %s for device %s", nvp.name, nvp.device)

    def newText(self, tvp):
        """Log a new text vector (per-type callback)."""
        self.logger.info("New Text %s for device %s", tvp.name, tvp.device)

    def newLight(self, lvp):
        """Log a new light vector (per-type callback)."""
        self.logger.info("New Light %s for device %s", lvp.name, lvp.device)

    def newMessage(self, d, m):
        """Emitted when a new message arrives from INDI server."""
        self.logger.info("New Message %s", d.messageQueue(m))

    def serverConnected(self):
        """Emitted when the server is connected."""
        print("Server connected (" + self.getHost() + ":"
              + str(self.getPort()) + ")")

    def serverDisconnected(self, code):
        """Emitted when the server gets disconnected."""
        self.logger.info("Server disconnected (exit code = %s,%s:%s)",
                         code, self.getHost(), self.getPort())


def _wait_for(fetch, description, timeout=30.0, poll_interval=0.5):
    """Poll ``fetch()`` until it returns a truthy object, then return it.

    The test is truthiness (``not obj``), as in pyindi-client's own examples;
    comparing with ``is False`` never matches a wrapper object, so the wait
    would be skipped entirely.

    Raises:
        TimeoutError: if nothing truthy is returned within ``timeout``
            seconds (``None`` waits forever).
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    found = fetch()
    while not found:
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"Timed out waiting for {description}")
        time.sleep(poll_interval)
        found = fetch()
    return found


def _ensure_connected(client, device, connection):
    """Turn on the first switch of ``connection`` unless already connected."""
    if not device.isConnected():
        # Property vectors are mapped to iterable Python objects.
        # Hence we can access each element of the vector using Python
        # indexing; each element of the "CONNECTION" vector is an ISwitch.
        connection.reset()
        # The "CONNECT" switch
        connection[0].setState(PyIndi.ISS_ON)
        # Send this new value to the device
        client.sendNewProperty(connection)


def _print_widgets(generic_property):
    """Print one line per widget of ``generic_property``, by property type."""
    kind = generic_property.getType()
    if kind == PyIndi.INDI_TEXT:
        for widget in PyIndi.PropertyText(generic_property):
            print(f"\t{widget.getName()}({widget.getLabel()}) "
                  + f"= {widget.getText()}")
    if kind == PyIndi.INDI_NUMBER:
        for widget in PyIndi.PropertyNumber(generic_property):
            print(f"\t{widget.getName()}({widget.getLabel()}) "
                  + f"= {widget.getValue()}")
    if kind == PyIndi.INDI_SWITCH:
        for widget in PyIndi.PropertySwitch(generic_property):
            print(f"\t{widget.getName()}({widget.getLabel()}) "
                  + f"= {widget.getStateAsString()}")
    if kind == PyIndi.INDI_LIGHT:
        for widget in PyIndi.PropertyLight(generic_property):
            # Name first, then label, like every other property type.
            print(f"\t{widget.getName()}({widget.getLabel()}) "
                  + f"= {widget.getStateAsString()}")
    if kind == PyIndi.INDI_BLOB:
        for widget in PyIndi.PropertyBlob(generic_property):
            # No value is printed for BLOB widgets.
            print(f"\t{widget.getName()}({widget.getLabel()}) "
                  + f"= ")


def _print_properties(devices):
    """Print all properties of ``devices`` and their associated values."""
    print("List of Device Properties")
    for device in devices:
        print(f"-- {device.getDeviceName()}")
        for generic_property in device.getProperties():
            print(f" > {generic_property.getName()} "
                  + f"{generic_property.getTypeAsString()}")
            _print_widgets(generic_property)


def _goto_vega(client):
    """Connect the telescope simulator and slew it to Vega with tracking."""
    # Get the telescope device
    device_telescope = _wait_for(lambda: client.getDevice(TELESCOPE),
                                 f"device {TELESCOPE!r}")
    print("device_telescope OK")

    # Wait for the CONNECTION property to be defined for the telescope
    telescope_connect = _wait_for(
        lambda: device_telescope.getSwitch("CONNECTION"),
        "telescope CONNECTION property")
    print("telescope_connect OK")

    # If the telescope device is not connected, we do connect it
    _ensure_connected(client, device_telescope, telescope_connect)
    print("Telescope connected OK")

    # Now let's make a goto to Vega
    # Beware that ra/dec are in decimal hours/degrees
    vega = {"ra": (279.23473479 * 24.0) / 360.0, "dec": +38.78368896}

    # We want to set the ON_COORD_SET switch to engage tracking after goto
    # device.getSwitch is a helper to retrieve a property vector
    telescope_on_coord_set = _wait_for(
        lambda: device_telescope.getSwitch("ON_COORD_SET"),
        "telescope ON_COORD_SET property")
    # The order below is defined in the property vector, look at the
    # standard Properties page or enumerate them in the Python shell
    # when you're developing your program
    telescope_on_coord_set.reset()
    # Index 0-TRACK, 1-SLEW, 2-SYNC
    telescope_on_coord_set[0].setState(PyIndi.ISS_ON)
    client.sendNewProperty(telescope_on_coord_set)

    # We set the desired coordinates
    telescope_radec = _wait_for(
        lambda: device_telescope.getNumber("EQUATORIAL_EOD_COORD"),
        "telescope EQUATORIAL_EOD_COORD property")
    telescope_radec[0].setValue(vega["ra"])
    telescope_radec[1].setValue(vega["dec"])
    client.sendNewProperty(telescope_radec)

    # And wait until the scope has finished moving
    while telescope_radec.getState() == PyIndi.IPS_BUSY:
        print("Scope Moving ", telescope_radec[0].value,
              telescope_radec[1].value)
        time.sleep(2)


def _take_exposures(client, exposures):
    """Connect the CCD simulator and save one ``frame_<i>.fit`` per exposure.

    Args:
        client: connected ``IndiClient``.
        exposures: sequence of exposure times; nothing is exposed when empty.
    """
    device_ccd = _wait_for(lambda: client.getDevice(CCD), f"device {CCD!r}")
    print("device_ccd OK")

    ccd_connect = _wait_for(lambda: device_ccd.getSwitch("CONNECTION"),
                            "CCD CONNECTION property")
    print("ccd_connect OK")

    _ensure_connected(client, device_ccd, ccd_connect)
    print("CCD connected OK")

    ccd_exposure = _wait_for(lambda: device_ccd.getNumber("CCD_EXPOSURE"),
                             "CCD_EXPOSURE property")
    print("ccd_exposure OK")

    # Ensure the CCD simulator snoops the telescope simulator
    # otherwise you may not have a picture of Vega
    ccd_active_devices = _wait_for(
        lambda: device_ccd.getText("ACTIVE_DEVICES"),
        "CCD ACTIVE_DEVICES property")
    ccd_active_devices[0].setText("Telescope Simulator")
    client.sendNewProperty(ccd_active_devices)
    print("ccd_active_devices OK")

    # We should inform the indi server that we want to receive the
    # "CCD1" blob from this device
    client.setBLOBMode(PyIndi.B_ALSO, CCD, "CCD1")

    # CCD1 - property of type PropertyBlob, which stores the captured photo
    # from the camera
    ccd_ccd1 = _wait_for(lambda: device_ccd.getBLOB("CCD1"),
                         "CCD1 BLOB property")
    print("ccd_ccd1 OK")

    if not exposures:
        return

    # We use here the threading.Event facility of Python: blob_event is set
    # by IndiClient.updateProperty when a BLOB arrives.
    blob_event.clear()
    ccd_exposure[0].setValue(exposures[0])
    client.sendNewProperty(ccd_exposure)

    last_index = len(exposures) - 1
    for index, _ in enumerate(exposures):
        # Wait for the current exposure
        blob_event.wait()
        # We can start immediately the next one; clear the event before
        # sending so the next BLOB notification is not lost.
        if index < last_index:
            ccd_exposure[0].setValue(exposures[index + 1])
            blob_event.clear()
            client.sendNewProperty(ccd_exposure)
        # And meanwhile process the received one
        for blob in ccd_ccd1:
            print("Name: ", blob.getName(), " size: ", blob.getSize(),
                  " format: ", blob.getFormat())
            # pyindi-client adds a getblobdata() method to IBLOB item
            # for accessing the contents of the blob, which is a bytearray
            # in Python.
            fits = blob.getblobdata()
            print("fits data type: ", type(fits))
            # Here you may use astropy.io.fits to access the fits data
            # and perform some computations while the ccd is exposing
            # but this is outside the scope of this tutorial.
            # Write image data to BytesIO buffer
            blobfile = io.BytesIO(fits)
            # Open a file and save buffer to disk
            with open(f"frame_{index}.fit", "wb") as f:
                f.write(blobfile.getvalue())


def _show_focuser(client):
    """Connect the focuser simulator, set FOCUS_MAX and print its settings."""
    device_focus = _wait_for(lambda: client.getDevice(FOCUSER),
                             f"device {FOCUSER!r}")
    print("device_focus OK")

    focus_connect = _wait_for(lambda: device_focus.getSwitch("CONNECTION"),
                              "focuser CONNECTION property")
    print("focus_connect OK")

    _ensure_connected(client, device_focus, focus_connect)
    print("Focus connected OK")

    focus_max_pos = _wait_for(lambda: device_focus.getNumber("FOCUS_MAX"),
                              "FOCUS_MAX property")
    print(f"focus_max_pos = {focus_max_pos[0].value}")

    focus_max_pos[0].setValue(8333)
    client.sendNewProperty(focus_max_pos)

    # Fetch the property again to print the value after the update request.
    focus_max_pos = _wait_for(lambda: device_focus.getNumber("FOCUS_MAX"),
                              "FOCUS_MAX property")
    print(f"focus_max_pos = {focus_max_pos[0].value}")

    focus_speed = _wait_for(lambda: device_focus.getNumber("FOCUS_SPEED"),
                            "FOCUS_SPEED property")
    print(f"focus_speed = {focus_speed[0].value}")


def main(print_properties=False, use_telescope=False):
    """Run the tutorial against an indiserver on localhost:7624.

    Args:
        print_properties: also dump every property of every device.
        use_telescope: connect the telescope simulator and slew to Vega
            before taking pictures.
    """
    logging.basicConfig(format='%(asctime)s %(message)s', level=logging.ERROR)

    # Instantiate the client
    indi_client = IndiClient()

    # Set indi server localhost and port 7624
    indi_client.setServer("localhost", 7624)

    # Connect to indi server
    print("Connecting to indiserver")
    if not indi_client.connectServer():
        print("No indiserver running on " + indi_client.getHost() + ":"
              + str(indi_client.getPort()) + " - Try to run", file=sys.stderr)
        print("indiserver -v indi_simulator_telescope indi_simulator_ccd "
              + "indi_simulator_focus", file=sys.stderr)
        sys.exit(1)

    try:
        # Waiting for discover devices (the client works asynchronously in
        # the background).
        time.sleep(1)

        # Print list of devices. The list is obtained from the wrapper
        # function getDevices as indi_client is an instance of
        # PyIndi.BaseClient and the original C++ array is mapped to a Python
        # List. Each device in this list is an instance of PyIndi.BaseDevice,
        # so we use getDeviceName to print its actual name.
        print("List of devices")
        device_list = indi_client.getDevices()
        for device in device_list:
            print(f" > {device.getDeviceName()}")

        if print_properties:
            _print_properties(device_list)

        if use_telescope:
            _goto_vega(indi_client)

        # Let's take some pictures (a list of our exposure times)
        _take_exposures(indi_client, [1.0, 5.0])

        # Let's focus
        _show_focuser(indi_client)
    finally:
        # Disconnect from the indiserver, even if a step above failed
        print("Disconnecting")
        indi_client.disconnectServer()


if __name__ == "__main__":
    main()