Skip to content

socket_handler

WebSocketHandler

Handles WebSocket connections, sending messages, and receiving responses.

This class provides methods for: * Establishing WebSocket connections. * Sending single and multiple messages. * Handling message receiving, errors, connection opening, and closing.

Source code in libs\cafex_api\src\cafex_api\socket_handler.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class WebSocketHandler:
    """Handles WebSocket connections, sending messages, and receiving
    responses.

    This class provides methods for:
    * Establishing WebSocket connections.
    * Sending single and multiple messages.
    * Handling message receiving, errors, connection opening, and closing.
    """

    def __init__(self):
        self.multi_message_wait_time = 1
        self._ws = None
        self.arr_multi_response = []
        self.list_messages = ""
        self.logger = CoreLogger(name=__name__).get_logger()

    def set_socket_connection(self, socket_url: str) -> websocket.WebSocket:
        """Establishes a WebSocket connection to the specified URL.

        This method creates a WebSocket connection using the `create_connection`
        function from the `websocket` library.

        Args:
            socket_url: The URL of the WebSocket server.

        Returns:
            A WebSocket object representing the established connection.

        Raises:
            ValueError: If the `socket_url` is empty or `None`.
            Exception: If an error occurs while establishing the WebSocket connection.
        """
        if not socket_url:
            raise ValueError("socket_url cannot be empty or None")
        try:
            self._ws = websocket.create_connection(socket_url)
            return self._ws
        except Exception as e:
            self.logger.exception("Error establishing WebSocket connection: %s", e)
            raise e

    def send_socket_message(self, socket_url: str, message: str) -> str:
        """Sends a single message to the WebSocket server and returns the
        response.

        Args:
            socket_url (str): The URL of the WebSocket server.
            message (str): The message to be sent.

        Returns:
            str: The response received from the WebSocket server.

        Raises:
            ValueError: If the `socket_url` or `message` is empty or `None`.
            Exception: If an error occurs while sending the message.

        Examples:
            send_socket_message('ws://example.com/json','abcd')
        """
        if not socket_url:
            raise ValueError("socket_url cannot be empty or None")
        if not message:
            raise ValueError("pstr_message cannot be empty or None")

        try:
            if self._ws is None:
                self._ws = self.set_socket_connection(socket_url)
            self._ws.send(message)
            response = self._ws.recv()
            return response
        except Exception as e:
            self.logger.exception("Error while sending message to the WebSocket server: %s", e)
            raise e

    def send_multi_socket_message(
        self,
        socket_url: str,
        messages: List[Dict],
        wait_time: int = 1,
        on_message: Optional[Callable] = None,
        on_error: Optional[Callable] = None,
        on_open: Optional[Callable] = None,
        on_close: Optional[Callable] = None,
        ssl_options: Optional[dict] = None,
    ) -> None:
        """Sends multiple messages to the WebSocket server and captures the
        response until an exit criteria is met. This method allows overwriting
        the default behavior of WebSocket's on_message, on_error, on_open, and
        on_close methods.

        Args:
            socket_url (str): The URL of the WebSocket server.
            messages (List[Dict]): A list of messages to be sent.
            on_message (Optional[Callable], optional): A custom on_message callback function.
            on_error (Optional[Callable], optional): A custom on_error callback function.
            on_open (Optional[Callable], optional): A custom on_open callback function.
            on_close (Optional[Callable], optional): A custom on_close callback function.
            wait_time (int, optional): The time (in seconds) to wait between sending messages.
                                       Defaults to 1.
            ssl_options (Optional[Dict], optional): SSL options for the WebSocket connection.
                                                    Defaults to None.

        Returns:
            None: This method doesn't return anything. But to fetch the response of multi socket
            message,use the WebSocketHandler class variable 'arr_multi_response'

        Raises:
            ValueError: If the `socket_url` is empty or `None`.
            Exception: If an error occurs while sending the messages

        Examples:
            # Using default callbacks
            handler.send_multi_socket_message(socket_url, messages)

            # Using custom callbacks
            handler.send_multi_socket_message(socket_url, messages,
                                              on_message=custom_on_message, ...)

            # Bypass SSL verification
            ssl_opt = {"cert_reqs": ssl.CERT_NONE}
            handler.send_multi_socket_message(socket_url, messages, ssl_opt=ssl_opt)

        Note:
            When user wants to create custom implementation of
            on_message/on_error/on_open/on_close, make sure to leverage
            WebSocketHandler's class variables 'arr_multi_response' and 'list_messages'
        """
        if not socket_url:
            raise ValueError("socket_url cannot be empty or None")

        try:
            # Use default callbacks if not provided
            on_message = on_message or self.__on_message
            on_error = on_error or self.__on_error
            on_open = on_open or self.__on_open
            on_close = on_close or self.__on_close

            websocket.enableTrace(True)
            self.list_messages = messages
            self.arr_multi_response = []
            self.multi_message_wait_time = wait_time
            self._ws = websocket.WebSocketApp(
                socket_url,
                on_message=on_message,
                on_error=on_error,
                on_open=on_open,
                on_close=on_close,
            )
            self._ws.on_open = self.__on_open
            self._ws.run_forever(sslopt=ssl_options)
        except Exception as e:
            self.logger.exception(
                "Error while sending multiple messages to WebSocket server: %s", e
            )
            raise e

    def __on_message(self, _ws: websocket.WebSocket, message: str) -> None:
        try:
            self.logger.info("***ON_MESSAGE***")
            self.logger.info(message)
            self.arr_multi_response.append(message)
        except Exception as e:
            raise e

    def __on_error(self, _ws: websocket.WebSocket, error: Union[str, Exception]):
        try:
            self.logger.exception("***ON_ERROR***")
            self.logger.exception(error)
        except Exception as e:
            raise e

    def __on_close(
        self,
        _ws: websocket.WebSocket,
        _close_status_code: Optional[int] = None,
        _close_msg: Optional[str] = None,
    ) -> None:
        try:
            self.logger.warning("***ON_CLOSE***")
            self.logger.warning(self.arr_multi_response)
        except Exception as e:
            raise e

    def __on_open(self, *_args: Any) -> None:
        try:

            def run(*_args) -> None:
                try:
                    for message in self.list_messages:
                        self._ws.send(json.dumps(message))
                        time.sleep(self.multi_message_wait_time)
                    self._ws.close()
                except Exception as ex:
                    self.logger.exception("Error in run: %s", ex)

            thread.start_new_thread(run, ())
            self.logger.info("WebSocket connection opened")
        except Exception as e:
            raise e

