from typing import Optional
from time import sleep
from datetime import datetime
from json import dumps
from base64 import b64encode
from hashlib import sha256
from logging import getLogger
from requests import Session
from requests.adapters import HTTPAdapter, Retry
from ksef_utils import __version__ as version
from ksef_utils.utils import (
render_template,
sign_xml,
KSEFUtils,
)
from ksef_utils.logger import debug_requests
logger = getLogger(__name__)
[docs]
class KSEFServer:
def __init__(self, config):
self.config = config
if config.LOGS_VERBOSE:
debug_requests()
retries = Retry(
total=10,
backoff_factor=5,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=frozenset(
{"DELETE", "GET", "HEAD", "OPTIONS", "PUT", "TRACE", "POST"}
),
)
self.session = Session()
self.session.mount(self.config.URL, HTTPAdapter(max_retries=retries))
self.session.headers.update({"User-Agent": f"ksef-utils: {version}"})
def init_token(self, **kwargs):
rendered_template = render_template("InitSession.xml", **kwargs)
print(rendered_template)
response = self.session.post(
f"{self.config.URL}/api/online/Session/InitToken",
data=rendered_template,
headers={
"Content-Type": "application/octet-stream",
"Accept": "application/json",
},
)
return response
def init_signed(self, data):
response = self.session.post(
f"{self.config.URL}/api/online/Session/InitSigned",
data=data,
headers={
"Content-Type": "application/octet-stream",
"Accept": "application/json",
},
)
return response
def get_status(self, session_token):
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
response = self.session.get(
f"{self.config.URL}/api/online/Session/Status?PageSize=10&PageOffset=0&IncludeDetails=true",
headers=headers,
)
return response
def get_upo(self, session_token, reference_number):
headers = {
"accept": "application/json",
}
response = self.session.get(
f"{self.config.URL}/api/common/Status/{reference_number}",
headers=headers,
)
return response
def authorization_challenge(self, identifier, identifier_type="onip"):
data = {
"contextIdentifier": {
"type": identifier_type,
"identifier": identifier,
}
}
response = self.session.post(
f"{self.config.URL}/api/online/Session/AuthorisationChallenge?PageSize=10&PageOffset=0&IncludeDetails=false",
json=data,
)
print(type(response.json()))
print(dir(response))
print(response.text)
print(response.reason)
print(response.status_code)
response_json = response.json()
return response_json
def get_session_terminate(self, session_token):
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
response = self.session.get(
f"{self.config.URL}/api/online/Session/Terminate",
headers=headers,
)
return response
def get_invoices(self, session_token, from_date=None, to_date=None):
if not from_date:
from_date = datetime(
datetime.now().year,
datetime.now().month,
1,
tzinfo=self.config.TIMEZONE,
)
if not to_date:
to_date = datetime.now()
from_string = from_date.isoformat(timespec="milliseconds")
to_string = datetime.now(self.config.TIMEZONE).isoformat(
timespec="milliseconds"
)
data = {
"queryCriteria": {
"subjectType": "subject1",
"type": "incremental",
"acquisitionTimestampThresholdFrom": from_string,
"acquisitionTimestampThresholdTo": to_string,
}
}
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
response = self.session.post(
f"{self.config.URL}/api/online/Query/Invoice/Sync?PageSize=10&PageOffset=0",
json=data,
headers=headers,
)
return response
def generate_token(self, session_token):
data = {
"generateToken": {
"credentialsRoleList": [
{
"roleType": "credentials_read",
"roleDescription": "read others credentials",
},
{
"roleType": "credentials_manage",
"roleDescription": "read others credentials",
},
{
"roleType": "invoice_write",
"roleDescription": "write invoices",
},
{
"roleType": "invoice_read",
"roleDescription": "read invoices",
},
{
"roleType": "payment_confirmation_write",
"roleDescription": "payment_confirmation_write",
},
# {
# "roleType": "enforcement_operations",
# "roleDescription": "enforcement_operations",
# },
],
"description": "0_ksef-utils_test_token",
}
}
response = self.session.post(
f"{self.config.URL}/api/online/Credentials/GenerateToken",
json=data,
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
def post_context_grant(
self,
session_token,
credentials_identifier,
credentials_identifier_type="nip",
context_identifier=None,
context_identifier_type="onip",
):
data = {
"grantContextCredentials": {
"contextIdentifier": {
"type": context_identifier_type,
"identifier": (
context_identifier
if context_identifier
else self.config.KSEF_NIP
),
},
"credentialsIdentifier": {
"type": credentials_identifier_type,
# "identifier": self.config.KSEF_NIP,
"identifier": credentials_identifier,
},
"credentialsRole": {
"roleType": "credentials_manage",
"roleDescription": "credentials_manage",
},
}
}
response = self.session.post(
f"{self.config.URL}/api/online/Credentials/ContextGrant",
json=data,
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
[docs]
def post_credentials_grant(self, session_token, onip):
"""
Possible grants:
* introspection
* invoice_read
* invoice_write
* payment_confirmation_write
* credentials_read
* credentials_manage
* self_invoicing
* tax_representative
* enforcement_operations
* subunit_manage
"""
data = {
"grantCredentials": {
"credentialsIdentifier": {
"type": "onip",
"identifier": onip,
},
"credentialsRoleList": [
# {
# "roleType": "introspection",
# "roleDescription": "introspection",
# },
{
"roleType": "invoice_read",
"roleDescription": "invoice_read",
},
{
"roleType": "invoice_write",
"roleDescription": "invoice_write",
},
{
"roleType": "payment_confirmation_write",
"roleDescription": "payment_confirmation_write",
},
{
"roleType": "self_invoicing",
"roleDescription": "self_invoicing",
},
{
"roleType": "tax_representative",
"roleDescription": "tax_representative",
},
],
}
}
response = self.session.post(
f"{self.config.URL}/api/online/Credentials/Grant",
json=data,
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
def get_credentials_grant(self, session_token):
# data = {
# "query": {
# "contextNip": "",
# "sourceIdentifier": "",
# "targetIdentifier": "",
# }
# }
response = self.session.get(
f"{self.config.URL}/api/online/Query/Credential/Context/Sync",
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
def get_generate_internal_identifier(
self, session_token, input_digits_sequence
):
response = self.session.get(
f"{self.config.URL}/api/online/Session/GenerateInternalIdentifier/{input_digits_sequence}",
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
def get_token_status(self, session_token, reference_number):
response = self.session.get(
f"{self.config.URL}/api/online/Credentials/Status/{reference_number}",
headers={
"Content-Type": "application/json",
"SessionToken": session_token,
"Accept": "application/json",
},
)
return response
def get_invoice_status(self, reference_number: str, session_token: str):
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
url = f"{self.config.URL}/api/online/Invoice/Status/{reference_number}"
response = self.session.get(url, headers=headers)
return response
def get_invoice(self, reference_number: str, session_token: str):
headers = {
"accept": "application/octet-stream",
"SessionToken": session_token,
}
url = f"{self.config.URL}/api/online/Invoice/Get/{reference_number}"
response = self.session.get(url, headers=headers)
return response
def send_invoice(self, data: dict, session_token: str):
url = f"{self.config.URL}/api/online/Invoice/Send"
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
response = self.session.put(url, data=dumps(data), headers=headers)
return response
def post_payment_identifier(
self, session_token, ksef_reference_list: Optional[list[str]]
):
data = {"ksefReferenceNumberList": ksef_reference_list}
headers = {
"accept": "application/json",
"SessionToken": session_token,
"Content-Type": "application/json",
}
url = f"{self.config.URL}/api/online/Payment/Identifier/Request"
response = self.session.post(url, json=data, headers=headers)
return response
[docs]
class KSEFService:
def __init__(self, server: KSEFServer):
self.server = server
self.config = server.config
def init_session(self):
response_json = self.server.authorization_challenge(
self.config.KSEF_NIP
)
challenge = response_json.get("challenge")
if not challenge:
print(dumps(response_json, indent=4))
raise Exception(
response_json.get("exception").get("exceptionDetailList")
)
encrypted_token = KSEFUtils.get_encrypted_token(
response_json,
self.config.PUBLIC_KEY,
self.config.KSEF_TOKEN,
)
response = self.server.init_token(
challenge=challenge,
token=encrypted_token,
nip=self.config.KSEF_NIP,
)
self.init_token = response.json().get("sessionToken").get("token")
self.init_token_all = response.json()
return self.init_token
def init_signed(self):
response_json = self.server.authorization_challenge(
self.config.KSEF_NIP
)
challenge = response_json.get("challenge")
if not challenge:
print(dumps(response_json, indent=4))
raise Exception(
response_json.get("exception").get("exceptionDetailList")
)
rendered_template = render_template(
"InitSessionSignedRequest.xml",
challenge=challenge,
nip=self.config.KSEF_NIP,
)
print(rendered_template)
response_signed = sign_xml(rendered_template, self.config)
print(response_signed)
response = self.server.init_signed(
data=response_signed,
)
print(dumps(response.json(), indent=4))
self.init_token = response.json().get("sessionToken").get("token")
self.init_token_all = response.json()
return response
def session_terminate(
self, max_retries: int = 60, interval: int = 5
) -> dict:
response = self.server.get_session_terminate(self.init_token)
logger.info(f"session_terminate: {dumps(response.json(), indent=4)}")
return response.json()
def wait_until_logged(self, max_retries: int = 60, interval: int = 5):
logged_in = False
while not logged_in and max_retries > 0:
logger.info(f"Wait until logged in, max_retries={max_retries}")
response = self.server.get_status(self.init_token)
response_json = response.json()
print(dumps(response_json, indent=4))
if response_json.get("processingCode") == 315:
logged_in = True
sleep(interval)
max_retries -= 1
return response_json
def send_invoice(self, **kwargs):
invoice = render_template("invoice_example.xml", **kwargs)
print(invoice)
invoice_encoded = invoice.encode("utf-8")
invoice_hash = {
"fileSize": len(invoice_encoded),
"hashSHA": {
"algorithm": "SHA-256",
"encoding": "Base64",
"value": b64encode(sha256(invoice_encoded).digest()).decode(
"utf-8"
),
},
}
invoice_payload = {
"invoiceBody": b64encode(invoice_encoded).decode("utf-8"),
"type": "plain",
}
data = {"invoiceHash": invoice_hash, "invoicePayload": invoice_payload}
return self.server.send_invoice(data, self.init_token)
def wait_until_invoice(
self, reference_number: str, max_retries: int = 60, interval: int = 5
) -> dict:
invoice_status: dict = {}
while not invoice_status and max_retries > 0:
logger.info(f"Wait until invoice, max_retries={max_retries}")
response = self.get_invoice_status(reference_number)
print(response.status_code)
print(dumps(response.json(), indent=4))
invoice_status = response.json().get("invoiceStatus")
if not invoice_status.get("ksefReferenceNumber"):
invoice_status = {}
if not invoice_status:
sleep(interval)
max_retries -= 1
return response.json()
def get_invoice_status(self, reference_number: str):
return self.server.get_invoice_status(reference_number, self.init_token)
def get_invoice(self, reference_number: str) -> str:
response = self.server.get_invoice(reference_number, self.init_token)
return response.text
def get_upo(self, reference_number: str):
response = self.server.get_upo(self.init_token, reference_number)
print(f"response_upo: {response}")
return response.json()
def wait_until_upo(
self, reference_number: str, max_retries: int = 90, interval: int = 20
) -> dict:
processing_code = 310
while processing_code != 200 and max_retries > 0:
logger.info(f"Wait until upo, max_retries={max_retries}")
response = self.get_upo(reference_number)
print(dumps(response, indent=4))
processing_code = response.get("processingCode")
if processing_code != 200:
sleep(interval)
max_retries -= 1
return response
def generate_token(self):
response = self.server.generate_token(self.init_token)
return response.json()
def wait_until_token(self, element_reference_number, interval: int = 5):
response = self.server.get_token_status(
self.init_token, element_reference_number
)
processing_code = 302
while processing_code != 200:
response_status = self.server.get_token_status(
self.init_token, element_reference_number
)
processing_code = response_status.json().get("processingCode")
print(dumps(response.json(), indent=4))
if processing_code != 200:
sleep(interval)
return response.json()
def post_payment_identifier(
self, ksef_reference_list: Optional[list[str]] = None
):
response = self.server.post_payment_identifier(
self.init_token, ksef_reference_list
)
return response.json()
def get_invoices(self, from_date=None, to_date=None):
response = self.server.get_invoices(self.init_token, from_date, to_date)
return response.json()
def get_status(self):
response = self.server.get_status(self.init_token)
return response.json()
def post_context_grant(
self,
credentials_identifier,
credentials_identifier_type="nip",
context_identifier=None,
context_identifier_type="onip",
):
response = self.server.post_context_grant(
self.init_token,
credentials_identifier=credentials_identifier,
credentials_identifier_type=credentials_identifier_type,
context_identifier=context_identifier,
context_identifier_type=context_identifier_type,
)
return response.json()
def post_credentials_grant(self, onip):
response = self.server.post_credentials_grant(
self.init_token, onip=onip
)
return response.json()
def get_credentials_grant(self):
response = self.server.get_credentials_grant(self.init_token)
return response.json()
def get_generate_internal_identifier(self, input_digits_sequence="0000"):
response = self.server.get_generate_internal_identifier(
self.init_token, input_digits_sequence
)
return response.json()