176 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			176 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from datetime import date, datetime, timedelta, timezone
 | 
						|
from typing import Annotated
 | 
						|
from fastapi import Depends, FastAPI, HTTPException, status
 | 
						|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
 | 
						|
import jwt
 | 
						|
from jwt.exceptions import InvalidTokenError
 | 
						|
from pydantic import BaseModel
 | 
						|
from dotenv import load_dotenv
 | 
						|
import os
 | 
						|
import logging
 | 
						|
import requests
 | 
						|
import json
 | 
						|
import numpy as np
 | 
						|
import holidays
 | 
						|
import calendar
 | 
						|
from odoo_api.client import OdooAPIClient
 | 
						|
 | 
						|
# to get a string like this run:
 | 
						|
# openssl rand -hex 32
 | 
						|
 | 
						|
load_dotenv()
 | 
						|
SECRET_KEY = os.getenv('SECRET_KEY')
 | 
						|
ALGORITHM = os.getenv('ALGORITHM')
 | 
						|
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv('ACCESS_TOKEN_EXPIRE_MINUTES'))
 | 
						|
ODDO_URL = os.getenv('ODDO_URL')
 | 
						|
DB_NAME = os.getenv('DB_NAME')
 | 
						|
 | 
						|
origins_str = os.getenv('ORIGINS', '')
 | 
						|
origins = [origin.strip() for origin in origins_str.split(',') if origin.strip()]
 | 
						|
 | 
						|
class Token(BaseModel):
 | 
						|
    access_token: str
 | 
						|
    token_type: str
 | 
						|
 | 
						|
 | 
						|
class User(BaseModel):
 | 
						|
    uid: str
 | 
						|
    full_name: str | None = None
 | 
						|
    email: str | None = None
 | 
						|
 | 
						|
 | 
						|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
 | 
						|
logger = logging.getLogger("uvicorn")
 | 
						|
OdooClient = OdooAPIClient(ODDO_URL, DB_NAME, SECRET_KEY, ALGORITHM)
 | 
						|
 | 
						|
app = FastAPI()
 | 
						|
 | 
						|
from fastapi.middleware.cors import CORSMiddleware
 | 
						|
 | 
						|
app = FastAPI()
 | 
						|
 | 
						|
logger.info(f"origins: {origins}")
 | 
						|
 | 
						|
app.add_middleware(
 | 
						|
    CORSMiddleware,
 | 
						|
    allow_origins=origins,
 | 
						|
    allow_credentials=True,
 | 
						|
    allow_methods=["*"],
 | 
						|
    allow_headers=["*"],
 | 
						|
)
 | 
						|
 | 
						|
def create_access_token(data: dict, expires_delta: timedelta | None = None):
 | 
						|
    to_encode = data.copy()
 | 
						|
    if expires_delta:
 | 
						|
        expire = datetime.now(timezone.utc) + expires_delta
 | 
						|
    else:
 | 
						|
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
 | 
						|
 | 
						|
    to_encode.update({"exp": expire})
 | 
						|
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
 | 
						|
 | 
						|
    return encoded_jwt
 | 
						|
 | 
						|
 | 
						|
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]) -> User:
 | 
						|
    credentials_exception = HTTPException(
 | 
						|
        status_code=status.HTTP_401_UNAUTHORIZED,
 | 
						|
        detail="Could not validate credentials",
 | 
						|
        headers={"WWW-Authenticate": "Bearer"},
 | 
						|
    )
 | 
						|
    try:
 | 
						|
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
 | 
						|
        uid = payload.get("uid")
 | 
						|
        email = payload.get("sub")
 | 
						|
        full_name = payload.get("full_name")
 | 
						|
        if uid is None or email is None:
 | 
						|
            raise credentials_exception
 | 
						|
        
 | 
						|
        return User(uid=uid, email=email, full_name=full_name)
 | 
						|
    except InvalidTokenError:
 | 
						|
        raise credentials_exception
 | 
						|
 | 
						|
 | 
						|
