File size: 22,127 Bytes
cb3451f
 
 
fbdb1e5
 
cb3451f
fbdb1e5
3fe3bd5
4791c0a
cb3451f
fbdb1e5
cb3451f
fbdb1e5
cb3451f
fbdb1e5
cb3451f
fbdb1e5
 
beeebb1
6d9770a
4791c0a
 
 
cb3451f
78ea13f
6d9770a
4e530e4
fbdb1e5
3fe3bd5
fbdb1e5
 
6d9770a
 
 
 
 
 
 
4791c0a
 
6d9770a
 
 
 
4791c0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb3451f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fbdb1e5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3edd42b
 
 
 
 
 
 
 
 
 
 
 
fbdb1e5
 
 
 
 
 
 
 
 
3f78502
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6d9770a
 
 
1ec322d
6d9770a
 
 
1ec322d
 
 
 
6d9770a
1ec322d
 
 
 
6d9770a
 
dd8c015
 
 
 
 
 
 
 
 
 
 
 
 
 
fbdb1e5
 
 
 
 
 
 
 
 
 
 
 
ea81cd4
 
fbdb1e5
 
beeebb1
 
 
 
 
 
 
 
4791c0a
fbdb1e5
4791c0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fbdb1e5
 
 
 
 
3fe3bd5
 
 
 
 
 
4080756
3fe3bd5
 
 
 
 
 
 
 
4080756
3fe3bd5
fbdb1e5
 
 
 
 
 
 
 
 
 
3fe3bd5
fbdb1e5
 
 
3fe3bd5
4080756
fbdb1e5
3fe3bd5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46a21ce
 
 
 
4e530e4
 
 
 
 
 
 
 
78ea13f
 
4791c0a
 
ca766b5
4791c0a
 
 
ca766b5
 
4791c0a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ca766b5
 
78ea13f
 
 
 
6d9770a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
import sys
import types

import pytest

from hackathon_advisor.dashboard_chat_contracts import parse_native_tool_call
from hackathon_advisor.model_runtime import (
    DEFAULT_ADAPTER_ID,
    DEFAULT_ADAPTER_REVISION,
    MiniCPMChatRunner,
    MiniCPMTransformersPlanner,
    RuleBasedChatRunner,
    RuleBasedPlanner,
    create_chat_runner,
    create_tool_planner,
    generation_lock,
    render_context,
    runtime_status,
    system_prompt,
    _best_local_device,
    _minicpm_generation_kwargs,
    _load_minicpm_causal_lm,
    _minicpm_chat_inputs,
    _minicpm_chat_inputs_with_tools,
    _normalize_xml_tool_output,
    _resolve_torch_device,
    _strip_unused_generation_inputs,
)
from hackathon_advisor.zerogpu import gpu_task, zero_gpu_duration_seconds, zero_gpu_enabled


class FakeBackends:
    """Stand-in for ``torch.backends`` exposing only ``mps.is_available()``."""

    def __init__(self, mps: bool) -> None:
        def is_available() -> bool:
            # Report whatever availability flag the test asked for.
            return mps

        mps_namespace = type("MPS", (), {"is_available": staticmethod(is_available)})
        self.mps = mps_namespace()


class FakeTorch:
    """Minimal torch double: dtype names plus configurable cuda/mps availability."""

    def __init__(self, cuda: bool = False, mps: bool = False) -> None:
        def cuda_is_available() -> bool:
            # Report whatever availability flag the test asked for.
            return cuda

        cuda_namespace = type("CUDA", (), {"is_available": staticmethod(cuda_is_available)})
        self.bfloat16, self.float32 = "bfloat16", "float32"
        self.cuda = cuda_namespace()
        self.backends = FakeBackends(mps)


class FakeInputs(dict):
    """Dict of fake model inputs that records the device passed to ``to``."""

    def to(self, device):
        # Store the device as an ordinary key so tests can compare plain dicts.
        self.update(device=device)
        return self