send_multi_socket_message(socket_url, messages, wait_time=1, on_message=None, on_error=None, on_open=None, on_close=None, ssl_options=None)

Sends multiple messages to the WebSocket server and captures the response until an exit criteria is met. This method allows overwriting the default behavior of WebSocket's on_message, on_error, on_open, and on_close methods.

Parameters:

Name Type Description Default
socket_url str

The URL of the WebSocket server.

required
messages List[Dict]

A list of messages to be sent.

required
on_message Optional[Callable]

A custom on_message callback function.

None
on_error Optional[Callable]

A custom on_error callback function.

None
on_open Optional[Callable]

A custom on_open callback function.

None
on_close Optional[Callable]

A custom on_close callback function.

None
wait_time int

The time (in seconds) to wait between sending messages. Defaults to 1.

1
ssl_options Optional[Dict]

SSL options for the WebSocket connection. Defaults to None.

None

Returns:

Name Type Description
None None

This method doesn't return anything. But to fetch the response of multi socket

None

message,use the WebSocketHandler class variable 'arr_multi_response'

Raises:

Type Description
ValueError

If the socket_url is empty or None.

Exception

If an error occurs while sending the messages

Examples:

Using default callbacks

handler.send_multi_socket_message(socket_url, messages)

Using custom callbacks

handler.send_multi_socket_message(socket_url, messages, on_message=custom_on_message, ...)

Bypass SSL verification

ssl_opt = {"cert_reqs": ssl.CERT_NONE} handler.send_multi_socket_message(socket_url, messages, ssl_opt=ssl_opt)

Note

When user wants to create custom implementation of on_message/on_error/on_open/on_close, make sure to leverage WebSocketHandler's class variables 'arr_multi_response' and 'list_messages'

