import logging
import functools
from datetime import datetime
from numbers import Number
from typing import Union, Iterable, Tuple
import dateutil
import numpy as np
import pandas as pd
from bionumpy.bnpdataclass import BNPDataClass
from dateutil.parser import parse as _parse
from dateutil.relativedelta import relativedelta
from pytz import utc
from chap_core.exceptions import InvalidDateError
logger = logging.getLogger(__name__)
def parse(date_string: str, default: datetime = None):
    """Parse a date string, also accepting the compact ``YYYYMM`` form.

    Args:
        date_string: Text to parse. A six-character, all-digit string is
            rewritten to ``YYYY-MM`` before parsing.
        default: Passed through to the dateutil parser as ``default``.

    Returns:
        Whatever the dateutil parser returns for the (possibly rewritten) text.
    """
    looks_like_compact_month = len(date_string) == 6 and date_string.isdigit()
    if looks_like_compact_month:
        year_part, month_part = date_string[:4], date_string[4:6]
        date_string = year_part + "-" + month_part
    return _parse(date_string, default=default)
class DateUtilWrapper:
    """Wrapper around a date-like object that forwards selected attributes.

    Subclasses list the attribute names that should be read from the wrapped
    object in ``_used_attributes``.
    """

    # Names that are looked up on the wrapped ``_date`` object.
    _used_attributes: tuple = ()

    def __init__(self, date: datetime):
        """Store the wrapped object as ``_date``."""
        self._date = date

    def __getattr__(self, item: str):
        """Forward whitelisted attributes to ``_date``.

        Python only calls this hook after normal lookup has failed, so any
        name that is not whitelisted ends in the usual ``AttributeError``.
        """
        if item not in self._used_attributes:
            return super().__getattribute__(item)
        return getattr(self._date, item)
class TimeStamp(DateUtilWrapper):
    """A point in time backed by a ``datetime`` stored in ``_date``.

    ``year``, ``month`` and ``day`` are forwarded to the wrapped datetime.
    """

    _used_attributes = ("year", "month", "day", "__str__", "__repr__")

    def __init__(self, date: datetime):
        self._date = date

    @property
    def week(self):
        """ISO week number of the wrapped datetime."""
        return self._date.isocalendar()[1]

    @property
    def date(self) -> datetime:
        """The wrapped datetime."""
        return self._date

    @classmethod
    def parse(cls, text_repr: str):
        """Build a timestamp from text using the module-level ``parse``."""
        return cls(parse(text_repr))

    def __le__(self, other: "TimeStamp"):
        return self._comparison(other, "__le__")

    def __ge__(self, other: "TimeStamp"):
        return self._comparison(other, "__ge__")

    def __gt__(self, other: "TimeStamp"):
        return self._comparison(other, "__gt__")

    def __lt__(self, other: "TimeStamp"):
        return self._comparison(other, "__lt__")

    def __repr__(self):
        return f"TimeStamp({self.year}-{self.month}-{self.day})"

    def __eq__(self, other):
        """Compare the wrapped datetimes.

        Operands that carry no ``_date`` yield ``NotImplemented`` instead of
        an ``AttributeError``, so ``stamp == None`` is simply ``False``.
        """
        try:
            other_date = other._date
        except AttributeError:
            return NotImplemented
        return self._date == other_date

    def __hash__(self):
        # Defining __eq__ disables the inherited hash; restore one that is
        # consistent with equality (equal stamps wrap equal datetimes).
        return hash(self._date)

    def __sub__(self, other: "TimeStamp"):
        """Return the difference to another timestamp as a ``TimeDelta``."""
        if not isinstance(other, TimeStamp):
            return NotImplemented
        return TimeDelta(relativedelta(self._date, other._date))

    def _comparison(self, other: "TimeStamp", func_name: str):
        """Apply the named rich comparison to the two wrapped datetimes.

        Both datetimes get their ``tzinfo`` replaced by UTC first (a plain
        replacement, not a conversion), which makes naive and aware values
        comparable with each other.
        """
        own = self._date.replace(tzinfo=utc)
        theirs = other._date.replace(tzinfo=utc)
        return getattr(own, func_name)(theirs)
