"""Module for JSON-RPC requests in Python.

Requires: requests

This module aims to provide a seamless integration of opsi RPC methods in
Python scripts.

**It is provided as is and without any warranty or support!**
**More error handling and concern for security is needed before use in
production environments!**

The module (opsirpc.py)
"""
import base64
import getpass
import logging
import os.path

import requests

# Ignore warnings about the missing SSL cert field subjectAltName. You may
# remove this import together with the matching disable_warnings() call in
# OpsiConnection.__get_session.
# NOTE(review): newer urllib3 releases may no longer ship this warning class,
# hence the guarded import - confirm against the installed version.
try:
    from requests.packages.urllib3.exceptions import SubjectAltNameWarning
except ImportError:
    SubjectAltNameWarning = None

opsicon_logger = logging.getLogger(__name__)


class OpsiError(Exception):
    """Raised when the server's JSON-RPC response carries an ``error`` entry."""

    def __init__(self, expression, opsierrorclass, message):
        """
        :param expression: the RPC call (the request payload that was sent)
        :param opsierrorclass: the error class
        :param message: the error message
        """
        # Same positional args as before, so ``args``/``str()`` stay unchanged.
        super().__init__(expression, opsierrorclass, message)
        self.expression = expression
        self.opsierrorclass = opsierrorclass
        self.message = message


class OpsiConnection:
    """JSON-RPC client bound to one opsi server URL and one HTTP session.

    Every attribute name listed in ``legal_methods`` is exposed as a callable
    that posts the corresponding RPC request to ``<url>/rpc`` and returns the
    ``result`` member of the response.
    """

    def __init__(self, url, authfile=None, auth=None, certfile=None,
                 legal_methods_path=None):
        """
        :param url: opsi server URL
        :param authfile: path to a file holding the base64 encoded UTF-8
            string "<username> <password>" (separated by whitespace)
        :param auth: (username, password) tuple; takes precedence over authfile
        :param certfile: opsi server SSL certificate used for verification
        :param legal_methods_path: text file containing one method name per
            line; when omitted the method list is requested from the server
        :raises FileNotFoundError: if legal_methods_path is not a file
        :raises ValueError: if authfile does not decode to username + password
        :raises OpsiError: if the server answers the method query with an error
        """
        # 0 is not a valid id; the counter is incremented before each request.
        self.id = 0
        self.server = url
        self.certfile = certfile
        self.legal_methods = None

        if not (authfile or auth):
            # Get auth from the user via command line input.
            auth = (input("Username: "), getpass.getpass(prompt="Password: "))
        elif not auth:
            # Get auth from file.
            # The authfile has to be kept in a secure environment!
            with open(authfile, 'rb') as f:
                decoded = base64.b64decode(f.read()).decode("utf-8")
            # Split only once so that passwords containing whitespace survive;
            # strip first because a limited split keeps trailing newlines.
            credentials = decoded.strip().split(None, 1)
            if len(credentials) != 2:
                raise ValueError(
                    "authfile must decode to '<username> <password>'")
            auth = tuple(credentials)

        # Create session.
        self.session = self.__get_session(auth)

        if legal_methods_path:
            opsicon_logger.debug("Get Methods from File...")
            if not os.path.isfile(legal_methods_path):
                raise FileNotFoundError(legal_methods_path)
            with open(legal_methods_path, 'r') as f:
                # Skip blank lines so '' never counts as a method name.
                self.legal_methods = [
                    line.strip() for line in f if line.strip()
                ]
        else:
            # getPossibleMethods_listOfHashes lacks the modern _getObjects
            # methods!
            opsicon_logger.debug("Get Methods from Server...")
            self.id += 1
            # __rpc_request already returns the decoded "result" member, so
            # it must not be treated as a response object here.
            methods = self.__rpc_request(
                self.session,
                {"method": "getPossibleMethods_listOfHashes",
                 "params": [],
                 "id": self.id})
            self.legal_methods = [method['name'] for method in methods or []]
        opsicon_logger.debug("Got Methods.")

    def raw_request(self, payload):
        """Send a hand-built request and return its result.

        :param payload: JSON-serialisable request object; it is passed to
            ``requests`` via ``json=``, which performs the encoding
        :return: the ``result`` member of the server response
        :raises OpsiError: if the response carries an error
        """
        return self.__rpc_request(self.session, payload)

    def __getattr__(self, name):
        """Turn a legal RPC method name into a callable issuing that request.

        Positional arguments become the ``params`` list. If keyword arguments
        are given, ``params`` is ``[list_of_positionals, dict_of_keywords]``.

        :raises AttributeError: if name is not listed in ``legal_methods``
        """
        # Read via __dict__: if legal_methods is not set yet (e.g. the object
        # is being copied/unpickled) a plain attribute access would re-enter
        # __getattr__ and recurse forever.
        legal_methods = self.__dict__.get("legal_methods") or ()
        if name not in legal_methods:
            raise AttributeError(
                "%r object has no attribute %r" % (type(self).__name__, name))

        def _rpc_call(*args, **kwargs):
            self.id += 1
            params = [list(args), kwargs] if kwargs else list(args)
            payload = {"method": name, "params": params, "id": self.id}
            opsicon_logger.debug("Interpreting as rpc call: \n%s", payload)
            return self.__rpc_request(self.session, payload)

        return _rpc_call

    def __get_session(self, auth):
        """Create the ``requests`` session used for all RPC calls.

        :param auth: (username, password) tuple stored as session auth
        :return: the configured ``requests.Session``
        """
        session = requests.Session()
        # Avoid proxy issues.
        session.trust_env = False
        # Ignore warnings about the missing SSL cert field subjectAltName.
        if SubjectAltNameWarning is not None:
            requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
        # Supply cert file.
        if self.certfile:
            session.verify = self.certfile
        session.auth = auth
        opsicon_logger.debug("Created new session: %s", session)
        return session

    def __rpc_request(self, session, payload):
        """POST payload to ``<server>/rpc`` and return the decoded result.

        :param session: session to send the request with
        :param payload: JSON-serialisable request object
        :return: the ``result`` member of the response body
        :raises requests.exceptions.RequestException: on transport errors or
            a non-success HTTP status (from ``raise_for_status``)
        :raises OpsiError: if the response body has a truthy ``error`` member
        """
        response = session.post(self.server + '/rpc', json=payload)
        response.raise_for_status()
        body = response.json()  # decode once instead of once per access
        error = body.get("error")
        if error:
            if isinstance(error, dict):
                raise OpsiError(payload, error.get('class'),
                                error.get('message'))
            # Error given as a plain value instead of an object.
            raise OpsiError(payload, None, str(error))
        return body.get('result')


# ===== Example =====
#
#     import json
#     from opsirpc import OpsiConnection as OpsiServerCon
#
#     def print_name_example(response):
#         for o in response:
#             print(o['name'])
#
#     server_connection = OpsiServerCon(url='https://my.opsiserver.dom:4447',
#                                       authfile="myPasswordBase64.txt",
#                                       certfile="myCert.crt",
#                                       legal_methods_path='rpc_methods.txt')
#     r = server_connection.product_getObjects(licenseRequired=True)
#     print_name_example(r)
#     print(server_connection.product_getObjects(
#         'description', id='flashplayer')[0]['description'])
#     print('\n'.join([json.dumps(d, indent=4, sort_keys=True)
#                      for d in
#                      server_connection.getPossibleMethods_listOfHashes()]))
#
# rpc_methods.txt: https://pastebin.com/raw/2AMSkiTA