Breakpoint Module

Breakpoint detection strategies for water‑timeseries.

This module provides a hierarchy of breakpoint detection methods for analyzing water‑timeseries data. It includes:

  • BreakpointMethod – abstract base class with shared helpers.
  • SimpleBreakpoint – a fast rolling‑window statistical detector.
  • BeastBreakpoint – a Bayesian RBEAST‑based detector.

Each concrete class implements calculate_break for a single lake and inherits calculate_breaks_batch from the base class. The classes can be used directly in Python code or indirectly through the water-timeseries CLI.

Example

from water_timeseries.breakpoint import SimpleBreakpoint

breakpoint = SimpleBreakpoint(kwargs_break=dict(window=3, method="median", threshold=-0.25))
breakpoint.calculate_break(dataset, object_id)  # Returns DataFrame with breakpoint info

BeastBreakpoint

Bases: BreakpointMethod

Bayesian RBEAST-based breakpoint detector.

This method uses the RBEAST library to detect breakpoints in water‑timeseries data using Bayesian change‑point detection. It identifies points where the statistical properties of the time series change significantly.
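The core of the detection is thresholding the change-point occurrence probabilities that BEAST estimates per time step. A minimal sketch with plain NumPy, using a made-up probability vector standing in for the `o.trend.cpOccPr` array an rbeast fit would produce:

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2019-01-01", periods=8, freq="MS")
# Hypothetical change-point occurrence probabilities; in the real detector
# these come from the fitted BEAST trend component.
cp_prob = np.array([0.01, 0.02, 0.05, 0.9, 0.1, 0.04, 0.6, 0.02])

break_threshold = 0.5
# Keep only time steps whose probability exceeds the threshold
break_indices = np.where(cp_prob > break_threshold)[0]
break_dates = dates[break_indices]

# Rank candidate breaks by probability, most likely first
order = np.argsort(cp_prob[break_indices])[::-1]
ranked = list(break_dates[order])
print(ranked)
```

The real method additionally records the dates immediately before and after each detected break and numbers the breaks in order of decreasing probability.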

Parameters

kwargs_break : dict, optional
    Configuration dictionary for RBEAST priors. Common keys include:
    - trendMaxOrder : int, default 0
        Maximum order of the trend component.
    - trendMinSepDist : int, default 1
        Minimum separation distance between change points.
break_threshold : float, optional
    Probability threshold for detecting a break point. Default is 0.5.

Attributes

breakpoint_columns : list
    List of column names in the output DataFrame.

Source code in src/water_timeseries/breakpoint.py
class BeastBreakpoint(BreakpointMethod):
    """Bayesian RBEAST-based breakpoint detector.

    This method uses the RBEAST library to detect breakpoints in water‑timeseries
    data using Bayesian change‑point detection. It identifies points where the
    statistical properties of the time series change significantly.

    Parameters
    ----------
    kwargs_break : dict, optional
        Configuration dictionary for RBEAST priors. Common keys include:
        - ``trendMaxOrder`` : int, default 0
            Maximum order of the trend component.
        - ``trendMinSepDist`` : int, default 1
            Minimum separation distance between change points.
    break_threshold : float, optional
        Probability threshold for detecting a break point. Default is 0.5.

    Attributes
    ----------
    breakpoint_columns : list
        List of column names in the output DataFrame.
    """

    def __init__(
        self,
        kwargs_break: dict | None = None,
        break_threshold: float = 0.5,
    ):
        super().__init__(method_name="rbeast")
        # avoid a shared mutable default argument
        if kwargs_break is None:
            kwargs_break = dict(trendMaxOrder=0, trendMinSepDist=1)
        self.kwargs_break = kwargs_break
        self.break_threshold = break_threshold
        self.breakpoint_columns = [
            "date_break",
            "date_before_break",
            "date_after_break",
            "break_method",
            "break_number",
            "proba_rbeast",
        ]

    def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
        """Calculate breakpoints for a single lake object using RBEAST.

        Parameters
        ----------
        dataset : LakeDataset
            Dataset containing lake water‑area data.
        object_id : str
            Unique identifier (geohash) for the lake object.

        Returns
        -------
        pd.DataFrame
            DataFrame containing breakpoint information with columns defined in
            ``self.breakpoint_columns`` plus calculated temporal statistics.
        """
        # Select the normalized time series for this lake
        ds = dataset.ds_normalized
        df = ds.sel(id_geohash=object_id).to_pandas()
        df["date"] = df.index
        data = df[dataset.water_column]

        # Run BEAST (simple: no season). Use priors tuned for sudden drops
        # and allowing short segments (small minimum separation between CPs).
        o = rb.beast(data, season="none", quiet=True, prior=self.kwargs_break)

        cp_prob = o.trend.cpOccPr

        # get break indices
        break_indices = np.where(cp_prob > self.break_threshold)[0]

        if break_indices.size == 0:
            return pd.DataFrame(columns=self.breakpoint_columns)

        # indices of the observations immediately before and after each break,
        # clamped so a break at either end of the series cannot index out of range
        break_indices_before = np.clip(break_indices - 1, 0, len(df) - 1)
        break_indices_after = np.clip(break_indices + 1, 0, len(df) - 1)
        break_dates_before = df.iloc[break_indices_before]["date"].to_list()
        break_dates_after = df.iloc[break_indices_after]["date"].to_list()

        # ensure we're working with copies to avoid pandas SettingWithCopyWarning
        df = df.copy()
        df["proba_rbeast"] = cp_prob
        # print(break_indices)
        break_df = df.iloc[break_indices].copy()

        # safely add the previous-date column
        break_df.loc[:, "date_before_break"] = break_dates_before
        break_df.loc[:, "date_after_break"] = break_dates_after

        # sort by probability descending, then add sequential break numbers
        break_df = break_df.sort_values("proba_rbeast", ascending=False).copy()
        break_df["break_number"] = range(1, len(break_df) + 1)

        break_df_out = break_df.rename(columns={"date": "date_break"}).set_index("id_geohash")
        break_df_out["break_method"] = self.method_name

        df_out = break_df_out[self.breakpoint_columns]

        break_list = []
        df_water = dataset.ds.sel(id_geohash=object_id).to_dataframe()
        for i, row in df_out.iterrows():
            id_geohash = row.name
            df_breaks = pd.concat(
                [
                    row,
                    calculate_water_area_before(
                        df_water, break_date=row["date_break"], water_column=dataset.water_column
                    ),
                    calculate_water_area_after(
                        df_water, break_date_after=row["date_after_break"], water_column=dataset.water_column
                    ),
                ]
            )
            df_breaks.name = id_geohash
            break_list.append(df_breaks)
        break_df = pd.concat(break_list, axis=1).T

        break_df.index.name = "id_geohash"
        break_df = calculate_temporal_stats(break_df)

        return break_df

calculate_break(dataset, object_id)

Calculate breakpoints for a single lake object using RBEAST.

Parameters

dataset : LakeDataset
    Dataset containing lake water‑area data.
object_id : str
    Unique identifier (geohash) for the lake object.

Returns

pd.DataFrame
    DataFrame containing breakpoint information with columns defined in
    self.breakpoint_columns plus calculated temporal statistics.

Source code in src/water_timeseries/breakpoint.py
def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
    """Calculate breakpoints for a single lake object using RBEAST.

    Parameters
    ----------
    dataset : LakeDataset
        Dataset containing lake water‑area data.
    object_id : str
        Unique identifier (geohash) for the lake object.

    Returns
    -------
    pd.DataFrame
        DataFrame containing breakpoint information with columns defined in
        ``self.breakpoint_columns`` plus calculated temporal statistics.
    """
    # Select the normalized time series for this lake
    ds = dataset.ds_normalized
    df = ds.sel(id_geohash=object_id).to_pandas()
    df["date"] = df.index
    data = df[dataset.water_column]

    # Run BEAST (simple: no season). Use priors tuned for sudden drops
    # and allowing short segments (small minimum separation between CPs).
    o = rb.beast(data, season="none", quiet=True, prior=self.kwargs_break)

    cp_prob = o.trend.cpOccPr

    # get break indices
    break_indices = np.where(cp_prob > self.break_threshold)[0]

    if break_indices.size == 0:
        return pd.DataFrame(columns=self.breakpoint_columns)

    # indices of the observations immediately before and after each break,
    # clamped so a break at either end of the series cannot index out of range
    break_indices_before = np.clip(break_indices - 1, 0, len(df) - 1)
    break_indices_after = np.clip(break_indices + 1, 0, len(df) - 1)
    break_dates_before = df.iloc[break_indices_before]["date"].to_list()
    break_dates_after = df.iloc[break_indices_after]["date"].to_list()

    # ensure we're working with copies to avoid pandas SettingWithCopyWarning
    df = df.copy()
    df["proba_rbeast"] = cp_prob
    # print(break_indices)
    break_df = df.iloc[break_indices].copy()

    # safely add the previous-date column
    break_df.loc[:, "date_before_break"] = break_dates_before
    break_df.loc[:, "date_after_break"] = break_dates_after

    # sort by probability descending, then add sequential break numbers
    break_df = break_df.sort_values("proba_rbeast", ascending=False).copy()
    break_df["break_number"] = range(1, len(break_df) + 1)

    break_df_out = break_df.rename(columns={"date": "date_break"}).set_index("id_geohash")
    break_df_out["break_method"] = self.method_name

    df_out = break_df_out[self.breakpoint_columns]

    break_list = []
    df_water = dataset.ds.sel(id_geohash=object_id).to_dataframe()
    for i, row in df_out.iterrows():
        id_geohash = row.name
        df_breaks = pd.concat(
            [
                row,
                calculate_water_area_before(
                    df_water, break_date=row["date_break"], water_column=dataset.water_column
                ),
                calculate_water_area_after(
                    df_water, break_date_after=row["date_after_break"], water_column=dataset.water_column
                ),
            ]
        )
        df_breaks.name = id_geohash
        break_list.append(df_breaks)
    break_df = pd.concat(break_list, axis=1).T

    break_df.index.name = "id_geohash"
    break_df = calculate_temporal_stats(break_df)

    return break_df

BreakpointMethod

Base class for breakpoint detection methods.

Parameters

method_name : str
    Short identifier stored in the break_method column of the output
    DataFrames. Sub‑classes pass values such as "simple" or "rbeast".

Source code in src/water_timeseries/breakpoint.py
class BreakpointMethod:
    """Base class for breakpoint detection methods.

    Parameters
    ----------
    method_name : str
        Short identifier stored in the ``break_method`` column of the output
        DataFrames. Sub‑classes pass values such as ``"simple"`` or ``"rbeast"``.
    """

    def __init__(self, method_name: str):
        self.method_name = method_name

    def get_first_break_date(self, df: pd.DataFrame, column: str = "water") -> tuple:
        """Placeholder implementation for the abstract base.

        Concrete subclasses override this method.  The default returns a
        ``(None, None, None)`` tuple so that calling code can safely handle the lack of
        a breakpoint.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with a datetime-like index and a water column.
        column : str, optional
            Column name to evaluate. Defaults to "water".

        Returns
        -------
        tuple
            (first_break_date, previous_date, after_date) - All values are None
            for the default implementation.
        """
        return (None, None, None)

    def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
        """Calculate breakpoints for a single object.

        Sub‑classes must implement the actual detection algorithm and return a
        ``pandas.DataFrame`` containing at least the columns defined in
        ``self.breakpoint_columns``.

        Parameters
        ----------
        dataset : LakeDataset
            Dataset containing lake water‑area data.
        object_id : str
            Unique identifier (geohash) for the lake object.

        Returns
        -------
        pd.DataFrame
            DataFrame containing breakpoint information.
        """
        raise NotImplementedError

    def calculate_breaks_batch(self, dataset: LakeDataset, progress_bar: bool = False) -> pd.DataFrame:
        """Run ``calculate_break`` for every lake in *dataset*.

        Parameters
        ----------
        dataset : LakeDataset
            Dataset providing both raw and normalized water‑area arrays.
        progress_bar : bool, optional
            Show a ``tqdm`` progress bar when ``True``. Default is ``False``.

        Returns
        -------
        pd.DataFrame
            Concatenated results from all lakes in the dataset.
        """
        # Load both raw and normalized arrays into memory before iterating
        dataset.ds.load()
        dataset.ds_normalized.load()
        results = []
        if progress_bar:
            progress = tqdm(dataset.ds_normalized.id_geohash.values)
        else:
            progress = dataset.ds_normalized.id_geohash.values
        for object_id in progress:
            result = self.calculate_break(dataset, object_id)
            results.append(result)
        return pd.concat(results)

calculate_break(dataset, object_id)

Calculate breakpoints for a single object.

Sub‑classes must implement the actual detection algorithm and return a pandas.DataFrame containing at least the columns defined in self.breakpoint_columns.

Parameters

dataset : LakeDataset
    Dataset containing lake water‑area data.
object_id : str
    Unique identifier (geohash) for the lake object.

Returns

pd.DataFrame
    DataFrame containing breakpoint information.

Source code in src/water_timeseries/breakpoint.py
def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
    """Calculate breakpoints for a single object.

    Sub‑classes must implement the actual detection algorithm and return a
    ``pandas.DataFrame`` containing at least the columns defined in
    ``self.breakpoint_columns``.

    Parameters
    ----------
    dataset : LakeDataset
        Dataset containing lake water‑area data.
    object_id : str
        Unique identifier (geohash) for the lake object.

    Returns
    -------
    pd.DataFrame
        DataFrame containing breakpoint information.
    """
    raise NotImplementedError

calculate_breaks_batch(dataset, progress_bar=False)

Run calculate_break for every lake in dataset.

Parameters

dataset : LakeDataset
    Dataset providing both raw and normalized water‑area arrays.
progress_bar : bool, optional
    Show a tqdm progress bar when True. Default is False.

Returns

pd.DataFrame
    Concatenated results from all lakes in the dataset.
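The batch method boils down to one detection call per object id followed by a single concatenation. A minimal sketch of that pattern, with a toy stand-in for calculate_break and made-up geohash ids:

```python
import pandas as pd

def toy_calculate_break(object_id: str) -> pd.DataFrame:
    """Toy detector: one result row per object, or an empty frame for no break."""
    if object_id == "u0x2":  # pretend this lake has no breakpoint
        return pd.DataFrame(columns=["date_break", "break_method"])
    return pd.DataFrame(
        {"date_break": [pd.Timestamp("2021-06-01")], "break_method": ["simple"]},
        index=[object_id],
    )

object_ids = ["u0x1", "u0x2", "u0x3"]
# One call per object, then a single concat of all per-object results;
# empty frames simply contribute no rows.
results = [toy_calculate_break(oid) for oid in object_ids]
batch = pd.concat(results)
print(batch)
```

Calling `.load()` on the underlying xarray datasets up front, as the real method does, avoids repeated lazy reads inside the per-object loop.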

Source code in src/water_timeseries/breakpoint.py
def calculate_breaks_batch(self, dataset: LakeDataset, progress_bar: bool = False) -> pd.DataFrame:
    """Run ``calculate_break`` for every lake in *dataset*.

    Parameters
    ----------
    dataset : LakeDataset
        Dataset providing both raw and normalized water‑area arrays.
    progress_bar : bool, optional
        Show a ``tqdm`` progress bar when ``True``. Default is ``False``.

    Returns
    -------
    pd.DataFrame
        Concatenated results from all lakes in the dataset.
    """
    # Load both raw and normalized arrays into memory before iterating
    dataset.ds.load()
    dataset.ds_normalized.load()
    results = []
    if progress_bar:
        progress = tqdm(dataset.ds_normalized.id_geohash.values)
    else:
        progress = dataset.ds_normalized.id_geohash.values
    for object_id in progress:
        result = self.calculate_break(dataset, object_id)
        results.append(result)
    return pd.concat(results)

get_first_break_date(df, column='water')

Placeholder implementation for the abstract base.

Concrete subclasses override this method. The default returns a (None, None, None) tuple so that calling code can safely handle the lack of a breakpoint.

Parameters

df : pd.DataFrame
    DataFrame with a datetime-like index and a water column.
column : str, optional
    Column name to evaluate. Defaults to "water".

Returns

tuple
    (first_break_date, previous_date, after_date). All values are None in
    the default implementation.

Source code in src/water_timeseries/breakpoint.py
def get_first_break_date(self, df: pd.DataFrame, column: str = "water") -> tuple:
    """Placeholder implementation for the abstract base.

    Concrete subclasses override this method.  The default returns a
    ``(None, None, None)`` tuple so that calling code can safely handle the lack of
    a breakpoint.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with a datetime-like index and a water column.
    column : str, optional
        Column name to evaluate. Defaults to "water".

    Returns
    -------
    tuple
        (first_break_date, previous_date, after_date) - All values are None
        for the default implementation.
    """
    return (None, None, None)

SimpleBreakpoint

Bases: BreakpointMethod

Fast rolling‑window statistical breakpoint detector.

This method detects breakpoints by comparing current water values against rolling window statistics (mean, median, or max). A breakpoint is identified when values fall below a threshold in both a primary and secondary window for consecutive time points, which helps reduce false positives.
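The dual-window idea can be reproduced on a synthetic series with plain pandas. This is a sketch only: the series values are made up, while the window size and threshold match the documented defaults (window=3, median, -0.25):

```python
import pandas as pd

# Synthetic normalized water series: stable near 0, sudden drop on 2020-01-08
dates = pd.date_range("2020-01-01", periods=12, freq="D")
values = [0.0, 0.02, -0.01, 0.01, 0.0, 0.02, 0.01,
          -0.6, -0.62, -0.58, -0.61, -0.6]
s = pd.Series(values, index=dates, name="water")

window, threshold = 3, -0.25

# Deviation of each value from its rolling-median baseline,
# for the primary window and a secondary window of window + 2
primary_diff = s - s.rolling(window=window).median()
secondary_diff = s - s.rolling(window=window + 2).median()

# A break needs the primary window to flag the point AND the secondary
# window to flag the next point (consecutive confirmation)
mask = (primary_diff < threshold) & (secondary_diff < threshold).shift(-1, fill_value=False)
first_break = mask[mask].index.min() if mask.any() else None
print(first_break)
```

Requiring both windows and a consecutive confirmation means a single noisy low observation does not trigger a break.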

Parameters

kwargs_break : dict, optional
    Configuration dictionary with the following keys:
    - window : int, default 3
        Size of the primary rolling window.
    - method : str, default "median"
        Rolling statistic to use: "mean", "median", or "max".
    - threshold : float, default -0.25
        Threshold for detecting a break (values below this indicate a break).

Attributes

breakpoint_columns : list
    List of column names in the output DataFrame.

Source code in src/water_timeseries/breakpoint.py
class SimpleBreakpoint(BreakpointMethod):
    """Fast rolling‑window statistical breakpoint detector.

    This method detects breakpoints by comparing current water values against
    rolling window statistics (mean, median, or max). A breakpoint is identified
    when values fall below a threshold in both a primary and secondary window
    for consecutive time points, which helps reduce false positives.

    Parameters
    ----------
    kwargs_break : dict, optional
        Configuration dictionary with the following keys:
        - ``window`` : int, default 3
            Size of the primary rolling window.
        - ``method`` : str, default "median"
            Rolling statistic to use: "mean", "median", or "max".
        - ``threshold`` : float, default -0.25
            Threshold for detecting a break (values below this indicate a break).

    Attributes
    ----------
    breakpoint_columns : list
        List of column names in the output DataFrame.
    """

    def __init__(self, kwargs_break: dict | None = None):
        super().__init__(method_name="simple")
        # avoid a shared mutable default argument
        if kwargs_break is None:
            kwargs_break = dict(window=3, method="median", threshold=-0.25)
        self.kwargs_break = kwargs_break
        self.breakpoint_columns = ["date_break", "date_before_break", "date_after_break", "break_method"]

    def get_first_break_date(self, df: pd.DataFrame, column: str = "water") -> tuple:
        """Find the first break date and the immediately preceding index value.

        The detection uses a dual‑window approach: a primary rolling window
        (kwargs_break['window']) and a secondary window that is window+2.
        A break is detected when the current value falls below BOTH window
        calculations for consecutive points, reducing false positives.

        Parameters
        ----------
        df : pd.DataFrame
            DataFrame with a datetime‑like index and a water column.
        column : str, optional
            Column name to evaluate. Defaults to "water".

        Returns
        -------
        tuple
            (first_break_date, previous_date, after_date) where each element is
            a pandas Timestamp or None if no break was found.
        """
        df = df.drop(columns=["id_geohash"]).dropna()

        # Determine the rolling window sizes
        primary_window = self.kwargs_break["window"]
        secondary_window = primary_window + 2

        # Calculate rolling statistics based on the specified method
        # (mean, median, or max of the rolling window)
        if self.kwargs_break["method"] == "max":
            primary_rolling = df.rolling(window=primary_window).max()
            secondary_rolling = df.rolling(window=secondary_window).max()
        elif self.kwargs_break["method"] == "mean":
            primary_rolling = df.rolling(window=primary_window).mean()
            secondary_rolling = df.rolling(window=secondary_window).mean()
        elif self.kwargs_break["method"] == "median":
            primary_rolling = df.rolling(window=primary_window).median()
            secondary_rolling = df.rolling(window=secondary_window).median()
        else:
            raise ValueError("Please assign correct rolling value: 'max', 'mean', or 'median'")

        # Calculate the difference from the rolling reference
        # This measures how much the current value deviates from the window baseline
        rolling_diff_primary = df - primary_rolling
        rolling_diff_secondary = df - secondary_rolling

        # Create masks for values that fall below the threshold
        # Both windows must indicate a break for a point to be considered
        mask_primary = rolling_diff_primary[column] < self.kwargs_break["threshold"]
        mask_secondary = rolling_diff_secondary[column] < self.kwargs_break["threshold"]

        # Require consecutive confirmation: the current point meets the
        # condition AND the next point also meets it (shifted secondary mask;
        # fill_value=False keeps the mask boolean at the end of the series)
        consecutive_mask = mask_primary & mask_secondary.shift(-1, fill_value=False)

        # Find the first break date where both conditions are met
        first_break_date = (
            rolling_diff_primary[consecutive_mask].index.min()
            if not rolling_diff_primary[consecutive_mask].empty
            else None
        )

        # Determine the preceding and following dates if a break was found
        previous_date = None
        after_date = None
        if first_break_date is not None:
            try:
                pos = df.index.get_loc(first_break_date)
                # get_loc may return a slice or integer; handle integer positions
                if isinstance(pos, slice):
                    pos = pos.start if pos.start is not None else 0
                if pos > 0:
                    previous_date = df.index[pos - 1]
                if pos + 1 < len(df):
                    after_date = df.index[pos + 1]
            except Exception:
                previous_date = None
                after_date = None

        return first_break_date, previous_date, after_date

    def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
        """Calculate breakpoints for a single lake object.

        Parameters
        ----------
        dataset : LakeDataset
            Dataset containing lake water‑area data.
        object_id : str
            Unique identifier (geohash) for the lake object.

        Returns
        -------
        pd.DataFrame
            DataFrame containing breakpoint information with columns defined in
            ``self.breakpoint_columns`` plus calculated temporal statistics.
        """
        ds = dataset.ds_normalized
        df_normed = ds.sel(id_geohash=object_id).to_pandas()
        first_break, previous_date, after_date = self.get_first_break_date(df=df_normed, column=dataset.water_column)
        if first_break is None:
            return pd.DataFrame(columns=self.breakpoint_columns)
        df_out = pd.DataFrame(
            {
                self.breakpoint_columns[0]: [first_break],
                self.breakpoint_columns[1]: [previous_date],
                self.breakpoint_columns[2]: [after_date],
                self.breakpoint_columns[3]: [self.method_name],
            },
            index=[object_id],
        )

        break_list = []
        df_water = dataset.ds.sel(id_geohash=object_id).to_dataframe()
        # TODO: can this be optimized to process the entire dataframe?
        for i, row in df_out.iterrows():
            id_geohash = row.name
            df_breaks = pd.concat(
                [
                    row,
                    calculate_water_area_before(
                        df_water, break_date=row["date_break"], water_column=dataset.water_column
                    ),
                    calculate_water_area_after(
                        df_water, break_date_after=row["date_after_break"], water_column=dataset.water_column
                    ),
                ]
            )
            df_breaks.name = id_geohash
            break_list.append(df_breaks)

        break_df = pd.concat(break_list, axis=1).T
        # calculate additional stats
        break_df = calculate_temporal_stats(break_df)

        return break_df

calculate_break(dataset, object_id)

Calculate breakpoints for a single lake object.

Parameters

dataset : LakeDataset
    Dataset containing lake water‑area data.
object_id : str
    Unique identifier (geohash) for the lake object.

Returns

pd.DataFrame
    DataFrame containing breakpoint information with columns defined in
    self.breakpoint_columns plus calculated temporal statistics.

Source code in src/water_timeseries/breakpoint.py
def calculate_break(self, dataset: LakeDataset, object_id: str) -> pd.DataFrame:
    """Calculate breakpoints for a single lake object.

    Parameters
    ----------
    dataset : LakeDataset
        Dataset containing lake water‑area data.
    object_id : str
        Unique identifier (geohash) for the lake object.

    Returns
    -------
    pd.DataFrame
        DataFrame containing breakpoint information with columns defined in
        ``self.breakpoint_columns`` plus calculated temporal statistics.
    """
    ds = dataset.ds_normalized
    df_normed = ds.sel(id_geohash=object_id).to_pandas()
    first_break, previous_date, after_date = self.get_first_break_date(df=df_normed, column=dataset.water_column)
    if first_break is None:
        return pd.DataFrame(columns=self.breakpoint_columns)
    df_out = pd.DataFrame(
        {
            self.breakpoint_columns[0]: [first_break],
            self.breakpoint_columns[1]: [previous_date],
            self.breakpoint_columns[2]: [after_date],
            self.breakpoint_columns[3]: [self.method_name],
        },
        index=[object_id],
    )

    break_list = []
    df_water = dataset.ds.sel(id_geohash=object_id).to_dataframe()
    # TODO: can this be optimized to process the entire dataframe?
    for i, row in df_out.iterrows():
        id_geohash = row.name
        df_breaks = pd.concat(
            [
                row,
                calculate_water_area_before(
                    df_water, break_date=row["date_break"], water_column=dataset.water_column
                ),
                calculate_water_area_after(
                    df_water, break_date_after=row["date_after_break"], water_column=dataset.water_column
                ),
            ]
        )
        df_breaks.name = id_geohash
        break_list.append(df_breaks)

    break_df = pd.concat(break_list, axis=1).T
    # calculate additional stats
    break_df = calculate_temporal_stats(break_df)

    return break_df

get_first_break_date(df, column='water')

Find the first break date and the immediately preceding index value.

The detection uses a dual‑window approach: a primary rolling window (kwargs_break['window']) and a secondary window that is window+2. A break is detected when the current value falls below BOTH window calculations for consecutive points, reducing false positives.

Parameters

df : pd.DataFrame
    DataFrame with a datetime‑like index and a water column.
column : str, optional
    Column name to evaluate. Defaults to "water".

Returns

tuple
    (first_break_date, previous_date, after_date), where each element is a
    pandas Timestamp or None if no break was found.

Source code in src/water_timeseries/breakpoint.py
def get_first_break_date(self, df: pd.DataFrame, column: str = "water") -> tuple:
    """Find the first break date and the immediately preceding index value.

    The detection uses a dual‑window approach: a primary rolling window
    (kwargs_break['window']) and a secondary window that is window+2.
    A break is detected when the current value falls below BOTH window
    calculations for consecutive points, reducing false positives.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with a datetime‑like index and a water column.
    column : str, optional
        Column name to evaluate. Defaults to "water".

    Returns
    -------
    tuple
        (first_break_date, previous_date, after_date) where each element is
        a pandas Timestamp or None if no break was found.
    """
    df = df.drop(columns=["id_geohash"]).dropna()

    # Determine the rolling window sizes
    primary_window = self.kwargs_break["window"]
    secondary_window = primary_window + 2

    # Calculate rolling statistics based on the specified method
    # (mean, median, or max of the rolling window)
    if self.kwargs_break["method"] == "max":
        primary_rolling = df.rolling(window=primary_window).max()
        secondary_rolling = df.rolling(window=secondary_window).max()
    elif self.kwargs_break["method"] == "mean":
        primary_rolling = df.rolling(window=primary_window).mean()
        secondary_rolling = df.rolling(window=secondary_window).mean()
    elif self.kwargs_break["method"] == "median":
        primary_rolling = df.rolling(window=primary_window).median()
        secondary_rolling = df.rolling(window=secondary_window).median()
    else:
        raise ValueError("Please assign correct rolling value: 'max', 'mean', or 'median'")

    # Calculate the difference from the rolling reference
    # This measures how much the current value deviates from the window baseline
    rolling_diff_primary = df - primary_rolling
    rolling_diff_secondary = df - secondary_rolling

    # Create masks for values that fall below the threshold
    # Both windows must indicate a break for a point to be considered
    mask_primary = rolling_diff_primary[column] < self.kwargs_break["threshold"]
    mask_secondary = rolling_diff_secondary[column] < self.kwargs_break["threshold"]

    # Require consecutive confirmation: the current point meets the
    # condition AND the next point also meets it (shifted secondary mask;
    # fill_value=False keeps the mask boolean at the end of the series)
    consecutive_mask = mask_primary & mask_secondary.shift(-1, fill_value=False)

    # Find the first break date where both conditions are met
    first_break_date = (
        rolling_diff_primary[consecutive_mask].index.min()
        if not rolling_diff_primary[consecutive_mask].empty
        else None
    )

    # Determine the preceding and following dates if a break was found
    previous_date = None
    after_date = None
    if first_break_date is not None:
        try:
            pos = df.index.get_loc(first_break_date)
            # get_loc may return a slice or integer; handle integer positions
            if isinstance(pos, slice):
                pos = pos.start if pos.start is not None else 0
            if pos > 0:
                previous_date = df.index[pos - 1]
            if pos + 1 < len(df):
                after_date = df.index[pos + 1]
        except Exception:
            previous_date = None
            after_date = None

    return first_break_date, previous_date, after_date