Помощь логистам: как сделать SMS-уведомления о разгрузке товара на складе на основе FastAPI

Привет, Хабр! Меня зовут Анастасия Иванова, я технический писатель МТС Exolve. Сегодня расскажу о системе SMS-уведомлений, созданной с помощью FastAPI. Она оповещает логистов о прибытии автомобилей на склад и автоматически назначает приёмщиков.  Система интегрирована с платформой МТС Exolve, на её примере покажем, как применять современные методы асинхронного взаимодействия. Подробности — под катом.

d21a1d4b8b22dd1a3d63f6eb932b471a.jpg

Установка и настройка проекта

Предположим, что у нас есть единый склад и секторы разгрузки, обозначенные латинскими буквами. Нужно оповещать грузчиков о прибытии автомобиля в сектор разгрузки. Эту задачу мы и будем решать.

Для начала убедимся, что у нас установлены все необходимые зависимости. Мы используем FastAPI для создания веб-приложения, Redis для хранения временных данных и FastAPI Cache для эффективного кэширования запросов.

Структура проекта:

smsfastapi/
    /venv
    /example_db
        __init__.py
        shemas.py
        infodb.py
    /router
        __init__.py
        stock.py
        worker.py
    config.py
    main.py
    dev.env

В файле config.py хранятся конфигурационные переменные, такие как API-ключ и номер телефона для отправки SMS. Следующий код инициализирует соединение с Redis в функции «startup» и разъединение в »shutdown» в файле main.py:

Код

from fastapi import FastAPI
from fastapi_cache import FastAPICache
from fastapi.responses import HTMLResponse
from redis import asyncio as aioredis
from fastapi_cache.backends.redis import RedisBackend
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
from router import stock, worker


origins = [
	"http://localhost:3000",
	"http://localhost:5173",
	"http://localhost:8000"
]


app = FastAPI(title='Rule of Stock',
          	description='API для управления разгрузкой автомобилей, прибывающих на склад. \n\n')


app.add_middleware(
	CORSMiddleware,
	allow_origins=origins,
	allow_credentials=True,
	allow_methods=["GET", "POST", "OPTIONS", "DELETE", "PATCH", "PUT"],
	allow_headers=["Content-Type", "Set-Cookie", "Access-Control-Allow-Headers", "Access-Control-Allow-Origin",
               	"Authorization"],
)


@app.get("/", response_class=HTMLResponse)
async def start():
	"""
	Назначение: \n
	Показать пользователю стартовую страницу со ссылкой на Swagger для использования API.\n
	"""
	return """
    	
        	
            	Rule of Stock
            	
        	
        	
            	

API для управления разгрузкой автомобилей, прибывающих на склад

Для проверки работы API перейдите по ссылке здесь.

-------------------------------------------------------------

""" @app.on_event("startup") async def startup(): redis = await aioredis.from_url("redis://localhost:6379", encoding="utf8", decode_responses=True) cache_redis = FastAPICache.init(RedisBackend(redis), prefix="stock_cache") return cache_redis @app.on_event("shutdown") async def shutdown(): backend = FastAPICache.get_backend() if isinstance(backend, RedisBackend): redis = backend.redis await redis.close()

Модель данных

Схема данных для автомобилей и приёмщиков создана. Наши автомобили описаны схемой BaseAuto, а приёмщики — BaseWorker с использованием Pydantic:

from pydantic import BaseModel, EmailStr, Field, validator

class BaseAuto(BaseModel):
	number: str
	name: str

class BaseWorker(BaseModel):
	name: str
	is_work: bool
	is_free: bool

В качестве базы данных для простоты будем использовать список складов с элементами stocks_list = [{'name':'A', 'is_free': True},
{'name':'B', 'is_free': True}, ……]

и словарь приёмщиков:

workers_list = {
 'Ivan': {'is_work': True,
 'stocks': ['A', 'B', 'C', 'D'],
 'is_free': True,
 'phone': '79801331100'},
 'Roman': {'is_work': True,
 'stocks': ['E', 'F', 'G', 'H'],
 'is_free': True,
 'phone': '79801034411'},….}

