====== Module for json-rpc requests in Python ======
requires: requests
This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts.
**It is provided as is and without any warranty or support! **
**More error handling and concern for security is needed before use in production environments! **
===== The Module (opsirpc.py) =====
import logging
import requests
import getpass
import base64
import os.path
# ignore warnings about missing ssl cert field subjectAltName (you may remove this and ln 102)
from requests.packages.urllib3.exceptions import SubjectAltNameWarning
opsicon_logger = logging.getLogger(__name__)
class OpsiError(Exception):
def __init__(self, expression, opsierrorclass, message):
"""
:param expression: the RPC Call
:param opsierrorclass: the Error class
:param message: the Error message
"""
self.expression = expression
self.opsierrorclass = opsierrorclass
self.message = message
class OpsiConnection:
def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None):
# 0 is not a valid id
"""
:param url: Opsiserver URL
:param authfile: base64 encoded UTF-8 String containing username password
:param auth: username password as tuple
:param certfile: Opsiserver SSL Certificate
:param legal_methods_path: Textfile containing one method name per line
"""
self.id = 0
self.server = url
self.certfile = certfile
self.legal_methods = None
if not (authfile or auth):
# get auth from user via commandline input
auth = (input("Username: "), getpass.getpass(prompt="Password: "))
elif not auth:
# get auth from file
# The authfile has to be kept in an secure environment!
with open(authfile, 'rb') as f:
auth = tuple(base64.b64decode(f.read()).decode("utf-8").split())
# create session
self.session = self.__get_session(auth)
if legal_methods_path:
opsicon_logger.debug("Get Methods from File...")
if os.path.isfile(legal_methods_path):
with open(legal_methods_path, 'r') as f:
self.legal_methods = f.read().splitlines()
else:
raise FileNotFoundError(legal_methods_path)
else:
# getPossibleMethods_listOfHashes lacks the modern _getObjects methods!
opsicon_logger.debug("Get Methods from Server...")
self.id += 1
response = self.__rpc_request(self.session,
{"method": "getPossibleMethods_listOfHashes",
"params": [],
"id": self.id})
rjson = response.json()
if rjson['result']:
self.legal_methods = []
for method in rjson['result']:
self.legal_methods.append(method['name'])
opsicon_logger.debug("Got Methods.")
def raw_request(self, payload):
"""
:param payload: json string to send to the server
"""
self.__rpc_request(self.session, payload)
def __getattr__(self, name):
if name in self.legal_methods:
def _rpc_call(*args, **kwargs):
self.id += 1
payload = {"method": name,
"params": [list(args)[1:], kwargs] if len(kwargs) > 0 else list(args)[1:],
"id": self.id}
opsicon_logger.debug("Interpreting as rpc call: \n%s}" % payload)
return self.__rpc_request(self.session, payload)
return lambda *args, **kwargs: _rpc_call(self, *args, **kwargs)
else:
raise AttributeError
def __get_session(self, auth):
session = requests.Session()
# avoid proxy issues
session.trust_env = False
# ignore warnings about missing ssl cert field subjectAltName
requests.packages.urllib3.disable_warnings(SubjectAltNameWarning)
# supply cert file
if self.certfile:
session.verify = self.certfile
session.auth = auth
opsicon_logger.debug("Created new session: %s}" % session)
return session
def __rpc_request(self, session, payload):
try:
r = session.post(self.server + '/rpc', json=payload)
r.raise_for_status()
except requests.exceptions.Timeout as e:
raise e
except requests.exceptions.RequestException as e:
raise e
if r.json()["error"]:
raise OpsiError(payload, r.json()['error']['class'], r.json()['error']['message'])
return r.json()['result']
===== Example =====
import json
from opsirpc import OpsiConnection as OpsiServerCon
def print_name_example(response):
for o in response:
print(o['name'])
server_connection = OpsiServerCon(url = 'https://my.opsiserver.dom:4447',
authfile = "myPasswordBase64.txt",
certfile = "myCert.crt"
legal_methods_path='rpc_methods.txt')
r = server_connection.product_getObjects(licenseRequired=True)
print_name_example(r)
print(server_connection.product_getObjects('description', id='flashplayer')[0]['description'])
print('\n'.join([json.dumps(d, indent=4, sort_keys=True) for d
in server_connection.getPossibleMethods_listOfHashes()]))
[[https://pastebin.com/raw/2AMSkiTA|rpc_methods.txt]]