class FakeTokenizer:
    """Tokenizer double that records its template and tokenize calls."""

    def __init__(self) -> None:
        # Both records stay None until the matching method is called.
        self.template_call = self.tokenizer_call = None

    def apply_chat_template(self, messages, *, tokenize, add_generation_prompt, enable_thinking):
        """Remember the template arguments and return a fixed prompt string."""
        self.template_call = dict(
            messages=messages,
            tokenize=tokenize,
            add_generation_prompt=add_generation_prompt,
            enable_thinking=enable_thinking,
        )
        return "rendered prompt"

    def __call__(self, prompts, *, return_tensors):
        """Remember the tokenize arguments and return canned ``FakeInputs``."""
        self.tokenizer_call = dict(prompts=prompts, return_tensors=return_tensors)
        return FakeInputs(input_ids=[1], attention_mask=[1], token_type_ids=[0])


class FakeMiniCPMModel:
    """Model-class double that records ``from_pretrained`` and ``to`` calls."""

    # Most recently created instance, kept on the class for inspection.
    last_instance = None

    @classmethod
    def from_pretrained(cls, model_id, **kwargs):
        """Create an instance remembering the model id and load kwargs."""
        loaded = cls()
        loaded.model_id, loaded.kwargs, loaded.device = model_id, kwargs, None
        cls.last_instance = loaded
        return loaded

    def to(self, device):
        """Record the requested device and return ``self`` for chaining."""
        self.device = device
        return self


class FakeToolsTokenizer(FakeTokenizer):
    """FakeTokenizer that also records the native tools= template path."""

    def apply_chat_template(
        self, messages, *, tokenize, add_generation_prompt, enable_thinking, tools=None
    ):
        """Record the template arguments, adding ``tools`` only when it was supplied."""
        recorded = dict(
            messages=messages,
            tokenize=tokenize,
            add_generation_prompt=add_generation_prompt,
            enable_thinking=enable_thinking,
        )
        # The key is left out entirely for tools=None so tests can assert on its absence.
        if tools is not None:
            recorded["tools"] = tools
        self.template_call = recorded
        return "rendered prompt"


class FakeStreamer:
    """Stands in for transformers.TextIteratorStreamer in the worker-thread flow."""

    def __init__(self, tokenizer, *, skip_prompt, skip_special_tokens) -> None:
        # The constructor arguments are accepted for signature parity and ignored.
        import queue

        self._queue: queue.Queue = queue.Queue()

    def put(self, piece) -> None:
        """Enqueue one piece for the consumer."""
        self._queue.put(piece)

    def end(self) -> None:
        """Enqueue the ``None`` sentinel that ends iteration."""
        self._queue.put(None)

    def __iter__(self):
        """Yield queued pieces, blocking on the queue, until the sentinel arrives."""
        while (piece := self._queue.get()) is not None:
            yield piece


class FakeParameter:
    """Stand-in for a model parameter; exposes only a ``device`` attribute."""

    # Fixed device label shared by every instance.
    device = "cpu"


class FakeAdapterContext:
    """Context manager that appends enter/exit markers to a caller-owned log."""

    def __init__(self, log: list[str]) -> None:
        self._log = log

    def _record(self, event: str) -> None:
        # Append to the shared list so the caller observes the ordering.
        self._log.append(event)

    def __enter__(self):
        self._record("adapter_disabled")
        return self

    def __exit__(self, *exc_info):
        self._record("adapter_restored")
        # Returning False lets any exception from the with-body propagate.
        return False


class FakeChatModel:
    def __init__(self, pieces: tuple[str, ...], adapter_log: list[str] | None = None) -> None:
        self.pieces = pieces
        self.adapter_log = adapter_log
        self.generate_calls: list[dict] = []
        self.lock_was_held: list[bool] = []

    def parameters(self):
        return iter([FakeParameter()])

    def generate(self, **kwargs) -> None:
        self.lock_was_held.append(generation_lock().locked())
        self.generate_calls.append(kwargs)
        streamer = kwargs["streamer"]
        for piece in self.pieces:
            streamer.put(piece)
        streamer.end()