Полную версию примера базы данных можно посмотреть здесь.

В файле config.py получим данные из переменных окружения:

from dotenv import dotenv_values

info_env = dotenv_values('dev.env')

API_KEY = info_env.get('API_KEY')
PHONE_SEND = info_env.get('PHONE_SEND')	 

Работа с приёмщиками

В файле __init__.py. каталога /router импортируем основные библиотеки для работы с маршрутами:

from fastapi import APIRouter
import aiohttp
from fastapi_cache.decorator import cache
from fastapi import FastAPI, HTTPException

Маршруты для работы с приёмщиками определены в файле worker.py. Для улучшения производительности кэшируем данные через FastAPI Cache. Это решение ускорит доступ к постоянным данным о приёмщиках.

Код

from .__init__ import *
from config import API_KEY, PHONE_SEND
from example_db import (infodb, shemas)

routework = APIRouter(prefix="/worker", tags=['Worker'])

@routework.get('/read')
@cache(expire=30, namespace='personal')
async def read_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список приёмщиков.\n
	"""
	result =  [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items()][skip:limit]
	return result
@routework.get('/free')
@cache(expire=30, namespace='personal')
async def free_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список свободных приёмщиков.\n
	"""
	result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if worker_data['is_free']][skip:limit]
	return result

@routework.get('/busy')
@cache(expire=30, namespace='personal')
async def busy_workers(skip: int = 0, limit: int = 100,) -> list:
	"""
Назначение: \n
	Получить список занятых приёмщиков.\n
	"""
	result = [{worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if not worker_data['is_free']][skip:limit]
	return result

Работа со складами

Маршруты для работы со складами находятся в файле stock.py. И мы снова кэшируем данные.

Код

from .__init__ import *

from example_db import (infodb, shemas)

routestock = APIRouter(prefix="/stock", tags=['Stock'])

@routestock.get('/read')
@cache(expire=30, namespace='stock')
async def read_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список постов.\n
	"""
	result = infodb.stocks_list[skip:limit]
	return result


@routestock.get('/free')
@cache(expire=30, namespace='stock')
async def free_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список свободных складов.\n
	"""
	result = [stock_busy for stock_busy in infodb.stocks_list if stock_busy["is_free"]][skip:limit]
	return result


@routestock.get('/busy')
@cache(expire=30, namespace='stock')
async def busy_stock(skip: int = 0, limit: int = 100,) -> list:
	"""
	Назначение: \n
	Получить список занятых складов.\n
	"""
	result = [stock_busy for stock_busy in infodb.stocks_list if not stock_busy["is_free"]][skip:limit]
	return result

Назначение приёмщика и отправка SMS

Когда автомобиль приезжает на склад, водитель прикладывает электронный пропуск к воротам. В этот момент мы назначаем приёмщика, отправляя SMS-уведомление с использованием МТС Exolve.

Взаимодействие с aiohttp

Один из ключевых аспектов этого проекта — взаимодействие с внешним API для отправки SMS-уведомлений через платформу МТС Exolve. Для этого нам нужна библиотека aiohttp, через которую мы выполним асинхронные HTTP-запросы.

Взглянем на маршрут назначения приёмщика и отправки SMS-уведомления:

Код

@routework.post("/{stock}/message")
async def send_sms(stock: str, info_auto: shemas.BaseAuto):
    """
    Назначение: \n
    Назначить приёмщика для разгрузки автомобиля и оповестить его по SMS.\n
    """
    url = "https://api.exolve.ru/messaging/v1/SendSMS"
    headers = {"Authorization": f"Bearer {API_KEY}"}
    
    # Находим свободного приёмщика в базе данных по условию привязки к складу и занятости
    worker_free = next(({worker_name: worker_data} for worker_name, worker_data in infodb.workers_list.items() if (stock in worker_data['stocks'] and worker_data['is_free'])), None)
    worker_name = list(worker_free.keys())[0]
   
    # Меняем статус сектора на «занятый»
    for elem in infodb.stocks_list:
        if elem["name"] == stock: elem["is_free"] = False
    
    # Меняем статус приёмщика на «занятый»
    infodb.workers_list[worker_name]["is_free"] = False
    
    sms_data = {
        "number": PHONE_SEND,
        "destination": worker_free[worker_name]["phone"],
        "text": f"Автомобиль {info_auto.number} прибыл на разгрузку в сектор {stock}"
    }
    
    # Создаём асинхронную сессию
    async with aiohttp.ClientSession() as session:
        async with session.post(url, headers=headers, json=sms_data) as response:
            response_data = await response.json()
            return response_data

Особенности асинхронных сессий

Ключевой момент в этом коде — асинхронная сессия aiohttp.ClientSession. Она позволяет отправлять HTTP-запросы, не блокируя основной поток выполнения. Это критически важно для высокой производительности и эффективного использования ресурсов.

Особенности асинхронного подхода

При таком подходе все операции в приложении должны быть асинхронными. Например, если используем реляционную базу данных PostgreSQL, то для взаимодействия с ней нужен асинхронный драйвер, такой как asyncpg. Так мы предотвратим блокировку сокета и обеспечим асинхронную обработку запросов.

Такому принципу нужно следовать на всех этапах разработки чтобы достичь максимальной производительности и эффективности веб-приложения.

Завершение разгрузки

Когда разгрузка завершена, приёмщик прикладывает пропуск в секторе, где он находится. Так он посылает сигнал на маршрут в обозначенный файл stock.py:

@routestock.post("/{stock}/change")
async def end_load(stock: str, info_worker: shemas.BaseWorker):
	"""
	Назначение: \n
	Изменить статус сектора и сотрудника на «свободный».\n
	"""
	# меняем статус сектора на «свободный»
	for elem in infodb.stocks_list:
    	if elem["name"] == stock: elem["is_free"] = True
	# меняем статус сотрудника на «свободный»
	infodb.workers_list[info_worker.name]["is_free”] = True
	return infodb.workers_list[info_worker.name]

Завершение и начало рабочего дня

Рабочий день приёмщика начинается и заканчивается с использованием электронных пропусков. Сигнал от пропуска приходит на API. Это показано в следующих маршрутах:

@routework.post("/finish")
async def end_work(info_worker: shemas.BaseWorker)-> dict:
	"""
	Назначение: \n
	Сигнал от электронного пропуска. Конец рабочего дня.\n
	"""
	infodb.workers_list[info_worker.name][”is_free”] = False
	infodb.workers_list[info_worker.name][”is_work”] = False
	return infodb.workers_list[info_worker.name]


@routework.post("/workday")
async def start_work(info_worker: shemas.BaseWorker)-> dict:
	"""
	Назначение: \n
	Сигнал от электронного пропуска. Начало рабочего дня.\n
	"""
	infodb.workers_list[info_worker.name][”is_free”] = True
	infodb.workers_list[info_worker.name][”is_work”] = True
	return infodb.workers_list[info_worker.name]

Дополнение к главному файлу приложения

Дополним main.py строками смонтированных маршрутов и создадим команду запуска:

app.include_router(stock.routestock)
app.include_router(worker.routework)

if __name__ == "__main__":
	uvicorn.run("main:app", port=8000, host="0.0.0.0", reload=True, workers=4)

Для тестирования API нужно запустить приложение из среды разработки и перейти по адресу http://0.0.0.0:8000/docs#/. FastAPI сразу формирует готовую спецификацию Swagger, с её помощью можно быстро проверить работоспособность проекта.

Заключение

Этот проект — пример того, как можно использовать FastAPI, Redis и aiohttp для создания системы SMS-уведомлений, интегрированной с внешними сервисами. Он показывает, как важна асинхронность при работе с внешними API.

В реальных проектах нужно обеспечить безопасность данных и обработку ошибок, а также использовать реляционные базы данных для более сложных операций. На базе этого проекта можно разработать более сложную систему с интеграцией внешних сервисов и асинхронным взаимодействием с базами данных.

Дополнительные ресурсы

© Habrahabr.ru