Spaces:
Running on Zero
Running on Zero
File size: 2,270 Bytes
9d7cf7f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | from bottle import request, response
import bottle
import queue
import threading
from .spec import bytes_to_object, object_to_bytes, BPY_PORT
from ..rig_package.parser.bpy import BpyParser, transfer_rigging
def run():
path_queue = queue.Queue()
result_queue = queue.Queue()
app = bottle.Bottle()
@app.route('/load', method='GET') # type: ignore
def load():
data = request.body.read() # type: ignore
path_queue.put(('load', data))
res = result_queue.get()
payload = object_to_bytes(res)
response.content_type = 'application/octet-stream' # type: ignore
return payload
@app.route('/ping', method='GET') # type: ignore
def ping():
return 'pong'
@app.route('/export', method='post') # type: ignore
def export():
data = request.body.read() # type: ignore
path_queue.put(('export', data))
res = result_queue.get()
payload = object_to_bytes(res)
response.content_type = 'application/octet-stream' # type: ignore
return payload
@app.route('/transfer', method='post') # type: ignore
def transfer():
data = request.body.read() # type: ignore
path_queue.put(('transfer', data))
res = result_queue.get()
payload = object_to_bytes(res)
response.content_type = 'application/octet-stream' # type: ignore
return payload
def run_server(): bottle.run(app, host='0.0.0.0', port=BPY_PORT, server='tornado')
threading.Thread(target=run_server, daemon=False).start()
while True:
d = path_queue.get()
op = d[0]
data = bytes_to_object(d[1])
if op == 'load':
print("[SERVER] received load path:", data)
asset = BpyParser.load(data)
result_queue.put(asset)
elif op == 'export':
print("[SERVER] received export path:", data['filepath'])
BpyParser.export(**data)
result_queue.put('ok')
elif op == 'transfer':
print("[SERVER] received transfer path:", data['target_path'])
transfer_rigging(**data)
result_queue.put('ok')
else:
result_queue.put(f"unsupported op: {str(op)}") |