class FakeAdapterChatModel(FakeChatModel):
    """FakeChatModel variant that additionally offers ``disable_adapter``."""

    def disable_adapter(self):
        """Return a context manager that logs into this model's ``adapter_log``."""
        log = self.adapter_log
        assert log is not None
        return FakeAdapterContext(log)


@pytest.fixture
def fake_transformers(monkeypatch: pytest.MonkeyPatch):
    """Install a stub ``transformers`` module whose streamer is ``FakeStreamer``."""
    stub = types.SimpleNamespace()
    stub.TextIteratorStreamer = FakeStreamer
    # monkeypatch.setitem undoes the sys.modules entry when the test finishes.
    monkeypatch.setitem(sys.modules, "transformers", stub)
    return stub


def chat_runner_with(model: FakeChatModel) -> MiniCPMChatRunner:
    """Build a MiniCPMChatRunner whose planner uses ``model`` and a fake tokenizer.

    The planner gets a non-empty adapter id only when ``model`` has a
    ``disable_adapter`` attribute.
    """
    if hasattr(model, "disable_adapter"):
        adapter = "build-small-hackathon/some-lora"
    else:
        adapter = ""
    planner = MiniCPMTransformersPlanner("openbmb/MiniCPM5-1B", adapter_id=adapter)
    # Pre-seed the planner's private slots with the fakes.
    planner._model = model
    planner._tokenizer = FakeToolsTokenizer()
    return MiniCPMChatRunner(planner)


def test_chat_inputs_with_tools_passes_native_tools() -> None:
    """The tools list reaches the chat template and the result carries no token_type_ids."""
    fake_tokenizer = FakeToolsTokenizer()
    native_tools = [{"type": "function", "function": {"name": "list_quests"}}]
    conversation = [{"role": "user", "content": "hello"}]

    result = _minicpm_chat_inputs_with_tools(
        fake_tokenizer,
        conversation,
        tools=native_tools,
        enable_thinking=False,
        device="cpu",
    )

    recorded = fake_tokenizer.template_call
    assert recorded["tools"] == native_tools
    assert recorded["enable_thinking"] is False
    assert result == {"input_ids": [1], "attention_mask": [1], "device": "cpu"}


def test_chat_runner_streams_under_lock_with_adapter_disabled(fake_transformers) -> None:
    """A tools pass streams counted pieces, holds the lock during generate, and toggles the adapter."""
    expected_pieces = ("<function ", 'name="list_quests">', "</function>")
    events: list[str] = []
    chat_model = FakeAdapterChatModel(expected_pieces, events)
    runner = chat_runner_with(chat_model)

    streamed = list(
        runner.stream(
            [{"role": "user", "content": "what quests exist"}],
            tools=[{"type": "function", "function": {"name": "list_quests"}}],
            max_new_tokens=96,
        )
    )

    assert [text for _index, text in streamed] == list(expected_pieces)
    assert [index for index, _text in streamed] == [1, 2, 3]
    assert events == ["adapter_disabled", "adapter_restored"]
    assert chat_model.lock_was_held == [True]
    assert generation_lock().locked() is False
    first_call = chat_model.generate_calls[0]
    assert first_call["max_new_tokens"] == 96
    assert first_call["do_sample"] is False
    assert "tools" in runner._planner._tokenizer.template_call


def test_chat_runner_forwards_enable_thinking_to_the_template(fake_transformers) -> None:
    """enable_thinking=True reaches the template, and the runner classes advertise support flags."""
    chat_model = FakeChatModel(("thoughts</think>\n\nanswer",))
    runner = chat_runner_with(chat_model)

    stream = runner.stream(
        [{"role": "user", "content": "hi"}],
        tools=[{"type": "function"}],
        max_new_tokens=4096,
        enable_thinking=True,
    )
    for _ in stream:
        pass  # drain the stream; only the recorded calls matter here

    recorded = runner._planner._tokenizer.template_call
    assert recorded["enable_thinking"] is True
    assert chat_model.generate_calls[0]["max_new_tokens"] == 4096
    assert MiniCPMChatRunner.supports_thinking is True
    assert RuleBasedChatRunner.supports_thinking is False