def odoo_api_login(username: str, password: str) -> User | None:
 | 
						|
    result = OdooClient.login(username, password)
 | 
						|
    if result.get('uid'):
 | 
						|
        return User(
 | 
						|
            uid=str(result.get('uid')),
 | 
						|
            full_name=result.get('name'),
 | 
						|
            email=result.get('username')
 | 
						|
        )
 | 
						|
    return None
 | 
						|
 | 
						|
def get_polish_holidays(year: int, month: int):
 | 
						|
    pl_holidays = holidays.Poland(years=year)
 | 
						|
 | 
						|
    # Filtrowanie po miesiącu
 | 
						|
    holidays_list = [
 | 
						|
        {"date": date.strftime("%Y-%m-%d"), "name": name}
 | 
						|
        for date, name in sorted(pl_holidays.items())
 | 
						|
        if date.month == month
 | 
						|
    ]
 | 
						|
 | 
						|
    return holidays_list
 | 
						|
 | 
						|
def create_response(year: int, month: int, days: list) -> dict:
 | 
						|
    employee = days[0]["employee_id"][1]
 | 
						|
    days_in_month = calendar.monthrange(year, month)[1]
 | 
						|
    holidays = get_polish_holidays(year, month)
 | 
						|
 | 
						|
    start = date(year, month, 1)
 | 
						|
    end = date(year, month, days_in_month) + timedelta(days=1)
 | 
						|
 | 
						|
    # 4. Wyliczenie dni roboczych:
 | 
						|
    holiday_dates = [h['date'] for h in holidays]
 | 
						|
    working_days = int(np.busday_count(start.isoformat(), end.isoformat(), holidays=holiday_dates))
 | 
						|
 | 
						|
    return {
 | 
						|
        "year": year,
 | 
						|
        "month": month,
 | 
						|
        "days_in_month": days_in_month,
 | 
						|
        "employee": employee,
 | 
						|
        "working_days": working_days,
 | 
						|
        "public_holidays": holidays,
 | 
						|
        "days": days
 | 
						|
    }
 | 
						|
 | 
						|
@app.post("/token")
 | 
						|
async def login_for_access_token(
 | 
						|
        form_data: Annotated[OAuth2PasswordRequestForm, Depends()]) -> Token:
 | 
						|
    user = odoo_api_login(form_data.username, form_data.password)
 | 
						|
    if not user:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=status.HTTP_401_UNAUTHORIZED,
 | 
						|
            detail="Incorrect username or password",
 | 
						|
            headers={"WWW-Authenticate": "Bearer"},
 | 
						|
        )
 | 
						|
    logger.info(f"User '{user.uid}' logged in successfully.")
 | 
						|
    logger.info(
 | 
						|
        f"User '{user.uid}, {user.full_name}, {user.email}' logged in successfully.")
 | 
						|
 | 
						|
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
 | 
						|
    access_token = create_access_token(
 | 
						|
        data={"sub": user.email, "uid": user.uid, "full_name": user.full_name},
 | 
						|
        expires_delta=access_token_expires
 | 
						|
    )
 | 
						|
    return Token(access_token=access_token, token_type="bearer")
 | 
						|
 | 
						|
@app.get("/data/{year}/{month}")
 | 
						|
async def read_own_items(
 | 
						|
    year: int,
 | 
						|
    month: int,
 | 
						|
    current_user: Annotated[User, Depends(get_current_user)],
 | 
						|
):
 | 
						|
    logger.info(f"User '{current_user.full_name}' accessed data for {year}-{month}.")
 | 
						|
 | 
						|
    data = OdooClient.get_hr_attendance_data(month=month, year=year)
 | 
						|
    if not data:
 | 
						|
        raise HTTPException(
 | 
						|
            status_code=404,
 | 
						|
            detail="No attendance data found for the specified month and year."
 | 
						|
        )
 | 
						|
    return create_response(year, month, data)
 | 
						|
    
 | 
						|
 |