class TimePeriod:
    """Base class for a calendar period with a start date and a fixed length.

    A period starts at ``_date`` and spans ``_extension`` (set by each
    subclass), i.e. it covers the half-open interval
    ``[_date, _date + _extension)``.
    """

    # Attribute names forwarded to the underlying datetime by __getattr__.
    _used_attributes = ()
    # Length of the period; subclasses set a relativedelta here.
    _extension = None

    def __init__(self, date: datetime | Number, *args, **kwargs):
        """Create a period from a datetime/TimeStamp or from numeric parts.

        Args:
            date: Start of the period as ``datetime`` or ``TimeStamp``. Any
                other value is treated as the year and combined with the
                remaining arguments (month, day) into a ``datetime``.
        """
        if not isinstance(date, (datetime, TimeStamp)):
            date = self.__date_from_numbers(date, *args, **kwargs)
        if isinstance(date, TimeStamp):
            date = date._date
        self._date = date

    @property
    def last_day(self):
        """The exclusive end of the period moved back by one day."""
        return self.end_timestamp - delta_day

    @classmethod
    def __date_from_numbers(cls, year: int, month: int = 1, day: int = 1):
        return datetime(int(year), int(month), int(day))

    @classmethod
    def from_id(cls, id: str):
        """Build a period from a compact identifier.

        Recognised forms: ``YYYY`` (year), ``YYYYSunWnn`` (week with
        ``iso_day=7``), ``YYYYWnn`` (week), ``YYYYMM`` (month) and
        ``YYYYMMDD`` (day).

        Returns:
            The matching period, or ``None`` when the identifier has none of
            the recognised forms.
        """
        if len(id) == 4:
            return Year(int(id))
        # "SunW" must be tested before the plain "W" it contains.
        if "SunW" in id:
            return Week(*map(int, id.split("SunW")), iso_day=7)
        if "W" in id:
            return Week(*map(int, id.split("W")))
        if len(id) == 6:
            return Month(int(id[:4]), int(id[4:]))
        if len(id) == 8:
            return Day(int(id[:4]), int(id[4:6]), int(id[6:]))
        return None

    @property
    def id(self):
        raise NotImplementedError("Must be implemented in subclass")

    @classmethod
    def timestamp_diff(cls, first_timestamp: TimeStamp, second_timestamp: TimeStamp):
        """Return ``second_timestamp - first_timestamp``."""
        return second_timestamp - first_timestamp

    def __eq__(self, other):
        """Periods are equal when both start date and length are equal.

        Operands that are not periods yield ``NotImplemented`` instead of an
        ``AttributeError``, so Python can fall back to the other operand.
        """
        if not isinstance(other, TimePeriod):
            return NotImplemented
        return self._date == other._date and self._extension == other._extension

    def __hash__(self):
        # Defining __eq__ disables the inherited hash; equal periods share
        # their start date, so hashing it is consistent with equality.
        return hash(self._date)

    def __le__(self, other: "TimePeriod"):
        """True when this period starts before ``other`` has ended.

        For a ``TimeStamp`` operand the period start is compared with ``<=``.
        """
        if isinstance(other, TimeStamp):
            return TimeStamp(self._date) <= other
        return self._date < other._exclusive_end()

    def __ge__(self, other: "TimePeriod"):
        """True when this period ends after ``other`` (or the stamp) starts."""
        if isinstance(other, TimeStamp):
            return TimeStamp(self._exclusive_end()) > other
        return self._exclusive_end() > other._date

    def __gt__(self, other: "TimePeriod"):
        """True when this period starts at or after the end of ``other``.

        For a ``TimeStamp`` operand the period start must be strictly later.
        """
        if isinstance(other, TimeStamp):
            return TimeStamp(self._date) > other
        return self._date >= other._exclusive_end()

    def __lt__(self, other: "TimePeriod"):
        """True when this period has ended by the start of ``other``."""
        if isinstance(other, TimeStamp):
            return TimeStamp(self._exclusive_end()) <= other
        return self._exclusive_end() <= other._date

    def __sub__(self, other: "TimePeriod"):
        """Difference between the start dates of two equally long periods."""
        if not isinstance(other, TimePeriod):
            return NotImplemented
        assert self._extension == other._extension
        return TimeDelta(relativedelta(self._date, other._date))

    def _exclusive_end(self):
        return self._date + self._extension

    def __getattr__(self, item):
        # Only reached when normal lookup failed: forward whitelisted names to
        # the datetime, otherwise raise the usual AttributeError.
        if item in self._used_attributes:
            return getattr(self._date, item)
        return super().__getattribute__(item)

    @property
    def time_delta(self) -> "TimeDelta":
        """Length of the period wrapped in a ``TimeDelta``."""
        return TimeDelta(self._extension)

    @classmethod
    def parse(cls, text_repr: str):
        """Parse text into a ``Year``, ``Month``, ``Day`` or ``Week``.

        Text containing ``W`` or ``/`` is handled as a week. Text that is a
        plain integer usable as a year becomes a ``Year``. Everything else is
        parsed as a date, and the precision of the text decides the type.
        """
        if "W" in text_repr or "/" in text_repr:
            if "SunW" in text_repr:
                return cls.from_id(text_repr)
            return cls.parse_week(text_repr)
        try:
            # Year construction stays inside the try: an integer that is not a
            # valid year (e.g. "202001") raises ValueError and must fall
            # through to the date parser below.
            return Year(int(text_repr))
        except ValueError:
            pass
        # Parse twice with defaults that differ in month and day; a field that
        # comes out identical in both results was present in the text.
        default_dates = [datetime(2010, 1, 1), datetime(2009, 11, 10)]
        first, second = (parse(text_repr, default=default_date) for default_date in default_dates)
        if first.day == second.day:
            return Day(first)
        if first.month == second.month:
            return Month(first)
        return Year(first)

    @classmethod
    def from_pandas(cls, period: pd.Period):
        """Parse the string form of a pandas ``Period``."""
        return cls.parse(str(period))

    @classmethod
    def parse_week(cls, week: str):
        """Parse ``YYYYWnn`` or a ``start/end`` date pair into a ``Week``.

        Returns:
            The parsed ``Week``, or ``None`` when the text contains neither
            ``W`` nor ``/``.

        Raises:
            AssertionError: If a ``start/end`` pair is not exactly six days
                apart.
        """
        if "W" in week:
            year, weeknr = week.split("W")
            return Week(int(year), int(weeknr))
        if "/" in week:
            start, end = week.split("/")
            start_date = _parse(start)
            end_date = _parse(end)
            # Use the absolute day difference: relativedelta(...).days holds
            # only the day component, so a span of one month and six days
            # would wrongly pass a relativedelta-based check.
            assert (end_date - start_date).days == 6, f"Week must be 7 days {start_date} {end_date}"
            return Week(start_date)  # type: ignore
        return None

    @property
    def start_timestamp(self):
        """Start of the period as a ``TimeStamp``."""
        return TimeStamp(self._date)

    @property
    def end_timestamp(self):
        """Exclusive end of the period as a ``TimeStamp``."""
        return TimeStamp(self._exclusive_end())

    @property
    def n_days(self):
        """Number of days between the start and the exclusive end."""
        return (self._exclusive_end() - self._date).days
