This shows you the differences between two versions of the page.
userspace:json-rpc_python_requests [2018/03/27 07:40] schuhj [Example] |
userspace:json-rpc_python_requests [2021/08/23 08:37] |
||
---|---|---|---|
Line 1: | Line 1: | ||
- | ====== Module for json-rpc requests in Python ====== | ||
- | requires: requests | ||
- | This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts. | ||
- | |||
- | **It is provided as is and without any warranty or support! ** | ||
- | |||
- | **More error handling and concern for security is needed before use in production environments! ** | ||
- | |||
- | ===== The Module ===== | ||
- | |||
- | <code python> | ||
- | import logging | ||
- | import requests | ||
- | import getpass | ||
- | import base64 | ||
- | import os.path | ||
- | # ignore warnings about missing ssl cert field | ||
- | from requests.packages.urllib3.exceptions import SubjectAltNameWarning | ||
- | |||
- | opsicon_logger = logging.getLogger(__name__) | ||
- | |||
- | |||
- | class OpsiError(Exception): | ||
- | """ | ||
- | |||
- | Attributes: | ||
- | expression -- the RPC Call | ||
- | class -- the Error class | ||
- | message -- the Error message | ||
- | """ | ||
- | |||
- | def __init__(self, | ||
- | self.expression = expression | ||
- | self.opsierrorclass = opsierrorclass | ||
- | self.message = message | ||
- | |||
- | |||
- | class OpsiConnection: | ||
- | def __init__(self, | ||
- | # 0 is not a valid id | ||
- | self.id = 0 | ||
- | self.server = url | ||
- | self.certfile = certfile | ||
- | self.legal_methods = None | ||
- | if not (authfile or auth): | ||
- | # get auth from user via commandline input | ||
- | auth = (input(" | ||
- | elif not auth: | ||
- | # get auth from file | ||
- | # authfile expected to contain base64 encoded UTF-8 String (username password) | ||
- | # The authfile has to be kept in an secure environment! | ||
- | with open(authfile, | ||
- | auth = tuple(base64.b64decode(f.read()).decode(" | ||
- | # create session | ||
- | self.session = self.__get_session(auth) | ||
- | |||
- | if legal_methods_path: | ||
- | opsicon_logger.debug(" | ||
- | if os.path.isfile(legal_methods_path): | ||
- | with open(legal_methods_path, | ||
- | self.legal_methods = f.read().splitlines() | ||
- | else: | ||
- | raise FileNotFoundError(legal_methods_path) | ||
- | else: | ||
- | # getPossibleMethods_listOfHashes does not always deliver a comprehensive list of methods! | ||
- | opsicon_logger.debug(" | ||
- | self.id += 1 | ||
- | response = self.__rpc_request(self.session, | ||
- | {" | ||
- | " | ||
- | " | ||
- | rjson = response.json() | ||
- | if rjson[' | ||
- | self.legal_methods = [] | ||
- | for method in rjson[' | ||
- | self.legal_methods.append(method[' | ||
- | opsicon_logger.debug(" | ||
- | |||
- | def raw_request(self, | ||
- | self.__rpc_request(self.session, | ||
- | |||
- | def __getattr__(self, | ||
- | if name in self.legal_methods: | ||
- | def _rpc_call(*args, | ||
- | self.id += 1 | ||
- | payload = {" | ||
- | " | ||
- | " | ||
- | opsicon_logger.debug(" | ||
- | return self.__rpc_request(self.session, | ||
- | |||
- | return lambda *args, **kwargs: _rpc_call(self, | ||
- | else: | ||
- | raise AttributeError | ||
- | |||
- | def __get_session(self, | ||
- | session = requests.Session() | ||
- | # avoid proxy issues | ||
- | session.trust_env = False | ||
- | # ignore warnings about missing ssl cert field | ||
- | requests.packages.urllib3.disable_warnings(SubjectAltNameWarning) | ||
- | # supply cert file | ||
- | if self.certfile: | ||
- | session.verify = self.certfile | ||
- | session.auth = auth | ||
- | opsicon_logger.debug(" | ||
- | return session | ||
- | |||
- | def __rpc_request(self, | ||
- | try: | ||
- | r = session.post(self.server + '/ | ||
- | r.raise_for_status() | ||
- | except requests.exceptions.Timeout as e: | ||
- | # TODO: Maybe set up retry loop | ||
- | raise e | ||
- | except requests.exceptions.RequestException as e: | ||
- | # TODO: Maybe think about better handling | ||
- | raise e | ||
- | if r.json()[" | ||
- | raise OpsiError(payload, | ||
- | return r | ||
- | </ | ||
- | ===== Example ===== | ||
- | <code python> | ||
- | from opsirpc import OpsiConnection as OpsiServerCon | ||
- | |||
- | def print_name_example(response): | ||
- | for o in response.json()[' | ||
- | print(o[' | ||
- | |||
- | |||
- | server_connection = OpsiServerCon(url = ' | ||
- | authfile = " | ||
- | certfile = " | ||
- | legal_methods_path=' | ||
- | |||
- | r = server_connection.product_getObjects(licenseRequired=True) | ||
- | print_name_example(r) | ||
- | |||
- | print(server_connection.product_getObjects(' | ||
- | </ |