Всем привет. Хочу поделиться с сообществом своей наработкой, а именно модулем HTTPServer. Сервер работает в блокирующем режиме, но для себя использовал и неблокирующую версию. При написании сервера использовал вот эту статью:
https://andreymal.org/socket3
Рекомендую почитать для полного понимания.
Во вложении файл сервера и пример использования. Также дублирую пример здесь. Сервер работает только с get-запросами, но принимает и параметры. Предложения по доработке и добрая критика приветствуются.
upd:
Что-то архив не могу залить. Даю листинг здесь.
https://andreymal.org/socket3
Рекомендую почитать для полного понимания.
Во вложении файл сервера и пример использования. Также дублирую пример здесь. Сервер работает только с get-запросами, но принимает и параметры. Предложения по доработке и добрая критика приветствуются.
Код:
# -*- coding: utf-8 -*-
from HTTPServer import Server
from machine import Pin
import gc
led = Pin(2, Pin.OUT) # Присваиваем переменной led GPIO2 и назначаем его выходом
led.high() # Переводим порт в состояние 1
def test1(path): # Пример функции, принимающей только параметр path..
print('this is func test1 with path='+path)
return 'this is func test1 with path='+path
def test2(path,params): # Эта фуекция принимает на вход path и params, где params имеет вид словаря вида {'param_name':'param_value'}
print('this is func test2 with path='+path+' and params='+str(params))
return 'this is func test2 with path='+path+' and params='+str(params)
def stop(): # Пример ф-ии без входных параметров. Остановка сервера.
test.stop = True
return 'stoped'
def free(): # Пример ф-ии без параметров. Показывает в браузере состояние памяти.
result = "mem free:"+str(gc.mem_free())+" mem allocated:"+str(gc.mem_alloc())
return result
def switch(): # Ф-ия без параметров. Просто переключает состояние светодиода на ESP
led.value(not led.value())
return 'Pin 2 status '+str(led.value())
test = Server(8266) # Создаём объект сервера с указанием номера порта. По умолчанию используется порт 8080
test.RouteAdd('/test1', test1) # Добавляем обрабатывающую ф-ию test1 для маршрута /test1
test.RouteAdd('/test2', test2) # Так поступаем и для остальных маршрутов и их функций
test.RouteAdd('/stop', stop)
test.RouteAdd('/free', free)
test.RouteAdd('/switch', switch)
test.Run() # Запускаем сервер
upd:
Что-то архив не могу залить. Даю листинг здесь.
Код:
# -*- coding: utf-8 -*-
import socket
class Server:
def send_answer(self, conn, status="200 OK", typ="text/plain; charset=utf-8", data=""):
data = data.encode("utf-8")
conn.send(b"HTTP/1.1 " + status.encode("utf-8") + b"\r\n")
conn.send(b"Server: simplehttp\r\n")
conn.send(b"Connection: close\r\n")
conn.send(b"Content-Type: " + typ.encode("utf-8") + b"\r\n")
conn.send(b"Content-Length: " + bytes(len(data)) + b"\r\n")
conn.send(b"\r\n")# после пустой строки в HTTP начинаются данные
conn.send(data)
def RouteAdd(self, path, funcname):
if path not in self.routes:
self.routes[path] = funcname
def parse(self, conn, addr):# обработка соединения в отдельной функции
data = b""
while not b"\r\n" in data: # ждём первую строку
tmp = conn.recv(1024)
if not tmp: # сокет закрыли, пустой объект
break
else:
data += tmp
if not data: # данные не пришли
return # не обрабатываем
udata = data.decode("utf-8")
# берём только первую строку
udata = udata.split("\r\n", 1)[0]
# разбиваем по пробелам нашу строку
method, string, protocol = udata.split(" ", 2)
if string.find('?') != -1:
address = string.split('?')[0]
params = dict(b.split('=') for b in string.split('?')[1].split('&'))
else:
address = string
params = {}
if method != "GET":
self.send_answer(conn, "404 Not Found", data="Page not found")
return
if address in self.routes:
if len(params) > 0:
self.send_answer(conn, typ="text/html; charset=utf-8", data=self.routes[address](address, params))
else:
try:
self.send_answer(conn, typ="text/html; charset=utf-8", data=self.routes[address]())
except:
self.send_answer(conn, typ="text/html; charset=utf-8", data=self.routes[address](address))
return
else:
self.send_answer(conn, "404 Not Found", data="Page not found")
return
def __init__(self,port=8080):
self.routes = {}
self.stop = False
self.sock = socket.socket()
self.sock.bind( ("", port) )
self.sock.settimeout(2)
self.sock.listen(5)
def Run(self):
try:
while 1: # работаем постоянно
try:
if self.stop: break
conn, addr = self.sock.accept()
#print("New connection from " + addr[0])
except:
continue
try:
self.parse(conn, addr)
except:
self.send_answer(conn, "500 Internal Server Error", data="Error")
finally:
# так при любой ошибке
# сокет закроем корректно
conn.close()
finally:
self.sock.close()
# так при возникновении любой ошибки сокет
# всегда закроется корректно и будет всё хорошо
Последнее редактирование: