User Tools

Site Tools


userspace:json-rpc_python_requests

Module for json-rpc requests in Python

requires: requests

This module aims to provide seamless integration of opsi RPC methods into Python scripts.

It is provided as is and without any warranty or support!

More error handling and more attention to security are needed before it is used in production environments!

The Module (opsirpc.py)

import logging
import requests
import getpass
import base64
import os.path
 
# ignore warnings about the missing SSL certificate field subjectAltName
# (you may remove this import together with the disable_warnings() call in OpsiConnection.__get_session)
from requests.packages.urllib3.exceptions import SubjectAltNameWarning
 
opsicon_logger = logging.getLogger(__name__)
 
 
class OpsiError(Exception):
    """Error reported by the opsi server in the ``error`` field of an RPC response."""

    def __init__(self, expression, opsierrorclass, message):
        """

        :param expression: the RPC Call
        :param opsierrorclass: the Error class
        :param message: the Error message
        """
        # Initialise the base class explicitly so that ``args`` is populated
        # even when the exception is constructed with keyword arguments.
        super().__init__(expression, opsierrorclass, message)
        self.expression = expression
        self.opsierrorclass = opsierrorclass
        self.message = message

    def __str__(self):
        """Return ``"<error class>: <message>"`` instead of the raw argument tuple."""
        return "{}: {}".format(self.opsierrorclass, self.message)
 
 
class OpsiConnection:
    """JSON-RPC client for an opsi server built on a ``requests`` session.

    Every name contained in ``legal_methods`` is exposed as a callable
    attribute; calling it posts the corresponding RPC request to
    ``<url>/rpc`` and returns the ``result`` field of the response.
    """

    def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None):
        """
        Create the HTTP session and determine the callable RPC methods.

        :param url: Opsiserver URL
        :param authfile: path of a file holding the base64 encoded UTF-8 string
                         "username password" (only read when auth is not given)
        :param auth: username password as tuple
        :param certfile: Opsiserver SSL Certificate
        :param legal_methods_path: Textfile containing one method name per line
        :raises ValueError: if authfile does not decode to a username and a password
        :raises FileNotFoundError: if legal_methods_path is not an existing file
        """
        # 0 is not a valid id; the counter is incremented before every request
        self.id = 0
        self.server = url
        self.certfile = certfile
        # Always a list, so the membership test in __getattr__ never hits None.
        self.legal_methods = []
        if not (authfile or auth):
            # get auth from user via commandline input
            auth = (input("Username: "), getpass.getpass(prompt="Password: "))
        elif not auth:
            # get auth from file
            # The authfile has to be kept in a secure environment!
            with open(authfile, 'rb') as f:
                decoded = base64.b64decode(f.read()).decode("utf-8")
            # Split on the first whitespace run only, so that a password
            # containing spaces still yields a (username, password) pair.
            auth = tuple(decoded.strip().split(None, 1))
            if len(auth) != 2:
                raise ValueError("authfile must contain a base64 encoded 'username password' pair")
        # create session
        self.session = self.__get_session(auth)

        if legal_methods_path:
            opsicon_logger.debug("Get Methods from File...")
            if os.path.isfile(legal_methods_path):
                with open(legal_methods_path, 'r') as f:
                    self.legal_methods = f.read().splitlines()
            else:
                raise FileNotFoundError(legal_methods_path)
        else:
            # getPossibleMethods_listOfHashes lacks the modern _getObjects methods!
            opsicon_logger.debug("Get Methods from Server...")
            self.id += 1
            # __rpc_request already returns the decoded "result" field
            # (here: a list of dicts, each providing a "name" entry).
            methods = self.__rpc_request(self.session,
                                         {"method": "getPossibleMethods_listOfHashes",
                                          "params": [],
                                          "id": self.id})
            self.legal_methods = [method['name'] for method in methods or []]
            opsicon_logger.debug("Got Methods.")

    def raw_request(self, payload):
        """
        Send an arbitrary RPC payload to the server.

        :param payload: JSON-serialisable RPC request (passed to the session as json=payload)
        :return: the "result" field of the server response
        :raises OpsiError: if the server response carries an error
        """
        return self.__rpc_request(self.session, payload)

    def __getattr__(self, name):
        """
        Resolve unknown attributes that name a legal RPC method to a callable.

        Positional arguments become the RPC "params" list; if keyword arguments
        are given, "params" is ``[positional_args, keyword_args]`` instead.

        :param name: attribute name that was not found by normal lookup
        :raises AttributeError: if name is not contained in legal_methods
        """
        # Read through __dict__: if legal_methods itself is not set yet,
        # accessing self.legal_methods here would re-enter __getattr__ and
        # recurse without end.
        legal_methods = self.__dict__.get("legal_methods")
        if not legal_methods or name not in legal_methods:
            raise AttributeError("'%s' object has no attribute '%s'"
                                 % (type(self).__name__, name))

        def _rpc_call(*args, **kwargs):
            self.id += 1
            positional = list(args)
            payload = {"method": name,
                       "params": [positional, kwargs] if kwargs else positional,
                       "id": self.id}
            opsicon_logger.debug("Interpreting as rpc call: \n%s", payload)
            return self.__rpc_request(self.session, payload)

        return _rpc_call

    def __get_session(self, auth):
        """
        Build the requests session used for all RPC calls.

        :param auth: username password as tuple
        :return: the configured requests.Session
        """
        session = requests.Session()
        # avoid proxy issues
        session.trust_env = False
        # ignore warnings about missing ssl cert field subjectAltName
        requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
        # supply cert file
        if self.certfile:
            session.verify = self.certfile
        session.auth = auth
        opsicon_logger.debug("Created new session: %s", session)
        return session

    def __rpc_request(self, session, payload):
        """
        Post payload to ``<server>/rpc`` and return the "result" of the answer.

        Exceptions raised by requests (timeouts, connection errors, HTTP error
        status via raise_for_status) propagate unchanged to the caller.

        :param session: session object providing post()
        :param payload: JSON-serialisable RPC request
        :return: the "result" field of the response (None if it is absent)
        :raises OpsiError: if the response contains a non-empty "error" field
        """
        # Tolerate a trailing slash in the configured URL.
        r = session.post(self.server.rstrip('/') + '/rpc', json=payload)
        r.raise_for_status()
        # Decode the body once instead of once per field access.
        body = r.json()
        error = body.get("error")
        if error:
            if isinstance(error, dict):
                raise OpsiError(payload, error.get('class'), error.get('message'))
            raise OpsiError(payload, None, str(error))
        return body.get('result')

Example

import json
 
from opsirpc import OpsiConnection as OpsiServerCon
 
 
def print_name_example(response):
    """Print the ``name`` entry of every item in *response*, one per line."""
    # Lazy generator: each name is looked up right before it is printed.
    names = (entry['name'] for entry in response)
    for name in names:
        print(name)
 
 
# Connect using credentials from a base64 encoded file, the server
# certificate and a local text file listing the callable RPC methods.
server_connection = OpsiServerCon(url='https://my.opsiserver.dom:4447',
                                  authfile="myPasswordBase64.txt",
                                  certfile="myCert.crt",
                                  legal_methods_path='rpc_methods.txt')

# Keyword arguments only: sent as params [[], {"licenseRequired": True}].
r = server_connection.product_getObjects(licenseRequired=True)
print_name_example(r)

# Positional and keyword arguments: sent as params [["description"], {"id": "flashplayer"}].
print(server_connection.product_getObjects('description', id='flashplayer')[0]['description'])
# NOTE: a method can only be called if its name is listed in rpc_methods.txt.
print('\n'.join([json.dumps(d, indent=4, sort_keys=True) for d
                 in server_connection.getPossibleMethods_listOfHashes()]))

rpc_methods.txt

userspace/json-rpc_python_requests.txt · Last modified: 2021/08/23 08:37 (external edit)