def test_chat_runner_answer_pass_omits_tools_and_adapter_toggle(fake_transformers) -> None:
    """Without tools=, the streamed text is joined intact and no tools key hits the template."""
    chat_model = FakeChatModel(("The map ", "shows ten projects."))
    runner = chat_runner_with(chat_model)
    conversation = [
        {"role": "user", "content": "what is everyone building"},
        {"role": "assistant", "content": "", "tool_calls": []},
        {"role": "tool", "content": "{}"},
    ]

    streamed = list(runner.stream(conversation, max_new_tokens=200))

    answer = "".join([text for _index, text in streamed])
    assert answer == "The map shows ten projects."
    assert chat_model.lock_was_held == [True]
    assert "tools" not in runner._planner._tokenizer.template_call


def test_chat_runner_surfaces_generation_errors(fake_transformers) -> None:
    """An exception raised inside generate() reaches the consumer and the lock ends up free."""

    class ExplodingModel(FakeChatModel):
        def generate(self, **kwargs) -> None:
            # End the stream first so the consumer is not left waiting on the queue.
            streamer = kwargs["streamer"]
            streamer.end()
            raise RuntimeError("boom")

    runner = chat_runner_with(ExplodingModel(()))
    conversation = [{"role": "user", "content": "hi"}]

    with pytest.raises(RuntimeError, match="boom"):
        list(runner.stream(conversation, max_new_tokens=10))
    assert generation_lock().locked() is False


def test_early_close_releases_generation_lock(fake_transformers) -> None:
    """Closing the stream after one piece leaves the generation lock unlocked."""
    chat_model = FakeChatModel(("tok1 ", "tok2 ", "tok3 ", "tok4 ", "tok5"))
    runner = chat_runner_with(chat_model)
    conversation = [{"role": "user", "content": "hi"}]
    stream = runner.stream(conversation, max_new_tokens=32)

    # Take a single piece, then abandon the generator mid-stream.
    next(stream)
    stream.close()

    assert generation_lock().locked() is False


def test_rule_chat_runner_escapes_xml_special_characters() -> None:
    """A query containing ``&`` and ``<...>`` round-trips through the emitted tool call."""
    runner = RuleBasedChatRunner()
    question = "find projects about A & B <robots>"

    chunks = runner.stream(
        [{"role": "user", "content": question}],
        tools=[{"type": "function"}],
        max_new_tokens=96,
    )
    emitted = "".join(text for _index, text in chunks)

    call = parse_native_tool_call(emitted)
    assert call.name == "search_projects"
    assert call.arguments["query"] == "find projects about A & B <robots>"


def test_rule_chat_runner_routes_tools_pass_through_intents() -> None:
    """A "most quests" question on the tools pass parses as top_projects_by_quests."""
    runner = RuleBasedChatRunner()

    chunks = runner.stream(
        [{"role": "user", "content": "who completed the most quests"}],
        tools=[{"type": "function"}],
        max_new_tokens=96,
    )
    emitted = "".join(text for _index, text in chunks)

    assert parse_native_tool_call(emitted).name == "top_projects_by_quests"


def test_rule_chat_runner_answer_pass_is_deterministic() -> None:
    """Without tools=, the rule-based runner's answer contains "verified data"."""
    runner = RuleBasedChatRunner()
    conversation = [{"role": "user", "content": "hi"}, {"role": "tool", "content": "{}"}]

    chunks = runner.stream(conversation, max_new_tokens=200)
    answer = "".join(text for _index, text in chunks)

    assert "verified data" in answer


def test_create_chat_runner_matches_advisor_backend() -> None:
    """create_chat_runner returns the runner class that matches the planner type."""
    minicpm_runner = create_chat_runner(MiniCPMTransformersPlanner("openbmb/MiniCPM5-1B"))
    assert isinstance(minicpm_runner, MiniCPMChatRunner)

    rules_runner = create_chat_runner(RuleBasedPlanner())
    assert isinstance(rules_runner, RuleBasedChatRunner)


