ready to deployment
This commit is contained in:
175
backend/main.py
Normal file
175
backend/main.py
Normal file
@@ -0,0 +1,175 @@
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from typing import Annotated
|
||||
from fastapi import Depends, FastAPI, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
||||
import jwt
|
||||
from jwt.exceptions import InvalidTokenError
|
||||
from pydantic import BaseModel
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
import logging
|
||||
import requests
|
||||
import json
|
||||
import numpy as np
|
||||
import holidays
|
||||
import calendar
|
||||
from odoo_api.client import OdooAPIClient
|
||||
|
||||
# to get a string like this run:
|
||||
# openssl rand -hex 32
|
||||
|
||||
load_dotenv()
|
||||
SECRET_KEY = os.getenv('SECRET_KEY')
|
||||
ALGORITHM = os.getenv('ALGORITHM')
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES')
|
||||
ODDO_URL = os.getenv('ODDO_URL')
|
||||
DB_NAME = os.getenv('DB_NAME')
|
||||
|
||||
origins_str = os.getenv('ORIGINS', '')
|
||||
origins = [origin.strip() for origin in origins_str.split(',') if origin.strip()]
|
||||
|
||||
class Token(BaseModel):
|
||||
access_token: str
|
||||
token_type: str
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
uid: str
|
||||
full_name: str | None = None
|
||||
email: str | None = None
|
||||
|
||||
|
||||
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
|
||||
logger = logging.getLogger("uvicorn")
|
||||
OdooClient = OdooAPIClient(ODDO_URL, DB_NAME, SECRET_KEY, ALGORITHM)
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
logger.info(f"origins: {origins}")
|
||||
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=origins,
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
def create_access_token(data: dict, expires_delta: timedelta | None = None):
|
||||
to_encode = data.copy()
|
||||
if expires_delta:
|
||||
expire = datetime.now(timezone.utc) + expires_delta
|
||||
else:
|
||||
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
|
||||
|
||||
to_encode.update({"exp": expire})
|
||||
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
||||
|
||||
return encoded_jwt
|
||||
|
||||
|
||||
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
|
||||
credentials_exception = HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Could not validate credentials",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
try:
|
||||
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
||||
uid = payload.get("uid")
|
||||
email = payload.get("sub")
|
||||
full_name = payload.get("full_name")
|
||||
if uid is None or email is None:
|
||||
raise credentials_exception
|
||||
|
||||
return User(uid=uid, email=email, full_name=full_name)
|
||||
except InvalidTokenError:
|
||||
raise credentials_exception
|
||||
|
||||
|
||||
def odoo_api_login(username: str, password: str) -> User | None:
|
||||
result = OdooClient.login(username, password)
|
||||
if result.get('uid'):
|
||||
return User(
|
||||
uid=str(result.get('uid')),
|
||||
full_name=result.get('name'),
|
||||
email=result.get('username')
|
||||
)
|
||||
return None
|
||||
|
||||
def get_polish_holidays(year: int, month: int):
|
||||
pl_holidays = holidays.Poland(years=year)
|
||||
|
||||
# Filtrowanie po miesiącu
|
||||
holidays_list = [
|
||||
{"date": date.strftime("%Y-%m-%d"), "name": name}
|
||||
for date, name in sorted(pl_holidays.items())
|
||||
if date.month == month
|
||||
]
|
||||
|
||||
return holidays_list
|
||||
|
||||
def create_response(year: int, month: int, days: list) -> dict:
|
||||
employee = days[0]["employee_id"][1]
|
||||
days_in_month = calendar.monthrange(year, month)[1]
|
||||
holidays = get_polish_holidays(year, month)
|
||||
|
||||
start = date(year, month, 1)
|
||||
end = date(year, month, days_in_month) + timedelta(days=1)
|
||||
|
||||
# 4. Wyliczenie dni roboczych:
|
||||
holiday_dates = [h['date'] for h in holidays]
|
||||
working_days = int(np.busday_count(start.isoformat(), end.isoformat(), holidays=holiday_dates))
|
||||
|
||||
return {
|
||||
"year": year,
|
||||
"month": month,
|
||||
"days_in_month": days_in_month,
|
||||
"employee": employee,
|
||||
"working_days": working_days,
|
||||
"public_holidays": holidays,
|
||||
"days": days
|
||||
}
|
||||
|
||||
@app.post("/token")
|
||||
async def login_for_access_token(
|
||||
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
|
||||
user = odoo_api_login(form_data.username, form_data.password)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Incorrect username or password",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
logger.info(f"User '{user.uid}' logged in successfully.")
|
||||
logger.info(
|
||||
f"User '{user.uid}, {user.full_name}, {user.email}' logged in successfully.")
|
||||
|
||||
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
access_token = create_access_token(
|
||||
data={"sub": user.email, "uid": user.uid, "full_name": user.full_name},
|
||||
expires_delta=access_token_expires
|
||||
)
|
||||
return Token(access_token=access_token, token_type="bearer")
|
||||
|
||||
@app.get("/data/{year}/{month}")
|
||||
async def read_own_items(
|
||||
year: int,
|
||||
month: int,
|
||||
current_user: Annotated[User, Depends(get_current_user)],
|
||||
):
|
||||
logger.info(f"User '{current_user.full_name}' accessed data for {year}-{month}.")
|
||||
|
||||
data = OdooClient.get_hr_attendance_data(month=month, year=year)
|
||||
if not data:
|
||||
raise HTTPException(
|
||||
status_code=404,
|
||||
detail="No attendance data found for the specified month and year."
|
||||
)
|
||||
return create_response(year, month, data)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user