class Day(TimePeriod):
    """A period of one calendar day."""

    _extension = relativedelta(days=1)
    _used_attributes = ["year", "month", "day"]

    @property
    def id(self):
        """Compact ``YYYYMMDD`` identifier."""
        day_start = self._date
        return day_start.strftime("%Y%m%d")

    def to_string(self):
        """Return the day as ``YYYY-MM-DD`` with zero-padded month and day."""
        return f"{self.year}-{self.month:02d}-{self.day:02d}"

    def topandas(self):
        """Return the matching daily pandas ``Period``."""
        year, month, day = self.year, self.month, self.day
        return pd.Period(year=year, month=month, day=day, freq="D")

    def __repr__(self):
        return f"Day({self.year}-{self.month}-{self.day})"
class WeekNumbering:
    """Conversions between dates and (ISO year, ISO week, weekday) triples."""

    @staticmethod
    def get_week_info(date: datetime) -> Tuple[int, int, int]:
        """Return the ISO calendar triple of ``date``."""
        iso_triple = date.isocalendar()
        return iso_triple

    @staticmethod
    def get_date(year: int, week: int, day: int) -> datetime:
        """Return the date for an ISO year, ISO week and weekday.

        Args:
            year: ISO year.
            week: ISO week number.
            day: Weekday; taken modulo 7 so that 7 and 0 both mean Sunday.

        Raises:
            InvalidDateError: If the combination is not a valid date.
        """
        # Built once and reused for parsing and for both error messages.
        date_text = f"{year}-W{week}-{day % 7}"
        try:
            return datetime.strptime(date_text, "%G-W%V-%w")
        except ValueError as e:
            logger.error(f"Invalid date {date_text}")
            raise InvalidDateError(f"Invalid date {date_text}") from e