def test_base_model_context_is_null_without_adapter() -> None:
    """With an empty adapter id, base_model_context() works on a model lacking disable_adapter."""
    planner = MiniCPMTransformersPlanner("openbmb/MiniCPM5-1B", adapter_id="")
    planner._model = FakeChatModel(())

    context = planner.base_model_context()
    with context:
        pass  # no adapter -> nullcontext, nothing to toggle


def test_rule_planner_emits_valid_search_call() -> None:
    """A "search similar ..." message becomes a valid search_projects call with the full text."""
    message = "search similar lullaby audio projects"

    outcome = RuleBasedPlanner().plan(message, {})

    assert outcome.status == "valid"
    call = outcome.call
    assert call.name == "search_projects"
    assert call.arguments["query"] == "search similar lullaby audio projects"


def test_rule_planner_uses_plan_when_idea_exists() -> None:
    """With one saved idea in state, "make a build plan" resolves to make_plan."""
    state = {"ideas": [{"title": "A", "pitch": "B"}]}

    outcome = RuleBasedPlanner().plan("make a build plan", state)

    assert outcome.status == "valid"
    assert outcome.call.name == "make_plan"


def test_rule_planner_keeps_empty_board_commands_as_commands() -> None:
    """Plan and compare commands still resolve to their tools when state is empty."""
    planner = RuleBasedPlanner()

    plan_outcome = planner.plan("make a build plan", {})
    rank_outcome = planner.plan("compare ideas", {})

    assert plan_outcome.status == "valid"
    assert plan_outcome.call.name == "make_plan"
    assert rank_outcome.status == "valid"
    assert rank_outcome.call.name == "compare_ideas"


def test_rule_planner_defaults_blank_to_list_projects() -> None:
    """An empty message resolves to a valid list_projects call."""
    outcome = RuleBasedPlanner().plan("", {})

    assert outcome.status == "valid"
    assert outcome.call.name == "list_projects"


def test_rule_planner_routes_project_reference_commands() -> None:
    """Map, project-name and Space-URL commands route to list_projects / get_project."""
    planner = RuleBasedPlanner()
    space_url = "https://huggingface.co/spaces/build-small-hackathon/lolaby"

    map_outcome = planner.plan("show current map", {})
    name_outcome = planner.plan("read project lolaby", {})
    url_outcome = planner.plan("open space " + space_url, {})

    assert map_outcome.status == "valid"
    assert map_outcome.call.name == "list_projects"

    assert name_outcome.status == "valid"
    assert name_outcome.call.name == "get_project"
    assert name_outcome.call.arguments["id"] == "lolaby"

    assert url_outcome.status == "valid"
    assert url_outcome.call.name == "get_project"
    assert url_outcome.call.arguments["id"] == "build-small-hackathon/lolaby"


def test_rule_planner_keeps_project_words_inside_ideas() -> None:
    """An idea sentence that mentions "show projects" is still saved as an idea."""
    idea_text = "A dashboard that helps teams show projects to mentors"

    outcome = RuleBasedPlanner().plan(idea_text, {})

    assert outcome.status == "valid"
    assert outcome.call.name == "save_idea"


def test_rule_planner_does_not_match_commands_inside_idea_words() -> None:
    """Ideas containing "plant" or "cooking plan" resolve to save_idea, not a command."""
    planner = RuleBasedPlanner()
    planting_text = (
        "A neighborhood seed swap archive that reminds gardeners when to plant shared seeds"
    )
    cooking_text = "A countertop helper that turns pantry leftovers into a weekly cooking plan"

    planting_outcome = planner.plan(planting_text, {})
    cooking_outcome = planner.plan(cooking_text, {})

    assert planting_outcome.status == "valid"
    assert planting_outcome.call.name == "save_idea"
    assert cooking_outcome.status == "valid"
    assert cooking_outcome.call.name == "save_idea"


def test_rule_planner_splits_explicit_idea_pitch() -> None:
    """An "idea: title -- pitch" message is split into separate title and pitch arguments."""
    message = "idea: Hands-on science coach -- A lab-notebook companion for household experiments."

    outcome = RuleBasedPlanner().plan(message, {})

    assert outcome.status == "valid"
    assert outcome.call.name == "save_idea"
    arguments = outcome.call.arguments
    assert arguments["title"] == "Hands-on science coach"
    assert arguments["pitch"] == "A lab-notebook companion for household experiments."


def test_render_context_includes_state() -> None:
    """The rendered context mentions the saved idea, the trace verdict and the tool scaffolding."""
    state = {
        "ideas": [{"title": "Archive Cartographer", "pitch": "Map family memories."}],
        "trace": [{"input": "first", "verdict": "ECHO x2", "overall": 5.1}],
    }

    rendered = render_context("make a plan", state)

    expected_fragments = (
        "Archive Cartographer",
        "ECHO x2",
        '<function name="tool_name">',
        "Available tools:",
        "search_projects",
    )
    for fragment in expected_fragments:
        assert fragment in rendered


def test_system_prompt_keeps_runtime_role_user_facing() -> None:
    """The system prompt names The Unwritten Almanac and omits "Mothback" and "Build Small"."""
    text = system_prompt()

    assert "The Unwritten Almanac" in text
    for forbidden in ("Mothback", "Build Small"):
        assert forbidden not in text


def test_create_tool_planner_defaults_to_minicpm(monkeypatch: pytest.MonkeyPatch) -> None:
    """With the backend/adapter env vars unset, the planner is an unloaded MiniCPM one with default adapter."""
    for name in ("ADVISOR_MODEL_BACKEND", "ADVISOR_ADAPTER_ID", "ADVISOR_ADAPTER_REVISION"):
        monkeypatch.delenv(name, raising=False)

    planner = create_tool_planner()

    report = runtime_status(planner).to_dict()
    assert isinstance(planner, MiniCPMTransformersPlanner)
    assert report["backend"] == "minicpm-transformers"
    assert report["loaded"] is False
    assert report["adapter_id"] == DEFAULT_ADAPTER_ID
    assert report["adapter_revision"] == DEFAULT_ADAPTER_REVISION


def test_create_tool_planner_accepts_explicit_rules_backend(monkeypatch: pytest.MonkeyPatch) -> None:
    """ADVISOR_MODEL_BACKEND=rules yields a RuleBasedPlanner that reports itself as loaded."""
    monkeypatch.setenv("ADVISOR_MODEL_BACKEND", "rules")

    planner = create_tool_planner()

    assert isinstance(planner, RuleBasedPlanner)
    report = runtime_status(planner).to_dict()
    assert report["loaded"] is True


