This is an old revision of the document!
requires: requests
This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts.
It is provided as is and without any warranty or support!
More error handling and concern for security is needed before use in production environments!
import logging import requests import getpass import base64 import os.path # ignore warnings about missing ssl cert field from requests.packages.urllib3.exceptions import SubjectAltNameWarning opsicon_logger = logging.getLogger(__name__) class OpsiError(Exception): """Exception raised for remote RPC errors Attributes: expression -- the RPC Call class -- the Error class message -- the Error message """ def __init__(self, expression, opsierrorclass, message): self.expression = expression self.opsierrorclass = opsierrorclass self.message = message class OpsiConnection: def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None): # 0 is not a valid id self.id = 0 self.server = url self.certfile = certfile self.legal_methods = None if not (authfile or auth): # get auth from user via commandline input auth = (input("Username: "), getpass.getpass(prompt="Password: ")) elif not auth: # get auth from file # authfile expected to contain base64 encoded UTF-8 String (username password) # The authfile has to be kept in an secure environment! with open(authfile, 'rb') as f: auth = tuple(base64.b64decode(f.read()).decode("utf-8").split()) # create session self.session = self.__get_session(auth) if legal_methods_path: opsicon_logger.debug("Get Methods from File...") if os.path.isfile(legal_methods_path): with open(legal_methods_path, 'r') as f: self.legal_methods = f.read().splitlines() else: raise FileNotFoundError(legal_methods_path) else: # getPossibleMethods_listOfHashes does not always deliver a comprehensive list of methods! opsicon_logger.debug("Get Methods from Server...") self.id += 1 response = self.__rpc_request(self.session, {"method": "getPossibleMethods_listOfHashes", "params": [], "id": self.id}) rjson = response.json() if rjson['result']: self.legal_methods = [] for method in rjson['result']: self.legal_methods.append(method['name']) opsicon_logger.debug("Got Methods.") def raw_request(self, payload): self.__rpc_request(self.session, payload) def __getattr__(self, name): if name in self.legal_methods: def _rpc_call(*args, **kwargs): self.id += 1 payload = {"method": name, "params": [list(args)[1:], kwargs] if len(kwargs) > 0 else list(args)[1:], "id": self.id} opsicon_logger.debug("Interpreting as rpc call: \n%s}" % payload) return self.__rpc_request(self.session, payload) return lambda *args, **kwargs: _rpc_call(self, *args, **kwargs) else: raise AttributeError def __get_session(self, auth): session = requests.Session() # avoid proxy issues session.trust_env = False # ignore warnings about missing ssl cert field requests.packages.urllib3.disable_warnings(SubjectAltNameWarning) # supply cert file if self.certfile: session.verify = self.certfile session.auth = auth opsicon_logger.debug("Created new session: %s}" % session) return session def __rpc_request(self, session, payload): try: r = session.post(self.server + '/rpc', json=payload) r.raise_for_status() except requests.exceptions.Timeout as e: # TODO: Maybe set up retry loop raise e except requests.exceptions.RequestException as e: # TODO: Maybe think about better handling raise e if r.json()["error"]: raise OpsiError(payload, r.json()['error']['class'], r.json()['error']['message']) return r
from opsirpc import OpsiConnection as OpsiServerCon def print_name_example(response): for o in response.json()['result']: print(o['name']) server_connection = OpsiServerCon(url = 'https://my.opsiserver.dom:4447', authfile = "myPasswordBase64.txt", certfile = "myCert.crt" legal_methods_path='rpc_methods.txt') r = server_connection.product_getObjects(licenseRequired=True) print_name_example(r) print(server_connection.product_getObjects('description', id='flashplayer').json()['result'][0]['description'])