Source code in libs\cafex_api\src\cafex_api\socket_handler.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def send_multi_socket_message(
    self,
    socket_url: str,
    messages: List[Dict],
    wait_time: int = 1,
    on_message: Optional[Callable] = None,
    on_error: Optional[Callable] = None,
    on_open: Optional[Callable] = None,
    on_close: Optional[Callable] = None,
    ssl_options: Optional[dict] = None,
) -> None:
    """Sends multiple messages to the WebSocket server and captures the
    response until an exit criteria is met. This method allows overwriting
    the default behavior of WebSocket's on_message, on_error, on_open, and
    on_close methods.

    Args:
        socket_url (str): The URL of the WebSocket server.
        messages (List[Dict]): A list of messages to be sent.
        on_message (Optional[Callable], optional): A custom on_message callback function.
        on_error (Optional[Callable], optional): A custom on_error callback function.
        on_open (Optional[Callable], optional): A custom on_open callback function.
        on_close (Optional[Callable], optional): A custom on_close callback function.
        wait_time (int, optional): The time (in seconds) to wait between sending messages.
                                   Defaults to 1.
        ssl_options (Optional[Dict], optional): SSL options for the WebSocket connection.
                                                Defaults to None.

    Returns:
        None: This method doesn't return anything. But to fetch the response of multi socket
        message,use the WebSocketHandler class variable 'arr_multi_response'

    Raises:
        ValueError: If the `socket_url` is empty or `None`.
        Exception: If an error occurs while sending the messages

    Examples:
        # Using default callbacks
        handler.send_multi_socket_message(socket_url, messages)

        # Using custom callbacks
        handler.send_multi_socket_message(socket_url, messages,
                                          on_message=custom_on_message, ...)

        # Bypass SSL verification
        ssl_opt = {"cert_reqs": ssl.CERT_NONE}
        handler.send_multi_socket_message(socket_url, messages, ssl_opt=ssl_opt)

    Note:
        When user wants to create custom implementation of
        on_message/on_error/on_open/on_close, make sure to leverage
        WebSocketHandler's class variables 'arr_multi_response' and 'list_messages'
    """
    if not socket_url:
        raise ValueError("socket_url cannot be empty or None")

    try:
        # Use default callbacks if not provided
        on_message = on_message or self.__on_message
        on_error = on_error or self.__on_error
        on_open = on_open or self.__on_open
        on_close = on_close or self.__on_close

        websocket.enableTrace(True)
        self.list_messages = messages
        self.arr_multi_response = []
        self.multi_message_wait_time = wait_time
        self._ws = websocket.WebSocketApp(
            socket_url,
            on_message=on_message,
            on_error=on_error,
            on_open=on_open,
            on_close=on_close,
        )
        self._ws.on_open = self.__on_open
        self._ws.run_forever(sslopt=ssl_options)
    except Exception as e:
        self.logger.exception(
            "Error while sending multiple messages to WebSocket server: %s", e
        )
        raise e

send_socket_message(socket_url, message)

Sends a single message to the WebSocket server and returns the response.

Parameters:

Name Type Description Default
socket_url str

The URL of the WebSocket server.

required
message str

The message to be sent.

required

Returns:

Name Type Description
str str

The response received from the WebSocket server.

Raises:

Type Description
ValueError

If the socket_url or message is empty or None.

Exception

If an error occurs while sending the message.

Examples:

send_socket_message('ws://example.com/json','abcd')

Source code in libs\cafex_api\src\cafex_api\socket_handler.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def send_socket_message(self, socket_url: str, message: str) -> str:
    """Sends a single message to the WebSocket server and returns the
    response.

    Args:
        socket_url (str): The URL of the WebSocket server.
        message (str): The message to be sent.

    Returns:
        str: The response received from the WebSocket server.

    Raises:
        ValueError: If the `socket_url` or `message` is empty or `None`.
        Exception: If an error occurs while sending the message.

    Examples:
        send_socket_message('ws://example.com/json','abcd')
    """
    if not socket_url:
        raise ValueError("socket_url cannot be empty or None")
    if not message:
        raise ValueError("pstr_message cannot be empty or None")

    try:
        if self._ws is None:
            self._ws = self.set_socket_connection(socket_url)
        self._ws.send(message)
        response = self._ws.recv()
        return response
    except Exception as e:
        self.logger.exception("Error while sending message to the WebSocket server: %s", e)
        raise e

set_socket_connection(socket_url)

Establishes a WebSocket connection to the specified URL.

This method creates a WebSocket connection using the create_connection function from the websocket library.

Parameters:

Name Type Description Default
socket_url str

The URL of the WebSocket server.

required

Returns:

Type Description
WebSocket

A WebSocket object representing the established connection.

Raises:

Type Description
ValueError

If the socket_url is empty or None.

Exception

If an error occurs while establishing the WebSocket connection.

Source code in libs\cafex_api\src\cafex_api\socket_handler.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def set_socket_connection(self, socket_url: str) -> websocket.WebSocket:
    """Establishes a WebSocket connection to the specified URL.

    This method creates a WebSocket connection using the `create_connection`
    function from the `websocket` library.

    Args:
        socket_url: The URL of the WebSocket server.

    Returns:
        A WebSocket object representing the established connection.

    Raises:
        ValueError: If the `socket_url` is empty or `None`.
        Exception: If an error occurs while establishing the WebSocket connection.
    """
    if not socket_url:
        raise ValueError("socket_url cannot be empty or None")
    try:
        self._ws = websocket.create_connection(socket_url)
        return self._ws
    except Exception as e:
        self.logger.exception("Error establishing WebSocket connection: %s", e)
        raise e