def test_create_tool_planner_accepts_adapter_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Model id, adapter id and adapter revision from the environment show up in runtime status."""
    environment = {
        "ADVISOR_MODEL_BACKEND": "minicpm-transformers",
        "ADVISOR_MODEL_ID": "openbmb/MiniCPM5-1B",
        "ADVISOR_ADAPTER_ID": DEFAULT_ADAPTER_ID,
        "ADVISOR_ADAPTER_REVISION": "abc123",
    }
    for name, value in environment.items():
        monkeypatch.setenv(name, value)

    planner = create_tool_planner()
    report = runtime_status(planner).to_dict()

    assert isinstance(planner, MiniCPMTransformersPlanner)
    assert report["backend"] == "minicpm-transformers"
    assert report["model_id"] == "openbmb/MiniCPM5-1B"
    assert report["adapter_id"] == DEFAULT_ADAPTER_ID
    assert report["adapter_revision"] == "abc123"
    assert report["loaded"] is False


def test_create_tool_planner_rejects_unknown_backend(monkeypatch: pytest.MonkeyPatch) -> None:
    """An unrecognised backend name raises RuntimeError mentioning "Unsupported"."""
    monkeypatch.setenv("ADVISOR_MODEL_BACKEND", "bogus")

    expectation = pytest.raises(RuntimeError, match="Unsupported")
    with expectation:
        create_tool_planner()


def test_minicpm_status_is_lazy() -> None:
    """A freshly constructed MiniCPM planner reports loaded=False and an empty adapter revision."""
    planner = MiniCPMTransformersPlanner("openbmb/MiniCPM5-1B", DEFAULT_ADAPTER_ID)

    report = runtime_status(planner).to_dict()

    assert report["backend"] == "minicpm-transformers"
    assert report["adapter_id"] == DEFAULT_ADAPTER_ID
    assert report["adapter_revision"] == ""
    assert report["loaded"] is False


def test_zerogpu_disabled_leaves_function_unwrapped(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ADVISOR_ZERO_GPU unset, gpu_task returns the very same function object."""
    monkeypatch.delenv("ADVISOR_ZERO_GPU", raising=False)

    def marker() -> str:
        return "ok"

    assert zero_gpu_enabled() is False
    wrapped = gpu_task(marker)
    assert wrapped is marker


def test_zerogpu_duration_validates_positive_values(monkeypatch: pytest.MonkeyPatch) -> None:
    """Duration 7 is accepted; 0 and 121 raise RuntimeError with the expected message."""
    variable = "ADVISOR_ZERO_GPU_DURATION"

    monkeypatch.setenv(variable, "7")
    assert zero_gpu_duration_seconds() == 7

    # Each rejected value is paired with the message fragment it must produce.
    rejected = (("0", "positive"), ("121", "at most 120"))
    for raw_value, message in rejected:
        monkeypatch.setenv(variable, raw_value)
        with pytest.raises(RuntimeError, match=message):
            zero_gpu_duration_seconds()


def test_generation_inputs_drop_token_type_ids() -> None:
    """_strip_unused_generation_inputs removes token_type_ids from the dict in place."""
    payload = dict(input_ids=[1], attention_mask=[1], token_type_ids=[0])

    _strip_unused_generation_inputs(payload)

    assert payload == {"input_ids": [1], "attention_mask": [1]}


def test_minicpm_loader_matches_official_cuda_dtype() -> None:
    """For device "cuda", the loader passes bfloat16 + trust_remote_code and moves the model to cuda."""
    fake_torch = FakeTorch()

    loaded = _load_minicpm_causal_lm(FakeMiniCPMModel, "openbmb/MiniCPM5-1B", "cuda", fake_torch)

    assert loaded.model_id == "openbmb/MiniCPM5-1B"
    assert loaded.kwargs == {"torch_dtype": "bfloat16", "trust_remote_code": True}
    assert loaded.device == "cuda"


def test_minicpm_loader_uses_device_map_for_auto() -> None:
    """For device "auto", the loader passes device_map="auto" and never calls ``to``."""
    fake_torch = FakeTorch()

    loaded = _load_minicpm_causal_lm(FakeMiniCPMModel, "openbmb/MiniCPM5-1B", "auto", fake_torch)

    expected_kwargs = dict(torch_dtype="bfloat16", device_map="auto", trust_remote_code=True)
    assert loaded.kwargs == expected_kwargs
    assert loaded.device is None


def test_minicpm_chat_inputs_follow_official_template_flow() -> None:
    """The helper renders the template untokenized, tokenizes the prompt, and moves inputs to the device."""
    fake_tokenizer = FakeTokenizer()
    conversation = [{"role": "user", "content": "hello"}]

    result = _minicpm_chat_inputs(
        fake_tokenizer,
        conversation,
        enable_thinking=False,
        device="cuda",
    )

    expected_template_call = {
        "messages": [{"role": "user", "content": "hello"}],
        "tokenize": False,
        "add_generation_prompt": True,
        "enable_thinking": False,
    }
    expected_tokenizer_call = {"prompts": ["rendered prompt"], "return_tensors": "pt"}
    assert fake_tokenizer.template_call == expected_template_call
    assert fake_tokenizer.tokenizer_call == expected_tokenizer_call
    assert result == {"input_ids": [1], "attention_mask": [1], "device": "cuda"}


