This repository has been archived on 2024-08-20. You can view files and clone it, but cannot push or open issues or pull requests.
Shrlok/shrlok/shrlok.py

147 lines
4 KiB
Python
Raw Normal View History

2022-07-04 21:52:48 +02:00
#!/usr/bin/env python3
"""shrlok server: receive text/files from a socket, put them on a Web server.
The server expects messages with the following format to come through the
socket:
1. a length, as ASCII digits,
2. a null byte,
3. a JSON object containing at least the "type" key with a known value,
4. another null byte,
5. the text or file itself.
The length is of the JSON object + the null char + the content itself.
Example, with \\0 representing a null byte:
28\\0{"type":"txt"}\\0hello shrlok!
The content is 13 bytes (assuming no LF at the end), the header is 14 bytes,
plus the null byte it is 13 + 14 + 1 = 28 bytes, thus the prefixed length.
After the content is successfully retrieved and put in an appropriate location,
the server will reply with the file path through the socket and close the
connection.
"""
2022-07-04 21:52:48 +02:00
import argparse
import json
import os
import socketserver
import tempfile
from pathlib import Path
# Parsed command-line arguments. Stays None until main() stores the argparse
# namespace here; write_text() reads ARGS.root from it.
ARGS = None
# Skeleton of every generated page, filled in through str.format() with the
# "title" and "content" fields. The doubled braces in the CSS rule are
# str.format() escapes that produce literal "{" and "}".
HTML_TEMPLATE = """\
<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<title>{title}</title>
<style>
body {{ max-width: 40em; }}
</style>
</head>
<body>
{content}
</body>
</html>
"""
class Handler(socketserver.StreamRequestHandler):
    """Serve one client connection: read a message, store it, reply.

    A message is made of an ASCII decimal length, a null byte, a JSON object
    header, another null byte and the content. The length counts the header,
    the second null byte and the content.
    """

    def handle(self):
        """Process a single message and reply with the created file path.

        Nothing is sent back when the header is invalid, the type is unknown
        or the content could not be stored; the reason is printed instead.
        """
        data = self.receive_input()

        # Extract header: everything up to the next null byte is JSON.
        try:
            first_zero = data.index(b"\0")
            header_data, data = data[:first_zero], data[first_zero + 1:]
            header = json.loads(header_data.decode())
        except (ValueError, IndexError):
            # Covers a missing null byte, undecodable bytes and invalid JSON
            # (UnicodeDecodeError and JSONDecodeError are both ValueError).
            print("Bad header.")
            return
        if not isinstance(header, dict):
            # Valid JSON that is not an object (list, number, string...) has
            # no "type" key to dispatch on; reject it instead of crashing on
            # the .get() calls below.
            print("Bad header.")
            return

        file_name = None
        if header.get("type") == "txt":
            file_name = write_text(data, title=header.get("title"))
        else:
            print("Unknown type.")
        if file_name is None:
            return

        print(f"{len(data)} bytes — {header}'{file_name}'.")
        try:
            self.request.sendall(file_name.encode())
        except BrokenPipeError:
            # The client went away before reading the reply.
            print("Broken pipe.")

    def receive_input(self):
        """Read one message from the socket and return it without its prefix.

        Returns:
            The bytes following the length prefix (header, null byte and
            content), cut to the declared length. If the peer closes the
            connection early, the bytes received so far are returned. An
            empty bytes object is returned when the length prefix is missing
            from the first chunk or is not made of ASCII digits.
        """
        length = None
        # bytearray grows in place; repeated bytes concatenation would copy
        # the whole buffer on every chunk.
        buffer = bytearray()
        while True:
            chunk = self.request.recv(4096)
            if not chunk:
                break
            buffer += chunk
            if length is None:
                # The length prefix and its terminating null byte are
                # expected within the first chunk.
                prefix, separator, rest = buffer.partition(b"\0")
                if not separator:
                    return b""
                if not prefix.isdigit():
                    # Empty, signed or otherwise non-numeric prefix.
                    return b""
                try:
                    length = int(prefix.decode("ascii"))
                except ValueError:
                    # E.g. more digits than the interpreter agrees to parse.
                    return b""
                buffer = rest
            if len(buffer) >= length:  # retrieval completed
                break
        if length is None:
            return b""
        # Bytes past the declared length are not part of the message.
        return bytes(buffer[:length])
2022-07-04 21:52:48 +02:00
def write_text(data: bytes, title=None):
    """Store *data* as a standalone HTML page in the "txt" folder of the root.

    The text is decoded leniently (undecodable bytes become U+FFFD), escaped
    and wrapped in a <pre> element so it is displayed verbatim rather than
    interpreted as markup.

    Args:
        data: raw content received from the client.
        title: optional page title; "?" is used when missing or empty.

    Returns:
        The path of the created file, or None when the "txt" folder does not
        exist under ARGS.root.
    """
    # Function-scope import so this block does not depend on a new
    # module-level import.
    from html import escape

    path = str(Path(ARGS.root) / "txt")
    if not os.path.isdir(path):
        print(f"Missing '{path}' folder.")
        return None

    # Both the content and the title come from the client: escape them so
    # "<", ">" and "&" cannot inject or break markup.
    text = data.decode(errors="replace")
    content = "<pre>{}</pre>".format(escape(text, quote=False))
    page = HTML_TEMPLATE.format(
        title=escape(str(title or "?")),
        content=content,
    )

    # The template declares UTF-8, so encode explicitly instead of relying on
    # the locale; "replace" keeps lone surrogates from a JSON title from
    # raising while writing.
    with tempfile.NamedTemporaryFile(
        mode="wb",
        dir=path,
        suffix=".html",
        delete=False,
    ) as output_file:
        output_file.write(page.encode("utf-8", errors="replace"))
    # Temporary files are created private (0600); open the page for reading.
    os.chmod(output_file.name, 0o644)
    return output_file.name
def _unlink_if_exists(path):
    """Remove *path*, doing nothing when it is already gone."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        # Deliberately ignored: the goal is only that the path is absent.
        pass


def main():
    """Parse the command line and serve the Unix socket until interrupted.

    The socket path comes from -s/--socket or, failing that, is "shr.sock"
    inside the directory named by the RUNTIME_DIRECTORY environment variable.

    Raises:
        SystemExit: when neither a socket path nor a runtime directory is
            available (also raised by argparse on invalid arguments).
    """
    global ARGS

    argp = argparse.ArgumentParser(description="Share stuff through a socket.")
    argp.add_argument("root", help="root path where to put files")
    argp.add_argument("-s", "--socket", help="socket path")
    ARGS = argp.parse_args()

    socket_path = ARGS.socket
    if not socket_path:
        runtime_dir = os.environ.get("RUNTIME_DIRECTORY")
        if not runtime_dir:
            # SystemExit directly: the exit() builtin is only injected by the
            # site module and is not guaranteed to exist.
            raise SystemExit("No socket path nor runtime directory specified.")
        socket_path = os.path.join(runtime_dir, "shr.sock")
    print("Socket path:", socket_path)

    # A socket file left over from a previous run would make bind() fail.
    _unlink_if_exists(socket_path)

    try:
        with socketserver.UnixStreamServer(socket_path, Handler) as server:
            os.chmod(socket_path, 0o664)
            server.serve_forever()
    except KeyboardInterrupt:
        print("Stopping server.")
    finally:
        # The socket may never have been created (e.g. bind() failed); do not
        # let the cleanup hide that original error.
        _unlink_if_exists(socket_path)
# Start the server only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()