User Tools

Site Tools


userspace:json-rpc_python_requests

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

userspace:json-rpc_python_requests [2018/03/27 07:40]
schuhj [Example]
userspace:json-rpc_python_requests [2021/08/23 08:37]
Line 1: Line 1:
-====== Module for json-rpc requests in Python ====== 
-requires: requests 
  
-This Module aims to provide a seamless integration of Opsi RPC Methods in Python Scripts. 
- 
-**It is provided as is and without any warranty or support! **  
- 
-**More error handling and concern for security is needed before use in production environments! **  
- 
-===== The Module ===== 
- 
-<code python> 
-import logging 
-import requests 
-import getpass 
-import base64 
-import os.path 
-# ignore warnings about missing ssl cert field 
-from requests.packages.urllib3.exceptions import SubjectAltNameWarning 
- 
-opsicon_logger = logging.getLogger(__name__) 
- 
- 
-class OpsiError(Exception): 
-    """Exception raised for remote RPC errors 
- 
-    Attributes: 
-        expression -- the RPC Call 
-        class -- the Error class 
-        message -- the Error message 
-    """ 
- 
-    def __init__(self, expression, opsierrorclass, message): 
-        self.expression = expression 
-        self.opsierrorclass = opsierrorclass 
-        self.message = message 
- 
- 
-class OpsiConnection: 
-    def __init__(self, url, authfile=None, auth=None, certfile=None, legal_methods_path=None): 
-        # 0 is not a valid id 
-        self.id = 0 
-        self.server = url 
-        self.certfile = certfile 
-        self.legal_methods = None 
-        if not (authfile or auth): 
-            # get auth from user via commandline input 
-            auth = (input("Username: "), getpass.getpass(prompt="Password: ")) 
-        elif not auth: 
-            # get auth from file 
-            # authfile expected to contain base64 encoded UTF-8 String (username password) 
-            # The authfile has to be kept in an secure environment! 
-            with open(authfile, 'rb') as f: 
-                auth = tuple(base64.b64decode(f.read()).decode("utf-8").split()) 
-        # create session 
-        self.session = self.__get_session(auth) 
- 
-        if legal_methods_path: 
-            opsicon_logger.debug("Get Methods from File...") 
-            if os.path.isfile(legal_methods_path): 
-                with open(legal_methods_path, 'r') as f: 
-                    self.legal_methods = f.read().splitlines() 
-            else: 
-                raise FileNotFoundError(legal_methods_path) 
-        else: 
-            # getPossibleMethods_listOfHashes does not always deliver a comprehensive list of methods! 
-            opsicon_logger.debug("Get Methods from Server...") 
-            self.id += 1 
-            response = self.__rpc_request(self.session, 
-                                          {"method": "getPossibleMethods_listOfHashes", 
-                                           "params": [], 
-                                           "id": self.id}) 
-            rjson = response.json() 
-            if rjson['result']: 
-                self.legal_methods = [] 
-            for method in rjson['result']: 
-                self.legal_methods.append(method['name']) 
-            opsicon_logger.debug("Got Methods.") 
- 
-    def raw_request(self, payload): 
-        self.__rpc_request(self.session, payload) 
- 
-    def __getattr__(self, name): 
-        if name in self.legal_methods: 
-            def _rpc_call(*args, **kwargs): 
-                self.id += 1 
-                payload = {"method": name, 
-                           "params": [list(args)[1:], kwargs] if len(kwargs) > 0 else list(args)[1:], 
-                           "id": self.id} 
-                opsicon_logger.debug("Interpreting as rpc call: \n%s}" % payload) 
-                return self.__rpc_request(self.session, payload) 
- 
-            return lambda *args, **kwargs: _rpc_call(self, *args, **kwargs) 
-        else: 
-            raise AttributeError 
- 
-    def __get_session(self, auth): 
-        session = requests.Session() 
-        # avoid proxy issues 
-        session.trust_env = False 
-        # ignore warnings about missing ssl cert field  
-        requests.packages.urllib3.disable_warnings(SubjectAltNameWarning) 
-        # supply cert file 
-        if self.certfile: 
-            session.verify = self.certfile 
-        session.auth = auth 
-        opsicon_logger.debug("Created new session: %s}" % session) 
-        return session 
- 
-    def __rpc_request(self, session, payload): 
-        try: 
-            r = session.post(self.server + '/rpc', json=payload) 
-            r.raise_for_status() 
-        except requests.exceptions.Timeout as e: 
-            # TODO: Maybe set up retry loop 
-            raise e 
-        except requests.exceptions.RequestException as e: 
-            # TODO: Maybe think about better handling 
-            raise e 
-        if r.json()["error"]: 
-            raise OpsiError(payload, r.json()['error']['class'], r.json()['error']['message']) 
-        return r 
-</code> 
-===== Example ===== 
-<code python> 
-from opsirpc import OpsiConnection as OpsiServerCon 
- 
-def print_name_example(response): 
-    for o in response.json()['result']: 
-        print(o['name']) 
- 
- 
-server_connection = OpsiServerCon(url = 'https://my.opsiserver.dom:4447', 
-                                  authfile = "myPasswordBase64.txt", 
-                                  certfile = "myCert.crt" 
-                                  legal_methods_path='rpc_methods.txt') 
- 
-r = server_connection.product_getObjects(licenseRequired=True) 
-print_name_example(r) 
- 
-print(server_connection.product_getObjects('description', id='flashplayer').json()['result'][0]['description']) 
-</code> 
userspace/json-rpc_python_requests.txt ยท Last modified: 2021/08/23 08:37 (external edit)