requires: requests
This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts.
It is provided as is and without any warranty or support!
More error handling and concern for security is needed before use in production environments!
import logging import requests import getpass import base64 import os.path # ignore warnings about missing ssl cert field subjectAltName (you may remove this and ln 102) from requests.packages.urllib3.exceptions import SubjectAltNameWarning opsicon_logger = logging.getLogger(__name__) class OpsiError(Exception): def __init__(self, expression, opsierrorclass, message): """ :param expression: the RPC Call :param opsierrorclass: the Error class :param message: the Error message """ self.expression = expression self.opsierrorclass = opsierrorclass self.message = message class OpsiConnection: def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None): # 0 is not a valid id """ :param url: Opsiserver URL :param authfile: base64 encoded UTF-8 String containing username password :param auth: username password as tuple :param certfile: Opsiserver SSL Certificate :param legal_methods_path: Textfile containing one method name per line """ self.id = 0 self.server = url self.certfile = certfile self.legal_methods = None if not (authfile or auth): # get auth from user via commandline input auth = (input("Username: "), getpass.getpass(prompt="Password: ")) elif not auth: # get auth from file # The authfile has to be kept in an secure environment! with open(authfile, 'rb') as f: auth = tuple(base64.b64decode(f.read()).decode("utf-8").split()) # create session self.session = self.__get_session(auth) if legal_methods_path: opsicon_logger.debug("Get Methods from File...") if os.path.isfile(legal_methods_path): with open(legal_methods_path, 'r') as f: self.legal_methods = f.read().splitlines() else: raise FileNotFoundError(legal_methods_path) else: # getPossibleMethods_listOfHashes lacks the modern _getObjects methods! opsicon_logger.debug("Get Methods from Server...") self.id += 1 response = self.__rpc_request(self.session, {"method": "getPossibleMethods_listOfHashes", "params": [], "id": self.id}) rjson = response.json() if rjson['result']: self.legal_methods = [] for method in rjson['result']: self.legal_methods.append(method['name']) opsicon_logger.debug("Got Methods.") def raw_request(self, payload): """ :param payload: json string to send to the server """ self.__rpc_request(self.session, payload) def __getattr__(self, name): if name in self.legal_methods: def _rpc_call(*args, **kwargs): self.id += 1 payload = {"method": name, "params": [list(args)[1:], kwargs] if len(kwargs) > 0 else list(args)[1:], "id": self.id} opsicon_logger.debug("Interpreting as rpc call: \n%s}" % payload) return self.__rpc_request(self.session, payload) return lambda *args, **kwargs: _rpc_call(self, *args, **kwargs) else: raise AttributeError def __get_session(self, auth): session = requests.Session() # avoid proxy issues session.trust_env = False # ignore warnings about missing ssl cert field subjectAltName requests.packages.urllib3.disable_warnings(SubjectAltNameWarning) # supply cert file if self.certfile: session.verify = self.certfile session.auth = auth opsicon_logger.debug("Created new session: %s}" % session) return session def __rpc_request(self, session, payload): try: r = session.post(self.server + '/rpc', json=payload) r.raise_for_status() except requests.exceptions.Timeout as e: raise e except requests.exceptions.RequestException as e: raise e if r.json()["error"]: raise OpsiError(payload, r.json()['error']['class'], r.json()['error']['message']) return r.json()['result']
import json from opsirpc import OpsiConnection as OpsiServerCon def print_name_example(response): for o in response: print(o['name']) server_connection = OpsiServerCon(url = 'https://my.opsiserver.dom:4447', authfile = "myPasswordBase64.txt", certfile = "myCert.crt" legal_methods_path='rpc_methods.txt') r = server_connection.product_getObjects(licenseRequired=True) print_name_example(r) print(server_connection.product_getObjects('description', id='flashplayer')[0]['description']) print('\n'.join([json.dumps(d, indent=4, sort_keys=True) for d in server_connection.getPossibleMethods_listOfHashes()]))