class Week(TimePeriod):
    """A seven-day period identified by ISO year and week number.

    ``_day_nr`` is the ISO weekday of the stored start date. Only 1 (Monday,
    separator ``"W"``) and 7 (Sunday, separator ``"SunW"``) have a separator
    defined in ``_sep_strings``.
    """

    _used_attributes = []
    _extension = relativedelta(weeks=1)
    _week_numbering = WeekNumbering
    _sep_strings = {1: "W", 7: "SunW"}

    def __init__(self, date, *args, **kwargs):
        """Create a week from ``(year, week)`` numbers or from a date.

        Args:
            date: The year when further arguments are given, otherwise a
                ``datetime`` or ``TimeStamp`` used as the start date.
            *args: Optional week number as first positional argument.
            **kwargs: ``week`` (week number when not given positionally) and
                ``iso_day`` (weekday of the start date, default 1).
        """
        if args or kwargs:
            year = date
            week_nr = args[0] if args else kwargs["week"]
            # _day_nr must be set before the date is computed from it.
            self._day_nr = kwargs.get("iso_day", 1)
            self._date = self.__date_from_numbers(year, week_nr)
            self.week = week_nr
            self.year = year
        else:
            if isinstance(date, TimeStamp):
                date = date._date
            year, week, day = date.isocalendar()
            self.week = week
            self.year = year
            self._day_nr = day
            self._date = date

    @property
    def id(self):
        """Identifier ``<year><sep><week>`` with a zero-padded week number."""
        assert self._day_nr in self._sep_strings, "Only support Sunday or Monday as the first day of the week"
        return f"{self.year}{self._sep_strings[self._day_nr]}{self.week:02d}"

    def to_string(self):
        """Like ``id`` but without zero-padding the week number."""
        return f"{self.year}{self._sep_strings[self._day_nr]}{self.week}"

    def __sub__(self, other: "TimePeriod"):
        """Difference between two weeks' start dates.

        Wraps a plain ``timedelta`` (not a ``relativedelta``), so the result
        keeps the total number of days.
        """
        if not isinstance(other, TimePeriod):
            return NotImplemented
        assert self._extension == other._extension
        return TimeDelta(self._date - other._date)

    def __str__(self):
        return f"{self.year}{self._sep_strings[self._day_nr]}{self.week:02d}"

    __repr__ = __str__

    def __date_from_numbers(self, year: int, week_nr: int):
        date = self._week_numbering.get_date(year, week_nr, self._day_nr)
        # Guard against week numbers that roll over into another ISO week.
        assert date.isocalendar()[:2] == (year, week_nr), (
            date.isocalendar()[:2],
            year,
            week_nr,
        )
        return date

    @classmethod
    def _isocalendar_week_to_date(cls, year: int, week_nr: int, day: int):
        """Return the date for an ISO year, ISO week and weekday.

        ``%V`` is only accepted by ``strptime`` together with the ISO year
        directive ``%G``; combining it with ``%Y`` always raises ValueError.
        The weekday is taken modulo 7 so that both 0 and 7 mean Sunday for
        the ``%w`` directive.
        """
        return datetime.strptime(f"{year}-W{week_nr}-{day % 7}", "%G-W%V-%w")

    def topandas(self):
        """Return a weekly pandas ``Period`` containing the start date."""
        assert self._day_nr in (1, 0, 7), self._day_nr
        # NOTE(review): the same "W" frequency is used for Monday- and
        # Sunday-based weeks — confirm this is intended.
        return pd.Period(self._date, freq=("W"))
def clean_timestring(timestring: str):
    """Normalise a time string so that week numbers are zero-padded.

    Numbers are converted to their string form. Strings containing ``W`` are
    split at it and the part after it is rendered as a two-digit integer.
    Every other string is returned unchanged.
    """
    if isinstance(timestring, Number):
        return str(timestring)
    if "W" not in timestring:
        return timestring
    year, week = timestring.split("W")
    return f"{year}W{int(week):02d}"
class Month(TimePeriod):
    """A period of one calendar month."""

    _extension = relativedelta(months=1)
    _used_attributes = ["year", "month"]

    def __repr__(self):
        return f"Month({self.year}-{self.month})"

    @property
    def id(self):
        """Compact ``YYYYMM`` identifier."""
        month_start = self._date
        return month_start.strftime("%Y%m")

    def to_string(self):
        """Return the month as ``YYYY-MM`` with a zero-padded month."""
        return f"{self.year}-{self.month:02d}"

    def topandas(self):
        """Return the matching monthly pandas ``Period``."""
        year, month = self.year, self.month
        return pd.Period(year=year, month=month, freq="M")
class Year(TimePeriod):
    """A period of one calendar year."""

    _extension = relativedelta(years=1)
    _used_attributes = ["year"]

    def __repr__(self):
        return f"Year({self.year})"

    @property
    def id(self):
        """The year as a string."""
        year = self.year
        return str(year)

    def to_string(self):
        """Return the year formatted as text."""
        return f"{self.year}"

    def topandas(self):
        """Return the matching yearly pandas ``Period``."""
        year = self.year
        return pd.Period(year=year, freq="Y")
class TimeDelta(DateUtilWrapper):
    """A time difference stored in ``_relative_delta``.

    Most callers in this module pass a ``relativedelta``; ``Week.__sub__``
    passes a plain ``timedelta``, which is why month/year components are read
    with ``getattr(..., 0)`` where such a value may arrive.
    """

    def __init__(self, relative_delta: relativedelta):
        self._relative_delta = relative_delta
        self._date = None

    def __eq__(self, other):
        """Compare the wrapped deltas; non-``TimeDelta`` operands are unequal."""
        if not isinstance(other, TimeDelta):
            return NotImplemented
        return self._relative_delta == other._relative_delta

    def __hash__(self):
        # Defining __eq__ disables the inherited hash; restore one that is
        # consistent with equality.
        return hash(self._relative_delta)

    def __add__(self, other: Union[TimeStamp, TimePeriod]):
        """Shift a timestamp or period forward, keeping the operand's type."""
        if not isinstance(other, (TimeStamp, TimePeriod)):
            return NotImplemented
        return other.__class__(other._date + self._relative_delta)

    def __radd__(self, other: Union[TimeStamp, TimePeriod]):
        return self.__add__(other)

    def __sub__(self, other: Union[TimeStamp, TimePeriod]):
        """Shift a timestamp or period backward by this delta.

        Note that this computes ``other - delta`` regardless of operand
        order; ``__rsub__`` relies on that.
        """
        if not isinstance(other, (TimeStamp, TimePeriod)):
            return NotImplemented
        return other.__class__(other._date - self._relative_delta)

    def __rsub__(self, other: Union[TimeStamp, TimePeriod]):
        return self.__sub__(other)

    def __mul__(self, other: int):
        """Scale the delta by ``other``."""
        return self.__class__(self._relative_delta * other)

    def __rmul__(self, other: int):
        return self.__mul__(other)

    def _n_months(self):
        """Total number of months in the month and year components."""
        return self._relative_delta.months + 12 * self._relative_delta.years

    def __floordiv__(self, divident: "TimeDelta"):
        """Count how many times ``divident`` fits into this delta.

        A divisor with a day component divides day counts, and neither side
        may then carry positive months or years. Otherwise the division is
        done on total months.
        """
        if divident._relative_delta.days != 0:
            for name in ("months", "years"):
                assert not getattr(divident._relative_delta, name, 0) > 0, f"Cannot divide by {divident}"
                assert not getattr(self._relative_delta, name, 0) > 0, f"Cannot divide {self} by {divident}"
            return self._relative_delta.days // divident._relative_delta.days
        return self._n_months() // divident._n_months()

    def __mod__(self, other: "TimeDelta"):
        """Remainder in months; ``other`` must have no day component."""
        assert other._relative_delta.days == 0
        return self.__class__(relativedelta(months=self._n_months() % other._n_months()))

    def __repr__(self):
        return f"TimeDelta({self._relative_delta})"

    def n_periods(self, start_stamp: TimeStamp, end_stamp: TimeStamp):
        """Count the periods of this length between two timestamps.

        Exactly one of the day, month and year components must be non-zero.
        Week-based deltas take the day path, because a ``relativedelta``
        stores weeks as days.
        """
        assert sum(bool(getattr(self._relative_delta, name, 0)) for name in ("days", "months", "years")) == 1, (
            f"Cannot get number of periods for {self}"
        )
        if self._relative_delta.days != 0:
            n_days_diff = (end_stamp.date - start_stamp.date).days
            return n_days_diff // self._relative_delta.days
        if self._relative_delta.months != 0 or self._relative_delta.years != 0:
            return (end_stamp - start_stamp) // self
        # Only reachable with assertions disabled and an all-zero delta.
        return None
class PeriodRange(BNPDataClass):
    """A contiguous range of equally long time periods.

    The range is described by a start timestamp, an exclusive end timestamp
    and the length of one period.
    """

    def __init__(
        self,
        start_timestamp: TimeStamp,
        end_timestamp: TimeStamp,
        time_delta: TimeDelta,
    ):
        self._start_timestamp = start_timestamp
        self._end_timestamp = end_timestamp
        self._time_delta = time_delta

    @property
    def month(self):
        """Array with the start month of every period."""
        return np.array([p.start_timestamp.month for p in self])

    @property
    def year(self):
        """Array with the start year of every period."""
        return np.array([p.start_timestamp.year for p in self])

    @property
    def week(self):
        """Array with the ISO week of every period's start."""
        return np.array([p.start_timestamp.week for p in self])

    @property
    def delta(self):
        """Length of one period."""
        return self._time_delta

    @classmethod
    def from_time_periods(cls, start_period: TimePeriod, end_period: TimePeriod):
        """Build the range from its first to its last period, both included."""
        assert start_period.time_delta == end_period.time_delta
        return cls(
            TimeStamp(start_period._date),
            TimeStamp(end_period._exclusive_end()),
            start_period.time_delta,
        )

    @classmethod
    def from_timestamps(cls, start_timestamp: TimeStamp, end_timestamp: TimeStamp, time_delta: TimeDelta):
        """Build the range directly from start, exclusive end and step."""
        return cls(start_timestamp, end_timestamp, time_delta)

    def __len__(self):
        step = self._time_delta._relative_delta
        if step.days != 0:
            assert step.months == 0 and step.years == 0
            days = (self._end_timestamp._date - self._start_timestamp._date).days
            return days // step.days
        delta = relativedelta(self._end_timestamp._date, self._start_timestamp._date)
        return TimeDelta(delta) // self._time_delta

    def __eq__(self, other: TimePeriod) -> np.ndarray[bool]:
        """Check each period in the range for equality to the given period"""
        return self._vectorize("__eq__", other)

    def _vectorize(self, funcname: str, other: TimePeriod):
        """Apply a comparison method to every period and collect an array.

        ``other`` may be a single period, or a ``PeriodRange`` of the same
        length that is compared element by element.
        """
        if isinstance(other, PeriodRange):
            assert len(self) == len(other), (len(self), len(other), self, other)
            return np.array([getattr(period, funcname)(other_period) for period, other_period in zip(self, other)])
        return np.array([getattr(period, funcname)(other) for period in self])

    def __ne__(self, other: TimePeriod) -> np.ndarray[bool]:
        """Check each period in the range for inequality to the given period"""
        return self._vectorize("__ne__", other)

    __lt__ = functools.partialmethod(_vectorize, "__lt__")
    __le__ = functools.partialmethod(_vectorize, "__le__")
    __gt__ = functools.partialmethod(_vectorize, "__gt__")
    __ge__ = functools.partialmethod(_vectorize, "__ge__")

    @property
    def _period_class(self):
        """Period class matching the step of this range."""
        if self._time_delta == delta_month:
            return Month
        elif self._time_delta == delta_year:
            return Year
        elif self._time_delta == delta_day:
            return Day
        elif self._time_delta == delta_week:
            return Week
        raise ValueError(f"Unknown time delta {self._time_delta}")

    def __iter__(self):
        return (self._period_class((self._start_timestamp + self._time_delta * i)._date) for i in range(len(self)))

    def __getitem__(self, item: slice | int):
        """Slice by numeric index in the period range

        An integer returns a single period (negative values count from the
        end). A slice without step returns a new ``PeriodRange``. A stop at
        or beyond the length keeps the current end, and a negative start
        reaching before the first period is clamped to the first period.

        Raises:
            ValueError: If the resulting start lies after the resulting end.
        """
        if isinstance(item, Number):
            if item < 0:
                item += len(self)
            return self._period_class((self._start_timestamp + self._time_delta * item)._date)
        assert item.step is None
        n_periods = len(self)
        start = self._start_timestamp
        end = self._end_timestamp
        if item.stop is not None:
            if item.stop < 0:
                end -= self._time_delta * abs(item.stop)
            elif item.stop < n_periods:
                end = start + self._time_delta * item.stop
            # Otherwise the stop is past the last period: keep the full end
            # instead of extending the range beyond its data.
        if item.start is not None:
            offset = item.start if item.start >= 0 else max(n_periods + item.start, 0)
            start = start + self._time_delta * offset
        if start > end:
            raise ValueError(f"Invalid slice {item} for period range {self} of length {len(self)}")
        return PeriodRange(start, end, self._time_delta)

    def topandas(self):
        """Return a pandas ``Series`` with one ``Period`` per element.

        Raises:
            ValueError: If the step is not a day, week, month or year.
        """
        known_deltas = (delta_month, delta_year, delta_day, delta_week)
        if not any(self._time_delta == known for known in known_deltas):
            raise ValueError(f"Cannot convert period range with time delta {self._time_delta} to pandas")
        # Each period class already knows how to build its pandas Period.
        return pd.Series([p.topandas() for p in self])

    def to_period_index(self):
        """Return a pandas period range from the first to the last period."""
        last = self[-1].topandas()
        return pd.period_range(
            start=self[0].topandas(),
            end=last,
            freq=last.freq,
        )

    @classmethod
    def from_pandas(cls, periods: Iterable[pd.Period]):
        """Build the range from consecutive pandas periods of one frequency.

        Raises:
            ValueError: If ``periods`` is empty or not consecutive.
            KeyError: If the frequency of the first period is not supported.
        """
        time_deltas = {
            "M": delta_month,
            "ME": delta_month,
            "Y": delta_year,
            "D": delta_day,
            "W-MON": delta_week,
            "W-SUN": delta_week,
        }
        periods = list(periods)
        if not len(periods):
            raise ValueError("Cannot create a period range from an empty list")
        frequency = periods[0].freqstr
        time_delta = time_deltas[frequency]
        assert all(p.freqstr == frequency for p in periods), f"All periods must have the same frequency {periods}"
        time_periods = [TimePeriod.parse(str(period)) for period in periods]
        cls._check_consequtive(time_delta, time_periods)
        return cls.from_time_periods(time_periods[0], time_periods[-1])

    @classmethod
    def _check_consequtive(cls, time_delta, time_periods, fill_missing=False):
        """Verify that each period follows its predecessor by ``time_delta``.

        Returns:
            An empty list when all periods are consecutive. With
            ``fill_missing`` set, the indices (relative to the first period)
            of the periods missing from the sequence.

        Raises:
            ValueError: If there are gaps and ``fill_missing`` is false.
        """
        is_consec = [p2 == p1 + time_delta for p1, p2 in zip(time_periods, time_periods[1:])]
        if all(is_consec):
            return []
        if fill_missing:
            first = time_periods[0]
            indices = [(p - first) // time_delta for p in time_periods][:-1]
            mask = np.full((time_periods[-1] - first) // time_delta, True)
            mask[indices] = False
            return np.flatnonzero(mask)
        # Report every offending pair through the module logger before failing.
        logger.error("Periods %s", time_periods)
        for wrong in np.flatnonzero(~np.array(is_consec)):
            logger.error(
                "Wrong period %s with time delta %s: expected %s, got %s",
                (time_periods[wrong], time_periods[wrong + 1]),
                time_delta,
                time_periods[wrong] + time_delta,
                time_periods[wrong + 1],
            )
        raise ValueError("Periods must be consecutive.")

    @classmethod
    def _get_delta(cls, periods: list[TimePeriod]):
        """Return the common period length, or raise if lengths differ."""
        delta = periods[0].time_delta
        if not all(period.time_delta == delta for period in periods):
            raise ValueError(f"All periods must have the same time delta {periods}")
        return delta

    @classmethod
    def from_strings(cls, period_strings: Iterable[str], fill_missing=False):
        """Parse period strings and build the range (see ``from_period_list``)."""
        periods = []
        for period_string in period_strings:
            try:
                p = TimePeriod.parse(period_string)
            except InvalidDateError:
                logger.error(f"Invalid date {period_string}")
                raise
            periods.append(p)
        return cls.from_period_list(fill_missing, periods)

    @classmethod
    def from_ids(cls, ids: Iterable[str], fill_missing=False):
        """Build the range from period ids (see ``TimePeriod.from_id``)."""
        periods = [TimePeriod.from_id(id) for id in ids]
        return cls.from_period_list(fill_missing, periods)

    @classmethod
    def from_start_and_n_periods(cls, start_period: pd.Period, n_periods: int):
        """Build a range of ``n_periods`` periods starting at ``start_period``.

        ``start_period`` may be a ``TimePeriod`` or a pandas ``Period``.
        """
        if not isinstance(start_period, TimePeriod):
            period = TimePeriod.from_pandas(start_period)
        else:
            period = start_period
        delta = period.time_delta
        return cls.from_time_periods(period, period + delta * (n_periods - 1))

    @classmethod
    def from_period_list(cls, fill_missing, periods):
        """Build the range spanning the first to the last of ``periods``.

        Returns:
            The range, or ``(range, missing_indices)`` when ``fill_missing``
            is set.
        """
        delta = cls._get_delta(periods)
        missing = cls._check_consequtive(delta, periods, fill_missing)
        ret = cls.from_time_periods(periods[0], periods[-1])
        if fill_missing:
            assert len(ret) == len(missing) + len(periods), (
                len(ret),
                len(missing),
                len(periods),
                periods,
                missing,
            )
            return ret, missing
        return ret

    @property
    def shape(self):
        return (len(self),)

    def __repr__(self):
        return f"PeriodRange({self._start_timestamp}, {self._end_timestamp}, {self._time_delta})"

    def searchsorted(self, period: TimePeriod, side="left"):
        """Find the index where the period would be inserted to maintain order"""
        if side not in ("left", "right"):
            raise ValueError(f"Invalid side {side}")
        assert period.time_delta == self._time_delta, (period, self._time_delta)
        n_steps = self._time_delta.n_periods(self._start_timestamp, period.start_timestamp)
        if side == "right":
            n_steps += 1
        # Clamp to the valid index range when the period lies outside.
        n_steps = min(max(0, n_steps), len(self))
        return n_steps

    def concatenate(self, other: "PeriodRange") -> "PeriodRange":
        """Join with a range that starts exactly where this one ends."""
        assert self._time_delta == other._time_delta
        assert other._start_timestamp == self._end_timestamp, "Can only concnatenate when other starts where self ends"
        return PeriodRange(self._start_timestamp, other._end_timestamp, self._time_delta)

    def __array_function__(self, func, types, args, kwargs):
        # Only a two-element concatenate is supported through the NumPy
        # function protocol.
        if func.__name__ == "concatenate":
            assert len(args[0]) == 2
            return self.concatenate(args[0][1])
        return NotImplemented

    @property
    def start_timestamp(self):
        return self._start_timestamp

    @property
    def end_timestamp(self):
        return self._end_timestamp

    def todict(self):
        """Return start, end and step as a dictionary."""
        return {
            "start_timestamp": self._start_timestamp,
            "end_timestamp": self._end_timestamp,
            "time_delta": self._time_delta,
        }

    def tolist(self):
        """Return the string form (``to_string``) of every period."""
        return [p.to_string() for p in self]
# Module-level step sizes shared by the period classes, shortest first.
delta_day = TimeDelta(relativedelta(days=1))
delta_week = TimeDelta(relativedelta(weeks=1))
delta_month = TimeDelta(relativedelta(months=1))
delta_year = TimeDelta(relativedelta(years=1))
def convert_time_period_string(row):
    """Insert a dash into compact ``YYYYMM`` strings.

    A value of length six that contains no ``W`` is returned as ``YYYY-MM``;
    anything else is returned unchanged.
    """
    if len(row) != 6 or "W" in row:
        return row
    year_part, month_part = row[:4], row[4:]
    return f"{year_part}-{month_part}"