def test_minicpm_generation_kwargs_match_demo_sampling_policy() -> None:
    """Positive temperature enables sampling kwargs; temperature 0.0 yields do_sample=False only."""
    model_inputs = {"input_ids": [1], "attention_mask": [1]}

    with_sampling = _minicpm_generation_kwargs(
        model_inputs, max_new_tokens=32, temperature=0.9, top_p=0.95
    )
    greedy = _minicpm_generation_kwargs(model_inputs, max_new_tokens=32, temperature=0.0)

    # Keys that both variants are expected to share.
    shared = {"input_ids": [1], "attention_mask": [1], "max_new_tokens": 32}
    assert with_sampling == {**shared, "temperature": 0.9, "top_p": 0.95, "do_sample": True}
    assert greedy == {**shared, "do_sample": False}


def test_model_xml_fragment_is_normalized() -> None:
    """A fragment missing ``<function `` and ``</function>`` is completed into a full tag."""
    fragment = 'name="save_idea">{"title":"A","pitch":"B"}'
    expected = '<function name="save_idea">{"title":"A","pitch":"B"}</function>'

    normalized = _normalize_xml_tool_output(fragment)

    assert normalized == expected


def test_resolve_device_keeps_auto_and_explicit_cpu() -> None:
    """"auto" and "cpu" are returned unchanged, even when accelerators are reported available."""
    no_accelerators = FakeTorch()
    assert _resolve_torch_device("auto", no_accelerators) == "auto"

    all_accelerators = FakeTorch(cuda=True, mps=True)
    assert _resolve_torch_device("cpu", all_accelerators) == "cpu"


def test_resolve_device_prefers_cuda_then_mps_then_cpu(monkeypatch) -> None:
    """With ZeroGPU unset, the best local device is cuda, then mps, then cpu."""
    monkeypatch.delenv("ADVISOR_ZERO_GPU", raising=False)

    # (cuda available, mps available, expected device)
    ladder = (
        (True, True, "cuda"),
        (False, True, "mps"),
        (False, False, "cpu"),
    )
    for has_cuda, has_mps, expected in ladder:
        assert _best_local_device(FakeTorch(cuda=has_cuda, mps=has_mps)) == expected
    # "local" resolves through the same ladder
    assert _resolve_torch_device("local", FakeTorch(cuda=False, mps=True)) == "mps"


def test_resolve_device_unavailable_request_degrades_gracefully(monkeypatch) -> None:
    """Requesting cuda when only mps is available resolves to mps."""
    monkeypatch.delenv("ADVISOR_ZERO_GPU", raising=False)
    mps_only = FakeTorch(cuda=False, mps=True)

    # asking for cuda on an MPS-only box lands on mps, not a crash
    resolved = _resolve_torch_device("cuda", mps_only)
    assert resolved == "mps"


def test_resolve_device_skips_cuda_under_zero_gpu(monkeypatch) -> None:
    """With ADVISOR_ZERO_GPU=1, the best local device is cpu even if cuda reports available."""
    # In a ZeroGPU main process there is no local CUDA, and probing it is avoided.
    monkeypatch.setenv("ADVISOR_ZERO_GPU", "1")
    cuda_only = FakeTorch(cuda=True, mps=False)

    assert _best_local_device(cuda_only) == "cpu"


def test_runtime_status_reports_configured_device() -> None:
    """Status echoes the MiniCPM planner's configured device and is empty for the rules planner."""
    minicpm_planner = MiniCPMTransformersPlanner("openbmb/MiniCPM5-1B", device="local")
    minicpm_report = runtime_status(minicpm_planner).to_dict()
    assert minicpm_report["device"] == "local"

    rules_report = runtime_status(RuleBasedPlanner()).to_dict()
    assert rules_report["device"] == ""