User Tools

Site Tools


userspace:json-rpc_python_requests

This is an old revision of the document!


Module for json-rpc requests in Python

requires: requests

This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts.

It is provided as is and without any warranty or support!

More error handling and concern for security is needed before use in production environments!

The Module (opsirpc.py)

import logging
import requests
import getpass
import base64
import os.path
# ignore warnings about missing ssl cert field
from requests.packages.urllib3.exceptions import SubjectAltNameWarning
 
opsicon_logger = logging.getLogger(__name__)
 
 
class OpsiError(Exception):
    """Exception raised for remote RPC errors
 
    Attributes:
        expression -- the RPC Call
        class -- the Error class
        message -- the Error message
    """
 
    def __init__(self, expression, opsierrorclass, message):
        self.expression = expression
        self.opsierrorclass = opsierrorclass
        self.message = message
 
 
class OpsiConnection:
    def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None):
        # 0 is not a valid id
        self.id = 0
        self.server = url
        self.certfile = certfile
        self.legal_methods = None
        if not (authfile or auth):
            # get auth from user via commandline input
            auth = (input("Username: "), getpass.getpass(prompt="Password: "))
        elif not auth:
            # get auth from file
            # authfile expected to contain base64 encoded UTF-8 String (username password)
            # The authfile has to be kept in an secure environment!
            with open(authfile, 'rb') as f:
                auth = tuple(base64.b64decode(f.read()).decode("utf-8").split())
        # create session
        self.session = self.__get_session(auth)
 
        if legal_methods_path:
            opsicon_logger.debug("Get Methods from File...")
            if os.path.isfile(legal_methods_path):
                with open(legal_methods_path, 'r') as f:
                    self.legal_methods = f.read().splitlines()
            else:
                raise FileNotFoundError(legal_methods_path)
        else:
            # getPossibleMethods_listOfHashes does not always deliver a comprehensive list of methods!
            opsicon_logger.debug("Get Methods from Server...")
            self.id += 1
            response = self.__rpc_request(self.session,
                                          {"method": "getPossibleMethods_listOfHashes",
                                           "params": [],
                                           "id": self.id})
            rjson = response.json()
            if rjson['result']:
                self.legal_methods = []
            for method in rjson['result']:
                self.legal_methods.append(method['name'])
            opsicon_logger.debug("Got Methods.")
 
    def raw_request(self, payload):
        self.__rpc_request(self.session, payload)
 
    def __getattr__(self, name):
        if name in self.legal_methods:
            def _rpc_call(*args, **kwargs):
                self.id += 1
                payload = {"method": name,
                           "params": [list(args)[1:], kwargs] if len(kwargs) > 0 else list(args)[1:],
                           "id": self.id}
                opsicon_logger.debug("Interpreting as rpc call: \n%s}" % payload)
                return self.__rpc_request(self.session, payload)
 
            return lambda *args, **kwargs: _rpc_call(self, *args, **kwargs)
        else:
            raise AttributeError
 
    def __get_session(self, auth):
        session = requests.Session()
        # avoid proxy issues
        session.trust_env = False
        # ignore warnings about missing ssl cert field 
        requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
        # supply cert file
        if self.certfile:
            session.verify = self.certfile
        session.auth = auth
        opsicon_logger.debug("Created new session: %s}" % session)
        return session
 
    def __rpc_request(self, session, payload):
        try:
            r = session.post(self.server + '/rpc', json=payload)
            r.raise_for_status()
        except requests.exceptions.Timeout as e:
            # TODO: Maybe set up retry loop
            raise e
        except requests.exceptions.RequestException as e:
            # TODO: Maybe think about better handling
            raise e
        if r.json()["error"]:
            raise OpsiError(payload, r.json()['error']['class'], r.json()['error']['message'])
        return r

Example

from opsirpc import OpsiConnection as OpsiServerCon
 
def print_name_example(response):
    for o in response.json()['result']:
        print(o['name'])
 
 
server_connection = OpsiServerCon(url = 'https://my.opsiserver.dom:4447',
                                  authfile = "myPasswordBase64.txt",
                                  certfile = "myCert.crt"
                                  legal_methods_path='rpc_methods.txt')
 
r = server_connection.product_getObjects(licenseRequired=True)
print_name_example(r)
 
print(server_connection.product_getObjects('description', id='flashplayer').json()['result'][0]['description'])

rpc_methods.txt

userspace/json-rpc_python_requests.1522136770.txt.gz · Last modified: 2021/08/23 08:37 (external edit)